View a markdown version of this page

為視窗外掛程式建立 Kinesis 代理程式 - Microsoft Windows 的 Amazon Kinesis Kinesis 代理程式

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

為視窗外掛程式建立 Kinesis 代理程式

在大多數情況下,不需要為微軟視窗外掛程式建立 Amazon Kinesis Kinesis 代理程式。針對 Windows 的 Kinesis Agent 可以高度地進行設定,並包含了強大的來源和接收,例如DirectorySourceKinesisStream,這對於大多數情況來說已經足夠了。如需現有來源和接收的詳細資訊,請參閱設定適用於微軟視窗的 Amazon Kinesis

針對不尋常的案例,便可能需要使用自訂外掛程式擴充針對 Windows 的 Kinesis Agent。其中某些案例包含以下項目:

  • 使用 RegexDelimited 記錄剖析器封裝複雜的 DirectorySource 宣告,使其易於套用在許多不同類型的組態檔案中。

  • 建立並非以檔案為基礎,或是超過現有記錄剖析器所提供剖析功能的創新來源。

  • 目前不支援建立 AWS 服務的接收。

開始使用適用於 Windows 外掛程式的 Kinesis 代理程式

自訂外掛程式並不特殊。所有現有的來源和接收都使用和 Windows 相同的機制,和自訂外掛程式相同,在啟 Kinesis 時進行載入,並且會在讀取appsettings.json組態檔案。

當 Windows 版 Kinesis 代理程式啟動時,便會發生以下順序:

  1. 針對 Windows 的 Kinesis Assembly 會掃描安裝目錄內的組件 (%PROGRAMFILES%\Amazon\AWSKinesisTap),用於實現IFactory<T>中定義的介面Amazon.KinesisTap.CoreAssembly。此界面是在Amazon.KinesisTap.Core\Infrastructure\IFactory.cs,以取得 Windows 原始程式碼。

  2. 針對 Windows 的 Kinesis Agent 會載入包含這些類別的組件,並呼叫RegisterFactory這些類的方法。

  3. 適用於視窗的 Kinesis 代理程式會載入appsettings.json組態檔案。針對組態檔案中的每個來源和接收,檢查 SourceTypeSinkType 鍵/值對。若有使用相同名稱做為 SourceTypeSinkType 鍵/值對值註冊的工廠,便會在這些工廠上呼叫 CreateInstance 方法。將組態和其他資訊做為 IPluginContext 物件傳遞給 CreateInstance 方法。CreateInstance 方法會負責設定及初始化外掛程式。

若要讓外掛程式正常運作,必須擁有建立外掛程式的註冊工廠類別,並且也必須定義外掛程式的類別本身。

針對 Windows 來源碼的 Kinesis 代理程式位於https://github.com/awslabs/kinesis-agent-windows

為視窗外掛程式工廠實作 Kinesis 代理程式

遵循下列步驟來實作針對 Windows 外掛程式工廠的 Kinesis 代理程式。

