

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# AWS Glue Concetti di streaming
<a name="glue-streaming-concepts"></a>

 Le seguenti sezioni forniscono informazioni sui concetti di AWS Glue Streaming. 

**Topics**
+ [Anatomia di un lavoro AWS Glue in streaming](#glue-streaming-anatomy)

## Anatomia di un lavoro AWS Glue in streaming
<a name="glue-streaming-anatomy"></a>

 AWS Glue i lavori di streaming si basano sul paradigma dello streaming Spark e sfruttano lo streaming strutturato del framework Spark. I processi di streaming effettuano costantemente il polling dall'origine dati di streaming a un intervallo di tempo specifico per recuperare i record sotto forma di microbatch. Le seguenti sezioni esaminano le diverse parti di un processo di streaming. AWS Glue 

![La schermata mostra un registro di Amazon CloudWatch monitoraggio, AWS Glue per l'esempio fornito sopra, che esamina il numero di esecutori necessari (linea arancione) e ridimensiona gli esecutori (linea blu) in modo che corrispondano a tale numero senza bisogno di regolazioni manuali.](http://docs.aws.amazon.com/it_it/glue/latest/dg/images/glue-streaming-anatomy.png)


### forEachBatch
<a name="glue-streaming-anatomy-batch"></a>

 Il `forEachBatch` metodo è il punto di ingresso dell'esecuzione di un processo di streaming. AWS Glue AWS Glue streaming jobs utilizza il `forEachBatch` metodo per eseguire il polling dei dati funzionando come un iteratore che rimane attivo durante il ciclo di vita del processo di streaming e interroga regolarmente la fonte di streaming alla ricerca di nuovi dati ed elabora i dati più recenti in microbatch. 

```
glueContext.forEachBatch(
    frame=dataFrame_AmazonKinesis_node1696872487972,    
    batch_function=processBatch,
    options={
        "windowSize": "100 seconds",
        "checkpointLocation": args["TempDir"] + "/" + args["JOB_NAME"] + "/checkpoint/",
    },
)
```

 Configura la proprietà `frame` di `forEachBatch` per specificare un'origine di streaming. In questo esempio, il nodo di origine creato nell'area di disegno vuota durante la creazione del lavoro viene popolato con l'impostazione predefinita del lavoro. DataFrame Imposta la proprietà `batch_function` come `function` che decidi di richiamare per ogni operazione di microbatch. Per gestire la trasformazione in batch sui dati in entrata è necessario definire una funzione. 

### Origine
<a name="glue-streaming-anatomy-source"></a>

 Nella prima fase della `processBatch` funzione, il programma verifica il conteggio dei DataFrame record definiti come proprietà frame di. `forEachBatch` Il programma aggiunge un timestamp di inserimento a un valore non vuoto. DataFrame La clausola `data_frame.count()>0` determina se l'ultimo microbatch non è vuoto ed è pronto per un'ulteriore elaborazione. 

```
def processBatch(data_frame, batchId):
    if data_frame.count() >0:       
        AmazonKinesis_node1696872487972 = DynamicFrame.fromDF(
            glueContext.add_ingestion_time_columns(data_frame, "hour"),
            glueContext,
            "from_data_frame",
        )
```

### Mapping
<a name="glue-streaming-anatomy-mapping"></a>

 La sezione successiva del programma consiste nell'applicare la mappatura. Il `Mapping.apply` metodo su Spark DataFrame consente di definire una regola di trasformazione relativa agli elementi di dati. In genere è possibile rinominare, modificare il tipo di dati o applicare una funzione personalizzata alla colonna di dati di origine e mapparli alle colonne di destinazione. 

```
    #Script generated for node ChangeSchema        
    ChangeSchema_node16986872679326 = ApplyMapping.apply(
        frame =  AmazonKinesis_node1696872487972,
        mappings = [
            ("eventtime", "string", "eventtime", "string"),
            ("manufacturer", "string", "manufacturer", "string"),
            ("minutevolume", "long", "minutevolume", "int"),
            ("o2stats", "long", "OxygenSaturation", "int"),
            ("pressurecontrol", "long", "pressurecontrol", "int"),
            ("serialnumber", "string", "serialnumber", "string"),
            ("ventilatorid", "long", "ventilatorid", "long"),
            ("ingest_year", "string", "ingest_year", "string"),
            ("ingest_month", "string", "ingest_month", "string"),
            ("ingest_day", "string", "ingest_day", "string"),
            ("ingest_hour", "string", "ingest_hour", "string"),
        ],
        transformation_ctx="ChangeSchema_node16986872679326",
    )
        )
```

### Sink
<a name="glue-streaming-anatomy-sink"></a>

 In questa sezione, il set di dati in entrata dall'origine di streaming viene archiviato in una posizione di destinazione. In questo esempio scriveremo i dati in una posizione Amazon S3. I dettagli della proprietà `AmazonS3_node_path` sono precompilati in base alle impostazioni utilizzate durante la creazione del processo dal canvas. È possibile impostare `updateBehavior` in base al proprio caso d'uso e decidere di non aggiornare la tabella del Catalogo dati, creare il Catalogo dati e aggiornare il relativo schema nelle esecuzioni successive oppure creare una tabella di catalogo e non aggiornare la definizione dello schema nelle esecuzioni successive. 

 La proprietà `partitionKeys` definisce l'opzione della partizione di archiviazione. Il comportamento predefinito consiste nel partizionare i dati in base al valore `ingestion_time_columns` fornito nella sezione di origine. La proprietà `compression` consente di impostare l'algoritmo di compressione da applicare durante la scrittura della destinazione. È possibile impostare la tecnica di compressione su Snappy, LZO o GZIP. La proprietà `enableUpdateCatalog` controlla se la tabella del catalogo AWS Glue deve essere aggiornata. Le opzioni disponibili per questa proprietà sono `True` o `False`. 

```
    #Script generated for node Amazon S3        
    AmazonS3_node1696872743449 = glueContext.getSink(
        path =  AmazonS3_node1696872743449_path,
        connection_type = "s3",
        updateBehavior = "UPDATE_IN_DATABASE",
        partitionKeys = ["ingest_year", "ingest_month", "ingest_day", "ingest_hour"],
        compression = "snappy",
        enableUpdateCatalog = True,
        transformation_ctx = "AmazonS3_node1696872743449",
    )
```

### AWS Glue Lavello da catalogo
<a name="glue-streaming-anatomy-catalog-sink"></a>

 Questa sezione del lavoro controlla il comportamento di aggiornamento della tabella del AWS Glue catalogo. Imposta `catalogDatabase` una `catalogTableName` proprietà in base al nome del database del AWS Glue catalogo e al nome della tabella associata al AWS Glue lavoro che stai progettando. È possibile definire il formato di file dei dati di destinazione tramite la proprietà `setFormat`. Per questo esempio, i dati verranno archiviati in formato Parquet. 

 Una volta configurato ed eseguito il processo di AWS Glue streaming che fa riferimento a questo tutorial, i dati di streaming prodotti Amazon Kinesis Data Streams verranno archiviati nella sede di Amazon S3 in un formato parquet con compressione rapida. Una volta eseguito correttamente il processo di streaming, potrai interrogare i dati tramite Amazon Athena. 

```
       
    AmazonS3_node1696872743449 = setCatalogInfo(
        catalogDatabase =  "demo", catalogTableName = "demo_stream_transform_result"
    )
    AmazonS3_node1696872743449.setFormat("glueparquet")
    AmazonS3_node1696872743449.writeFormat("ChangeSchema_node16986872679326")
    )
```