

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 了解 MSK Connect
<a name="msk-connect"></a>

MSK Connect 是 Amazon MSK 的一項功能，可讓開發人員輕鬆將資料串流至或串流出 Apache Kafka 叢集。MSK Connect 使用 Kafka Connect 2.7.1 或 3.7.x 版，這是將 Apache Kafka 叢集與資料庫、搜尋索引和檔案系統等外部系統連線的開放原始碼架構。您可以利用 MSK Connect 來部署專為 Kafka Connect 建立的全受管連接器，將資料移入 Amazon S3 和 Amazon OpenSearch Service 等熱門資料存放區，或從這類熱門資料存放區中提取資料。您可以部署由第三方 (例如 Debezium) 開發的連接器，用於將變更日誌從資料庫串流至 Apache Kafka 叢集，或在不需變更程式碼的情況下部署現有連接器。連接器會自動擴展以適應負載的變化，您僅需按照實際使用的資源量付費。

使用來源連接器將資料從外部系統匯入至您的主題中。您可以使用目的地連接器將主題中的資料匯出至外部系統。

MSK Connect 會利用與 Amazon VPC 的連線來支援 Apache Kafka 叢集的連接器，無論叢集是 MSK 叢集還是獨立託管的 Apache Kafka 叢集皆可。

MSK Connect 會持續監控連接器運作狀態和傳送狀態、修補程式和管理基礎硬體，並自動擴展連接器以符合輸送量的變更。

若要開始使用 MSK Connect，請參閱 [MSK Connect 入門](msk-connect-getting-started.md)。

若要了解您可以使用 MSK Connect 建立 AWS 的資源，請參閱 [了解連接器](msk-connect-connectors.md)、 [建立自訂外掛程式](msk-connect-plugins.md)和 [了解 MSK Connect 工作者](msk-connect-workers.md)。

