

# AWS Glue Streaming の概念
<a name="glue-streaming-concepts"></a>

 以下のセクションでは、AWS Glue Streaming の概念について説明します。

**Topics**
+ [AWS Glue Streaming ジョブの構造](#glue-streaming-anatomy)

## AWS Glue Streaming ジョブの構造
<a name="glue-streaming-anatomy"></a>

 AWS Glue Streaming ジョブは Spark ストリーミングパラダイムで動作し、Spark フレームワークの構造化されたストリーミングを活用します。ストリーミングジョブは、特定の時間間隔で常にストリーミングデータソースをポーリングし、レコードをマイクロバッチとして取得します。以下のセクションでは、AWS Glue Streaming ジョブのさまざまな部分について説明します。

![このスクリーンショットは、上記の例の Amazon CloudWatch モニタリングのログの、AWS Glue を示しています。必要なエグゼキュターの数 (オレンジ色の線) を参照し、手動で調整する必要なく、それに合わせてエグゼキュターをスケールします (青色の線)。](http://docs.aws.amazon.com/ja_jp/glue/latest/dg/images/glue-streaming-anatomy.png)


### forEachBatch
<a name="glue-streaming-anatomy-batch"></a>

 `forEachBatch` メソッドは、AWS Glue Streaming ジョブ実行のエントリポイントです。AWS Glue Streaming ジョブは、`forEachBatch` メソッドを使用して、イテレーターのように機能するデータをポーリングします。このデータはストリーミングジョブのライフサイクル中ずっとアクティブなままです。新しいデータがないかストリーミングソースを定期的にポーリングして、最新のデータをマイクロバッチで処理します。

```
glueContext.forEachBatch(
    frame=dataFrame_AmazonKinesis_node1696872487972,    
    batch_function=processBatch,
    options={
        "windowSize": "100 seconds",
        "checkpointLocation": args["TempDir"] + "/" + args["JOB_NAME"] + "/checkpoint/",
    },
)
```

 `forEachBatch` の `frame` プロパティを設定してストリーミングソースを指定します。この例では、ジョブの作成中に空白のキャンバスに作成したソースノードに、ジョブのデフォルトの DataFrame が入力されます。`batch_function` プロパティを、マイクロバッチ操作ごとに呼び出す `function` として設定します。受信データのバッチ変換を処理する関数を定義する必要があります。

### ソース
<a name="glue-streaming-anatomy-source"></a>

 `processBatch` 関数の最初のステップで、プログラムは、`forEachBatch` のフレームプロパティとして定義した DataFrame のレコード数を検証します。プログラムは、空でない DataFrame に取り込みタイムスタンプを追加します。`data_frame.count()>0` 句は、最新のマイクロバッチが空ではなく、さらに処理できる状態にあるかどうかを判断します。

```
def processBatch(data_frame, batchId):
    if data_frame.count() >0:       
        AmazonKinesis_node1696872487972 = DynamicFrame.fromDF(
            glueContext.add_ingestion_time_columns(data_frame, "hour"),
            glueContext,
            "from_data_frame",
        )
```

### マッピング
<a name="glue-streaming-anatomy-mapping"></a>

 プログラムの次のセクションでは、マッピングを適用します。Spark DataFrame で `Mapping.apply` メソッドを使用すると、データ要素に関する変換ルールを定義できます。通常、名前を変更したり、データ型を変更したり、ソースデータ列でカスタム関数を適用してターゲット列にマッピングしたりすることができます。

```
    #Script generated for node ChangeSchema        
    ChangeSchema_node16986872679326 = ApplyMapping.apply(
        frame =  AmazonKinesis_node1696872487972,
        mappings = [
            ("eventtime", "string", "eventtime", "string"),
            ("manufacturer", "string", "manufacturer", "string"),
            ("minutevolume", "long", "minutevolume", "int"),
            ("o2stats", "long", "OxygenSaturation", "int"),
            ("pressurecontrol", "long", "pressurecontrol", "int"),
            ("serialnumber", "string", "serialnumber", "string"),
            ("ventilatorid", "long", "ventilatorid", "long"),
            ("ingest_year", "string", "ingest_year", "string"),
            ("ingest_month", "string", "ingest_month", "string"),
            ("ingest_day", "string", "ingest_day", "string"),
            ("ingest_hour", "string", "ingest_hour", "string"),
        ],
        transformation_ctx="ChangeSchema_node16986872679326",
    )
        )
```

### シンク
<a name="glue-streaming-anatomy-sink"></a>

 このセクションでは、ストリーミングソースから受信したデータセットがターゲットの場所に保存されます。この例では、Amazon S3 の場所にデータを書き込みます。`AmazonS3_node_path` プロパティの詳細は、キャンバスからのジョブ作成時に使用した設定に基づいて事前入力されます。ユースケースに基づいて `updateBehavior` を設定し、データカタログテーブルを更新しないか、それ以降の実行時にデータカタログを作成してデータカタログスキーマを更新するか、カタログテーブルを作成してそれ以降の実行時にスキーマ定義を更新しないかを決定できます。

 `partitionKeys` プロパティは、ストレージパーティションオプションを定義します。デフォルトの動作では、ソースセクションで使用できるようになった `ingestion_time_columns` ごとにデータをパーティション化します。`compression` プロパティでは、ターゲットへの書き込み時に適用される圧縮アルゴリズムを設定できます。圧縮方式として Snappy、LZO、または GZIP を設定するオプションがあります。`enableUpdateCatalog` プロパティは、AWS Glue カタログテーブルを更新する必要があるかどうかを制御します。このプロパティで使用できるオプションは `True` または `False` です。

```
    #Script generated for node Amazon S3        
    AmazonS3_node1696872743449 = glueContext.getSink(
        path =  AmazonS3_node1696872743449_path,
        connection_type = "s3",
        updateBehavior = "UPDATE_IN_DATABASE",
        partitionKeys = ["ingest_year", "ingest_month", "ingest_day", "ingest_hour"],
        compression = "snappy",
        enableUpdateCatalog = True,
        transformation_ctx = "AmazonS3_node1696872743449",
    )
```

### AWS Glue カタログシンク
<a name="glue-streaming-anatomy-catalog-sink"></a>

 ジョブのこのセクションでは、AWS Glue カタログテーブルの更新動作を制御します。設計中の AWS Glue ジョブに関連する AWS Glue カタログデータベース名とテーブル名ごとに `catalogDatabase` プロパティと `catalogTableName` プロパティを設定します。`setFormat` プロパティを使用してターゲットデータのファイル形式を定義できます。この例では、データを parquet 形式で保存します。

 このチュートリアルを参照して AWS Glue Streaming ジョブを設定して実行すると、Amazon Kinesis Data Streams で生成されたストリーミングデータは、Snappy 圧縮方式、parquet 形式で、Amazon S3 の場所に保存されます。ストリーミングジョブが正常に実行されると、Amazon Athena でデータをクエリできるようになります。

```
       
    AmazonS3_node1696872743449 = setCatalogInfo(
        catalogDatabase =  "demo", catalogTableName = "demo_stream_transform_result"
    )
    AmazonS3_node1696872743449.setFormat("glueparquet")
    AmazonS3_node1696872743449.writeFormat("ChangeSchema_node16986872679326")
    )
```