Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Implementasikan produsen
Aplikasi dalam Tutorial: Memproses data stok real-time menggunakan KPL dan KCL 1.x menggunakan skenario dunia nyata pemantauan perdagangan pasar saham. Prinsip-prinsip berikut menjelaskan secara singkat bagaimana skenario ini memetakan ke produsen dan struktur kode pendukung.
Lihat kode sumber dan tinjau informasi berikut.
- StockTrade kelas
-
Perdagangan saham individu diwakili oleh contoh
StockTradekelas. Contoh ini berisi atribut seperti simbol ticker, harga, jumlah saham, jenis perdagangan (beli atau jual), dan ID yang secara unik mengidentifikasi perdagangan. Kelas ini diterapkan untuk Anda. - Rekam aliran
-
Aliran adalah urutan catatan. Rekaman adalah serialisasi
StockTradeinstance dalam format JSON. Misalnya:{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 } - StockTradeGenerator kelas
-
StockTradeGeneratormemiliki metodegetRandomTrade()yang disebut yang mengembalikan perdagangan saham baru yang dihasilkan secara acak setiap kali dipanggil. Kelas ini diterapkan untuk Anda. - StockTradesWriter kelas
-
mainMetode produsen,StockTradesWriterterus mengambil perdagangan acak dan kemudian mengirimkannya ke Kinesis Data Streams dengan melakukan tugas-tugas berikut:-
Membaca nama aliran dan nama Wilayah sebagai masukan.
-
Menciptakan sebuah
AmazonKinesisClientBuilder. -
Menggunakan pembuat klien untuk mengatur Region, kredensial, dan konfigurasi klien.
-
Membangun
AmazonKinesisklien menggunakan pembuat klien. -
Memeriksa apakah aliran ada dan aktif (jika tidak, ia keluar dengan kesalahan).
-
Dalam loop kontinu, panggil
StockTradeGenerator.getRandomTrade()metode dan kemudiansendStockTrademetode untuk mengirim perdagangan ke aliran setiap 100 milidetik.
sendStockTradeMetodeStockTradesWriterkelas memiliki kode berikut:private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library. if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; } LOG.info("Putting trade: " + trade.toString()); PutRecordRequest putRecord = new PutRecordRequest(); putRecord.setStreamName(streamName); // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes)); try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); } }Lihat rincian kode berikut:
-
PutRecordAPI mengharapkan array byte, dan Anda harus mengonversitradeke format JSON. Baris kode tunggal ini melakukan operasi itu:byte[] bytes = trade.toJsonAsBytes(); -
Sebelum Anda dapat mengirim perdagangan, Anda membuat
PutRecordRequestinstance baru (dipanggilputRecorddalam kasus ini):PutRecordRequest putRecord = new PutRecordRequest();Setiap
PutRecordpanggilan memerlukan nama aliran, kunci partisi, dan gumpalan data. Kode berikut mengisi bidang-bidang ini dalamputRecordobjek menggunakansetXxxx()metodenya:putRecord.setStreamName(streamName); putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes));Contoh menggunakan tiket saham sebagai kunci partisi, yang memetakan catatan ke pecahan tertentu. Dalam praktiknya, Anda harus memiliki ratusan atau ribuan kunci partisi per pecahan sehingga catatan tersebar merata di seluruh aliran Anda. Untuk informasi selengkapnya tentang cara menambahkan data ke aliran, lihatTambahkan data ke aliran.
Sekarang
putRecordsiap untuk mengirim ke klien (putoperasi):kinesisClient.putRecord(putRecord); -
Pemeriksaan kesalahan dan pencatatan selalu merupakan tambahan yang berguna. Kode ini mencatat kondisi kesalahan:
if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; }Tambahkan try/catch blok di sekitar
putoperasi:try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); }Ini karena operasi Kinesis
putData Streams dapat gagal karena kesalahan jaringan, atau karena aliran mencapai batas throughputnya dan terhambat. Sebaiknya pertimbangkan dengan cermat kebijakan coba ulang Anda untukputoperasi untuk menghindari kehilangan data, seperti menggunakan coba lagi. -
Pencatatan status sangat membantu tetapi opsional:
LOG.info("Putting trade: " + trade.toString());
Produser yang ditampilkan di sini menggunakan fungsionalitas rekam tunggal Kinesis Data Streams API,.
PutRecordDalam prakteknya, jika produsen individu menghasilkan banyak catatan, seringkali lebih efisien untuk menggunakan fungsi beberapa catatanPutRecordsdan mengirim batch catatan pada suatu waktu. Untuk informasi selengkapnya, lihat Tambahkan data ke aliran. -
Untuk menjalankan produser
-
Verifikasi bahwa kunci akses dan key pair rahasia yang diambil sebelumnya (saat membuat pengguna IAM) disimpan dalam file.
~/.aws/credentials -
Jalankan
StockTradeWriterkelas dengan argumen berikut:StockTradeStream us-west-2Jika Anda membuat streaming di Wilayah selain
us-west-2, Anda harus menentukan Wilayah tersebut di sini.
Anda akan melihat output yang serupa dengan yang berikut:
Feb 16, 2015 3:53:00 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
Aliran perdagangan saham Anda sekarang sedang dicerna oleh Kinesis Data Streams.