建立適用於視窗外掛程式工廠的 Kinesis 代理程式
  1. 建立瞄準 .NET Framework 4.6 的 C# 程式庫專案。

  2. 新增 Amazon.KinesisTap.Core 組件的參考。此組件位於%PROGRAMFILES%\Amazon\AWSKinesisTap目錄 Kinesis 行 Windows 安裝。

  3. 使用 NuGet 安裝 Microsoft.Extensions.Configuration.Abstractions 套件。

  4. 使用 NuGet 安裝 System.Reactive 套件。

  5. 使用 NuGet 安裝 Microsoft.Extensions.Logging 套件。

  6. 建立工廠類別,針對來源實作 IFactory<IEventSource>,或針對接收實作 IFactory<IEventSink>。新增 RegisterFactoryCreateInstance 方法。

    例如,下列程式碼會建立針對 Windows 外掛程式工廠的 Kinesis Agent,建立產生隨機資料的來源:

    using System; using Amazon.KinesisTap.Core; using Microsoft.Extensions.Configuration; namespace MyCompany.MySources { public class RandomSourceFactory : IFactory<ISource> { public void RegisterFactory(IFactoryCatalog<ISource> catalog) { catalog.RegisterFactory("randomsource", this); } public ISource CreateInstance(string entry, IPlugInContext context) { IConfiguration config = context.Configuration; switch (entry.ToLower()) { case "randomsource": string rateString = config["Rate"]; string maxString = config["Max"]; TimeSpan rate; int max; if (string.IsNullOrWhiteSpace(rateString)) { rate = TimeSpan.FromSeconds(30); } else { if (!TimeSpan.TryParse(rateString, out rate)) { throw new Exception($"Rate {rateString} is invalid for RandomSource."); } } if (string.IsNullOrWhiteSpace(maxString)) { max = 1000; } else { if (!int.TryParse(maxString, out max)) { throw new Exception($"Max {maxString} is invalid for RandomSource."); } } return new RandomSource(rate, max, context); default: throw new ArgumentException($"Source {entry} is not recognized.", entry); } } } }

    若您最後希望增強工廠來建立不同類型的執行個體,switch 陳述式會用於 CreateInstance 方法中。

    若要建立接收工廠來建立不採取任何動作的接收,請使用與以下內容相似的類別:

    using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using Amazon.KinesisTap.Core; using Microsoft.Extensions.Configuration; namespace MyCompany.MySinks { public class NullSinkFactory : IFactory<IEventSink> { public void RegisterFactory(IFactoryCatalog<IEventSink> catalog) { catalog.RegisterFactory("nullsink", this); } public IEventSink CreateInstance(string entry, IPlugInContext context) { IConfiguration config = context.Configuration; switch (entry.ToLower()) { case "nullsink": return new NullSink(context); default: throw new Exception("Unrecognized sink type {entry}."); } } } }

為視窗外掛程式來源實作 Kinesis 代理程式

遵循下列步驟來實作針對 Windows 外掛程式來源的 Kinesis 代理程式。

建立 Windows 外掛程式來源的 Kinesis 代理程式
  1. 新增類別,實作 IEventSource<out T> 界面至先前為來源所建立的專案。

    例如,使用以下程式碼來定義產生隨機資料的來源:

    using System; using System.Reactive.Subjects; using System.Timers; using Amazon.KinesisTap.Core; using Microsoft.Extensions.Logging; namespace MyCompany.MySources { public class RandomSource : EventSource<RandomData>, IDisposable { private TimeSpan _rate; private int _max; private Timer _timer = null; private Random _random = new Random(); private ISubject<IEnvelope<RandomData>> _recordSubject = new Subject<IEnvelope<RandomData>>(); public RandomSource(TimeSpan rate, int max, IPlugInContext context) : base(context) { _rate = rate; _max = max; } public override void Start() { try { CleanupTimer(); _timer = new Timer(_rate.TotalMilliseconds); _timer.Elapsed += (Object source, ElapsedEventArgs args) => { var data = new RandomData() { RandomValue = _random.Next(_max) }; _recordSubject.OnNext(new Envelope<RandomData>(data)); }; _timer.AutoReset = true; _timer.Enabled = true; _logger?.LogInformation($"Random source id {this.Id} started with rate {_rate.TotalMilliseconds}."); } catch (Exception e) { _logger?.LogError($"Exception during start of RandomSource id {this.Id}: {e}"); } } public override void Stop() { try { CleanupTimer(); _logger?.LogInformation($"Random source id {this.Id} stopped."); } catch (Exception e) { _logger?.LogError($"Exception during stop of RandomSource id {this.Id}: {e}"); } } private void CleanupTimer() { if (_timer != null) { _timer.Enabled = false; _timer?.Dispose(); _timer = null; } } public override IDisposable Subscribe(IObserver<IEnvelope<RandomData>> observer) { return this._recordSubject.Subscribe(observer); } public void Dispose() { CleanupTimer(); } } }

    在此範例中,RandomSource 類別會繼承自 EventSource<T> 類別,因為它提供了 Id 屬性。雖然此範例不支援書籤,但此基礎類別在實作該功能上也相同有用。信封可提供一種方式存放中繼資料,並包裝任意資料以串流至接收。RandomData 類別會在下一個步驟中定義,並代表來自此來源輸出物件的類型。

  2. 新增類別至先前定義的專案,該專案包含了從來源串流的資料。

    例如,隨機資料的容器可定義如下:

    namespace MyCompany.MySources { public class RandomData { public int RandomValue { get; set; } } }
  3. 編譯先前定義的專案。

  4. 將組件複製到 Windows 的 Kinesis 代理程式的安裝目錄。

  5. 建立或更新appsettings.json組態檔案,並將它置放在適用於 Windows 的 Kinesis Agent 的安裝目錄內。

  6. 停止並啟動 Windows 的 Kinesis 代理程式。

  7. 檢查目前的 Kinesis 代理程式是否有 Windows 記錄檔 (通常位於%PROGRAMDATA%\Amazon\AWSKinesisTap\logs目錄),確認自訂來源外掛程式沒有任何問題。

  8. 確認資料抵達所需的 AWS 服務。

