

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# Managed Service for Apache Flink에서 싱크를 사용하여 데이터 쓰기
<a name="how-sinks"></a>

애플리케이션 코드에서는 Kinesis Data Streams 및 DynamoDB 같은 AWS 서비스를 포함한 외부 시스템에 데이터를 쓰기 위해 어떤 [Apache Flink 싱크](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/overview/) 커넥터든 사용할 수 있습니다.

Apache Flink는 파일과 소켓용 싱크를 제공하며 사용자 지정 싱크를 구현할 수도 있습니다. 지원되는 여러 싱크 중 다음 싱크가 자주 사용됩니다.

## Kinesis 데이터 스트림 사용
<a name="sinks-streams"></a>

Apache Flink는 Apache Flink 설명서에서 [Kinesis Data Streams 커넥터](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kinesis/)에 대한 정보를 제공합니다.

입력 및 출력에 Kinesis 데이터 스트림을 사용하는 애플리케이션의 예는 [자습서: Managed Service for Apache Flink에서 DataStream API 사용 시작하기](getting-started.md) 섹션을 참조하세요.

## Apache Kafka 및 Amazon Managed Streaming for Apache Kafka(MSK) 사용
<a name="sinks-MSK"></a>

[Apache Flink Kafka 커넥터](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kafka/#kafka-sink)는 Apache Kafka와 Amazon MSK로 데이터를 게시하기 위한 광범위한 기능을 제공하며, 정확히 한 번 처리 보장도 지원합니다. Kafka에 데이터를 쓰는 방법은 Apache Flink 설명서의 [Kafka 커넥터 예제](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KafkaConnectors)를 참조하세요.

## Amazon S3 사용
<a name="sinks-s3"></a>

Amazon S3 버킷에 객체를 쓰는 데 Apache Flink `StreamingFileSink`를 사용할 수 있습니다.

S3에 객체를 쓰는 방법에 대한 예는 [예: Amazon S3 버킷에 쓰기](earlier.md#examples-s3) 섹션을 참조하세요.

## Firehose 사용
<a name="sinks-firehose"></a>

`FlinkKinesisFirehoseProducer`는 [Firehose](https://docs.aws.amazon.com/firehose/latest/dev/) 서비스를 사용하여 애플리케이션 출력을 저장하기 위한 안정적이고 확장 가능한 Apache Flink 싱크입니다. 이 섹션에서는 Maven 프로젝트를 설정하여 `FlinkKinesisFirehoseProducer`를 생성하고 사용하는 방법을 설명합니다.

**Topics**
+ [`FlinkKinesisFirehoseProducer` 생성](#sinks-firehose-create)
+ [`FlinkKinesisFirehoseProducer` 코드 예](#sinks-firehose-sample)

### `FlinkKinesisFirehoseProducer` 생성
<a name="sinks-firehose-create"></a>

다음 코드 예는 `FlinkKinesisFirehoseProducer`를 생성하는 방법을 보여 줍니다.

```
Properties outputProperties = new Properties();
outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);
```

### `FlinkKinesisFirehoseProducer` 코드 예
<a name="sinks-firehose-sample"></a>

다음 코드 예제는 `FlinkKinesisFirehoseProducer`를 생성 및 구성하고 Apache Flink 데이터 스트림에서 Firehose 서비스로 데이터를 전송하는 방법을 보여줍니다.

```
 
package com.amazonaws.services.kinesisanalytics;

import com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants;
import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer;
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;

import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class StreamingJob {

	private static final String region = "us-east-1";
	private static final String inputStreamName = "ExampleInputStream";
	private static final String outputStreamName = "ExampleOutputStream";

	private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
		Properties inputProperties = new Properties();
		inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
		inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
	}

	private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
			throws IOException {
		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
				applicationProperties.get("ConsumerConfigProperties")));
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Properties outputProperties = new Properties();
		outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(), outputProperties);
		ProducerConfigConstants config = new ProducerConfigConstants();
		return sink;
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties() throws IOException {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(),
				applicationProperties.get("ProducerConfigProperties"));
		return sink;
	}

	public static void main(String[] args) throws Exception {
		// set up the streaming execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		/*
		 * if you would like to use runtime configuration properties, uncomment the
		 * lines below
		 * DataStream<String> input = createSourceFromApplicationProperties(env);
		 */

		DataStream<String> input = createSourceFromStaticConfig(env);

		// Kinesis Firehose sink
		input.addSink(createFirehoseSinkFromStaticConfig());

		// If you would like to use runtime configuration properties, uncomment the
		// lines below
		// input.addSink(createFirehoseSinkFromApplicationProperties());

		env.execute("Flink Streaming Java API Skeleton");
	}
}
```

Firehose 싱크를 사용하는 방법에 대한 전체 자습서는 [예제: Firehose에 쓰기](earlier.md#get-started-exercise-fh) 섹션을 참조하세요.