本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
为 Windows 插件创建 Kinesis 代理
对于大多数情况下,不需要为微软 Windows 插件创建 Amazon Kinesis 代理。Windows 的 Kinesis Agent 高度可配置,并包含功能强大的源和接收器,例如DirectorySource和KinesisStream,这对于大多数情况都足够了。有关现有源和接收器的详细信息,请参阅为微软 Windows 配置 Amazon Kinesis 运行代理。
对于不常见的场景,可能需要使用定制插件扩展 Windows 的 Kinesis 代理。这样的一些场景包括:
-
使用
Regex或Delimited记录解析程序打包复杂的DirectorySource声明,使其易于在多种不同类型的配置文件中应用。 -
创建并非基于文件或者超过了现有记录解析程序所提供解析功能的新源。
-
为 AWS 服务创建当前不支持的接收器。
主题
适用于 Windows 插件的 Kinesis 代理入门
关于自定义插件没有特殊要求。所有现有源和接收器使用的机制,与自定义插件在 Windows 启动时加载的机制相同,在读取appsettings.json配置文件。
当 Windows 的 Kinesis 代理程序集时,操作按照以下顺序进行:
-
Windows Kinesis 代理扫描安装目录中的程序集(
%PROGRAMFILES%\Amazon\AWSKinesisTap),用于实现IFactory<T>接口中定义Amazon.KinesisTap.Core程序集。此接口定义在Amazon.KinesisTap.Core\Infrastructure\IFactory.cs在 Kinesis 代理程序的 Windows 源代码中。 -
Windows 的 Kinesis 代理加载包含这些类的程序集并调用
RegisterFactory方法。 -
适用于 Windows 的 Kinesis 代理加载
appsettings.json配置文件。对于配置文件中的每个源和接收器,检查SourceType和SinkType键/值对。如果有工厂的注册名称与SourceType和SinkType键/值对中的值相同,则调用这些工厂上的CreateInstance方法。CreateInstance方法将配置和其他信息作为IPluginContext对象传递。CreateInstance方法负责配置和初始化插件。
要使插件正确工作,必须有创建插件的已注册工厂类,并且插件类本身必须已定义。
Windows Kinesis 代理位于https://github.com/awslabs/kinesis-agent-windows
为 Windows 插件工厂实施 Kinesis 代理
按照以下步骤来实施 Windows 插件工厂的 Kinesis 代理。
为 Windows 插件工厂创建一个 Kinesis 代理
-
创建定位到 .NET Framework 4.6 的 C# 库项目。
-
添加对
Amazon.KinesisTap.Core程序集的引用。此程序集位于%PROGRAMFILES%\Amazon\AWSKinesisTap目录之后的 Kinesis 代理进行 Windows 安装。 -
使用
NuGet安装Microsoft.Extensions.Configuration.Abstractions程序包。 -
使用
NuGet安装System.Reactive程序包。 -
使用
NuGet安装Microsoft.Extensions.Logging程序包。 -
创建为源实施
IFactory<IEventSource>或为接收器实施IFactory<IEventSink>的工厂类。添加RegisterFactory和CreateInstance方法。例如,以下代码创建 Windows 插件工厂的 Kinesis 代理,该工厂创建生成随机数据的源:
using System; using Amazon.KinesisTap.Core; using Microsoft.Extensions.Configuration; namespace MyCompany.MySources { public class RandomSourceFactory : IFactory<ISource> { public void RegisterFactory(IFactoryCatalog<ISource> catalog) { catalog.RegisterFactory("randomsource", this); } public ISource CreateInstance(string entry, IPlugInContext context) { IConfiguration config = context.Configuration; switch (entry.ToLower()) { case "randomsource": string rateString = config["Rate"]; string maxString = config["Max"]; TimeSpan rate; int max; if (string.IsNullOrWhiteSpace(rateString)) { rate = TimeSpan.FromSeconds(30); } else { if (!TimeSpan.TryParse(rateString, out rate)) { throw new Exception($"Rate {rateString} is invalid for RandomSource."); } } if (string.IsNullOrWhiteSpace(maxString)) { max = 1000; } else { if (!int.TryParse(maxString, out max)) { throw new Exception($"Max {maxString} is invalid for RandomSource."); } } return new RandomSource(rate, max, context); default: throw new ArgumentException($"Source {entry} is not recognized.", entry); } } } }当您最终希望增强工厂以创建不同类型的实例时,
switch语句在CreateInstance方法中使用。要创建接收器工厂(该工厂创建不执行任何操作的接收器),请使用类似于下文的类:
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using Amazon.KinesisTap.Core; using Microsoft.Extensions.Configuration; namespace MyCompany.MySinks { public class NullSinkFactory : IFactory<IEventSink> { public void RegisterFactory(IFactoryCatalog<IEventSink> catalog) { catalog.RegisterFactory("nullsink", this); } public IEventSink CreateInstance(string entry, IPlugInContext context) { IConfiguration config = context.Configuration; switch (entry.ToLower()) { case "nullsink": return new NullSink(context); default: throw new Exception("Unrecognized sink type {entry}."); } } } }
实现 Windows 插件 Kinesis 代理程序
按照以下步骤来实施 Windows 插件源的 Kinesis 代理。
创建 Windows 插件 Kinesis 代理程序
-
将实施
IEventSource<out T>接口的类添加到以前为源创建的项目。例如,使用以下代码来定义生成随机数据的源:
using System; using System.Reactive.Subjects; using System.Timers; using Amazon.KinesisTap.Core; using Microsoft.Extensions.Logging; namespace MyCompany.MySources { public class RandomSource : EventSource<RandomData>, IDisposable { private TimeSpan _rate; private int _max; private Timer _timer = null; private Random _random = new Random(); private ISubject<IEnvelope<RandomData>> _recordSubject = new Subject<IEnvelope<RandomData>>(); public RandomSource(TimeSpan rate, int max, IPlugInContext context) : base(context) { _rate = rate; _max = max; } public override void Start() { try { CleanupTimer(); _timer = new Timer(_rate.TotalMilliseconds); _timer.Elapsed += (Object source, ElapsedEventArgs args) => { var data = new RandomData() { RandomValue = _random.Next(_max) }; _recordSubject.OnNext(new Envelope<RandomData>(data)); }; _timer.AutoReset = true; _timer.Enabled = true; _logger?.LogInformation($"Random source id {this.Id} started with rate {_rate.TotalMilliseconds}."); } catch (Exception e) { _logger?.LogError($"Exception during start of RandomSource id {this.Id}: {e}"); } } public override void Stop() { try { CleanupTimer(); _logger?.LogInformation($"Random source id {this.Id} stopped."); } catch (Exception e) { _logger?.LogError($"Exception during stop of RandomSource id {this.Id}: {e}"); } } private void CleanupTimer() { if (_timer != null) { _timer.Enabled = false; _timer?.Dispose(); _timer = null; } } public override IDisposable Subscribe(IObserver<IEnvelope<RandomData>> observer) { return this._recordSubject.Subscribe(observer); } public void Dispose() { CleanupTimer(); } } }在本示例中,
RandomSource类继承自EventSource<T>类,因为它提供了Id属性。虽然本示例不支持书签,此基类也可用于实施该功能。信封提供了一种存储元数据和包装任意数据用于流式传输到接收器的方法。RandomData类在下一步中定义,表示来自此源的输出对象的类型。 -
添加类到以前定义的项目,该项目中包含从源流式传输的数据。
例如,随机数据的容器可以定义如下所示:
namespace MyCompany.MySources { public class RandomData { public int RandomValue { get; set; } } } -
编译以前定义的项目。
-
将程序集复制到 Windows 的 Kinesis 代理的安装目录。
-
创建或更新
appsettings.json配置文件,并将其放在 Windows 的 Kinesis 代理的安装目录中。 -
停止然后重新启动 Windows 的 Kinesis 代理。
-
检查当前 Kinesis 代理的 Windows 日志文件(通常位于
%PROGRAMDATA%\Amazon\AWSKinesisTap\logs目录)以确保自定义源插件没有问题。 -
确保数据到达目标 AWS 服务。
有关如何扩展DirectorySource功能来实施解析特定日志格式的功能,请参阅Amazon.KinesisTap.Uls\UlsSourceFactory.cs和Amazon.KinesisTap.Uls\UlsLogParser.cs在 Kinesis 代理程序的 Windows 源代码中。
有关如何创建提供书签功能的源的示例,请参阅Amazon.KinesisTap.Windows\WindowsSourceFactory.cs和Amazon.KinesisTap.Windows\EventLogSource.cs在 Kinesis 代理程序的 Windows 源代码中。
实现 Windows 插件接收器的 Kinesis 代理
按照以下步骤来实施 Windows 插件接收器的 Kinesis 代理。
为 Windows 插件接收器创建 Kinesis 代理程序
-
将类添加到之前定义的项目(该项目实施
IEventSink接口)。例如,以下代码实施接收器,该接收器除了对记录到达进行日志记录之外不执行任何操作,随后丢弃到达的记录。
using Amazon.KinesisTap.Core; using Microsoft.Extensions.Logging; namespace MyCompany.MySinks { public class NullSink : EventSink { public NullSink(IPlugInContext context) : base(context) { } public override void OnNext(IEnvelope envelope) { _logger.LogInformation($"Null sink {Id} received {GetRecord(envelope)}."); } public override void Start() { _logger.LogInformation($"Null sink {Id} starting."); } public override void Stop() { _logger.LogInformation($"Null sink {Id} stopped."); } } }在此示例中,
NullSink接收器类继承自EventSink类,因为它提供将记录转换为不同序列化格式(例如 JSON 和 XML)的功能。 -
编译以前定义的项目。
-
将程序集复制到 Windows 的 Kinesis 代理的安装目录。
-
创建或更新
appsettings.json配置文件,并将其放在 Windows 的 Kinesis 代理的安装目录中。例如,要使用RandomSource和NullSink自定义插件,您可以使用以下appsettings.json配置文件:{ "Sources": [ { "Id": "MyRandomSource", "SourceType": "RandomSource", "Rate": "00:00:10", "Max": 50 } ], "Sinks": [ { "Id": "MyNullSink", "SinkType": "NullSink", "Format": "json" } ], "Pipes": [ { "Id": "MyRandomToNullPipe", "SourceRef": "MyRandomSource", "SinkRef": "MyNullSink" } ] }此配置创建发送
RandomData的实例的源,其中RandomValue每隔 10 秒设置为 0 到 50 之间的随机数字。它创建一个接收器,将传入RandomData实例转换为 JSON、记录该 JSON 然后丢弃实例。请确保包含之前所定义项目中的示例工厂、RandomSource源类以及NullSink接收器类以使用示例配置文件。 -
停止然后重新启动 Windows 的 Kinesis 代理。
-
检查当前 Kinesis 代理的 Windows 日志文件(通常位于
%PROGRAMDATA%\Amazon\AWSKinesisTap\logs目录)以确保自定义接收器插件没有问题。 -
确保数据到达目标 AWS 服务。由于示例
NullSink不流式传输到 AWS 服务,您可以通过查找指示已收到记录的日志消息来验证接收器的操作正确。举例来说,您可以看到类似于下文的日志文件:
2018-10-18 12:36:36.3647 Amazon.KinesisTap.Hosting.LogManager INFO Registered factory Amazon.KinesisTap.AWS.AWSEventSinkFactory. 2018-10-18 12:36:36.4018 Amazon.KinesisTap.Hosting.LogManager INFO Registered factory Amazon.KinesisTap.Windows.PerformanceCounterSinkFactory. 2018-10-18 12:36:36.4018 Amazon.KinesisTap.Hosting.LogManager INFO Registered factory MyCompany.MySinks.NullSinkFactory. 2018-10-18 12:36:36.6926 Amazon.KinesisTap.Hosting.LogManager INFO Registered factory Amazon.KinesisTap.Core.DirectorySourceFactory. 2018-10-18 12:36:36.6926 Amazon.KinesisTap.Hosting.LogManager INFO Registered factory Amazon.KinesisTap.ExchangeSource.ExchangeSourceFactory. 2018-10-18 12:36:36.6926 Amazon.KinesisTap.Hosting.LogManager INFO Registered factory Amazon.KinesisTap.Uls.UlsSourceFactory. 2018-10-18 12:36:36.6926 Amazon.KinesisTap.Hosting.LogManager INFO Registered factory Amazon.KinesisTap.Windows.WindowsSourceFactory. 2018-10-18 12:36:36.6926 Amazon.KinesisTap.Hosting.LogManager INFO Registered factory MyCompany.MySources.RandomSourceFactory. 2018-10-18 12:36:36.9601 Amazon.KinesisTap.Hosting.LogManager INFO Registered factory Amazon.KinesisTap.Core.Pipes.PipeFactory. 2018-10-18 12:36:37.4694 Amazon.KinesisTap.Hosting.LogManager INFO Registered factory Amazon.KinesisTap.AutoUpdate.AutoUpdateFactory. 2018-10-18 12:36:37.4807 Amazon.KinesisTap.Hosting.LogManager INFO Performance counter sink started. 2018-10-18 12:36:37.6250 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink starting. 2018-10-18 12:36:37.6250 Amazon.KinesisTap.Hosting.LogManager INFO Connected source MyRandomSource to sink MyNullSink 2018-10-18 12:36:37.6333 Amazon.KinesisTap.Hosting.LogManager INFO Random source id MyRandomSource started with rate 10000. 2018-10-18 12:36:47.8084 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":14}. 2018-10-18 12:36:57.6339 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":5}. 2018-10-18 12:37:07.6490 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":9}. 2018-10-18 12:37:17.6494 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":47}. 2018-10-18 12:37:27.6520 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":25}. 2018-10-18 12:37:37.6676 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":21}. 2018-10-18 12:37:47.6688 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":29}. 2018-10-18 12:37:57.6700 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":22}. 2018-10-18 12:38:07.6838 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":32}. 2018-10-18 12:38:17.6848 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":12}. 2018-10-18 12:38:27.6866 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":46}. 2018-10-18 12:38:37.6880 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":48}. 2018-10-18 12:38:47.6893 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":39}. 2018-10-18 12:38:57.6906 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":18}. 2018-10-18 12:39:07.6995 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":6}. 2018-10-18 12:39:17.7004 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":0}. 2018-10-18 12:39:27.7021 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":3}. 2018-10-18 12:39:37.7023 Amazon.KinesisTap.Hosting.LogManager INFO Null sink MyNullSink received {"RandomValue":19}.
如果您创建访问 AWS 服务的接收器,则可能会发现基类非常有用。对于使用AWSBufferedEventSink基类,请参阅Amazon.KinesisTap.AWS\CloudWatchLogsSink.cs在适用于 Windows 的 Kinesis 代理的源代码中。