

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# MSK Connect の使用開始
<a name="msk-connect-getting-started"></a>

これは、 AWS マネジメントコンソール を使用して MSK クラスターを作成し、クラスターから S3 バケットにデータを送信するシンクコネクタを作成するstep-by-stepチュートリアルです。

**Topics**
+ [MSK Connect に必要なリソースを設定する](mkc-tutorial-setup.md)
+ [カスタムプラグインを作成する](mkc-create-plugin.md)
+ [クライアントマシンと Apache Kafka トピックを作成する](mkc-create-topic.md)
+ [コネクタの作成](mkc-create-connector.md)
+ [MSK クラスターにデータを送信する](mkc-send-data.md)

# MSK Connect に必要なリソースを設定する
<a name="mkc-tutorial-setup"></a>

このステップでは、この入門シナリオに必要な次のリソースを作成します。
+ Amazon S3 バケットは、コネクタからデータを受信する宛先として機能します。
+ データの送信先となる MSK クラスター。次に、コネクタはこのクラスターからデータを読み取り、宛先の S3 バケットに送信します。
+ 送信先の S3 バケットへの書き込むアクセス許可を含む IAM ポリシー。
+ コネクタが宛先 S3 バケットに書き込むことを可能にする IAM ロール。作成した IAM ポリシーを、このロールに追加します。
+ クラスターとコネクタを備えた Amazon VPC から Amazon S3 にデータを送信できるようにする Amazon VPC エンドポイント。

**S3 バケットを作成するには**

1. にサインイン AWS マネジメントコンソール し、[https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/) で Amazon S3 コンソールを開きます。

1. **[バケットを作成]** を選択します。

1. バケットの名前には、`amzn-s3-demo-bucket-mkc-tutorial` などのわかりやすい名前を入力します。

1. 下にスクロールして、**Create bucket** (バケットの作成) を選択します。

1. バケットのリストで、新しく作成されたバケットを選択します。

1. **Create folder** (フォルダの作成) を選択します。

1. フォルダの名前として `tutorial` と入力し、下にスクロールして **Create folder** (フォルダの作成) を選択します。

**クラスターを作成するには**

