

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 将 Studio 笔记本与 Managed Service for Apache Flink 结合使用
<a name="how-notebook"></a>

适用于 Managed Service for Apache Flink的 Studio 笔记本允许您以交互方式实时查询数据流，并使用标准 SQL、Python 和 Scala 轻松构建和运行流处理应用程序。只需在 AWS 管理控制台中单击几下，即可启动无服务器笔记本来查询数据流并在几秒钟内获得结果。

笔记本是一个基于 Web 的开发环境。借助笔记本，您可以获得简单的交互式开发体验以及 Apache Flink 提供的高级功能。Studio 笔记本使用由 [Apache Zeppelin](https://zeppelin.apache.org/) 提供支持的笔记本，并使用 A [pache Flink](https://flink.apache.org/) 作为流处理引擎。Studio 笔记本无缝结合了这些技术，使所有技能组合的开发人员都可以对数据流进行高级分析。

Apache Zeppelin 为您的 Studio 笔记本提供了一整套分析工具，包括：
+ 数据可视化
+ 将数据导出到文件
+ 控制输出格式以便于分析

要开始使用Managed Service for Apache Flink 和 Apache Zeppelin，请参阅。[教程：在 Managed Service for Apache Flink 中创建 Studio 笔记本](example-notebook.md)有关 Apache Zeppelin 的更多信息，请参阅 [Apache Zeppelin 文档](http://zeppelin.apache.org)。

 使用笔记本，你可以使用 [SQL、Python 或 Scala 中的 Apache Flink Table API 和](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/overview/) SQL，或者在 Scala 中使用 [DataStream API](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/overview/) 对查询进行建模。只需点击几下，您就可以将 Studio 笔记本升级为持续运行、非交互式、Managed Service for Apache Flink 流处理应用程序，用于您的生产工作负载。

**Topics**
+ [使用正确的 Studio 笔记本运行时版本](studio-notebook-versions.md)
+ [创建 Studio 笔记本](how-zeppelin-creating.md)
+ [对流数据执行交互式分析](how-zeppelin-interactive.md)
+ [作为具有持久状态的应用程序进行部署](how-notebook-durable.md)
+ [IAM 权限](how-zeppelin-iam.md)
+ [使用连接器和依赖关系](how-zeppelin-connectors.md)
+ [用户定义的函数](how-zeppelin-udf.md)
+ [启用检查点](how-zeppelin-checkpoint.md)
+ [升级 Studio 运行时](upgrading-studio-runtime.md)
+ [与... 一起工作 AWS Glue](how-zeppelin-glue.md)
+ [Managed Service for Apache Flink 中的 Studio 笔记本示例和教程](how-zeppelin-examples.md)
+ [Managed Service for Apache Flink 的 Studio 笔记本](how-zeppelin-troubleshooting.md)
+ [为 Managed Service for Apache Flink 的 Studio 笔记本创建自定义 IAM 策略](how-zeppelin-appendix-iam.md)

# 使用正确的 Studio 笔记本运行时版本
<a name="studio-notebook-versions"></a>

通过将 Amazon Managed Service for Apache Flink 与 Studio 结合使用，您可以在交互式笔记本中实时查询数据流，并使用标准 SQL、Python 和 Scala 轻松构建和运行流处理应用程序。Studio 笔记本由 [Apache Zeppelin](https://zeppelin.apache.org/) 提供支持，并使用 [Apache Flink](https://flink.apache.org/) 作为流处理引擎。

**注意**  
我们将于 **2024 年 11 月 5 日弃用带有 Apache Flink 版本 1.11 的 Studio 运行时**。从此日期起，您将无法运行新的笔记本，也无法使用此版本创建新应用程序。我们建议您在此日期之前升级到最新的运行时（Apache Flink 1.15 和 Apache Zeppelin 0.10）。有关如何升级笔记本的指南，请参阅 [升级 Studio 运行时](upgrading-studio-runtime.md)。


**Studio 运行时**  

| Apache Flink 版本 | Apache Zeppelin 版本 | Python 版本 |  | 
| --- | --- | --- | --- | 
| 1.15 | 0.1 | 3.8 | 推荐 | 
| 1.13 | 0.9 | 3.8 | 在 2024 年 10 月 16 日之前支持 | 
| 1.11 | 0.9 | 3.7 | 于 2025 年 2 月 24 日弃用 | 

# 创建 Studio 笔记本
<a name="how-zeppelin-creating"></a>

Studio 笔记本包含用 SQL、Python 或 Scala 编写的查询或程序，这些查询或程序在流数据上运行并返回分析结果。您可以使用控制台或 CLI 创建应用程序，并提供用于分析数据源数据的查询。

您的应用程序具有以下组件：
+ 数据源，例如Amazon MSK 集群、Kinesis 数据流或 Amazon S3 存储桶。
+ 一个 AWS Glue 数据库。此数据库包含用于存储您的数据源、目标架构和端点的表。有关更多信息，请参阅 [与... 一起工作 AWS Glue](how-zeppelin-glue.md)。
+ 您的应用程序代码。您的代码实现了您的分析查询或程序。
+ 您的应用程序设置和运行时系统属性。有关应用程序设置和运行时系统属性的信息，请参阅 [Apache Flink 应用程序开发人员指南](https://docs.aws.amazon.com/managed-flink/latest/java/what-is.html)中的下列主题：
  + **应用程序并行度和扩展：**您可以使用应用程序的 Parallelism 设置来控制应用程序可以同时执行的查询数量。如果您的查询有多个执行路径，则还可以利用更高的并行度，例如在以下情况下：
    + 处理 Kinesis 数据流的多个分片时
    + 使用`KeyBy`运算符对数据进行分区时。
    + 使用多个窗口运算符时

    有关应用程序扩展的更多信息，请参阅 Managed [ Service for Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/how-scaling.html)
  + **日志和监控：有关应用程序日志和监控**的信息，请参阅 Amazon Managed Service for Apache Flink 中的日志记录和监控 Apache Flink[https://docs.aws.amazon.com/managed-flink/latest/java/monitoring-overview.html](https://docs.aws.amazon.com/managed-flink/latest/java/monitoring-overview.html)。
  + 您的应用程序使用检查点和保存点来实现容错。Studio 笔记本默认不启用检查点和保存点。

您可以使用 AWS 管理控制台 或创建 Studio 笔记本 AWS CLI。

从控制台创建应用程序时，您可以选择以下选项：
+ 在 Amazon MSK 控制台**中，选择您的集群，然后选择实时处理数据**。
+ **在 Kinesis Data Streams 控制台中，选择您的数据流，然后在 “应用程序” 选项卡上**选择 “实时处理数据**”。**
+ 在 Managed Service for Apache Flink控制台中，选择 **Studio 选项卡**，然后选择**创建 Studio 笔记本** 。

# 对流数据执行交互式分析
<a name="how-zeppelin-interactive"></a>

您可以使用由 Apache Zeppelin 提供支持的无服务器笔记本与您的流媒体数据进行交互。您的笔记本可以有多个笔记，每个笔记可以有一个或多个段落供您编写代码。

以下示例 SQL 查询显示了如何从数据源检索数据：

```
%flink.ssql(type=update)
select * from stock;
```

有关 Flink Streaming SQL 查询的更多示例，请参阅下文中的 [Managed Service for Apache Flink 中的 Studio 笔记本示例和教程](how-zeppelin-examples.md) 和 Apache Flink 文档中的[查询](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sql/queries/overview/)。

您可以在 Studio 笔记本中使用 Flink SQL 查询来查询流媒体数据。你也可以使用 Python（表 API）和 Scala（表和数据流 APIs）来编写程序，以交互方式查询你的流数据。您可以查看查询或程序的结果，在几秒钟内对其进行更新，然后重新运行它们以查看更新的结果。

## Flink 解释器
<a name="how-zeppelin-interactive-interpreters"></a>

*您可以使用解释器指定 Managed Service for Apache Flink使用哪种语言来运行您的应用程序。*以下解释器与 Managed Service for Apache Flink


| Name | 类 | 说明 | 
| --- |--- |--- |
| %flink | FlinkInterpreter | 创建 ExecutionEnvironment/StreamExecutionEnvironment/BatchTableEnvironment/StreamTableEnvironment并提供一个 Scala 环境 | 
| %flink.pyflink | PyFlinkInterpreter | 提供一个 python 环境 | 
| %flink.ipyflink | IPyFlinkInterpreter | 提供一个 ipython 环境 | 
| %flink.ssql | FlinkStreamSqlInterpreter | 提供一个流 sql 环境 | 
| %flink.bsql | FlinkBatchSqlInterpreter | 提供批处理 sql 环境 | 

有关 Flink 解释器的更多信息，请参阅 [Apache Zeppelin 的 Flink 解释器。](https://zeppelin.apache.org/docs/0.9.0/interpreter/flink.html)

如果您使用`%flink.pyflink`或`%flink.ipyflink`作为解释器，则需要使用在`ZeppelinContext`笔记本中可视化结果。

有关更 PyFlink 具体的示例，请参阅[使用适用于 Apache Flink Studio 和 Python 的托管服务以交互方式查询您的数据流](https://aws.amazon.com/blogs/big-data/query-your-data-streams-interactively-using-kinesis-data-analytics-studio-and-python/)。

## Apache Flink 表环境变量
<a name="how-zeppelin-interactive-env-vars"></a>

Apache Zeppelin 提供使用环境变量访问表环境资源的权限。

您可以使用以下变量访问 Scala 表环境资源：


| 变量 | 资源 | 
| --- |--- |
| senv | StreamExecutionEnvironment | 
| stenv | StreamTableEnvironment for blink planner | 

您可以使用以下变量访问 Python 表环境资源：


| 变量 | 资源 | 
| --- |--- |
| s\$1env | StreamExecutionEnvironment | 
| st\$1env | StreamTableEnvironment for blink planner | 

有关使用表环境的更多信息，请参阅 Apache Flink 文档中的[概念和常见 API](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/common/)。

# 作为具有持久状态的应用程序进行部署
<a name="how-notebook-durable"></a>

您可以构建自己的代码并将其导出到 Amazon S3。您可以将您在笔记中编写的代码提升到持续运行的流处理应用程序。在 Managed Service for Apache Flink 上运行 Apache Flink 应用程序有两种模式：使用 Studio 笔记本，您可以交互式地开发代码，实时查看代码结果，并在笔记中对其进行可视化。将备注部署为在流模式下运行后，Managed Service for Apache Flink 会为您创建一个持续运行、从源中读取数据、写入目标、维护长时间运行的应用程序状态并根据源流的吞吐量自动缩放的应用程序。

**注意**  
将应用程序代码导出到 S3 存储桶必须与 Studio 笔记本位于同一区域。

只有在 Studio 笔记本上部署满足以下条件的笔记：
+ 段落必须按顺序排序。部署应用程序时，注释中的所有段落将按笔记中显示的顺序 (left-to-right, top-to-bottom) 执行。您可以通过在备注中选择 “**运行所有段落**” 来检查此顺序。
+ 您的代码是 Python 和 SQL 或 Scala 和 SQL 的组合。我们目前不支持 Python 和 Scala。 deploy-as-application
+ 您的笔记应该只有以下解释器：`%flink`、`%flink.ssql`、`%flink.pyflink`、`%flink.ipyflink`、`%md`。
+ 不支持使用[齐柏林飞艇上下文](https://zeppelin.apache.org/docs/0.9.0/usage/other_features/zeppelin_context.html)对象`z`。不返回任何内容的方法除了记录警告之外什么都不做。其他方法会引发 Python 异常或无法在 Scala 中编译。
+ 注释必须生成一个 Apache Flink 任务。
+ 不支持将带有[动态表单](https://zeppelin.apache.org/docs/0.9.0/usage/dynamic_form/intro.html)的注释部署为应用程序。
+ 在部署为应用程序时，将跳过 %md ([Markdown](https://zeppelin.apache.org/docs/0.9.0/interpreter/markdown.html)) 段落，因为这些段落应包含人类可读的文档，不适合作为生成的应用程序的一部分运行。
+ 在部署为应用程序时，将跳过禁止在齐柏林飞艇中运行的段落。即使禁用的段落使用了不兼容的解释器，例如，`%flink.ipyflink`在带有`%flink``and %flink.ssql`解释器的注释中，也会在将注释部署为应用程序时跳过该段落，并且不会导致错误。
+ 要成功部署应用程序，必须至少有一个支持运行的源代码（Flink SQL PyFlink 或 Flink Scala）段落。
+ 在通过注释部署的应用程序中，在段落中的解释器指令（例如`%flink.ssql(parallelism=32)`）中设置并行度将被忽略。相反，您可以通过 AWS Command Line Interface 或 AWS API 更新已部署的应用程序 AWS 管理控制台，以根据应用程序所需的并行度级别更改 Parallelism and/or ParallelismPer KPU 设置，也可以为已部署的应用程序启用自动缩放。
+ 如果您要部署为具有持久状态的应用程序，则您的 VPC 必须可以访问互联网。如果您的 VPC 无法访问互联网，请参阅[在无法访问互联网的 VPC 中作为具有持久状态的应用程序进行部署](how-zeppelin-troubleshooting.md#how-zeppelin-troubleshooting-deploying-no-internet)。

## Scala/Python 标准
<a name="how-notebook-durable-scala"></a>
+ 在您的 Scala 或 Python 代码中，使用 [Blink 计划程序](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/#dependency-structure)（`senv``stenv`对于 Scala；`s_env`对于 `st_env` Python），而不是使用较旧的 “Flink” 计划程序（`stenv_2`对于 Scala，`st_env_2`对于 Python）。Apache Flink 项目建议在生产用例中使用 Blink 计划程序，这是齐柏林飞艇和 Flink 中的默认计划程序。
+ 你的 Python 段落不得使用 [shell 调用/赋值](https://ipython.readthedocs.io/en/stable/interactive/python-ipython-diff.html#shell-assignment)，也不得在要作为应用程序部署的注释`%conda`中使用`!`[IPython 魔法命令](https://ipython.readthedocs.io/en/stable/interactive/magics.html)，比如`%timeit`或注释。
+ 您不能使用 Scala 案例类作为传递给高阶数据流运算符（如和）的函数的参数。`map` `filter`有关 Scala 案例类的信息，请参阅 Scala 文档中的[案例类](https://docs.scala-lang.org/overviews/scala-book/case-classes.html)。

## SQL 条件
<a name="how-notebook-durable-sql"></a>
+ 不允许使用简单的 SELECT 语句，因为没有任何地方可以与段落的输出部分相提并论，可以传递数据。
+ 在任何给定的段落中，DDL 语句 (`USE`、`CREATE`、`ALTER`、`DROP`、`SET`、`RESET`) 必须在 DML (`INSERT`) 语句之前。这是因为段落中的 DML 语句必须作为单个 Flink 任务一起提交。
+ 最多应该有一个段落中包含 DML 语句。这是因为，对于该 deploy-as-application功能，我们只支持向 Flink 提交单个作业。

有关更多信息和示例，请参阅[通过 Amazon Managed Service for Apache Flink、Amazon Translate 和 Amazon Comprehend 使用 SQL 函数翻译、编辑和分析流数据](https://aws.amazon.com/blogs/machine-learning/translate-redact-and-analyze-streaming-data-using-sql-functions-with-amazon-kinesisanalytics-MyApplicatioamazon-translate-and-amazon-comprehend/)。

# 审核 Studio 笔记本的 IAM 权限
<a name="how-zeppelin-iam"></a>

当您通过 AWS 管理控制台创建 Studio 笔记本时，Managed Service for Apache Flink 会为您创建一个 IAM 角色。它还会将允许以下访问权限的策略与该角色相关联：


****  

| 服务 | 访问  | 
| --- | --- | 
| CloudWatch 日志 | 列表 | 
| Amazon EC2 | 列表 | 
| AWS Glue | 读/写 | 
| Managed Service for Apache Flink | 读取 | 
| Managed Service for Apache Flink V2 | 读取 | 
| Amazon S3 | 读/写 | 

# 使用连接器和依赖关系
<a name="how-zeppelin-connectors"></a>

连接器使您能够跨各种技术读取和写入数据。Managed Service for Apache Flink 将三个默认连接器与您的 Studio 笔记本捆绑在一起。您还可以使用自定义连接器。有关连接器的更多信息，请参阅 Apache Flink [文档中的表和 SQL 连接器](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/)。

## 默认连接器
<a name="zeppelin-default-connectors"></a>

如果您使用创建 Studio 笔记本，则 Apache Flink 托管服务默认包含以下自定义连接器：`flink-sql-connector-kinesis`、`flink-connector-kafka_2.12`和。 AWS 管理控制台 `aws-msk-iam-auth`要在没有这些自定义连接器的情况下通过主机创建 Studio 笔记本，请选择 “**使用自定义设置创建”** 选项。然后，当您进入 “**配置**” 页面时，清除两个连接器旁边的复选框。

如果您使用 [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)API 创建 Studio 笔记本电脑，则默认情况下不包括`flink-sql-connector-flink`和`flink-connector-kafka`连接器。要添加它们，请在`CustomArtifactsConfiguration`数据类型`MavenReference`中将其指定为 a，如以下示例所示。

`aws-msk-iam-auth`连接器是与 Amazon MSK 配合使用的连接器，其中包括自动通过 IAM 进行身份验证的功能。

**注意**  
以下示例中显示的连接器版本是我们唯一支持的版本。

```
For the Kinesis connector:

"CustomArtifactsConfiguration": [{
"ArtifactType": "DEPENDENCY_JAR",            
   "MavenReference": {
"GroupId": "org.apache.flink",

      "ArtifactId": "flink-sql-connector-kinesis",
      "Version": "1.15.4"

   }      
}]

For authenticating with AWS MSK through AWS IAM:

"CustomArtifactsConfiguration": [{
"ArtifactType": "DEPENDENCY_JAR",            
   "MavenReference": {
"GroupId": "software.amazon.msk",
      "ArtifactId": "aws-msk-iam-auth",
      "Version": "1.1.6"
   }      
}]
            
For the Apache Kafka connector:  

"CustomArtifactsConfiguration": [{
"ArtifactType": "DEPENDENCY_JAR",            
   "MavenReference": {
"GroupId": "org.apache.flink",

      "ArtifactId": "flink-connector-kafka",
      "Version": "1.15.4"

   }      
}]
```

要将这些连接器添加到现有笔记本中，请使用 [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)API 操作并在`CustomArtifactsConfigurationUpdate`数据类型`MavenReference`中将其指定为。

**注意**  
您可以`failOnError`将表 API 中的`flink-sql-connector-kinesis`连接器设置为 true。

## 添加依赖关系和自定义连接器
<a name="zeppelin-custom-connectors"></a>

要使用向 Studio 笔记本添加依赖项或自定义连接器，请执行以下步骤： AWS 管理控制台 

1. 将您的自定义连接器的文件上载到 Amazon S3。

1. 在中 AWS 管理控制台，选择用于**创建 Studio 笔记本的自定义**创建选项。

1. 按照 Studio 笔记本的创建工作流程进行操作，直到进入**配置**步骤。

1. 在 “**自定义连接器**” 部分中，选择 “**添加自定义连接器**”。

1. 指定依赖关系或自定义连接器的 Amazon S3 位置。

1. 选择**保存更改**。

要在使用 [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)API 创建新的 Studio 笔记本时添加依赖关系 JAR 或自定义连接器，请在`CustomArtifactsConfiguration`数据类型中指定依赖关系 JAR 或自定义连接器的 Amazon S3 位置。要向现有 Studio 笔记本添加依赖项或自定义连接器，请调用 [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)API 操作并在`CustomArtifactsConfigurationUpdate`数据类型中指定依赖关系 JAR 或自定义连接器的 Amazon S3 位置。

**注意**  
在包含依赖项或自定义连接器时，还必须包括所有未捆绑在依赖项或自定义连接器中的传递依赖关系。

# 实施用户定义的函数
<a name="how-zeppelin-udf"></a>

用户定义函数 (UDFs) 是扩展点，允许您调用查询中无法以其他方式表达的常用逻辑或自定义逻辑。你可以使用 Python 或 Java 或 Scala 等 JVM 语言在 Studio UDFs 笔记本中的段落中实现。您还可以将包含以 JVM 语言 UDFs 实现的外部 JAR 文件添加到您的 Studio 笔记本中。

在实现 JARs 该注册子类`UserDefinedFunction`（或您自己的抽象类）的抽象类时，请使用 Apache Maven 中提供的作用域、Gradle 中的`compileOnly`依赖项声明、SBT 中提供的作用域或您的 UDF 项目构建配置中的等效指令。这允许 UDF 源代码针对 Flink 进行编译 APIs，但是 Flink API 类本身并未包含在编译工件中。请参阅 UDF jar 示例中的这个 [pom](https://github.com/aws-samples/kinesis-udfs-textanalytics/blob/ec27108faa48f1a4c5d173ed3a2ef4565b58b5b5/kinesis-udfs-textanalytics-linear/pom.xml#L47)，它在 Maven 项目中符合这样的先决条件。

**注意**  
*有关示例设置，请参阅AWS Machine Learning 博客*中的[通过 Amazon Managed Service for Apache Flink、Amazon Translate 和 Amazon Comprehend 使用 SQL 函数翻译、编辑和分析流数据](https://aws.amazon.com/blogs/machine-learning/translate-redact-and-analyze-streaming-data-using-sql-functions-with-amazon-kinesis-data-analytics-amazon-translate-and-amazon-comprehend/)。

要使用控制台将 UDF JAR 文件添加到 Studio 笔记本中，请执行以下步骤：

1. 将您的 UDF JAR 文件上载到 Amazon S3。

1. 在中 AWS 管理控制台，选择用于**创建 Studio 笔记本的自定义**创建选项。

1. 按照 Studio 笔记本的创建工作流程进行操作，直到进入**配置**步骤。

1. 在**用户定义的函数**部分，选择**添加用户定义的函数**。

1. 指定实现 UDF 的 JAR 文件或 ZIP 文件的 Amazon S3 位置。

1. 选择**保存更改**。

要在使用 [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)API 创建新的 Studio 笔记本时添加 UDF JAR，请在`CustomArtifactConfiguration`数据类型中指定 JAR 位置。要将 UDF JAR 添加到现有 Studio 笔记本中，请调用 [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)API 操作并在`CustomArtifactsConfigurationUpdate`数据类型中指定 JAR 位置。或者，您可以使用将 UDF JAR 文件 AWS 管理控制台 添加到您的 Studio 笔记本中。

## 用户定义的函数的注意事项
<a name="how-zeppelin-udf-considerations"></a>
+ Managed Service for Apache Flink Studio 使用 [Apache Zeppelin 的术语](https://zeppelin.apache.org/docs/0.9.0/quickstart/explore_ui.html)，其中笔记本是可以包含多个音符的齐柏林飞艇实例。然后，每个注释可以包含多个段落。通过 Managed Service for Apache Flink Studio，解释器流程可以在笔记本中的所有笔记中共享。因此，如果您在一个注释中使用 Function 执行显式[createTemporarySystem函数](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableEnvironment.html#createTemporarySystemFunction-java.lang.String-java.lang.Class-)注册，则可以在同一笔记本的另一个注释中按原样引用相同的函数。

  但是，“*部署为应用程序*” 操作仅适用于*单个*笔记，而不是笔记本中的所有笔记。执行部署为应用程序时，仅使用活动注释的内容来生成应用程序。在其他 笔记本 中执行的任何显式函数注册都不是生成的应用程序依赖关系的一部分。此外，在 “部署为应用程序” 选项期间，通过将 JAR 的主类名转换为小写字符串来进行隐式函数注册。

   例如，如果`TextAnalyticsUDF`是 UDF JAR 的主类，则隐式注册将生成函数名称`textanalyticsudf`。因此，如果 Studio 注释 1 中的显式函数注册如下所示，那么`myNewFuncNameForClass`由于共享解释器，该笔记本中的所有其他注释（比如注释 2）都可以按名称引用该函数：

  `stenv.createTemporarySystemFunction("myNewFuncNameForClass", new TextAnalyticsUDF())`

   但是，在注释 2 中作为应用程序部署操作期间，此显式注册*将不包含*在依赖项中，因此已部署的应用程序将无法按预期运行。由于采用了隐式注册，因此默认情况下，对该函数的所有引用都应该是 with `textanalyticsudf` 和 not `myNewFuncNameForClass`。

   如果需要注册自定义函数名，那么注释 2 本身应该包含另一段来执行另一次显式注册，如下所示：

  ```
  %flink(parallelism=l)
  import com.amazonaws.kinesis.udf.textanalytics.TextAnalyticsUDF 
  # re-register the JAR for UDF with custom name
  stenv.createTemporarySystemFunction("myNewFuncNameForClass", new TextAnalyticsUDF())
  ```

  ```
  %flink. ssql(type=update, parallelism=1) 
  INSERT INTO
      table2
  SELECT
      myNewFuncNameForClass(column_name)
  FROM
      table1
  ;
  ```
+ 如果您的 UDF JAR 包含 Flink SDKs，请配置您的 Java 项目，以便 UDF 源代码可以针对 Flink 进行编译 SDKs，但是 Flink SDK 类本身并未包含在构建工件（例如 JAR）中。

  您可以在 Apache Maven 中使用`provided`作用域，在 Gradle 中使用`compileOnly`依赖关系声明，在 SBT 中使用`provided`作用域，或者在他们的 UDF 项目构建配置中使用等效指令。您可以从 UDF jar 示例中引用这个 [pom](https://github.com/aws-samples/kinesis-udfs-textanalytics/blob/ec27108faa48f1a4c5d173ed3a2ef4565b58b5b5/kinesis-udfs-textanalytics-linear/pom.xml#L47)，它在 maven 项目中遵循了这样的先决条件。有关完整的 step-by-step教程，请参阅此[使用适用于 Apache Flink、Amazon Translate 和 Amazon Comprehend 的亚马逊托管服务的 SQL 函数翻译、编辑和分析流数据](https://aws.amazon.com/blogs/machine-learning/translate-redact-and-analyze-streaming-data-using-sql-functions-with-amazon-kinesis-data-analytics-amazon-translate-and-amazon-comprehend/)。

# 启用检查点
<a name="how-zeppelin-checkpoint"></a>

您可以使用环境设置启用检查点功能。有关检查点的信息，请参阅[《Managed Service for Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/) 开发人员指南》中的[容错能力](https://docs.aws.amazon.com/managed-flink/latest/java/how-fault.html)。

## 设置检查点间隔
<a name="how-zeppelin-checkpoint-interval"></a>

以下 Scala 代码示例将应用程序的检查点间隔设置为一分钟：

```
// start a checkpoint every 1 minute
stenv.enableCheckpointing(60000)
```

以下 Python 代码示例将应用程序的检查点间隔设置为一分钟：

```
st_env.get_config().get_configuration().set_string(
    "execution.checkpointing.interval", "1min"
)
```

## 设置检查点类型
<a name="how-zeppelin-checkpoint-type"></a>

以下 Scala 代码示例将应用程序的检查点模式设置为`EXACTLY_ONCE`（默认）：

```
// set mode to exactly-once (this is the default)
stenv.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
```

以下 Python 代码示例将应用程序的检查点模式设置为`EXACTLY_ONCE`（默认）：

```
st_env.get_config().get_configuration().set_string(
    "execution.checkpointing.mode", "EXACTLY_ONCE"
)
```

# 升级 Studio 运行时
<a name="upgrading-studio-runtime"></a>

本部分包含有关如何升级 Studio 笔记本运行时的信息。我们建议您始终升级到最新支持的 Studio 运行时。

## 将您的笔记本升级到新的 Studio 运行时
<a name="upgrading-notebook"></a>

根据您使用 Studio 的方式，升级运行时的步骤会有所不同。选择适用于使用案例的选项：

### 没有外部依赖关系的 SQL 查询或 Python 代码
<a name="notebook-no-dependencies"></a>

如果您正在使用没有任何外部依赖关系的 SQL 或 Python，请使用以下运行时升级过程。建议您升级到最新的运行时版本。无论您要升级哪个运行时版本，升级过程均相同。

1. 使用最新的运行时创建新的 Studio 笔记本。

1. 将旧笔记本中每条备注的代码复制并粘贴到新笔记本上。

1. 在新笔记本中，调整代码，使其兼容与先前版本相比已更改的任何 Apache Flink 功能。
   + 运行新笔记本。打开笔记本并按顺序逐条运行备注，然后测试它是否正常运作。
   + 对代码进行任何必要的更改。
   + 停止运行新笔记本。

1. 如果您已将旧笔记本部署为应用程序：
   + 将新笔记本部署为单独的新应用程序。
   + 停止运行旧应用程序。
   + 在没有快照的情况下运行新应用程序。

1. 如果旧笔记本正在运行，请将其停止。根据需要启动新的笔记本以进行交互式使用。

**在没有外部依赖关系的情况下升级的过程流程**

![\[下图显示在没有外部依赖关系的情况下升级笔记本的建议工作流程。\]](http://docs.aws.amazon.com/zh_cn/managed-flink/latest/java/images/MSF-Studio-upgrade-without-dependencies.png)


### 具有外部依赖关系的 SQL 查询或 Python 代码
<a name="notebook-dependencies"></a>

如果您使用 SQL 或 Python 并使用外部依赖项，例如连接器或自定义构件（示例包括以 Python 或 Java 实施的用户定义函数），则请遵循此过程。建议您升级到最新的运行时。无论您从哪个运行时版本进行升级，升级过程均相同。

1. 使用最新的运行时创建新的 Studio 笔记本。

1. 将旧笔记本中每条备注的代码复制并粘贴到新笔记本上。

1. 更新外部依赖关系和自定义构件。
   + 寻找与新运行时的 Apache Flink 版本兼容的新连接器。请参阅 Apache Flink 文档中的[表和 SQL 连接器](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/overview/)，寻找适用于 Flink 版本的正确连接器。
   + 更新用户定义函数的代码，使其与 Apache Flink API 中的更改以及用户定义函数使用的任何 Python 或 JAR 依赖关系相匹配。重新打包更新的自定义构件。
   + 将这些新的连接器和构件添加到新笔记本中。

1. 在新笔记本中，调整代码，使其兼容与先前版本相比已更改的任何 Apache Flink 功能。
   + 运行新笔记本。打开笔记本并按顺序逐条运行备注，然后测试它是否正常运作。
   + 对代码进行任何必要的更改。
   + 停止运行新笔记本。

1. 如果您已将旧笔记本部署为应用程序：
   + 将新笔记本部署为单独的新应用程序。
   + 停止运行旧应用程序。
   + 在没有快照的情况下运行新应用程序。

1. 如果旧笔记本正在运行，请将其停止。根据需要启动新的笔记本以进行交互式使用。

**在有外部依赖关系的情况下升级的过程流程**

![\[下图显示在没有外部依赖关系的情况下升级笔记本的建议工作流程。\]](http://docs.aws.amazon.com/zh_cn/managed-flink/latest/java/images/MSF-Studio-upgrade-with-dependencies.png)


# 与... 一起工作 AWS Glue
<a name="how-zeppelin-glue"></a>

您的 Studio 笔记本存储并从中获取有关其数据源和接收器的信息 AWS Glue。创建 Studio 笔记本时，需要指定包含您的连接信息 AWS Glue 的数据库。访问数据源和接收器时，需要指定数据库中包含的 AWS Glue 表。您的 AWS Glue 表提供对 AWS Glue 连接的访问权限，这些连接定义了数据源和目标的位置、架构和参数。

Studio 笔记本使用表属性来存储应用程序特定的数据。有关更多信息，请参阅 [表属性](how-zeppelin-glue-properties.md)。

有关如何设置 AWS Glue 连接、数据库和表以用于 Studio 笔记本的示例，请参阅[教程：在 Managed Service for Apache Flink 中创建 Studio 笔记本](example-notebook.md)教程[创建 AWS Glue 数据库](example-notebook.md#example-notebook-glue)中的。

# 表属性
<a name="how-zeppelin-glue-properties"></a>

除了数据字段外，您的 AWS Glue 表还使用表格属性向 Studio 笔记本提供其他信息。适用于 Apache Flink 的托管服务使用以下 AWS Glue 表格属性：
+ [定义 Apache Flink 时间值](#how-zeppelin-glue-timestamp)：这些属性定义了 Managed Service for Apache Flink如何发出 Apache Flink 内部数据处理时间值。
+ [使用 Flink 连接器和格式属性](#how-zeppelin-glue-connector)：这些属性提供有关您的数据流的信息。

要向 AWS Glue 表中添加属性，请执行以下操作：

1. 登录 AWS 管理控制台 并打开 AWS Glue 控制台，网址为[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/)。

1. 从表的列表中，选择应用程序用于存储其数据连接信息的表。选择 “**操作****”、“编辑表格详细信息**”。

1. 在 “**表属性**” 下，输入 “**managed-flink.proctime****键**” 和 **user\$1action\$1time** “**值**”。

## 定义 Apache Flink 时间值
<a name="how-zeppelin-glue-timestamp"></a>

[Apache Flink 提供了描述何时发生流处理事件的时间值，例如[处理时间和事件时间](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/streaming/time_attributes.html#processing-time)。](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/streaming/time_attributes.html#event-time)要将这些值包含在应用程序输出中，需要在 AWS Glue 表上定义属性，告诉 Apache Flink 托管服务 Flink 运行时将这些值发送到指定字段中。

您在表属性中使用的键和值如下所示：


| 时间戳类型 | 键 | 值 | 
| --- |--- |--- |
| [处理时间](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/streaming/time_attributes.html#processing-time) | managed-flink.proctim | 用于显示值的列名。 AWS Glue 此列名与现有表列不对应。 | 
| [活动时间](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/streaming/time_attributes.html#event-time) | managed-flink.rowtime | 用于显示值的列名。 AWS Glue 此列名对应于现有的表列。 | 
| managed-flink.waterm。 *column\$1name*.mliseconds | 水印间隔（以毫秒为单位） | 

## 使用 Flink 连接器和格式属性
<a name="how-zeppelin-glue-connector"></a>

您可以使用 AWS Glue 表属性向应用程序的 Flink 连接器提供有关数据源的信息。以下是 Managed Service for Apache Flink 用于连接器的一些属性示例：


| 连接器类型 | 键 | 值 | 
| --- |--- |--- |
| [Kafka](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/connectors/kafka.html#connector-options) | format | 用于反序列化和序列化 Kafka 消息的格式，例如或。json csv | 
| scan.startup.mode | Kafka 消费者的启动模式，例如earliest-offset或timestamp。 | 
| [Kinesis](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/kinesis.html#connector-options) | format | 用于反序列化和序列化 Kinesis 数据流记录的格式，例如或。json csv | 
| aws.region | 定义直播的 AWS 区域。 | 
| [S3（文件系统）](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html) | format | 用于反序列化和序列化文件的格式，例如或。json csv | 
| path | 亚马逊 S3 路径，例如 s3://mybucket/ | 

有关除 Kinesis 和 Apache Kafka 之外的其他连接器的更多信息，请参阅您的连接器文档。

# Managed Service for Apache Flink 中的 Studio 笔记本示例和教程
<a name="how-zeppelin-examples"></a>

**Topics**
+ [教程：在 Managed Service for Apache Flink 中创建 Studio 笔记本](example-notebook.md)
+ [教程：将 Studio 笔记本部署为具有持久状态的 Managed Service for Apache Flink 应用程序。](example-notebook-deploy.md)
+ [查看用于分析 Studio 笔记本中数据的示例查询](how-zeppelin-sql-examples.md)

# 教程：在 Managed Service for Apache Flink 中创建 Studio 笔记本
<a name="example-notebook"></a>

以下教程演示如何创建从 Kinesis 数据流或 Amazon MSK 集群读取数据的 Studio 笔记本。

**Topics**
+ [完成 先决条件](#example-notebook-setup)
+ [创建 AWS Glue 数据库](#example-notebook-glue)
+ [后续步骤：使用 Kinesis Data Streams 或 Amazon MSK 创建 Studio 笔记本](#examples-notebook-nextsteps)
+ [使用 Kinesis Data Streams 创建 Studio 笔记本](example-notebook-streams.md)
+ [使用 Amazon MSK 创建 Studio 笔记本](example-notebook-msk.md)
+ [清除您的应用程序和依赖资源](example-notebook-cleanup.md)

## 完成 先决条件
<a name="example-notebook-setup"></a>

请确保您的版本 AWS CLI 为 2 或更高版本。要安装最新版本 AWS CLI，请参阅[安装、更新和卸载 AWS CLI 版本 2](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html)。

## 创建 AWS Glue 数据库
<a name="example-notebook-glue"></a>

您的 Studio 笔记本使用[AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html)数据库来存储有关您的Amazon MSK 数据来源的元数据。

**创建 AWS Glue 数据库**

1. 打开 AWS Glue 控制台，网址为[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/)。

1. 选择 **Add database**（添加数据库）。在“**添加数据库**” 窗口中，输入 **default**“**数据库名称**”。选择**创建**。

## 后续步骤：使用 Kinesis Data Streams 或 Amazon MSK 创建 Studio 笔记本
<a name="examples-notebook-nextsteps"></a>

通过本教程，您可以创建一个使用 Kinesis Data Streams 或 Amazon MSK 的 Studio 笔记本：
+ [使用 Kinesis Data Streams 创建 Studio 笔记本](example-notebook-streams.md)：使用 Kinesis Data Streams，您可以快速创建使用 Kinesis 数据流作为源的应用程序。您只需要创建 Kinesis 数据流作为依赖资源即可。
+ [使用 Amazon MSK 创建 Studio 笔记本](example-notebook-msk.md)：使用 Amazon MSK，您可以创建使用Amazon MSK 集群作为源的应用程序。您需要创建一个 Amazon VPC、一个 Amazon EC2 客户端实例和一个 Amazon MSK 集群作为依赖资源。

# 使用 Kinesis Data Streams 创建 Studio 笔记本
<a name="example-notebook-streams"></a>

本教程描述如何创建使用 Kinesis 数据流作为源的 Studio 笔记本。

**Topics**
+ [完成 先决条件](#example-notebook-streams-setup)
+ [创建 AWS Glue 表格](#example-notebook-streams-glue)
+ [使用 Kinesis Data Streams 创建 Studio 笔记本](#example-notebook-streams-create)
+ [将数据发送到您的 Kinesis 数据流](#example-notebook-streams-send)
+ [测试您的 Studio 笔记本](#example-notebook-streams-test)

## 完成 先决条件
<a name="example-notebook-streams-setup"></a>

在创建 Studio 笔记本之前，请创建 Kinesis 数据流 (`ExampleInputStream`) 。您的应用程序使用此流作为应用程序源。

可以使用 Amazon Kinesis 控制台或以下 AWS CLI 命令创建这些流。有关控制台说明，请参阅 *Amazon Kinesis Data Streams 开发人员指南*中的[创建和更新数据流](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html)。为流命名**ExampleInputStream**并将**打开的分片数**设置为**1**。

要使用创建直播 (`ExampleInputStream`) AWS CLI，请使用以下 Amazon Kinesis 命令`create-stream` AWS CLI 。

```
$ aws kinesis create-stream \
--stream-name ExampleInputStream \
--shard-count 1 \
--region us-east-1 \
--profile adminuser
```

## 创建 AWS Glue 表格
<a name="example-notebook-streams-glue"></a>

您的 Studio 笔记本使用[AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html)数据库来存储有关您的 Kinesis 数据流数据来源的元数据。

**注意**  
您可以先手动创建数据库，也可以让 Managed Service for Apache Flink 在创建笔记本时为您创建数据库。同样，您可以按照本节所述手动创建表，也可以在 Apache Zeppelin 的笔记本中使用 Managed Service for Apache Flink 创建表连接器代码，通过 DDL 语句创建表。然后，您可以签入 AWS Glue 以确保表格已正确创建。

**创建表**

1. 登录 AWS 管理控制台 并打开 AWS Glue 控制台，网址为[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/)。

1. 如果您还没有 AWS Glue 数据库，请从左侧导航栏中选择 “**数据库**”。选择 **添加数据库**。在“**添加数据库**” 窗口中，输入 **default**“**数据库名称**”。选择 **创建**。

1. 在左侧导航栏中，选择 **表**。在 “**表**” 页中，选择 “**添加表**”，“**手动添加表**”。

1. 在**设置表的属性**页面中，输入**stock****表格名称**。请务必选择之前创建的数据库。选择 **下一步**。

1. 在**添加数据存储**页面中，选择 **Kinesis**。对于**直播名称**，请输入**ExampleInputStream**。对于 **Kinesis 来源网址**，请选择 Enter。**https://kinesis.us-east-1.amazonaws.com**如果您复制并粘贴 **Kinesis 源网址**，请务必删除所有前导或尾随空格。选择 **下一步**。

1. 在**分类**页面中，选择 **JSON**。选择 **下一步**。

1. 在**定义架构**页面中，选择 Add Column 以添加列。添加具有以下属性的列：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/managed-flink/latest/java/example-notebook-streams.html)

   选择**下一步**。

1. 在下一页上，验证您的设置，然后选择**完成**。

1. 从表列表中选择新创建的表。

1. 选择 **“编辑表”**，然后添加包含键`managed-flink.proctime`和值的属性`proctime`。

1. 选择**应用**。

## 使用 Kinesis Data Streams 创建 Studio 笔记本
<a name="example-notebook-streams-create"></a>

现在，您已经创建了应用程序使用的资源，接下来就可以创建自己的 Studio 笔记本了。

**Topics**
+ [使用创建 Studio 笔记本 AWS 管理控制台](#example-notebook-create-streams-console)
+ [使用创建 Studio 笔记本 AWS CLI](#example-notebook-msk-create-api)

### 使用创建 Studio 笔记本 AWS 管理控制台
<a name="example-notebook-create-streams-console"></a>

1. [在家打开适用于 Apache Flink 的托管服务控制台？ https://console.aws.amazon.com/managed-flink/ region=us](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard)-east-1\$1/应用程序/仪表板。

1. 在 **Managed Service for Apache Flink 应用程序**页面中，选择 **Studio** 选项卡。选择**创建 Studio 笔记本**。
**注意**  
您也可以从 Amazon MSK 或 Kinesis Data Streams 控制台创建 Studio 笔记本，方法是选择输入的 Amazon MSK 集群或 Kinesis 数据流，然后**选择** “实时处理数据”。

1. 在 **创建笔记本实例页面上**，提供以下信息：
   + 输入笔记本**MyNotebook**的名称。
   + 为 Glue **数据库AWS 选择****默认值**。

   选择**创建 Studio 笔记本**。

1. 在**MyNotebook**页面中，选择 “**运行**”。等待“**状态”显示为“****正在运行**”。笔记本电脑运行时会产生费用。

### 使用创建 Studio 笔记本 AWS CLI
<a name="example-notebook-msk-create-api"></a>

要使用创建 Studio 笔记本 AWS CLI，请执行以下操作：

1. 验证账户 ID。创建应用程序时，您需要用到此值。

1. 创建角色`arn:aws:iam::AccountID:role/ZeppelinRole`并通过控制台向自动创建的角色添加以下权限。

   `"kinesis:GetShardIterator",`

   `"kinesis:GetRecords",`

   `"kinesis:ListShards"`

1. 创建以下内容的名为 `create.json` 的文件。确保将占位符值替换为您自己的信息。

   ```
   {
       "ApplicationName": "MyNotebook",
       "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
       "ApplicationMode": "INTERACTIVE",
       "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole",
       "ApplicationConfiguration": {
           "ApplicationSnapshotConfiguration": {
               "SnapshotsEnabled": false
           },
           "ZeppelinApplicationConfiguration": {
               "CatalogConfiguration": {
                   "GlueDataCatalogConfiguration": {
                       "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default"
                   }
               }
           }
       }
   }
   ```

1. 要创建应用程序，请运行以下命令：

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create.json 
   ```

1. 命令完成后，您会看到显示新 Studio 笔记本详细信息的输出。下面是输出的一个示例。

   ```
   {
       "ApplicationDetail": {
           "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook",
           "ApplicationName": "MyNotebook",
           "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
           "ApplicationMode": "INTERACTIVE",
           "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole",
   ...
   ```

1. 要开始应用程序，请运行以下命令。请将占位符值替换为账户 ID。

   ```
   aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\
   ```

## 将数据发送到您的 Kinesis 数据流
<a name="example-notebook-streams-send"></a>

要将测试数据发送到您的 Kinesis 数据流，请执行以下操作：

1. 打开 [Kinesis Data Generator (KDG)](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html)。

1. 选择**使用创建 Cognito 用户**。 CloudFormation

1.  CloudFormation 控制台随即打开 Kinesis 数据生成器模板。选择**下一步**。

1. 在 **指定堆栈详细信息**页面上，输入 Cognito 用户的用户名和密码。选择 **下一步**。

1. 在 **配置堆栈选项** 页面上，请选择 **下一步**。

1. 在 **Review Kinesis-Data-Generator-Cognito-User** 页面中，选择**我确认 AWS CloudFormation 可能会创建 IAM 资源**。 复选框。选择**创建堆栈**。

1. 等待 CloudFormation 堆栈完成创建。**堆栈完成后，在控制台中打开 **Kinesis-Data-Generator-Cognito-User 堆栈，然后**选择输出选项卡。 CloudFormation **打开为**KinesisDataGeneratorUrl**输出值列出的 URL。

1. 在 **Amazon Kinesis 数据生成器**页面中，使用您在步骤 4 中创建的凭证登录。

1. 在下一页，提供以下值：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/managed-flink/latest/java/example-notebook-streams.html)

   对于 “**记录模板**”，粘贴以下代码：

   ```
   {
       "ticker": "{{random.arrayElement(
           ["AMZN","MSFT","GOOG"]
       )}}",
       "price": {{random.number(
           {
               "min":10,
               "max":150
           }
       )}}
   }
   ```

1. 选择 “**发送数据**”。

1. 生成器会将数据发送到 Kinesis 数据流。

   在完成下一部分的同时，让发电机继续运行。

## 测试您的 Studio 笔记本
<a name="example-notebook-streams-test"></a>

在本节中，您将使用 Studio 笔记本来查询 Kinesis 数据流中的数据。

1. [在家打开适用于 Apache Flink 的托管服务控制台？ https://console.aws.amazon.com/managed-flink/ region=us](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard)-east-1\$1/应用程序/仪表板。

1. 在 **Managed Service for Apache Flink 应用程序**页面上，选择 **Studio 笔记本**选项卡。选择 **MyNotebook**。

1. 在**MyNotebook**页面中，选择 “在 **Apache 齐柏林飞艇中打开**”。

   Apache Zeppelin 接口会在新选项卡中打开。

1. 在《**欢迎来到齐柏**林飞艇》中！ 页面上，选择**齐柏林飞艇笔记**。

1. 在 **Zeppelin Note 页面中**，在新笔记中输入以下查询：

   ```
   %flink.ssql(type=update)
   select * from stock
   ```

   选择运行图标。

   片刻之后，注释将显示来自 Kinesis 数据流的数据。

要打开应用程序的 Apache Flink 控制面板以查看操作方面，请选择 **FLINK JOB**。有关 Flink 控制面板的更多信息，请参阅《[Managed Service for Apache Flink [开发者指南》中的 Apache](https://docs.aws.amazon.com/) Flink 控制面板](https://docs.aws.amazon.com/managed-flink/latest/java/how-dashboard.html)。

有关 Flink Streaming SQL 查询的更多示例，请参阅 [Apache Fl](https://nightlies.apache.org/flink/flink-docs-release-1.15/) ink 文档中的[查询](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html)。

# 使用 Amazon MSK 创建 Studio 笔记本
<a name="example-notebook-msk"></a>

本教程描述如何创建使用 Amazon MSK 集群作为源的 Studio 笔记本。

**Topics**
+ [设置 Amazon MSK 集群](#example-notebook-msk-setup)
+ [将 NAT 网关添加到您的 VPC](#example-notebook-msk-nat)
+ [创建 AWS Glue 连接和表](#example-notebook-msk-glue)
+ [使用 Amazon MSK 创建 Studio 笔记本](#example-notebook-msk-create)
+ [向您的Amazon MSK 集群发送数据](#example-notebook-msk-send)
+ [测试您的 Studio 笔记本](#example-notebook-msk-test)

## 设置 Amazon MSK 集群
<a name="example-notebook-msk-setup"></a>

在此教程中，您需要一个允许纯文本访问的 Amazon MSK 集群。如果您尚未设置 Amazon MSK 集群，请按照[使用亚马逊 MSK 入门](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html)教程创建亚马逊 VPC、亚马逊 MSK 集群、主题和亚马逊 EC2 客户端实例。

在学习教程时，执行以下操作：
+ 在[步骤 3：创建 Amazon MSK 集群](https://docs.aws.amazon.com/msk/latest/developerguide/create-cluster.html)中，在步骤 4 中，将`ClientBroker`值从`TLS`更改为**PLAINTEXT**。

## 将 NAT 网关添加到您的 VPC
<a name="example-notebook-msk-nat"></a>

如果您按照[使用 Amazon MSK 入门教程创建了 Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html) 集群，或者您的现有 Amazon VPC 还没有用于其私有子网的 NAT 网关，则必须将 NAT 网关添加到您的 Amazon VPC 中。下图演示了架构。

![\[AWS VPC architecture with public and private subnets, NAT gateway, and Glue Data Catalog integration.\]](http://docs.aws.amazon.com/zh_cn/managed-flink/latest/java/images/vpc_05.png)


要为您的 Amazon VPC 创建 NAT 网关，请执行以下操作：

1. 打开位于 [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/) 的 Amazon VPC 控制台。

1. 从左侧导航栏中选择 **NAT 网关**。

1. 在 **NAT 网关**页面上，选择**创建 NAT 网关**。

1. 在**创建 NAT 网关**页面上，提供以下值：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/managed-flink/latest/java/example-notebook-msk.html)

   选择 **Create NAT Gateway**（创建 NAT 网关）。

1. 在左侧导航栏中，选择 **路由表**。

1. 选择 **Create Route Table**。

1. 在 **创建(路由表)** 页面上，提供以下信息：
   + **名称标签：****ZeppelinRouteTable**
   + **VPC**：选择您的 VPC（例如 **AWS KafkaTutorialVPC**）。

   选择**创建**。

1. 在路由表列表中，选择**ZeppelinRouteTable**。选择 **路由**选项卡，然后选择 **编辑路由**。

1. 在**编辑路由**页面上，选择**添加路由**。

1. 在 ******目标位置**字段，输入**0.0.0.0/0**。对于**目标**，选择 **NAT 网关**ZeppelinGateway****。选择 **保存路由**。选择 **关闭**。

1. 在 “路由表” 页面上，**ZeppelinRouteTable**选中，选择 “**子网关联**” 选项卡。选择**编辑子网关联**。

1. 在**编辑子网关联**页面中，选择 **AWS KafkaTutorialSubnet2** 和 **AWS KafkaTutorialSubnet3**。选择**保存**。

## 创建 AWS Glue 连接和表
<a name="example-notebook-msk-glue"></a>

您的 Studio 笔记本使用[AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html)数据库来存储有关您的Amazon MSK 数据来源的元数据。在本节中，您将创建一个描述如何访问您的 Amazon MSK 集群的 AWS Glue 连接，以及一个描述如何将数据源中的数据呈现给客户端（例如 Studio 笔记本）的 AWS Glue 表。

**创建连接**

1. 登录 AWS 管理控制台 并打开 AWS Glue 控制台，网址为[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/)。

1. 如果您还没有 AWS Glue 数据库，请从左侧导航栏中选择 “**数据库**”。选择 **添加数据库**。在“**添加数据库**” 窗口中，输入 **default**“**数据库名称**”。选择 **创建**。

1. 从左侧导航菜单中，选择**连接**。选择 **添加连接**。

1. 在 “**添加连接**” 窗口中，提供以下值：
   + 对于 **连接名称**，输入 **ZeppelinConnection**。
   + 对于 **Connection type (连接类型)**，选择 **Kafka**。
   + 对于 **Kafka 引导服务器 URLs**，请为您的集群提供引导代理字符串。您可以从 MSK 控制台或通过输入以下 CLI 命令来获取引导程序代理：

     ```
     aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
     ```
   + 取消选中 “**需要 SSL 连接**” 复选框。

   选择 **下一步**。

1. 在 **VPC** 页面上，提供以下值：
   + 对于 **VPC**，请选择您的 VPC 的名称（例如 ** AWS KafkaTutorialVPC**。）
   + 对于**子网**，选择 **AWS KafkaTutorialSubnet2**。
   + 对于**安全组**，请选择所有可用的组。

   选择 **下一步**。

1. 在“**连接属性**/**连接访问权限**” 页中，选择“**完成**”

**创建表**
**注意**  
您可以按照以下步骤所述手动创建表，也可以在 Apache Zeppelin 的笔记本中使用 Managed Service for Apache Flink创建表连接器代码，通过 DDL 语句创建表。然后，您可以签入 AWS Glue 以确保表格已正确创建。

1. 在左侧导航栏中，选择 **表**。在 “**表**” 页中，选择 “**添加表**”，“**手动添加表**”。

1. 在**设置表的属性**页面中，输入**stock****表格名称**。请务必选择之前创建的数据库。选择 **下一步**。

1. 在**添加数据存储**页面中，选择 **Kafka**。在**主题名称**中，输入您的主题名称（例如 **AWS KafkaTutorialTopic**）。对于 “**连接**”，选择**ZeppelinConnection**。

1. 在**分类**页面中，选择 **JSON**。选择 **下一步**。

1. 在**定义架构**页面中，选择 Add Column 以添加列。添加具有以下属性的列：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/managed-flink/latest/java/example-notebook-msk.html)

   选择**下一步**。

1. 在下一页上，验证您的设置，然后选择**完成**。

1. 从表列表中选择新创建的表。

1. 选择**编辑表格**并添加以下属性：
   + 键：`managed-flink.proctime`，值：`proctime`
   + 键：`flink.properties.group.id`，值：`test-consumer-group`
   + 键：`flink.properties.auto.offset.reset`，值：`latest`
   + 键：`classification`，值：`json`

   如果没有这些键/值对，Flink 笔记本就会遇到错误。

1. 选择**应用**。

## 使用 Amazon MSK 创建 Studio 笔记本
<a name="example-notebook-msk-create"></a>

现在，您已经创建了应用程序使用的资源，接下来就可以创建自己的 Studio 笔记本了。

**Topics**
+ [使用创建 Studio 笔记本 AWS 管理控制台](#example-notebook-create-msk-console)
+ [使用创建 Studio 笔记本 AWS CLI](#example-notebook-msk-create-api)

**注意**  
您也可以从 Amazon MSK 控制台创建 Studio 笔记本，方法是选择现有集群，然后选择 “**实时处理数据**”。

### 使用创建 Studio 笔记本 AWS 管理控制台
<a name="example-notebook-create-msk-console"></a>

1. [在家打开适用于 Apache Flink 的托管服务控制台？ https://console.aws.amazon.com/managed-flink/ region=us](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard)-east-1\$1/应用程序/仪表板。

1. 在 **Managed Service for Apache Flink 应用程序**页面中，选择 **Studio** 选项卡。选择**创建 Studio 笔记本**。
**注意**  
要从Amazon MSK 或 Kinesis Data Streams 控制台创建 Studio 笔记本，请选择您输入的Amazon MSK 集群或 Kinesis 数据流，然后**选择“**实时处理数据”。

1. 在 **创建笔记本实例** 页面上，提供以下信息：
   + 输入 ****MyNotebook** Studio 笔记本的名称**。
   + 为 **Glue 数据库AWS 选择****默认值**。

   选择**创建 Studio 笔记本**。

1. 在该**MyNotebook**页面中，选择 “**配置**” 选项卡。在 **联网** 部分中，选择 **编辑**。

1. 在**编辑网络连接 MyNotebook**页面中，选择**基于 Amazon MSK 集群的 VPC 配置**。为Amazon MSK 集群选择您的**Amazon MSK** 集群。选择 **Save changes（保存更改）**。

1. 在**MyNotebook**页面中，选择 “**运行**”。等待“**状态”显示为“****正在运行**”。

### 使用创建 Studio 笔记本 AWS CLI
<a name="example-notebook-msk-create-api"></a>

要使用创建 Studio 笔记本 AWS CLI，请执行以下操作：

1. 验证您具有以下信息：创建应用程序时，您需要用到这些值。
   + 您的 账户 ID
   + 包含您的 Amazon MSK 集群的 Amazon VPC 的子网 IDs 和安全组 ID。

1. 创建以下内容的名为 `create.json` 的文件。确保将占位符值替换为您自己的信息。

   ```
   {
       "ApplicationName": "MyNotebook",
       "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
       "ApplicationMode": "INTERACTIVE",
       "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole",
       "ApplicationConfiguration": {
           "ApplicationSnapshotConfiguration": {
               "SnapshotsEnabled": false
           },
           "VpcConfigurations": [
               {
                   "SubnetIds": [
                       "SubnetID 1",
                       "SubnetID 2",
                       "SubnetID 3"
                   ],
                   "SecurityGroupIds": [
                       "VPC Security Group ID"
                   ]
               }
           ],
           "ZeppelinApplicationConfiguration": {
               "CatalogConfiguration": {
                   "GlueDataCatalogConfiguration": {
                       "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default"
                   }
               }
           }
       }
   }
   ```

1. 要创建应用程序，请运行以下命令：

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create.json 
   ```

1. 命令完成后，您应该会看到类似于以下内容的输出，其中显示了新 Studio 笔记本的详细信息：

   ```
   {
       "ApplicationDetail": {
           "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook",
           "ApplicationName": "MyNotebook",
           "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
           "ApplicationMode": "INTERACTIVE",
           "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole",
   ...
   ```

1. 要开始应用程序，请运行以下命令。请将占位符值替换为账户 ID。

   ```
   aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\
   ```

## 向您的Amazon MSK 集群发送数据
<a name="example-notebook-msk-send"></a>

在本节中，你将在亚马逊 EC2 客户端中运行 Python 脚本，将数据发送到你的亚马逊 MSK 数据源。

1. 连接到您的亚马逊 EC2 客户端。

1. 运行以下命令安装 Python 版本 3、Pip 和 Kafka for Python 软件包，然后确认操作：

   ```
   sudo yum install python37
   curl -O https://bootstrap.pypa.io/get-pip.py
   python3 get-pip.py --user
   pip install kafka-python
   ```

1. 通过输入以下命令 AWS CLI 在您的客户端计算机上进行配置：

   ```
   aws configure
   ```

   提供您的账户凭证，**us-east-1**并提供`region`。

1. 创建以下内容的名为 `stock.py` 的文件。将示例值替换为您的 Amazon MSK 集群的 Bootstrap Brokers 字符串，如果您的主题不是，请更新主题名称：**AWS KafkaTutorialTopic**

   ```
   from kafka import KafkaProducer
   import json
   import random
   from datetime import datetime
   
   BROKERS = "<<Bootstrap Broker List>>"
   producer = KafkaProducer(
       bootstrap_servers=BROKERS,
       value_serializer=lambda v: json.dumps(v).encode('utf-8'),
       retry_backoff_ms=500,
       request_timeout_ms=20000,
       security_protocol='PLAINTEXT')
   
   
   def getStock():
       data = {}
       now = datetime.now()
       str_now = now.strftime("%Y-%m-%d %H:%M:%S")
       data['event_time'] = str_now
       data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV'])
       price = random.random() * 100
       data['price'] = round(price, 2)
       return data
   
   
   while True:
       data =getStock()
       # print(data)
       try:
           future = producer.send("AWSKafkaTutorialTopic", value=data)
           producer.flush()
           record_metadata = future.get(timeout=10)
           print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset))
       except Exception as e:
           print(e.with_traceback())
   ```

1. 使用以下命令运行脚本：

   ```
   $ python3 stock.py
   ```

1. 完成以下部分后，请让脚本继续运行。

## 测试您的 Studio 笔记本
<a name="example-notebook-msk-test"></a>

在本节中，您将使用 Studio 笔记本查询来自 Amazon MSK 集群的数据。

1. [在家打开适用于 Apache Flink 的托管服务控制台？ https://console.aws.amazon.com/managed-flink/ region=us](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard)-east-1\$1/应用程序/仪表板。

1. 在 **Managed Service for Apache Flink 应用程序**页面上，选择 **Studio 笔记本**选项卡。选择 **MyNotebook**。

1. 在**MyNotebook**页面中，选择 “在 **Apache 齐柏林飞艇中打开**”。

   Apache Zeppelin 接口会在新选项卡中打开。

1. 在**欢迎来到 Zeppelin\$1**页面上，选择**Zeppelin 新笔记**。

1. 在 **Zeppelin Note **页面上，在新笔记中输入以下查询：

   ```
   %flink.ssql(type=update)
   select * from stock
   ```

   选择运行图标。

   该应用程序显示来自 Amazon MSK 集群的数据。

要打开应用程序的 Apache Flink 仪表板以查看操作方面，请选择 **FLINK JOB**。有关 Flink 控制面板的更多信息，请参阅《[Managed Service for Apache Flink [开发者指南》中的 Apache](https://docs.aws.amazon.com/) Flink 控制面板](https://docs.aws.amazon.com/managed-flink/latest/java/how-dashboard.html)。

有关 Flink Streaming SQL 查询的更多示例，请参阅 [Apache Fl](https://nightlies.apache.org/flink/flink-docs-release-1.15/) ink 文档中的[查询](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html)。

# 清除您的应用程序和依赖资源
<a name="example-notebook-cleanup"></a>

## 删除您的 Studio 笔记本
<a name="example-notebook-cleanup-app"></a>

1. 打开 Managed Service for Apache Flink 控制台。

1. 选择 **MyNotebook**。

1. 选择**操作**，然后选择**删除**。

## 删除您的 AWS Glue 数据库和连接
<a name="example-notebook-cleanup-glue"></a>

1. 打开 AWS Glue 控制台，网址为[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/)。

1. 从左侧导航栏中选择 **数据库**。选中 “**默认**” 旁边的复选框将其选中。选择**操作**，**删除数据库**。确认您的选择。

1. 从左侧导航菜单中，选择**连接**。选中旁边的复选框**ZeppelinConnection**将其选中。选择**操作**，**删除连接**。确认您的选择。

## 删除 IAM 角色和策略
<a name="example-notebook-msk-cleanup-iam"></a>

1. 使用 [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) 打开 IAM 控制台。

1. 从左侧导航菜单中，选择 **角色**。

1. 使用搜索栏搜索**ZeppelinRole**角色。

1. 选择**ZeppelinRole**角色。选择 **删除角色**。确认删除操作。

## 删除您的 CloudWatch 日志组
<a name="example-notebook-cleanup-cw"></a>

当您使用控制台创建应用程序时，控制台会为您创建 CloudWatch 日志组和日志流。如果您使用创建应用程序，则没有日志组和流 AWS CLI。

1. 打开 CloudWatch 控制台，网址为[https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/)。

1. 从左侧导航菜单中，选择 **日志组**。

1. 选择**/AWS/KinesisAnalytics/MyNotebook**日志组。

1. 依次选择 **Actions**（操作）和 **Delete log group(s)**（删除日志组）。确认删除操作。

## 清除 Kinesis Data Streams 资源
<a name="example-notebook-cleanup-streams"></a>

要删除您的 Kinesis stream，请打开 Kinesis Data Streams 控制台，选择您的 Kinesis stream，然后选择**操作**、**删除**。

## 清理 MSK 资源
<a name="example-notebook-cleanup-msk"></a>

如果您为本教程创建了 Amazon MSK 集群，请执行本部分中的步骤。本部分包含清理Amazon EC2 客户端实例、Amazon VPC 和 Amazon MSK 集群的说明。

### 删除您的Amazon MSK 集群
<a name="example-notebook-msk-cleanup-msk"></a>

如果您为本教程创建了 Amazon MSK 集群，请执行这些步骤。

1. 在[https://console.aws.amazon.com/msk/家打开亚马逊 MSK 控制台？ region=us](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/)-east-1\$1/home/。

1. 选择 **AWS KafkaTutorialCluster**。选择**删除**。**delete**在出现的窗口中输入并确认您的选择。

### 终止您的客户端实例
<a name="example-notebook-msk-cleanup-client"></a>

如果您为本教程创建了 Amazon EC2 客户端实例，请按照以下步骤操作。

1. 打开位于 [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/) 的 Amazon EC2 控制台。

1. 从左侧导航栏中选择**实例**。

1. 选中旁边的复选框**ZeppelinClient**将其选中。

1. 依次选择**实例状态**，**终止实例**。

### 删除 Amazon VPC
<a name="example-notebook-msk-cleanup-vpc"></a>

如果您为本教程创建了 Amazon VPC，请按照以下步骤操作。

1. 打开位于 [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/) 的 Amazon EC2 控制台。

1. 从左侧导航栏中选择 **“网络接口**”。

1. 在搜索栏输入您的 VPC ID，然后按输入进行搜索。

1. 选中表格标题中的复选框以选择所有显示的网络接口。

1. 依次选择**操作**、**分离**。在出现的窗口中，在 “**强制分离**”下选择 “**启用**”。选择 “**分离**”，然后等待所有网络接口都变为 “**可用**” 状态。

1. 选中表格标题中的复选框，以再次选择所有显示的网络接口。

1. 依次选择**操作**和**删除**。确认该操作。

1. 打开位于 [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/) 的 Amazon VPC 控制台。

1. 选择 **AWS KafkaTutorialVPC**。依次选择 **操作** 和 **删除 VPC**。输入**delete**并确认删除。

# 教程：将 Studio 笔记本部署为具有持久状态的 Managed Service for Apache Flink 应用程序。
<a name="example-notebook-deploy"></a>

以下教程演示了如何将 Studio Notebook 部署为具有持久状态的 Managed Service for Apache Flink 应用程序。

**Topics**
+ [满足先决条件](#example-notebook-durable-setup)
+ [使用部署具有持久状态的应用程序 AWS 管理控制台](#example-notebook-deploy-console)
+ [使用部署具有持久状态的应用程序 AWS CLI](#example-notebook-deploy-cli)

## 满足先决条件
<a name="example-notebook-durable-setup"></a>

使用 Kinesis Data Streams 或 Amazon MSK 按照[教程：在 Managed Service for Apache Flink 中创建 Studio 笔记本](example-notebook.md)操作创建新的 Studio 笔记本。为 Studio 笔记本命名`ExampleTestDeploy`。

## 使用部署具有持久状态的应用程序 AWS 管理控制台
<a name="example-notebook-deploy-console"></a>

1. 在 “**应用程序代码位置”（控制台中为*可选*）**下添加要存储打包代码的 S3 存储桶位置。这样就可以直接从笔记本部署和运行应用程序的步骤。

1. 向应用程序角色添加所需的权限，以启用您用于读取和写入 Amazon S3 存储桶的角色，以及启动Managed Service for Apache Flink应用程序：
   + 亚马逊 3 FullAccess
   + 亚马逊托管-flinkFullAccess
   + 访问您的来源、目的地 VPCs 以及（如果适用）。有关更多信息，请参阅 [审核 Studio 笔记本的 IAM 权限](how-zeppelin-iam.md)。

1. 使用下面的示例代码：

   ```
   %flink.ssql(type=update) 
   CREATE TABLE exampleoutput (
     'ticket' VARCHAR,
     'price' DOUBLE
   )
   WITH (
     'connector' = 'kinesis',
     'stream' = 'ExampleOutputStream',
     'aws.region' = 'us-east-1',
     'scan.stream.initpos' = 'LATEST',
     'format' = 'json'
   );
   
   INSERT INTO exampleoutput SELECT ticker, price FROM exampleinputstream
   ```

1. 启动此功能后，您将在笔记本中每张笔记的右上角看到一个新的下拉列表，上面写着笔记本的名称。您可执行以下操作：
   + 在中查看 Studio 笔记本的设置 AWS 管理控制台。
   + 制作您的 Zeppelin Note 并将其导出到 Amazon S3。此时，请为您的应用程序提供一个名称，然后选择 “**生成并导出**”。导出完成后，您将收到通知。
   + 如果需要，您可以在 Amazon S3 中查看和运行对可执行文件的任何其他测试。
   + 构建完成后，您将能够将代码部署为具有持久状态和自动扩展功能的 Kinesis 流媒体应用程序。
   + 使用下拉列表并选择**将 Zeppelin Note 部署为 Kinesis 流式应用程序**。查看应用程序名称并选择**通过 AWS 控制台部署**。
   + 这将引导您进入为 Apache Flink 应用程序创建托管服务的 AWS 管理控制台 页面。请注意，应用程序名称、并行度、代码位置、默认 Glue DB、VPC（如果适用）和 IAM 角色已预先填充。验证 IAM 角色是否具有访问您的源和目标所需的权限。默认情况下，快照处于启用状态，以实现持久的应用程序状态管理。
   + 选择**创建应用程序**。
   + 您可以选择**配置**和修改任何设置，然后选择**运行**以启动您的流媒体应用程序。

## 使用部署具有持久状态的应用程序 AWS CLI
<a name="example-notebook-deploy-cli"></a>

要使用部署应用程序 AWS CLI，您必须更新 AWS CLI 以使用与 Beta 2 信息一起提供的服务模型。有关如何使用更新的服务模型的信息，请参阅[完成 先决条件满足先决条件](example-notebook.md#example-notebook-setup)。

以下示例代码将创建一个新的 Studio 笔记本：

```
aws kinesisanalyticsv2 create-application \
     --application-name <app-name> \
     --runtime-environment ZEPPELIN-FLINK-3_0 \
     --application-mode INTERACTIVE \
     --service-execution-role <iam-role>
     --application-configuration '{ 
       "ZeppelinApplicationConfiguration": { 
         "CatalogConfiguration": { 
           "GlueDataCatalogConfiguration": { 
             "DatabaseARN": "arn:aws:glue:us-east-1:<account>:database/<glue-database-name>" 
           } 
         } 
       },
       "FlinkApplicationConfiguration": {
         "ParallelismConfiguration": {
           "ConfigurationType": "CUSTOM",
           "Parallelism": 4,
           "ParallelismPerKPU": 4
         }
       },
       "DeployAsApplicationConfiguration": {
            "S3ContentLocation": { 
               "BucketARN": "arn:aws:s3:::<s3bucket>",
               "BasePath": "/something/"
            }
        },
       "VpcConfigurations": [
         {
           "SecurityGroupIds": [
             "<security-group>"
           ],
           "SubnetIds": [
             "<subnet-1>",
             "<subnet-2>"
           ]
         }
       ]
     }' \
     --region us-east-1
```

以下代码示例将启动一个新的 Studio 笔记本：

```
aws kinesisanalyticsv2 start-application \
    --application-name <app-name> \
    --region us-east-1 \
    --no-verify-ssl
```

以下代码返回应用程序的 Apache Zeppelin 笔记本页面的 URL：

```
aws kinesisanalyticsv2 create-application-presigned-url \
    --application-name <app-name> \
    --url-type ZEPPELIN_UI_URL \

    --region us-east-1 \
    --no-verify-ssl
```

# 查看用于分析 Studio 笔记本中数据的示例查询
<a name="how-zeppelin-sql-examples"></a>

**Topics**
+ [使用 Amazon MSK/Apache Kafka 创建表格](#how-zeppelin-examples-creating-tables)
+ [使用 Kinesis 创建表](#how-zeppelin-examples-creating-tables-with-kinesis)
+ [查询滚动窗口](#how-zeppelin-examples-tumbling)
+ [查询滑动窗口](#how-zeppelin-examples-sliding)
+ [使用交互式 SQL](#how-zeppelin-examples-interactive-sql)
+ [使用 BlackHole SQL 连接器](#how-zeppelin-examples-blackhole-connector-sql)
+ [使用 Scala 生成示例数据](#notebook-example-data-generator)
+ [使用交互式 Scala](#notebook-example-interactive-scala)
+ [使用交互式 Python](#notebook-example-interactive-python)
+ [结合使用交互式 Python、SQL 和 Scala](#notebook-example-interactive-pythonsqlscala)
+ [使用跨账户 Kinesis 数据流](#notebook-example-crossaccount-kds)

有关 Apache Flink SQL 查询设置的信息，请参阅用于交互式数据分析的[齐柏林飞艇笔记本上的 Flink](https://flink.apache.org/ecosystem/2020/06/23/flink-on-zeppelin-part2.html)。

要在 Apache Flink 控制面板中查看您的应用程序，请在应用程序的 **** Zeppelin Note **页面中选择 FLINK ** JOB。

有关窗口查询的更多信息，请参阅 [Apache Flink 文档](https://nightlies.apache.org/flink/flink-docs-release-1.15/)中的 [Windows](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/stream/operators/windows.html)。

有关 Apache Flink Streaming SQL 查询的更多示例，请参阅 [Apache](https://nightlies.apache.org/flink/flink-docs-release-1.15/) Flink 文档中的[查询](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html)。

## 使用 Amazon MSK/Apache Kafka 创建表格
<a name="how-zeppelin-examples-creating-tables"></a>

您可以使用带有 Managed Service for Apache Flink Studio 的 Amazon MSK Flink 连接器通过纯文本、SSL 或 IAM 身份验证对您的连接进行身份验证。根据您的要求使用特定属性创建表。

```
-- Plaintext connection

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- SSL connection

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
   'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'properties.security.protocol' = 'SSL',
  'properties.ssl.truststore.location' = '/usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts',
  'properties.ssl.truststore.password' = 'changeit',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- IAM connection (or for MSK Serverless)

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'AWS_MSK_IAM',
  'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;',
  'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```

您可以在 [Apache Kafka SQL](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kafka/) Connector 中将它们与其他属性结合使用。

## 使用 Kinesis 创建表
<a name="how-zeppelin-examples-creating-tables-with-kinesis"></a>

在以下示例中，您将使用 Kinesis 创建表：

```
CREATE TABLE KinesisTable (
  `column1` BIGINT,
  `column2` BIGINT,
  `column3` BIGINT,
  `column4` STRING,
  `ts` TIMESTAMP(3)
)
PARTITIONED BY (column1, column2)
WITH (
  'connector' = 'kinesis',
  'stream' = 'test_stream',
  'aws.region' = '<region>',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'csv'
);
```

有关您可以使用的其他属性的更多信息，请参阅 [Amazon Kinesis Data Streams SQL 连接器](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kinesis/)。

## 查询滚动窗口
<a name="how-zeppelin-examples-tumbling"></a>

以下 Flink Streaming SQL 查询从表中选择每个五秒钟的滚动窗口中的最高价格：`ZeppelinTopic`

```
%flink.ssql(type=update)
SELECT TUMBLE_END(event_time, INTERVAL '5' SECOND) as winend, MAX(price) as five_second_high, ticker
FROM ZeppelinTopic
GROUP BY ticker, TUMBLE(event_time, INTERVAL '5' SECOND)
```

## 查询滑动窗口
<a name="how-zeppelin-examples-sliding"></a>

以下 Apache Flink Streaming SQL 查询从表格中选择每个五秒钟滑动窗口中的最高价格：`ZeppelinTopic`

```
%flink.ssql(type=update)
SELECT HOP_END(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND) AS winend, MAX(price) AS sliding_five_second_max
FROM ZeppelinTopic//or your table name in AWS Glue
GROUP BY HOP(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND)
```

## 使用交互式 SQL
<a name="how-zeppelin-examples-interactive-sql"></a>

此示例打印事件时间和处理时间的最大值以及键值表中的值之和。确保您有[使用 Scala 生成示例数据](#notebook-example-data-generator)正在运行的示例数据生成脚本。要在 Studio 笔记本中尝试其他 SQL 查询，例如筛选和联接，请参阅 Apache Flink 文档中的 Apache Flink 文档：[查询](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html)。

```
%flink.ssql(type=single, parallelism=4, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints how many records from the `key-value-stream` we have seen so far, along with the current processing and event time.
SELECT
  MAX(`et`) as `et`,
  MAX(`pt`) as `pt`,
  SUM(`value`) as `sum`
FROM
  `key-values`
```

```
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)

-- An interactive tumbling window query that displays the number of records observed per (event time) second.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT
  TUMBLE_START(`et`, INTERVAL '1' SECONDS) as `window`,
  `key`,
  SUM(`value`) as `sum`
FROM
  `key-values`
GROUP BY
  TUMBLE(`et`, INTERVAL '1' SECONDS),
  `key`;
```

## 使用 BlackHole SQL 连接器
<a name="how-zeppelin-examples-blackhole-connector-sql"></a>

 BlackHole SQL 连接器不需要您创建 Kinesis 数据流或 Amazon MSK 集群来测试您的查询。有关 BlackHole SQL 连接器的信息，请参阅 Apache Flink 文档中的 [BlackHole SQL 连接器](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/connectors/blackhole.html)。在此示例中，默认目录是内存中的目录。

```
%flink.ssql

CREATE TABLE default_catalog.default_database.blackhole_table (
 `key` BIGINT,
 `value` BIGINT,
 `et` TIMESTAMP(3)
) WITH (
 'connector' = 'blackhole'
)
```

```
%flink.ssql(parallelism=1)

INSERT INTO `test-target`
SELECT
  `key`,
  `value`,
  `et`
FROM
  `test-source`
WHERE
  `key` > 3
```

```
%flink.ssql(parallelism=2)

INSERT INTO `default_catalog`.`default_database`.`blackhole_table`
SELECT
  `key`,
  `value`,
  `et`
FROM
  `test-target`
WHERE
  `key` > 7
```

## 使用 Scala 生成示例数据
<a name="notebook-example-data-generator"></a>

此示例使用 Scala 生成示例数据。您可以使用此示例数据来测试各种查询。使用 create table 语句创建键值表。

```
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource
import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator
import org.apache.flink.streaming.api.scala.DataStream

import java.sql.Timestamp

// ad-hoc convenience methods to be defined on Table 
implicit class TableOps[T](table: DataStream[T]) {
    def asView(name: String): DataStream[T] = {
      if (stenv.listTemporaryViews.contains(name)) {
        stenv.dropTemporaryView("`" + name + "`")
      }
      stenv.createTemporaryView("`" + name + "`", table)
      return table;
    }
}
```

```
%flink(parallelism=4)
val stream = senv
 .addSource(new DataGeneratorSource(RandomGenerator.intGenerator(1, 10), 1000))
 .map(key => (key, 1, new Timestamp(System.currentTimeMillis)))
 .asView("key-values-data-generator")
```

```
%flink.ssql(parallelism=4)
-- no need to define the paragraph type with explicit parallelism (such as "%flink.ssql(parallelism=2)")
-- in this case the INSERT query will inherit the parallelism of the of the above paragraph
INSERT INTO `key-values`
SELECT
 `_1` as `key`,
 `_2` as `value`,
 `_3` as `et`
FROM
 `key-values-data-generator`
```

## 使用交互式 Scala
<a name="notebook-example-interactive-scala"></a>

这是[使用交互式 SQL](#how-zeppelin-examples-interactive-sql) 的 Scala 翻译。有关更多 Scala 示例，请参阅 Apache Flink 文档中的[表 API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/tableApi.html)。

```
%flink
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
    def asView(name: String): Table = {
      if (stenv.listTemporaryViews.contains(name)) {
        stenv.dropTemporaryView(name)
      }
      stenv.createTemporaryView(name, table)
      return table;
    }
}
```

```
%flink(parallelism=4)

// A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time.
val query01 = stenv
  .from("`key-values`")
  .select(
    $"et".max().as("et"),
    $"pt".max().as("pt"),
    $"value".sum().as("sum")
  ).asView("query01")
```

```
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints the query01 output.
SELECT * FROM query01
```

```
%flink(parallelism=4)

// An tumbling window view that displays the number of records observed per (event time) second.
val query02 = stenv
  .from("`key-values`")
  .window(Tumble over 1.seconds on $"et" as $"w")
  .groupBy($"w", $"key")
  .select(
    $"w".start.as("window"),
    $"key",
    $"value".sum().as("sum")
  ).asView("query02")
```

```
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)

-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
```

## 使用交互式 Python
<a name="notebook-example-interactive-python"></a>

这是[使用交互式 SQL](#how-zeppelin-examples-interactive-sql) 的 Python 翻译。有关更多 Python 示例，请参阅 Apache Flink 文档中的[表 API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/tableApi.html)。

```
%flink.pyflink
from pyflink.table.table import Table

def as_view(table, name):
  if (name in st_env.list_temporary_views()):
    st_env.drop_temporary_view(name)
  st_env.create_temporary_view(name, table)
  return table

Table.as_view = as_view
```

```
%flink.pyflink(parallelism=16)

# A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time
st_env \
  .from_path("`keyvalues`") \
  .select(", ".join([
    "max(et) as et",
    "max(pt) as pt",
    "sum(value) as sum"
  ])) \
  .as_view("query01")
```

```
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints the query01 output.
SELECT * FROM query01
```

```
%flink.pyflink(parallelism=16)

# A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time
st_env \
  .from_path("`key-values`") \
  .window(Tumble.over("1.seconds").on("et").alias("w")) \
  .group_by("w, key") \
  .select(", ".join([
    "w.start as window",
    "key",
    "sum(value) as sum"
  ])) \
  .as_view("query02")
```

```
%flink.ssql(type=update, parallelism=16, refreshInterval=1000)

-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
```

## 结合使用交互式 Python、SQL 和 Scala
<a name="notebook-example-interactive-pythonsqlscala"></a>

您可以在笔记本中使用 SQL、Python 和 Scala 的任意组合进行交互式分析。在计划部署为具有持久状态的应用程序的 Studio 笔记本中，可以组合使用 SQL 和 Scala。此示例向您展示了被忽略的部分以及那些在应用程序中部署的具有持久状态的部分。

```
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-source` (
  `key` BIGINT NOT NULL,
  `value` BIGINT NOT NULL,
  `et` TIMESTAMP(3) NOT NULL,
  `pt` AS PROCTIME(),
  WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kda-notebook-example-test-source-stream',
  'aws.region' = 'eu-west-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
```

```
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-target` (
  `key` BIGINT NOT NULL,
  `value` BIGINT NOT NULL,
  `et` TIMESTAMP(3) NOT NULL,
  `pt` AS PROCTIME(),
  WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kda-notebook-example-test-target-stream',
  'aws.region' = 'eu-west-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
```

```
%flink()

// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
  def asView(name: String): Table = {
    if (stenv.listTemporaryViews.contains(name)) {
      stenv.dropTemporaryView(name)
    }
    stenv.createTemporaryView(name, table)
    return table;
  }
}
```

```
%flink(parallelism=1)
val table = stenv
  .from("`default_catalog`.`default_database`.`my-test-source`")
  .select($"key", $"value", $"et")
  .filter($"key" > 10)
  .asView("query01")
```

```
%flink.ssql(parallelism=1)

-- forward data
INSERT INTO `default_catalog`.`default_database`.`my-test-target`
SELECT * FROM `query01`
```

```
%flink.ssql(type=update, parallelism=1, refreshInterval=1000)

-- forward data to local stream (ignored when deployed as application)
SELECT * FROM `query01`
```

```
%flink

// tell me the meaning of life (ignored when deployed as application!)
print("42!")
```

## 使用跨账户 Kinesis 数据流
<a name="notebook-example-crossaccount-kds"></a>

要使用除拥有 Studio 笔记本的账户之外的账户中的 Kinesis 数据流，请在运行 Studio 笔记本的账户中创建服务执行角色，在拥有数据流的账户中创建角色信任策略。在创建表 DDL 语句的 Kinesis 连接器中使用`aws.credentials.provider`、`aws.credentials.role.arn`和`aws.credentials.role.sessionName`，根据数据流创建表。

为 Studio 笔记本帐户使用以下服务执行角色。

```
{
 "Sid": "AllowNotebookToAssumeRole",
 "Effect": "Allow",
 "Action": "sts:AssumeRole"
 "Resource": "*"
}
```

对数据流帐户使用`AmazonKinesisFullAccess`策略和以下角色信任策略。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::123456789012:root"
            },
            "Action": "sts:AssumeRole",
            "Condition": {}
        }
    ]
}
```

------

使用以下段落作为创建表语句。

```
%flink.ssql
CREATE TABLE test1 (
name VARCHAR,
age BIGINT
) WITH (
'connector' = 'kinesis',
'stream' = 'stream-assume-role-test',
'aws.region' = 'us-east-1',
'aws.credentials.provider' = 'ASSUME_ROLE',
'aws.credentials.role.arn' = 'arn:aws:iam::<accountID>:role/stream-assume-role-test-role',
'aws.credentials.role.sessionName' = 'stream-assume-role-test-session',
'scan.stream.initpos' = 'TRIM_HORIZON',
'format' = 'json'
)
```

# Managed Service for Apache Flink 的 Studio 笔记本
<a name="how-zeppelin-troubleshooting"></a>

本节包含 Studio 笔记本的故障排除信息。

## 停止卡住的应用程序
<a name="how-zeppelin-troubleshooting-stopping"></a>

要停止处于瞬态状态的应用程序，请在`Force`参数设置为的情况下调用[StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html)操作`true`。有关更多信息，请参阅《Managed Service for Apache Flink 开发人员指南》[https://docs.aws.amazon.com/managed-flink/latest/java/](https://docs.aws.amazon.com/managed-flink/latest/java/)中的[运行应用程序](https://docs.aws.amazon.com/managed-flink/latest/java/how-running-apps.html)。

## 在无法访问互联网的 VPC 中作为具有持久状态的应用程序进行部署
<a name="how-zeppelin-troubleshooting-deploying-no-internet"></a>

适用于 Apache 的托管服务 Flink Studio deploy-as-application 功能不支持无法访问互联网的 VPC 应用程序。我们建议您在 Studio 中构建应用程序，然后使用 Managed Service for Apache Flink 手动创建 Flink 应用程序并选择您在笔记本中构建的 zip 文件。

以下步骤概述了此方法：

1. 构建您的 Studio 应用程序并将其导出到 Amazon S3。这应该是一个 zip 文件。

1. 手动创建 Managed Service for Apache Flink Studio 应用程序，代码路径引用了 Amazon S3 中的 zip 文件位置。此外，还需要使用以下`env`变量配置应用程序（总共2 `groupID`、3`var`）：

1. kinesis.analytics.flink.run.options

   1. python: source/note.py

   1. jarfile：libPythonApplicationDependencies/.jar

1. managed.deploy\$1as\$1app.options

   1. DatabaSearn：*<glue database ARN (Amazon Resource Name)>*

1. 您可能需要向Managed Service for Apache Flink Studio 和 Managed Service for Apache Flink IAM 角色授予应用程序使用的服务的权限。您可以针对这两个应用程序使用相同的 IAM 角色。

## Deploy-as-app 缩短大小和构建时间
<a name="how-zeppelin-troubleshooting-deploying-as-app-reduce-build-time"></a>

Studio deploy-as-app for Python 应用程序打包了 Python 环境中可用的所有内容，因为我们无法确定你需要哪些库。这可能会导致尺寸大于必要的大小。 deploy-as-app以下过程演示如何通过卸载依赖项来减小 deploy-as-app Python 应用程序的大小。

如果您要使用 Studio 中的 deploy-as-app功能构建 Python 应用程序，如果您的应用程序不依赖于，则可以考虑从系统中删除预安装的 Python 包。这不仅有助于减少最终工件的大小以避免突破应用程序大小的服务限制，还可以缩短使用该 deploy-as-app功能的应用程序的构建时间。

可以执行以下命令列出所有已安装的 Python 软件包及其各自的安装大小，并有选择地移除较大的软件包。

```
%flink.pyflink

!pip list --format freeze | awk -F = {'print $1'} | xargs pip show | grep -E 'Location:|Name:' | cut -d ' ' -f 2 | paste -d ' ' - - | awk '{gsub("-","_",$1); print $2 "/" tolower($1)}' | xargs du -sh 2> /dev/null | sort -hr
```

**注意**  
`apache-beam` 是 Flink Python 运作的必要条件。切勿移除此软件包及其依赖项。

以下是 Studio V2 中预安装的 Python 软件包列表，可以考虑移除这些软件包：

```
scipy
statsmodels
plotnine
seaborn
llvmlite
bokeh
pandas
matplotlib
botocore
boto3
numba
```

**要从 Zeppelin 笔记本中移除 Python 软件包，请执行以下操作：**

1. 在移除之前，请检查您的应用程序是否依赖于该软件包或其使用的任何软件包。可以使用 [pipdeptree](https://pypi.org/project/pipdeptree/) 识别软件包的依赖项。

1. 执行以下命令移除软件包：

   ```
   %flink.pyflink
   !pip uninstall -y <package-to-remove>
   ```

1. 如果您需要检索错误移除的软件包，请执行以下命令：

   ```
   %flink.pyflink
   !pip install <package-to-install>
   ```

**Example 示例：在部署带有 deploy-as-app功能的 Python 应用程序之前移除`scipy`软件包。**  

1. 使用 `pipdeptree` 发现所有 `scipy` 使用者并验证是否可以安全地移除 `scipy`。
   + 通过笔记本安装该工具：

     ```
     %flink.pyflink             
     !pip install pipdeptree
     ```
   + 通过运行以下命令获取 `scipy` 的反向依赖项树：

     ```
     %flink.pyflink
     !pip -r -p scipy
     ```

     您应该可以看到类似于如下的输出内容（压缩以提供简洁性）：

     ```
     ...
     ------------------------------------------------------------------------ 
     scipy==1.8.0 
     ├── plotnine==0.5.1 [requires: scipy>=1.0.0] 
     ├── seaborn==0.9.0 [requires: scipy>=0.14.0] 
     └── statsmodels==0.12.2 [requires: scipy>=1.1] 
         └── plotnine==0.5.1 [requires: statsmodels>=0.8.0]
     ```

1. 仔细检查应用程序中 `seaborn`、`statsmodels` 和 `plotnine` 的使用情况。如果您的应用程序不依赖于 `scipy`、`seaborn`、`statemodels` 或 `plotnine`，则可以移除所有这些软件包，或者仅移除应用程序不需要的软件包。

1. 通过运行以下命令移除软件包：

   ```
   !pip uninstall -y scipy plotnine seaborn statemodels
   ```

## 取消作业
<a name="how-notbook-canceling-jobs"></a>

本节向您展示如何取消无法从 Apache Zeppelin 获得的 Apache Flink 任务。如果要取消此类任务，请前往 Apache Flink 控制面板，复制任务 ID，然后在以下示例之一中使用它。

要取消单个任务：

```
%flink.pyflink
import requests

requests.patch("https://zeppelin-flink:8082/jobs/[job_id]", verify=False)
```

要取消所有正在运行的任务：

```
%flink.pyflink
import requests

r = requests.get("https://zeppelin-flink:8082/jobs", verify=False)
jobs = r.json()['jobs']

for job in jobs:
    if (job["status"] == "RUNNING"):
        print(requests.patch("https://zeppelin-flink:8082/jobs/{}".format(job["id"]), verify=False))
```

要取消所有任务：

```
%flink.pyflink
import requests

r = requests.get("https://zeppelin-flink:8082/jobs", verify=False)
jobs = r.json()['jobs']

for job in jobs:
    requests.patch("https://zeppelin-flink:8082/jobs/{}".format(job["id"]), verify=False)
```

## 重新启动 Apache Flink 解释器
<a name="how-notbook-restarting-interpreter"></a>

在 Studio 笔记本中重新启动 Apache Flink 解释器

1. 选择屏幕右上角附近的**配置**。

1. 选择**解释器**。

1. 选择 “**重新启动**”，然后选择 **“确定”**。

# 为 Managed Service for Apache Flink 的 Studio 笔记本创建自定义 IAM 策略
<a name="how-zeppelin-appendix-iam"></a>

您通常使用托管 IAM 策略来允许您的应用程序访问依赖资源。如果您需要更好地控制应用程序的权限，则可以使用自定义 IAM policy。本节包含自定义 IAM 策略的示例。

**注意**  
在以下策略示例中，将占位符文本替换为应用程序的值。

**Topics**
+ [AWS Glue](#how-zeppelin-iam-glue)
+ [CloudWatch 日志](#how-zeppelin-iam-cw)
+ [Kinesis Streams](#how-zeppelin-iam-streams)
+ [Amazon MSK 集群](#how-zeppelin-iam-msk)

## AWS Glue
<a name="how-zeppelin-iam-glue"></a>

以下示例策略授予访问 AWS Glue 数据库的权限。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "GlueTable",
            "Effect": "Allow",
            "Action": [
                "glue:GetConnection",
                "glue:GetTable",
                "glue:GetTables",
                "glue:GetDatabase",
                "glue:CreateTable",
                "glue:UpdateTable"
            ],
            "Resource": [
                "arn:aws:glue:us-east-1:123456789012:connection/*",
                "arn:aws:glue:us-east-1:123456789012:table/<database-name>/*",
                "arn:aws:glue:us-east-1:123456789012:database/<database-name>",
                "arn:aws:glue:us-east-1:123456789012:database/hive",
                "arn:aws:glue:us-east-1:123456789012:catalog"
            ]
        },
        {
            "Sid": "GlueDatabase",
            "Effect": "Allow",
            "Action": "glue:GetDatabases",
            "Resource": "*"
        }
    ]
}
```

------

## CloudWatch 日志
<a name="how-zeppelin-iam-cw"></a>

以下策略授予访问 CloudWatch 日志的权限：

```
{
      "Sid": "ListCloudwatchLogGroups",
      "Effect": "Allow",
      "Action": [
        "logs:DescribeLogGroups"
      ],
      "Resource": [
        "arn:aws:logs:<region>:<accountId>:log-group:*"
      ]
    },
    {
      "Sid": "ListCloudwatchLogStreams",
      "Effect": "Allow",
      "Action": [
        "logs:DescribeLogStreams"
      ],
      "Resource": [
        "<logGroupArn>:log-stream:*"
      ]
    },
    {
      "Sid": "PutCloudwatchLogs",
      "Effect": "Allow",
      "Action": [
        "logs:PutLogEvents"
      ],
      "Resource": [
        "<logStreamArn>"
      ]
    }
```

**注意**  
如果您使用控制台创建应用程序，则控制台会向您的应用程序角色添加访问 CloudWatch 日志所需的策略。

## Kinesis Streams
<a name="how-zeppelin-iam-streams"></a>

您的应用程序可以使用 Kinesis Stream 作为源或目标。您的应用程序需要读取权限才能从源流中读取数据，需要写入权限才能写入目标流。

以下策略授予从用作来源的 Kinesis Stream 中进行读取的权限：

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "KinesisShardDiscovery",
            "Effect": "Allow",
            "Action": "kinesis:ListShards",
            "Resource": "*"
        },
        {
            "Sid": "KinesisShardConsumption",
            "Effect": "Allow",
            "Action": [
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:DescribeStream",
                "kinesis:DescribeStreamSummary",
                "kinesis:RegisterStreamConsumer",
                "kinesis:DeregisterStreamConsumer"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/<stream-name>"
        },
        {
            "Sid": "KinesisEfoConsumer",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStreamConsumer",
                "kinesis:SubscribeToShard"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/<stream-name>/consumer/*"
        }
    ]
}
```

------

以下策略授予写入用作目标的 Kinesis Stream 的权限：

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "KinesisStreamSink",
            "Effect": "Allow",
            "Action": [
                "kinesis:PutRecord",
                "kinesis:PutRecords",
                "kinesis:DescribeStreamSummary",
                "kinesis:DescribeStream"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/<stream-name>"
        }
    ]
}
```

------

如果您的应用程序访问加密的 Kinesis 流，则必须授予访问该流的额外权限和该流的加密密钥。

以下策略授予访问加密源流的权限和直播的加密密钥：

```
{
      "Sid": "ReadEncryptedKinesisStreamSource",
      "Effect": "Allow",
      "Action": [
        "kms:Decrypt"
      ],
      "Resource": [
        "<inputStreamKeyArn>"
      ]
    }
    ,
```

以下策略授予访问加密目标流的权限和直播的加密密钥：

```
{
      "Sid": "WriteEncryptedKinesisStreamSink",
      "Effect": "Allow",
      "Action": [
        "kms:GenerateDataKey"
      ],
      "Resource": [
        "<outputStreamKeyArn>"
      ]
    }
```

## Amazon MSK 集群
<a name="how-zeppelin-iam-msk"></a>

要授予对 Amazon MSK 集群的访问权限，您需要向该集群的 VPC 授予访问权限。有关访问 Amazon VPC 的策略示例，请参阅 [VPC 应用程序权限](https://docs.aws.amazon.com/managed-flink/latest/java/vpc-permissions.html)。