如需有關 MSK Connect API 的相關資訊，請參閱 [Amazon MSK Connect API Reference](https://docs.aws.amazon.com/MSKC/latest/mskc/Welcome.html)。

## 使用 Amazon MSK Connect 的優勢
<a name="msk-connect-benefits"></a>

Apache Kafka 是最廣為採用的開放原始碼串流平台之一，用於擷取和處理即時資料串流。使用 Apache Kafka，您可以分離和獨立擴展產生資料和耗用資料的應用程式。

Kafka Connect 是使用 Apache Kafka 建置和執行串流應用程式的重要元件。Kafka Connect 提供標準化的方式來在 Kafka 和外部系統之間移動資料。Kafka Connect 具有高度可擴展性，可以處理大量資料 Kafka Connect 提供一組強大的 API 操作和工具，用於設定、部署和監控連接器，以在 Kafka 主題和外部系統之間移動資料。您可以使用這些工具來自訂和擴展 Kafka Connect 的功能，以滿足串流應用程式的特定需求。

當您自行操作 Apache Kafka Connect 叢集，或嘗試將開放原始碼 Apache Kafka Connect 應用程式遷移到其中時，您可能會遇到挑戰 AWS。這些挑戰包括設定基礎設施和部署應用程式所需的時間、設定自我管理 Apache Kafka Connect 叢集時的工程障礙，以及管理營運開銷。

為了解決這些挑戰，我們建議您使用 Amazon Managed Streaming for Apache Kafka Connect (Amazon MSK Connect) 將開放原始碼 Apache Kafka Connect 應用程式遷移到其中 AWS。Amazon MSK Connect 使用 Kafka Connect 在 Apache Kafka 叢集和外部系統之間串流資料，例如資料庫、搜尋索引和檔案系統，以簡化程序。

以下是遷移至 Amazon MSK Connect 的一些優點：
+ **消除營運開銷** — Amazon MSK Connect 消除與修補、佈建和擴展 Apache Kafka Connect 叢集相關的營運負擔。Amazon MSK Connect 會持續監控 Connect 叢集的運作狀態，並自動化修補和版本升級，而不會造成工作負載中斷。
+ **自動重新啟動 Connect 任務** — Amazon MSK Connect 可以自動復原失敗的任務，以減少生產中斷。任務失敗可能是由暫時性錯誤造成，例如違反 Kafka 的 TCP 連線限制，以及當新工作者加入接收器連接器的取用者群組時的任務重新平衡。
+ **自動水平和垂直擴展** — Amazon MSK Connect 可讓連接器應用程式自動擴展以支援更高的輸送量。Amazon MSK Connect 會為您管理擴展。您只需要指定自動擴展群組中的工作者數量和使用率閾值。您可以使用 Amazon MSK Connect `UpdateConnector` API 操作，在 1 到 8 個 vCPUs 之間垂直擴展或縮減 vCPUs，以支援可變輸送量。
+ **私有網路連線** — Amazon MSK Connect 會使用 AWS PrivateLink 和私有 DNS 名稱私下連線至來源和接收器系統。

# MSK Connect 入門
<a name="msk-connect-getting-started"></a>

這是step-by-step教學課程，使用 AWS 管理主控台 建立 MSK 叢集，以及將資料從叢集傳送至 S3 儲存貯體的接收器連接器。

**Topics**
+ [設定 MSK Connect 所需的資源](mkc-tutorial-setup.md)
+ [建立自訂外掛程式](mkc-create-plugin.md)
+ [建立用戶端機器和 Apache Kafka 主題](mkc-create-topic.md)
+ [建立連接器](mkc-create-connector.md)
+ [將資料傳送至 MSK 叢集](mkc-send-data.md)

# 設定 MSK Connect 所需的資源
<a name="mkc-tutorial-setup"></a>

在此步驟中，您需要建立以下此開始使用案例中所需的資源：
+ Amazon S3 儲存貯體，做為從連接器接收資料的目的地。
+ MSK 叢集，作為接收傳送資料的目的地。接著，連接器會從此叢集讀取資料，並將其傳送至目的地 S3 儲存貯體。
+ IAM 政策，其中包含寫入目的地 S3 儲存貯體的許可。
+ IAM 角色，允許連接器寫入目的地 S3 儲存貯體。您將新增您建立的 IAM 政策至此角色。
+ Amazon VPC 端點，讓您可從具有叢集和連接器的 Amazon VPC 將資料傳送至 Amazon S3。

**建立 S3 儲存貯體**

1. 登入 AWS 管理主控台 ，並在 [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)：// 開啟 Amazon S3 主控台。

1. 選擇**建立儲存貯體**。

1. 請為儲存貯體名稱輸入描述性名稱，例如 `amzn-s3-demo-bucket-mkc-tutorial`。

1. 向下捲動並選擇**建立儲存貯體**。

1. 在儲存貯體清單中，選擇您新建立的儲存貯體。

1. 選擇 **Create folder** (建立資料夾)。

1. 輸入 `tutorial` 作為資料夾名稱，然後向下捲動並選擇**建立資料夾**。

**建立叢集**

1. 開啟 Amazon MSK 主控台，網址為 [https://console.aws.amazon.com/msk/home?region=us-east-1\$1/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/)。

1. 在左窗格的 **MSK 叢集**下，選擇**叢集**。

1. 選擇 **Create Cluster** (建立叢集)。

1. 在**建立方法**中，選擇**自訂建立**。

1. 針對叢集名稱，請輸入 **mkc-tutorial-cluster**。

1. 在**叢集類型**中，選擇**佈建**。

1. 選擇**下一步**。

1. 在**聯網**下，請選擇 Amazon VPC。然後請選取您要使用的可用區域和子網路。請記住您選取的 Amazon VPC 和子網路的 ID，因為稍後在本教學課程中您將需要這些 ID。

1. 選擇**下一步**。

1. 在**存取控制方法**下，請確認僅選取**未身分驗證的存取**。

1. 在**加密**下，請確認僅選取**純文字**。

1. 繼續執行精靈，然後選擇**建立叢集**。這會帶您前往叢集的詳細資訊頁面。在該頁面的**已套用的安全群組**下，找到安全群組 ID。記住該 ID，因為稍後在本教學可課程中您將需要該 ID。

**建立具有寫入 S3 儲存貯體許可的 IAM 政策**

1. 前往 [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) 開啟 IAM 主控台。

1. 在導覽窗格中，選擇**政策**。

1. 選擇**建立政策**。

1. 在**政策編輯器**中，選擇 **JSON**，然後使用下列 JSON 取代編輯器視窗中的 JSON。

   在下列範例中，將 *<amzn-s3-demo-bucket-my-tutorial>* 取代為 S3 儲存貯體的名稱。

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",		 	 	 
     "Statement": [
       {
         "Sid": "AllowListBucket",
         "Effect": "Allow",
         "Action": [
           "s3:ListBucket",
           "s3:GetBucketLocation"
         ],
         "Resource": "arn:aws:s3:::<amzn-s3-demo-bucket-my-tutorial>"
       },
       {
         "Sid": "AllowObjectActions",
         "Effect": "Allow",
         "Action": [
           "s3:PutObject",
           "s3:GetObject",
           "s3:DeleteObject",
           "s3:AbortMultipartUpload",
           "s3:ListMultipartUploadParts",
           "s3:ListBucketMultipartUploads"
         ],
         "Resource": "arn:aws:s3:::<amzn-s3-demo-bucket-my-tutorial>/*"
       }
     ]
   }
   ```

------

   如需如何撰寫安全政策的說明，請參閱 [IAM 存取控制](iam-access-control.md)。

1. 選擇**下一步**。

1. 在**檢閱和建立**頁面，執行以下作業：

   1. 針對**政策名稱**，輸入描述性名稱，例如 **mkc-tutorial-policy**。

   1. 在此**政策中定義的許可**中，檢閱和/或編輯政策中定義的許可。

   1. （選用） 若要協助識別、組織或搜尋政策，請選擇**新增標籤**，將標籤新增為鍵/值對。例如，使用 **Environment**和 的鍵值對將標籤新增至您的政策**Test**。

      如需使用標籤的詳細資訊，請參閱《*IAM 使用者指南*》中的[AWS Identity and Access Management 資源的標籤](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_tags.html)。

1. 選擇**建立政策**。

**建立可寫入目的地儲存貯體的IAM 角色**

1. 在 IAM 主控台的導覽窗格中，選擇**角色**，然後選擇**建立角色**。

1. 在 **Select trusted entity** (選取信任的實體) 頁面上，執行以下作業：

   1. 對於 **Trusted entity type** (信任的實體類型)，請選擇 **AWS 服務**。

   1. 針對**服務或使用案例**，選擇 **S3**。

   1. 在**使用案例中**，選擇 **S3**。

1. 選擇**下一步**。

1. 在 **Add permissions** (新增許可) 頁面上，執行以下作業：

   1. 在**許可政策**下的搜尋方塊中，輸入您先前為此教學課程建立的政策名稱。例如，**mkc-tutorial-policy**。然後，選擇政策名稱左側的方塊。

   1. (選用) 設定[許可界限](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_boundaries.html)。這是進階功能，可用於服務角色，而不是服務連結的角色。如需有關設定許可界限的資訊，請參閱《*IAM 使用者指南*》中的[建立角色和連接政策 （主控台）](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_job-functions_create-policies.html)。

1. 選擇**下一步**。

1. 在 **Name, review, and create** (命名、檢閱和建立) 頁面上，執行以下作業：

   1. 針對**角色名稱**，輸入描述性名稱，例如 **mkc-tutorial-role**。
**重要**  
當您命名角色時，請注意下列事項：  
角色名稱在您的 中必須是唯一的 AWS 帳戶，而且無法依大小寫設為唯一。  
例如，不要同時建立名為 **PRODROLE** 和 **prodrole** 的角色。當角色名稱用於政策或 ARN 的一部分時，角色名稱會區分大小寫，但是當角色名稱在主控台中顯示給客戶時，例如在登入過程中，角色名稱不會區分大小寫。
因為其他實體可能會參考角色，所以在建立角色之後，就無法編輯其名稱。

   1. (選用) 在**說明**中，輸入角色的說明。

   1. （選用） 若要編輯角色的使用案例和許可，請在**步驟 1：選取信任的實體**或**步驟 2：新增許可**區段中，選擇**編輯**。

   1. （選用） 若要協助識別、組織或搜尋角色，請選擇**新增標籤**，將標籤新增為鍵/值對。例如，使用 **ProductManager**和 的鍵值對將標籤新增至您的角色**John**。

      如需使用標籤的詳細資訊，請參閱《*IAM 使用者指南*》中的[AWS Identity and Access Management 資源的標籤](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_tags.html)。

1. 檢閱角色，然後選擇 **Create role** (建立角色)。

**允許 MSK Connect 擔任該角色**

1. 在 IAM 主控台中，於左窗格的**存取管理**下，選擇**角色**。

1. 找到 `mkc-tutorial-role` 並選擇。

1. 在該角色的**摘要**下，選擇**信任關係**索引標籤。

1. 選擇**編輯信任關係**。

1. 使用以下 JSON 來取代現有信任政策。

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",		 	 	 
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "Service": "kafkaconnect.amazonaws.com"
         },
         "Action": "sts:AssumeRole"
       }
     ]
   }
   ```

------

1. 選擇 **Update Trust Policy** (更新信任政策)。

**建立從叢集的 VPC 到 Amazon S3 的 Amazon VPC 端點**

1. 在 [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/) 開啟 Amazon VPC 主控台。

1. 在左側窗格中選擇**端點**。

1. 選擇**建立端點**。

1. 在**服務名稱**下，選擇 **com.amazonaws.us-east-1.s3** 服務和**閘道**類型。

1. 選擇叢集的 VPC，然後選取與叢集子網路相關聯之路由表左側的方塊。

1. 選擇**建立端點**。

**後續步驟**

[建立自訂外掛程式](mkc-create-plugin.md)

# 建立自訂外掛程式
<a name="mkc-create-plugin"></a>

外掛程式包含定義連接器邏輯的程式碼。在此步驟中，您會建立具有 Lenses Amazon S3 目的地連接器程式碼的自訂外掛程式。在稍後的步驟中，當您建立 MSK 連接器時，您需說明其程式碼位於此自訂外掛程式中。您可以使用相同外掛程式來建立具有不同組態的多個 MSK 連接器。

**建立自訂外掛程式**

1. 下載 [S3 連接器](https://www.confluent.io/hub/confluentinc/kafka-connect-s3)。

1. 上傳 ZIP 檔案至您有權存取的 S3 儲存貯體。如需有關如何將檔案上傳到 Amazon S3 的詳細資訊，請參閱《Amazon S3 使用者指南》中的[上傳物件](https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html)。

1. 開啟位於 [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/) 的 Amazon MSK 主控台。

1. 在左窗格中展開 **MSK Connect**，然後選擇**自訂外掛程式**。

1. 選擇**建立自訂外掛程式**。

1. 選擇 **Browse S3** (瀏覽 S3)。

1. 在儲存貯體清單中，找到您上傳 ZIP 檔案的儲存貯體，然後選擇該儲存貯體。

1. 在儲存貯體內的物件清單中，選取 ZIP 檔案左側的選項按鈕，然後選擇標示為**選擇**的按鈕。

1. 輸入 `mkc-tutorial-plugin` 作為自訂外掛程式名稱，然後選擇**建立自訂外掛程式**。

建立自訂外掛程式可能需要 AWS 幾分鐘的時間。在建立程序完成後，您會在瀏覽器視窗頂端的橫幅中看到以下訊息。

```
Custom plugin mkc-tutorial-plugin was successfully created
The custom plugin was created. You can now create a connector using this custom plugin.
```

**後續步驟**

[建立用戶端機器和 Apache Kafka 主題](mkc-create-topic.md)

# 建立用戶端機器和 Apache Kafka 主題
<a name="mkc-create-topic"></a>

在此步驟中，您會建立 Amazon EC2 執行個體，以用作 Apache Kafka 用戶端執行個體。然後，您可使用此執行個體在叢集上建立主題。

**建立用戶端機器**

1. 前往 [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/) 開啟 Amazon EC2 主控台。

1. 選擇**啟動執行個體**。

1. 輸入用戶端機器的**名稱**，例如 **mkc-tutorial-client**。

1. 確認已選擇 **Amazon Linux 2 AMI (HVM) – Kernel 5.10，SSD 磁碟區類型**作為 **Amazon Machine Image (AMI) 類型**。

1. 選擇 **t2.xlarge** 執行個體類型。

1. 在**金鑰對 (登入)** 下，選擇**建立新金鑰對**。輸入 **mkc-tutorial-key-pair** 作為**金鑰對名稱**，然後選擇**下載金鑰對**。或者，您也可以使用現有的金鑰對。

1. 選擇**啟動執行個體**。

1. 選擇**檢視執行個體**。然後，在**安全群組**資料欄中，選擇與新執行個體相關聯的安全群組。複製並儲存安全群組的 ID，以供日後使用。

**允許新建立的用戶端將資料傳送至叢集**

1. 在 [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/) 開啟 Amazon VPC 主控台。

1. 在左窗格的**安全**下，選擇**安全群組**。在**安全群組 ID** 資料欄中，尋找叢集的安全群組。在 [設定 MSK Connect 所需的資源](mkc-tutorial-setup.md) 中建立叢集後，便已儲存此安全群組的 ID。請選取安全群組該列左側的方塊，以選擇此安全群組。請確認並未同時選取其他安全群組。

1. 在畫面下半部中，選擇**傳入規則**索引標籤。

1. 選擇 **Edit inbound Rules** (編輯傳入規則)。

1. 在畫面左下方，選擇**新增規則**。

1. 在新規則中，於**類型**資料欄中選擇**所有流量**。在**來源**資料欄的右側欄位中，輸入用戶端機器安全群組的 ID。這是您在建立用戶端機器後儲存的安全群組 ID。

1. 選擇**儲存規則**。MSK 叢集現在可接受您在上一個程序中建立之用戶端的所有流量。

**若要建立主題**

1. 前往 [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/) 開啟 Amazon EC2 主控台。

1. 在執行個體表格中選擇 `mkc-tutorial-client`。

1. 在畫面頂端附近選擇**連線**，然後依照指示連線至執行個體。

1. 執行以下命令，在用戶端執行個體上安裝 Java：

   ```
   sudo yum install java-1.8.0
   ```

1. 執行下列命令下載 Apache Kafka。

   ```
   wget https://archive.apache.org/dist/kafka/2.2.1/kafka_2.12-2.2.1.tgz
   ```
**注意**  
如果您想要使用此命令中以外的鏡像網站，您可以在 [Apache](https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.1/kafka_2.11-1.1.1.tgz) 網站上選擇不同的鏡像網站。

1. 在您在先前步驟中下載 TAR 檔案的目錄中執行下列命令。

   ```
   tar -xzf kafka_2.12-2.2.1.tgz
   ```

1. 前往 **kafka\$12.12-2.2.1** 目錄。

1. 開啟 Amazon MSK 主控台，網址為 [https://console.aws.amazon.com/msk/home?region=us-east-1\$1/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/)。

1. 在左窗格中選擇**叢集**，然後選擇名稱 `mkc-tutorial-cluster`。

1. 選擇**檢視用戶端資訊**。

1. 複製**純文字**連線字串。

1. 選擇**完成**。

1. 在用戶端執行個體 (`mkc-tutorial-client`) 上執行以下命令，使用您檢視叢集用戶端資訊時所儲存的值來取代 *bootstrapServerString*。

   ```
   <path-to-your-kafka-installation>/bin/kafka-topics.sh --create --bootstrap-server bootstrapServerString --replication-factor 2 --partitions 1 --topic mkc-tutorial-topic
   ```

   如果命令成功，您會看到以下訊息：`Created topic mkc-tutorial-topic.`

**後續步驟**

[建立連接器](mkc-create-connector.md)

# 建立連接器
<a name="mkc-create-connector"></a>

此程序說明如何使用 建立連接器 AWS 管理主控台。

**建立連接器**

1. 登入 AWS 管理主控台，並在 [https://console.aws.amazon.com/msk/home?region=us-east-1\$1/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/)：// 開啟 Amazon MSK 主控台。

1. 在左窗格中展開 **MSK Connect**，然後選擇**連接器**。

1. 選擇 **Create connector (建立連接器)**。

1. 在外掛程式清單中，選擇 `mkc-tutorial-plugin`，然後選擇**下一步**。

1. 針對連接器名稱，請輸入 `mkc-tutorial-connector`。

1. 在叢集清單中，選擇 `mkc-tutorial-cluster`。

1. 在**連接器網路設定**區段中，針對網路類型選擇下列其中一項：
   + **IPv4** （預設） - 僅適用於透過 IPv4 連線至目的地
   + **雙堆疊** - 用於透過 IPv4 和 IPv6 連線至目的地 （僅當子網路具有與其相關聯的 IPv4 和 IPv6 CIDR 區塊時可用）

1. 複製以下組態並貼入連接器組態欄位。

   請務必將區域取代為您建立連接器之 AWS 區域 的程式碼。此外，在下列範例中，將 Amazon S3 儲存貯體名稱 *<amzn-s3-demo-bucket-my-tutorial>* 取代為您的儲存貯體名稱。

   ```
   connector.class=io.confluent.connect.s3.S3SinkConnector
   s3.region=us-east-1
   format.class=io.confluent.connect.s3.format.json.JsonFormat
   flush.size=1
   schema.compatibility=NONE
   tasks.max=2
   topics=mkc-tutorial-topic
   partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
   storage.class=io.confluent.connect.s3.storage.S3Storage
   s3.bucket.name=<amzn-s3-demo-bucket-my-tutorial>
   topics.dir=tutorial
   ```

1. 在**存取許可**下選擇 `mkc-tutorial-role`。

1. 選擇**下一步**。在**安全**頁面上，再次選擇**下一步**。

1. 在**日誌**頁面上，選擇**下一步**。

1. 在**檢閱和建立**頁面上，檢閱您的連接器組態，然後選擇**建立連接器**。

**後續步驟**

[將資料傳送至 MSK 叢集](mkc-send-data.md)

# 將資料傳送至 MSK 叢集
<a name="mkc-send-data"></a>

在此步驟中，您會將資料傳送至先前建立的 Apache Kafka 主題，然後在目的地 S3 儲存貯體中尋找同一項資料。

**傳送資料至 MSK 叢集**

1. 在用戶端執行個體上安裝 Apache Kafka 的 `bin` 資料夾中，建立一個名為 `client.properties` 且具有以下內容的文字檔案。

   ```
   security.protocol=SASL_SSL
   sasl.mechanism=AWS_MSK_IAM
   ```

1. 執行下列命令以建立主控台生產者。使用您在執行先前命令時所取得的值來取代 *BootstrapBrokerString*。

   ```
   <path-to-your-kafka-installation>/bin/kafka-console-producer.sh --broker-list BootstrapBrokerString --producer.config client.properties --topic mkc-tutorial-topic
   ```

1. 輸入您想要的任何訊息，然後按 **Enter** 鍵。重複此步驟兩次或三次。每次輸入一行，然後按 **Enter**，該行會作為單獨訊息傳送到您的 Apache Kafka 叢集中。

1. 在目的地 Amazon S3 儲存貯體中查看，尋找您在上一個步驟中傳送的訊息。

# 了解連接器
<a name="msk-connect-connectors"></a>

連接器會將外部系統和 Amazon 服務與 Apache Kafka 整合，方法是持續將資料來源的串流資料複製到 Apache Kafka 叢集中，或持續將叢集中的資料複製到資料目的地中。在將資料傳送至目的地之前，連接器也可執行輕量型邏輯，例如轉換、格式轉換或篩選資料。來源連接器會從資料來源提取資料，並將此資料推送至叢集中，同時，目的地連接器則會從叢集提取資料，並將此資料推送至資料目的地中。

以下圖表說明連接器的架構。工作程序是執行連接器邏輯的 Java 虛擬機器 (JVM) 程序。每個工作程序皆會建立一組在平行執行緒中執行的任務，並執行複製資料的工作。任務不會存放狀態，因此可以隨時啟動、停止或重新啟動，以提供彈性且可擴展的資料管道。

![\[顯示連接器叢集之架構的圖表。\]](http://docs.aws.amazon.com/zh_tw/msk/latest/developerguide/images/mkc-worker-architecture.png)


# 了解連接器容量
<a name="msk-connect-capacity"></a>

連接器的總容量將依該連接器具有的工作程序數量，以及每個工作程序的 MSK Connect 單位 (MCU) 數量而定。一個 MCU 代表 1 個 vCPU 的運算和 4 GiB 的記憶體。MCU 記憶體與工作程序執行個體的總記憶體有關，而非與使用中的堆積記憶體相關。

MSK Connect 工作者會使用客戶提供的子網路中的 IP 地址。每個工作者都會從其中一個客戶提供的子網路使用一個 IP 地址。您應該確保在提供給 CreateConnector 請求的子網路中有足夠的可用 IP 地址，以考慮其指定的容量，尤其是在自動擴展連接器時，工作者數量可能會波動。

若要建立連接器，您必須選擇以下兩種容量模式之一。
+ *佈建* - 若您知道連接器的容量需求，請選擇此模式。您可以指定兩個值：
  + 工作程序數量。
  + 每個工作程序的 MCU 數目。
+ *自動擴展* - 若連接器的容量需求可變，或者您事先不知道容量需求，請選擇此模式。在您使用自動擴展模式時，Amazon MSK Connect 會使用與連接器中執行的工作程序數量以及每個工作程序的 MCU 數量成比例的值，來覆寫連接器的 `tasks.max` 屬性。

  您可以指定三組值：
  + 工作程序數量下限和上限。
  + CP 使用率的縮減和橫向擴展百分比，依據 `CpuUtilization` 指標決定。在連接器的 `CpuUtilization` 指標超過橫向擴展百分比時，MSK Connect 會增加連接器中執行的工作程序數量。在 `CpuUtilization` 指標低於縮減百分比時，MSK Connect 會減少工作程序的數量。工作程序的數目會永遠維持在您建立連接器時指定的數量下限和上限之間。
  + 每個工作程序的 MCU 數目。
  + （選用） *自動擴展任務計數上限* - 在自動擴展操作期間配置給連接器的任務數量上限。此參數可讓您設定任務建立的上限，以便更好地控制與 Kafka 主題分割區相關的資源使用率和平行處理。

如需工作者的詳細資訊，請參閱 [了解 MSK Connect 工作者](msk-connect-workers.md)，如需自動擴展任務計數上限的詳細資訊，請參閱 [了解自動擴展任務計數上限](msk-connect-max-autoscaling-task-count.md)。若要了解 MSK Connect 指標，請參閱 [監控 Amazon MSK Connect](mkc-monitoring-overview.md)。

# 了解自動擴展任務計數上限
<a name="msk-connect-max-autoscaling-task-count"></a>

`maxAutoscalingTaskCount` 參數是選用的容量欄位，可用於 Amazon MSK Connect 中的自動擴展連接器。此參數可讓您針對連接器自動調整規模操作期間可建立的任務數量上限設定上限，以便更好地控制資源使用率和效能。

當您使用自動擴展容量模式時，Amazon MSK Connect 會自動以與每個工作者的工作者和 MCUs 數量成比例的值覆寫連接器的`tasks.max`屬性。`maxAutoscalingTaskCount` 參數提供額外的可設定選項，以限制為連接器建立的任務數量上限。

當您想要控制與 Kafka 叢集中主題分割區數量相關的平行處理層級時，此功能特別有用。透過設定此限制，您可以最佳化效能，並防止在自動計算的任務計數超過工作負載需求時可能發生的效率低下任務分佈。

## 組態需求
<a name="msk-connect-max-autoscaling-task-count-requirements"></a>

`maxAutoscalingTaskCount` 參數必須符合下列需求：

```
maxAutoscalingTaskCount ≥ maxWorkerCount
```

此需求會維護每個工作者至少一項任務，以確保有效率的資源使用率。系統會強制執行此最小值，以最佳化連接器功能。

當您指定 時`maxAutoscalingTaskCount`，限制會在連接器建立時以及所有後續擴展事件期間立即套用。隨著工作者數量在自動擴展操作期間增加或減少，系統會繼續遵守此限制。此`tasks.max`值會根據每個工作者的工作者和 MCUs 數量按比例調整，但絕不會超過設定`maxAutoscalingTaskCount`的值。

如果您未指定此參數，連接器會使用無任何限制的標準計算：`tasks.max = workerCount × mcuCount × tasksPerMcu`（其中 tasksPerMcu 為 2)。

## 何時使用 maxAutoscalingTaskCount
<a name="msk-connect-max-autoscaling-task-count-when-to-use"></a>

在下列案例中考慮使用 `maxAutoscalingTaskCount` ：
+ *有限分割區計數*：當您的 Kafka 主題具有低於自動計算任務計數的固定分割區數量時，設定限制會阻止建立閒置任務，而不需要執行任何工作。
+ *效能最佳化*：當您確定特定任務計數為您的工作負載提供最佳輸送量時，您可以限制任務上限，以維持一致的效能。
+ *資源管理*：當您想要控制連接器的最大平行處理和資源耗用量時，無論有多少工作者正在執行。

## 範例
<a name="msk-connect-max-autoscaling-task-count-example"></a>

對於具有下列組態的連接器：

```
minWorkerCount: 1
maxWorkerCount: 4
mcuCount: 8
maxAutoscalingTaskCount: 15
```

如果沒有 `maxAutoscalingTaskCount`，當擴展到 4 個工作者時，連接器會建立 64 個任務 (4 個工作者 × 8 MCUs × 每個 MCU 2 個任務）。將 `maxAutoscalingTaskCount`設為 15 時，連接器只會建立 15 個任務，如果您的 Kafka 主題具有 15 個或更少的分割區，則可能更合適。

# 設定雙堆疊網路類型
<a name="msk-connect-dual-stack"></a>

Amazon MSK Connect 支援新連接器的雙堆疊網路類型。透過雙堆疊聯網，您的連接器可以透過 IPv4 和 IPv6 連線到目的地。請注意，IPv6 連線僅適用於雙堆疊模式 (IPv4 \$1 IPv6) - 不支援IPv6-only的聯網。

根據預設，新連接器會使用 IPv4 網路類型。若要建立具有雙堆疊網路類型的連接器，請確定您已滿足下節所述的先決條件。請注意，一旦使用雙堆疊網路類型建立連接器，您就無法修改其網路類型。若要變更網路類型，您必須刪除並重新建立連接器。

Amazon MSK Connect 也支援透過 IPv6 和 IPv4 的服務 API 端點連線。若要使用 IPv6 連線進行 API 呼叫，您需要使用雙堆疊端點。如需 MSK Connect 服務端點的詳細資訊，請參閱 [Amazon MSK Connect 端點和配額](https://docs.aws.amazon.com/general/latest/gr/msk-connect.html)。

## 使用雙堆疊網路類型的先決條件
<a name="dual-stack-prerequisites"></a>

為連接器設定雙堆疊網路類型之前，請確定您在連接器建立期間提供的所有子網路都已指派 IPv6 和 IPv4 CIDR 區塊。

## 使用雙堆疊網路類型的考量
<a name="dual-stack-considerations"></a>
+ IPv6 支援目前僅在雙堆疊模式 (IPv4 \$1 IPv6) 中可用，而不是IPv6-only
+ 啟用雙堆疊的連接器可以透過 IPv4 和 IPv6 連線到 MSK 和接收器或來源資料系統
+ 建立連接器後無法修改網路類型 - 您必須刪除並重新建立連接器以變更網路類型
+ 連接器建立期間指定的所有子網路都必須支援雙堆疊，才能建立連接器，以成功使用雙堆疊網路類型
+ 如果使用雙堆疊子網路，但未指定網路類型，連接器會預設為IPv4-only以確保回溯相容性
+ 對於現有的連接器，您無法更新網路類型 - 您必須刪除並重新建立連接器以變更網路類型
+ 使用雙堆疊聯網不會產生額外費用

# 建立 連接器
<a name="mkc-create-connector-intro"></a>

此程序說明如何使用 建立連接器 AWS 管理主控台。

**使用 建立連接器 AWS 管理主控台**

1. 開啟位於 [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/) 的 Amazon MSK 主控台。

1. 在左窗格的 **MSK Connect** 下，選擇**連接器**。

1. 選擇 **Create connector (建立連接器)**。

1. 您可選擇使用現有的自訂外掛程式來建立連接器，或先建立新的自訂外掛程式。如需有關自訂外掛程式以及如何建立的相關資訊，請參閱 [建立自訂外掛程式](msk-connect-plugins.md)。在此過程中，請假設您具有要使用的自訂外掛程式。在自訂外掛程式清單中，找到您要使用的外掛程式，然後選取其左側的方塊，然後選擇**下一步**。

1. 輸入名稱，以及描述 (非必要)。

1. 選擇您要連線到的叢集。

1. 在**連接器網路設定**區段中，針對網路類型選擇下列其中一項：
   + **IPv4** （預設） - 僅適用於透過 IPv4 連線至目的地
   + **雙堆疊** - 用於透過 IPv4 和 IPv6 連線至目的地 （僅當子網路具有與其相關聯的 IPv4 和 IPv6 CIDR 區塊時可用）

1. 指定連接器組態。您需要指定的組態參數會依據您要建立的連接器類型而定。然而，有部分參數是所有連接器都要指定的，例如 `connector.class` 和 `tasks.max` 參數。以下是 [Confluent Amazon S3 目的地連接器](https://www.confluent.io/hub/confluentinc/kafka-connect-s3)的範例組態。

   ```
   connector.class=io.confluent.connect.s3.S3SinkConnector
   tasks.max=2
   topics=my-example-topic
   s3.region=us-east-1
   s3.bucket.name=amzn-s3-demo-bucket
   flush.size=1
   storage.class=io.confluent.connect.s3.storage.S3Storage
   format.class=io.confluent.connect.s3.format.json.JsonFormat
   partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
   key.converter=org.apache.kafka.connect.storage.StringConverter
   value.converter=org.apache.kafka.connect.storage.StringConverter
   schema.compatibility=NONE
   ```

1. 接下來，您需設定連接器容量。您可在兩種容量模式中選擇其一：佈建和自動擴展。如需關於這兩個選項的詳細資訊，請參閱[了解連接器容量](msk-connect-capacity.md)。

1. （選用） 在**最大自動擴展任務計數**區段中，使用最大自動擴展任務計數欄位，輸入要在自動擴展操作期間配置給連接器的任務數量上限。值必須至少等於您的工作者計數上限。如果您未指定值，連接器會無任何限制地使用標準計算。如需詳細資訊，請參閱[了解自動擴展任務計數上限](msk-connect-max-autoscaling-task-count.md)。

1. 在預設工作程序組態或自訂工作程序組態中選擇其一。如需有關建立自訂工作程序組態的詳細資訊，請參閱 [了解 MSK Connect 工作者](msk-connect-workers.md)。

1. 接下來，您需指定服務執行角色。這必須是 MSK Connect 可以擔任的 IAM 角色，並授予連接器存取必要 AWS 資源所需的所有許可。這些許可會依連接器的邏輯而定。如需如何建立此角色的資訊，請參閱 [了解服務執行角色](msk-connect-service-execution-role.md)。

1. 選擇**下一步**，檢閱安全性資訊，然後再次選擇**下一步**。

1. 指定所需的記錄選項，然後選擇**下一步**。如需日誌記錄的相關資訊，請參閱[MSK Connect 的日誌](msk-connect-logging.md)。

1. 在**檢閱和建立**頁面上，檢閱您的連接器組態，然後選擇**建立連接器**。

若要使用 MSK Connect API 建立連接器，請參閱 [CreateConnector](https://docs.aws.amazon.com/MSKC/latest/mskc/API_CreateConnector.html)。

您可以使用 `UpdateConnector` API 來修改連接器的組態。如需詳細資訊，請參閱[更新連接器](mkc-update-connector.md)。

# 更新連接器
<a name="mkc-update-connector"></a>

此程序說明如何使用 更新現有 MSK Connect 連接器的組態 AWS 管理主控台。

**使用 更新連接器組態 AWS 管理主控台**

1. 開啟位於 [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/) 的 Amazon MSK 主控台。

1. 在左窗格的 **MSK Connect** 下，選擇**連接器**。

1. 選取 existig 連接器。

1. 選擇**編輯連接器組態**。

1. 更新連接器組態。您無法使用 `connector.class` UpdateConnector 覆寫 。下列範例顯示 Confluent Amazon S3 Sink 連接器的範例組態。

   ```
   connector.class=io.confluent.connect.s3.S3SinkConnector
   tasks.max=2
   topics=my-example-topic
   s3.region=us-east-1
   s3.bucket.name=amzn-s3-demo-bucket
   flush.size=1
   storage.class=io.confluent.connect.s3.storage.S3Storage
   format.class=io.confluent.connect.s3.format.json.JsonFormat
   partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
   key.converter=org.apache.kafka.connect.storage.StringConverter
   value.converter=org.apache.kafka.connect.storage.StringConverter
   schema.compatibility=NONE
   ```

1. 選擇**提交**。

1. 然後，您可以在連接器的操作****索引標籤中監控操作的目前狀態。

若要使用 MSK Connect API 更新連接器的組態，請參閱 [UpdateConnector](https://docs.aws.amazon.com/MSKC/latest/mskc/API_UpdateConnector.html)。

# 從連接器連線
<a name="msk-connect-from-connectors"></a>

以下最佳實務可改善您與 Amazon MSK Connect 的連線效能。

## 請勿重疊 Amazon VPC 對等互連或 Transit Gateway 的 IP
<a name="CIDR-ip-ranges"></a>

若您使用 Amazon VPC 對等互連或 Transit Gateway 搭配 Amazon MSK Connect，請勿將連接器設定為透過 CIDR 範圍中的 IP 連線對等互聯的 VPC 資源：
+ "10.99.0.0/16"
+ "192.168.0.0/16"
+ "172.21.0.0/16"

# 建立自訂外掛程式
<a name="msk-connect-plugins"></a>

外掛程式是一種 AWS 資源，其中包含定義連接器邏輯的程式碼。您可以將 JAR 檔案 (或包含一個或多個 JAR 檔案的 ZIP 檔案) 上傳至 S3 儲存貯體，並在建立外掛程式時指定儲存貯體的位置。建立外掛程式時，MSK Connect 會在該時間點複製 S3 物件的內容。它不會維護 S3 物件的連結，因此對物件的任何後續修改都不會影響外掛程式或其連接器。在建立連接器時，您可以指定要讓 MSK Connect 用於連接器的外掛程式。外掛程式與連接器的關係是one-to-many：您可以從相同的外掛程式建立一個或多個連接器。

**注意**  
自訂外掛程式無法正確更新。若要使用新版本的外掛程式程式碼，請刪除參考外掛程式的所有連接器、刪除外掛程式，然後重新建立。

**自訂外掛程式的相依性封裝**  
我們建議您包含外掛程式的所有必要 JAR 檔案和相依性。將您的連接器封裝為下列其中一項：  
包含外掛程式所有必要 JAR 檔案和相依性的 ZIP 檔案。
單一 uber JAR，其中包含外掛程式及其相依性的所有類別檔案。
未綁定您的外掛程式相依性可能會影響執行時間環境中的可用性或相容性，並導致非預期的錯誤。

如需如何開發連接器程式碼的相關資訊，請參閱 Apache Kafka 文件中的 [Connector Development Guide](https://kafka.apache.org/documentation/#connect_development)。

**使用 建立自訂外掛程式 AWS 管理主控台**

1. 開啟位於 [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/) 的 Amazon MSK 主控台。

1. 在左窗格的 **MSK Connect** 下選擇**自訂外掛程式**。

1. 選擇**建立自訂外掛程式**。

1. 選擇 **Browse S3** (瀏覽 S3)。

1. 在 S3 儲存貯體清單中，為外掛程式選擇具有 JAR 或 ZIP 檔案的儲存貯體。

1. 在物件清單中，為外掛程式選取 JAR 或 ZIP 檔案左邊的方塊，然後選擇**選擇**。

1. 選擇**建立自訂外掛程式**。

若要使用 MSK Connect API 建立自訂外掛程式，請參閱 [CreateCustomPlugin](https://docs.aws.amazon.com/MSKC/latest/mskc/API_CreateCustomPlugin.html)。

# 了解 MSK Connect 工作者
<a name="msk-connect-workers"></a>

工作程序是執行連接器邏輯的 Java 虛擬機器 (JVM) 程序。每個工作程序皆會建立一組在平行執行緒中執行的任務，並執行複製資料的工作。任務不會存放狀態，因此可以隨時啟動、停止或重新啟動，以提供彈性且可擴展的資料管道。無論是源自於擴展事件還是非預期的失敗，其餘工作程序會自動偵測到工作程序數量的變更。它們會協調以重新平衡整組其餘工作程序的任務。Connect 工作程序會使用 Apache Kafka 的取用者群組來進行協調和重新平衡。

若連接器的容量需求可變或難以估計，您可以讓 MSK Connect 根據需要在您指定的下限和上限之間擴展工作程序數量。您也可以指定要執行連接器邏輯的工作程序確切數量。如需詳細資訊，請參閱[了解連接器容量](msk-connect-capacity.md)。

**MSK Connect 工作者使用 IP 地址**  
MSK Connect 工作者會使用客戶提供的子網路中的 IP 地址。每個工作者都會從其中一個客戶提供的子網路使用一個 IP 地址。您應該確保在提供給 CreateConnector 請求的子網路中有足夠的可用 IP 地址，以考慮其指定的容量，尤其是在自動擴展連接器時，工作者數量可能會波動。

## 預設工作程序組態
<a name="msk-connect-default-worker-config"></a>

MSK Connect 會提供以下預設工作程序組態：

```
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
```

# 支援的工作程序組態屬性
<a name="msk-connect-supported-worker-config-properties"></a>

MSK Connect 會提供預設工作程序組態。您也可以選擇建立自訂工作程序組態，以與連接器搭配使用。以下清單列出了 Amazon MSK Connect 支援或不支援的工作程序組態屬性訊。
+ `key.converter` 和 `value.converter` 屬性為必要項目。
+ MSK Connect 支援以下 `producer.` 組態屬性。

  ```
  producer.acks
  producer.batch.size
  producer.buffer.memory
  producer.compression.type
  producer.enable.idempotence
  producer.key.serializer
  producer.linger.ms
  producer.max.request.size
  producer.metadata.max.age.ms
  producer.metadata.max.idle.ms
  producer.partitioner.class
  producer.reconnect.backoff.max.ms
  producer.reconnect.backoff.ms
  producer.request.timeout.ms
  producer.retry.backoff.ms
  producer.value.serializer
  ```
+ MSK Connect 支援以下 `consumer.` 組態屬性。

  ```
  consumer.allow.auto.create.topics
  consumer.auto.offset.reset
  consumer.check.crcs
  consumer.fetch.max.bytes
  consumer.fetch.max.wait.ms
  consumer.fetch.min.bytes
  consumer.heartbeat.interval.ms
  consumer.key.deserializer
  consumer.max.partition.fetch.bytes
  consumer.max.poll.interval.ms
  consumer.max.poll.records
  consumer.metadata.max.age.ms
  consumer.partition.assignment.strategy
  consumer.reconnect.backoff.max.ms
  consumer.reconnect.backoff.ms
  consumer.request.timeout.ms
  consumer.retry.backoff.ms
  consumer.session.timeout.ms
  consumer.value.deserializer
  ```
+ 支援所有不以 `producer.` 或 `consumer.` 字首開頭的組態屬性，以下屬性外*除外*。

  ```
  access.control.
  admin.
  admin.listeners.https.
  client.
  connect.
  inter.worker.
  internal.
  listeners.https.
  metrics.
  metrics.context.
  rest.
  sasl.
  security.
  socket.
  ssl.
  topic.tracking.
  worker.
  bootstrap.servers
  config.storage.topic
  connections.max.idle.ms
  connector.client.config.override.policy
  group.id
  listeners
  metric.reporters
  plugin.path
  receive.buffer.bytes
  response.http.headers.config
  scheduled.rebalance.max.delay.ms
  send.buffer.bytes
  status.storage.topic
  ```

如需有關工作程序組態屬性和其代表內容的詳細資訊，請參閱 Apache Kafka 文件中的 [Kafka Connect Configs](https://kafka.apache.org/documentation/#connectconfigs)。

# 建立自訂工作者組態
<a name="msk-connect-create-custom-worker-config"></a>

此程序說明如何使用 建立自訂工作者組態 AWS 管理主控台。

**使用 建立自訂工作者組態 AWS 管理主控台**

1. 開啟位於 [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/) 的 Amazon MSK 主控台。

1. 在左窗格的 **MSK Connect**下，選擇**工作程序組態**。

1. 選擇**建立工作程序組態**。

1. 輸入名稱和選用描述，然後加入您要為其設定的屬性和值。

1. 選擇**建立工作程序組態**。

若要使用 MSK Connect API 建立工作程序組態，請參閱 [CreateWorkerConfiguration](https://docs.aws.amazon.com/MSKC/latest/mskc/API_CreateWorkerConfiguration.html)。

# 使用 管理來源連接器位移 `offset.storage.topic`
<a name="msk-connect-manage-connector-offsets"></a>

本節提供的資訊有助您使用*偏移儲存主題*來管理來源連接器偏移。偏移儲存主題是一項內部主題，Kafka Connect 會用來存放連接器和任務組態偏移。

## 考量事項
<a name="msk-connect-manage-connector-offsets-considerations"></a>

在管理來源連接器偏移時，請考量以下事項。
+ 若要指定偏移儲存主題，請提供 Kafka 主題的名稱，其連接器偏移會被儲存為工作程序組態中的 `offset.storage.topic` 值。
+ 在變更連接器組態時請小心。若來源連接器會將組態中的值用於關鍵偏移記錄，則變更組態值可能會導致意外的連接器行為。我們建議您參考外掛程式的文件以取得指引。
+ **自訂預設分區數量** - 除了藉由新增 `offset.storage.topic` 的方式來自訂工作程序組態之外，您還可以自訂偏移量狀態儲存主題的分區數量。內部主題的預設分區數量如下。
  + `config.storage.topic`：1，不可設定，必須為單一分區主題
  + `offset.storage.topic`：25，藉由提供 `offset.storage.partitions` 進行設定
  + `status.storage.topic`：5，藉由提供 `status.storage.partitions` 進行設定
+ **手動刪除主題** - Amazon MSK Connect 會在連接器的每個部署上建立新的 Kafka 連接內部主題 (主題名稱開頭為 `__amazon_msk_connect`)。連接至已刪除連接器的舊主題不會自動受到移除，這是因為內部主題 (例如 `offset.storage.topic`) 可在連接器之間重複使用。然而，您可以手動刪除由 MSK Connect 建立但未使用的內部主題。內部主題會依 `__amazon_msk_connect_<offsets|status|configs>_connector_name_connector_id` 格式來命名。

  規則表達式 `__amazon_msk_connect_<offsets|status|configs>_connector_name_connector_id` 可用於刪除內部主題。您不應刪除執行中連接器目前正在使用的內部主題。
+ **將相同名稱用於 MSK Connect 建立的內部主題** - 若您要重複使用偏移儲存主題來使用先前建立之連接器的偏移，您必須為新連接器指定與舊連接器相同的名稱。可以使用工作程序組態來設定 `offset.storage.topic` 屬性，將相同名稱指派給 `offset.storage.topic`，並在不同連接器之間重複使用。此組態在[管理連接器偏移](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-workers.html#msk-connect-create-custom-worker-config)中有所描述。MSK Connect 不允許不同連接器共用 `config.storage.topic` 和 `status.storage.topic`。每次您在 MSKC 中建立新連接器時，皆會建立這些主題。它們會依 `__amazon_msk_connect_<status|configs>_connector_name_connector_id` 格式自動命名，因此它們的名稱在您建立的不同連接器之間也會有所不同。

# 使用預設位移儲存主題
<a name="msk-connect-default-offset-storage-topic"></a>

根據預設，Amazon MSK Connect 會為您建立的每個連接器，在 Kafka 叢集上產生新的偏移儲存主題。MSK 會使用連接器 ARN 的部分來建構預設主題名稱。例如 `__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2`。

# 使用自訂位移儲存主題
<a name="msk-connect-set-offset-storage-topic"></a>

若要在來源連接器之間提供偏移連續性，您可以使用所選偏移儲存主題，而非預設主題。指定偏移儲存主題有助您完成任務，例如，建立來源連接器從上一個連接器的最後一個偏移恢復讀取。

若要指定偏移儲存主題，請在建立連接器之前，在工作程序組態中提供 `offset.storage.topic` 屬性值。若您要重複使用偏移儲存主題來使用先前建立之連接器的偏移，您必須為新連接器指定與舊連接器相同的名稱。若您建立自訂偏移儲存主題，則必須在主題組態中將 [https://kafka.apache.org/27/documentation.html#topicconfigs_cleanup.policy](https://kafka.apache.org/27/documentation.html#topicconfigs_cleanup.policy) 設定為 `compact`。

**注意**  
若您在建立*目的地*連接器時指定偏移儲存主題，則若該主題尚不存在，MSK Connect 將會建立該主題。然而，該主題將不會用於儲存連接器偏移。  
反而是使用 Kafka 取用者群組通訊協定來管理目的地連接器偏移。每個目的地連接器都會建立名為 `connect-{CONNECTOR_NAME}` 的群組。只要取用者群組存在，您使用相同 `CONNECTOR_NAME` 值建立的任何連續目的地連接器都會從上次遞交的偏移繼續執行。

**重要**  
如果您想要在維持位移連續性的同時更新現有的連接器組態，請使用 UpdateConnector API。如需詳細資訊，請參閱[更新連接器](mkc-update-connector.md)。

**Example ：在重新建立來源連接器時指定位移儲存主題**  
如果您需要刪除並重新建立連接器，同時保持偏移連續性，您可以在工作者組態中指定偏移儲存主題。例如，假設您有一個變更資料擷取 (CDC) 連接器，並且想要重新建立它，而不會遺失您在 CDC 串流中的位置。以下步驟會說明如何完成這項任務。  

1. 在用戶端電腦上，執行以下命令以尋找連接器的偏移儲存主題名稱。使用您叢集的引導代理程式字串來取代 `<bootstrapBrokerString>`。如需有關取得引導代理程式字串的指示，請參閱 [取得 Amazon MSK 叢集的引導代理程式](msk-get-bootstrap-brokers.md)。

   ```
   <path-to-your-kafka-installation>/bin/kafka-topics.sh --list --bootstrap-server <bootstrapBrokerString>
   ```

   以下輸出顯示所有叢集主題的清單，包括任何預設內部連接器主題。在此範例中，現有 CDC 連接器會使用 MSK Connect 建立的[預設偏移儲存主題](msk-connect-default-offset-storage-topic.md)。此即為偏移儲存主題被稱為 `__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2` 之原因。

   ```
   __consumer_offsets
   __amazon_msk_canary
   __amazon_msk_connect_configs_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
   __amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
   __amazon_msk_connect_status_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
   my-msk-topic-1
   my-msk-topic-2
   ```

1. 開啟位於 [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk) 的 Amazon MSK 主控台。

1. 從**連接器**清單中選擇您的連接器。複製並儲存**連接器組態**欄位的內容，以便修改並使用該內容來建立新的連接器。

1. 選擇**刪除**以刪除連接器。文字輸入在欄位中輸入連接器名稱以確認刪除。

1. 使用符合您案例的值來建立自訂工作程序組態。如需說明，請參閱[建立自訂工作者組態](msk-connect-create-custom-worker-config.md)。

   在工作程序組態中，您必須將之前擷取之偏移儲存主題的名稱指定為 `offset.storage.topic` 的值，如以下組態所示。

   ```
   config.providers.secretManager.param.aws.region=eu-west-3
   key.converter=<org.apache.kafka.connect.storage.StringConverter>
   value.converter=<org.apache.kafka.connect.storage.StringConverter>
   config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider
   config.providers=secretManager
   offset.storage.topic=__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
   ```

1. 
**重要**  
您必須為新連接器取名為與舊連接器相同的名稱。

   使用您在之前步驟中設定的工作程序組態來建立新的連接器。如需說明，請參閱[建立 連接器](mkc-create-connector-intro.md)。

# 教學課程：使用組態提供者將敏感資訊外部化
<a name="msk-connect-config-provider"></a>

此範例說明如何使用開放原始碼組態供應商將 Amazon MSK Connect 的敏感資訊外部化。組態供應商讓您可在連接器或工作程序組態中指定變數 (而非純文字)，而在連接器中執行的工作程序會在執行期解析這些變數。此做法可避免系統以純文字方式儲存憑證和其他秘密。範例中的組態提供者支援從 AWS Secrets Manager、Amazon S3 和 Systems Manager (SSM) 擷取組態參數。在[步驟 2](#msk-connect-config-providers) 中，您可以了解如何設定服務的儲存和敏感資訊擷取。

## 考量事項
<a name="msk-connect-config-providers-considerations"></a>

在搭配使用 MSK 組態供應商和 Amazon MSK Connect 時，請考量以下事項：
+ 在使用組態供應商時，將適當的許可指派至 IAM 服務執行角色。
+ 在工作程序組態中定義組態供應商，並在連接器組態中定義其實作。
+ 如果外掛程式未將這些值定義為秘密，則敏感組態值可能會顯示在連接器日誌中。Kafka Connect 會將未定義的組態值視為與任何其他純文字值相同。如需詳細資訊，請參閱 [避免秘密顯示在連接器日誌中](msk-connect-logging.md#msk-connect-logging-secrets)。
+ 根據預設，在連接器使用組態供應商時，MSK Connect 會經常重新啟動連接器。若要關閉此重新啟動行為，您可以在連接器組態中將 `config.action.reload` 值設定為 `none`。

## 建立自訂外掛程式並上傳至 S3
<a name="msk-connect-config-providers-create-custom-plugin"></a>

 若要建立自訂外掛程式，請在本機上執行以下命令，建立包含連接器和 msk-config-provider 的 zip 檔案。

**使用終端機視窗和 Debezium 作為連接器來建立自訂外掛程式**

使用 AWS CLI 以超級使用者身分執行命令，其中包含可讓您存取 AWS S3 儲存貯體的登入資料。如需安裝和設定 AWS CLI 的資訊，請參閱*AWS Command Line Interface 《 使用者指南*》中的 [CLI AWS 入門](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-started.html)。如需搭配 Amazon S3 使用 AWS CLI 的詳細資訊，請參閱*AWS Command Line Interface 《 使用者指南*》中的[搭配 AWS CLI 使用 Amazon S3](https://docs.aws.amazon.com/cli/latest/userguide/cli-services-s3.html)。

1. 在終端機視窗中，使用以下命令在工作區中建立名為 `custom-plugin` 的資料夾。

   ```
   mkdir custom-plugin && cd custom-plugin
   ```

1. 使用以下命令，從 [Debezium 網站](https://debezium.io/releases/)下載 **MySQL Connector Plug-in** 的最新穩定版本。

   ```
   wget https://repo1.maven.org/maven2/io/debezium/debezium-connectormysql/
   2.2.0.Final/debezium-connector-mysql-2.2.0.Final-plugin.tar.gz
   ```

   使用以下命令將下載的 gzip 文件解壓縮至 `custom-plugin` 文件夾中。

   ```
   tar xzf debezium-connector-mysql-2.2.0.Final-plugin.tar.gz
   ```

1. 使用以下命令來下載 [MSK 組態供應商 zip 檔案](https://github.com/aws-samples/msk-config-providers/releases/download/r0.4.0/msk-config-providers-0.4.0-with-dependencies.zip)。

   ```
   wget https://github.com/aws-samples/msk-config-providers/releases/download/r0.4.0/msk-config-providers-0.4.0-with-dependencies.zip
   ```

   使用以下命令將下載的 zip 文件解壓縮至 `custom-plugin` 文件夾中。

   ```
   unzip msk-config-providers-0.4.0-with-dependencies.zip
   ```

1. 將上述步驟中 MSK 組態供應商的內容和自訂連接器壓縮至名為 `custom-plugin.zip` 的單一檔案中。

   ```
   zip -r ../custom-plugin.zip * 
   ```

1. 將檔案上傳至 S3 以供稍後參考。

   ```
   aws s3 cp ../custom-plugin.zip s3:<S3_URI_BUCKET_LOCATION>
   ```

1. 在 Amazon MSK 主控台的 **MSK Connect** 區段下，選擇**自訂外掛程式**，然後選擇**建立自訂外掛程式**，並瀏覽 **s3:<*S3\$1URI\$1BUCKET\$1LOCATION*>** S3 儲存貯體，以選取您剛剛上傳的自訂外掛程式 ZIP 檔案。  
![\[Amazon S3 bucket interface showing a single custom-plugin.zip file in the debezium folder.\]](http://docs.aws.amazon.com/zh_tw/msk/latest/developerguide/images/s3-object-browser.png)

1. 輸入 **debezium-custom-plugin** 作為外掛程式名稱。輸入描述 (選用)，然後選擇**建立自訂外掛程式**。  
![\[Amazon S3 bucket interface showing a single custom-plugin.zip file in the debezium folder.\]](http://docs.aws.amazon.com/zh_tw/msk/latest/developerguide/images/create-custom-plugin.png)

## 設定不同供應商的參數和許可
<a name="msk-connect-config-providers"></a>

您可以在以下三項服務中設定參數值：
+ Secrets Manager
+ Systems Manager Parameter Store
+ S3 - Simple Storage Service

選擇下列其中一個索引標籤，以取得設定參數和該服務相關許可的指示。

------
#### [ Configure in Secrets Manager ]

**在 Secrets Manager 中設定參數值**

1. 開啟 [Secrets Manager 主控台](https://console.aws.amazon.com/secretsmanager/)。

1. 建立新秘密以存放您的憑證或秘密。如需說明，請參閱*AWS Secrets Manager 《 使用者指南*》中的[建立 AWS Secrets Manager 秘密](https://docs.aws.amazon.com/secretsmanager/latest/userguide/create_secret.html)。

1. 複製您秘密的 ARN。

1. 將以下範例政策中的 Secrets Manager 許可新增至您的[服務執行角色](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-service-execution-role.html)。將範例 ARN `arn:aws:secretsmanager:us-east-1:123456789012:secret:MySecret-1234`取代為您秘密的 ARN。

1. 新增工作程序組態和連接器指示。  
****  

   ```
   {
           "Version":"2012-10-17",		 	 	 
           "Statement": [
               {
                   "Effect": "Allow",
                   "Action": [
                       "secretsmanager:GetResourcePolicy",
                       "secretsmanager:GetSecretValue",
                       "secretsmanager:DescribeSecret",
                       "secretsmanager:ListSecretVersionIds"
                   ],
                   "Resource": [
                   "arn:aws:secretsmanager:us-east-1:123456789012:secret:MySecret-1234"
                   ]
               }
           ]
       }
   ```

1. 若要使用 Secrets Manager 組態供應商，請在步驟 3 中將下列程式碼行複製到工作程序組態文字方塊：

   ```
   # define name of config provider:
   
   config.providers = secretsmanager
   
   # provide implementation classes for secrets manager:
   
   config.providers.secretsmanager.class = com.amazonaws.kafka.config.providers.SecretsManagerConfigProvider
   
   # configure a config provider (if it needs additional initialization), for example you can provide a region where the secrets or parameters are located:
   
   config.providers.secretsmanager.param.region = us-east-1
   ```

1. 如果使用 Secrets Manager 組態供應商，請在步驟 4 中複製連接器組態的以下程式碼行。

   ```
   #Example implementation for secrets manager variable
   database.user=${secretsmanager:MSKAuroraDBCredentials:username}
   database.password=${secretsmanager:MSKAuroraDBCredentials:password}
   ```

您也可以針對更多組態供應商使用上述步驟。

------
#### [ Configure in Systems Manager Parameter Store ]

**在 Systems Manager Parameter Store 中設定參數值**

1. 開啟 [Systems Manager 主控台](https://console.aws.amazon.com/systems-manager/)。

1. 在導覽窗格中，選擇 **Parameter Store (參數存放區)**。

1. 建立新參數以存放在 Systems Manager 中。如需說明，請參閱 AWS Systems Manager 《 使用者指南》中的[建立 Systems Manager 參數 （主控台）](https://docs.aws.amazon.com/systems-manager/latest/userguide/parameter-create-console.html)。

1. 複製您參數的 ARN。

1. 將以下範例政策中的 Systems Manager 許可新增至您的[服務執行角色](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-service-execution-role.html)。使用您參數的 ARN 來取代 *<arn:aws:ssm:us-east-1:123456789000:parameter/MyParameterName>*。  
****  

   ```
   {
           "Version":"2012-10-17",		 	 	 
           "Statement": [
               {
                   "Sid": "VisualEditor0",
                   "Effect": "Allow",
                   "Action": [
                       "ssm:GetParameterHistory",
                       "ssm:GetParametersByPath",
                       "ssm:GetParameters",
                       "ssm:GetParameter"
                   ],
                   "Resource": "arn:aws:ssm:us-east-1:123456789000:parameter/MyParameterName"
               }
           ]
       }
   ```

1. 若要使用 Parameter Store 組態供應商，請在步驟 3 將下列程式碼行複製到工作程序組態文字方塊：

   ```
   # define name of config provider:
   
   config.providers = ssm
   
   # provide implementation classes for parameter store:
   
   config.providers.ssm.class = com.amazonaws.kafka.config.providers.SsmParamStoreConfigProvider
   
   # configure a config provider (if it needs additional initialization), for example you can provide a region where the secrets or parameters are located:
   
   config.providers.ssm.param.region = us-east-1
   ```

1. 如果使用 Parameter Store 組態供應商，請在步驟 5 中複製連接器組態的以下程式碼行。

   ```
   #Example implementation for parameter store variable
   schema.history.internal.kafka.bootstrap.servers=${ssm::MSKBootstrapServerAddress}
   ```

   您也可以將更多組態供應商綁定上述兩個步驟。

------
#### [ Configure in Amazon S3 ]

**在 Amazon S3 中設定物件/檔案**

1. 開啟 [Amazon S3 主控台](https://console.aws.amazon.com/s3/)。

1. 在 S3 中將您的物件上傳至儲存貯體。如需相關說明，請參閱[上傳物件](https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html)。

1. 複製您物件的 ARN。

1. 將以下範例政策中的 Amazon S3 Object Read 許可新增至您的[服務執行角色](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-service-execution-role.html)。將範例 ARN `arn:aws:s3:::<MY_S3_BUCKET/path/to/custom-plugin.zip>`取代為您物件的 ARN。  
****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
               {
                   "Sid": "VisualEditor0",
                   "Effect": "Allow",
                   "Action": "s3:GetObject",
                   "Resource": "arn:aws:s3:::<MY_S3_BUCKET/path/to/custom-plugin.zip>"
               }
           ]
       }
   ```

1. 若要使用 Amazon S3 組態供應商，請在步驟 3 中將下列程式碼行複製到工作程序組態文字方塊：

   ```
   # define name of config provider:
   
   config.providers = s3import
   # provide implementation classes for S3:
   
   config.providers.s3import.class = com.amazonaws.kafka.config.providers.S3ImportConfigProvider
   ```

1. 如果使用 Amazon S3 組態供應商，請在步驟 4 中將以下程式碼行複製至連接器組態。

   ```
   #Example implementation for S3 object
   
   database.ssl.truststore.location = ${s3import:us-west-2:my_cert_bucket/path/to/trustore_unique_filename.jks}
   ```

   您也可以將更多組態供應商綁定上述兩個步驟。

------

## 使用您組態供應商的資訊來建立自訂工作程序組態
<a name="msk-connect-config-providers-create-custom-config"></a>

1. 選取 **Amazon MSK Connect** 區段下的**工作程序組態**。

1. 選取**建立工作程序組態**。

1. 在工作程序組態名稱文字方塊中輸入 `SourceDebeziumCustomConfig`。描述為選用。

1. 根據所需的供應商複製相關組態程式碼，然後將其貼到**工作程序組態**文字方塊中。

1. 以下是所有三個供應商的工作程序組態範例：

   ```
   key.converter=org.apache.kafka.connect.storage.StringConverter
   key.converter.schemas.enable=false
   value.converter=org.apache.kafka.connect.json.JsonConverter
   value.converter.schemas.enable=false
   offset.storage.topic=offsets_my_debezium_source_connector
   
   # define names of config providers:
   
   config.providers=secretsmanager,ssm,s3import
   
   # provide implementation classes for each provider:
   
   config.providers.secretsmanager.class    = com.amazonaws.kafka.config.providers.SecretsManagerConfigProvider
   config.providers.ssm.class               = com.amazonaws.kafka.config.providers.SsmParamStoreConfigProvider
   config.providers.s3import.class          = com.amazonaws.kafka.config.providers.S3ImportConfigProvider
   
   # configure a config provider (if it needs additional initialization), for example you can provide a region where the secrets or parameters are located:
   
   
   config.providers.secretsmanager.param.region = us-east-1
   config.providers.ssm.param.region = us-east-1
   ```

1. 按一下「建立工作程序組態」。

## 建立連接器
<a name="msk-connect-config-providers-create-connector"></a>

1. 根據[建立新連接器](https://docs.aws.amazon.com/msk/latest/developerguide/mkc-create-connector.html)中的指示來建立新連接器。

1. 選擇您在 [建立自訂外掛程式並上傳至 S3](#msk-connect-config-providers-create-custom-plugin) 中上傳到 S3 儲存貯體的 `custom-plugin.zip` 檔案作為自訂外掛程式的來源。

1. 根據所需的供應商複製相關組態程式碼，然後將其貼到連接器組態欄位中。

1. 以下是所有三個供應商的連接器組態範例：

   ```
   #Example implementation for parameter store variable
   schema.history.internal.kafka.bootstrap.servers=${ssm::MSKBootstrapServerAddress}
   
   #Example implementation for secrets manager variable
   database.user=${secretsmanager:MSKAuroraDBCredentials:username}
   database.password=${secretsmanager:MSKAuroraDBCredentials:password}
   
   #Example implementation for Amazon S3 file/object
   database.ssl.truststore.location = ${s3import:us-west-2:my_cert_bucket/path/to/trustore_unique_filename.jks}
   ```

1. 選取**使用自訂組態**，然後從**工作程序組態**下拉式選單中選擇 **SourceDebeziumCustomConfig**。

1. 依照[建立連接器](https://docs.aws.amazon.com/msk/latest/developerguide/mkc-create-connector.html)中指示的其餘步驟進行。

# MSK Connect 的 IAM 角色和政策
<a name="msk-connect-iam"></a>

本節可協助您設定適當的 IAM 政策和角色，以在 AWS 環境中安全地部署和管理 Amazon MSK Connect。下列各節說明必須與 MSK Connect 搭配使用的服務執行角色，包括必要的信任政策，以及連線至 IAM 驗證 MSK 叢集時所需的其他許可。此頁面也提供完整 IAM 政策的範例，以授予 MSK Connect 功能的完整存取權，以及服務可用的 AWS 受管政策詳細資訊。

**Topics**
+ [了解服務執行角色](msk-connect-service-execution-role.md)
+ [MSK Connect 的 IAM 政策範例](mkc-iam-policy-examples.md)
+ [防止跨服務混淆代理人問題](cross-service-confused-deputy-prevention.md)
+ [AWS MSK Connect 的 受管政策](mkc-security-iam-awsmanpol.md)
+ [使用 MSK Connect 的服務連結角色](mkc-using-service-linked-roles.md)

# 了解服務執行角色
<a name="msk-connect-service-execution-role"></a>

**注意**  
Amazon MSK Connect 不支援使用[服務連結角色](mkc-using-service-linked-roles.md)作為服務執行角色。您必須建立個別的服務執行角色。如需如何建立自訂 IAM 角色的指示，請參閱《*IAM 使用者指南*》中的[建立角色以將許可委派給 AWS 服務](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-service.html)。

當您使用 MSK Connect 建立連接器時，您必須指定要與其搭配使用的 AWS Identity and Access Management (IAM) 角色。您的服務執行角色必須具有以下信任政策，MSK Connect 才能擔任該角色。如需有關此政策中條件內容鍵的詳細資訊，請參閱 [防止跨服務混淆代理人問題](cross-service-confused-deputy-prevention.md)。

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "kafkaconnect.amazonaws.com"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "aws:SourceAccount": "123456789012"
        },
        "ArnLike": {
          "aws:SourceArn": "arn:aws:kafkaconnect:us-east-1:123456789012:connector/myConnector/abc12345-abcd-4444-a8b9-123456f513ed-2"
        }
      }
    }   
  ]
}
```

------

如果要與連接器搭配使用的 Amazon MSK 叢集是使用 IAM 身分驗證的叢集，則您必須將以下許可政策新增至連接器的服務執行角色。如需如何尋找叢集 UUID 以及如何建構主題 ARNs 的資訊，請參閱 [授權政策資源](kafka-actions.md#msk-iam-resources)。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:DescribeCluster"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:000000000001:cluster/testClusterName/300d0000-0000-0005-000f-00000000000b-1"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:ReadData",
                "kafka-cluster:DescribeTopic"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:123456789012:topic/myCluster/300a0000-0000-0003-000a-00000000000b-6/__amazon_msk_connect_read"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:WriteData",
                "kafka-cluster:DescribeTopic"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:123456789012:topic/testCluster/300f0000-0000-0008-000d-00000000000m-7/__amazon_msk_connect_write"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:CreateTopic",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData",
                "kafka-cluster:DescribeTopic"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:123456789012:topic/testCluster/300f0000-0000-0008-000d-00000000000m-7/__amazon_msk_connect_*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:123456789012:group/testCluster/300d0000-0000-0005-000f-00000000000b-1/__amazon_msk_connect_*",
                "arn:aws:kafka:us-east-1:123456789012:group/testCluster/300d0000-0000-0005-000f-00000000000b-1/connect-*"
            ]
        }
    ]
}
```

------

根據連接器的類型，您可能還需要將許可政策連接到服務執行角色，以允許它存取 AWS 資源。例如，若您的連接器需要將資料傳送至 S3 儲存貯體，則服務執行角色必須具有授予寫入該儲存貯體的許可政策。為了進行測試，您可以使用其中一個預先建立的 IAM 政策 (例如 `arn:aws:iam::aws:policy/AmazonS3FullAccess`) 來提供完整存取權。不過，為了安全起見，我們建議您使用最嚴格的政策，讓您的連接器能夠從 AWS 來源讀取或寫入 AWS 接收器。

# MSK Connect 的 IAM 政策範例
<a name="mkc-iam-policy-examples"></a>

若要讓非管理員使用者完整存取所有 MSK Connect 功能，請將類似以下政策的政策連接至使用者的 IAM 角色。

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Sid": "MSKConnectFullAccess",
      "Effect": "Allow",
      "Action": [
        "kafkaconnect:CreateConnector",
        "kafkaconnect:DeleteConnector",
        "kafkaconnect:DescribeConnector",
        "kafkaconnect:ListConnectors",
        "kafkaconnect:UpdateConnector",
        "kafkaconnect:CreateCustomPlugin",
        "kafkaconnect:DeleteCustomPlugin",
        "kafkaconnect:DescribeCustomPlugin",
        "kafkaconnect:ListCustomPlugins",
        "kafkaconnect:CreateWorkerConfiguration",
        "kafkaconnect:DeleteWorkerConfiguration",
        "kafkaconnect:DescribeWorkerConfiguration",
        "kafkaconnect:ListWorkerConfigurations"
      ],
      "Resource": "*"
    },
    {
      "Sid": "IAMPassRole",
      "Effect": "Allow",
      "Action": "iam:PassRole",
      "Resource": "arn:aws:iam::123456789012:role/MSKConnectServiceRole",
      "Condition": {
        "StringEquals": {
          "iam:PassedToService": "kafkaconnect.amazonaws.com"
        }
      }
    },
    {
      "Sid": "EC2NetworkAccess",
      "Effect": "Allow",
      "Action": [
        "ec2:CreateNetworkInterface",
        "ec2:DescribeNetworkInterfaces",
        "ec2:DeleteNetworkInterface",
        "ec2:DescribeVpcs",
        "ec2:DescribeSubnets",
        "ec2:DescribeSecurityGroups"
      ],
      "Resource": "*"
    },
    {
      "Sid": "MSKClusterAccess",
      "Effect": "Allow",
      "Action": [
        "kafka:DescribeCluster",
        "kafka:DescribeClusterV2",
        "kafka:GetBootstrapBrokers"
      ],
      "Resource": "arn:aws:kafka:us-east-1:123456789012:cluster/myCluster/"
    },
    {
      "Sid": "MSKLogGroupAccess",
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents",
        "logs:DescribeLogStreams",
        "logs:DescribeLogGroups"
      ],
      "Resource": [
        "arn:aws:logs:us-east-1:123456789012:log-group:/aws/msk-connect/*"
      ]
    },
    {
      "Sid": "S3PluginAccess",
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:ListBucket",
        "s3:PutObject"
      ],
      "Resource": [
        "arn:aws:s3:::amzn-s3-demo-bucket1-custom-plugins",
        "arn:aws:s3:::amzn-s3-demo-bucket1-custom-plugins/*"
      ]
    }
  ]
}
```

------

# 防止跨服務混淆代理人問題
<a name="cross-service-confused-deputy-prevention"></a>

混淆代理人問題屬於安全性議題，其中沒有執行動作許可的實體可以強制具有更多許可的實體執行該動作。在 中 AWS，跨服務模擬可能會導致混淆代理人問題。在某個服務 (*呼叫服務*) 呼叫另一個服務 (*被呼叫服務*) 時，可能會發生跨服務模擬。可以操縱呼叫服務來使用其許可，以其不應有存取許可的方式對其他客戶的資源採取動作。為了預防這種情況， AWS 提供的工具可協助您保護所有服務的資料，而這些服務主體已獲得您帳戶中資源的存取權。

若要限制 MSK Connect 為資源提供另一項服務的許可，我們建議在資源政策中使用 [https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_condition-keys.html#condition-keys-sourcearn](https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_condition-keys.html#condition-keys-sourcearn) 和 [https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_condition-keys.html#condition-keys-sourceaccount](https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_condition-keys.html#condition-keys-sourceaccount) 全域條件內容鍵。如果 `aws:SourceArn` 值不包含帳戶 ID (例如 Amazon S3 儲存貯體 ARN 不包含帳戶 ID)，則您必須使用這兩個全域條件內容鍵來限制許可。如果同時使用這兩個全域條件內容索引鍵，且 `aws:SourceArn` 值包含帳戶 ID，則在相同政策陳述式中使用 `aws:SourceAccount` 值和 `aws:SourceArn` 值中的帳戶時，必須使用相同的帳戶 ID。如果您想要僅允許一個資源與跨服務存取相關聯，則請使用 `aws:SourceArn`。如果您想要允許該帳戶中的任何資源與跨服務使用相關聯，請使用 `aws:SourceAccount`。

若為 MSK Connect，`aws:SourceArn` 的值必須是 MSK 連接器。

防範混淆代理人問題最有效的方法，是使用 `aws:SourceArn` 全域條件內容金鑰，以及資源的完整 ARN。如果不知道資源的完整 ARN，或者如果您指定了多個資源，請使用 `aws:SourceArn` 全域條件內容金鑰，同時使用萬用字元 (`*`) 表示 ARN 的未知部分。例如，*arn:aws:kafkaconnect:us-east-1:123456789012:connector/\$1* 代表美國東部 (維吉尼亞北部) 區域中屬於帳戶 123456789012 的所有連接器。

下列範例示範如何使用 MSK Connect 中的 `aws:SourceArn` 和 `aws:SourceAccount` 全域條件內容鍵，來預防混淆代理人問題。將 *123456789012* 和 arn：aws：kafkaconnect：*us-east-1*：*123456789012*：connector/*my-S3-Sink-Connector*/*abcd1234-5678-90ab-cdef-1234567890ab* 取代為您的 AWS 帳戶 和 連接器資訊。

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": " kafkaconnect.amazonaws.com"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "aws:SourceAccount": "123456789012"
        },
        "ArnLike": {
        "aws:SourceArn": "arn:aws:kafkaconnect:us-east-1:123456789012:connector/my-S3-Sink-Connector/abcd1234-5678-90ab-cdef-1234567890ab"
        }
      }
    }   
  ]
}
```

------

# AWS MSK Connect 的 受管政策
<a name="mkc-security-iam-awsmanpol"></a>

 AWS 受管政策是由 AWS AWS 受管政策建立和管理的獨立政策旨在為許多常用案例提供許可，以便您可以開始將許可指派給使用者、群組和角色。

請記住， AWS 受管政策可能不會授予特定使用案例的最低權限許可，因為這些許可可供所有 AWS 客戶使用。我們建議您定義特定於使用案例的[客戶管理政策](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_managed-vs-inline.html#customer-managed-policies)，以便進一步減少許可。

您無法變更 AWS 受管政策中定義的許可。如果 AWS 更新受 AWS 管政策中定義的許可，則更新會影響政策連接的所有主體身分 （使用者、群組和角色）。當新的 AWS 服務 啟動或新的 API 操作可供現有服務使用時， AWS 最有可能更新 AWS 受管政策。

如需詳細資訊，請參閱 *IAM 使用者指南*中的 [AWS 受管政策](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_managed-vs-inline.html#aws-managed-policies)。

## AWS 受管政策：AmazonMSKConnectReadOnlyAccess
<a name="security-iam-awsmanpol-AmazonMSKConnectReadOnlyAccess"></a>

此政策會授予使用者列出和描述 MSK Connect 資源所需的許可。

您可將 `AmazonMSKConnectReadOnlyAccess` 政策連接到 IAM 身分。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafkaconnect:ListConnectors",
                "kafkaconnect:ListCustomPlugins",
                "kafkaconnect:ListWorkerConfigurations"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafkaconnect:DescribeConnector"
            ],
            "Resource": [
                "arn:aws:kafkaconnect:*:*:connector/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafkaconnect:DescribeCustomPlugin"
            ],
            "Resource": [
                "arn:aws:kafkaconnect:*:*:custom-plugin/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafkaconnect:DescribeWorkerConfiguration"
            ],
            "Resource": [
                "arn:aws:kafkaconnect:*:*:worker-configuration/*"
            ]
        }
    ]
}
```

------

## AWS 受管政策：KafkaConnectServiceRolePolicy
<a name="security-iam-awsmanpol-KafkaConnectServiceRolePolicy"></a>

此政策會授予 MSK Connect 服務建立和管理具有標籤 `AmazonMSKConnectManaged:true` 之網路介面所需的許可。這些網路介面可讓 MSK Connect 網路存取 Amazon VPC 中的資源，例如 Apache Kafka 叢集、來源或目的地。

您無法將 KafkaConnectServiceRolePolicy 連接至 IAM 實體。此政策會連接到服務連結角色，從而 MSK Connect 可代表您執行動作。

------
#### [ JSON ]

****  

```
{
	"Version":"2012-10-17",		 	 	 
	"Statement": [
		{
			"Effect": "Allow",
			"Action": [
				"ec2:CreateNetworkInterface"
			],
			"Resource": "arn:aws:ec2:*:*:network-interface/*",
			"Condition": {
				"StringEquals": {
					"aws:RequestTag/AmazonMSKConnectManaged": "true"
				},
				"ForAllValues:StringEquals": {
					"aws:TagKeys": "AmazonMSKConnectManaged"
				}
			}
		},
		{
			"Effect": "Allow",
			"Action": [
				"ec2:CreateNetworkInterface"
			],
			"Resource": [
				"arn:aws:ec2:*:*:subnet/*",
				"arn:aws:ec2:*:*:security-group/*"
			]
		},
		{
			"Effect": "Allow",
			"Action": [
				"ec2:CreateTags"
			],
			"Resource": "arn:aws:ec2:*:*:network-interface/*",
			"Condition": {
				"StringEquals": {
					"ec2:CreateAction": "CreateNetworkInterface"
				}
			}
		},
		{
			"Effect": "Allow",
			"Action": [
				"ec2:DescribeNetworkInterfaces",
				"ec2:CreateNetworkInterfacePermission",
				"ec2:AttachNetworkInterface",
				"ec2:DetachNetworkInterface",
				"ec2:DeleteNetworkInterface"
			],
			"Resource": "arn:aws:ec2:*:*:network-interface/*",
			"Condition": {
				"StringEquals": {
					"ec2:ResourceTag/AmazonMSKConnectManaged": "true"
				}
			}
		}
	]
}
```

------

## AWS 受管政策的 MSK Connect 更新
<a name="security-iam-awsmanpol-updates"></a>

檢視自此服務開始追蹤這些變更以來 MSK Connect AWS 受管政策更新的詳細資訊。


| 變更 | 描述 | Date | 
| --- | --- | --- | 
|  MSK Connect 已更新唯讀政策  |  MSK Connect 已更新 AmazonMSKConnectReadOnlyAccess 政策，移除對列出操作的限制。  | 2021 年 10 月 13 日 | 
|  MSK Connect 已開始追蹤變更  |  MSK Connect 開始追蹤其 AWS 受管政策的變更。  | 2021 年 9 月 14 日 | 

# 使用 MSK Connect 的服務連結角色
<a name="mkc-using-service-linked-roles"></a>

Amazon MSK Connect 使用 AWS Identity and Access Management (IAM)[ 服務連結角色](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_terms-and-concepts.html#iam-term-service-linked-role)。服務連結角色是直接連結至 MSK Connect 的一種特殊 IAM 角色類型。服務連結角色由 MSK Connect 預先定義，並包含該服務代表您呼叫其他 AWS 服務所需的所有許可。

服務連結角色可讓設定 MSK Connect 更為簡單，因為您不必手動新增必要的許可。MSK Connect 會定義其服務連結角色的許可，除非另有定義，否則僅有 MSK Connect 可以擔任其角色。定義的許可包括信任政策和許可政策，並且該許可政策不能連接到任何其他 IAM 實體。

如需關於支援服務連結角色的其他服務的資訊，請參閱[可搭配 IAM 運作的AWS 服務](https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_aws-services-that-work-with-iam.html)，並尋找 **Service-Linked Role** (服務連結角色) 欄顯示為 **Yes** (是) 的服務。選擇具有連結的**是**，以檢視該服務的服務連結角色文件。

## MSK Connect 的服務連結角色許可
<a name="slr-permissions"></a>

MSK Connect 會使用名為 **AWSServiceRoleForKafkaConnect** 的服務連結角色 - 允許 Amazon MSK Connect 代表您存取 Amazon 資源。

AWSServiceRoleForKafkaConnect 服務連結角色信任 `kafkaconnect.amazonaws.com` 服務擔任該角色。

如需有關角色使用之許可政策的詳細資訊，請參閱 [AWS 受管政策：KafkaConnectServiceRolePolicy](mkc-security-iam-awsmanpol.md#security-iam-awsmanpol-KafkaConnectServiceRolePolicy)。

您必須設定許可，IAM 實體 (如使用者、群組或角色) 才可建立、編輯或刪除服務連結角色。如需詳細資訊，請參閱《*IAM 使用者指南*》中的[服務連結角色許可](https://docs.aws.amazon.com/IAM/latest/UserGuide/using-service-linked-roles.html#service-linked-role-permissions)。

## 建立 MSK Connect 的服務連結角色
<a name="create-slr"></a>

您不需要手動建立服務連結角色，當您在 AWS 管理主控台、 AWS CLI或 AWS API 中建立連接器時，MSK Connect 會為您建立服務連結角色。

若您刪除此服務連結角色，之後需要再次建立，您可以在帳戶中使用相同程序重新建立角色。當您建立連接器時，MSK Connect 會再次為您建立服務連結角色。

## 編輯 MSK Connect 的服務連結角色
<a name="edit-slr"></a>

MSK Connect 不允許您編輯 AWSServiceRoleForKafkaConnect 服務連結角色。因為可能有各種實體會參考服務連結角色，所以您無法在建立角色之後變更其名稱。然而，您可使用 IAM 來編輯角色描述。如需詳細資訊，請參閱《*IAM 使用者指南*》中的[編輯服務連結角色](https://docs.aws.amazon.com/IAM/latest/UserGuide/using-service-linked-roles.html#edit-service-linked-role)。

## 刪除 MSK Connect 的服務連結角色
<a name="delete-slr"></a>

您可以使用 IAM 主控台、 AWS CLI 或 AWS API 來手動刪除服務連結角色。若要執行此操作，您必須先手動刪除所有 MSK Connect 連接器，然後再手動刪除該角色。如需詳細資訊，請參閱《*IAM 使用者指南*》中的[刪除服務連結角色](https://docs.aws.amazon.com/IAM/latest/UserGuide/using-service-linked-roles.html#delete-service-linked-role)。

## MSK Connect 服務連結角色支援的區域
<a name="slr-regions"></a>

MSK Connect 支援在所有提供此服務的區域中使用服務連結角色。如需詳細資訊，請參閱 [AWS 區域與端點](https://docs.aws.amazon.com/general/latest/gr/rande.html)。

# 啟用 Amazon MSK Connect 的網際網路存取
<a name="msk-connect-internet-access"></a>

如果您的 Amazon MSK Connect 連接器需要存取網際網路，建議您使用下列 Amazon Virtual Private Cloud (VPC) 設定來啟用該存取。
+ 使用私有子網路來設定連接器。
+ 在公有子網路中為您的 VPC 建立公有 [NAT 閘道](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html)或 [NAT 執行個體](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_NAT_Instance.html)。如需詳細資訊，請參閱《Amazon Virtual Private Cloud使用指南》****中的[使用 NAT 裝置將子網路連線到網際網路或其他 VPC](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html) 頁面。
+ 允許流量從私有子網路輸出至 NAT 閘道或執行個體。

# 設定 Amazon MSK Connect 的 NAT 閘道
<a name="msk-connect-internet-access-private-subnets-example"></a>

以下步驟會顯示設定 NAT 閘道以啟用連接器網際網路存取權的方法。您必須先完成這些步驟，才能在私有子網路中建立連接器。

## 完成設定 NAT 閘道的先決條件
<a name="msk-connect-internet-access-private-subnets-prereq"></a>

請確認您已具備以下物件。
+ 與您的叢集相關聯的 Amazon Virtual Private Cloud (VPC) ID。例如，*vpc-123456ab*。
+ VPC 中私有子網路的 ID。例如，*subnet-a1b2c3de*、*subnet-f4g5h6ij* 等。您必須使用私有子網路來設定連接器。

## 為連接器啟用網際網路存取的步驟
<a name="msk-connect-internet-access-private-subnets-steps"></a>

**啟用連接器的網際網路存取**

1. 在 https：//[https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/) 開啟 Amazon Virtual Private Cloud 主控台。

1. 使用描述性名稱為 NAT 閘道建立公有子網路，並記下子網路 ID。如需詳細說明，請參閱[在 VPC 中建立子網路](https://docs.aws.amazon.com/vpc/latest/userguide/working-with-vpcs.html#AddaSubnet)。

1. 建立網際網路閘道，讓您的 VPC 可和網際網路通訊，並記下閘道 ID。將網際網路閘道連接至 VPC。如需說明，請參閱[建立並連接網際網路閘道](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_Internet_Gateway.html#Add_IGW_Attach_Gateway)。

1. 佈建公有 NAT 閘道，以便讓私有子網路中的主機可連線到您的公有子網路。在建立 NAT 閘道時，請選取您稍早建立的公有子網路。如需說明，請參閱[建立 NAT 閘道](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html#nat-gateway-creating)。

1. 設定路由表。您總共須有兩個路由表才能完成此設定。您應已經擁有與 VPC 同時自動建立的主路由表。在此步驟中，您可為公有子網路建立額外的路由表。

   1. 使用以下設定來修改 VPC 的主路由表，以便讓私有子網路將流量路由到 NAT 閘道。如需說明，請參閱《Amazon Virtual Private Cloud使用者指南》****中的[使用路由表](https://docs.aws.amazon.com/vpc/latest/userguide/WorkWithRouteTables.html)。  
**私有 MSKC 路由表**    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/msk/latest/developerguide/msk-connect-internet-access-private-subnets-example.html)

   1. 依照[建立自訂路由表](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_Internet_Gateway.html#Add_IGW_Routing)中的說明，建立公有子網路的路由表。在建立該表時，請在**名稱標籤**欄位中輸入描述性名稱，以協助您識別與該表相關聯的子網路。例如，**Public MSKC**。

   1. 使用以下設定來設定您的 **Public MSKC** 路由表。  
****    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/msk/latest/developerguide/msk-connect-internet-access-private-subnets-example.html)

# 了解私有 DNS 主機名稱
<a name="msk-connect-dns"></a>

藉由 MSK Connect 中的私有 DNS 主機名稱支援，您可以設定連接器來參照公有或私有網域名稱。支援會依據 VPC *DHCP 選項組*中指定的 DNS 伺服器而定。

DHCP 選項集是 EC2 執行個體在 VPC 中使用的一組網路組態，以透過虛擬網路進行通訊。每個 VPC 都具有預設 DHCP 選項集，但如果您要讓 VPC 中的執行個體使用不同的 DNS 伺服器 (而不是 Amazon 提供之 DNS 伺服器) 來進行網域名稱解析，則可以建立自訂 DHCP 選項集。請參閱 [Amazon VPC 中的 DHCP 選項集](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_DHCP_Options.html)。

在使用 MSK Connect 中納入私有 DNS 解析能力/功能之前，連接器會使用服務 VPC DNS 解析器來進行客戶連接器的 DNS 查詢。連接器並未使用客戶 VPC DHCP 選項集中定義的 DNS 伺服器進行 DNS 解析。

連接器僅能參考客戶連接器組態或外掛程式中可公開解析的主機名稱。它們無法解析在私有託管區域中定義的私有主機名稱，或在其他客戶網路中使用 DNS 伺服器。

若無私有 DNS，則選擇讓資料庫、資料倉儲和系統 (例如自身無法存取網際網路的 VPC 中的 Secrets Manager) 的客戶，將無法使用 MSK 連接器。客戶經常會使用私有 DNS 主機名稱來符合企業安全性狀態。

# 為您的連接器設定 VPC DHCP 選項集
<a name="msk-connect-dns-config-dhcp"></a>

連接器建立後，連接器會自動使用其 VPC DHCP 選項集中定義的 DNS 伺服器。在建立連接器之前，請確認已針對連接器的 DNS 主機名稱解析需求設定 VPC DHCP 選項集。

您在 MSK Connect 提供私有 DNS 主機名稱功能之前所建立的連接器，會繼續使用先前的 DNS 解析組態，且無需進行任何修改。

若您僅需連接器中的可公開解析 DNS 主機名稱解析，為了能更輕鬆地進行設定，建議您在建立連接器時使用帳戶的預設 VPC。如需有關 Amazon 提供之 DNS 伺服器或 Amazon Route 53 Resolver 的詳細資訊，請參閱《Amazon VPC 使用者指南》**中的 [Amazon DNS 伺服器](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-dns.html#AmazonDNS)。

若您需要解析私有 DNS 主機名稱，請確認連接器建立期間傳送的 VPC 已正確設定其 DHCP 選項。如需詳細資訊，請參閱《Amazon VPC 使用者指南》**中的[使用 DHCP 選項集](https://docs.aws.amazon.com/vpc/latest/userguide/DHCPOptionSet.html)。

在您為私有 DNS 主機名稱解析設定 DHCP 選項集時，請確認連接器可連接至您在 DHCP 選項集中設定的自訂 DNS 伺服器。否則，您的連接器建立會失敗。

在您自訂 VPC DHCP 選項集後，隨後在該 VPC 中建立的連接器會使用您在選項集中指定的 DNS 伺服器。若您在建立連接器後變更選項集，則連接器會在幾分鐘後採用新選項集中的設定。

# 設定 VPC 的 DNS 屬性
<a name="msk-connect-dns-attributes"></a>

請確認您已正確設定 VPC DNS 屬性，如《Amazon VPC 使用者指南》**中的 [VPC 的 DNS 屬性](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-dns.html#vpc-dns-support)和 [DNS 主機名稱](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-dns.html#vpc-dns-hostnames)所述。

請參閱《Amazon Route 53 開發人員指南》**中的[在 VPC 和您網路之間解析 DNS 查詢](https://docs.aws.amazon.com/Route53/latest/DeveloperGuide/resolver.html)，了解有關使用傳入和傳出解析器端點將其他網路連接至 VPC 以與連接器搭配使用的資訊。

# 處理連接器建立失敗
<a name="msk-connect-dns-failure-handling"></a>

本節會描述可能發生的與 DNS 解析相關聯的連接器建立失敗，以及解決問題的建議行動。


| 失敗 | 建議動作 | 
| --- | --- | 
|  若 DNS 解析查詢失敗，或者無法連接器無法連線 DNS 伺服器，則連接器建立會失敗。  |  若您已為連接器設定這些日誌，則可在 CloudWatch 日誌中看到因 DNS 解析查詢失敗而導致的連接器建立失敗。 檢查 DNS 伺服器組態，並確保連接器與 DNS 伺服器之間的網路連線。  | 
|  若在連接器執行時變更 VPC DHCP 選項集中的 DNS 伺服器組態，則來自連接器的 DNS 解析查詢可能會失敗。若 DNS 解析失敗，部分連接器任務可能會進入失敗狀態。  |  若您已為連接器設定這些日誌，則可在 CloudWatch 日誌中看到因 DNS 解析查詢失敗而導致的連接器建立失敗。 失敗的任務應會自動重新啟動，以恢復連接器。若未發生此情況，您可以聯絡支援團隊以重新啟動連接器的失敗任務，或者您可重新建立連接器。  | 

# MSK Connect 的安全性
<a name="msk-connect-security"></a>

您可以使用採用 技術的介面 VPC 端點 AWS PrivateLink，以防止 Amazon VPC 和 Amazon MSK-Connect 相容 APIs 之間的流量離開 Amazon 網路。介面 VPC 端點不需要網際網路閘道、NAT 裝置、VPN 連線或 AWS Direct Connect 連線。如需詳細資訊，請參閱[搭配界面 VPC 端點使用 Amazon MSK APIs](privatelink-vpc-endpoints.md)。

# MSK Connect 的日誌
<a name="msk-connect-logging"></a>

MSK Connect 能寫入日誌事件，您可用來對連接器進行偵錯。在建立連接器時，您可以指定下列零個或多個日誌目的地：
+ Amazon CloudWatch Logs：您可以指定要 MSK Connect 向其傳送連接器日誌事件的日誌群組。如需詳細資訊，請參閱《CloudWatch Logs 使用者指南》** 中的[建立日誌群組](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Working-with-log-groups-and-streams.html#Create-Log-Group)。
+ Amazon S3：您可以指定要 MSK Connect 向其傳送連接器日誌事件的 S3 儲存貯體。如需有關建立 S3 儲存貯體的詳細資訊，請參閱《Amazon S3 使用者指南》**中的[建立儲存貯體](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html)。
+ Amazon Data Firehose：您可以指定 MSK Connect 傳送連接器日誌事件的目標交付串流。如需有關如何建立交付串流的資訊，請參閱 [Firehose 使用者指南中的建立 Amazon Data Firehose 交付串流](https://docs.aws.amazon.com/firehose/latest/dev/basic-create.html)。 **

若要進一步了解如何設定日誌，請參閱《Amazon CloudWatch Logs 使用者指南》**中的[啟用從 AWS 服務記錄日誌](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/AWS-logs-and-resource-policy.html)。

MSK Connect 會發出下列類型的日誌事件：


****  

| Level | Description | 
| --- | --- | 
| INFO | 啟動和關閉時需要關注的執行期事件。 | 
| WARN | 執行期狀況並非錯誤，但不想要或未預期該執行期狀況。 | 
| FATAL | 導致過早終止的嚴重錯誤。 | 
| ERROR | 不嚴重的意外狀況和執行期錯誤。 | 

以下是傳送至 CloudWatch Logs 的日誌事件範例：

```
[Worker-0bb8afa0b01391c41] [2021-09-06 16:02:54,151] WARN [Producer clientId=producer-1] Connection to node 1 (b-1.my-test-cluster.twwhtj.c2.kafka.us-east-1.amazonaws.com/INTERNAL_IP) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient:782)
```

## 避免秘密顯示在連接器日誌中
<a name="msk-connect-logging-secrets"></a>

**注意**  
如果外掛程式未將這些值定義為秘密，則敏感組態值可能會顯示在連接器日誌中。Kafka Connect 會將未定義的組態值視為與任何其他純文字值相同。

如果您的外掛程式將屬性定義為秘密，則 Kafka Connect 會在連接器日誌中遮蔽該屬性的值。例如，下列連接器日誌會示範若外掛程式將 `aws.secret.key` 定義為 `PASSWORD` 類型，則其值將被取代為 `[hidden]`。

```
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] [2022-01-11 15:18:55,150] INFO SecretsManagerConfigProviderConfig values:
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] aws.access.key = my_access_key
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] aws.region = us-east-1
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] aws.secret.key = [hidden]
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] secret.prefix =
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] secret.ttl.ms = 300000
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] (com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProviderConfig:361)
```

若要避免秘密出現在連接器日誌檔案中，外掛程式開發人員必須使用 Kafka Connect 列舉常數 [https://kafka.apache.org/27/javadoc/org/apache/kafka/common/config/ConfigDef.Type.html#PASSWORD](https://kafka.apache.org/27/javadoc/org/apache/kafka/common/config/ConfigDef.Type.html#PASSWORD) 來定義敏感屬性。當屬性是 `ConfigDef.Type.PASSWORD` 類型時，即使該值以純文字形式傳送，Kafka Connect 也會從連接器日誌中排除其值。

# 監控 Amazon MSK Connect
<a name="mkc-monitoring-overview"></a>

監控是維護 MSK Connect 和其他 AWS 解決方案可靠性、可用性和效能的重要部分。Amazon CloudWatch AWS 會即時監控您的 AWS 資源和您在 上執行的應用程式。您可以收集和追蹤指標、建立自訂儀板表，以及設定警示，在特定指標達到您指定的閾值時通知您或採取動作。例如，您可以讓 CloudWatch 追蹤 CPU 使用率或其他連接器指標，以便在需要時增加其容量。如需更多資訊，請參閱 [Amazon CloudWatch 使用者指南](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/)。

您可以使用下列 API 操作：
+ `DescribeConnectorOperation`：監控連接器更新操作的狀態。
+ `ListConnectorOperations`：追蹤先前在連接器上執行的更新。

下表顯示 MSK Connect 在 `ConnectorName` 維度下傳送至 CloudWatch 的指標。MSK Connect 預設會免費傳送這些指標。CloudWatch 會保留這些指標 15 個月，以便您存取歷史資訊，並更加了解您的連線器執行情況。您也可以設定留意特定閾值的警示，當滿足這些閾值時傳送通知或採取動作。如需詳細資訊，請參閱 [Amazon CloudWatch 使用者指南](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/)。


| 指標名稱 | Description | 
| --- | --- | 
| CpuUtilization | CPU 使用率 (依系統和使用者)。 | 
| ErroredTaskCount | 錯誤的任務數。 | 
| MemoryUtilization | 工作程序執行個體的總記憶體百分比，而非僅是目前使用中的 Java 虛擬機器 (JVM) 堆積記憶體。JVM 通常不會釋放記憶體使其回到作業系統。因此，JVM 堆積大小 (MemoryUtilization) 通常會從堆積大小下限開始，然後逐漸增加穩定至約上限的 80-90%。JVM 堆積使用量可能會隨連接器的實際記憶體使用量變更而增加或減少。 | 
| RebalanceCompletedTotal | 此連接器完成的重新平衡總數。 | 
| RebalanceTimeAvg | 連接器重新平衡所花費的平均時間 (毫秒)。 | 
| RebalanceTimeMax | 連接器重新平衡所花費的最大時間 (毫秒)。 | 
| RebalanceTimeSinceLast |  自此連接器完成最近重新平衡之後的時間 (毫秒)。  | 
| RunningTaskCount | 連接器中執行的任務數。 | 
| SinkConsumerByteRate | Kafka Connect 架構的接收器取用者在將任何轉換套用至資料之前，每秒平均耗用的位元組數。 | 
| SinkRecordReadRate | 從 Apache Kafka 或 Amazon MSK 叢集平均每秒讀取的記錄數。 | 
| SinkRecordSendRate | 平均每秒從轉換輸出並傳送至目的地的記錄數。此數字不包含已篩選的記錄。 | 
| SourceRecordPollRate | 平均每秒產生或輪詢的記錄數。 | 
| SourceProducerByteRate | 將任何轉換套用至資料後，Kafka Connect 架構的來源生產者每秒產生的平均位元組數。 | 
| SourceRecordWriteRate | 平均每秒從輸出轉換並寫入 Apache Kafka 或 Amazon MSK 叢集的記錄數。 | 
| TaskStartupAttemptsTotal | 連接器嘗試啟動的任務總數。您可以使用此指標來識別任務啟動嘗試中的異常情況。 | 
| TaskStartupSuccessPercentage | 連接器成功啟動任務的平均百分比。您可以使用此指標來識別任務啟動嘗試中的異常情況。 | 
| WorkerCount | 連接器中執行的工作程序數目。 | 
| BytesInPerSec | 傳輸到 Kafka Connect 架構的中繼資料位元組，以便在工作者之間進行通訊。 | 
| BytesOutPerSec | 從 Kafka Connect 架構傳輸的中繼資料位元組，用於工作者之間的通訊。 | 

# 設定 Amazon MSK Connect 資源的範例
<a name="msk-connect-examples"></a>

本節包含的範例有助您設定 Amazon MSK Connect 資源，例如常見的第三方連接器和組態供應商。

**Topics**
+ [設定 Amazon S3 接收器連接器](mkc-S3sink-connector-example.md)
+ [設定 MSK Connect 的 EventBridge Kafka 接收器連接器](mkc-eventbridge-kafka-connector.md)
+ [搭配組態提供者使用 Debezium 來源連接器](mkc-debeziumsource-connector-example.md)

# 設定 Amazon S3 接收器連接器
<a name="mkc-S3sink-connector-example"></a>

此範例示範如何使用 Confluent [Amazon S3 接收器連接器](https://www.confluent.io/hub/confluentinc/kafka-connect-s3)和 AWS CLI 在 MSK Connect 中建立 Amazon S3 接收器連接器。

1. 複製以下 JSON 並貼到新檔案中。使用對應至 Amazon MSK 叢集的引導程序伺服器連線字串以及叢集的子網路和安全群組 ID 的值來取代預留位置字串。如需有關如何設定服務執行角色的詳細資訊，請參閱 [MSK Connect 的 IAM 角色和政策](msk-connect-iam.md)。

   ```
   {
       "connectorConfiguration": {
           "connector.class": "io.confluent.connect.s3.S3SinkConnector",
           "s3.region": "us-east-1",
           "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
           "flush.size": "1",
           "schema.compatibility": "NONE",
           "topics": "my-test-topic",
           "tasks.max": "2",
           "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
           "storage.class": "io.confluent.connect.s3.storage.S3Storage",
           "s3.bucket.name": "amzn-s3-demo-bucket"
       },
       "connectorName": "example-S3-sink-connector",
       "kafkaCluster": {
           "apacheKafkaCluster": {
               "bootstrapServers": "<cluster-bootstrap-servers-string>",
               "vpc": {
                   "subnets": [
                       "<cluster-subnet-1>",
                       "<cluster-subnet-2>",
                       "<cluster-subnet-3>"
                   ],
                   "securityGroups": ["<cluster-security-group-id>"]
               }
           }
       },
       "capacity": {
           "provisionedCapacity": {
               "mcuCount": 2,
               "workerCount": 4
           }
       },
       "kafkaConnectVersion": "2.7.1",
       "serviceExecutionRoleArn": "<arn-of-a-role-that-msk-connect-can-assume>",
       "plugins": [
           {
               "customPlugin": {
                   "customPluginArn": "<arn-of-custom-plugin-that-contains-connector-code>",
                   "revision": 1
               }
           }
       ],
       "kafkaClusterEncryptionInTransit": {"encryptionType": "PLAINTEXT"},
       "kafkaClusterClientAuthentication": {"authenticationType": "NONE"}
   }
   ```

1. 在您在上一個步驟中儲存 JSON 檔案的資料夾中執行下列 AWS CLI 命令。

   ```
   aws kafkaconnect create-connector --cli-input-json file://connector-info.json
   ```

   以下是成功執行命令時所得到的輸出範例。

   ```
   {
       "ConnectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-S3-sink-connector/abc12345-abcd-4444-a8b9-123456f513ed-2", 
       "ConnectorState": "CREATING", 
       "ConnectorName": "example-S3-sink-connector"
   }
   ```

# 設定 MSK Connect 的 EventBridge Kafka 接收器連接器
<a name="mkc-eventbridge-kafka-connector"></a>

本主題說明如何設定 MSK Connect 的 [EventBridge Kafka 接收器連接器](https://github.com/awslabs/eventbridge-kafka-connector)。此連接器可讓您將事件從 MSK 叢集傳送至 EventBridge [事件匯流排](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-bus.html)。本主題說明建立必要資源和設定連接器以啟用 Kafka 和 EventBridge 之間無縫資料流程的程序。

**Topics**
+ [先決條件](#mkc-eb-kafka-prerequisites)
+ [設定 MSK Connect 所需的資源](#mkc-eb-kafka-set-up-resources)
+ [建立連接器](#mkc-eb-kafka-create-connector)
+ [傳送訊息至 Kafka](#mkc-eb-kafka-send-json-encoded-messages)

## 先決條件
<a name="mkc-eb-kafka-prerequisites"></a>

部署連接器之前，請確定您有下列資源：
+ **Amazon MSK 叢集**：用於產生和使用 Kafka 訊息的作用中 MSK 叢集。
+ **Amazon EventBridge 事件匯流排**：接收 Kafka 主題事件的 EventBridge 事件匯流排。
+ **IAM 角色**：建立具有 MSK Connect 和 EventBridge 連接器必要許可的 IAM 角色。
+ 從 MSK Connect 或 MSK 叢集的 [VPC 和子網路中建立的 EventBridge VPC 介面端點](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-related-service-vpc.html)[存取公有網際網路](msk-connect-internet-access.md)。 EventBridge 這可協助您避免周遊公有網際網路，無需 NAT 閘道。
+ [用戶端機器](create-serverless-cluster-client.md)，例如 Amazon EC2 執行個體或 [AWS CloudShell](https://aws.amazon.com/cloudshell/)，用於建立主題並將記錄傳送至 Kafka。

## 設定 MSK Connect 所需的資源
<a name="mkc-eb-kafka-set-up-resources"></a>

您可以為連接器建立 IAM 角色，然後建立連接器。您也可以建立 EventBridge 規則來篩選傳送至 EventBridge 事件匯流排的 Kafka 事件。

**Topics**
+ [連接器的 IAM 角色](#mkc-eb-kafka-iam-role-connector)
+ [傳入事件的 EventBridge 規則](#mkc-eb-kafka-create-rule)

### 連接器的 IAM 角色
<a name="mkc-eb-kafka-iam-role-connector"></a>

您與連接器關聯的 IAM 角色必須具有 [PutEvents](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-permissions-reference.html) 許可，才能允許將事件傳送至 EventBridge。下列 IAM 政策範例會授予您許可，以將事件傳送至名為 的事件匯流排`example-event-bus`。請務必使用事件匯流排的 ARN 取代下列範例中的資源 ARN。

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "events:PutEvents"
      ],
      "Resource": "arn:aws:events:us-east-1:123456789012:event-bus/example-event-bus"
    }
  ]
}
```

------

此外，您必須確定連接器的 IAM 角色包含下列信任政策。

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "kafkaconnect.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
```

------

### 傳入事件的 EventBridge 規則
<a name="mkc-eb-kafka-create-rule"></a>

您可以建立符合傳入事件與事件資料條件的[規則](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-rules.html)，稱為[https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html)。使用事件模式，您可以定義篩選條件傳入事件的條件，並判斷哪些事件應觸發特定規則，然後路由至指定的[目標](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-targets.html)。下列事件模式範例符合傳送至 EventBridge 事件匯流排的 Kafka 事件。

```
{
  "detail": {
    "topic": ["msk-eventbridge-tutorial"]
  }
}
```

以下是使用 Kafka 接收器連接器從 Kafka 傳送至 EventBridge 的事件範例。

```
{
  "version": "0",
  "id": "dbc1c73a-c51d-0c0e-ca61-ab9278974c57",
  "account": "123456789012",
  "time": "2025-03-26T10:15:00Z",
  "region": "us-east-1",
  "detail-type": "msk-eventbridge-tutorial",
  "source": "kafka-connect.msk-eventbridge-tutorial",
  "resources": [],
  "detail": {
    "topic": "msk-eventbridge-tutorial",
    "partition": 0,
    "offset": 0,
    "timestamp": 1742984100000,
    "timestampType": "CreateTime",
    "headers": [],
    "key": "order-1",
    "value": {
      "orderItems": [
        "item-1",
        "item-2"
      ],
      "orderCreatedTime": "Wed Mar 26 10:15:00 UTC 2025"
    }
  }
}
```

在 EventBridge 主控台中，使用此範例模式在事件匯流排上[建立規則](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-create-rule.html)，並指定目標，例如 CloudWatch Logs 群組。EventBridge 主控台會自動設定 CloudWatch Logs 群組的必要存取政策。

## 建立連接器
<a name="mkc-eb-kafka-create-connector"></a>

在下一節中，您會使用 建立和部署 [EventBridge Kafka 接收器連接器](https://github.com/awslabs/eventbridge-kafka-connector) AWS 管理主控台。

**Topics**
+ [步驟 1：下載連接器](#mkc-eb-kafka-download-connector)
+ [步驟 2：建立 Amazon S3 儲存貯體](#mkc-eb-kafka-s3-bucket-create)
+ [步驟 3：在 MSK Connect 中建立外掛程式](#mkc-eb-kafka-create-plugin)
+ [步驟 4：建立連接器](#mkc-eb-kafka-create-connector)

### 步驟 1：下載連接器
<a name="mkc-eb-kafka-download-connector"></a>

從 EventBridge Kafka 連接器的 [GitHub 版本頁面](https://github.com/awslabs/eventbridge-kafka-connector/releases)下載最新的 EventBridge 連接器接收器 JAR。例如，若要下載版本 v1.4.1，請選擇 JAR 檔案連結 `kafka-eventbridge-sink-with-dependencies.jar`來下載連接器。然後，將檔案儲存到機器上偏好的位置。

### 步驟 2：建立 Amazon S3 儲存貯體
<a name="mkc-eb-kafka-s3-bucket-create"></a>

1. 若要將 JAR 檔案存放在 Amazon S3 中以搭配 MSK Connect 使用，請開啟 AWS 管理主控台，然後選擇 Amazon S3。

1. 在 Amazon S3 主控台中，選擇**建立儲存貯**體，然後輸入唯一的儲存貯體名稱。例如 **amzn-s3-demo-bucket1-eb-connector**。

1. 為您的 Amazon S3 儲存貯體選擇適當的區域。請確定其與您的 MSK 叢集部署所在的區域相符。

1. 對於**儲存貯體設定**，保留預設選項或視需要調整。

1. 選擇**建立儲存貯體**。

1. 將 JAR 檔案上傳至 Amazon S3 儲存貯體。

### 步驟 3：在 MSK Connect 中建立外掛程式
<a name="mkc-eb-kafka-create-plugin"></a>

1. 開啟 AWS 管理主控台，然後導覽至 **MSK Connect**。

1. 在左側導覽窗格中，選擇**自訂外掛程式**。

1. 選擇**建立外掛程式**，然後輸入**外掛程式名稱**。例如 **eventbridge-sink-plugin**。

1. 針對**自訂外掛程式位置**，貼上 **S3 物件 URL**。

1. 新增外掛程式的選用描述。

1. 選擇**建立外掛程式**。

外掛程式建立後，您可以使用它在 MSK Connect 中設定和部署 EventBridge Kafka 連接器。

### 步驟 4：建立連接器
<a name="mkc-eb-kafka-create-connector"></a>

在建立連接器之前，建議您建立必要的 Kafka 主題，以避免連接器錯誤。若要建立 主題，請使用您的用戶端機器。

1. 在 MSK 主控台的左側窗格中，選擇**連接器**，然後選擇**建立連接器**。

1. 在外掛程式清單中，選擇 **eventbridge-sink-plugin**，然後選擇**下一步**。

1. 針對連接器名稱，輸入 **EventBridgeSink**。

1. 在叢集清單中，選擇您的 MSK 叢集。

1. <a name="connector-ex"></a>複製下列連接器組態，並將其貼到**連接器組態**欄位中

   視需要取代下列組態中的預留位置。
   + `aws.eventbridge.endpoint.uri` 如果您的 MSK 叢集具有公有網際網路存取，請移除 。
   + 如果您使用 PrivateLink 從 MSK 安全地連線至 EventBridge，請將 之後的 DNS 部分取代為您先前建立的 EventBridge 的 （選用） VPC 界面端點的`https://`正確私有 DNS 名稱。
   + 將下列組態中的 EventBridge 事件匯流排 ARN 取代為事件匯流排的 ARN。
   + 更新任何區域特定的值。

   ```
   {
     "connector.class": "software.amazon.event.kafkaconnector.EventBridgeSinkConnector",
     "aws.eventbridge.connector.id": "msk-eventbridge-tutorial",
     "topics": "msk-eventbridge-tutorial",
     "tasks.max": "1",
     "aws.eventbridge.endpoint.uri": "https://events.us-east-1.amazonaws.com",
     "aws.eventbridge.eventbus.arn": "arn:aws:events:us-east-1:123456789012:event-bus/example-event-bus",
     "value.converter.schemas.enable": "false",
     "value.converter": "org.apache.kafka.connect.json.JsonConverter",
     "aws.eventbridge.region": "us-east-1",
     "auto.offset.reset": "earliest",
     "key.converter": "org.apache.kafka.connect.storage.StringConverter"
   }
   ```

   如需連接器組態的詳細資訊，請參閱 [eventbridge-kafka-connector](https://github.com/awslabs/eventbridge-kafka-connector)。

   如有需要，請變更工作者和自動調整規模的設定。我們也建議使用下拉式清單中最新的可用 （建議） Apache Kafka Connect 版本。在**存取許可**下，使用先前建立的角色。我們也建議啟用 CloudWatch 的記錄功能，以提供可觀測性和故障診斷。根據您的需求調整其他選用設定，例如標籤。然後，部署連接器並等待狀態進入執行中狀態。

## 傳送訊息至 Kafka
<a name="mkc-eb-kafka-send-json-encoded-messages"></a>

您可以使用 指定不同的轉換器，並選擇性地指定 Kafka Connect 中可用的`key.converter`設定，以設定訊息編碼，例如 Apache Avro `value.converter`和 JSON。

本主題[connector example](#connector-ex)中的 設定為使用 JSON 編碼的訊息，如使用 `org.apache.kafka.connect.json.JsonConverter`的 所示`value converter`。當連接器處於執行中狀態時，請從用戶端機器傳送記錄至 `msk-eventbridge-tutorial` Kafka 主題。

# 搭配組態提供者使用 Debezium 來源連接器
<a name="mkc-debeziumsource-connector-example"></a>

此範例顯示如何使用具有與 MySQL 相容 [Amazon Aurora](https://aws.amazon.com/rds/aurora/) 資料庫的 Debezium MySQL 連接器外掛程式做為來源。在此範例中，我們也會設定開放原始碼 [AWS Secrets Manager Config Provider](https://github.com/jcustenborder/kafka-config-provider-aws)，以在 AWS Secrets Manager中將資料庫憑證外部化。如需有關組態供應商的詳細資訊，請參閱 [教學課程：使用組態提供者將敏感資訊外部化](msk-connect-config-provider.md)。

**重要**  
Debezium MySQL 連接器外掛程式[僅支援一個任務](https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-tasks-max)，不適用於 Amazon MSK Connect 的自動擴展容量模式。您應改用佈建容量模式，並在連接器組態中將 `workerCount` 設定為等於一。如需有關 MSK Connect 容量模式的詳細資訊，請參閱 [了解連接器容量](msk-connect-capacity.md)。

# 使用 Debezium 來源連接器的完整先決條件
<a name="mkc-debeziumsource-connector-example-prereqs"></a>

您的連接器必須能夠存取網際網路，以便與 AWS Secrets Manager 以外的服務互動 Amazon Virtual Private Cloud。本節中的步驟有助您完成以下任務，以啟用網際網路存取。
+ 設定託管 NAT 閘道的公有子網路，並將流量路由至 VPC 中的網際網路閘道。
+ 建立預設路由，將您的私有子網路流量導向 NAT 閘道。

如需詳細資訊，請參閱[啟用 Amazon MSK Connect 的網際網路存取](msk-connect-internet-access.md)。

**先決條件**

您需先準備好以下事項，才可啟用網際網路存取：
+ 與您的叢集相關聯的 Amazon Virtual Private Cloud (VPC) ID。例如，*vpc-123456ab*。
+ VPC 中私有子網路的 ID。例如，*subnet-a1b2c3de*、*subnet-f4g5h6ij* 等。您必須使用私有子網路來設定連接器。

**啟用連接器的網際網路存取**

1. 在 https：//[https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/) 開啟 Amazon Virtual Private Cloud 主控台。

1. 使用描述性名稱為 NAT 閘道建立公有子網路，並記下子網路 ID。如需詳細說明，請參閱[在 VPC 中建立子網路](https://docs.aws.amazon.com/vpc/latest/userguide/working-with-vpcs.html#AddaSubnet)。

1. 建立網際網路閘道，讓您的 VPC 可和網際網路通訊，並記下閘道 ID。將網際網路閘道連接至 VPC。如需說明，請參閱[建立並連接網際網路閘道](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_Internet_Gateway.html#Add_IGW_Attach_Gateway)。

1. 佈建公有 NAT 閘道，以便讓私有子網路中的主機可連線到您的公有子網路。在建立 NAT 閘道時，請選取您稍早建立的公有子網路。如需說明，請參閱[建立 NAT 閘道](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html#nat-gateway-creating)。

1. 設定路由表。您總共須有兩個路由表才能完成此設定。您應已經擁有與 VPC 同時自動建立的主路由表。在此步驟中，您可為公有子網路建立額外的路由表。

   1. 使用以下設定來修改 VPC 的主路由表，以便讓私有子網路將流量路由到 NAT 閘道。如需說明，請參閱《Amazon Virtual Private Cloud使用者指南》****中的[使用路由表](https://docs.aws.amazon.com/vpc/latest/userguide/WorkWithRouteTables.html)。  
**私有 MSKC 路由表**    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/msk/latest/developerguide/mkc-debeziumsource-connector-example-prereqs.html)

   1. 依照[建立自訂路由表](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_Internet_Gateway.html#Add_IGW_Routing)中的說明，建立公有子網路的路由表。在建立該表時，請在**名稱標籤**欄位中輸入描述性名稱，以協助您識別與該表相關聯的子網路。例如，**Public MSKC**。

   1. 使用以下設定來設定您的 **Public MSKC** 路由表。  
****    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/msk/latest/developerguide/mkc-debeziumsource-connector-example-prereqs.html)

現在已為 Amazon MSK Connect 啟用網際網路存取，您可以建立連接器了。

# 建立 Debezium 來源連接器
<a name="msk-connect-debeziumsource-connector-example-steps"></a>

此程序說明如何建立 Debezium 來源連接器。

1. 

**建立自訂外掛程式**

   1. 從 [Debezium](https://debezium.io/releases/) 網站下載 MySQL 連線器外掛程式的最新穩定版本。請記下您下載的 Debezium 發行版本 (版本 2.x 或較舊的 1.x 系列)。稍後在此程序中，您將根據您的 Debezium 版本建立連接器。

   1. 下載並解壓縮 [AWS Secrets Manager Config Provider](https://www.confluent.io/hub/jcustenborder/kafka-config-provider-aws)。

   1. 將以下封存放入相同的目錄中：
      + `debezium-connector-mysql` 資料夾
      + `jcusten-border-kafka-config-provider-aws-0.1.1` 資料夾

   1. 將您在上一個步驟中建立的目錄壓縮為 ZIP 檔案，然後將該 ZIP 檔案上傳至 S3 儲存貯體。如需說明，請參閱《Amazon S3 使用者指南》**中的[上傳物件](https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html)。

   1. 複製以下 JSON 並貼到檔案中。例如 `debezium-source-custom-plugin.json`。將 *<example-custom-plugin-name>* 取代為您希望外掛程式擁有的名稱、將 *<amzn-s3-demo-bucket-arn>* 取代為您上傳 ZIP 檔案之 Amazon S3 儲存貯體的 ARN，以及`<file-key-of-ZIP-object>`將 ZIP 物件的檔案金鑰取代為您上傳至 S3 的 ZIP 物件。

      ```
      {
          "name": "<example-custom-plugin-name>",
          "contentType": "ZIP",
          "location": {
              "s3Location": {
                  "bucketArn": "<amzn-s3-demo-bucket-arn>",
                  "fileKey": "<file-key-of-ZIP-object>"
              }
          }
      }
      ```

   1. 從您儲存 JSON 檔案的資料夾執行下列 AWS CLI 命令，以建立外掛程式。

      ```
      aws kafkaconnect create-custom-plugin --cli-input-json file://<debezium-source-custom-plugin.json>
      ```

      您應該會看到類似以下範例的輸出。

      ```
      {
          "CustomPluginArn": "arn:aws:kafkaconnect:us-east-1:012345678901:custom-plugin/example-custom-plugin-name/abcd1234-a0b0-1234-c1-12345678abcd-1",
          "CustomPluginState": "CREATING",
          "Name": "example-custom-plugin-name",
          "Revision": 1
      }
      ```

   1. 執行以下命令來檢查外掛程式狀態。狀態應從 `CREATING` 變更為 `ACTIVE`。使用您在上一個命令輸出中獲得的 ARN 來取代 ARN 預留位置。

      ```
      aws kafkaconnect describe-custom-plugin --custom-plugin-arn "<arn-of-your-custom-plugin>"
      ```

1. 

**為您的資料庫登入資料設定 AWS Secrets Manager 和建立秘密**

   1. 前往以下位置開啟機密管理員控制台：[https://console.aws.amazon.com/secretsmanager/](https://console.aws.amazon.com/secretsmanager/)。

   1. 建立新秘密以存放您的資料庫登入憑證。如需說明，請參閱《AWS Secrets Manager使用者指南》**中的[建立秘密](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_create-basic-secret.html)。

   1. 複製您秘密的 ARN。

   1. 將以下範例政策中的 Secrets Manager 許可新增至您的 [了解服務執行角色](msk-connect-service-execution-role.md)。使用您秘密的 ARN 來取代 *<arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234>*。

------
#### [ JSON ]

****  

      ```
      {
        "Version":"2012-10-17",		 	 	 
        "Statement": [
          {
            "Effect": "Allow",
            "Action": [
              "secretsmanager:GetResourcePolicy",
              "secretsmanager:GetSecretValue",
              "secretsmanager:DescribeSecret",
              "secretsmanager:ListSecretVersionIds"
            ],
            "Resource": [
            "arn:aws:secretsmanager:us-east-1:123456789012:secret:MySecret-1234"
            ]
          }
        ]
      }
      ```

------

      如需有關如何新增 IAM 許可的說明，請參閱《IAM 使用者指南》**中的[新增和移除 IAM 身分許可](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html)。

1. 

**使用您組態供應商的資訊來建立自訂工作程序組態**

   1. 將以下工作程序組態屬性複製到檔案中，並使用對應至您案例的值來取代預留位置字串。如需有關 AWS Secrets Manager Config Provider 組態屬性的詳細資訊，請參閱外掛程式文件中的 [SecretsManagerConfigProvider](https://jcustenborder.github.io/kafka-connect-documentation/projects/kafka-config-provider-aws/configProviders/SecretsManagerConfigProvider.html)。

      ```
      key.converter=<org.apache.kafka.connect.storage.StringConverter>
      value.converter=<org.apache.kafka.connect.storage.StringConverter>
      config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider
      config.providers=secretManager
      config.providers.secretManager.param.aws.region=<us-east-1>
      ```

   1. 執行下列 AWS CLI 命令來建立您的自訂工作者組態。

      取代以下的值：
      + *<my-worker-config-name>* - 自訂工作程序組態的描述性名稱
      + *<encoded-properties-file-content-string>* - 您在上一個步驟中複製的 base64 編碼版本純文字屬性。

      ```
      aws kafkaconnect create-worker-configuration --name <my-worker-config-name> --properties-file-content <encoded-properties-file-content-string>
      ```

1. 

**建立連接器**

   1. 複製以下對應至您 Debezium 版本 (2.x 或 1.x) 的 JSON，然後將其貼至新檔案中。使用對應至您案例的值來取代 `<placeholder>` 字串。如需有關如何設定服務執行角色的詳細資訊，請參閱 [MSK Connect 的 IAM 角色和政策](msk-connect-iam.md)。

      請注意，組態會使用類似 `${secretManager:MySecret-1234:dbusername}` 的變數 (而非純文字) 來指定資料庫憑證。使用您秘密的名稱來取代 `MySecret-1234`，然後加入您要擷取的索引鍵名稱。您也必須使用自訂工作程序組態的 ARN 來取代 `<arn-of-config-provider-worker-configuration>`。

------
#### [ Debezium 2.x ]

      針對 Debezium 2.x 版本，請複製以下 JSON 並將其貼至新檔案中。使用對應至您案例的值來取代 *<placeholder>* 字串。

      ```
      {
      	"connectorConfiguration": {
      		"connector.class": "io.debezium.connector.mysql.MySqlConnector",
      		"tasks.max": "1",
      		"database.hostname": "<aurora-database-writer-instance-endpoint>",
      		"database.port": "3306",
      		"database.user": "<${secretManager:MySecret-1234:dbusername}>",
      		"database.password": "<${secretManager:MySecret-1234:dbpassword}>",
      		"database.server.id": "123456",
      		"database.include.list": "<list-of-databases-hosted-by-specified-server>",
      		"topic.prefix": "<logical-name-of-database-server>",
      		"schema.history.internal.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>",
      		"schema.history.internal.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>",
      		"schema.history.internal.consumer.security.protocol": "SASL_SSL",
      		"schema.history.internal.consumer.sasl.mechanism": "AWS_MSK_IAM",
      		"schema.history.internal.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
      		"schema.history.internal.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
      		"schema.history.internal.producer.security.protocol": "SASL_SSL",
      		"schema.history.internal.producer.sasl.mechanism": "AWS_MSK_IAM",
      		"schema.history.internal.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
      		"schema.history.internal.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
      		"include.schema.changes": "true"
      	},
      	"connectorName": "example-Debezium-source-connector",
      	"kafkaCluster": {
      		"apacheKafkaCluster": {
      			"bootstrapServers": "<cluster-bootstrap-servers-string>",
      			"vpc": {
      				"subnets": [
      					"<cluster-subnet-1>",
      					"<cluster-subnet-2>",
      					"<cluster-subnet-3>"
      				],
      				"securityGroups": ["<id-of-cluster-security-group>"]
      			}
      		}
      	},
      	"capacity": {
      		"provisionedCapacity": {
      			"mcuCount": 2,
      			"workerCount": 1
      		}
      	},
      	"kafkaConnectVersion": "2.7.1",
      	"serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>",
      	"plugins": [{
      		"customPlugin": {
      			"customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>",
      			"revision": 1
      		}
      	}],
      	"kafkaClusterEncryptionInTransit": {
      		"encryptionType": "TLS"
      	},
      	"kafkaClusterClientAuthentication": {
      		"authenticationType": "IAM"
      	},
      	"workerConfiguration": {
      		"workerConfigurationArn": "<arn-of-config-provider-worker-configuration>",
      		"revision": 1
      	}
      }
      ```

------
#### [ Debezium 1.x ]

      針對 Debezium 1.x 版本，請複製以下 JSON 並將其貼至新檔案中。使用對應至您案例的值來取代 *<placeholder>* 字串。

      ```
      {
      	"connectorConfiguration": {
      		"connector.class": "io.debezium.connector.mysql.MySqlConnector",
      		"tasks.max": "1",
      		"database.hostname": "<aurora-database-writer-instance-endpoint>",
      		"database.port": "3306",
      		"database.user": "<${secretManager:MySecret-1234:dbusername}>",
      		"database.password": "<${secretManager:MySecret-1234:dbpassword}>",
      		"database.server.id": "123456",
      		"database.server.name": "<logical-name-of-database-server>",
      		"database.include.list": "<list-of-databases-hosted-by-specified-server>",
      		"database.history.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>",
      		"database.history.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>",
      		"database.history.consumer.security.protocol": "SASL_SSL",
      		"database.history.consumer.sasl.mechanism": "AWS_MSK_IAM",
      		"database.history.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
      		"database.history.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
      		"database.history.producer.security.protocol": "SASL_SSL",
      		"database.history.producer.sasl.mechanism": "AWS_MSK_IAM",
      		"database.history.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
      		"database.history.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
      		"include.schema.changes": "true"
      	},
      	"connectorName": "example-Debezium-source-connector",
      	"kafkaCluster": {
      		"apacheKafkaCluster": {
      			"bootstrapServers": "<cluster-bootstrap-servers-string>",
      			"vpc": {
      				"subnets": [
      					"<cluster-subnet-1>",
      					"<cluster-subnet-2>",
      					"<cluster-subnet-3>"
      				],
      				"securityGroups": ["<id-of-cluster-security-group>"]
      			}
      		}
      	},
      	"capacity": {
      		"provisionedCapacity": {
      			"mcuCount": 2,
      			"workerCount": 1
      		}
      	},
      	"kafkaConnectVersion": "2.7.1",
      	"serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>",
      	"plugins": [{
      		"customPlugin": {
      			"customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>",
      			"revision": 1
      		}
      	}],
      	"kafkaClusterEncryptionInTransit": {
      		"encryptionType": "TLS"
      	},
      	"kafkaClusterClientAuthentication": {
      		"authenticationType": "IAM"
      	},
      	"workerConfiguration": {
      		"workerConfigurationArn": "<arn-of-config-provider-worker-configuration>",
      		"revision": 1
      	}
      }
      ```

------

   1. 在您在上一個步驟中儲存 JSON 檔案的資料夾中執行下列 AWS CLI 命令。

      ```
      aws kafkaconnect create-connector --cli-input-json file://connector-info.json
      ```

      以下是成功執行命令時所得到的輸出範例。

      ```
      {
          "ConnectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2", 
          "ConnectorState": "CREATING", 
          "ConnectorName": "example-Debezium-source-connector"
      }
      ```

# 更新 Debezium 連接器組態
<a name="mkc-debeziumsource-connector-update"></a>

若要更新 Debezium 連接器的組態，請遵循下列步驟：

1. 複製下列 JSON 並將其貼到新檔案。使用對應至您案例的值來取代 `<placeholder>` 字串。

   ```
   {
      "connectorArn": <connector_arn>,
      "connectorConfiguration": <new_configuration_in_json>,
      "currentVersion": <current_version>
   }
   ```

1. 在您在上一個步驟中儲存 JSON 檔案的資料夾中執行下列 AWS CLI 命令。

   ```
   aws kafkaconnect update-connector --cli-input-json file://connector-info.json
   ```

   以下是您成功執行 命令時的輸出範例。

   ```
   {
       "connectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2",
       "connectorOperationArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector-operation/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2/41b6ad56-3184-479b-850a-a8bedd5a02f3",
       "connectorState": "UPDATING"
   }
   ```

1. 您現在可以執行下列命令來監控操作的目前狀態：

   ```
   aws kafkaconnect describe-connector-operation --connector-operation-arn <operation_arn>
   ```

如需具有詳細步驟的 Debezium 連接器範例，請參閱 [Introducing Amazon MSK Connect - Stream Data to and from Your Apache Kafka Clusters Using Managed Connectors](https://aws.amazon.com/blogs/aws/introducing-amazon-msk-connect-stream-data-to-and-from-your-apache-kafka-clusters-using-managed-connectors/)。

# 遷移至 Amazon MSK Connect
<a name="msk-connect-migrating"></a>

本節說明如何將 Apache Kafka 連接器應用程式遷移至 Amazon Managed Streaming for Apache Kafka Connect (Amazon MSK Connect)。若要進一步了解遷移至 Amazon MSK Connect 的好處，請參閱 [使用 Amazon MSK Connect 的優勢](msk-connect.md#msk-connect-benefits)。

本節也說明 Kafka Connect 和 Amazon MSK Connect 使用的狀態管理主題，並涵蓋遷移來源和接收器連接器的程序。

# 了解 Kafka Connect 使用的內部主題
<a name="msk-connect-kafka-connect-topics"></a>

在分散式模式下執行的 Apache Kafka Connect 應用程式會使用 Kafka 叢集和群組成員資格中的內部主題來存放其狀態。以下是對應至 Kafka Connect 應用程式所用內部主題的組態值：
+ 組態主題，透過 指定 `config.storage.topic`

  在組態主題中，Kafka Connect 會儲存使用者啟動之所有連接器和任務的組態。每次使用者更新連接器的組態，或當連接器請求重新設定 （例如，連接器偵測到可以啟動更多任務） 時，都會將記錄發送到此主題。此主題已啟用壓縮，因此一律會保持每個實體的最後一個狀態。
+ 位移主題，透過 指定 `offset.storage.topic`

  在偏移主題中，Kafka Connect 會存放來源連接器的偏移。如同組態主題，偏移主題已啟用壓縮。本主題僅用於將來源位置寫入從外部系統產生資料的來源連接器。從 Kafka 讀取資料並傳送至外部系統的接收器連接器，會使用一般 Kafka 取用者群組來存放取用者位移。
+ 狀態主題，透過 指定 `status.storage.topic`

  在狀態主題中，Kafka Connect 會儲存連接器和任務的目前狀態。此主題用作 REST API 使用者所查詢資料的中心位置。本主題允許使用者查詢任何工作者，但仍取得所有執行中外掛程式的狀態。如同組態和偏移主題，狀態主題也會啟用壓縮。

除了這些主題之外，Kafka Connect 還廣泛使用 Kafka 的群組成員資格 API。群組會以連接器名稱命名。例如，對於名為 file-sink 的連接器，群組名為 connect-file-sink。群組中的每個取用者都會將記錄提供給單一任務。您可以使用一般取用者群組工具擷取這些群組及其位移，例如 `Kafka-consumer-group.sh`。對於每個接收器連接器，Connect 執行時間會執行從 Kafka 擷取記錄的一般取用者群組。

# Amazon MSK Connect 應用程式的狀態管理
<a name="msk-connect-state-management"></a>

根據預設，Amazon MSK Connect 會在 Kafka 叢集中為每個 Amazon MSK Connector 建立三個不同的主題，以存放連接器的組態、位移和狀態。預設主題名稱的結構如下：
+ \$1\$1msk\$1connect\$1configs\$1*connector-name*\$1*connector-id*
+ \$1\$1msk\$1connect\$1status\$1*connector-name*\$1*connector-id*
+ \$1\$1msk\$1connect\$1offsets\$1*connector-name*\$1*connector-id*

**注意**  
若要在來源連接器之間提供位移連續性，您可以使用您選擇的位移儲存主題，而不是預設主題。指定偏移儲存主題有助您完成任務，例如，建立來源連接器從上一個連接器的最後一個偏移恢復讀取。若要指定偏移儲存主題，請在建立連接器之前，在 Amazon MSK Connect 工作者組態中提供 [https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-workers.html#msk-connect-manage-connector-offsets](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-workers.html#msk-connect-manage-connector-offsets) 屬性的值。

# 將來源連接器遷移至 Amazon MSK Connect
<a name="msk-connect-migrate-source-connectors"></a>

來源連接器是將記錄從外部系統匯入 Kafka 的 Apache Kafka Connect 應用程式。本節說明將執行內部部署或自我管理 Kafka Connect 叢集的 Apache Kafka Connect 來源連接器應用程式遷移 AWS 至 Amazon MSK Connect 的程序。

Kafka Connect 來源連接器應用程式會將位移存放在名為 的主題中，該主題具有為組態屬性 設定的值`offset.storage.topic`。以下是 JDBC 連接器的範例位移訊息，該連接器正在執行從兩個名為 `movies`和 的不同資料表匯入資料的兩個任務`shows`。從資料表電影匯入的最近一列主要 ID 為 `18343`。從 顯示資料表匯入的最近資料列主要 ID 為 `732`。

```
["jdbcsource",{"protocol":"1","table":"sample.movies"}] {"incrementing":18343}
["jdbcsource",{"protocol":"1","table":"sample.shows"}] {"incrementing":732}
```

若要將來源連接器遷移至 Amazon MSK Connect，請執行下列動作：

1. 從您的內部部署或自我管理 Kafka Connect 叢集中提取連接器程式庫，以建立 Amazon MSK Connect [自訂外掛程式](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-plugins.html)。

1. 建立 Amazon MSK Connect [工作者屬性](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-config-provider.html#msk-connect-config-providers-create-custom-config)，並將屬性 `key.converter`、 `value.converter`和 `offset.storage.topic` 設定為與現有 Kafka Connect 叢集中執行之 Kafka 連接器相同的值。

1. 在現有 Kafka Connect 叢集上提出`PUT /connectors/connector-name/pause`請求，以暫停現有叢集上的連接器應用程式。

1. 確定連接器應用程式的所有任務都已完全停止。您可以停止任務，方法是在現有的 Kafka Connect 叢集上提出`GET /connectors/connector-name/status`請求，或取用屬性 所設定主題名稱的訊息`status.storage.topic`。

1. 從現有叢集取得連接器組態。您可以在現有叢集上提出`GET /connectors/connector-name/config/`請求，或取用屬性 所設定主題名稱的訊息，以取得連接器組態`config.storage.topic`。

1. 使用與現有叢集相同的名稱建立新的 [Amazon MSK Connector](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-connectors.html)。使用您在步驟 1 中建立的連接器自訂外掛程式、您在步驟 2 中建立的工作者屬性，以及您在步驟 5 中擷取的連接器組態，來建立此連接器。

1. 當 Amazon MSK Connector 狀態為 時`active`，請檢視日誌以確認連接器已開始從來源系統匯入資料。

1. 透過提出`DELETE /connectors/connector-name`請求，刪除現有叢集中的連接器。

# 將接收器連接器遷移至 Amazon MSK Connect
<a name="msk-connect-migrate-sink-connectors"></a>

接收器連接器是將資料從 Kafka 匯出到外部系統的 Apache Kafka Connect 應用程式。本節說明將執行內部部署或自我管理 Kafka Connect 叢集的 Apache Kafka Connect 接收器連接器應用程式遷移 AWS 至 Amazon MSK Connect 的程序。

Kafka Connect 接收器連接器使用 Kafka 群組成員資格 API，並將偏移存放在與典型消費者應用程式相同的`__consumer_offset`主題中。此行為可簡化從自我管理叢集到 Amazon MSK Connect 的接收器連接器遷移。

若要將接收器連接器遷移至 Amazon MSK Connect，請執行下列動作：

1. 從您的內部部署或自我管理 Kafka Connect 叢集中提取連接器程式庫，以建立 Amazon MSK Connect [自訂外掛程式](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-plugins.html)。

1. 建立 Amazon MSK Connect [工作者屬性](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-config-provider.html#msk-connect-config-providers-create-custom-config)，並將屬性 `value.converter` `key.converter`和 設定為與現有 Kafka Connect 叢集中執行之 Kafka 連接器相同的值。

1. 在現有的 Kafka Connect 叢集上提出`PUT /connectors/connector-name/pause`請求，以暫停現有叢集上的連接器應用程式。

1. 確定連接器應用程式的所有任務都已完全停止。您可以停止任務，方法是在現有的 Kafka Connect 叢集上提出`GET /connectors/connector-name/status`請求，或取用屬性 所設定主題名稱的訊息`status.storage.topic`。

1. 從現有叢集取得連接器組態。您可以透過在現有叢集上提出`GET /connectors/connector-name/config`請求，或從屬性 設定的主題名稱取用訊息，來取得連接器組態`config.storage.topic`。

1. 使用與現有叢集相同的名稱建立新的 [Amazon MSK Connector](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-connectors.html)。使用您在步驟 1 中建立的連接器自訂外掛程式、您在步驟 2 中建立的工作者屬性，以及您在步驟 5 中擷取的連接器組態，來建立此連接器。

1. 當 Amazon MSK Connector 狀態為 時`active`，請檢視日誌以確認連接器已開始從來源系統匯入資料。

1. 透過提出`DELETE /connectors/connector-name`請求，刪除現有叢集中的連接器。

# 對 Amazon MSK Connect 中的問題進行故障診斷
<a name="msk-connect-troubleshooting"></a>

下列資訊有助您針對使用 MSK Connect 時可能發生的問題，進行疑難排解。您也可以將問題張貼到 [AWS re:Post](https://repost.aws/)。

**連接器無法存取在公用網際網路上託管的資源**  
請參閱[啟用 Amazon MSK Connect 的網際網路存取](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-internet-access.html)。

**連接器的執行中任務數量不等於 tasks.max 中指定的任務數量**  
以下是連接器使用的任務數量少於 tasks.max 組態所指定數量的一些原因：
+ 部分連接器實作會限制可用的任務數量。例如，MySQL 的 Debezium 連接器僅限使用單一任務。
+ 在使用自動調整規模容量模式時，Amazon MSK Connect 會以與連接器中執行的工作程序數量和每個工作程序的 MCU 數量成比例的值，來覆寫連接器的 tasks.max 屬性。如果您已設定選用`maxAutoscalingTaskCount`參數，則`tasks.max`值不會超過此限制。如需詳細資訊，請參閱[了解自動擴展任務計數上限](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-connectors.html#msk-connect-max-autoscaling-task-count)。
+ 針對目的地連接器，平行處理數量 (任務數量) 不得超過主題分區的數量。雖然您可以將 tasks.max 設定為大於該值，但單一分區一次不會被多個任務處理。
+ 在 Kafka Connect 2.7.x 中，預設的取用者分區指派者是 `RangeAssignor`。此指派者的行為是將每個主題的第一個分區提供給單一取用者、將每個主題的第二個分區提供給單一取用者等。這表示使用 `RangeAssignor` 之目的地連接器的作用中任務數量上限會等於任何單一主題中所取用的分區數量上限。若這不適用於您的使用案例，則您應該[建立工作程序組態](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-workers.html#msk-connect-create-custom-worker-config)，並在其中將 `consumer.partition.assignment.strategy` 屬性設定為更適合的取用者分區指派者。請參閱 [Kafka 2.7 Interface ConsumerPartitionAssignor：*All Known Implementing Classes*](https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html)。