1. [https://console.aws.amazon.com/msk/home?region=us-east-1\$1/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/) で Amazon MSK コンソールを開きます。

1. 左側のペインの **MSK Clusters** (MSK クラスター) で、**Clusters** (クラスター) を選択します。

1. **Create cluster** (クラスターの作成) を選択します。

1. **[作成方法]** では、**[カスタム作成]** を選択してください。

1. クラスター名には **mkc-tutorial-cluster** と入力します。

1. **クラスタータイプ** では、 **プロビジョンド** を選択します。

1. [**次へ**] を選択します。

1. **[ネットワーク]** で Amazon VPC を選択します。次に、使用するアベイラビリティーゾーンとサブネットを選択します。このチュートリアルの後半で必要になるため、選択した Amazon VPC とサブネットの ID を覚えておいてください。

1. [**次へ**] を選択します。

1. **Access control methods** (アクセス制御方法) で、**Unauthenticated access** (認証されていないアクセス) のみが選択されていることを確認します。

1. **Encryption** (暗号化) で、**Plaintext** (プレーンテキスト) のみが選択されていることを確認します。

1. ウィザードを続行し、**[クラスターの作成]** を選択します。そのクラスターの詳細ページが表示されます。そのページで、**[適用されたセキュリティグループ]** の下のセキュリティグループ ID を探します。このチュートリアルの後半で必要になるため、その ID を覚えておいてください。

**S3 バケットへの書き込み権限を持つ IAM ポリシーを作成する**

1. IAM コンソール ([https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/)) を開きます。

1. ナビゲーションペインで [**Policies**] (ポリシー) を選択します。

1. [**Create policy**] (ポリシーの作成) を選択します。

1. **ポリシーエディター**で、**JSON** タブを選択し、エディタウィンドウの JSON を次の JSON に置き換えます。

   次の例では、*<amzn-s3-demo-bucket-my-tutorial>* を自身の S3 バケット名に置き換えてください。

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",		 	 	 
     "Statement": [
       {
         "Sid": "AllowListBucket",
         "Effect": "Allow",
         "Action": [
           "s3:ListBucket",
           "s3:GetBucketLocation"
         ],
         "Resource": "arn:aws:s3:::<amzn-s3-demo-bucket-my-tutorial>"
       },
       {
         "Sid": "AllowObjectActions",
         "Effect": "Allow",
         "Action": [
           "s3:PutObject",
           "s3:GetObject",
           "s3:DeleteObject",
           "s3:AbortMultipartUpload",
           "s3:ListMultipartUploadParts",
           "s3:ListBucketMultipartUploads"
         ],
         "Resource": "arn:aws:s3:::<amzn-s3-demo-bucket-my-tutorial>/*"
       }
     ]
   }
   ```

------

   安全なポリシーの記述方法については、「[IAM アクセスコントロール](iam-access-control.md)」を参照してください。

1. [**次へ**] を選択します。

1. **[レビューと作成]** ページで、以下の操作を実行します。

   1. **ポリシー名**にわかりやすい名前 (**mkc-tutorial-policy** など) を入力します。

   1. **このポリシーで定義されているアクセス許可**で、ポリシーで定義されたアクセス許可を確認および/または編集します。

   1. (オプション) ポリシーの識別、整理、検索を簡単にするには、キーと値のペアとして**新規タグを追加**します。たとえば、**Environment** と **Test** のキーと値のペアを使用してポリシーにタグを追加します。

      タグの使用の詳細については、*IAM ユーザーガイド*の[「 AWS Identity and Access Management リソースのタグ](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_tags.html)」を参照してください。

1. [**Create policy**] (ポリシーの作成) を選択します。

**送信先バケットに書き込みことができる IAM ロールを作成するには**

1. IAM コンソールのナビゲーション ペインで、**ロール** を選択し、次に **ロールの作成** を選択します。

1. **[信頼されたエンティティを選択]** ページで、以下の操作を実行してください。

   1. **信頼できるエンティティタイプ** で、**AWS のサービス** を選択します。

   1. ［**サービスまたはユースケース**］で、［**S3**］を選択します。

   1. [**Use case**] (ユースケース) で、[**S3**] を選択します。

1. [**次へ**] を選択します。

1. **[アクセス許可を追加]** ページで、以下を実行します。

   1. **権限ポリシー**の下にある検索ボックスに、このチュートリアル用に以前に作成したポリシーの名前を入力します。例えば、**mkc-tutorial-policy** などです。次に、ポリシー名の左側にある、チェックボックスを選択してください。

   1. (オプション) [アクセス許可の境界](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_boundaries.html)を設定します。このアドバンスド機能は、サービスロールで使用できますが、サービスにリンクされたロールではありません。アクセス許可の境界の設定については、*IAM ユーザーガイド*の「[ロールの作成とポリシーのアタッチ (コンソール)](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_job-functions_create-policies.html)」を参照してください。

1. [**次へ**] を選択します。

1. **[名前を付けて、レビューし、作成する]** ページで、以下の操作を実行します。

   1. **ロール名**に、わかりやすい名前 (**mkc-tutorial-role** など) を入力します。
**重要**  
ロールに名前を付けるときは、次のことに注意してください。  
ロール名は 内で一意である必要があり AWS アカウント、大文字と小文字を区別することはできません。  
例えば、**PRODROLE** と **prodrole** の両方の名前でロールを作成することはできません。ロール名がポリシーまたは ARN の一部として使用される場合、ロール名は大文字と小文字が区別されます。ただし、サインインプロセスなど、コンソールにロール名がユーザーに表示される場合、ロール名は大文字と小文字が区別されません。
他のエンティティがロールを参照する可能性があるため、ロールを作成した後にロール名を編集することはできません。

   1. (オプション) **[説明]** にロールの説明を入力します。

   1. (オプション) ロールの使用事例とアクセス許可を編集するには、[**ステップ 1: 信頼されたエンティティを選択**] または [**ステップ 2: アクセス権限を追加**] のセクションで [**編集**] を選択します。

   1. (オプション) ロールの識別、整理、検索を簡単にするには、キーと値のペアとして**新規タグを追加**します。たとえば、**ProductManager** と **John** のキーと値のペアを使用してロールにタグを追加します。

      タグの使用の詳細については、*IAM ユーザーガイド*の[「 AWS Identity and Access Management リソースのタグ](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_tags.html)」を参照してください。

1. ロールを確認したら、**[ロールを作成]** を選択します。

**MSK Connect がそのロールを引き受けることができるようにするには**

1. IAM コンソールの左側のペインの**Access management** (アクセス管理) で、**Roles** (ロール) を選択します。

1. `mkc-tutorial-role` を見つけて選択します。

1. ロールの **Summary** (概要) で、**Trust relationships** (信頼関係) タブを選択します。

1. **Edit trust relationship** (信頼関係の編集) を選択します。

1. 既存の信頼ポリシーを次の JSON に置き換えます。

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",		 	 	 
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "Service": "kafkaconnect.amazonaws.com"
         },
         "Action": "sts:AssumeRole"
       }
     ]
   }
   ```

