Managed Service for Apache Flink API 範例程式碼 - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink 之前稱為 Amazon Kinesis Data Analytics for Apache Flink。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

Managed Service for Apache Flink API 範例程式碼

本主題包含適用於 Managed Service for Apache Flink 動作的範例請求區塊。

若要使用 JSON 做為 AWS Command Line Interface (AWS CLI) 動作的輸入,請將請求儲存在 JSON 檔案中。然後使用 --cli-input-json 參數將檔案名稱傳遞至動作。

以下範例示範如何將 JSON 檔案用於動作中。

$ aws kinesisanalyticsv2 start-application --cli-input-json file://start.json

如需搭配 使用 JSON 的詳細資訊 AWS CLI,請參閱AWS Command Line Interface 《 使用者指南》中的產生 CLI Skeleton 和 CLI 輸入 JSON 參數

AddApplicationCloudWatchLoggingOption

AddApplicationCloudWatchLoggingOption 動作的下列範例請求程式碼會將 Amazon CloudWatch 日誌選項新增至 Managed Service for Apache Flink 應用程式:

{ "ApplicationName": "MyApplication", "CloudWatchLoggingOption": { "LogStreamARN": "arn:aws:logs:us-east-1:123456789123:log-group:my-log-group:log-stream:My-LogStream" }, "CurrentApplicationVersionId": 2 }

AddApplicationInput

AddApplicationInput 動作的下列範例請求程式碼會將應用程式輸入新增至 Managed Service for Apache Flink 應用程式:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 2, "Input": { "InputParallelism": { "Count": 2 }, "InputSchema": { "RecordColumns": [ { "Mapping": "$.TICKER", "Name": "TICKER_SYMBOL", "SqlType": "VARCHAR(50)" }, { "SqlType": "REAL", "Name": "PRICE", "Mapping": "$.PRICE" } ], "RecordEncoding": "UTF-8", "RecordFormat": { "MappingParameters": { "JSONMappingParameters": { "RecordRowPath": "$" } }, "RecordFormatType": "JSON" } }, "KinesisStreamsInput": { "ResourceARN": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream" } } }

AddApplicationInputProcessingConfiguration

AddApplicationInputProcessingConfiguration 動作的下列範例請求程式碼會將應用程式輸入處理組態新增至 Managed Service for Apache Flink 應用程式:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 2, "InputId": "2.1", "InputProcessingConfiguration": { "InputLambdaProcessor": { "ResourceARN": "arn:aws:lambda:us-east-1:012345678901:function:MyLambdaFunction" } } }

AddApplicationOutput

AddApplicationOutput 動作的下列範例請求程式碼會將 Kinesis 資料串流作為應用程式輸出新增至 Managed Service for Apache Flink 應用程式:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 2, "Output": { "DestinationSchema": { "RecordFormatType": "JSON" }, "KinesisStreamsOutput": { "ResourceARN": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream" }, "Name": "DESTINATION_SQL_STREAM" } }

AddApplicationReferenceDataSource

AddApplicationReferenceDataSource 動作的下列範例請求程式碼會將 CSV 應用程式參考資料來源新增至 Managed Service for Apache Flink 應用程式:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 5, "ReferenceDataSource": { "ReferenceSchema": { "RecordColumns": [ { "Mapping": "$.TICKER", "Name": "TICKER", "SqlType": "VARCHAR(4)" }, { "Mapping": "$.COMPANYNAME", "Name": "COMPANY_NAME", "SqlType": "VARCHAR(40)" }, ], "RecordEncoding": "UTF-8", "RecordFormat": { "MappingParameters": { "CSVMappingParameters": { "RecordColumnDelimiter": " ", "RecordRowDelimiter": "\r\n" } }, "RecordFormatType": "CSV" } }, "S3ReferenceDataSource": { "BucketARN": "arn:aws:s3:::amzn-s3-demo-bucket", "FileKey": "TickerReference.csv" }, "TableName": "string" } }

AddApplicationVpcConfiguration

AddApplicationVpcConfiguration 動作的下列範例請求程式碼會將 VPC 組態新增至現有的應用程式:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 9, "VpcConfiguration": { "SecurityGroupIds": [ "sg-0123456789abcdef0" ], "SubnetIds": [ "subnet-0123456789abcdef0" ] } }

CreateApplication

CreateApplication 動作的下列範例請求程式碼會建立 Managed Service for Apache Flink 應用程式:

{ "ApplicationName":"MyApplication", "ApplicationDescription":"My-Application-Description", "RuntimeEnvironment":"FLINK-1_15", "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole", "CloudWatchLoggingOptions":[ { "LogStreamARN":"arn:aws:logs:us-east-1:123456789123:log-group:my-log-group:log-stream:My-LogStream" } ], "ApplicationConfiguration": { "EnvironmentProperties": {"PropertyGroups": [ {"PropertyGroupId": "ConsumerConfigProperties", "PropertyMap": {"aws.region": "us-east-1", "flink.stream.initpos": "LATEST"} }, {"PropertyGroupId": "ProducerConfigProperties", "PropertyMap": {"aws.region": "us-east-1"} }, ] }, "ApplicationCodeConfiguration":{ "CodeContent":{ "S3ContentLocation":{ "BucketARN":"arn:aws:s3:::amzn-s3-demo-bucket", "FileKey":"myflink.jar", "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345" } }, "CodeContentType":"ZIPFILE" }, "FlinkApplicationConfiguration":{ "ParallelismConfiguration":{ "ConfigurationType":"CUSTOM", "Parallelism":2, "ParallelismPerKPU":1, "AutoScalingEnabled":true } } } }

CreateApplicationSnapshot

CreateApplicationSnapshot 動作的下列範例請求程式碼會建立應用程式狀態的快照:

{ "ApplicationName": "MyApplication", "SnapshotName": "MySnapshot" }

DeleteApplication

DeleteApplication 動作的下列範例請求程式碼會刪除 Managed Service for Apache Flink 應用程式:

{"ApplicationName": "MyApplication", "CreateTimestamp": 12345678912}

DeleteApplicationCloudWatchLoggingOption

DeleteApplicationCloudWatchLoggingOption 動作的下列範例請求程式碼會從 Managed Service for Apache Flink 應用程式中刪除 Amazon CloudWatch 記錄選項:

{ "ApplicationName": "MyApplication", "CloudWatchLoggingOptionId": "3.1" "CurrentApplicationVersionId": 3 }

DeleteApplicationInputProcessingConfiguration

DeleteApplicationInputProcessingConfiguration 動作的下列範例請求程式碼會從 Managed Service for Apache Flink 應用程式中移除輸入處理組態:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 4, "InputId": "2.1" }

