

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# MSK Connect 入門
<a name="msk-connect-getting-started"></a>

這是step-by-step教學課程，使用 AWS 管理主控台 建立 MSK 叢集，以及將資料從叢集傳送至 S3 儲存貯體的接收器連接器。

**Topics**
+ [設定 MSK Connect 所需的資源](mkc-tutorial-setup.md)
+ [建立自訂外掛程式](mkc-create-plugin.md)
+ [建立用戶端機器和 Apache Kafka 主題](mkc-create-topic.md)
+ [建立連接器](mkc-create-connector.md)
+ [將資料傳送至 MSK 叢集](mkc-send-data.md)

# 設定 MSK Connect 所需的資源
<a name="mkc-tutorial-setup"></a>

在此步驟中，您需要建立以下此開始使用案例中所需的資源：
+ Amazon S3 儲存貯體，做為從連接器接收資料的目的地。
+ MSK 叢集，作為接收傳送資料的目的地。接著，連接器會從此叢集讀取資料，並將其傳送至目的地 S3 儲存貯體。
+ IAM 政策，其中包含寫入目的地 S3 儲存貯體的許可。
+ IAM 角色，允許連接器寫入目的地 S3 儲存貯體。您將新增您建立的 IAM 政策至此角色。
+ Amazon VPC 端點，讓您可從具有叢集和連接器的 Amazon VPC 將資料傳送至 Amazon S3。

**建立 S3 儲存貯體**

1. 登入 AWS 管理主控台 ，並在 [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)：// 開啟 Amazon S3 主控台。

1. 選擇**建立儲存貯體**。

1. 請為儲存貯體名稱輸入描述性名稱，例如 `amzn-s3-demo-bucket-mkc-tutorial`。

1. 向下捲動並選擇**建立儲存貯體**。

1. 在儲存貯體清單中，選擇您新建立的儲存貯體。

1. 選擇 **Create folder** (建立資料夾)。

1. 輸入 `tutorial` 作為資料夾名稱，然後向下捲動並選擇**建立資料夾**。

**建立叢集**