如需如何擴展DirectorySource功能來實作剖析特定日誌格式,請參閱Amazon.KinesisTap.Uls\UlsSourceFactory.csAmazon.KinesisTap.Uls\UlsLogParser.cs,以取得 Windows 原始程式碼。

如需如何建立來源,以提供書籤功能的範例,請參閱Amazon.KinesisTap.Windows\WindowsSourceFactory.csAmazon.KinesisTap.Windows\EventLogSource.cs,以取得 Windows 原始程式碼。

為視窗外掛程式接收器實作 Kinesis 代理程式

遵循下列步驟來實作針對 Windows 外掛程式接收的 Kinesis 代理程式。

若要建立 Windows 外掛程式接收器的 Kinesis 代理程式
  1. 新增類別至先前定義的專案,實作 IEventSink 界面。

    例如,以下程式碼會實作接收,除了記錄記錄抵達之外不採取任何動作,並隨即捨棄記錄。

    using Amazon.KinesisTap.Core; using Microsoft.Extensions.Logging; namespace MyCompany.MySinks { public class NullSink : EventSink { public NullSink(IPlugInContext context) : base(context) { } public override void OnNext(IEnvelope envelope) { _logger.LogInformation($"Null sink {Id} received {GetRecord(envelope)}."); } public override void Start() { _logger.LogInformation($"Null sink {Id} starting."); } public override void Stop() { _logger.LogInformation($"Null sink {Id} stopped."); } } }

    在此範例中,NullSink 接收類別繼承自 EventSink 類別,因為它提供了將記錄轉換成不同序列化格式 (例如 JSON 和 XML) 的能力。

  2. 編譯先前定義的專案。

  3. 將組件複製到 Windows 的 Kinesis 代理程式的安裝目錄。

  4. 建立或更新appsettings.json組態檔案,並將它置放在適用於 Windows 的 Kinesis Agent 的安裝目錄內。例如,若要使用 RandomSourceNullSink 自訂外掛程式,您可以使用以下 appsettings.json 組態檔案:

    { "Sources": [ { "Id": "MyRandomSource", "SourceType": "RandomSource", "Rate": "00:00:10", "Max": 50 } ], "Sinks": [ { "Id": "MyNullSink", "SinkType": "NullSink", "Format": "json" } ], "Pipes": [ { "Id": "MyRandomToNullPipe", "SourceRef": "MyRandomSource", "SinkRef": "MyNullSink" } ] }

    此組態會建立來源,傳送 RandomData 的執行個體,其中包含每 10 秒便會設為 0 至 50 間隨機數字的 RandomValue。它會建立一個接收,將傳入的 RandomData 執行個體轉換成 JSON,記錄該 JSON,然後捨棄執行個體。請務必在先前使用範例組態檔案的定義專案中包含兩個範例工廠 (RandomSource 來源類別及 NullSink 接收類別)。

  5. 停止並啟動 Windows 的 Kinesis 代理程式。

  6. 檢查目前的 Kinesis 代理程式是否有 Windows 記錄檔 (通常位於%PROGRAMDATA%\Amazon\AWSKinesisTap\logs目錄),確認自訂接收外掛程式沒有任何問題。

  7. 確認資料抵達所需的 AWS 服務。因為範例 NullSink 不會串流至 AWS 服務,您可以查看指出已接收記錄的日誌訊息,驗證接收已正常運作。

    例如,您應該會看到與以下內容相似的日誌檔案:

    2018-10-18 12:36:36.3647 Amazon.KinesisTap.Hosting.LogManager INFO Registered factory Amazon.KinesisTap.AWS.AWSEventSinkFactory. 2018-10-18 12:36:36.4018 Amazon.KinesisTap.Hosting.LogManager INFO Registered factory Amazon.KinesisTap.Windows.PerformanceCounterSinkFactory. 2018-10-18 12:36:36.4018 Amazon.KinesisTap.Hosting.LogManager INFO Registered factory MyCompany.MySinks.NullSinkFactory. 2018-10-18 12:36:36.6926 Amazon.KinesisTap.Hosting.LogManager INFO Registered factory Amazon.KinesisTap.Core.DirectorySourceFactory. 2018-10-18 12:36:36.6926 Amazon.KinesisTap.Hosting.LogManager INFO Registered factory Amazon.KinesisTap.ExchangeSource.ExchangeSourceFactory. 2018-10-18 12:36:36.6926 Amazon.KinesisTap.Hosting.LogManager INFO Registered factory Amazon.KinesisTap.Uls.UlsSourceFactory. 2018-10-18 12:36:36.6926 Amazon.KinesisTap.Hosting.LogManager INFO Registered factory Amazon.KinesisTap.Windows.WindowsSourceFactory. 2018-10-18 12:36:36.6926 Amazon.KinesisTap.Hosting.LogManager INFO Registered factory MyCompany.MySources.RandomSourceFactory. 2018-10-18 12:36:36.9601 Amazon.KinesisTap.Hosting.LogManager INFO Registered factory Amazon.KinesisTap.Core.Pipes.PipeFactory. 2018-10-18 12:36:37.4694 Amazon.KinesisTap.Hosting.LogManager INFO Registered factory Amazon.KinesisTap.AutoUpdate.AutoUpdateFactory. 2018-10-18 12:36:37.4807 Amazon.KinesisTap.Hosting.LogManager INFO Performance counter sink started. 2018-10-18 12:36:37.6250 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink starting. 2018-10-18 12:36:37.6250 Amazon.KinesisTap.Hosting.LogManager INFO Connected source MyRandomSource to sink MyNullSink 2018-10-18 12:36:37.6333 Amazon.KinesisTap.Hosting.LogManager INFO Random source id MyRandomSource started with rate 10000. 2018-10-18 12:36:47.8084 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":14}. 2018-10-18 12:36:57.6339 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":5}. 2018-10-18 12:37:07.6490 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":9}. 2018-10-18 12:37:17.6494 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":47}. 2018-10-18 12:37:27.6520 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":25}. 2018-10-18 12:37:37.6676 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":21}. 2018-10-18 12:37:47.6688 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":29}. 2018-10-18 12:37:57.6700 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":22}. 2018-10-18 12:38:07.6838 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":32}. 2018-10-18 12:38:17.6848 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":12}. 2018-10-18 12:38:27.6866 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":46}. 2018-10-18 12:38:37.6880 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":48}. 2018-10-18 12:38:47.6893 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":39}. 2018-10-18 12:38:57.6906 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":18}. 2018-10-18 12:39:07.6995 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":6}. 2018-10-18 12:39:17.7004 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":0}. 2018-10-18 12:39:27.7021 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":3}. 2018-10-18 12:39:37.7023 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":19}.

若您要建立存取 AWS 服務的接收,有些基礎類別可能會有所幫助。對於使用AWSBufferedEventSink基底類別,請參閱Amazon.KinesisTap.AWS\CloudWatchLogsSink.cs在原始程 Kinesis 碼中。