Membuat Plugin Kinesis Agent for Windows
Untuk sebagian besar situasi, membuat plugin Amazon Kinesis Agent for Microsoft Windows tidak diperlukan. Kinesis Agent for Windows sangat fleksibel untuk dikonfigurasi dan berisi sumber dan sink yang kuat, seperti DirectorySource dan KinesisStream, yang memadai untuk sebagian besar skenario. Untuk detail tentang sumber dan sink yang ada, lihat Mengonfigurasi Amazon Kinesis Agent for Microsoft Windows.
Untuk skenario yang tidak biasa, mungkin Anda perlu memperluas Kinesis Agent for Windows menggunakan plugin khusus. Beberapa skenario tersebut mencakup hal-hal berikut:
-
Mengemas deklarasi
DirectorySourceyang kompleks menggunakan pengurai catatanRegexatauDelimitedsehingga mudah diterapkan dalam berbagai jenis file konfigurasi. -
Membuat sumber baru yang tidak berbasis file atau yang melebihi kemampuan parsing yang disediakan oleh pengurai catatan yang ada.
-
Membuat sink untuk layanan AWS yang saat ini tidak didukung.
Topik
Memulai dengan Plugin Kinesis Agent for Windows
Tidak ada yang istimewa dari plugin kustom. Semua sumber dan sink yang ada menggunakan mekanisme yang sama dengan yang digunakan plugin kustom untuk memuat saat Kinesis Agent for Windows dijalankan, dan mekanisme itu membuat contoh plugin yang relevan setelah membaca file konfigurasi appsettings.json.
Ketika Kinesis Agent for Windows dimulai, urutan berikut terjadi:
-
Kinesis Agent for Windows memindai rakitan di direktori instalasi (
%PROGRAMFILES%\Amazon\AWSKinesisTap) untuk kelas yang mengimplementasikan antarmukaIFactory<T>yang ditetapkan dalam perakitanAmazon.KinesisTap.Core. Antarmuka ini didefinisikan dalamAmazon.KinesisTap.Core\Infrastructure\IFactory.csdalam kode sumber Kinesis Agent for Windows. -
Kinesis Agent for Windows memuat rakitan yang berisi kelas-kelas ini dan memanggil metode
RegisterFactorypada kelas-kelas ini. -
Kinesis Agent for Windows memuat file konfigurasi
appsettings.json. Untuk setiap sumber dan sink dalam file konfigurasi, pasangan kunci-nilaiSourceTypedanSinkTypeakan diperiksa. Jika ada pabrik yang terdaftar dengan nama yang sama dengan nilai pasangan kunci-nilaiSourceTypedanSinkType, metodeCreateInstancedipanggil pada pabrik tersebut. MetodeCreateInstancemeneruskan konfigurasi dan informasi lainnya sebagai objekIPluginContext. MetodeCreateInstancebertanggung jawab mengonfigurasi dan menginisialisasi plugin.
Agar plugin bekerja dengan benar, harus ada kelas pabrik terdaftar yang membuat plugin, dan kelas plugin itu sendiri harus didefinisikan.
Kode sumber Kinesis Agent for Windows terletak di https://github.com/awslabs/kinesis-agent-windows
Menerapkan Pabrik Plugin Kinesis Agent for Windows
Ikuti langkah-langkah berikut untuk menerapkan pabrik plugin Kinesis Agent for Windows.
Untuk membuat pabrik plugin Kinesis Agent for Windows
-
Buat proyek pustaka C# dengan target .NET Framework 4.6.
-
Tambahkan referensi ke rakitan
Amazon.KinesisTap.Core. Rakitan ini terletak di direktori%PROGRAMFILES%\Amazon\AWSKinesisTapsetelah instalasi Kinesis Agent for Windows. -
Gunakan
NuGetuntuk menginstal paketMicrosoft.Extensions.Configuration.Abstractions. -
Gunakan
NuGetuntuk menginstal paketSystem.Reactive. -
Gunakan
NuGetuntuk menginstal paketMicrosoft.Extensions.Logging. -
Buat kelas pabrik yang mengimplementasikan
IFactory<IEventSource>untuk sumber atauIFactory<IEventSink>untuk sink. Tambahkan metodeRegisterFactorydanCreateInstance.Sebagai contoh, kode berikut membuat pabrik plugin Kinesis Agent for Windows yang membuat sumber yang menghasilkan data acak:
using System; using Amazon.KinesisTap.Core; using Microsoft.Extensions.Configuration; namespace MyCompany.MySources { public class RandomSourceFactory : IFactory<ISource> { public void RegisterFactory(IFactoryCatalog<ISource> catalog) { catalog.RegisterFactory("randomsource", this); } public ISource CreateInstance(string entry, IPlugInContext context) { IConfiguration config = context.Configuration; switch (entry.ToLower()) { case "randomsource": string rateString = config["Rate"]; string maxString = config["Max"]; TimeSpan rate; int max; if (string.IsNullOrWhiteSpace(rateString)) { rate = TimeSpan.FromSeconds(30); } else { if (!TimeSpan.TryParse(rateString, out rate)) { throw new Exception($"Rate {rateString} is invalid for RandomSource."); } } if (string.IsNullOrWhiteSpace(maxString)) { max = 1000; } else { if (!int.TryParse(maxString, out max)) { throw new Exception($"Max {maxString} is invalid for RandomSource."); } } return new RandomSource(rate, max, context); default: throw new ArgumentException($"Source {entry} is not recognized.", entry); } } } }Pernyataan
switchdigunakan dalam metodeCreateInstancejika Anda akhirnya ingin meningkatkan pabrik untuk membuat berbagai jenis instans.Untuk membuat pabrik sink yang membuat sink yang tidak berfungsi apa-apa, gunakan kelas yang mirip dengan berikut ini:
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using Amazon.KinesisTap.Core; using Microsoft.Extensions.Configuration; namespace MyCompany.MySinks { public class NullSinkFactory : IFactory<IEventSink> { public void RegisterFactory(IFactoryCatalog<IEventSink> catalog) { catalog.RegisterFactory("nullsink", this); } public IEventSink CreateInstance(string entry, IPlugInContext context) { IConfiguration config = context.Configuration; switch (entry.ToLower()) { case "nullsink": return new NullSink(context); default: throw new Exception("Unrecognized sink type {entry}."); } } } }
Menerapkan Sumber Plugin Kinesis Agent for Windows
Ikuti langkah-langkah berikut untuk menerapkan sumber plugin Kinesis Agent untuk Windows.
Untuk membuat sumbr plugin Kinesis Agent for Windows
-
Tambahkan kelas yang mengimplementasikan antarmuka
IEventSource<out T>pada proyek yang dibuat sebelumnya untuk sumber.Sebagai contoh, gunakan kode berikut untuk menentukan sumber yang menghasilkan data acak:
using System; using System.Reactive.Subjects; using System.Timers; using Amazon.KinesisTap.Core; using Microsoft.Extensions.Logging; namespace MyCompany.MySources { public class RandomSource : EventSource<RandomData>, IDisposable { private TimeSpan _rate; private int _max; private Timer _timer = null; private Random _random = new Random(); private ISubject<IEnvelope<RandomData>> _recordSubject = new Subject<IEnvelope<RandomData>>(); public RandomSource(TimeSpan rate, int max, IPlugInContext context) : base(context) { _rate = rate; _max = max; } public override void Start() { try { CleanupTimer(); _timer = new Timer(_rate.TotalMilliseconds); _timer.Elapsed += (Object source, ElapsedEventArgs args) => { var data = new RandomData() { RandomValue = _random.Next(_max) }; _recordSubject.OnNext(new Envelope<RandomData>(data)); }; _timer.AutoReset = true; _timer.Enabled = true; _logger?.LogInformation($"Random source id {this.Id} started with rate {_rate.TotalMilliseconds}."); } catch (Exception e) { _logger?.LogError($"Exception during start of RandomSource id {this.Id}: {e}"); } } public override void Stop() { try { CleanupTimer(); _logger?.LogInformation($"Random source id {this.Id} stopped."); } catch (Exception e) { _logger?.LogError($"Exception during stop of RandomSource id {this.Id}: {e}"); } } private void CleanupTimer() { if (_timer != null) { _timer.Enabled = false; _timer?.Dispose(); _timer = null; } } public override IDisposable Subscribe(IObserver<IEnvelope<RandomData>> observer) { return this._recordSubject.Subscribe(observer); } public void Dispose() { CleanupTimer(); } } }Dalam contoh ini, kelas
RandomSourcemewarisi kelasEventSource<T>karena menyediakan propertiId. Meskipun contoh ini tidak mendukung bookmark, kelas dasar ini juga berguna untuk menerapkan fungsi itu. Amplop menyediakan cara untuk menyimpan metadata dan membungkus data arbitrer untuk streaming ke sink. KelasRandomDatadidefinisikan pada langkah berikutnya dan mewakili tipe objek output dari sumber ini. -
Tambahkan kelas untuk proyek yang ditetapkan sebelumnya yang berisi data yang dialirkan dari sumber.
Misalnya, kontainer untuk data acak dapat didefinisikan sebagai berikut:
namespace MyCompany.MySources { public class RandomData { public int RandomValue { get; set; } } } -
Kompilasi proyek yang telah ditetapkan sebelumnya.
-
Salin rakitan ke direktori instalasi untuk Kinesis Agent for Windows.
-
Buat atau perbarui file konfigurasi
appsettings.jsonyang menggunakan sumber baru, dan letakkan di direktori instalasi untuk Kinesis Agent for Windows. -
Hentikan dan kemudian jalankan Kinesis Agent for Windows.
-
Periksa berkas log Kinesis Agent for Windows saat ini (biasanya terletak di direktori
%PROGRAMDATA%\Amazon\AWSKinesisTap\logs) untuk memastikan tidak ada masalah dengan plugin sumber kustom. -
Pastikan data tiba di layanan AWS yang diinginkan.
Untuk contoh mengenai cara memperluas fungsi DirectorySource guna menerapkan parsing format log tertentu, lihat Amazon.KinesisTap.Uls\UlsSourceFactory.cs dan Amazon.KinesisTap.Uls\UlsLogParser.cs dalam kode sumber Kinesis Agent for Windows.
Untuk contoh cara membuat sumber yang menyediakan fungsionalitas bookmark, lihat Amazon.KinesisTap.Windows\WindowsSourceFactory.cs dan Amazon.KinesisTap.Windows\EventLogSource.cs dalam kode sumber Kinesis Agent for Windows.
Menerapkan Sink Plugin Kinesis Agent for Windows
Ikuti langkah-langkah berikut untuk menerapkan sink plugin Kinesis Agent for Windows.
Untuk membuat sink plugin Kinesis Agent for Windows
-
Tambahkan kelas ke proyek yang dibuat sebelumnya yang mengimplementasikan antarmuka
IEventSink.Misalnya, kode berikut mengimplementasikan sink yang tidak berfungsi apa-apa selain mencatat kedatangan catatan, yang kemudian akan dibuang.
using Amazon.KinesisTap.Core; using Microsoft.Extensions.Logging; namespace MyCompany.MySinks { public class NullSink : EventSink { public NullSink(IPlugInContext context) : base(context) { } public override void OnNext(IEnvelope envelope) { _logger.LogInformation($"Null sink {Id} received {GetRecord(envelope)}."); } public override void Start() { _logger.LogInformation($"Null sink {Id} starting."); } public override void Stop() { _logger.LogInformation($"Null sink {Id} stopped."); } } }Dalam contoh ini, kelas sink
NullSinkmewarisi kelasEventSinkkarena menyediakan kemampuan untuk mentransformasi catatan menjadi berbagai format serialisasi seperti JSON dan XML. -
Kompilasi proyek yang telah ditetapkan sebelumnya.
-
Salin rakitan ke direktori instalasi untuk Kinesis Agent for Windows.
-
Buat atau perbarui file konfigurasi
appsettings.jsonyang menggunakan sink baru, dan letakkan di direktori instalasi untuk Kinesis Agent for Windows. Misalnya, untuk menggunakan plugin kustomRandomSourcedanNullSink, Anda dapat menggunakan file konfigurasiappsettings.jsonberikut:{ "Sources": [ { "Id": "MyRandomSource", "SourceType": "RandomSource", "Rate": "00:00:10", "Max": 50 } ], "Sinks": [ { "Id": "MyNullSink", "SinkType": "NullSink", "Format": "json" } ], "Pipes": [ { "Id": "MyRandomToNullPipe", "SourceRef": "MyRandomSource", "SinkRef": "MyNullSink" } ] }Konfigurasi ini membuat sumber yang mengirimkan sebuah instans
RandomDatadenganRandomValueyang diatur ke angka acak antara 0 hingga 50 setiap 10 detik. Konfigurasi ini membuat sink yang mentransformasi intansRandomDatayang masuk menjadi JSON, mencatat JSON tersebut, kemudian membuang instans tersebut. Pastikan untuk memasukkan kedua contoh pabrik, kelas sumberRandomSource, dan kelas sinkNullSinkdalam proyek yang telah ditetapkan sebelumnya untuk menggunakan contoh file konfigurasi ini. -
Hentikan dan kemudian jalankan Kinesis Agent for Windows.
-
Periksa berkas log Kinesis Agent for Windows saat ini (biasanya terletak di direktori
%PROGRAMDATA%\Amazon\AWSKinesisTap\logs) untuk memastikan tidak ada masalah dengan plugin sink kustom. -
Pastikan data tiba di layanan AWS yang diinginkan. Karena contoh
NullSinktidak mengalirkan ke layanan AWS, Anda dapat memverifikasi operasi sink yang benar dengan mencari pesan log yang menunjukkan bahwa catatan telah diterima.Misalnya, Anda dapat melihat berkas log yang serupa dengan yang berikut ini:
2018-10-18 12:36:36.3647 Amazon.KinesisTap.Hosting.LogManager INFO Registered factory Amazon.KinesisTap.AWS.AWSEventSinkFactory. 2018-10-18 12:36:36.4018 Amazon.KinesisTap.Hosting.LogManager INFO Registered factory Amazon.KinesisTap.Windows.PerformanceCounterSinkFactory. 2018-10-18 12:36:36.4018 Amazon.KinesisTap.Hosting.LogManager INFO Registered factory MyCompany.MySinks.NullSinkFactory. 2018-10-18 12:36:36.6926 Amazon.KinesisTap.Hosting.LogManager INFO Registered factory Amazon.KinesisTap.Core.DirectorySourceFactory. 2018-10-18 12:36:36.6926 Amazon.KinesisTap.Hosting.LogManager INFO Registered factory Amazon.KinesisTap.ExchangeSource.ExchangeSourceFactory. 2018-10-18 12:36:36.6926 Amazon.KinesisTap.Hosting.LogManager INFO Registered factory Amazon.KinesisTap.Uls.UlsSourceFactory. 2018-10-18 12:36:36.6926 Amazon.KinesisTap.Hosting.LogManager INFO Registered factory Amazon.KinesisTap.Windows.WindowsSourceFactory. 2018-10-18 12:36:36.6926 Amazon.KinesisTap.Hosting.LogManager INFO Registered factory MyCompany.MySources.RandomSourceFactory. 2018-10-18 12:36:36.9601 Amazon.KinesisTap.Hosting.LogManager INFO Registered factory Amazon.KinesisTap.Core.Pipes.PipeFactory. 2018-10-18 12:36:37.4694 Amazon.KinesisTap.Hosting.LogManager INFO Registered factory Amazon.KinesisTap.AutoUpdate.AutoUpdateFactory. 2018-10-18 12:36:37.4807 Amazon.KinesisTap.Hosting.LogManager INFO Performance counter sink started. 2018-10-18 12:36:37.6250 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink starting. 2018-10-18 12:36:37.6250 Amazon.KinesisTap.Hosting.LogManager INFO Connected source MyRandomSource to sink MyNullSink 2018-10-18 12:36:37.6333 Amazon.KinesisTap.Hosting.LogManager INFO Random source id MyRandomSource started with rate 10000. 2018-10-18 12:36:47.8084 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":14}. 2018-10-18 12:36:57.6339 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":5}. 2018-10-18 12:37:07.6490 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":9}. 2018-10-18 12:37:17.6494 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":47}. 2018-10-18 12:37:27.6520 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":25}. 2018-10-18 12:37:37.6676 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":21}. 2018-10-18 12:37:47.6688 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":29}. 2018-10-18 12:37:57.6700 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":22}. 2018-10-18 12:38:07.6838 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":32}. 2018-10-18 12:38:17.6848 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":12}. 2018-10-18 12:38:27.6866 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":46}. 2018-10-18 12:38:37.6880 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":48}. 2018-10-18 12:38:47.6893 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":39}. 2018-10-18 12:38:57.6906 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":18}. 2018-10-18 12:39:07.6995 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":6}. 2018-10-18 12:39:17.7004 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":0}. 2018-10-18 12:39:27.7021 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":3}. 2018-10-18 12:39:37.7023 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":19}.
Jika Anda membuat sink yang mengakses layanan AWS, ada kelas dasar yang mungkin dapat membantu Anda. Untuk sink yang menggunakan kelas dasar AWSBufferedEventSink, lihat Amazon.KinesisTap.AWS\CloudWatchLogsSink.cs pada kode sumber untuk Kinesis Agent for Windows.