

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 创建并运行适用于 Apache Flink 的托管服务应用程序
<a name="get-started-exercise"></a>

在本练习中，您将创建面向应用程序的适用于 Apache Flink 的托管服务，并将数据流作为源和接收器。

**Topics**
+ [创建两个 Amazon Kinesis 数据流](#get-started-exercise-1)
+ [将示例记录写入输入流](#get-started-exercise-2)
+ [下载并检查 Apache Flink 流式处理 Java 代码](#get-started-exercise-5)
+ [编译应用程序代码](#get-started-exercise-5.5)
+ [上传 Apache Flink 流式处理 Java 代码](#get-started-exercise-6)
+ [创建并运行适用于 Apache Flink 的托管服务](#get-started-exercise-7)

## 创建两个 Amazon Kinesis 数据流
<a name="get-started-exercise-1"></a>

在为本练习创建适用于 Apache Flink 的亚马逊托管服务之前，请创建两个 Kinesis 数据流（`ExampleInputStream` 和 `ExampleOutputStream`）。您的应用程序将这些数据流用于应用程序源和目标流。

可以使用 Amazon Kinesis 控制台或以下 AWS CLI 命令创建这些流。有关控制台说明，请参阅[创建和更新数据流](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html)。

**创建数据流 (AWS CLI)**

1. 要创建第一个直播 (`ExampleInputStream`)，请使用以下 Amazon Kinesis 命令`create-stream` AWS CLI 。

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleInputStream \
   --shard-count 1 \
   --region us-west-2 \
   --profile adminuser
   ```

1. 要创建应用程序用来写入输出的第二个流，请运行同一命令（将流名称更改为 `ExampleOutputStream`）。

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleOutputStream \
   --shard-count 1 \
   --region us-west-2 \
   --profile adminuser
   ```

## 将示例记录写入输入流
<a name="get-started-exercise-2"></a>

在本节中，您使用 Python 脚本将示例记录写入流，以供应用程序处理。

**注意**  
此部分需要 [AWS SDK for Python (Boto)](https://aws.amazon.com/developers/getting-started/python/)。

1. 使用以下内容创建名为 `stock.py` 的文件：

   ```
    
   import datetime
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "EVENT_TIME": datetime.datetime.now().isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
           "PRICE": round(random.random() * 100, 2),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

1. 在本教程的后面部分，您运行 `stock.py` 脚本，以将数据发送到应用程序。

   ```
   $ python stock.py
   ```

## 下载并检查 Apache Flink 流式处理 Java 代码
<a name="get-started-exercise-5"></a>

此示例的 Java 应用程序代码可从中获得 GitHub。要下载应用程序代码，请执行以下操作：

1. 使用以下命令克隆远程存储库：

   ```
   git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples.git
   ```

1. 导航到 `GettingStarted` 目录。

应用程序代码位于 `CustomSinkStreamingJob.java` 和 `CloudWatchLogSink.java` 文件中。请注意有关应用程序代码的以下信息：
+ 应用程序使用 Kinesis 源从源流中进行读取。以下代码段创建 Kinesis 接收器：

  ```
  return env.addSource(new FlinkKinesisConsumer<>(inputStreamName,
                  new SimpleStringSchema(), inputProperties));
  ```

## 编译应用程序代码
<a name="get-started-exercise-5.5"></a>

在本节中，您使用 Apache Maven 编译器创建应用程序的 Java 代码。有关安装 Apache Maven 和 Java 开发工具包 (JDK) 的信息，请参阅[完成练习的先决条件](tutorial-stock-data.md#setting-up-prerequisites)。

您的 Java 应用程序需要以下组件：
+ 一个[项目对象模型 (pom.xml)](https://maven.apache.org/guides/introduction/introduction-to-the-pom.html) 文件。此文件包含有关应用程序的配置和从属项的信息，包括适用于 Apache Flink 的亚马逊托管服务库。
+ 它是一种 `main` 方法，其中包含应用程序的逻辑。

**注意**  
**要将 Kinesis 连接器用于以下应用程序，您必须下载连接器源代码并构建该连接器，如 [Apache Flink 文档](https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/connectors/kinesis.html)中所述。**

**创建并编译应用程序代码**

1. 在您的开发环境中创建 Java/Maven 应用程序。有关创建应用程序的信息，请参阅有关开发环境的文档：
   + [创建您的第一个 Java 项目 (Eclipse Java Neon)](https://help.eclipse.org/neon/index.jsp?topic=%2Forg.eclipse.jdt.doc.user%2FgettingStarted%2Fqs-3.htm)
   + [创建、运行和打包您的第一个 Java 应用程序 (IntelliJ Idea)](https://www.jetbrains.com/help/idea/creating-and-running-your-first-java-application.html)

1. 将以下代码用于名为 `StreamingJob.java` 的文件。

   ```
    
   package com.amazonaws.services.kinesisanalytics;
   
   import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
   import org.apache.flink.api.common.serialization.SimpleStringSchema;
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
   import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
   import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
   
   import java.io.IOException;
   import java.util.Map;
   import java.util.Properties;
   
   public class StreamingJob {
   
       private static final String region = "us-east-1";
       private static final String inputStreamName = "ExampleInputStream";
       private static final String outputStreamName = "ExampleOutputStream";
   
       private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
           Properties inputProperties = new Properties();
           inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
           inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
   
           return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
       }
   
       private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
               throws IOException {
           Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
           return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
                   applicationProperties.get("ConsumerConfigProperties")));
       }
   
       private static FlinkKinesisProducer<String> createSinkFromStaticConfig() {
           Properties outputProperties = new Properties();
           outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
           outputProperties.setProperty("AggregationEnabled", "false");
   
           FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties);
           sink.setDefaultStream(outputStreamName);
           sink.setDefaultPartition("0");
           return sink;
       }
   
       private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException {
           Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
           FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(),
                   applicationProperties.get("ProducerConfigProperties"));
   
           sink.setDefaultStream(outputStreamName);
           sink.setDefaultPartition("0");
           return sink;
       }
   
       public static void main(String[] args) throws Exception {
           // set up the streaming execution environment
           final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   
           /*
            * if you would like to use runtime configuration properties, uncomment the
            * lines below
            * DataStream<String> input = createSourceFromApplicationProperties(env);
            */
   
           DataStream<String> input = createSourceFromStaticConfig(env);
   
           /*
            * if you would like to use runtime configuration properties, uncomment the
            * lines below
            * input.addSink(createSinkFromApplicationProperties())
            */
   
           input.addSink(createSinkFromStaticConfig());
   
           env.execute("Flink Streaming Java API Skeleton");
       }
   }
   ```

   请注意以下有关上述代码示例的信息：
   + 此文件包含 `main` 方法，它定义应用程序的功能。
   + 您的应用程序使用 `StreamExecutionEnvironment` 对象创建源和接收连接器以访问外部资源。
   + 该应用程序将使用静态属性创建源和接收连接器。要使用动态应用程序属性，请使用 `createSourceFromApplicationProperties` 和 `createSinkFromApplicationProperties` 方法以创建连接器。这些方法读取应用程序的属性来配置连接器。

1. 要使用您的应用程序代码，您将其编译和打包成 JAR 文件。您可以通过两种方式之一编译和打包您的代码：
   + 使用命令行 Maven 工具。在包含 `pom.xml` 文件的目录中通过运行以下命令创建您的 JAR 文件：

     ```
     mvn package
     ```
   + 设置开发环境。有关详细信息，请参阅您的开发环境文档。

   您可以作为 JAR 文件上传您的包，也可以将包压缩为 ZIP 文件并上传。如果您使用创建应用程序 AWS CLI，则需要指定代码内容类型（JAR 或 ZIP）。

1. 如果编译时出错，请验证 `JAVA_HOME` 环境变量设置正确。

如果应用程序成功编译，则创建以下文件：

`target/java-getting-started-1.0.jar`

## 上传 Apache Flink 流式处理 Java 代码
<a name="get-started-exercise-6"></a>

在本节中，您创建 Amazon Simple Storage Service (Amazon S3) 存储桶并上传应用程序代码。

**上传应用程序代码**

1. 打开 Amazon S3 控制台，网址为 [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)。

1. 选择 **创建存储桶 **。

1. 在 **存储桶名称** 字段中输入 **ka-app-code-*<username>***。将后缀（如您的用户名）添加到存储桶名称，以使其具有全局唯一性。选择 **下一步**。

1. 在**配置选项**步骤中，让设置保持原样，然后选择**下一步**。

1. 在**设置权限**步骤中，让设置保持原样，然后选择**下一步**。

1. 选择 **创建存储桶 **。

1. 在 Amazon S3 控制台中，选择 **ka-app-code-*<username>*** 存储桶，然后选择**上传**。

1. 在**选择文件**步骤中，选择**添加文件**。导航到您在上一步中创建的 `java-getting-started-1.0.jar` 文件。选择 **下一步**。

1. 在**设置权限**步骤中，让设置保持原样。选择 **下一步**。

1. 在**设置属性**步骤中，让设置保持原样。选择**上传**。

您的应用程序代码现在存储在 Amazon S3 存储桶中，应用程序可以在其中访问代码。

## 创建并运行适用于 Apache Flink 的托管服务
<a name="get-started-exercise-7"></a>

您可以使用控制台或 AWS CLI创建和运行适用于 Apache Flink 的托管服务的应用程序。

**注意**  
当您使用控制台创建应用程序时，系统会为您创建您的 AWS Identity and Access Management (IAM) 和 A CloudWatch mazon Logs 资源。使用创建应用程序时 AWS CLI，可以单独创建这些资源。

**Topics**
+ [创建并运行应用程序（控制台）](#get-started-exercise-7-console)
+ [创建并运行应用程序（AWS CLI）](#get-started-exercise-7-cli)

### 创建并运行应用程序（控制台）
<a name="get-started-exercise-7-console"></a>

按照以下步骤，使用控制台创建、配置、更新和运行应用程序。

#### 创建应用程序
<a name="get-started-exercise-7-console-create"></a>

1. [在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. 在 Amazon Kinesis 控制面板上，选择**创建分析应用程序**。

1. 在 **Kinesis Analytics – 创建应用程序**页面上，提供应用程序详细信息，如下所示：
   + 对于 **应用程序名称 **，输入 **MyApplication**。
   + 对于**描述**，输入 **My java test app**。
   + 对于 **Runtime (运行时)**，请选择 **Apache Flink 1.6**。

1. 对于**访问权限**，请选择 **创建/更新 IAM 角色 `kinesis-analytics-MyApplication-us-west-2`**。

1. 选择**创建应用程序**。

**注意**  
在使用控制台创建适用于 Apache Flink 的亚马逊托管服务应用程序时，您可以选择为应用程序创建 IAM 角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些 IAM 资源是使用您的应用程序名称和区域命名的，如下所示：  
策略：`kinesis-analytics-service-MyApplication-us-west-2`
角色：`kinesis-analytics-MyApplication-us-west-2`

#### 编辑 IAM 策略
<a name="get-started-exercise-7-console-iam"></a>

编辑 IAM policy 以添加访问 Kinesis 数据流的权限。

1. 使用 [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) 打开 IAM 控制台。

1. 选择**策略**。选择控制台在上一部分中为您创建的 **`kinesis-analytics-service-MyApplication-us-west-2`** 策略。

1. 在 **摘要** 页面上，选择 **编辑策略**。选择 **JSON** 选项卡。

1. 将以下策略示例中突出显示的部分添加到策略中。将示例账户 IDs (*012345678901*) 替换为您的账户 ID。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ReadCode",
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject",
                   "s3:GetObjectVersion"
               ],
               "Resource": [
                   "arn:aws:s3:::ka-app-code-username/java-getting-started-1.0.jar"
               ]
           },
           {
               "Sid": "ListCloudwatchLogGroups",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogGroups"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:*"
               ]
           },
           {
               "Sid": "ListCloudwatchLogStreams",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogStreams"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
               ]
           },
           {
               "Sid": "PutCloudwatchLogs",
               "Effect": "Allow",
               "Action": [
                   "logs:PutLogEvents"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
               ]
           },
           {
               "Sid": "ReadInputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream"
           },
           {
               "Sid": "WriteOutputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream"
           }
       ]
   }
   ```

------

#### 配置应用程序
<a name="get-started-exercise-7-console-configure"></a>

1. 在**MyApplication**页面上，选择**配置**。

1. 在 **配置应用程序** 页面上，提供 **代码位置**：
   + 对于**Amazon S3 存储桶**，请输入**ka-app-code-*<username>***。
   + **在 Amazon S3 对象的路径**中，输入**java-getting-started-1.0.jar**。

1. 在 **对应用程序的访问权限** 下，对于 **访问权限**，选择 **创建/更新 IAM 角色 `kinesis-analytics-MyApplication-us-west-2`**。

1. 在 **Properties (属性)** 下，对于 **Group ID (组 ID)**，输入 **ProducerConfigProperties**。

1. 输入以下应用程序属性和值：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/streams/latest/dev/get-started-exercise.html)

1. 在 **监控** 下，确保 ** 监控指标级别** 设置为 **应用程序**。

1. 要进行**CloudWatch 日志记录**，请选中 “**启用**” 复选框。

1. 选择**更新**。

**注意**  
当您选择启用 CloudWatch 日志记录时，适用于 Apache Flink 的托管服务会为您创建日志组和日志流。这些资源的名称如下所示：  
日志组：`/aws/kinesis-analytics/MyApplication`
日志流：`kinesis-analytics-log-stream`

#### 运行应用程序
<a name="get-started-exercise-7-console-run"></a>

1. 在**MyApplication**页面上，选择 “**运行**”。确认该操作。

1. 当应用程序正在运行时，请刷新页面。控制台将显示 **Application graph (应用程序图表)**。

#### 停止应用程序
<a name="get-started-exercise-7-console-stop"></a>

在**MyApplication**页面上，选择 “**停止**”。确认该操作。

#### 更新应用程序
<a name="get-started-exercise-7-console-update"></a>

使用控制台，您可以更新应用程序设置，例如应用程序属性、监控设置，或应用程序 JAR 文件的位置和文件名。如果您需要更新应用程序代码，您还可以从 Amazon S3 存储桶重新加载应用程序 JAR。

在**MyApplication**页面上，选择**配置**。更新应用程序设置，然后选择**更新**。

### 创建并运行应用程序（AWS CLI）
<a name="get-started-exercise-7-cli"></a>

在本节中，您将使用创建和运行适用 AWS CLI 于 Apache Flink 的托管服务应用程序。适用于 Apache Flink 的托管服务使用该`kinesisanalyticsv2` AWS CLI 命令为 Apache Flink 应用程序创建托管服务并与之交互。

#### 创建权限策略
<a name="get-started-exercise-7-cli-policy"></a>

首先，使用两个语句创建权限策略：一个语句授予对源流执行 `read` 操作的权限，另一个语句授予对接收器流执行 `write` 操作的权限。然后，将策略附加到 IAM 角色（下一部分中将创建此角色）。因此，在适用于 Apache Flink 的托管服务代入该角色时，服务具有必要的权限从源流进行读取和写入接收器流。

使用以下代码创建 `KAReadSourceStreamWriteSinkStream` 权限策略。将 `username` 替换为您用于创建 Amazon S3 存储桶来存储应用程序代码的用户名。将 Amazon 资源名称 (ARNs) (`012345678901`) 中的账户 ID 替换为您的账户 ID。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "S3",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:GetObjectVersion"
            ],
            "Resource": ["arn:aws:s3:::ka-app-code-username",
                "arn:aws:s3:::ka-app-code-username/*"
            ]
        },
        {
            "Sid": "ReadInputStream",
            "Effect": "Allow",
            "Action": "kinesis:*",
            "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream"
        },
        {
            "Sid": "WriteOutputStream",
            "Effect": "Allow",
            "Action": "kinesis:*",
            "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream"
        }
    ]
}
```

------

有关创建权限策略的 step-by-step说明，请参阅 *IAM 用户指南*中的[教程：创建并附加您的第一个客户托管策略](https://docs.aws.amazon.com/IAM/latest/UserGuide/tutorial_managed-policies.html#part-two-create-policy)。

**注意**  
要访问其他 AWS 服务，可以使用 适用于 Java 的 AWS SDK。Managed Service for Apache Flink 会自动将软件开发工具包所需的证书设置为与您的应用程序关联的服务执行 IAM 角色的证书。无需执行其他步骤。

#### 创建 IAM 角色
<a name="get-started-exercise-7-cli-role"></a>

在本节中，您将创建一个 IAM 角色，适用于 Apache Flink 的托管服务可以代入此角色来读取源流和写入接收器流。

权限不足时，适用于 Apache Flink 的托管服务无法访问您的串流。您通过 IAM 角色授予这些权限。每个 IAM 角色附加了两种策略。此信任策略授予适用于 Apache Flink 的托管服务代入该角色的权限，权限策略确定适用于 Apache Flink 的托管服务代入这个角色后可以执行的操作。

您将在上一部分中创建的权限策略附加到此角色。

**创建 IAM 角色**

1. 使用 [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) 打开 IAM 控制台。

1. 在导航窗格中，选择 **角色** 和 **创建角色**。

1. 在 **选择受信任实体的类型** 下，选择 **AWS 服务**。在 **选择将使用此角色的服务** 下，选择 **Kinesis**。在**选择您的使用案例**下，选择 **Kinesis Analytics**。

   选择**下一步: 权限**。

1. 在 **附加权限策略** 页面上，选择 **下一步: 审核**。在创建角色后，您可以附加权限策略。

1. 在 **创建角色** 页面上，输入**KA-stream-rw-role**作为**角色名称**。选择 **创建角色**。

   现在，您已经创建了一个名为 `KA-stream-rw-role` 的新 IAM 角色。接下来，您更新角色的信任和权限策略。

1. 将权限策略附加到角色。
**注意**  
对于本练习，适用于 Apache Flink 的托管服务代入此角色，以便同时从 Kinesis 数据流（源）读取数据和将输出写入另一个 Kinesis 数据流。因此，您附加在上一步（[创建权限策略](#get-started-exercise-7-cli-policy)）中创建的策略。

   1. 在 **摘要** 页上，选择 **权限** 选项卡。

   1. 选择**附加策略**。

   1. 在搜索框中，输入 **KAReadSourceStreamWriteSinkStream**（您在上一部分中创建的策略）。

   1. 选择**KAReadInputStreamWriteOutputStream**策略，然后选择**附加策略**。

现在，您已经创建了应用程序用来访问资源的服务执行角色。记下新角色的 ARN。

有关创建角色的 step-by-step说明，请参阅 [IAM *用户指南中的创建 IAM* 角色（控制台）](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user.html#roles-creatingrole-user-console)。

#### 创建适用于 Apache Flink 的托管服务应用程序
<a name="get-started-exercise-7-cli-create"></a>

1. 将以下 JSON 代码保存到名为 `create_request.json` 的文件中。将示例角色 ARN 替换为您之前创建的角色的 ARN。将存储桶 ARN 后缀 (`username`) 替换为在前一部分中选择的后缀。将服务执行角色中的示例账户 ID (`012345678901`) 替换为您的账户 ID。

   ```
   {
       "ApplicationName": "test",
       "ApplicationDescription": "my java test app",
       "RuntimeEnvironment": "FLINK-1_6",
       "ServiceExecutionRole": "arn:aws:iam::012345678901:role/KA-stream-rw-role",
       "ApplicationConfiguration": {
           "ApplicationCodeConfiguration": {
               "CodeContent": {
                   "S3ContentLocation": {
                       "BucketARN": "arn:aws:s3:::ka-app-code-username",
                       "FileKey": "java-getting-started-1.0.jar"
                   }
               },
               "CodeContentType": "ZIPFILE"
           },
           "EnvironmentProperties":  { 
            "PropertyGroups": [ 
               { 
                  "PropertyGroupId": "ProducerConfigProperties",
                  "PropertyMap" : {
                       "flink.stream.initpos" : "LATEST",
                       "aws.region" : "us-west-2",
                       "AggregationEnabled" : "false"
                  }
               },
               { 
                  "PropertyGroupId": "ConsumerConfigProperties",
                  "PropertyMap" : {
                       "aws.region" : "us-west-2"
                  }
               }
            ]
         }
       }
   }
   ```

1. 使用上述请求执行 [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_CreateApplication.html) 操作来创建应用程序：

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json
   ```

应用程序现已创建。您在下一步中启动应用程序。

#### 启动应用程序
<a name="get-started-exercise-7-cli-start"></a>

在本节中，您使用 [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html) 操作来启动应用程序。

**启动应用程序**

1. 将以下 JSON 代码保存到名为 `start_request.json` 的文件中。

   ```
   {
       "ApplicationName": "test",
       "RunConfiguration": {
           "ApplicationRestoreConfiguration": { 
            "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT"
            }
       }
   }
   ```

1. 使用上述请求执行 [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html) 操作来启动应用程序：

   ```
   aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json
   ```

应用程序正在运行。您可以在亚马逊 CloudWatch 控制台上查看托管服务的 Apache Flink 指标，以验证应用程序是否正常运行。

#### 停止应用程序
<a name="get-started-exercise-7-cli-stop"></a>

在本节中，您使用 [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html) 操作来停止应用程序。

**停止应用程序**

1. 将以下 JSON 代码保存到名为 `stop_request.json` 的文件中。

   ```
   {"ApplicationName": "test"
   }
   ```

1. 使用下面的请求执行 [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html) 操作来停止应用程序：

   ```
   aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json
   ```

应用程序现已停止。