1. 開啟 Amazon MSK 主控台，網址為 [https://console.aws.amazon.com/msk/home?region=us-east-1\$1/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/)。

1. 在左窗格的 **MSK 叢集**下，選擇**叢集**。

1. 選擇 **Create Cluster** (建立叢集)。

1. 在**建立方法**中，選擇**自訂建立**。

1. 針對叢集名稱，請輸入 **mkc-tutorial-cluster**。

1. 在**叢集類型**中，選擇**佈建**。

1. 選擇**下一步**。

1. 在**聯網**下，請選擇 Amazon VPC。然後請選取您要使用的可用區域和子網路。請記住您選取的 Amazon VPC 和子網路的 ID，因為稍後在本教學課程中您將需要這些 ID。

1. 選擇**下一步**。

1. 在**存取控制方法**下，請確認僅選取**未身分驗證的存取**。

1. 在**加密**下，請確認僅選取**純文字**。

1. 繼續執行精靈，然後選擇**建立叢集**。這會帶您前往叢集的詳細資訊頁面。在該頁面的**已套用的安全群組**下，找到安全群組 ID。記住該 ID，因為稍後在本教學可課程中您將需要該 ID。

**建立具有寫入 S3 儲存貯體許可的 IAM 政策**

1. 前往 [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) 開啟 IAM 主控台。

1. 在導覽窗格中，選擇**政策**。

1. 選擇**建立政策**。

1. 在**政策編輯器**中，選擇 **JSON**，然後使用下列 JSON 取代編輯器視窗中的 JSON。

   在下列範例中，將 *<amzn-s3-demo-bucket-my-tutorial>* 取代為 S3 儲存貯體的名稱。

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",		 	 	 
     "Statement": [
       {
         "Sid": "AllowListBucket",
         "Effect": "Allow",
         "Action": [
           "s3:ListBucket",
           "s3:GetBucketLocation"
         ],
         "Resource": "arn:aws:s3:::<amzn-s3-demo-bucket-my-tutorial>"
       },
       {
         "Sid": "AllowObjectActions",
         "Effect": "Allow",
         "Action": [
           "s3:PutObject",
           "s3:GetObject",
           "s3:DeleteObject",
           "s3:AbortMultipartUpload",
           "s3:ListMultipartUploadParts",
           "s3:ListBucketMultipartUploads"
         ],
         "Resource": "arn:aws:s3:::<amzn-s3-demo-bucket-my-tutorial>/*"
       }
     ]
   }
   ```

------

   如需如何撰寫安全政策的說明，請參閱 [IAM 存取控制](iam-access-control.md)。

1. 選擇**下一步**。

1. 在**檢閱和建立**頁面，執行以下作業：

   1. 針對**政策名稱**，輸入描述性名稱，例如 **mkc-tutorial-policy**。

   1. 在此**政策中定義的許可**中，檢閱和/或編輯政策中定義的許可。

   1. （選用） 若要協助識別、組織或搜尋政策，請選擇**新增標籤**，將標籤新增為鍵/值對。例如，使用 **Environment**和 的鍵值對將標籤新增至您的政策**Test**。

      如需使用標籤的詳細資訊，請參閱《*IAM 使用者指南*》中的[AWS Identity and Access Management 資源的標籤](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_tags.html)。

1. 選擇**建立政策**。

**建立可寫入目的地儲存貯體的IAM 角色**

1. 在 IAM 主控台的導覽窗格中，選擇**角色**，然後選擇**建立角色**。

1. 在 **Select trusted entity** (選取信任的實體) 頁面上，執行以下作業：

   1. 對於 **Trusted entity type** (信任的實體類型)，請選擇 **AWS 服務**。

   1. 針對**服務或使用案例**，選擇 **S3**。

   1. 在**使用案例中**，選擇 **S3**。

1. 選擇**下一步**。

1. 在 **Add permissions** (新增許可) 頁面上，執行以下作業：

   1. 在**許可政策**下的搜尋方塊中，輸入您先前為此教學課程建立的政策名稱。例如，**mkc-tutorial-policy**。然後，選擇政策名稱左側的方塊。

   1. (選用) 設定[許可界限](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_boundaries.html)。這是進階功能，可用於服務角色，而不是服務連結的角色。如需有關設定許可界限的資訊，請參閱《*IAM 使用者指南*》中的[建立角色和連接政策 （主控台）](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_job-functions_create-policies.html)。

1. 選擇**下一步**。

1. 在 **Name, review, and create** (命名、檢閱和建立) 頁面上，執行以下作業：

   1. 針對**角色名稱**，輸入描述性名稱，例如 **mkc-tutorial-role**。
**重要**  
當您命名角色時，請注意下列事項：  
角色名稱在您的 中必須是唯一的 AWS 帳戶，而且無法依大小寫設為唯一。  
例如，不要同時建立名為 **PRODROLE** 和 **prodrole** 的角色。當角色名稱用於政策或 ARN 的一部分時，角色名稱會區分大小寫，但是當角色名稱在主控台中顯示給客戶時，例如在登入過程中，角色名稱不會區分大小寫。
因為其他實體可能會參考角色，所以在建立角色之後，就無法編輯其名稱。

   1. (選用) 在**說明**中，輸入角色的說明。

   1. （選用） 若要編輯角色的使用案例和許可，請在**步驟 1：選取信任的實體**或**步驟 2：新增許可**區段中，選擇**編輯**。

   1. （選用） 若要協助識別、組織或搜尋角色，請選擇**新增標籤**，將標籤新增為鍵/值對。例如，使用 **ProductManager**和 的鍵值對將標籤新增至您的角色**John**。

      如需使用標籤的詳細資訊，請參閱《*IAM 使用者指南*》中的[AWS Identity and Access Management 資源的標籤](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_tags.html)。

1. 檢閱角色，然後選擇 **Create role** (建立角色)。

**允許 MSK Connect 擔任該角色**

1. 在 IAM 主控台中，於左窗格的**存取管理**下，選擇**角色**。

1. 找到 `mkc-tutorial-role` 並選擇。

1. 在該角色的**摘要**下，選擇**信任關係**索引標籤。

1. 選擇**編輯信任關係**。

1. 使用以下 JSON 來取代現有信任政策。

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",		 	 	 
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "Service": "kafkaconnect.amazonaws.com"
         },
         "Action": "sts:AssumeRole"
       }
     ]
   }
   ```

------

1. 選擇 **Update Trust Policy** (更新信任政策)。

**建立從叢集的 VPC 到 Amazon S3 的 Amazon VPC 端點**

1. 在 [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/) 開啟 Amazon VPC 主控台。

1. 在左側窗格中選擇**端點**。

1. 選擇**建立端點**。

1. 在**服務名稱**下，選擇 **com.amazonaws.us-east-1.s3** 服務和**閘道**類型。

1. 選擇叢集的 VPC，然後選取與叢集子網路相關聯之路由表左側的方塊。

1. 選擇**建立端點**。

**後續步驟**

[建立自訂外掛程式](mkc-create-plugin.md)

# 建立自訂外掛程式
<a name="mkc-create-plugin"></a>

外掛程式包含定義連接器邏輯的程式碼。在此步驟中，您會建立具有 Lenses Amazon S3 目的地連接器程式碼的自訂外掛程式。在稍後的步驟中，當您建立 MSK 連接器時，您需說明其程式碼位於此自訂外掛程式中。您可以使用相同外掛程式來建立具有不同組態的多個 MSK 連接器。

