Firehose examples using SDK for Java 2.x - AWS SDK Code Examples

More AWS SDK examples are available in the AWS Doc SDK Examples GitHub repository.

Firehose examples using SDK for Java 2.x

The following code examples show how to perform actions and implement common scenarios by using the AWS SDK for Java 2.x with Firehose.

Actions are code excerpts from larger programs and must be run in context. Each action shows how to call an individual service function; the related scenarios show the same calls in context.

Scenarios are code examples that show how to accomplish a specific task by calling multiple functions within the same service or in combination with other AWS services.

Each example includes a link to the complete source code, where you can find instructions on how to set up and run the code in context.

Actions

The following code example shows how to use PutRecord.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

    /**
     * Puts a record to the specified Amazon Kinesis Data Firehose delivery stream.
     *
     * @param record The record to be put to the delivery stream. The record must be a {@link Map} of String keys and Object values.
     * @param deliveryStreamName The name of the Amazon Kinesis Data Firehose delivery stream to which the record should be put.
     * @throws IllegalArgumentException if the input record or delivery stream name is null or empty.
     * @throws RuntimeException if there is an error putting the record to the delivery stream.
     */
    public static void putRecord(Map<String, Object> record, String deliveryStreamName) {
        if (record == null || deliveryStreamName == null || deliveryStreamName.isEmpty()) {
            throw new IllegalArgumentException("Invalid input: record or delivery stream name cannot be null/empty");
        }
        try {
            String jsonRecord = new ObjectMapper().writeValueAsString(record);
            Record firehoseRecord = Record.builder()
                .data(SdkBytes.fromByteArray(jsonRecord.getBytes(StandardCharsets.UTF_8)))
                .build();

            PutRecordRequest putRecordRequest = PutRecordRequest.builder()
                .deliveryStreamName(deliveryStreamName)
                .record(firehoseRecord)
                .build();

            getFirehoseClient().putRecord(putRecordRequest);
            System.out.println("Record sent: " + jsonRecord);
        } catch (Exception e) {
            throw new RuntimeException("Failed to put record: " + e.getMessage(), e);
        }
    }

  • For API details, see PutRecord in the AWS SDK for Java 2.x API Reference.
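
The PutRecord excerpt sends the serialized JSON bytes as they are. As a minimal sketch of two checks that could run before building the record, the class below appends a newline delimiter (a common way to keep records distinguishable once Firehose concatenates them at the destination) and rejects payloads over an assumed per-record limit of 1,000 KiB. The class name `RecordPayload`, the method `toPayload`, and the constant are hypothetical and not part of the AWS example; verify the limit against the current Firehose quotas.

```java
import java.nio.charset.StandardCharsets;

class RecordPayload {
    // Assumption: 1,000 KiB per record before base64 encoding; confirm against current Firehose quotas.
    static final int MAX_RECORD_BYTES = 1000 * 1024;

    /** Encodes a JSON string as UTF-8, adding a trailing newline delimiter if it is missing. */
    static byte[] toPayload(String json) {
        if (json == null || json.isEmpty()) {
            throw new IllegalArgumentException("json must not be null or empty");
        }
        String delimited = json.endsWith("\n") ? json : json + "\n";
        byte[] bytes = delimited.getBytes(StandardCharsets.UTF_8);
        if (bytes.length > MAX_RECORD_BYTES) {
            throw new IllegalArgumentException(
                "Record is " + bytes.length + " bytes; assumed limit is " + MAX_RECORD_BYTES);
        }
        return bytes;
    }

    public static void main(String[] args) {
        byte[] payload = toPayload("{\"ticker\":\"AMZN\",\"price\":101.5}");
        System.out.println("Payload size in bytes: " + payload.length);
    }
}
```

In the excerpt above, the resulting byte array could be passed to `SdkBytes.fromByteArray` in place of `jsonRecord.getBytes(StandardCharsets.UTF_8)`.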

The following code example shows how to use PutRecordBatch.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

    /**
     * Puts a batch of records to an Amazon Kinesis Data Firehose delivery stream.
     *
     * @param records a list of maps representing the records to be sent
     * @param batchSize the maximum number of records to include in each batch
     * @param deliveryStreamName the name of the Kinesis Data Firehose delivery stream
     * @throws IllegalArgumentException if the input parameters are invalid (null or empty)
     * @throws RuntimeException if there is an error putting the record batch
     */
    public static void putRecordBatch(List<Map<String, Object>> records, int batchSize, String deliveryStreamName) {
        if (records == null || records.isEmpty() || deliveryStreamName == null || deliveryStreamName.isEmpty()) {
            throw new IllegalArgumentException("Invalid input: records or delivery stream name cannot be null/empty");
        }
        ObjectMapper objectMapper = new ObjectMapper();

        try {
            for (int i = 0; i < records.size(); i += batchSize) {
                List<Map<String, Object>> batch = records.subList(i, Math.min(i + batchSize, records.size()));

                List<Record> batchRecords = batch.stream().map(record -> {
                    try {
                        String jsonRecord = objectMapper.writeValueAsString(record);
                        return Record.builder()
                            .data(SdkBytes.fromByteArray(jsonRecord.getBytes(StandardCharsets.UTF_8)))
                            .build();
                    } catch (Exception e) {
                        throw new RuntimeException("Error creating Firehose record", e);
                    }
                }).collect(Collectors.toList());

                PutRecordBatchRequest request = PutRecordBatchRequest.builder()
                    .deliveryStreamName(deliveryStreamName)
                    .records(batchRecords)
                    .build();

                PutRecordBatchResponse response = getFirehoseClient().putRecordBatch(request);

                if (response.failedPutCount() > 0) {
                    response.requestResponses().stream()
                        .filter(r -> r.errorCode() != null)
                        .forEach(r -> System.err.println("Failed record: " + r.errorMessage()));
                }
                System.out.println("Batch sent with size: " + batchRecords.size());
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to put record batch: " + e.getMessage(), e);
        }
    }

  • For API details, see PutRecordBatch in the AWS SDK for Java 2.x API Reference.
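
The batching loop in the PutRecordBatch excerpt slices the input with `subList` and relies on the caller to pass a sensible `batchSize`. The sketch below isolates that partitioning step and validates the batch size up front, assuming a cap of 500 records per PutRecordBatch call. `BatchPartitioner`, `partition`, and `MAX_BATCH_RECORDS` are hypothetical names, and the sketch does not account for the per-call payload size limit; check both limits against the current Firehose quotas.

```java
import java.util.ArrayList;
import java.util.List;

class BatchPartitioner {
    // Assumption: PutRecordBatch accepts at most 500 records per call; confirm against current quotas.
    static final int MAX_BATCH_RECORDS = 500;

    /** Splits items into consecutive batches of at most batchSize elements, preserving order. */
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (items == null) {
            throw new IllegalArgumentException("items must not be null");
        }
        if (batchSize <= 0 || batchSize > MAX_BATCH_RECORDS) {
            throw new IllegalArgumentException("batchSize must be between 1 and " + MAX_BATCH_RECORDS);
        }
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < items.size(); i += batchSize) {
            int end = Math.min(i + batchSize, items.size());
            // Copy the view so each batch is independent of the source list.
            batches.add(new ArrayList<>(items.subList(i, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int i = 0; i < 1203; i++) {
            items.add(i);
        }
        for (List<Integer> batch : partition(items, MAX_BATCH_RECORDS)) {
            System.out.println("Batch size: " + batch.size());
        }
    }
}
```

Each returned batch could then be mapped to `Record` objects and sent in its own request, as the excerpt does inside its loop.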

Scenarios

The following code example shows how to use Firehose to process individual and batch records.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

This example puts individual and batch records to Firehose.

    import com.fasterxml.jackson.core.type.TypeReference;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import software.amazon.awssdk.core.SdkBytes;
    import software.amazon.awssdk.regions.Region;
    import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
    import software.amazon.awssdk.services.cloudwatch.model.Datapoint;
    import software.amazon.awssdk.services.cloudwatch.model.Dimension;
    import software.amazon.awssdk.services.cloudwatch.model.GetMetricStatisticsRequest;
    import software.amazon.awssdk.services.cloudwatch.model.GetMetricStatisticsResponse;
    import software.amazon.awssdk.services.cloudwatch.model.Statistic;
    import software.amazon.awssdk.services.firehose.FirehoseClient;
    import software.amazon.awssdk.services.firehose.model.PutRecordBatchRequest;
    import software.amazon.awssdk.services.firehose.model.PutRecordBatchResponse;
    import software.amazon.awssdk.services.firehose.model.PutRecordRequest;
    import software.amazon.awssdk.services.firehose.model.Record;

    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;
    import java.time.Instant;
    import java.util.List;
    import java.util.Map;
    import java.util.Scanner;
    import java.util.stream.Collectors;

    /**
     * Amazon Firehose Scenario example using Java V2 SDK.
     *
     * Demonstrates individual and batch record processing,
     * and monitoring Firehose delivery stream metrics.
     */
    public class FirehoseScenario {

        private static FirehoseClient firehoseClient;
        private static CloudWatchClient cloudWatchClient;

        public static void main(String[] args) {
            final String usage = """
                    Usage:
                        <deliveryStreamName>

                    Where:
                        deliveryStreamName - The Firehose delivery stream name.
                    """;

            if (args.length != 1) {
                System.out.println(usage);
                return;
            }

            String deliveryStreamName = args[0];

            try {
                // Read and parse sample data.
                String jsonContent = readJsonFile("sample_records.json");
                ObjectMapper objectMapper = new ObjectMapper();
                List<Map<String, Object>> sampleData = objectMapper.readValue(jsonContent, new TypeReference<>() {});

                // Process individual records.
                System.out.println("Processing individual records...");
                sampleData.subList(0, 100).forEach(record -> {
                    try {
                        putRecord(record, deliveryStreamName);
                    } catch (Exception e) {
                        System.err.println("Error processing record: " + e.getMessage());
                    }
                });

                // Monitor metrics.
                monitorMetrics(deliveryStreamName);

                // Process batch records.
                System.out.println("Processing batch records...");
                putRecordBatch(sampleData.subList(100, sampleData.size()), 500, deliveryStreamName);
                monitorMetrics(deliveryStreamName);

            } catch (Exception e) {
                System.err.println("Scenario failed: " + e.getMessage());
            } finally {
                closeClients();
            }
        }

        // Lazily creates a shared Firehose client.
        private static FirehoseClient getFirehoseClient() {
            if (firehoseClient == null) {
                firehoseClient = FirehoseClient.builder()
                    .region(Region.US_EAST_1)
                    .build();
            }
            return firehoseClient;
        }

        // Lazily creates a shared CloudWatch client.
        private static CloudWatchClient getCloudWatchClient() {
            if (cloudWatchClient == null) {
                cloudWatchClient = CloudWatchClient.builder()
                    .region(Region.US_EAST_1)
                    .build();
            }
            return cloudWatchClient;
        }

        /**
         * Puts a record to the specified Amazon Kinesis Data Firehose delivery stream.
         *
         * @param record The record to be put to the delivery stream. The record must be a {@link Map} of String keys and Object values.
         * @param deliveryStreamName The name of the Amazon Kinesis Data Firehose delivery stream to which the record should be put.
         * @throws IllegalArgumentException if the input record or delivery stream name is null or empty.
         * @throws RuntimeException if there is an error putting the record to the delivery stream.
         */
        public static void putRecord(Map<String, Object> record, String deliveryStreamName) {
            if (record == null || deliveryStreamName == null || deliveryStreamName.isEmpty()) {
                throw new IllegalArgumentException("Invalid input: record or delivery stream name cannot be null/empty");
            }
            try {
                String jsonRecord = new ObjectMapper().writeValueAsString(record);
                Record firehoseRecord = Record.builder()
                    .data(SdkBytes.fromByteArray(jsonRecord.getBytes(StandardCharsets.UTF_8)))
                    .build();

                PutRecordRequest putRecordRequest = PutRecordRequest.builder()
                    .deliveryStreamName(deliveryStreamName)
                    .record(firehoseRecord)
                    .build();

                getFirehoseClient().putRecord(putRecordRequest);
                System.out.println("Record sent: " + jsonRecord);
            } catch (Exception e) {
                throw new RuntimeException("Failed to put record: " + e.getMessage(), e);
            }
        }

        /**
         * Puts a batch of records to an Amazon Kinesis Data Firehose delivery stream.
         *
         * @param records a list of maps representing the records to be sent
         * @param batchSize the maximum number of records to include in each batch
         * @param deliveryStreamName the name of the Kinesis Data Firehose delivery stream
         * @throws IllegalArgumentException if the input parameters are invalid (null or empty)
         * @throws RuntimeException if there is an error putting the record batch
         */
        public static void putRecordBatch(List<Map<String, Object>> records, int batchSize, String deliveryStreamName) {
            if (records == null || records.isEmpty() || deliveryStreamName == null || deliveryStreamName.isEmpty()) {
                throw new IllegalArgumentException("Invalid input: records or delivery stream name cannot be null/empty");
            }
            ObjectMapper objectMapper = new ObjectMapper();

            try {
                for (int i = 0; i < records.size(); i += batchSize) {
                    List<Map<String, Object>> batch = records.subList(i, Math.min(i + batchSize, records.size()));

                    List<Record> batchRecords = batch.stream().map(record -> {
                        try {
                            String jsonRecord = objectMapper.writeValueAsString(record);
                            return Record.builder()
                                .data(SdkBytes.fromByteArray(jsonRecord.getBytes(StandardCharsets.UTF_8)))
                                .build();
                        } catch (Exception e) {
                            throw new RuntimeException("Error creating Firehose record", e);
                        }
                    }).collect(Collectors.toList());

                    PutRecordBatchRequest request = PutRecordBatchRequest.builder()
                        .deliveryStreamName(deliveryStreamName)
                        .records(batchRecords)
                        .build();

                    PutRecordBatchResponse response = getFirehoseClient().putRecordBatch(request);

                    if (response.failedPutCount() > 0) {
                        response.requestResponses().stream()
                            .filter(r -> r.errorCode() != null)
                            .forEach(r -> System.err.println("Failed record: " + r.errorMessage()));
                    }
                    System.out.println("Batch sent with size: " + batchRecords.size());
                }
            } catch (Exception e) {
                throw new RuntimeException("Failed to put record batch: " + e.getMessage(), e);
            }
        }

        // Prints the sum of selected delivery stream metrics over the last 10 minutes.
        public static void monitorMetrics(String deliveryStreamName) {
            Instant endTime = Instant.now();
            Instant startTime = endTime.minusSeconds(600);

            List<String> metrics = List.of("IncomingBytes", "IncomingRecords", "FailedPutCount");
            metrics.forEach(metric -> monitorMetric(metric, startTime, endTime, deliveryStreamName));
        }

        private static void monitorMetric(String metricName, Instant startTime, Instant endTime, String deliveryStreamName) {
            try {
                GetMetricStatisticsRequest request = GetMetricStatisticsRequest.builder()
                    .namespace("AWS/Firehose")
                    .metricName(metricName)
                    .dimensions(Dimension.builder().name("DeliveryStreamName").value(deliveryStreamName).build())
                    .startTime(startTime)
                    .endTime(endTime)
                    .period(60)
                    .statistics(Statistic.SUM)
                    .build();

                GetMetricStatisticsResponse response = getCloudWatchClient().getMetricStatistics(request);
                double totalSum = response.datapoints().stream().mapToDouble(Datapoint::sum).sum();
                System.out.println(metricName + ": " + totalSum);
            } catch (Exception e) {
                System.err.println("Failed to monitor metric " + metricName + ": " + e.getMessage());
            }
        }

        // Reads a classpath resource into a string.
        public static String readJsonFile(String fileName) throws IOException {
            try (InputStream inputStream = FirehoseScenario.class.getResourceAsStream("/" + fileName);
                 Scanner scanner = new Scanner(inputStream, StandardCharsets.UTF_8)) {
                // "\\A" is the regex anchor for the start of input, so next() returns the whole stream.
                return scanner.useDelimiter("\\A").next();
            } catch (Exception e) {
                throw new RuntimeException("Error reading file: " + fileName, e);
            }
        }

        private static void closeClients() {
            try {
                if (firehoseClient != null) firehoseClient.close();
                if (cloudWatchClient != null) cloudWatchClient.close();
            } catch (Exception e) {
                System.err.println("Error closing clients: " + e.getMessage());
            }
        }
    }

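
The scenario logs records that PutRecordBatch reports as failed but does not resend them. Because the response entries are returned in the same order as the request records, the failed subset can be identified by index and retried. The following is a minimal sketch of that idea, not part of the AWS example: `BatchRetry`, `BatchSender`, and `sendWithRetry` are hypothetical names, and the sender interface stands in for a call to `putRecordBatch` that returns one error code per record (null on success). A production version would typically also wait between attempts.

```java
import java.util.ArrayList;
import java.util.List;

class BatchRetry {
    /**
     * Hypothetical stand-in for a PutRecordBatch call: returns one error code per record,
     * in request order, where null means the record was accepted.
     */
    interface BatchSender<T> {
        List<String> send(List<T> batch);
    }

    /** Resends only the failed records, up to maxAttempts; returns the records that still failed. */
    static <T> List<T> sendWithRetry(List<T> batch, BatchSender<T> sender, int maxAttempts) {
        List<T> pending = new ArrayList<>(batch);
        for (int attempt = 1; attempt <= maxAttempts && !pending.isEmpty(); attempt++) {
            List<String> errorCodes = sender.send(pending);
            List<T> failed = new ArrayList<>();
            for (int i = 0; i < pending.size(); i++) {
                // Entry i of the response corresponds to record i of the request.
                if (errorCodes.get(i) != null) {
                    failed.add(pending.get(i));
                }
            }
            pending = failed;
        }
        return pending;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fake sender for illustration: rejects record "b" on the first call only.
        BatchSender<String> flaky = records -> {
            calls[0]++;
            List<String> codes = new ArrayList<>();
            for (String r : records) {
                codes.add(calls[0] == 1 && r.equals("b") ? "ServiceUnavailableException" : null);
            }
            return codes;
        };
        List<String> unsent = sendWithRetry(List.of("a", "b", "c"), flaky, 3);
        System.out.println("Calls made: " + calls[0] + ", records still failing: " + unsent.size());
    }
}
```

In the scenario, a real sender could build the request from the pending records and map `response.requestResponses()` to their `errorCode()` values.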
  • For API details, see the following topics in the AWS SDK for Java 2.x API Reference.