------

1. **信頼ポリシーの更新** を選択します。

**クラスターの VPC から Amazon S3 への Amazon VPC エンドポイントを作成するには**

1.  Amazon VPC コンソール[https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/)を開きます。

1. 左側のペインで、**Endpoints** (エンドポイント) を選択します。

1. **Create endpoint** (エンドポイントの作成) を選択します。

1. **Service Name** (サービス名) で、**com.amazonaws.us-east-1.s3** サービスと**Gateway** (ゲートウェイ) タイプを選択します。

1. クラスターの VPC を選択してから、クラスターのサブネットに関連付けられているルートテーブルの左側にあるボックスを選択します。

1. **Create endpoint** (エンドポイントの作成) を選択します。

**次のステップ**

[カスタムプラグインを作成する](mkc-create-plugin.md)

# カスタムプラグインを作成する
<a name="mkc-create-plugin"></a>

プラグインには、コネクタのロジックを定義するコードが含まれています。このステップでは、Lenses Amazon S3 Sink Connector のコードを含むカスタムプラグインを作成します。後のステップで、MSK コネクタを作成するときに、そのコードがこのカスタム プラグインにあることを指定します。同じプラグインを使用して、構成が異なる複数の MSK コネクタを作成できます。

**カスタムプラグインを作成するには**