**建立自訂外掛程式**

1. 下載 [S3 連接器](https://www.confluent.io/hub/confluentinc/kafka-connect-s3)。

1. 上傳 ZIP 檔案至您有權存取的 S3 儲存貯體。如需有關如何將檔案上傳到 Amazon S3 的詳細資訊，請參閱《Amazon S3 使用者指南》中的[上傳物件](https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html)。

1. 開啟位於 [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/) 的 Amazon MSK 主控台。

1. 在左窗格中展開 **MSK Connect**，然後選擇**自訂外掛程式**。

1. 選擇**建立自訂外掛程式**。

1. 選擇 **Browse S3** (瀏覽 S3)。

1. 在儲存貯體清單中，找到您上傳 ZIP 檔案的儲存貯體，然後選擇該儲存貯體。

1. 在儲存貯體內的物件清單中，選取 ZIP 檔案左側的選項按鈕，然後選擇標示為**選擇**的按鈕。

1. 輸入 `mkc-tutorial-plugin` 作為自訂外掛程式名稱，然後選擇**建立自訂外掛程式**。

建立自訂外掛程式可能需要 AWS 幾分鐘的時間。在建立程序完成後，您會在瀏覽器視窗頂端的橫幅中看到以下訊息。

```
Custom plugin mkc-tutorial-plugin was successfully created
The custom plugin was created. You can now create a connector using this custom plugin.
```

**後續步驟**

[建立用戶端機器和 Apache Kafka 主題](mkc-create-topic.md)

# 建立用戶端機器和 Apache Kafka 主題
<a name="mkc-create-topic"></a>

在此步驟中，您會建立 Amazon EC2 執行個體，以用作 Apache Kafka 用戶端執行個體。然後，您可使用此執行個體在叢集上建立主題。

**建立用戶端機器**

1. 前往 [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/) 開啟 Amazon EC2 主控台。

1. 選擇**啟動執行個體**。

1. 輸入用戶端機器的**名稱**，例如 **mkc-tutorial-client**。

1. 確認已選擇 **Amazon Linux 2 AMI (HVM) – Kernel 5.10，SSD 磁碟區類型**作為 **Amazon Machine Image (AMI) 類型**。

1. 選擇 **t2.xlarge** 執行個體類型。

1. 在**金鑰對 (登入)** 下，選擇**建立新金鑰對**。輸入 **mkc-tutorial-key-pair** 作為**金鑰對名稱**，然後選擇**下載金鑰對**。或者，您也可以使用現有的金鑰對。

1. 選擇**啟動執行個體**。

1. 選擇**檢視執行個體**。然後，在**安全群組**資料欄中，選擇與新執行個體相關聯的安全群組。複製並儲存安全群組的 ID，以供日後使用。

**允許新建立的用戶端將資料傳送至叢集**

1. 在 [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/) 開啟 Amazon VPC 主控台。

1. 在左窗格的**安全**下，選擇**安全群組**。在**安全群組 ID** 資料欄中，尋找叢集的安全群組。在 [設定 MSK Connect 所需的資源](mkc-tutorial-setup.md) 中建立叢集後，便已儲存此安全群組的 ID。請選取安全群組該列左側的方塊，以選擇此安全群組。請確認並未同時選取其他安全群組。

1. 在畫面下半部中，選擇**傳入規則**索引標籤。

1. 選擇 **Edit inbound Rules** (編輯傳入規則)。

1. 在畫面左下方，選擇**新增規則**。

1. 在新規則中，於**類型**資料欄中選擇**所有流量**。在**來源**資料欄的右側欄位中，輸入用戶端機器安全群組的 ID。這是您在建立用戶端機器後儲存的安全群組 ID。

1. 選擇**儲存規則**。MSK 叢集現在可接受您在上一個程序中建立之用戶端的所有流量。

**若要建立主題**

1. 前往 [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/) 開啟 Amazon EC2 主控台。

1. 在執行個體表格中選擇 `mkc-tutorial-client`。

1. 在畫面頂端附近選擇**連線**，然後依照指示連線至執行個體。

1. 執行以下命令，在用戶端執行個體上安裝 Java：

   ```
   sudo yum install java-1.8.0
   ```

1. 執行下列命令下載 Apache Kafka。

   ```
   wget https://archive.apache.org/dist/kafka/2.2.1/kafka_2.12-2.2.1.tgz
   ```
**注意**  
如果您想要使用此命令中以外的鏡像網站，您可以在 [Apache](https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.1/kafka_2.11-1.1.1.tgz) 網站上選擇不同的鏡像網站。

1. 在您在先前步驟中下載 TAR 檔案的目錄中執行下列命令。

   ```
   tar -xzf kafka_2.12-2.2.1.tgz
   ```

1. 前往 **kafka\$12.12-2.2.1** 目錄。

1. 開啟 Amazon MSK 主控台，網址為 [https://console.aws.amazon.com/msk/home?region=us-east-1\$1/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/)。

1. 在左窗格中選擇**叢集**，然後選擇名稱 `mkc-tutorial-cluster`。

1. 選擇**檢視用戶端資訊**。

1. 複製**純文字**連線字串。

1. 選擇**完成**。

1. 在用戶端執行個體 (`mkc-tutorial-client`) 上執行以下命令，使用您檢視叢集用戶端資訊時所儲存的值來取代 *bootstrapServerString*。

   ```
   <path-to-your-kafka-installation>/bin/kafka-topics.sh --create --bootstrap-server bootstrapServerString --replication-factor 2 --partitions 1 --topic mkc-tutorial-topic
   ```

   如果命令成功，您會看到以下訊息：`Created topic mkc-tutorial-topic.`

**後續步驟**

[建立連接器](mkc-create-connector.md)

# 建立連接器
<a name="mkc-create-connector"></a>

此程序說明如何使用 建立連接器 AWS 管理主控台。

**建立連接器**

1. 登入 AWS 管理主控台，並在 [https://console.aws.amazon.com/msk/home?region=us-east-1\$1/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/)：// 開啟 Amazon MSK 主控台。

1. 在左窗格中展開 **MSK Connect**，然後選擇**連接器**。

1. 選擇 **Create connector (建立連接器)**。

1. 在外掛程式清單中，選擇 `mkc-tutorial-plugin`，然後選擇**下一步**。

1. 針對連接器名稱，請輸入 `mkc-tutorial-connector`。

1. 在叢集清單中，選擇 `mkc-tutorial-cluster`。

1. 在**連接器網路設定**區段中，針對網路類型選擇下列其中一項：
   + **IPv4** （預設） - 僅適用於透過 IPv4 連線至目的地
   + **雙堆疊** - 用於透過 IPv4 和 IPv6 連線至目的地 （僅當子網路具有與其相關聯的 IPv4 和 IPv6 CIDR 區塊時可用）

1. 複製以下組態並貼入連接器組態欄位。

   請務必將區域取代為您建立連接器之 AWS 區域 的程式碼。此外，在下列範例中，將 Amazon S3 儲存貯體名稱 *<amzn-s3-demo-bucket-my-tutorial>* 取代為您的儲存貯體名稱。

   ```
   connector.class=io.confluent.connect.s3.S3SinkConnector
   s3.region=us-east-1
   format.class=io.confluent.connect.s3.format.json.JsonFormat
   flush.size=1
   schema.compatibility=NONE
   tasks.max=2
   topics=mkc-tutorial-topic
   partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
   storage.class=io.confluent.connect.s3.storage.S3Storage
   s3.bucket.name=<amzn-s3-demo-bucket-my-tutorial>
   topics.dir=tutorial
   ```

1. 在**存取許可**下選擇 `mkc-tutorial-role`。

1. 選擇**下一步**。在**安全**頁面上，再次選擇**下一步**。

1. 在**日誌**頁面上，選擇**下一步**。

1. 在**檢閱和建立**頁面上，檢閱您的連接器組態，然後選擇**建立連接器**。

**後續步驟**

[將資料傳送至 MSK 叢集](mkc-send-data.md)

# 將資料傳送至 MSK 叢集
<a name="mkc-send-data"></a>

在此步驟中，您會將資料傳送至先前建立的 Apache Kafka 主題，然後在目的地 S3 儲存貯體中尋找同一項資料。

**傳送資料至 MSK 叢集**

1. 在用戶端執行個體上安裝 Apache Kafka 的 `bin` 資料夾中，建立一個名為 `client.properties` 且具有以下內容的文字檔案。

   ```
   security.protocol=SASL_SSL
   sasl.mechanism=AWS_MSK_IAM
   ```

1. 執行下列命令以建立主控台生產者。使用您在執行先前命令時所取得的值來取代 *BootstrapBrokerString*。

   ```
   <path-to-your-kafka-installation>/bin/kafka-console-producer.sh --broker-list BootstrapBrokerString --producer.config client.properties --topic mkc-tutorial-topic
   ```

1. 輸入您想要的任何訊息，然後按 **Enter** 鍵。重複此步驟兩次或三次。每次輸入一行，然後按 **Enter**，該行會作為單獨訊息傳送到您的 Apache Kafka 叢集中。

1. 在目的地 Amazon S3 儲存貯體中查看，尋找您在上一個步驟中傳送的訊息。