

# AWS Glue スキーマレジストリ
<a name="schema-registry"></a>

**注記**  
AWS Glue スキーマレジストリは、中東 (UAE) リージョンの AWS Glue コンソールではサポートされていません。

AWS Glue スキーマレジストリを使用すると、データストリームスキーマを一元的に検出、制御、展開できます。*スキーマ* は、データレコードの構造と形式を定義します。AWS Glue のスキーマレジストリを使用すると、Apache Kafka、[Amazon Managed Streaming for Apache Kafka](https://aws.amazon.com/msk/)、[Amazon Kinesis Data Streams](https://aws.amazon.com/kinesis/data-streams/)、[Amazon Managed Service for Apache Flink](https://aws.amazon.com/kinesis/data-analytics/)、[AWS Lambda](https://aws.amazon.com/lambda/) と便利な統合を使用するデータストリームアプリケーションでスキーマを管理して実施することができます。

スキーマレジストリは、AVRO (v1.11.4) データ形式、[Everit ライブラリ](https://github.com/everit-org/json-schema)を使用した JSON スキーマ検証を含むスキーマ用 (Draft-04、Draft-06、Draft-07 仕様) の [JSON スキーマ形式](https://json-schema.org/)の JSON データ形式、`extensions` または `groups` のサポートが含まれないプロトコルバッファ (Protobuf) バージョンの proto2 および proto3、Java 言語対応をサポートしており、今後は他のデータ形式や言語にも対応していきます。サポートされている機能には、互換性、メタデータを介したスキーマのソーシング、スキーマの自動登録、IAM 互換性、ストレージとデータ転送を削減する ZLIB 圧縮のオプションがあります。スキーマレジストリはサーバーレスで無料で使用できます。

プロデューサとコンシューマの間のデータ形式契約としてスキーマを使用すると、データガバナンスおよびデータ品質の向上につながります。さらにデータのコンシューマは、上流で行われる互換性に関する変更に対する復旧力を得ることができます。

スキーマレジストリを使用すると、異なるシステムでシリアル化と非シリアル化用のスキーマを共有できます。例えば、データのプロデューサーとコンシューマーを考えてみます。データの公開時、プロデューサーはスキーマを把握しています。スキーマレジストリは、Amazon MSK や Apache Kafka などの特定のシステム用に、シリアル化と非シリアル化の機能を提供します。

 詳しくは、「[スキーマレジストリの仕組み](schema-registry-works.md) 」を参照してください。

**Topics**
+ [スキーマ](#schema-registry-schemas)
+ [レジストリ](#schema-registry-registries)
+ [スキーマのバージョニングと互換性](#schema-registry-compatibility)
+ [オープンソース Serde ライブラリ](#schema-registry-serde-libraries)
+ [スキーマレジストリのクォータ](#schema-registry-quotas)
+ [スキーマレジストリの仕組み](schema-registry-works.md)
+ [スキーマレジストリの開始方法](schema-registry-gs.md)

## スキーマ
<a name="schema-registry-schemas"></a>

*スキーマ* は、データレコードの構造と形式を定義します。スキーマは、信頼性の高いデータの公開、利用、または保存のための仕様をバージョニングしたものです。

この例の Avro のスキーマでは、形式と構造はレイアウトとフィールド名によって定義され、フィールド名の形式はデータ型 (例:`string`、`int`) により定義されています

```
{
    "type": "record",
    "namespace": "ABC_Organization",
    "name": "Employee",
    "fields": [
        {
            "name": "Name",
            "type": "string"
        },
        {
            "name": "Age",
            "type": "int"
        },
        {
            "name": "address",
            "type": {
                "type": "record",
                "name": "addressRecord",
                "fields": [
                    {
                        "name": "street",
                        "type": "string"
                    },
                    {
                        "name": "zipcode",
                        "type": "int" 
                    }
                ]
            }
        }
    ]
}
```

この例の JSON 用の JSON スキーマ draft-07 では、形式を [JSON スキーマ組織](https://json-schema.org/)が定義しています。

```
{
	"$id": "https://example.com/person.schema.json",
	"$schema": "http://json-schema.org/draft-07/schema#",
	"title": "Person",
	"type": "object",
	"properties": {
		"firstName": {
			"type": "string",
			"description": "The person's first name."
		},
		"lastName": {
			"type": "string",
			"description": "The person's last name."
		},
		"age": {
			"description": "Age in years which must be equal to or greater than zero.",
			"type": "integer",
			"minimum": 0
		}
	}
}
```

この例の Protobuf では、データ形式を [Protocol Buffers l言語のバージョン 2 (proto2)](https://developers.google.com/protocol-buffers/docs/reference/proto2-spec) で定義しています。

```
syntax = "proto2";

package tutorial;

option java_multiple_files = true;
option java_package = "com.example.tutorial.protos";
option java_outer_classname = "AddressBookProtos";

message Person {
  optional string name = 1;
  optional int32 id = 2;
  optional string email = 3;

  enum PhoneType {
    MOBILE = 0;
    HOME = 1;
    WORK = 2;
  }

  message PhoneNumber {
    optional string number = 1;
    optional PhoneType type = 2 [default = HOME];
  }

  repeated PhoneNumber phones = 4;
}

message AddressBook {
  repeated Person people = 1;
}
```

## レジストリ
<a name="schema-registry-registries"></a>

*レジストリ* は、スキーマの論理コンテナです。レジストリを使用すると、スキーマを整理したり、アプリケーションのアクセス制御を管理したりできます。レジストリは Amazon リソースネーム (ARN) を持っており、これにより、レジストリ内のスキーマ操作に対するさまざまなアクセス許可の整理と設定を可能にしています。

レジストリはデフォルトのまま使用することも可能ですが、必要な数の新しいレジストリを作成することもできます。


**AWS Glue スキーマレジストリでの階層**  

|  | 
| --- |
|  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/glue/latest/dg/schema-registry.html)  | 
|  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/glue/latest/dg/schema-registry.html)  | 
|  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/glue/latest/dg/schema-registry.html)  | 
|  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/glue/latest/dg/schema-registry.html)  | 

## スキーマのバージョニングと互換性
<a name="schema-registry-compatibility"></a>

各スキーマは、複数のバージョン作成することができます。バージョニングは、スキーマに適用される互換性ルールによって制御されます。新しいスキーマバージョンの登録に関するリクエストについては、この規則に対するチェックがスキーマレジストリにより行われ後で処理が続行されます。

チェックポイントとしてマークされたスキーマバージョンは、スキーマの新しいバージョンを登録する際の互換性判断のために使用されます。スキーマが最初に作成されると、最初のバージョンがデフォルトのチェックポイントになります。スキーマのバージョンが増加した場合には、CLI/SDK により、一連の制約に従う `UpdateSchema` APIを使用しながら、チェックポイントに使用するスキーマのバージョンを変更できます。デフォルトでは、コンソールでスキーマの定義または互換モードを編集することで、チェックポイントが最新バージョンに変更されます。

互換モードを使用すると、スキーマを時間の経過とともにどのように増加させるか、あるいは増加させないかを制御できます。これらのモードは、データのプロデューサおよびコンシューマである、アプリケーション間の契約を形成します。スキーマの新しいバージョンがレジストリに送信されると、そのスキーマ名に適用されている互換性規則を使用して、新しいバージョンの受け付が可能かどうかが判断されます。互換モードには、NONE、DISABLED、BACKWARD、BACKWARD\$1ALL、FORWARD、FORWARD\$1ALL、FULL、FULL\$1ALL の 8 種類があります

Avro データ形式では、フィールドはオプションまたは必須の両方の場合があります。フィールドがオプションの場合、`Type` には null が含まれます。必須フィールドでは、`Type` は null に設定されません。

Protobuf のデータ形式の場合、 proto2 構文ではフィールドを (繰り返しを含み) オプションまたは必須にすることができます。一方、proto3 構文ではすべてのフィールドが (繰り返しを含み) オプションとなります。すべての互換性ルールは、Protocol Buffers の仕様と、[Google の Protocol Buffers ドキュメント](https://developers.google.com/protocol-buffers/docs/overview#updating)を詳細に把握した上で定義されています。
+ *NONE*: 互換モードは適用されません。このモードは、開発向けのシナリオか、スキーマに適用する互換モードがわからない場合に使用できます。新しいバージョンが追加されたとき、互換性チェックを受けずに受け入れられます。
+ *DISABLED*: 互換性についてこの選択をすると、特定のスキーマのバージョン管理が不要になります。新しいバージョンを追加することはできません。
+ *BACKWARD*: これは推奨の互換性モードで、コンシューマは、現在および過去のスキーマバージョンの両方からの読み取りが可能です。このモードを使用すると、フィールドの削除や任意のフィールドの追加を行う際に、以前のスキーマバージョンとの互換性をチェックできます。アプリケーションが最新のスキーマ用に作成された場合などが、BACKWARD の一般的なユースケースです。

**AVRO**  
例えば、名前 (必須)、姓 (必須)、E メール (必須)、電話番号 (オプション) で定義されているスキーマがあるとします。

  次のスキーマバージョンで、必須である E メールフィールドが削除されたとしても、これは正常に登録されます。後方互換性のために、コンシューマは、現在および 1 つ前のスキーマバージョンを読み取れることが必要です。古いメッセージの余分な E メールフィールドが無視されるため、コンシューマーは新しいスキーマを読み取ることができます。

  仮に、郵便番号などのフィールドを必須として追加する新しいスキーマバージョンが提案された場合、BACKWARD の互換モードでは正常に登録されません。新しいバージョンのコンシューマーは、必須な郵便番号フィールドを含まない、スキーマが変更される前の古いメッセージを読み取ることができないためです。ただし、新しいスキーマで郵便番号フィールドがオプションとして設定されていれば、提案されたバージョンは正常に登録されます。郵便番号フィールドはオプションなので、コンシューマーはこのフィールドなしでも古いスキーマを読み取れるためです。

**JSON**  
例えば、あるスキーマバージョンが、名 (オプション)、姓 (オプション)、E メール (オプション)、電話番号 (オプション)で定義されているとします。

  仮に、次に続くスキーマバージョンで電話番号プロパティがオプションとして追加された場合でも、元のスキーマバージョンで `additionalProperties` フィールドを false に設定しプロパティの追加を禁止しているのであれば、この新しいバージョンは正しく登録されます。後方互換性のために、コンシューマは、現在および 1 つ前のスキーマバージョンを読み取れることが必要です。コンシューマは、電話番号プロパティが存在しない元のスキーマで生成されたデータを読み取ることができます。

  BACKWARD の互換モードで、オプションの電話番号プロパティを追加する新しいスキーマバージョンが提案された際に、元のスキーマバージョンで `additionalProperties` フィールドを true に設定し任意のプロパティの追加を許可していると、このバージョンは正しく登録されません。新しいバージョンのコンシューマは、スキーマが変更される前の古いメッセージを読み取ることができません。これは、別の型 (数ではなく文字列など) で記述された電話番号プロパティデータを、読み取ることができないためです。

**PROTOBUF**  
例えば、`first name` (必須)、`last name` (必須)、`email` (必須) `phone number` (オプション) フィールドを持つ、proto2 構文の Message `Person` で定義されているスキーマバージョンを考えます。

  AVRO での場合と同様に、次に続くスキーマバージョンで、必須である `email` フィールドが削除されたとしても、これは正常に登録されます。後方互換性のために、コンシューマは、現在および 1 つ前のスキーマバージョンを読み取れることが必要です。古いメッセージでは、余分な `email` フィールドが無視されるため、コンシューマーは新しいスキーマを読み取ることができます。

  BACKWARD の互換モードでは、仮に `zip code` などのフィールドを必須として追加する新しいスキーマバージョンが提案された場合、これは正常に登録されません。スキーマが変更される前の古いメッセージには、必須な `zip code` フィールドが含まていないため、新しいバージョンのコンシューマーは、そのメッセージを読み取ることができないためです。ただし、新しいスキーマで `zip code` フィールドがオプションとして設定されていれば、提案されたバージョンは正常に登録されます。`zip code` フィールドはオプションなので、コンシューマーはこのフィールドなしでも古いスキーマを読み取れます。

  gRPC ユースケースの場合、新しい RPC サービスまたは RPC メソッドを追加することで、後方互換性が変更されます。例えば、2 つの RPC メソッド `Foo` および `Bar` を使用する RPC サービス `MyService` で定義されている、スキーマバージョンがあるとします。

  次に続くスキーマバージョンで、`Baz` という新しい RPC メソッドが追加された場合、これは正常に登録されます。新しく追加された RPC メソッド `Baz` はオプションのため、コンシューマーは元のスキーマで生成されたデータを、BACKWARD 互換性に従って読み取ることができます。

  提案された新しいスキーマバージョンで、既存の RPC メソッド `Foo` を削除する場合、BACKWARD 互換性では正常に登録されません。新しいバージョンのコンシューマーは、スキーマが変更される前の古いメッセージを読み取ることができません。これは、gRPC アプリケーション内に RPC メソッド `Foo` が存在せず、データを理解して読み取れないためです。
+ *BACKWARD\$1ALL*: この互換モードでは、コンシューマは現在および過去のすべてのスキーマバージョンの読み取りが可能です。このモードを使用すると、フィールドを削除したり任意のフィールドを追加する際に、以前のすべてのスキーマバージョンとの互換性をチェックできます。
+ *FORWARD*: この互換モードにより、コンシューマは現在とその次のスキーマバージョンの両方を読み取ることができますが、それより後に続くバージョンを必ずしも読み取れる訳ではありません。このオプションを使用すると、フィールドを追加したり、任意のフィールドを削除したりする際に、最新のスキーマバージョンとの互換性をチェックできます。過去のスキーマ用に作成されたアプリケーションが、より新しいスキーマを処理できる必要がある場合などが、FORWARD の一般的なユースケースです。

**AVRO**  
例えば、あるスキーマバージョンが、名 (必須)、姓 (必須)、E メール (オプション) で定義されているとします。

  この場合、電話番号などのフィールドを必須として追加する新しいスキーマバージョンは正常に登録されます。FORWARD 互換モードでは、新しいスキーマで生成されたデータを、コンシューマーが以前のバージョンを使用して読み取れることが必要です。

  必須である名 (ファーストネーム) のフィールドを削除するスキーマバージョンが提案されとしても、FORWARD 互換モードでは正常に登録されません。名の必須フィールドがないので、以前のバージョンのコンシューマーが提案されたスキーマを読み取ることができなくなるためです。しかし、名 (ファーストネーム) のフィールドが本来オプションであれば、オプションの名のフィールドを持たない新しいスキーマに基づいたデータをコンシューマーが読み取りできるため、提案された新しいスキーマは正常に登録されます。

**JSON**  
例えば、あるスキーマバージョンが、名 (オプション)、姓 (オプション)、E メール (オプション)、電話番号 (オプション)で定義されているとします。

  オプションの電話番号プロパティを削除する新しいスキーマバージョンの場合、このスキーマバージョンで `additionalProperties` フィールドを false に設定しプロパティの追加を禁止しているのであれば、正しく登録されます。FORWARD 互換モードでは、新しいスキーマで生成されたデータを、コンシューマーが以前のバージョンを使用して読み取れることが必要です。

  オプションの電話番号プロパティを削除するスキーマバージョンが提案された際に、この新しいスキーマバージョンで `additionalProperties` フィールドを true に設定し任意のプロパティの追加を許可していると、FORWARD の互換モードでは正しく登録されません。提案されたスキーマが、異なる型 (数ではなく文字列など) の電話番号プロパティを持つ場合には、以前のバージョンを使用するコンシューマで、このスキーマの読み取りが不可能になるためです。

**PROTOBUF**  
例えば、`first name` (必須)、`last name` (必須)、`email` (オプション) のフィールドを持つ、proto2 構文の Message `Person` で定義されているスキーマバージョンを考えます。

  AVRO の場合と同様に、`phone number` などの必須フィールドを追加する新しいスキーマバージョンの場合、これは正常に登録されます。FORWARD 互換モードでは、新しいスキーマで生成されたデータを、コンシューマーが以前のバージョンを使用して読み取れることが必要です。

  必須である名 `first name` フィールドを削除するスキーマバージョンが提案されとしても、FORWARD 互換モードでは正常に登録されません。必須である `first name` フィールドがないので、以前のバージョンのコンシューマーは、提案されたスキーマを読み取ることができなくなるためです。しかし、`first name` フィールドが本来オプションであれば、新しいスキーマに `first name` フィールドがないとしてもこれはオプションであるため、コンシューマーはこのスキーマ基づいたデータを読み取ることが可能であり、したがって提案された新しいスキーマは正常に登録されます。

  gRPC ユースケースの場合、RPC サービスまたは RPC メソッドを削除すると、前方互換性が変更されます。例えば、2 つの RPC メソッド `Foo` および `Bar` を使用する RPC サービス `MyService` で定義されている、スキーマバージョンがあるとします。

  次に続くスキーマバージョンが、`Foo` という名前の既存の RPC メソッドを削除する場合も、これは FORWARD 互換性に従って正常に登録されます。コンシューマーは、新しいスキーマで生成されたデータを、以前のバージョンを使用して読み取ることが可能です。RPC メソッド `Baz` を追加するスキーマバージョンが提案され場合、これは FORWARD 互換モードでは正常に登録されません。RPC メソッド `Baz` がないので、以前のバージョンのコンシューマーは、この提案されたスキーマを読み取ることができないためです。
+ *FORWARD\$1ALL*: この互換モードでは、任意の新しい登録済みスキーマのプロデューサーによって書き込まれたデータを読み取ることを、コンシューマに許可します。この選択は、フィールドを追加したり、オプションフィールドを削除したり、以前のすべてのスキーマバージョンとの互換性を確認したりする必要がある場合に使用できます。
+ *FULL*: この互換モードにより、コンシューマーは、1 つ前または次のバージョンのスキーマを使用するプロデューサーによって書き込まれたデータを読み取ることができますが、それ以前またはそれ以降のバージョンについては読み取れません。このモードでは、任意のフィールドを追加または削除する際に、最新のスキーマバージョンとの互換性をチェックできます。
+ *FULL\$1ALL*: この互換モードにより、コンシューマーは、過去の任意のスキーマバージョンを使用するプロデューサーによって書き込まれたデータを読み取ることができます。このモードは、オプションのフィールドを追加または削除する際に、過去のすべてのスキーマバージョンとの互換性をチェックするために使用できます。

## オープンソース Serde ライブラリ
<a name="schema-registry-serde-libraries"></a>

AWS では、データをシリアル化および非シリアル化するためのフレームワークとして、オープンソースの Serde ライブラリが用意されています。このオープンソースなライブラリ設計により、一般的なオープンソースのアプリケーションやフレームワークが、それぞれのプロジェクトでこれらのライブラリをサポートできるようにしています。

Serde ライブラリの仕組みの詳細については、「[スキーマレジストリの仕組み](schema-registry-works.md)」を参照してください。

## スキーマレジストリのクォータ
<a name="schema-registry-quotas"></a>

AWS のクォータ (制限とも呼ばれます) は、AWS アカウントのリソース、アクション、および制限の最大値です。以下に、AWS Glue のスキーマレジストリにおけるソフト制限を示します。

**スキーマバージョンのメタデータでのキー値ペアの数。**  
AWS リージョンごとの各 SchemaVersion において、最大 10 個のキーと値のペアを使用できます。

メタデータでのキーと値のペアは、[QuerySchemaVersionMetadata アクション (Python: query\$1schema\$1version\$1metadata)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-QuerySchemaVersionMetadata) または [PutSchemaVersionMetadata アクション (Python: put\$1schema\$1version\$1metadata)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-PutSchemaVersionMetadata) APIを使用して表示または設定できます。

AWS Glue のスキーマレジストリでのハード制限は次のとおりです。

**レジストリ**  
このアカウントでは、AWS リージョンごとに最大 100 のレジストリを使用できます。

**SchemaVersion**  
このアカウントでは、AWS リージョンごとに最大 10000 のスキーマバージョンを使用できます。

各新しいスキーマは新しいスキーマバージョンを作成します。したがって、各スキーマに 1 つのバージョンしかない場合、リージョンごとの各アカウントには理論的に最大 10000 のスキーマを使用できます。

**スキーマペイロード**  
スキーマペイロードには、170 KB のサイズ制限があります。

# スキーマレジストリの仕組み
<a name="schema-registry-works"></a>

このセクションでは、スキーマレジストリでシリアル化および非シリアル化のプロセスの仕組みについて説明します。

1. スキーマの登録: スキーマがレジストリにまだ存在しない場合、スキーマを保存先の名前と同じスキーマ名で登録することができます (test\$1topic、test\$1stream、prod\$1firehose など)。あるいは、プロデューサーがスキーマのカスタム名を指定することも可能です。プロデューサーは、source: msk\$1kafka\$1topic\$1A などのメタデータとしてスキーマにキーと値のペアを追加したり、スキーマ作成時に AWS タグをそのスキーマに追加したりできます。スキーマが登録されると、Schema Registry はスキーマのバージョン ID をシリアライザに返します。スキーマが存在するものの、シリアライザが存在しない新しいバージョンを使用している場合、それを新しいバージョンとして登録する前に、Schema Registry はスキーマ参照で互換性ルールを確認して、新しいバージョンに関する互換性を確保します。

   スキーマの登録には、手動登録と自動登録の 2 つの方法があります。スキーマを手動で登録するには、AWS Glue コンソールまたは CLI/SDK を使用します。

   シリアライザ設定で自動登録が有効化されている場合は、スキーマの自動登録が実行されます。プロデューサ設定で `REGISTRY_NAME` が指定されていない場合、自動登録はデフォルトのレジストリ (default-registry) の下に新しいスキーマバージョンを登録します。自動登録プロパティの指定については、「[SerDe ライブラリのインストール](schema-registry-gs-serde.md)」を参照してください。

1. シリアライザがスキーマに関してデータレコードを検証: データを生成するアプリケーションがそのスキーマを登録した場合、Schema Registry のシリアライザは、アプリケーションによって生成されるレコードが、登録されたスキーマに一致するフィールドとデータ型で構造化されていることを検証します。レコードのスキーマが登録されたスキーマと一致しない場合、シリアライザは例外を出力し、アプリケーションは、そのレコードを送信先にデリバリすることに失敗します。

   スキーマが存在せず、プロデューサー設定を介してスキーマ名が指定されてもいない場合は、スキーマがトピック名 (Apache Kafka または Amazon MSK の場合) あるいはストリーム名 (Kinesis Data Streams の場合) と同じ名前で作成されます。

   すべてのレコードは、スキーマ定義とデータを含みます。スキーマ定義は、Schema Registry 内の既存のスキーマおよびバージョンに関して照会されます。

   デフォルトでは、プロデューサは、登録済みスキーマのスキーマ定義とスキーマバージョン ID をキャッシュします。レコードのスキーマバージョン定義がキャッシュ内に保存されたものと一致しない場合、プロデューサーは Schema Registry を使用してスキーマの検証を試みます。スキーマのバージョンが有効と認められた場合、そのバージョン ID と定義はプロデューサにローカルにキャッシュされます。

   [SerDe ライブラリのインストール](schema-registry-gs-serde.md) のステップ 3 のように、オプションのプロデューサプロパティにより、デフォルトのキャッシュ期間 (24 時間) を調整することが可能です。

1. レコードのシリアル化と配信: レコードがスキーマに準拠している場合、シリアライザは各レコードをスキーマのバージョン ID で装飾し、選択したデータ形式 (AVRO、JSON、Protobuf 他の形式は追加予定) に基づいてシリアル化した上で、そのレコードを (プロデューサーのオプション設定により) 圧縮し送信先に送ります。

1. コンシューマーがデータを非シリアル化: このデータを読み取るコンシューマーは、レコードペイロードからのスキーマバージョン ID を解析する、Schema Registry のデシリアライザ用ライブラリを使用します。

1. デシリアライザが Schema Registry からのスキーマをリクエスト: デシリアライザが特定のスキーマバージョン ID を持つレコードを初めて認識すると、そのデシリアライザは、スキーマバージョン ID を使用してスキーマレジストリからスキーマを要求します。その後、そのスキーマをコンシューマ上でローカルにキャッシュします。Schema Registry でレコードの非シリアル化が不可能な場合、コンシューマはレコードからのデータをログに記録し、処理を続行するかアプリケーションを停止できます。

1. デシリアライザがスキーマを使用してレコードを非シリアル化: デシリアライザは、Schema Registry からスキーマバージョン ID を取得する際に (プロデューサーによって送信されたレコードが圧縮されている場合は) レコードを解凍し、スキーマを使用してレコードを非シリアル化します。これで、アプリケーションはレコードを処理できます。

**注記**  
暗号化: クライアントは、HTTPS 上での TLS 暗号化を使用して転送中のデータを暗号化する API 呼び出しを介して Schema Registry と通信します。Schema Registry に保存されているスキーマは、サービス管理のAWS Key Management Service(AWS KMS) キーにより常に暗号化されます。

**注記**  
ユーザー認可: Schema Registry は、アイデンティティベースの IAM ポリシーをサポートします。

# スキーマレジストリの開始方法
<a name="schema-registry-gs"></a>

以下のセクションでは、Schema Registry のセットアップと使用に関する概要とチュートリアルを示します。スキーマレジストリの概念および各コンポーネントの詳細については、「[AWS Glue スキーマレジストリ](schema-registry.md)」を参照してください。

**Topics**
+ [SerDe ライブラリのインストール](schema-registry-gs-serde.md)
+ [AWS Glue Schema Registry との統合](schema-registry-integrations.md)
+ [サードパーティー製のスキーマレジストリから AWS Glue Schema Registry への移行](schema-registry-integrations-migration.md)

# SerDe ライブラリのインストール
<a name="schema-registry-gs-serde"></a>

SerDe ライブラリは、データのシリアル化と非シリアル化のためのフレームワークを提供します。

データを生成するアプリケーション (総称してシリアライザ) 用の、オープンソースのシリアライザをインストールします。シリアライザは、シリアル化、圧縮、および Schema Registry とのやり取りを処理します。シリアライザは、Schema Registry 対応の送信先 (Amazon MSK など) に書き込まれるレコードから、スキーマを自動的に抽出します。同様に、データを利用するアプリケーションには、オープンソースのデシリアライザをインストールします。

# Java 実装
<a name="schema-registry-gs-serde-java"></a>

**注記**  
前提条件: 次のステップを実行する前に、Amazon Managed Streaming for Apache Kafka (Amazon MSK) または Apache Kafka クラスターを起動しておく必要があります。使用するプロデューサーとコンシューマーは、Java 8 以上で実行する必要があります。

プロデューサとコンシューマにライブラリをインストールするには

1. プロデューサとコンシューマ両方の pom.xml ファイルの中で、以下のコードにより依存関係を追加します。

   ```
   <dependency>
       <groupId>software.amazon.glue</groupId>
       <artifactId>schema-registry-serde</artifactId>
       <version>1.1.5</version>
   </dependency>
   ```

   または、[AWS Glue Schema Registry GitHub リポジトリ](https://github.com/awslabs/aws-glue-schema-registry)からクローンを作成することもできます。

1. 次の必須プロパティを使用してプロデューサをセットアップします。

   ```
   props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // Can replace StringSerializer.class.getName()) with any other key serializer that you may use
   props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName());
   props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2");
   properties.put(AWSSchemaRegistryConstants.DATA_FORMAT, "JSON"); // OR "AVRO"
   ```

   既存のスキーマがない場合は、自動登録を有効にします (次のステップ)。適用できるスキーマがある場合は、「my-schema」をそのスキーマ名に置き換えます。また、スキーマの自動登録が無効になっている場合は、「registry-name」を指定する必要もあります。スキーマが「default-registry」の下に作成されている場合は、レジストリ名を省略できます。

1. (オプション) これらのオプションのプロデューサープロパティのいずれかを設定します。プロパティの詳細については、[ReadMe ファイル](https://github.com/awslabs/aws-glue-schema-registry/blob/master/README.md) を参照してください。

   ```
   props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true"); // If not passed, uses "false"
   props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "my-schema"); // If not passed, uses transport name (topic name in case of Kafka, or stream name in case of Kinesis Data Streams)
   props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry"); // If not passed, uses "default-registry"
   props.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000 (24 Hours)
   props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200
   props.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL); // Pass a compatibility mode. If not passed, uses Compatibility.BACKWARD
   props.put(AWSSchemaRegistryConstants.DESCRIPTION, "This registry is used for several purposes."); // If not passed, constructs a description
   props.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, AWSSchemaRegistryConstants.COMPRESSION.ZLIB); // If not passed, records are sent uncompressed
   ```

   自動登録では、スキーマのバージョンがデフォルトのレジストリ (default-registry) に登録されます。`SCHEMA_NAME` が前のステップで指定されていない場合、トピック名は `SCHEMA_NAME` として推定されます。

   互換モードの詳細については、[スキーマのバージョニングと互換性](schema-registry.md#schema-registry-compatibility) を参照してください。

1. 以下の必須プロパティを使用してコンシューマを設定します。

   ```
   props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
   props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
   props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2"); // Pass an AWS リージョン
   props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName()); // Only required for AVRO data format
   ```

1. (オプション) これらのオプションのコンシューマプロパティを設定します。プロパティの詳細については、[ReadMe ファイル](https://github.com/awslabs/aws-glue-schema-registry/blob/master/README.md) を参照してください。

   ```
   properties.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000
   props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200
   props.put(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, "com.amazonaws.services.schemaregistry.deserializers.external.ThirdPartyDeserializer"); // For migration fall back scenario
   ```

# C\$1 実装
<a name="schema-registry-gs-serde-csharp"></a>

**注記**  
前提条件: 次のステップを実行する前に、Amazon Managed Streaming for Apache Kafka (Amazon MSK) または Apache Kafka クラスターを起動しておく必要があります。使用するプロデューサーとコンシューマーは、.NET 8.0 以上で実行する必要があります。

## インストール
<a name="schema-registry-gs-serde-csharp-install"></a>

C\$1 アプリケーションの場合は、次のいずれかの方法を使用して AWS Glue Schema Registry SerDe NuGet パッケージをインストールします。

**.NET CLI:**  
パッケージをインストールするには、次のコマンドを使用します。

```
dotnet add package Aws.Glue.SchemaRegistry --version 1.0.0-<rid>
```

ここで、`<rid>` は `1.0.0-linux-x64`、`1.0.0-linux-musl-x64` または `1.0.0-linux-arm64` です。

**PackageReference (.csproj ファイル内):**  
プロジェクトファイルに次の内容を追加します。

```
<PackageReference Include="Aws.Glue.SchemaRegistry" Version="1.0.0-<rid>" />
```

ここで、`<rid>` は `1.0.0-linux-x64`、`1.0.0-linux-musl-x64` または `1.0.0-linux-arm64` です。

## 設定ファイルのセットアップ
<a name="schema-registry-gs-serde-csharp-config"></a>

必要な設定で設定プロパティファイル (`gsr-config.properties` など) を作成します。

**最小限の設定:**  
以下に最小限の設定例を示します。

```
region=us-east-1
registry.name=default-registry
dataFormat=AVRO
schemaAutoRegistrationEnabled=true
```

## Kafka SerDes 用の C\$1 Glue スキーマクライアントライブラリの使用
<a name="schema-registry-gs-serde-csharp-kafka"></a>

**シリアライザーの使用例:**  
次の例は、シリアライザーの使用方法を示しています。

```
private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>";
var protobufSerializer = new GlueSchemaRegistryKafkaSerializer(PROTOBUF_CONFIG_PATH);
var serialized = protobufSerializer.Serialize(message, message.Descriptor.FullName);
// send serialized bytes to Kafka using producer.Produce(serialized)
```

**デシリアライザーの使用例:**  
次の例は、デシリアライザーの使用方法を示しています。

```
private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>";
var dataConfig = new GlueSchemaRegistryDataFormatConfiguration(
    new Dictionary<string, dynamic>
    {
        {
            GlueSchemaRegistryConstants.ProtobufMessageDescriptor, message.Descriptor
        }
    }
);
var protobufDeserializer = new GlueSchemaRegistryKafkaDeserializer(PROTOBUF_CONFIG_PATH, dataConfig);

// read message from Kafka using serialized = consumer.Consume()
var deserializedObject = protobufDeserializer.Deserialize(message.Descriptor.FullName, serialized);
```

## SerDes 用の KafkaFlow での C\$1 Glue スキーマクライアントライブラリの使用
<a name="schema-registry-gs-serde-csharp-kafkaflow"></a>

**シリアライザーの使用例:**  
次の例は、シリアライザーを使用して KafkaFlow を設定する方法を示しています。

```
services.AddKafka(kafka => kafka
    .UseConsoleLog()
    .AddCluster(cluster => cluster
        .WithBrokers(new[] { "localhost:9092" })
        .AddProducer<CustomerProducer>(producer => producer
            .DefaultTopic("customer-events")
            .AddMiddlewares(m => m
                .AddSerializer<GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>>(
                    () => new GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>("config/gsr-config.properties")
                )
            )
        )
    )
);
```

**デシリアライザーの使用例:**  
次の例は、デシリアライザーを使用して KafkaFlow を設定する方法を示しています。

```
.AddConsumer(consumer => consumer
    .Topic("customer-events")
    .WithGroupId("customer-group")
    .WithBufferSize(100)
    .WithWorkersCount(10)
    .AddMiddlewares(middlewares => middlewares
        .AddDeserializer<GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>>(
            () => new GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>("config/gsr-config.properties")
        )
        .AddTypedHandlers(h => h.AddHandler<CustomerHandler>())
    )
)
```

## オプションのプロデューサープロパティ
<a name="schema-registry-gs-serde-csharp-optional"></a>

追加のオプションプロパティを使用して、設定ファイルを拡張できます。

```
# Auto-registration (if not passed, uses "false")
schemaAutoRegistrationEnabled=true

# Schema name (if not passed, uses topic name)
schema.name=my-schema

# Registry name (if not passed, uses "default-registry")
registry.name=my-registry

# Cache settings
cacheTimeToLiveMillis=86400000
cacheSize=200

# Compatibility mode (if not passed, uses BACKWARD)
compatibility=FULL

# Registry description
description=This registry is used for several purposes.

# Compression (if not passed, records are sent uncompressed)
compressionType=ZLIB
```

## サポートされている日付書式
<a name="schema-registry-gs-serde-supported-formats"></a>

Java と C\$1 の両方の実装は、同じデータ形式をサポートしています。
+ *AVRO*: Apache Avro バイナリ形式
+ *JSON*: JSON スキーマ形式
+ *PROTOBUF*: プロトコルバッファ形式

## 注意事項
<a name="schema-registry-gs-serde-csharp-notes"></a>
+ ライブラリの使用を開始するには、[https://www.nuget.org/packages/AWS.Glue.SchemaRegistry](https://www.nuget.org/packages/AWS.Glue.SchemaRegistry) にアクセスしてください
+ ソースコードは、[https://github.com/awslabs/aws-glue-schema-registry](https://github.com/awslabs/aws-glue-schema-registry) で入手できます

# レジストリを作成する
<a name="schema-registry-gs3"></a>

AWS Glue API または AWS Glue コンソールを使用して、デフォルトのレジストリを使用することも、必要な数の新しいレジストリを作成することもできます。

**AWS Glue API**  
ここでの手順により、AWS Glue API を使用しながら対象のタスクを実行できます。

AWS Glue Schema Registry API で AWS CLI を使用するには、最新バージョンの AWS CLI 使用する必要が有ります。

 新しいレジストリを追加するには、[CreateRegistry アクション (Python: create\$1registry)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-CreateRegistry) APIを使用します。`RegistryName` では、作成するレジストリの名前を指定します。この文字数は最大 255 までで、文字、数字、ハイフン、アンダースコア、ドル記号、およびハッシュ記号のみ使用できます。

2,048 バイト以下で「[URI アドレスの複数行の文字列パターン](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-common.html#aws-glue-api-common-_string-patterns)」に一致する文字列として `Description` を指定します。

オプションで、キーと値のペアのマップ配列として、1 つ以上の `Tags` をレジストリに指定します。

```
aws glue create-registry --registry-name registryName1 --description description
```

作成されたレジストリには、Amazon リソースネーム (ARN) が割り当てられます。これは、`RegistryArn` API 応答により表示することが可能です。この段階でレジストリ作成が完了しているので、そのレジストリのために 1 つ以上のスキーマを作成します。

**AWS Glue コンソール**  
AWS Glue コンソールで新しいレジストリを追加するには

1. AWS マネジメントコンソールにサインインし、AWS Glue コンソール ([https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\)) を開きます。

1. ナビゲーションペインの [**Data catalog**] (データカタログ) で、[**Schema registries**] (スキーマレジストリ) をクリックします。

1. [**Add registry**] (レジストリを追加) をクリックします。

1. [**Registry name**] (レジストリ名) に、文字、数字、ハイフン、アンダースコアを含むレジストリの名前を入力します。この名前は変更できません。

1. レジストリの [**Description**] (説明) を入力します　(オプション)。

1. オプションで、1 つ以上のタグをレジストリに適用します。[**Add new tag**] (新しいタグを追加) を選択し、[**Tag key**] (タグキー) とオプションの [**Tag value**] (タグ値) を指定します。

1. [**Add registry**] (レジストリを追加) をクリックします。

![\[レジストリの作成例。\]](http://docs.aws.amazon.com/ja_jp/glue/latest/dg/images/schema_reg_create_registry.png)


作成されたレジストリには、Amazon リソースネーム (ARN) が割り当てられます。これは、[**Schema registries**] (スキーマレジストリ) のリストから選択して表示することが可能です。この段階でレジストリ作成が完了しているので、そのレジストリのために 1 つ以上のスキーマを作成します。

# JSON の特定のレコード (JAVA POJO) 処理する
<a name="schema-registry-gs-json-java-pojo"></a>

従来の単純な Java オブジェクト (POJO) を使用して、オブジェクトをレコードとして渡すことができます。これは、AVRO における特定のレコードの概念と類似しています。[mbknor-jackson-jsonschema](https://github.com/mbknor/mbknor-jackson-jsonSchema) により、渡された POJO の JSON スキーマを生成できます。また、このライブラリでは、JSON スキーマに追加情報を挿入することもできます。

AWS Glue Schema Registry ライブラリは、スキーマに注入された「className」フィールドを使用して、完全に分類されたクラス名を提供します。「className」フィールドは、そのクラスのオブジェクト内での非シリアル化のために、デシリアライザによって使用されます。

```
 Example class :

@JsonSchemaDescription("This is a car")
@JsonSchemaTitle("Simple Car Schema")
@Builder
@AllArgsConstructor
@EqualsAndHashCode
// Fully qualified class name to be added to an additionally injected property
// called className for deserializer to determine which class to deserialize
// the bytes into
@JsonSchemaInject(
        strings = {@JsonSchemaString(path = "className",
                value = "com.amazonaws.services.schemaregistry.integrationtests.generators.Car")}
)
// List of annotations to help infer JSON Schema are defined by https://github.com/mbknor/mbknor-jackson-jsonSchema
public class Car {
    @JsonProperty(required = true)
    private String make;

    @JsonProperty(required = true)
    private String model;

    @JsonSchemaDefault("true")
    @JsonProperty
    public boolean used;

    @JsonSchemaInject(ints = {@JsonSchemaInt(path = "multipleOf", value = 1000)})
    @Max(200000)
    @JsonProperty
    private int miles;

    @Min(2000)
    @JsonProperty
    private int year;

    @JsonProperty
    private Date purchaseDate;

    @JsonProperty
    @JsonFormat(shape = JsonFormat.Shape.NUMBER)
    private Date listedDate;

    @JsonProperty
    private String[] owners;

    @JsonProperty
    private Collection<Float> serviceChecks;

    // Empty constructor is required by Jackson to deserialize bytes
    // into an Object of this class
    public Car() {}
}
```

# スキーマの作成
<a name="schema-registry-gs4"></a>

スキーマは、AWS Glue API または AWS Glue コンソールを使用して作成できます。

**AWS Glue API**  
ここでの手順により、AWS Glue API を使用しながら対象のタスクを実行できます。

新しいスキーマを追加するには [CreateSchema アクション (Python: create\$1schema)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-CreateSchema) API を使用します。

`RegistryId` 構造体を使用して、スキーマのレジストリを指定します。または、`RegistryId` を省略すると、デフォルトのレジストリが使用されます。

文字、数字、ハイフン、アンダースコアを使用し、`DataFormat` として **AVRO** または **JSON** を設定しながら `SchemaName` を指定します。一度スキーマに設定された `DataFormat` は変更できません。

`Compatibility` モードを指定する。
+ *Backward (推奨)* – コンシューマは、現在のバージョンと 1 つ前のバージョンの両方を読み取ることができます。
+ *Backward all* – コンシューマは、現在のバージョンと過去のすべてのバージョンを読み取ることができます。
+ *Forward* – コンシューマは、現在のバージョンと次に続くバージョンの両方を読み取ることができます。
+ *Forward all* – コンシューマは、現在のバージョンと後続のすべてのバージョンの両方を読み取ることができます。
+ *Full* – Backward all と Forward all を組み合わせたモードです。
+ *Full all* – Backward all と Forward all を組み合わせたモードです。
+ *None* – 互換性チェックは実行されません。
+ *Disabled* – このスキーマのバージョニングを抑止します。

オプションで、スキーマに `Tags` を指定します。

`SchemaDefinition` を指定することで、スキーマのデータ形式を Avro、JSON、もしくは Protobuf として定義します。例を参照してください。

Avro データ形式の場合:

```
aws glue create-schema --registry-id RegistryName="registryName1" --schema-name testschema --compatibility NONE --data-format AVRO --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}"
```

```
aws glue create-schema --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName1" --schema-name testschema --compatibility NONE --data-format AVRO  --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}"
```

JSON データ形式の場合:

```
aws glue create-schema --registry-id RegistryName="registryName" --schema-name testSchemaJson --compatibility NONE --data-format JSON --schema-definition "{\"$schema\": \"http://json-schema.org/draft-07/schema#\",\"type\":\"object\",\"properties\":{\"f1\":{\"type\":\"string\"}}}"
```

```
aws glue create-schema --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName" --schema-name testSchemaJson --compatibility NONE --data-format JSON --schema-definition "{\"$schema\": \"http://json-schema.org/draft-07/schema#\",\"type\":\"object\",\"properties\":{\"f1\":{\"type\":\"string\"}}}"
```

Protobuf データ形式の場合:

```
aws glue create-schema --registry-id RegistryName="registryName" --schema-name testSchemaProtobuf --compatibility NONE --data-format PROTOBUF --schema-definition "syntax = \"proto2\";package org.test;message Basic { optional int32 basic = 1;}"
```

```
aws glue create-schema --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName" --schema-name testSchemaProtobuf --compatibility NONE --data-format PROTOBUF --schema-definition "syntax = \"proto2\";package org.test;message Basic { optional int32 basic = 1;}"
```

**AWS Glue コンソール**  
AWS Glue コンソールを使用して新しいスキーマを追加するには

1. AWS マネジメントコンソールにサインインし、AWS Glue コンソール ([https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\)) を開きます。

1. ナビゲーションペインの [**Data catalog**] (データカタログ) で、[**Schemas**] (スキーマ) をクリックします。

1. [**Add schema**] (スキーマを追加) をクリックします。

1. [**Schema name**] (スキーマ名) に、文字、数字、ハイフン、アンダースコア、ドル記号、ハッシュマークで構成された名前を入力します。この名前は変更できません。

1. [**Registry**] (レジストリ) ドロップダウンメニューから、スキーマの保存先となるレジストリを選択します。作成後は、親レジストリを変更することはできません。

1. [**Data format**] (データ形式) は、「*Apache Avro*」または「*JSON*」のままにしておきます。この形式は、このスキーマのすべてのバージョンに適用されます。

1. [**Compatibility mode**] (互換モード) をクリックします。
   + *Backward (推奨)* – レシーバーは、現在のバージョンと 1 つ前のバージョンの両方を読み取ることができます。
   + *Backward All* – レシーバーは、現在および過去のすべてのバージョンを読み取ることができます。
   + *Forward* – センダーは、現在のバージョンと 1 つ前のバージョンの両方に書き込むことができます。
   + *Forward All* – センダーは、現在のバージョンと過去のすべてのバージョンに書き込むことができます。
   + *Full* – Backward と Forward を組み合わせたモードです。
   + *Full All* – Backward All と Forward All を組み合わせたモードです。
   + *None* – 互換性チェックは実行されません。
   + *Disabled* – このスキーマのバージョニングを抑止します。

1. [**Description**] (説明) に、レジストリのための説明 (オプション) を、最大 250 文字で入力します。  
![\[スキーマの作成例。\]](http://docs.aws.amazon.com/ja_jp/glue/latest/dg/images/schema_reg_create_schema.png)

1. オプションで、1 つ以上のタグをスキーマに適用します。[**Add new tag**] (新しいタグを追加) を選択し、[**Tag key**] (タグキー) とオプションの [**Tag value**] (タグ値) を指定します。

1. [**First schema バージョニング**] (最初のスキーマバージョン) ボックスに、初期スキーマを入力するか貼り付けます。

   Avro 形式については「[Avro データ形式での作業](#schema-registry-avro)」を参照

   JSON 形式については「[JSON データ形式での操作](#schema-registry-json)」を参照

1. 必要に応じて、[**Add metadata**] (メタデータを追加) をクリックして、スキーマバージョンの注釈付けや分類を行うためのバージョンメタデータを追加します。

1. [**Create schema and version**] (スキーマとバージョンを作成する) をクリックします。

![\[スキーマの作成例。\]](http://docs.aws.amazon.com/ja_jp/glue/latest/dg/images/schema_reg_create_schema2.png)


スキーマが作成され、[**Schemas**] (スキーマ) の下に一覧表示されます。

## Avro データ形式での作業
<a name="schema-registry-avro"></a>

Avro では、データのシリアル化とデータ交換サービスが利用できます。Avro は、可読性と解釈しやすさのために、データ定義が JSON 形式で格納されます。データ自体はバイナリ形式で保存されます。

Apache Avro スキーマの定義については、「[Apache Avro specification](http://avro.apache.org/docs/current/spec.html)」を参照してください。

## JSON データ形式での操作
<a name="schema-registry-json"></a>

JSON 形式では、データをシリアル化できます。JSON スキーマ形式の標準は、「[JSON Schema format](https://json-schema.org/)」で定義されています。

# スキーマまたはレジストリの更新
<a name="schema-registry-gs5"></a>

作成したスキーマ、スキーマバージョン、またはレジストリは、編集することができます。

## レジストリの更新
<a name="schema-registry-gs5a"></a>

レジストリは、AWS Glue API または AWS Glue コンソールを使用して更新できます。既存のレジストリの名前は変更できません。レジストリの説明は編集が可能です。

**AWS Glue API**  
既存のレジストリを更新するには、[UpdateRegistry アクション (Python: update\$1registry)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-UpdateRegistry) API を使用します。

`RegistryId` 構造体を使用して、更新するレジストリを指定します。レジストリの説明を変更するには、`Description` を渡します。

```
aws glue update-registry --description updatedDescription --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName1"
```

**AWS Glue コンソール**  
AWS Glue コンソールを使用してレジストリを更新するには

1. AWS マネジメントコンソールにサインインし、AWS Glue コンソール ([https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\)) を開きます。

1. ナビゲーションペインの [**Data catalog**] (データカタログ) で、[**Schema registries**] (スキーマレジストリ) をクリックします。

1. レジストリの一覧から、そのチェックボックスをオンにして、レジストリを選択します。

1. [**Action**] (アクション) メニューで、[**Edit registry**] (レジストリの編集) を選択します。

# スキーマの更新
<a name="schema-registry-gs5b"></a>

スキーマでは、説明または互換性設定の更新が行えます。

既存のスキーマを更新するには、[UpdateSchema アクション (Python: update\$1schema)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-UpdateSchema) API を使用します。

`SchemaId` 構造体を使用して、更新するスキーマを指定します。`VersionNumber` または `Compatibility` のいずれかを指定する必要があります。

コード例 11:

```
aws glue update-schema --description testDescription --schema-id SchemaName="testSchema1",RegistryName="registryName1" --schema-version-number LatestVersion=true --compatibility NONE
```

```
aws glue update-schema --description testDescription --schema-id SchemaArn="arn:aws:glue:us-east-2:901234567890:schema/registryName1/testSchema1" --schema-version-number LatestVersion=true --compatibility NONE
```

# スキーマバージョンの追加。
<a name="schema-registry-gs5c"></a>

スキーマバージョンを追加する際は、そのバージョンを比較して、新しいスキーマが受け入れられることを確認する必要があります。

既存のスキーマに新しいバージョンを追加するには、[RegisterSchemaVersion アクション (Python: register\$1schema\$1version)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-RegisterSchemaVersion) API を使用します。

`SchemaId` 構造体を使用して、バージョンを追加するスキーマを指定し、`SchemaDefinition` によりスキーマを定義します。

コード例 12:

```
aws glue register-schema-version --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}" --schema-id SchemaArn="arn:aws:glue:us-east-1:901234567890:schema/registryName/testschema"
```

```
aws glue register-schema-version --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}" --schema-id SchemaName="testschema",RegistryName="testregistry"
```

1. AWS マネジメントコンソール にサインインし、AWS Glue コンソール ([https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\)) を開きます。

1. ナビゲーションペインの [**Data catalog**] (データカタログ) で、[**Schemas**] (スキーマ) をクリックします。

1. チェックボックスをオンにして、スキーマのリストからスキーマを選択します。

1. チェックボックスをクリックして、リストから 1 つ以上のスキーマを選択します。

1. [**Action**] (アクションメニュー) から、[**Register new version**] (新しいバージョンを登録) を選択します。

1. [**New version**] (新しいバージョン) ボックスに、新しいスキーマを入力または貼り付けます。

1. **[Compare with previous version]** (過去のバージョンと比較) をクリックして、以前のスキーマのバージョンとの違いを確認します。

1. 必要に応じて、[**Add metadata**] (メタデータを追加) をクリックして、スキーマバージョンの注釈付けや分類を行うためのバージョンメタデータを追加します。[**Key**] (キー) および (オプションの) [**Value**] (値) を入力します。

1. [**Register version**] (バージョンの登録) をクリックします。

![\[スキーマバージョンの追加。\]](http://docs.aws.amazon.com/ja_jp/glue/latest/dg/images/schema_reg_add_schema_version.png)


バージョンの一覧の中に、スキーマのバージョンが表示されます。バージョンで互換モードが変更された場合、そのバージョンはチェックポイントとしてマークされます。

## スキーマのバージョン比較の例。
<a name="schema-registry-gs5c1"></a>

[**Compare with previous version**] (過去のバージョンと比較) をクリックすると、以前のバージョンと新しいバージョンが一緒に表示されます。変更された情報は、次のように強調表示されます。
+ *黄色*: 変更された情報を示します。
+ *緑*: 最新バージョンで追加されたコンテンツを示します。
+ *赤*: 最新バージョンで削除されたコンテンツを示します。

より古いバージョンと比較することも可能です。

![\[スキーマのバージョン比較の例。\]](http://docs.aws.amazon.com/ja_jp/glue/latest/dg/images/schema_reg_version_comparison.png)


# スキーマまたはレジストリの削除
<a name="schema-registry-gs7"></a>

スキーマ、スキーマバージョン、またはレジストリの削除は永続的な操作であり、元に戻すことはできません。

## スキーマの削除
<a name="schema-registry-gs7a"></a>

レジストリ内で使用する必要がなくなったスキーマは、AWS マネジメントコンソール または [DeleteSchema アクション (Python: delete\$1schema)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteSchema) API を使用して削除することができます。。

1 つ以上のスキーマを削除することは永続的なアクションであり、元に戻すことはできません。(1 つあるいは複数の) スキーマが不要になったことを確認します。

レジストリからスキーマを削除するには、`SchemaId` 構造体により対象のスキーマを特定しながら [DeleteSchema アクション (Python: delete\$1schema)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteSchema) API を呼び出します。

例: 

```
aws glue delete-schema --schema-id SchemaArn="arn:aws:glue:us-east-2:901234567890:schema/registryName1/schemaname"
```

```
aws glue delete-schema --schema-id SchemaName="TestSchema6-deleteschemabyname",RegistryName="default-registry"
```

**AWS Glue コンソール**  
AWS Glue コンソールからスキーマを削除するには

1. AWS マネジメントコンソールにサインインし、AWS Glue コンソール ([https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\)) を開きます。

1. ナビゲーションペインの [**Data catalog**] (データカタログ) で、[**Schema registries**] (スキーマレジストリ) をクリックします。

1. レジストリのリストから、自分のスキーマを含むレジストリを選択します。

1. チェックボックスをクリックして、リストから 1 つ以上のスキーマを選択します。

1. [**Action**] (アクション) メニューで、[**Delete schema**] (スキーマの削除) をクリックします。

1. フィールドに「**Delete**」というテキストを入力して、削除を確定します。

1. **[削除]** を選択します。

指定した (1 つ以上の) スキーマがレジストリから削除されます。

## スキーマバージョンの削除
<a name="schema-registry-gs7b"></a>

スキーマはレジストリに蓄積されるので、不要なスキーマバージョンは、AWS マネジメントコンソール または [DeleteSchemaVersions アクション (Python: delete\$1schema\$1versions)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteSchemaVersions) API を使用して削除できます。1 つ以上のスキーマバージョンを削除することは永続的なアクションであり、元に戻すことはできません。そのスキーマバージョンが不要であることを確認します。

スキーマのバージョンを削除する場合は、以下の制約に注意してください。
+ チェックポイントとなっているバージョンを削除することはできません。
+ 25 を超えて連続するバージョンの範囲を削除することはできません。
+ 最新のスキーマバージョンが保留状態にある場合は、削除は行えません。

`SchemaId` 構造体を使用してスキーマを指定し、削除するバージョンの範囲を `Versions` で指定します。バージョンまたはバージョンの範囲の指定の詳細については、「[DeleteRegistry アクション (Python: delete\$1registry)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteRegistry)」を参照してください。指定したスキーマバージョンがレジストリから削除されます。

この呼び出しの後に [ListSchemaVersions アクション (Python: list\$1schema\$1versions)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-ListSchemaVersions) API を呼び出すと、削除されたバージョンのステータスが一覧表示されます。

例: 

```
aws glue delete-schema-versions --schema-id SchemaName="TestSchema6",RegistryName="default-registry" --versions "1-1"
```

```
aws glue delete-schema-versions --schema-id SchemaArn="arn:aws:glue:us-east-2:901234567890:schema/default-registry/TestSchema6-NON-Existent" --versions "1-1"
```

1. AWS マネジメントコンソールにサインインし、AWS Glue コンソール ([https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\)) を開きます。

1. ナビゲーションペインの [**Data catalog**] (データカタログ) で、[**Schema registries**] (スキーマレジストリ) をクリックします。

1. レジストリのリストから、自分のスキーマを含むレジストリを選択します。

1. チェックボックスをクリックして、リストから 1 つ以上のスキーマを選択します。

1. [**Action**] (アクション) メニューで、[**Delete schema**] (スキーマの削除) をクリックします。

1. フィールドに「**Delete**」というテキストを入力して、削除を確定します。

1. **[削除]** を選択します。

指定したスキーマバージョンがレジストリから削除されます。

# レジストリの削除
<a name="schema-registry-gs7c"></a>

レジストリに含まれるスキーマの整理の必要性がなくなった場合は、そのレジストリを削除することができます。これらのスキーマは、別のレジストリに再割り当てする必要があります。

1 つ以上のレジストリを削除することは永続的なアクションであり、元に戻すことはできません。(1 つもしくは複数の) レジストリが不要であることを確認します。

デフォルトのレジストリは、AWS CLI を使用して削除できます。

**AWS Glue API**  
レジストリ全体を、登録されたスキーマとそのすべてのバージョンとともに削除するには、[DeleteRegistry アクション (Python: delete\$1registry)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteRegistry) API を呼び出します。`RegistryId` 構造体を使用し、レジストリを特定します。

例: 

```
aws glue delete-registry --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName1"
```

```
aws glue delete-registry --registry-id RegistryName="TestRegistry-deletebyname"
```

削除オペレーションのステータスを取得するには、非同期呼び出し後に `GetRegistry` API を呼び出します。

**AWS Glue コンソール**  
AWS Glue コンソールからレジストリの削除を行うには

1. AWS マネジメントコンソールにサインインし、AWS Glue コンソール ([https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\)) を開きます。

1. ナビゲーションペインの [**Data catalog**] (データカタログ) で、[**Schema registries**] (スキーマレジストリ) をクリックします。

1. チェックボックスをオンにして、リストからレジストリを選択します。

1. [**Action**] (アクション) メニューで、[**Delete registry**] (レジストリの削除) を選択します。

1. フィールドに「**Delete**」というテキストを入力して、削除を確定します。

1. **[削除]** を選択します。

選択したレジストリが AWS Glue から削除されます。

## シリアライザ用の IAM の例
<a name="schema-registry-gs1"></a>

**注記**  
AWS 管理ポリシーは、一般的ユースケースに必要なアクセス許可を付与します。管理ポリシーを使用してスキーマのレジストリを管理する方法については、「[AWS Glue の AWS マネージド (事前定義) ポリシー](security-iam-awsmanpol.md#access-policy-examples-aws-managed)」を参照してください。

シリアライザの場合、以下と同様の最小限のポリシーを作成して、特定のスキーマ定義のために `schemaVersionId` を検索できるようにします。レジストリ内のスキーマを読み取るには、そのレジストリに対する読み取り許可が必要であることに注意してください。読み取り可能なレジストリは、`Resource` 句を使用して制限できます。

コード例 13:

```
{
    "Sid" : "GetSchemaByDefinition",
    "Effect" : "Allow",
    "Action" :
	[
        "glue:GetSchemaByDefinition"
    ],
        "Resource" : ["arn:aws:glue:us-east-2:012345678:registry/registryname-1",
                      "arn:aws:glue:us-east-2:012345678:schema/registryname-1/schemaname-1",
                      "arn:aws:glue:us-east-2:012345678:schema/registryname-1/schemaname-2"
                     ]
}
```

さらに、以下の新しいメソッドを追加して、プロデューサに対し新しいスキーマとバージョンの作成を許可することもできます。レジストリ内でスキーマを追加/削除/拡大させるためには、そのレジストリを調査できることが必要です。調査することが可能なレジストリは、`Resource` 句を使用して制限できます。

コード例 14:

```
{
    "Sid" : "RegisterSchemaWithMetadata",
    "Effect" : "Allow",
    "Action" :
	[
        "glue:GetSchemaByDefinition",
        "glue:CreateSchema",
        "glue:RegisterSchemaVersion",
        "glue:PutSchemaVersionMetadata",
    ],
    "Resource" : ["arn:aws:glue:aws-region:123456789012:registry/registryname-1",
                  "arn:aws:glue:aws-region:123456789012:schema/registryname-1/schemaname-1",
                  "arn:aws:glue:aws-region:123456789012:schema/registryname-1/schemaname-2"
                 ]
}
```

## デシリアライザー用の IAM の例
<a name="schema-registry-gs1b"></a>

デシリアライザ (コンシューマ側) の場合、以下のようなポリシーを作成する必要があります。これにより、非シリアル化のために Schema Registry からスキーマを取得することを、デシリアライザに対し許可します。レジストリ内のスキーマを取得するためには、そのレジストリを調査することが許可されている必要があります。

コード例 15:

```
{
    "Sid" : "GetSchemaVersion",
    "Effect" : "Allow",
    "Action" :
	[
        "glue:GetSchemaVersion"
    ],
    "Resource" : ["*"]
}
```

## AWS PrivateLink を使用したプライベート接続
<a name="schema-registry-gs-private"></a>

AWS Glue でインターフェイス VPC エンドポイントを定義しながら AWS PrivateLinkを使用すると、データのプロデユーサの VPC を AWS Glue に接続することができます。VPC インターフェイスエンドポイントにより、AWS ネットワーク内全体で VPC と AWS Glue 間の通信を処理します。詳細については、「[Using AWS Glue with VPC Endpoints](https://docs.aws.amazon.com/glue/latest/dg/vpc-endpoint.html)」を参照してください。

# Amazon CloudWatch メトリクスへのアクセス
<a name="schema-registry-gs-monitoring"></a>

Amazon CloudWatch メトリクスは、CloudWatch の無料利用枠の一部として利用できます。これらのメトリクスは CloudWatch コンソールでアクセスできます。APIレベルのメトリクスとしては、CreateSchema (Success および Latency)、GetSchemaByDefinition (Success および Latency)、GetSchemaVersion (Success および Latency)、RegisterSchemaVersion (Success および Latency)、PutSchemaVersionMetadata (Success および Latency) があります。リソースレベルのメトリクスには、Registry.ThrottledByLimit、SchemaVersion.ThrottledByLimit、SchemaVersion.Size があります。

# スキーマレジストリの CloudFormation テンプレート例
<a name="schema-registry-integrations-cfn"></a>

以下に、CloudFormation で Schema Registry リソースを作成するためのテンプレート例を示します。アカウントにこのスタックを作成するには、上記のテンプレートを `SampleTemplate.yaml` ファイルにコピーした上で、次のコマンドを実行します。

```
aws cloudformation create-stack --stack-name ABCSchemaRegistryStack --template-body "'cat SampleTemplate.yaml'"
```

この例では、レジストリを作成するために `AWS::Glue::Registry` を、スキーマを作成するために `AWS::Glue::Schema` を、スキーマバージョンを作成するために `AWS::Glue::SchemaVersion` を使用し、`AWS::Glue::SchemaVersionMetadata` によりスキーマバージョンのメタデータを記述しています。

```
Description: "A sample CloudFormation template for creating Schema Registry resources."
Resources:
  ABCRegistry:
    Type: "AWS::Glue::Registry"
    Properties:
      Name: "ABCSchemaRegistry"
      Description: "ABC Corp. Schema Registry"
      Tags:
        Project: "Foo"
  ABCSchema:
    Type: "AWS::Glue::Schema"
    Properties:
      Registry:
        Arn: !Ref ABCRegistry
      Name: "TestSchema"
      Compatibility: "NONE"
      DataFormat: "AVRO"
      SchemaDefinition: >
        {"namespace":"foo.avro","type":"record","name":"user","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":"int"}]}
      Tags:
        Project: "Foo"
  SecondSchemaVersion:
    Type: "AWS::Glue::SchemaVersion"
    Properties:
      Schema:
        SchemaArn: !Ref ABCSchema
      SchemaDefinition: >
        {"namespace":"foo.avro","type":"record","name":"user","fields":[{"name":"status","type":"string", "default":"ON"}, {"name":"name","type":"string"},{"name":"favorite_number","type":"int"}]}
  FirstSchemaVersionMetadata:
    Type: "AWS::Glue::SchemaVersionMetadata"
    Properties:
      SchemaVersionId: !GetAtt ABCSchema.InitialSchemaVersionId
      Key: "Application"
      Value: "Kinesis"
  SecondSchemaVersionMetadata:
    Type: "AWS::Glue::SchemaVersionMetadata"
    Properties:
      SchemaVersionId: !Ref SecondSchemaVersion
      Key: "Application"
      Value: "Kinesis"
```

# AWS Glue Schema Registry との統合
<a name="schema-registry-integrations"></a>

これらのセクションでは、AWS Glue スキーマレジストリとの統合について説明します。このセクションの例では、AVRO データ形式のスキーマを使用します。JSON データ形式のスキーマなど、その他の例については、「[AWS Glue Schema Registry open source repository](https://github.com/awslabs/aws-glue-schema-registry)」で、統合テストと ReadMe に関する情報をご覧ください。

**Topics**
+ [ユースケース: Schema Registry を Amazon MSK または Apache Kafka に接続する](#schema-registry-integrations-amazon-msk)
+ [ユースケース: Amazon Kinesis Data Streams と AWS Glue Schema Registry との統合](#schema-registry-integrations-kds)
+ [Amazon Managed Service for Apache Flink のユースケース](#schema-registry-integrations-kinesis-data-analytics-apache-flink)
+ [ユースケース: AWS Lambda との統合](#schema-registry-integrations-aws-lambda)
+ [ユースケース: AWS Glue Data Catalog](#schema-registry-integrations-aws-glue-data-catalog)
+ [ユースケース: AWS Glue ストリーミング](#schema-registry-integrations-aws-glue-streaming)
+ [ユースケース: Apache Kafka ストリーム](#schema-registry-integrations-apache-kafka-streams)

## ユースケース: Schema Registry を Amazon MSK または Apache Kafka に接続する
<a name="schema-registry-integrations-amazon-msk"></a>

Apache Kafka トピックにデータを書き込む場合には、以下の手順に従い作業を開始します。

1. Amazon Managed Streaming for Apache Kafka(Amazon MSK) または Apache Kafka のクラスターを作成し、少なくとも 1 つのトピックを含めます。Amazon MSK クラスターを作成する場合は、AWS マネジメントコンソール を使用します。「*Amazon Managed Streaming for Apache Kafka デベロッパーガイド*」の「[Getting Started Using Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html)」にある手順に従います。

1. 上記の [SerDe ライブラリのインストール](schema-registry-gs-serde.md) ステップを実行します。

1. スキーマのレジストリ、スキーマ、またはスキーマバージョンを作成するには、このドキュメントにある [スキーマレジストリの開始方法](schema-registry-gs.md) セクションの手順に従います。

1. Amazon MSK または Apache Kafka のトピックとの間で、レコードの書き込みや読み取りを行うために、Schema Registry を使用してプロデューサとコンシューマを起動します。プロデューサとコンシューマのコード例は、Serdeライブラリの [ReadMe ファイル](https://github.com/awslabs/aws-glue-schema-registry/blob/master/README.md)から入手できます。プロデューサの Schema Registry ライブラリは、レコードを自動的にシリアル化し、スキーマバージョン ID でそのレコードを修飾します。

1. このレコードにスキーマが入力済みの場合、または自動登録が有効になっている場合には、スキーマが Schema Registry に登録されます。

1. AWS Glue Schema Registry ライブラリを使用して、Amazon MSK または Apache Kafka のトピックからスキーマの読み取りを行うコンシューマは、自動的に Schema Registry からスキーマを検索します。

## ユースケース: Amazon Kinesis Data Streams と AWS Glue Schema Registry との統合
<a name="schema-registry-integrations-kds"></a>

この統合には、既存の Amazon Kinesis データストリーム が必要です。詳細については、「*Amazon Kinesis Data Streams デベロッパーガイド*」の「[Getting Started with Amazon Kinesis Data Streams](https://docs.aws.amazon.com/streams/latest/dev/getting-started.html)」を参照してください。

Kinesis データストリームでは、データの操作用に以下の 2 つの方法があります。
+ Java の Kinesis Producer Library (KPL) および Kinesis Client Library (KCL) ライブラリを使用します。多言語サポートは提供されていません。
+ AWS SDK for Java に用意されている `PutRecords`、`PutRecord`、および `GetRecords` Kinesis Data Streams API を使用します。

現在、KPL/KCL ライブラリを使用中であれば、そのメソッドを引き続き使用することをお勧めします。ここでの例に示すように、Schema Registry が統合済みの、更新された KCL および KPL バージョンを使用できます。それ以外で、KDS API を直接使用している場合には、サンプルコードを通じて AWS Glue Schema Registry を利用します。

Schema Registry との統合は、KPL v0.14.2 以降と KCL v2.3 以降でのみ使用できます。JSON データ形式による Schema Registry との統合は、KPL v0.14.8 以降および KCL v2.3.6 以降で使用できます。

### Kinesis SDK V2 を使用したデータの操作
<a name="schema-registry-integrations-kds-sdk-v2"></a>

このセクションでは、Kinesis SDK V2 による Kinesis の操作について説明します。

```
// Example JSON Record, you can construct a AVRO record also
private static final JsonDataWithSchema record = JsonDataWithSchema.builder(schemaString, payloadString);
private static final DataFormat dataFormat = DataFormat.JSON;

//Configurations for Schema Registry
GlueSchemaRegistryConfiguration gsrConfig = new GlueSchemaRegistryConfiguration("us-east-1");

GlueSchemaRegistrySerializer glueSchemaRegistrySerializer =
        new GlueSchemaRegistrySerializerImpl(awsCredentialsProvider, gsrConfig);
GlueSchemaRegistryDataFormatSerializer dataFormatSerializer =
        new GlueSchemaRegistrySerializerFactory().getInstance(dataFormat, gsrConfig);

Schema gsrSchema =
        new Schema(dataFormatSerializer.getSchemaDefinition(record), dataFormat.name(), "MySchema");

byte[] serializedBytes = dataFormatSerializer.serialize(record);

byte[] gsrEncodedBytes = glueSchemaRegistrySerializer.encode(streamName, gsrSchema, serializedBytes);

PutRecordRequest putRecordRequest = PutRecordRequest.builder()
        .streamName(streamName)
        .partitionKey("partitionKey")
        .data(SdkBytes.fromByteArray(gsrEncodedBytes))
        .build();
shardId = kinesisClient.putRecord(putRecordRequest)
        .get()
        .shardId();

GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer = new GlueSchemaRegistryDeserializerImpl(awsCredentialsProvider, gsrConfig);

GlueSchemaRegistryDataFormatDeserializer gsrDataFormatDeserializer =
        glueSchemaRegistryDeserializerFactory.getInstance(dataFormat, gsrConfig);

GetShardIteratorRequest getShardIteratorRequest = GetShardIteratorRequest.builder()
        .streamName(streamName)
        .shardId(shardId)
        .shardIteratorType(ShardIteratorType.TRIM_HORIZON)
        .build();

String shardIterator = kinesisClient.getShardIterator(getShardIteratorRequest)
        .get()
        .shardIterator();

GetRecordsRequest getRecordRequest = GetRecordsRequest.builder()
        .shardIterator(shardIterator)
        .build();
GetRecordsResponse recordsResponse = kinesisClient.getRecords(getRecordRequest)
        .get();

List<Object> consumerRecords = new ArrayList<>();
List<Record> recordsFromKinesis = recordsResponse.records();

for (int i = 0; i < recordsFromKinesis.size(); i++) {
    byte[] consumedBytes = recordsFromKinesis.get(i)
            .data()
            .asByteArray();

    Schema gsrSchema = glueSchemaRegistryDeserializer.getSchema(consumedBytes);
    Object decodedRecord = gsrDataFormatDeserializer.deserialize(ByteBuffer.wrap(consumedBytes),
                                                                    gsrSchema.getSchemaDefinition());
    consumerRecords.add(decodedRecord);
}
```

### KPL/KCL ライブラリを使用したデータの操作
<a name="schema-registry-integrations-kds-libraries"></a>

このセクションでは、KPL/KCL ライブラリを使用しての Kinesis Data Streams と Schema Registry の統合について説明します。KPL/KCL の使用方法については、「*Amazon Kinesis Data Streams デベロッパーガイド*」の「[Developing Producers Using the Amazon Kinesis Producer Library](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html)」を参照してください。

#### KPL で Schema Registry を設定する
<a name="schema-registry-integrations-kds-libraries-kpl"></a>

1. AWS Glue Schema Registryで作成したデータ、データ形式、スキーマ名のスキーマ定義を行います。

1. 必要に応じて、`GlueSchemaRegistryConfiguration` オブジェクトも構成します。

1. `addUserRecord API` にスキーマオブジェクトを渡します。

   ```
   private static final String SCHEMA_DEFINITION = "{"namespace": "example.avro",\n"
   + " "type": "record",\n"
   + " "name": "User",\n"
   + " "fields": [\n"
   + " {"name": "name", "type": "string"},\n"
   + " {"name": "favorite_number", "type": ["int", "null"]},\n"
   + " {"name": "favorite_color", "type": ["string", "null"]}\n"
   + " ]\n"
   + "}";
   
   KinesisProducerConfiguration config = new KinesisProducerConfiguration();
   config.setRegion("us-west-1")
   
   //[Optional] configuration for Schema Registry.
   
   GlueSchemaRegistryConfiguration schemaRegistryConfig =
   new GlueSchemaRegistryConfiguration("us-west-1");
   
   schemaRegistryConfig.setCompression(true);
   
   config.setGlueSchemaRegistryConfiguration(schemaRegistryConfig);
   
   ///Optional configuration ends.
   
   final KinesisProducer producer =
         new KinesisProducer(config);
   
   final ByteBuffer data = getDataToSend();
   
   com.amazonaws.services.schemaregistry.common.Schema gsrSchema =
       new Schema(SCHEMA_DEFINITION, DataFormat.AVRO.toString(), "demoSchema");
   
   ListenableFuture<UserRecordResult> f = producer.addUserRecord(
   config.getStreamName(), TIMESTAMP, Utils.randomExplicitHashKey(), data, gsrSchema);
   
   private static ByteBuffer getDataToSend() {
         org.apache.avro.Schema avroSchema =
           new org.apache.avro.Schema.Parser().parse(SCHEMA_DEFINITION);
   
         GenericRecord user = new GenericData.Record(avroSchema);
         user.put("name", "Emily");
         user.put("favorite_number", 32);
         user.put("favorite_color", "green");
   
         ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
         Encoder encoder = EncoderFactory.get().directBinaryEncoder(outBytes, null);
         new GenericDatumWriter<>(avroSchema).write(user, encoder);
         encoder.flush();
         return ByteBuffer.wrap(outBytes.toByteArray());
    }
   ```

#### Kinesis Client Library のセットアップ
<a name="schema-registry-integrations-kds-libraries-kcl"></a>

Kinesis Client Library コンシューマーを、Java により構築します。詳細については、「*Amazon Kinesis Data Streams デベロッパーガイド*」の「[Developing a Kinesis Client Library Consumer in Java](https://docs.aws.amazon.com/streams/latest/dev/kcl2-standard-consumer-java-example.html)」を参照してください。

1. `GlueSchemaRegistryConfiguration` オブジェクトを渡すことで `GlueSchemaRegistryDeserializer` インスタンスを作成します。

1. `GlueSchemaRegistryDeserializer` を `retrievalConfig.glueSchemaRegistryDeserializer` に渡します。

1. `kinesisClientRecord.getSchema()` を呼び出して、受信メッセージのスキーマにアクセスします。

   ```
   GlueSchemaRegistryConfiguration schemaRegistryConfig =
       new GlueSchemaRegistryConfiguration(this.region.toString());
   
    GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer =
       new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), schemaRegistryConfig);
   
    RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient));
    retrievalConfig.glueSchemaRegistryDeserializer(glueSchemaRegistryDeserializer);
   
     Scheduler scheduler = new Scheduler(
               configsBuilder.checkpointConfig(),
               configsBuilder.coordinatorConfig(),
               configsBuilder.leaseManagementConfig(),
               configsBuilder.lifecycleConfig(),
               configsBuilder.metricsConfig(),
               configsBuilder.processorConfig(),
               retrievalConfig
           );
   
    public void processRecords(ProcessRecordsInput processRecordsInput) {
               MDC.put(SHARD_ID_MDC_KEY, shardId);
               try {
                   log.info("Processing {} record(s)",
                   processRecordsInput.records().size());
                   processRecordsInput.records()
                   .forEach(
                       r ->
                           log.info("Processed record pk: {} -- Seq: {} : data {} with schema: {}",
                           r.partitionKey(), r.sequenceNumber(), recordToAvroObj(r).toString(), r.getSchema()));
               } catch (Throwable t) {
                   log.error("Caught throwable while processing records. Aborting.");
                   Runtime.getRuntime().halt(1);
               } finally {
                   MDC.remove(SHARD_ID_MDC_KEY);
               }
    }
   
    private GenericRecord recordToAvroObj(KinesisClientRecord r) {
       byte[] data = new byte[r.data().remaining()];
       r.data().get(data, 0, data.length);
       org.apache.avro.Schema schema = new org.apache.avro.Schema.Parser().parse(r.schema().getSchemaDefinition());
       DatumReader datumReader = new GenericDatumReader<>(schema);
   
       BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(data, 0, data.length, null);
       return (GenericRecord) datumReader.read(null, binaryDecoder);
    }
   ```

#### Kinesis Data Streams API を使用したデータの操作
<a name="schema-registry-integrations-kds-apis"></a>

このセクションでは、Kinesis Data Streams API を使用しての、Kinesis Data Streams と Schema Registry の統合について説明します。

1. Maven の以下の依存関係を更新します。

   ```
   <dependencyManagement>
           <dependencies>
               <dependency>
                   <groupId>com.amazonaws</groupId>
                   <artifactId>aws-java-sdk-bom</artifactId>
                   <version>1.11.884</version>
                   <type>pom</type>
                   <scope>import</scope>
               </dependency>
           </dependencies>
       </dependencyManagement>
   
       <dependencies>
           <dependency>
               <groupId>com.amazonaws</groupId>
               <artifactId>aws-java-sdk-kinesis</artifactId>
           </dependency>
   
           <dependency>
               <groupId>software.amazon.glue</groupId>
               <artifactId>schema-registry-serde</artifactId>
               <version>1.1.5</version>
           </dependency>
   
           <dependency>
               <groupId>com.fasterxml.jackson.dataformat</groupId>
               <artifactId>jackson-dataformat-cbor</artifactId>
               <version>2.11.3</version>
           </dependency>
       </dependencies>
   ```

1. `PutRecords` または Kinesis Data Streams の `PutRecord` API を使用しながら、プロデューサ内にスキーマヘッダー情報を追加します。

   ```
   //The following lines add a Schema Header to the record
           com.amazonaws.services.schemaregistry.common.Schema awsSchema =
               new com.amazonaws.services.schemaregistry.common.Schema(schemaDefinition, DataFormat.AVRO.name(),
                   schemaName);
           GlueSchemaRegistrySerializerImpl glueSchemaRegistrySerializer =
               new GlueSchemaRegistrySerializerImpl(DefaultCredentialsProvider.builder().build(), new GlueSchemaRegistryConfiguration(getConfigs()));
           byte[] recordWithSchemaHeader =
               glueSchemaRegistrySerializer.encode(streamName, awsSchema, recordAsBytes);
   ```

1. プロデューサ内で `PutRecords` または `PutRecord` API を使用して、レコードをデータストリームに配置します。

1. コンシューマ内で、ヘッダーからスキーマレコードを削除し、Avro スキーマレコードをシリアル化します。

   ```
   //The following lines remove Schema Header from record
           GlueSchemaRegistryDeserializerImpl glueSchemaRegistryDeserializer =
               new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), getConfigs());
           byte[] recordWithSchemaHeaderBytes = new byte[recordWithSchemaHeader.remaining()];
           recordWithSchemaHeader.get(recordWithSchemaHeaderBytes, 0, recordWithSchemaHeaderBytes.length);
           com.amazonaws.services.schemaregistry.common.Schema awsSchema =
               glueSchemaRegistryDeserializer.getSchema(recordWithSchemaHeaderBytes);
           byte[] record = glueSchemaRegistryDeserializer.getData(recordWithSchemaHeaderBytes);
   
           //The following lines serialize an AVRO schema record
           if (DataFormat.AVRO.name().equals(awsSchema.getDataFormat())) {
               Schema avroSchema = new org.apache.avro.Schema.Parser().parse(awsSchema.getSchemaDefinition());
               Object genericRecord = convertBytesToRecord(avroSchema, record);
               System.out.println(genericRecord);
           }
   ```

#### Kinesis Data Streams API を使用したデータの操作
<a name="schema-registry-integrations-kds-apis-reference"></a>

以下に、`PutRecords` および `GetRecords` API を使用するコード例を示します。

```
//Full sample code
import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializerImpl;
import com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistrySerializerImpl;
import com.amazonaws.services.schemaregistry.utils.AVROUtils;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.services.glue.model.DataFormat;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;


public class PutAndGetExampleWithEncodedData {
    static final String regionName = "us-east-2";
    static final String streamName = "testStream1";
    static final String schemaName = "User-Topic";
    static final String AVRO_USER_SCHEMA_FILE = "src/main/resources/user.avsc";
    KinesisApi kinesisApi = new KinesisApi();

    void runSampleForPutRecord() throws IOException {
        Object testRecord = getTestRecord();
        byte[] recordAsBytes = convertRecordToBytes(testRecord);
        String schemaDefinition = AVROUtils.getInstance().getSchemaDefinition(testRecord);

        //The following lines add a Schema Header to a record
        com.amazonaws.services.schemaregistry.common.Schema awsSchema =
            new com.amazonaws.services.schemaregistry.common.Schema(schemaDefinition, DataFormat.AVRO.name(),
                schemaName);
        GlueSchemaRegistrySerializerImpl glueSchemaRegistrySerializer =
            new GlueSchemaRegistrySerializerImpl(DefaultCredentialsProvider.builder().build(), new GlueSchemaRegistryConfiguration(regionName));
        byte[] recordWithSchemaHeader =
            glueSchemaRegistrySerializer.encode(streamName, awsSchema, recordAsBytes);

        //Use PutRecords api to pass a list of records
        kinesisApi.putRecords(Collections.singletonList(recordWithSchemaHeader), streamName, regionName);

        //OR
        //Use PutRecord api to pass single record
        //kinesisApi.putRecord(recordWithSchemaHeader, streamName, regionName);
    }

    byte[] runSampleForGetRecord() throws IOException {
        ByteBuffer recordWithSchemaHeader = kinesisApi.getRecords(streamName, regionName);

        //The following lines remove the schema registry header
        GlueSchemaRegistryDeserializerImpl glueSchemaRegistryDeserializer =
            new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), new GlueSchemaRegistryConfiguration(regionName));
        byte[] recordWithSchemaHeaderBytes = new byte[recordWithSchemaHeader.remaining()];
        recordWithSchemaHeader.get(recordWithSchemaHeaderBytes, 0, recordWithSchemaHeaderBytes.length);

        com.amazonaws.services.schemaregistry.common.Schema awsSchema =
            glueSchemaRegistryDeserializer.getSchema(recordWithSchemaHeaderBytes);

        byte[] record = glueSchemaRegistryDeserializer.getData(recordWithSchemaHeaderBytes);

        //The following lines serialize an AVRO schema record
        if (DataFormat.AVRO.name().equals(awsSchema.getDataFormat())) {
            Schema avroSchema = new org.apache.avro.Schema.Parser().parse(awsSchema.getSchemaDefinition());
            Object genericRecord = convertBytesToRecord(avroSchema, record);
            System.out.println(genericRecord);
        }

        return record;
    }

    private byte[] convertRecordToBytes(final Object record) throws IOException {
        ByteArrayOutputStream recordAsBytes = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().directBinaryEncoder(recordAsBytes, null);
        GenericDatumWriter datumWriter = new GenericDatumWriter<>(AVROUtils.getInstance().getSchema(record));
        datumWriter.write(record, encoder);
        encoder.flush();
        return recordAsBytes.toByteArray();
    }

    private GenericRecord convertBytesToRecord(Schema avroSchema, byte[] record) throws IOException {
        final GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(avroSchema);
        Decoder decoder = DecoderFactory.get().binaryDecoder(record, null);
        GenericRecord genericRecord = datumReader.read(null, decoder);
        return genericRecord;
    }

    private Map<String, String> getMetadata() {
        Map<String, String> metadata = new HashMap<>();
        metadata.put("event-source-1", "topic1");
        metadata.put("event-source-2", "topic2");
        metadata.put("event-source-3", "topic3");
        metadata.put("event-source-4", "topic4");
        metadata.put("event-source-5", "topic5");
        return metadata;
    }

    private GlueSchemaRegistryConfiguration getConfigs() {
        GlueSchemaRegistryConfiguration configs = new GlueSchemaRegistryConfiguration(regionName);
        configs.setSchemaName(schemaName);
        configs.setAutoRegistration(true);
        configs.setMetadata(getMetadata());
        return configs;
    }

    private Object getTestRecord() throws IOException {
        GenericRecord genericRecord;
        Schema.Parser parser = new Schema.Parser();
        Schema avroSchema = parser.parse(new File(AVRO_USER_SCHEMA_FILE));

        genericRecord = new GenericData.Record(avroSchema);
        genericRecord.put("name", "testName");
        genericRecord.put("favorite_number", 99);
        genericRecord.put("favorite_color", "red");

        return genericRecord;
    }
}
```

## Amazon Managed Service for Apache Flink のユースケース
<a name="schema-registry-integrations-kinesis-data-analytics-apache-flink"></a>

Apache Flinkは、無制限および制限付きのデータストリームに対するステートフルな計算に広く使用されている、オープンソースフレームワークの分散処理エンジンです。Amazon Managed Service for Apache Flink は、ストリーミングデータを処理するため、Apache Flink アプリケーションを構築して管理できるようにする完全マネージド型の AWS サービスです。

オープンソースの Apache Flink では、多数のソースとシンクを利用できます。例えば、事前定義済みのデータソースには、ファイル、ディレクトリ、およびソケットからの読み込みや、コレクションとイテレータからのデータの取り込みなどが含まれています。Apache Flink DataStream Connector は、Apache Flinkが、Apache Kafka や Kinesis などの各種サードパーティー製システムと、ソースおよび/またはシンクとしてインターフェースするためのコードを提供します。

詳細については、「[Amazon Kinesis Data Analytics デベロッパーガイド](https://docs.aws.amazon.com/kinesisanalytics/latest/java/what-is.html)」を参照してください。

### Apache Flink Kafka Connector
<a name="schema-registry-integrations-kafka-connector"></a>

Apache Flinkは、Kafka のトピックに対するデータの読み取りおよび書き込みを、正確に一度で行えるようにするための、Apache Kafka データストリームのコネクタを提供します。Flink の Kafka コンシューマ `FlinkKafkaConsumer` では、1 つ以上の Kafka トピックから読み取りを行うアクセスが提供されます。Apache Flink の Kafka プロデューサ `FlinkKafkaProducer` では、1 つ以上の Kafka トピックに対しレコードのストリームを書き込むことができます。詳細については、「[Apache Kafka Connector](https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html)」を参照してください。

### Apache Flink Kinesis Streams Connector
<a name="schema-registry-integrations-kinesis-connector"></a>

Kinesis データストリームのコネクタは、Amazon Kinesis Data Streams へのアクセスを提供します。並列ストリーミングデータソース `FlinkKinesisConsumer` は、同じAWS のサービスリージョン内で複数の Kinesis ストリームにサブスクライブされ、ジョブの実行中にストリームの再シャーディングを透過的に (確実に 1 回で) 処理できます。コンシューマーの各サブタスクが、複数の Kinesis シャードからのデータレコードの取得を受け持ちます。各サブタスクによって取得されるシャードの数は、Kinesis によってシャードが閉じられ、また作成されるたびに変化します。`FlinkKinesisProducer` は Kinesis Producer Library (KPL) を使用して、Kinesis ストリーム内に Apache Flink ストリームからのデータを配置します。詳細については、「[Amazon Kinesis Streams Connector](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kinesis.html)」を参照してください。

詳細については、[AWS Glue のGitHub リポジトリ](https://github.com/awslabs/aws-glue-schema-registry)を参照してください。

### Apache Flink との統合
<a name="schema-registry-integrations-apache-flink-integrate"></a>

Schema Registry で提供されている SerDes ライブラリは、Apache Flink と統合されています。Apache Flink を使用するには、[https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java](https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java) および [https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java](https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java) のインターフェイス (`GlueSchemaRegistryAvroSerializationSchema` および `GlueSchemaRegistryAvroDeserializationSchema`) を実装する必要があります。これらは、Apache Flink コネクタにプラグインして使用します。

### Apache Flink アプリケーションへの AWS Glue Schema Registry の依存関係の追加
<a name="schema-registry-integrations-kinesis-data-analytics-dependencies"></a>

Apache Flink アプリケーション内で AWS Glue Schema Registry との統合の依存関係をセットアップするには

1. `pom.xml` ファイルに依存関係を追加します。

   ```
   <dependency>
       <groupId>software.amazon.glue</groupId>
       <artifactId>schema-registry-flink-serde</artifactId>
       <version>1.0.0</version>
   </dependency>
   ```

#### Kafka または Amazon MSK を Apache Flink と統合する
<a name="schema-registry-integrations-kda-integrate-msk"></a>

Kafka をソースまたはシンクとしながら、Apache Flink 対応の Managed Service for Apache Flink を使用できます。

**Kafka をソースとする場合**  
次の図は、Kafka をソースとしながら、Kinesis Data Streams と Apache Flink 対応の Managed Service for Apache Flink を統合した様子です。

![\[Kafka をソースとする場合。\]](http://docs.aws.amazon.com/ja_jp/glue/latest/dg/images/gsr-kafka-source.png)


**Kafka をシンクとする場合**  
次の図は、Kafka をシンクとしながら、Kinesis Data Streams と Apache Flink 対応の Managed Service for Apache Flink を統合した様子です。

![\[Kafka をシンクとする場合。\]](http://docs.aws.amazon.com/ja_jp/glue/latest/dg/images/gsr-kafka-sink.png)


Kafka をソースまたはシンクとしながら、Kafka (または Amazon MSK) を Apache Flink 対応の Managed Service for Apache Flink と統合するには、以下のコード変更を行います。太字で示されたコードブロックを、類似するセクション内の対応するコードにそれぞれ追加します。

Kafka をソースとする場合は、デシリアライザ用コード (ブロック 2) を使用します。Kafka をシンクとする場合は、シリアライザコード (ブロック 3) を使用します。

```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

String topic = "topic";
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

// block 1
Map<String, Object> configs = new HashMap<>();
configs.put(AWSSchemaRegistryConstants.AWS_REGION, "aws-region");
configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());

FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
    topic,
    // block 2
    GlueSchemaRegistryAvroDeserializationSchema.forGeneric(schema, configs),
    properties);

FlinkKafkaProducer<GenericRecord> producer = new FlinkKafkaProducer<>(
    topic,
    // block 3
    GlueSchemaRegistryAvroSerializationSchema.forGeneric(schema, topic, configs),
    properties);

DataStream<GenericRecord> stream = env.addSource(consumer);
stream.addSink(producer);
env.execute();
```

#### Kinesis Data Streams と Apache Flink との統合
<a name="schema-registry-integrations-integrate-kds"></a>

Kinesis Data Streams をソースまたはシンクとしながら、Apache Flink 対応の Managed Service for Apache Flink を使用できます。

**Kinesis Data Streams をソースとする場合**  
次の図は、Kinesis Data Streams をソースとしながら、Kinesis Data Streams と Apache Flink 対応の Managed Service for Apache Flink を統合した様子です。

![\[Kinesis Data Streams をソースとする場合。\]](http://docs.aws.amazon.com/ja_jp/glue/latest/dg/images/gsr-kinesis-source.png)


**Kinesis Data Streams をシンクとする場合**  
次の図は、Kinesis Data Streams をシンクとしながら、Kinesis Data Streams と Apache Flink 対応の Managed Service for Apache Flink を統合した様子です。

![\[Kinesis Data Streams をシンクとする場合。\]](http://docs.aws.amazon.com/ja_jp/glue/latest/dg/images/gsr-kinesis-sink.png)


Kinesis Data Streams をソースまたはシンクとしながら、Kinesis Data Streams と Apache Flink 対応の Managed Service for Apache Flink を統合するには、以下のコード変更を行います。太字で示されたコードブロックを、類似するセクション内の対応するコードにそれぞれ追加します。

Kinesis Data Streams をソースとする場合は、デシリアライザコード (ブロック 2) を使用します。Kinesis Data Streams をシンクとする場合は、シリアライザコード (ブロック 3) を使用します。

```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

String streamName = "stream";
Properties consumerConfig = new Properties();
consumerConfig.put(AWSConfigConstants.AWS_REGION, "aws-region");
consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

// block 1
Map<String, Object> configs = new HashMap<>();
configs.put(AWSSchemaRegistryConstants.AWS_REGION, "aws-region");
configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());

FlinkKinesisConsumer<GenericRecord> consumer = new FlinkKinesisConsumer<>(
    streamName,
    // block 2
    GlueSchemaRegistryAvroDeserializationSchema.forGeneric(schema, configs),
    properties);

FlinkKinesisProducer<GenericRecord> producer = new FlinkKinesisProducer<>(
    // block 3
    GlueSchemaRegistryAvroSerializationSchema.forGeneric(schema, topic, configs),
    properties);
producer.setDefaultStream(streamName);
producer.setDefaultPartition("0");

DataStream<GenericRecord> stream = env.addSource(consumer);
stream.addSink(producer);
env.execute();
```

## ユースケース: AWS Lambda との統合
<a name="schema-registry-integrations-aws-lambda"></a>

AWS Lambda 関数を Apache Kafka/Amazon MSK のコンシューマーとして使用し、Avro でエンコードされたメッセージを AWS Glue Schema Registry により非シリアル化するには、[MSK ラボのページ](https://amazonmsk-labs.workshop.aws/en/msklambda/gsrschemareg.html)をご覧ください。

## ユースケース: AWS Glue Data Catalog
<a name="schema-registry-integrations-aws-glue-data-catalog"></a>

AWS Glue テーブルは、手動による指定、または AWS Glue Schema Registry への参照によって指定できる、スキーマをサポートしています。AWS Glue テーブルまたは Data Catalog のパーティションを作成または更新する際に、オプションで Schema Registry に格納されているスキーマを使用できるように、Schema Registry には Data Catalog が統合されています。Schema Registry のスキーマ定義を特定するには、少なくとも、対象となるスキーマの ARN を知る必要があります。スキーマ定義を含むスキーマのスキーマバージョンは、UUID またはバージョン番号により参照が可能です。スキーマバージョンの中でも「最新」バージョンについては、バージョン番号または UUID を把握しなくても常に参照することができます。

`CreateTable` または `UpdateTable` オペレーションの呼び出し時は、Schema Registry 内の既存のスキーマに対する `TableInput` を指定するために `SchemaReference` 構造体 (`StorageDescriptor` を含む) を渡します。同様に、`GetTable` または `GetPartition` API を呼び出す場合は、そのレスポンスにスキーマと `SchemaReference` が含まれます。スキーマ参照を使用してテーブルまたはパーティションが作成されると、Data Catalog はこのスキーマ参照のスキーマ取得を試みます。Schema Registry 内にスキーマが見つからない場合は、`GetTable` レスポンスで空のスキーマを返します。それ以外では、このレスポンスにスキーマとスキーマ参照の両方が出力されます。

また、AWS Glue コンソールからアクションを実行することも可能です。

これらのオペレーションを実行し、スキーマ情報を作成、更新、表示するには、呼び出しユーザーに、`GetSchemaVersion` API へのアクセス権限を付与する、IAM ロールを付与する必要があります。

### テーブルの追加またはテーブルのスキーマの更新
<a name="schema-registry-integrations-aws-glue-data-catalog-table"></a>

既存のスキーマから新しいテーブルを追加すると、そのテーブルは特定のスキーマバージョンにバインドされます。新しいスキーマバージョンの登録が完了すると、このテーブル定義が、AWS Glue コンソールの [View table] (テーブルの表示) ページ、もしくは [UpdateTable アクション (Python: update\$1table)](aws-glue-api-catalog-tables.md#aws-glue-api-catalog-tables-UpdateTable) API を使用して更新できるようになります。

#### 既存のスキーマからのテーブルの追加
<a name="schema-registry-integrations-aws-glue-data-catalog-table-existing"></a>

AWS Glue コンソールまたは `CreateTable` API を使用して、レジストリ内のスキーマバージョンから AWS Glue を作成できます。

**AWS Glue API**  
`CreateTable` API を呼び出す際に、(`StorageDescriptor` に`SchemaReference` が指定されている) `TableInput` を、スキーマレジストリの既存のスキーマに追加します。

**AWS Glue コンソール**  
AWS Glue コンソールを使用してテーブルを作成するには

1. AWS マネジメントコンソール にサインインし、AWS Glue コンソール ([https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\)) を開きます。

1. ナビゲーションペインの [**Data catalog**] (データカタログ) で、[**Tables**] (テーブル) をクリックします。

1. [**Add Tables**] (テーブルの追加) メニューで、[**Add table from existing schema**] (既存のスキーマからテーブルを追加する) をクリックします。

1. テーブルのプロパティとデータストアを、AWS Glue デベロッパーガイドに沿って設定します。

1. [**Choose a Glue schema**] (Glue スキーマの選択) ページで、スキーマが置かれている [**Registry**] (レジストリ) を選択します。

1. [**Schema name**] (スキーマ名) をクリックし、適用するスキーマの [**Version**] (バージョン) を選択します。

1. スキーマのプレビューを確認し、[**Next**] (次へ) をクリックします。

1. テーブルを確認し、作成します。

作成したテーブルに適用されたスキーマとバージョンは、テーブルの一覧内で [**Glue schema**] (Glue スキーマ) 列に表示されます。テーブルを表示すると、さらに詳細を確認できます。

#### テーブルのスキーマの更新
<a name="schema-registry-integrations-aws-glue-data-catalog-table-updating"></a>

新しいスキーマバージョンが使用可能になったら、テーブルのスキーマを [UpdateTable アクション (Python: update\$1table)](aws-glue-api-catalog-tables.md#aws-glue-api-catalog-tables-UpdateTable) API または AWS Glue コンソールにより更新することができます 

**重要**  
手動で指定された AWS Glue スキーマを含む既存のテーブル用にスキーマを更新する場合、Schema Registry で参照される新しいスキーマは互換性を持たない可能性があります。この場合、ジョブが失敗することがあります。

**AWS Glue API**  
`UpdateTable` API を呼び出す際に、(`StorageDescriptor` に`SchemaReference` が指定されている) `TableInput` を、スキーマレジストリの既存のスキーマに追加します。

**AWS Glue コンソール**  
AWS Glue コンソールからテーブルのスキーマを更新するには

1. AWS マネジメントコンソール にサインインし、AWS Glue コンソール ([https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\)) を開きます。

1. ナビゲーションペインの [**Data catalog**] (データカタログ) で、[**Tables**] (テーブル) をクリックします。

1. テーブルの一覧でテーブルを表示します。

1. 新しいバージョンの情報が表示されたボックスで、[**Update schema**] (スキーマの更新) をクリックします。

1. 現在のスキーマと更新後のスキーマの違いを確認します。

1. さらに詳細を表示するには、[**Show all schema differences**] (スキーマの違いをすべて表示) をクリックします。

1. [**Save table**] (テーブルを保存) をクリックし、新しいバージョンを受け入れます。

## ユースケース: AWS Glue ストリーミング
<a name="schema-registry-integrations-aws-glue-streaming"></a>

AWS Glue ストリーミングは、ストリーミングソースからのデータを消費し、出力シンクに書き込む前に ETL オペレーションを実行します。入力ストリーミングソースは、データテーブルを使用して指定するか、ソース構成を指定して直接指定することができます。

AWS Glue ストリーミングは、AWS Glue スキーマレジストリに存在するスキーマで作成されたストリーミングソースのデータカタログテーブルをサポートします。AWS Glue スキーマレジストリにスキーマを作成し、そのスキーマを使用してストリーミングソースで AWS Glue テーブルを作成できます。この AWS Glue テーブルは、 AWS Glue ストリーミングジョブへの入力として使用し、入力ストリーミングのデータを逆シリアル化することができます。

注意すべき点は、AWS Glue スキーマレジスト内のスキーマが変化した場合、AWS Glue ストリーミングジョブを再度開始して、スキーマの変更を反映させる必要があることです。

## ユースケース: Apache Kafka ストリーム
<a name="schema-registry-integrations-apache-kafka-streams"></a>

Apache Kafka Streams APIは、Apache Kafka に格納されているデータを処理・分析するためのクライアントライブラリです。このセクションでは、Apache Kafka Streams と AWS Glue Schema Registry の統合について説明します。これにより、データストリーミングアプリケーションのスキーマを、管理および適用できるようになります。Apache Kafka Streams の詳細については、「[Apache Kafka Streams](https://kafka.apache.org/documentation/streams/)」を参照してください。

### SerDes ライブラリとの統合
<a name="schema-registry-integrations-apache-kafka-streams-integrate"></a>

`GlueSchemaRegistryKafkaStreamsSerde` クラスにより、Streams のアプリケーションを設定できます。

#### Kafka Streams アプリケーションのコード例
<a name="schema-registry-integrations-apache-kafka-streams-application"></a>

Apache Kafka Streams アプリケーション内で AWS Glue Schema Registry を使用するには

1. Kafka Streams アプリケーションを設定します。

   ```
   final Properties props = new Properties();
       props.put(StreamsConfig.APPLICATION_ID_CONFIG, "avro-streams");
       props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
       props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
       props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
       props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, AWSKafkaAvroSerDe.class.getName());
       props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
   
       props.put(AWSSchemaRegistryConstants.AWS_REGION, "aws-region");
       props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
       props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
   	props.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.AVRO.name());
   ```

1. トピック avro-input からストリームを作成します。

   ```
   StreamsBuilder builder = new StreamsBuilder();
   final KStream<String, GenericRecord> source = builder.stream("avro-input");
   ```

1. データレコードを処理します (favorite\$1color の値がピンクであるか、値が 15 となっているレコードの除外など)。

   ```
   final KStream<String, GenericRecord> result = source
       .filter((key, value) -> !"pink".equals(String.valueOf(value.get("favorite_color"))));
       .filter((key, value) -> !"15.0".equals(String.valueOf(value.get("amount"))));
   ```

1. トピック avro-output に結果を書き込みます。

   ```
   result.to("avro-output");
   ```

1. Apache Kafka Streams アプリケーションを起動します。

   ```
   KafkaStreams streams = new KafkaStreams(builder.build(), props);
   streams.start();
   ```

#### 実装結果
<a name="schema-registry-integrations-apache-kafka-streams-results"></a>

以下の結果は、ステップ 3 において (favorite\$1color が「pink」であるか値が「15.0」であるために) 除外されたレコードに関するフィルタリング処理を示しています。

フィルタリング前のレコード:

```
{"name": "Sansa", "favorite_number": 99, "favorite_color": "white"}
{"name": "Harry", "favorite_number": 10, "favorite_color": "black"}
{"name": "Hermione", "favorite_number": 1, "favorite_color": "red"}
{"name": "Ron", "favorite_number": 0, "favorite_color": "pink"}
{"name": "Jay", "favorite_number": 0, "favorite_color": "pink"}

{"id": "commute_1","amount": 3.5}
{"id": "grocery_1","amount": 25.5}
{"id": "entertainment_1","amount": 19.2}
{"id": "entertainment_2","amount": 105}
	{"id": "commute_1","amount": 15}
```

フィルタリング後のレコード:

```
{"name": "Sansa", "favorite_number": 99, "favorite_color": "white"}
{"name": "Harry", "favorite_number": 10, "favorite_color": "black"}
{"name": "Hermione", "favorite_number": 1, "favorite_color": "red"}
{"name": "Ron", "favorite_number": 0, "favorite_color": "pink"}

{"id": "commute_1","amount": 3.5}
{"id": "grocery_1","amount": 25.5}
{"id": "entertainment_1","amount": 19.2}
{"id": "entertainment_2","amount": 105}
```

### ユースケース: Apache Kafka Connect
<a name="schema-registry-integrations-apache-kafka-connect"></a>

Apache Kafka Connect と AWS Glue Schema Registry を統合することで、コネクタからスキーマ情報を取得できるようになります。Apache Kafka のコンバータにより、Apache Kafka 内のデータ形式と、Apache Kafka Connect データへの変換方法を指定します。すべての Apache Kafka Connect ユーザーは、これらのコンバータを Apache Kafka との間でロードまたは保存する際に、データに適用する形式に基づいた設定を行う必要があります。これにより、Apache Kafka Connect データを AWS Glue Schema Registry で使用する型 (例: Avro) に変換する独自のコンバータを定義し、さらにシリアライザを使用してスキーマを登録しシリアル化を実行します。その後コンバータはデシリアライザを使用して、Apache Kafka から受信したデータを逆シリアル化し、元の Apache Kafka Connect データに変換することができます。ワークフローの例を以下の図に示します。

![\[Apache Kafka Connect ワークフロー。\]](http://docs.aws.amazon.com/ja_jp/glue/latest/dg/images/schema_reg_int_kafka_connect.png)


1. [AWS Glue Schema Registry 用 Githubリポジトリ](https://github.com/awslabs/aws-glue-schema-registry)をクローンして、`aws-glue-schema-registry` プロジェクトをインストールします。

   ```
   git clone git@github.com:awslabs/aws-glue-schema-registry.git
   cd aws-glue-schema-registry
   mvn clean install
   mvn dependency:copy-dependencies
   ```

1. Apache Kafka Connect を *Standalone* モードで使用する予定の場合、このステップで以下に示した手順を使用して、**connect-standalone.properties** を更新します。Apache Kafka Connect を *Distributed* モードで使用する予定の場合は、同じ手順により **connect-avro-distributed.properties** を更新します。

   1. Apache Kafka の接続プロパティファイルにも、これらのプロパティを追加します。

      ```
      key.converter.region=aws-region
      value.converter.region=aws-region
      key.converter.schemaAutoRegistrationEnabled=true
      value.converter.schemaAutoRegistrationEnabled=true
      key.converter.avroRecordType=GENERIC_RECORD
      value.converter.avroRecordType=GENERIC_RECORD
      ```

   1. 以下のコマンドを、[**kafka-run-class.sh**] の下の [**Launch mode**] (起動モード) セクションに追加します。

      ```
      -cp $CLASSPATH:"<your AWS GlueSchema Registry base directory>/target/dependency/*"
      ```

1. [**kafka-run-class.sh**] の下の [**Launch mode**] (起動モード) セクションに以下のコマンドを追加する

   ```
   -cp $CLASSPATH:"<your AWS GlueSchema Registry base directory>/target/dependency/*" 
   ```

   プリンシパルは以下のようになります。

   ```
   # Launch mode
   if [ "x$DAEMON_MODE" = "xtrue" ]; then
     nohup "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH:"/Users/johndoe/aws-glue-schema-registry/target/dependency/*" $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
   else
     exec "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH:"/Users/johndoe/aws-glue-schema-registry/target/dependency/*" $KAFKA_OPTS "$@"
   fi
   ```

1. bash を使用している場合は、以下のコマンドを実行して、bash\$1profile で CLASSPATH を設定します。他のシェルの場合は、それに応じて環境を更新します。

   ```
   echo 'export GSR_LIB_BASE_DIR=<>' >>~/.bash_profile
   echo 'export GSR_LIB_VERSION=1.0.0' >>~/.bash_profile
   echo 'export KAFKA_HOME=<your Apache Kafka installation directory>' >>~/.bash_profile
   echo 'export CLASSPATH=$CLASSPATH:$GSR_LIB_BASE_DIR/avro-kafkaconnect-converter/target/schema-registry-kafkaconnect-converter-$GSR_LIB_VERSION.jar:$GSR_LIB_BASE_DIR/common/target/schema-registry-common-$GSR_LIB_VERSION.jar:$GSR_LIB_BASE_DIR/avro-serializer-deserializer/target/schema-registry-serde-$GSR_LIB_VERSION.jar' >>~/.bash_profile
   source ~/.bash_profile
   ```

1. (オプション) 単純なファイルをソースとして使用しテストを行う場合は、ファイルソースコネクタのクローンを作成します。

   ```
   git clone https://github.com/mmolimar/kafka-connect-fs.git
   cd kafka-connect-fs/
   ```

   1. ソースコネクタの設定で、データ形式を Avro に、ファイルリーダーを `AvroFileReader` に変更します。さらに、読み込んでいるファイルパスからサンプルの Avro オブジェクトを更新します。例: 

      ```
      vim config/kafka-connect-fs.properties
      ```

      ```
      fs.uris=<path to a sample avro object>
      policy.regexp=^.*\.avro$
      file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.AvroFileReader
      ```

   1. ソースコネクタをインストールします。

      ```
      mvn clean package
      echo "export CLASSPATH=\$CLASSPATH:\"\$(find target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')\"" >>~/.bash_profile
      source ~/.bash_profile
      ```

   1. `<your Apache Kafka installation directory>/config/connect-file-sink.properties` のシンクのプロパティを更新し、トピック名と出力ファイル名を更新します。

      ```
      file=<output file full path>
      topics=<my topic>
      ```

1. Source Connector (この例では、ソースファイルのコネクタ) を起動します。

   ```
   $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/kafka-connect-fs.properties
   ```

1. Sink Connector (この例では、シンクファイルのコネクタ) を実行します。

   ```
   $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-file-sink.properties
   ```

   Kafka Connect の使用例については、[AWS Glue Schema Registry 用 Githubリポジトリ](https://github.com/awslabs/aws-glue-schema-registry/tree/master/integration-tests)の、integration-tests フォルダにある run-local-tests.sh スクリプトでご確認ください。

# サードパーティー製のスキーマレジストリから AWS Glue Schema Registry への移行
<a name="schema-registry-integrations-migration"></a>

サードパーティー製スキーマレジストリから AWS Glue Schema Registry への移行作業は、その時点での既存のサードパーティー製スキーマレジストリにより異なります。サードパーティーのスキーマレジストリを使用して送信された Apache Kafka トピック内にレコードがある場合、コンシューマはサードパーティーのスキーマレジストリを使用して、それらのレコードを非シリアル化する必要があります。`AWSKafkaAvroDeserializer` には、セカンダリのデシリアライザクラスを定義する機能があります。このクラスは、サードパーティーのデシリアライザを特定し、対象のレコードを非シリアル化するために使用できます、

サードパーティー製スキーマには、使用停止に関して 2 つの基準があります。1 つ目の基準では、サードパーティー製スキーマレジストリを使用する Apache Kafka トピックのレコードを必要とするコンシューマが、なくなった後にのみ使用停止となります。2 つ目の基準では、トピックに指定された保持期間に応じ、Apache Kafka トピックの試用期間が終了することで使用停止となります。無制限の保持期間を持つトピックの場合は、AWS Glue Schema Registry への移行は可能ですが、サードパーティー製スキーマレジストリを使用停止にすることはできません。この回避策としては、アプリケーションまたは Mirror Maker 2 により現在のトピックを読み取り、AWS Glue Schema Registry を使用する新しいトピックとして作成し直すことができます。

サードパーティー製スキーマレジストリから AWS Glue Schema Registry へ移行するには

1. AWS Glue Schema Registry にレジストリを作成するか、デフォルトのレジストリを使用します。

1. コンシューマを停止します。AWS Glue Schema Registry をプライマリのデシリアライザとして含めるように、コンシューマを変更します、サードパーティーのスキーマレジストリをセカンダリに変更します。
   + コンシューマのプロパティを設定します。この例では、secondary\$1decerializer は別のデシリアライザに設定されています。動作は次のとおりです。コンシューマは Amazon MSK からレコードを取得し、最初に `AWSKafkaAvroDeserializer` の使用を試みます。AWS Glue Schema Registry のスキーマの、Avro スキーマ ID を含むマジックバイトを読み取ることができない場合、`AWSKafkaAvroDeserializer`は、secondary\$1deserializer で指定されるデシリアライザクラスの使用を試みます。セカンダリのデシリアライザに固有のプロパティは、以下に示すように、schema\$1registry\$1url\$1config や specific\$1avro\$1reader\$1config などのコンシューマプロパティでも指定する必要があります。

     ```
     consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
     consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AWSKafkaAvroDeserializer.class.getName());
     consumerProps.setProperty(AWSSchemaRegistryConstants.AWS_REGION, KafkaClickstreamConsumer.gsrRegion);
     consumerProps.setProperty(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, KafkaAvroDeserializer.class.getName());
     consumerProps.setProperty(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "URL for third-party schema registry");
     consumerProps.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
     ```

1. コンシューマを再起動します。

1. プロデューサを停止し、そのプロデューサで AWS Glue Schema Registry を指定します。

   1. プロデューサのプロパティを設定します。この例のプロデューサでは、デフォルトのレジストリと自動登録スキーマバージョンを使用しています。

      ```
      producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
      producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AWSKafkaAvroSerializer.class.getName());
      producerProps.setProperty(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2");
      producerProps.setProperty(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName());
      producerProps.setProperty(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true");
      ```

1. (オプション) 既存のスキーマおよびスキーマバージョンを、現在のサードパーティー製スキーマレジストリから AWS Glue Schema Registry (AWS Glue Schema Registry のデフォルトレジストリ、または AWS Glue Schema Registry 内の特定の非デフォルトレジストリ) に手動で移行します。これには、サードパーティーのスキーマレジストリから JSON 形式でスキーマをエクスポートし、AWS マネジメントコンソール または AWS CLI を使用しながら AWS Glue Schema Registry の新しいスキーマを作成します。

    このステップは、AWS CLI および AWS マネジメントコンソール を使用して、新しく作成されたスキーマバージョンについて、以前のスキーマバージョンとの互換性チェックを有効にする必要がある場合、またはスキーマバージョンの自動登録を有効にした新しいスキーマで、プロデユーサがメッセージを送信する場合に重要です。

1. プロデューサを起動します。