1. [S3 コネクタ](https://www.confluent.io/hub/confluentinc/kafka-connect-s3)をダウンロードします。

1. アクセスできる S3 バケットに ZIP ファイルをアップロードします。Amazon S3 にファイルをアップロードする方法については、Amazon S3 ユーザーガイドの[オブジェクトのアップロード](https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html)を参照してください。

1. [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/) で Amazon MSK コンソールを開きます。

1. 左側のペインで **MSK Connect** を展開し、**Custom plugins** (カスタムプラグイン) を選択します。

1. **Create custom plugin** (カスタムプラグインの作成) を選択します。

1. **Browse S3** (S3 の参照) を選択します。

1. バケットのリストで、ZIP ファイルをアップロードしたバケットを見つけて、そのバケットを選択します。

1. バケット内のオブジェクトのリストで、ZIP ファイルの左側にあるラジオボタンを選択し、**選択** というラベル付けたボタンを選択します。

1. カスタムプラグイン名に `mkc-tutorial-plugin` と入力し、**Create custom plugin** (カスタムプラグインの作成) を選択します。

カスタムプラグインの作成が完了するまで AWS に数分かかる場合があります。作成プロセスが完了すると、ブラウザウィンドウの上部にあるバナーに次のメッセージが表示されます。

```
Custom plugin mkc-tutorial-plugin was successfully created
The custom plugin was created. You can now create a connector using this custom plugin.
```

**次のステップ**

[クライアントマシンと Apache Kafka トピックを作成する](mkc-create-topic.md)

# クライアントマシンと Apache Kafka トピックを作成する
<a name="mkc-create-topic"></a>

このステップでは、Apache Kafka クライアントインスタンスとして使用する Amazon EC2 インスタンスを作成します。次に、このインスタンスを使用して、クラスター上にトピックを作成します。

**クライアントマシンを作成するには**

1. Amazon EC2 コンソールの [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/) を開いてください。

1. **[Launch Instances]** (インスタンスの起動) を選択してください。

1. クライアントマシンの **[名前]** (**mkc-tutorial-client** など) を入力します。

1. **[Amazon マシンイメージ (AMI) のタイプ]** については、**[Amazon Linux 2 AMI (HVM) - カーネル 5.10、SSD ボリューム タイプ]** を選択したままにします。

1. **[t2.xlarge]** インスタンスタイプを選択します。

1. **[キーペア (ログイン)]** で、**[新しいキーペアの作成]** を選択します。**[キーペア名]** に **mkc-tutorial-key-pair** を入力し、**[キーペアのダウンロード]** を選択します。既存のキーペアを使用することもできます。

1. **[インスタンスを起動]** を選択します。

1. [**インスタンスの表示**] を選択します。次に、**[セキュリティグループ]** 列で、新しいインスタンスに関連付けられているセキュリティグループを選択します。セキュリティグループの ID をコピーし、後で使用できるように保存します。

**新しく作成されたクライアントがクラスターにデータを送信するのを許可するには**

1. Amazon VPC コンソール[https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/)を開きます。

1. 左側のペインの**SECURITY** (セキュリティ) で、**Security Groups** (セキュリティグループ) を選択します。**Security group ID** (セキュリティグループID) 列で、クラスターのセキュリティグループを見つけます。[MSK Connect に必要なリソースを設定する](mkc-tutorial-setup.md) でクラスターを作成したときに、このセキュリティグループのIDを保存しました。行の左側にあるボックスを選択して、このセキュリティグループを選択します。他のセキュリティグループが同時に選択されていないことを確認してください。

1. 画面の下半分で、**Inbound rules** (インバウンドルール) タブを選択します。

1. **Edit inbound rules** (インバウンドルールの編集) を選択します。

1. 画面の左下で、**Add rule** (ルールの追加) を選択します。

1. 新しいルールで、**Type** (タイプ) 列の **All traffic** (すべてのトラフィック) を選択します。**[ソース]** 列の右側のフィールドに、クライアントマシンのセキュリティグループの ID を入力します。これは、クライアントマシンを作成した後に保存したセキュリティグループ ID です。

1. **Save rules** (ルールの保存) を選択します。これで、MSK クラスターは、前の手順で作成したクライアントからのすべてのトラフィックを受け入れます。

**トピックを作成する**

1. Amazon EC2 コンソール[https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/)を開きます。

1. インスタンスのテーブルで `mkc-tutorial-client` を選択します。

1. 画面上部の**Connect** (接続) を選択し、指示に従ってインスタンスに接続します。

1. 次のコマンドを実行して、クライアントインスタンスに Java をインストールします。

   ```
   sudo yum install java-1.8.0
   ```

1. 次のコマンドを実行して、Apache Kafka をダウンロードします。

   ```
   wget https://archive.apache.org/dist/kafka/2.2.1/kafka_2.12-2.2.1.tgz
   ```
**注記**  
このコマンドで使用されているもの以外のミラーサイトを使用する場合は、[Apache](https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.1/kafka_2.11-1.1.1.tgz) ウェブサイトで別のサイトを選択できます。

1. 前のステップで TAR ファイルをダウンロードしたディレクトリで次のコマンドを実行します。

   ```
   tar -xzf kafka_2.12-2.2.1.tgz
   ```

1. **kafka\$12.12-2.2.1** ディレクトリに移動します。

1. [https://console.aws.amazon.com/msk/home?region=us-east-1\$1/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/) で Amazon MSK コンソールを開きます。

1. 左側のペインで **Clusters** (クラスター) を選択してから、`mkc-tutorial-cluster` という名前を選択します。

1. **View client information** (ライアント情報の表示) を選択します。

1. **プレーンテキスト**の接続文字列をコピーします。

1. **[Done]** (完了) をクリックします。

1. クライアントインスタンス (`mkc-tutorial-client`) で次のコマンドを実行し、*bootstrapServerString* を、クラスターのクライアント情報を表示したときに保存した値に置き換えます。

   ```
   <path-to-your-kafka-installation>/bin/kafka-topics.sh --create --bootstrap-server bootstrapServerString --replication-factor 2 --partitions 1 --topic mkc-tutorial-topic
   ```

   コマンドが成功すると、次のメッセージが表示されます : `Created topic mkc-tutorial-topic.`

**次のステップ**

[コネクタの作成](mkc-create-connector.md)

# コネクタの作成
<a name="mkc-create-connector"></a>

この手順では、 AWS マネジメントコンソールを使用してコネクタを作成する方法について説明します。

**コネクタを作成するには**

1. にサインインし AWS マネジメントコンソール、[https://console.aws.amazon.com/msk/home?region=us-east-1\$1/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/) で Amazon MSK コンソールを開きます。

1. 左側のペインで、**MSK Connect** を展開し、**Connectors** (コネクタ) を選択します。

1. **Create connector** (コネクタの作成) を選択します。

1. プラグインのリストで、`mkc-tutorial-plugin` を選択し、**Next** を選択します。

1. コネクタ名に `mkc-tutorial-connector` と入力します。

1. クラスターのリストから、`mkc-tutorial-cluster` を選択します。

1. **コネクタネットワーク設定**セクションで、ネットワークタイプに次のいずれかを選択します。
   + **IPv4** (デフォルト) - IPv4 経由の送信先への接続のみ
   + **デュアルスタック -** IPv4 と IPv6 の両方を介した送信先への接続用 (サブネットに IPv4 と IPv6 CIDR ブロックが関連付けられている場合にのみ使用可能)

1. 次の構成をコピーして、コネクタ構成フィールドに貼り付けます。

   region は、コネクタ AWS リージョン を作成する のコードに置き換えてください。同様に、Amazon S3 バケット名 *<amzn-s3-demo-bucket-my-tutorial>* を次の例のバケット名に置き換えます。

   ```
   connector.class=io.confluent.connect.s3.S3SinkConnector
   s3.region=us-east-1
   format.class=io.confluent.connect.s3.format.json.JsonFormat
   flush.size=1
   schema.compatibility=NONE
   tasks.max=2
   topics=mkc-tutorial-topic
   partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
   storage.class=io.confluent.connect.s3.storage.S3Storage
   s3.bucket.name=<amzn-s3-demo-bucket-my-tutorial>
   topics.dir=tutorial
   ```

1. **Access permissions** (アクセス許可) で `mkc-tutorial-role` を選択します。

1. **Next** (次へ) を選択します。**Security** (セキュリティ) ページで、もう一度 **Next** (次へ) を選択します。

1. **Logs** (ログ) ページで、**Next** (次へ) を選択します。

1. **確認と作成**ページで、コネクタ設定を確認し、**コネクタの作成**を選択します。

**次のステップ**

[MSK クラスターにデータを送信する](mkc-send-data.md)

# MSK クラスターにデータを送信する
<a name="mkc-send-data"></a>

このステップでは、前に作成した Apache Kafka トピックにデータを送信し、宛先 S3 バケットで同じデータを探します。

**MSK クラスターにデータを送信するには**

1. クライアントインスタンスの Apache Kafka インストールの `bin` フォルダに、次の内容の `client.properties` という名前のテキストファイルを作成します。

   ```
   security.protocol=SASL_SSL
   sasl.mechanism=AWS_MSK_IAM
   ```

1. 次のコマンドを実行して、コンソールプロデューサーを作成します。*BootstrapBrokerString* を前のコマンドを実行したときに取得した値に置き換えます。

   ```
   <path-to-your-kafka-installation>/bin/kafka-console-producer.sh --broker-list BootstrapBrokerString --producer.config client.properties --topic mkc-tutorial-topic
   ```

1. 必要なメッセージを入力して、**Enter** キーを押します。このステップを 2、3 回繰り返します。行を入力して **Enter** キーを押すたびに、その行は個別のメッセージとして Apache Kafka クラスターに送信されます。

1. 宛先の Amazon S3 バケットを調べて、前のステップで送信したメッセージを見つけます。