Apache Kafka
Apache Kafka(Kafka) 작업은 Amazon Managed Streaming for Apache Kafka(Amazon MSK), Confluent Cloud
참고
이 주제에서는 Apache Kafka 플랫폼 및 관련 개념에 대해 잘 알고 있다고 가정합니다. Apache Kafka에 대한 자세한 내용은 Apache Kafka
요구 사항
이 규칙 작업은 다음 요구 사항을 충족해야 합니다.
-
AWS IoT가
ec2:CreateNetworkInterface,ec2:DescribeNetworkInterfaces,ec2:CreateNetworkInterfacePermission,ec2:DeleteNetworkInterface,ec2:DescribeSubnets,ec2:DescribeVpcs,ec2:DescribeVpcAttribute및ec2:DescribeSecurityGroups작업을 수행하기 위해 수임할 수 있는 IAM 역할입니다. 이 역할은 Amazon Virtual Private Cloud에 대한 탄력적인 네트워크 인터페이스를 생성 및 관리하여 Kafka 브로커에 도달할 수 있습니다. 자세한 내용은 액세스가 필요한 AWS IoT 규칙 권한 부여 단원을 참조하세요.AWS IoT 콘솔에서 역할을 선택하거나 생성하여 AWS IoT Core가 규칙 작업을 수행할 수 있도록 합니다.
네트워크 인터페이스에 대한 자세한 내용은 Amazon EC2 사용 설명서의 탄력적 네트워크 인터페이스를 참조하세요.
지정한 역할에 연결된 정책은 다음 예시와 같습니다.
-
AWS Secrets Manager를 사용하여 Kafka 브로커에 연결하는 데 필요한 자격 증명을 저장하는 경우 AWS IoT Core가
secretsmanager:GetSecretValue및secretsmanager:DescribeSecret작업을 수행하기 위해 수임할 수 있는 IAM 역할을 생성해야 합니다.지정한 역할에 연결된 정책은 다음 예시와 같습니다.
-
Amazon Virtual Private Cloud(VPC) 내에서 Apache Kafka 클러스터를 실행할 수 있습니다. Amazon VPC 대상을 생성하고 서브넷에서 NAT 게이트웨이를 사용하여 AWS IoT에서 퍼블릭 Kafka 클러스터에 메시지를 전달해야 합니다. AWS IoT 규칙 엔진은 VPC 대상에 나열된 각 서브넷에 네트워크 인터페이스를 생성하여 트래픽을 VPC로 직접 라우팅합니다. VPC 대상을 생성할 때 AWS IoT 규칙 엔진은 VPC 규칙 작업을 자동으로 생성합니다. VPC 규칙 작업에 대한 자세한 내용은 Virtual Private Cloud(VPC) 대상 단원을 참조하세요.
-
고객 관리형 AWS KMS key(KMS 키)를 사용하여 저장 데이터를 암호화하는 경우 호출자를 대신하여 KMS 키를 사용할 수 있는 권한이 서비스에 있어야 합니다. 자세한 내용은 Amazon Managed Streaming for Apache Kafka 개발자 안내서의 Amazon MSK 암호화를 참조하세요.
파라미터
이 작업을 포함한 AWS IoT 규칙을 생성할 때 다음 정보를 지정해야 합니다.
- destinationArn
-
VPC 대상의 Amazon 리소스 이름(ARN)입니다. VPC 대상 생성에 대한 자세한 내용은 Virtual Private Cloud(VPC) 대상 단원을 참조하세요.
- 주제
-
메시지에 대한 Kafka 주제는 Kafka 브로커로 전송됩니다.
대체 템플릿을 사용하여 이 필드를 대체할 수 있습니다. 자세한 내용은 대체 템플릿 단원을 참조하세요.
- key(선택 사항)
-
Kafka 메시지 키입니다.
대체 템플릿을 사용하여 이 필드를 대체할 수 있습니다. 자세한 내용은 대체 템플릿 섹션을 참조하세요.
- 헤더(선택 사항)
-
지정한 Kafka 헤더 목록입니다. 각 헤더는 Kafka 작업을 생성할 때 지정할 수 있는 키-값 페어입니다. 메시지 페이로드를 수정하지 않고도 이러한 헤더를 사용하여 IoT 클라이언트의 데이터를 다운스트림 Kafka 클러스터로 라우팅할 수 있습니다.
대체 템플릿을 사용하여 이 필드를 대체할 수 있습니다. Kafka 작업의 헤더에서 인라인 규칙의 함수를 대체 템플릿으로 전달하는 방법을 이해하려면 예시를 참조하세요. 자세한 내용은 대체 템플릿 섹션을 참조하세요.
참고
바이너리 형식의 헤더는 지원되지 않습니다.
- partition(선택 사항)
-
Kafka 메시지 파티션입니다.
대체 템플릿을 사용하여 이 필드를 대체할 수 있습니다. 자세한 내용은 대체 템플릿 단원을 참조하세요.
- clientProperties
-
Apache Kafka 프로듀서 클라이언트의 속성을 정의하는 객체입니다.
- acks(선택 사항)
-
프로듀서가 요청이 완료되었음을 고려하기 전에 서버가 수신해야 하는 승인 수입니다.
값으로 0을 지정하면 프로듀서는 서버의 승인을 기다리지 않습니다. 서버에서 메시지를 받지 못하면 프로듀서는 메시지를 전송하려고 다시 시도하지 않습니다.
유효한 값:
-1,0,1,all. 기본값은1입니다. - bootstrap.servers
-
Kafka 클러스터에 대한 초기 연결을 설정하는 데 사용되는 호스트 및 포트 페어 목록(예:
host1:port1,host2:port2)입니다. - compression.type(선택 사항)
-
생산자가 생성한 모든 데이터에 대한 압축 유형입니다.
유효한 값:
none,gzip,snappy,lz4.zstd기본값은none입니다. - security.protocol
-
Kafka 브로커에 연결하는 데 사용되는 보안 프로토콜입니다.
유효한 값:
SSL,SASL_SSL. 기본값은SSL입니다. - key.serializer
-
ProducerRecord와 함께 제공되는 키 객체를 바이트로 변환하는 방법을 지정합니다.유효한 값:
StringSerializer. - value.serializer
-
ProducerRecord와 함께 제공하는 값 객체를 바이트로 변환하는 방법을 지정합니다.유효한 값:
ByteBufferSerializer. - ssl.truststore
-
base64 형식의 truststore 파일 또는 AWS Secrets Manager의 truststore 파일 위치입니다. Amazon 인증 기관(CA)에서 사용자의 트러스트 스토어를 신뢰하는 경우에는 이 값이 필요하지 않습니다.
이 필드는 대체 템플릿을 지원합니다. Kafka 브로커에 연결하는 데 필요한 보안 인증 정보를 저장하는 데 Secrets Manager를 사용하는 경우,
get_secretSQL 함수를 사용하여 이 필드의 값을 검색할 수 있습니다. 대체 변수에 대한 자세한 내용은 대체 템플릿 단원을 참조하세요.get_secretSQL 함수에 대한 자세한 내용은 get_secret(secretId, secretType, key, roleArn) 섹션을 참조하세요. truststore가 파일 형식인 경우SecretBinary파라미터를 사용합니다. truststore가 문자열 형식인 경우SecretString파라미터를 사용합니다.이 값의 최대 크기는 65KB입니다.
- ssl.truststore.password
-
truststore의 암호입니다. 이 값은 truststore의 암호를 만든 경우에만 필요합니다.
- ssl.keystore
-
keystore 파일입니다.
SSL을security.protocol의 값으로 지정할 때 이 값이 필요합니다.이 필드는 대체 템플릿을 지원합니다. Secrets Manager를 사용하여 Kafka 브로커에 연결하는 데 필요한 보안 인증 정보를 저장합니다.
get_secretSQL 함수를 사용하여 이 필드의 값을 검색합니다. 대체 변수에 대한 자세한 내용은 대체 템플릿 단원을 참조하세요.get_secretSQL 함수에 대한 자세한 내용은 get_secret(secretId, secretType, key, roleArn) 섹션을 참조하세요.SecretBinary파라미터를 사용합니다. - ssl.keystore.password
-
keystore 파일의 스토어 암호입니다.
ssl.keystore에 값이 지정된 경우 이 값이 필요합니다.이 필드의 값은 일반 텍스트일 수 있습니다. 이 필드는 대체 템플릿도 지원합니다. Secrets Manager를 사용하여 Kafka 브로커에 연결하는 데 필요한 보안 인증 정보를 저장합니다.
get_secretSQL 함수를 사용하여 이 필드의 값을 검색합니다. 대체 변수에 대한 자세한 내용은 대체 템플릿 단원을 참조하세요.get_secretSQL 함수에 대한 자세한 내용은 get_secret(secretId, secretType, key, roleArn) 섹션을 참조하세요.SecretString파라미터를 사용합니다. - ssl.key.password
-
keystore 파일에 있는 프라이빗 키의 암호입니다.
이 필드는 대체 템플릿을 지원합니다. Secrets Manager를 사용하여 Kafka 브로커에 연결하는 데 필요한 보안 인증 정보를 저장합니다.
get_secretSQL 함수를 사용하여 이 필드의 값을 검색합니다. 대체 변수에 대한 자세한 내용은 대체 템플릿 단원을 참조하세요.get_secretSQL 함수에 대한 자세한 내용은 get_secret(secretId, secretType, key, roleArn) 섹션을 참조하세요.SecretString파라미터를 사용합니다. - sasl.mechanism
-
Kafka 브로커에 연결하는 데 사용되는 보안 메커니즘입니다.
security.protocol에SASL_SSL을 지정할 때 이 값이 필요합니다.유효한 값:
PLAIN,SCRAM-SHA-512,GSSAPI.참고
SCRAM-SHA-512는 cn-north-1, cn-northwest-1, us-gov-east-1, and us-gov-west-1 리전에서 유일하게 지원되는 보안 메커니즘입니다. - sasl.plain.username
-
Secrets Manager에서 보안 문자열을 검색하는 데 사용되는 사용자 이름입니다.
security.protocol에SASL_SSL,sasl.mechanism에PLAIN을 지정할 때 이 값이 필요합니다. - sasl.plain.password
-
Secrets Manager에서 보안 문자열을 검색하는 데 사용되는 암호입니다.
security.protocol에SASL_SSL,sasl.mechanism에PLAIN을 지정할 때 이 값이 필요합니다. - sasl.scram.username
-
Secrets Manager에서 보안 문자열을 검색하는 데 사용되는 사용자 이름입니다.
security.protocol에SASL_SSL,sasl.mechanism에SCRAM-SHA-512을 지정할 때 이 값이 필요합니다. - sasl.scram.password
-
Secrets Manager에서 보안 문자열을 검색하는 데 사용되는 암호입니다.
security.protocol에SASL_SSL,sasl.mechanism에SCRAM-SHA-512을 지정할 때 이 값이 필요합니다. - sasl.kerberos.keytab
-
Secrets Manager의 Kerberos 인증을 위한 keytab 파일입니다.
security.protocol에SASL_SSL,sasl.mechanism에GSSAPI을 지정할 때 이 값이 필요합니다.이 필드는 대체 템플릿을 지원합니다. Secrets Manager를 사용하여 Kafka 브로커에 연결하는 데 필요한 보안 인증 정보를 저장합니다.
get_secretSQL 함수를 사용하여 이 필드의 값을 검색합니다. 대체 변수에 대한 자세한 내용은 대체 템플릿 단원을 참조하세요.get_secretSQL 함수에 대한 자세한 내용은 get_secret(secretId, secretType, key, roleArn) 섹션을 참조하세요.SecretBinary파라미터를 사용합니다. - sasl.kerberos.service.name
-
Apache Kafka가 실행되는 Kerberos 보안 주체 이름입니다.
security.protocol에SASL_SSL,sasl.mechanism에GSSAPI을 지정할 때 이 값이 필요합니다. - sasl.kerberos.krb5.kdc
-
Apache Kafka 프로듀서 클라이언트가 연결하는 키 배포 센터(KDC)의 호스트 이름입니다.
security.protocol에SASL_SSL,sasl.mechanism에GSSAPI을 지정할 때 이 값이 필요합니다. - sasl.kerberos.krb5.realm
-
Apache Kafka 프로듀서 클라이언트가 연결되는 영역입니다.
security.protocol에SASL_SSL,sasl.mechanism에GSSAPI을 지정할 때 이 값이 필요합니다. - sasl.kerberos.principal
-
Kerberos가 Kerberos 인식 서비스에 액세스하기 위해 티켓을 할당할 수 있는 고유한 Kerberos ID입니다.
security.protocol에SASL_SSL,sasl.mechanism에GSSAPI을 지정할 때 이 값이 필요합니다.
예시
다음 JSON 예제는 AWS IoT 규칙에서 Apache Kafka 작업을 정의합니다. 다음 예시에서는 sourceIp() 인라인 함수를 Kafka 작업 헤더의 대체 템플릿으로 전달합니다.
{ "topicRulePayload": { "sql": "SELECT * FROM 'some/topic'", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [ { "kafka": { "destinationArn": "arn:aws:iot:region:123456789012:ruledestination/vpc/VPCDestinationARN", "topic": "TopicName", "clientProperties": { "bootstrap.servers": "kafka.com:9092", "security.protocol": "SASL_SSL", "ssl.truststore": "${get_secret('kafka_client_truststore', 'SecretBinary','arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "ssl.truststore.password": "kafka password", "sasl.mechanism": "GSSAPI", "sasl.kerberos.service.name": "kafka", "sasl.kerberos.krb5.kdc": "kerberosdns.com", "sasl.kerberos.keytab": "${get_secret('kafka_keytab','SecretBinary', 'arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "sasl.kerberos.krb5.realm": "KERBEROSREALM", "sasl.kerberos.principal": "kafka-keytab/kafka-keytab.com" }, "headers": [ { "key": "static_header_key", "value": "static_header_value" }, { "key": "substitutable_header_key", "value": "${value_from_payload}" }, { "key": "source_ip", "value": "${sourceIp()}" } ] } } ] } }
Kerberos 설정에 대한 중요 참고 사항
-
대상 VPC 내의 프라이빗 도메인 이름 시스템(DNS)을 통해 키 배포 센터(KDC) 를 확인할 수 있어야 합니다. 한 가지 가능한 방법은 KDC DNS 항목을 프라이빗 호스팅 영역에 추가하는 것입니다. 이 접근법에 대한 자세한 내용은 프라이빗 호스팅 영역 작업을 참조하세요.
-
각 VPC에는 DNS 확인이 활성화되어 있어야 합니다. 자세한 내용은 VPC에서 DNS 사용하기 단원을 참조하세요.
-
VPC 대상의 네트워크 인터페이스 보안 그룹과 인스턴스 수준 보안 그룹은 다음 포트에서 VPC 내의 트래픽을 허용해야 합니다.
-
부트스트랩 브로커 리스너 포트의 TCP 트래픽(대개 9092이지만 9000~9100 범위 내에 있어야 함)
-
포트 88에서 KDC에 대한 TCP 및 UDP 트래픽
-
-
SCRAM-SHA-512는 cn-north-1, cn-northwest-1, us-gov-east-1, and us-gov-west-1 리전에서 유일하게 지원되는 보안 메커니즘입니다.