DeleteApplicationOutput

DeleteApplicationOutput 動作的下列範例請求程式碼會從 Managed Service for Apache Flink 應用程式中移除應用程式輸出:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 4, "OutputId": "4.1" }

DeleteApplicationReferenceDataSource

DeleteApplicationReferenceDataSource 動作的下列範例請求程式碼會從 Managed Service for Apache Flink 應用程式中移除應用程式參考資料來源:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 5, "ReferenceId": "5.1" }

DeleteApplicationSnapshot

DeleteApplicationSnapshot 動作的下列範例請求程式碼會刪除應用程式狀態的快照:

{ "ApplicationName": "MyApplication", "SnapshotCreationTimestamp": 12345678912, "SnapshotName": "MySnapshot" }

DeleteApplicationVpcConfiguration

DeleteApplicationVpcConfiguration 動作的下列範例請求程式碼會從應用程式中移除現有的 VPC 組態:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 9, "VpcConfigurationId": "1.1" }

DescribeApplication

DescribeApplication 動作的下列範例請求程式碼會傳回 Managed Service for Apache Flink 應用程式的詳細資訊:

{"ApplicationName": "MyApplication"}

DescribeApplicationSnapshot

DescribeApplicationSnapshot 動作的下列請求程式碼範例會傳回應用程式狀態快照的詳細資訊:

{ "ApplicationName": "MyApplication", "SnapshotName": "MySnapshot" }

DiscoverInputSchema

DiscoverInputSchema 動作的下列請求程式碼範例會從串流來源產生結構描述:

{ "InputProcessingConfiguration": { "InputLambdaProcessor": { "ResourceARN": "arn:aws:lambda:us-east-1:012345678901:function:MyLambdaFunction" } }, "InputStartingPositionConfiguration": { "InputStartingPosition": "NOW" }, "ResourceARN": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream", "S3Configuration": { "BucketARN": "string", "FileKey": "string" }, "ServiceExecutionRole": "string" }

DiscoverInputSchema 動作的下列請求程式碼範例會從參考來源產生結構描述:

{ "S3Configuration": { "BucketARN": "arn:aws:s3:::amzn-s3-demo-bucket", "FileKey": "TickerReference.csv" }, "ServiceExecutionRole": "arn:aws:iam::123456789123:role/myrole" }

ListApplications

ListApplications 動作的下列範例請求程式碼會傳回您帳戶中 Managed Service for Apache Flink 應用程式的清單:

{ "ExclusiveStartApplicationName": "MyApplication", "Limit": 50 }

ListApplicationSnapshots

ListApplicationSnapshots 動作的下列請求程式碼範例會傳回應用程式狀態的快照清單:

{"ApplicationName": "MyApplication", "Limit": 50, "NextToken": "aBcDeFgHiJkLmNoPqRsTuVwXyZ0123" }

StartApplication

StartApplication 動作的下列範例請求程式碼會啟動 Managed Service for Apache Flink 應用程式,並從最新的快照 (如有) 載入應用程式狀態:

{ "ApplicationName": "MyApplication", "RunConfiguration": { "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT" } } }

StopApplication

API_StopApplication 動作的下列範例請求程式碼會停止 Managed Service for Apache Flink 應用程式:

{"ApplicationName": "MyApplication"}

UpdateApplication

UpdateApplication 動作的下列範例請求程式碼會更新 Managed Service for Apache Flink 應用程式,以變更應用程式程式碼的位置:

{"ApplicationName": "MyApplication", "CurrentApplicationVersionId": 1, "ApplicationConfigurationUpdate": { "ApplicationCodeConfigurationUpdate": { "CodeContentTypeUpdate": "ZIPFILE", "CodeContentUpdate": { "S3ContentLocationUpdate": { "BucketARNUpdate": "arn:aws:s3:::amzn-s3-demo-bucket", "FileKeyUpdate": "my_new_code.zip", "ObjectVersionUpdate": "2" } } } }