本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
使用 StreamManagerClient 搭配串流
在 Greengrass 核心裝置上執行的使用者定義 Greengrass 元件可以使用 Stream Manager SDK 中的 StreamManagerClient 物件,在串流管理員中建立串流,然後與串流互動。當元件建立串流時,它會定義串流的 AWS 雲端 目的地、優先順序和其他匯出和資料保留政策。若要將資料傳送至串流管理員,元件會將資料附加至串流。如果已定義串流的匯出目的地,串流管理員會自動匯出串流。
一般而言,串流管理員的用戶端是使用者定義的 Greengrass 元件。如果您的商業案例需要,您也可以允許在 Greengrass 核心 (例如 Docker 容器) 上執行的非元件程序與串流管理員互動。如需詳細資訊,請參閱用戶端身分驗證。
本主題中的程式碼片段會示範用戶端如何呼叫StreamManagerClient方法以使用串流。如需方法及其引數的實作詳細資訊,請使用每個程式碼片段之後列出的 SDK 參考連結。
如果您在 Lambda 函數中使用串流管理員,您的 Lambda 函數應該在函數處理常式StreamManagerClient之外執行個體化。如果在處理常式內執行個體化,則該函數在每次叫用時都會建立 client 以及與串流管理員的連線。
如果您在處理常式內將 StreamManagerClient 執行個體化,則必須在 client 完成其工作時明確呼叫 close() 方法。否則,client 會將連線保持在開啟狀態,並將另一個執行緒保持在執行狀態,直到指令碼結束為止。
StreamManagerClient 支援下列操作:
建立訊息串流
若要建立串流,使用者定義的 Greengrass 元件會呼叫建立方法,並在MessageStreamDefinition物件中傳遞。此物件會指定串流的唯一名稱,並定義串流管理員在達到串流大小上限時應如何處理新資料。您可以使用 MessageStreamDefinition 及其資料類型 (例如 ExportDefinition、StrategyOnFull 和 Persistence) 來定義其他串流屬性。其中包含:
-
自動匯出的目標 AWS IoT SiteWise、 AWS IoT Analytics Kinesis Data Streams 和 Amazon S3 目的地。如需詳細資訊,請參閱匯出支援 AWS 雲端 目的地的組態。
-
匯出優先順序。串流管理員會先匯出優先順序較高的串流,然後再匯出較低的串流。
-
AWS IoT Analytics、Kinesis Data Streams 和 AWS IoT SiteWise 目的地的批次大小上限和批次間隔。符合任一條件時,串流管理員會匯出訊息。
-
存留時間 (TTL)。保證串流資料可供處理的時間量。您應該確定資料可以在這段期間內使用。這不是刪除政策。TTL 期間後,資料可能不會立即刪除。
-
串流持久性。選擇此選項可將串流儲存至檔案系統,以便在核心重新啟動期間保留資料,或將串流儲存在記憶體中。
-
起始序號。指定要用作匯出中起始訊息的訊息序號。
如需 的詳細資訊MessageStreamDefinition,請參閱目標語言的 SDK 參考:
StreamManagerClient 也提供目標目的地,可用來將串流匯出至 HTTP 伺服器。此目標僅供測試之用。它不穩定或不支援在生產環境中使用。
建立串流後,您的 Greengrass 元件可以將訊息附加到串流,以傳送資料從串流匯出和讀取訊息以進行本機處理。您建立的串流數量取決於您的硬體功能和商業案例。其中一個策略是為 AWS IoT Analytics 或 Kinesis 資料串流中的每個目標頻道建立串流,但您可以定義串流的多個目標。串流的生命週期相當耐久。
要求
此操作有下列需求:
範例
以下程式碼片段會建立名為 StreamName 的串流。它定義 MessageStreamDefinition和次級資料類型中的串流屬性。
- Python
-
client = StreamManagerClient()
try:
client.create_message_stream(MessageStreamDefinition(
name="StreamName", # Required.
max_size=268435456, # Default is 256 MB.
stream_segment_size=16777216, # Default is 16 MB.
time_to_live_millis=None, # By default, no TTL is enabled.
strategy_on_full=StrategyOnFull.OverwriteOldestData, # Required.
persistence=Persistence.File, # Default is File.
flush_on_write=False, # Default is false.
export_definition=ExportDefinition( # Optional. Choose where/how the stream is exported to the AWS 雲端.
kinesis=None,
iot_analytics=None,
iot_sitewise=None,
s3_task_executor=None
)
))
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK 參考:create_message_stream | MessageStreamDefinition
- Java
-
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
client.createMessageStream(
new MessageStreamDefinition()
.withName("StreamName") // Required.
.withMaxSize(268435456L) // Default is 256 MB.
.withStreamSegmentSize(16777216L) // Default is 16 MB.
.withTimeToLiveMillis(null) // By default, no TTL is enabled.
.withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required.
.withPersistence(Persistence.File) // Default is File.
.withFlushOnWrite(false) // Default is false.
.withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS 雲端.
new ExportDefinition()
.withKinesis(null)
.withIotAnalytics(null)
.withIotSitewise(null)
.withS3(null)
)
);
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java 開發套件參考:createMessageStream | MessageStreamDefinition
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
await client.createMessageStream(
new MessageStreamDefinition()
.withName("StreamName") // Required.
.withMaxSize(268435456) // Default is 256 MB.
.withStreamSegmentSize(16777216) // Default is 16 MB.
.withTimeToLiveMillis(null) // By default, no TTL is enabled.
.withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required.
.withPersistence(Persistence.File) // Default is File.
.withFlushOnWrite(false) // Default is false.
.withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS 雲端.
new ExportDefinition()
.withKinesis(null)
.withIotAnalytics(null)
.withIotSiteWise(null)
.withS3(null)
)
);
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK 參考:createMessageStream | MessageStreamDefinition
如需設定匯出目的地的詳細資訊,請參閱 匯出支援 AWS 雲端 目的地的組態。
附加訊息
若要將資料傳送至串流管理員以進行匯出,您的 Greengrass 元件會將資料附加至目標串流。匯出目的地會決定要傳遞至此方法的資料類型。
要求
此操作有下列需求:
範例
AWS IoT Analytics 或 Kinesis Data Streams 匯出目的地
下列程式碼片段附加一個訊息到名為 StreamName 的串流。對於 AWS IoT Analytics 或 Kinesis Data Streams 目的地,您的 Greengrass 元件會附加資料的 Blob。
此程式碼片段有下列需求:
- Python
-
client = StreamManagerClient()
try:
sequence_number = client.append_message(stream_name="StreamName", data=b'Arbitrary bytes data')
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK 參考:end_message
- Java
-
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
long sequenceNumber = client.appendMessage("StreamName", "Arbitrary byte array".getBytes());
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java SDK 參考:appendMessage
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const sequenceNumber = await client.appendMessage("StreamName", Buffer.from("Arbitrary byte array"));
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK 參考:appendMessage
AWS IoT SiteWise 匯出目的地
下列程式碼片段附加一個訊息到名為 StreamName 的串流。對於 AWS IoT SiteWise 目的地,您的 Greengrass 元件會附加序列化PutAssetPropertyValueEntry物件。如需詳細資訊,請參閱匯出至 AWS IoT SiteWise。
當您傳送資料至 時 AWS IoT SiteWise,您的資料必須符合 BatchPutAssetPropertyValue動作的要求。如需詳細資訊,請參閱《AWS IoT SiteWise API 參考》中的 BatchPutAssetPropertyValue。
此程式碼片段有下列需求:
- Python
-
client = StreamManagerClient()
try:
# SiteWise requires unique timestamps in all messages and also needs timestamps not earlier
# than 10 minutes in the past. Add some randomness to time and offset.
# Note: To create a new asset property data, you should use the classes defined in the
# greengrasssdk.stream_manager module.
time_in_nanos = TimeInNanos(
time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000)
)
variant = Variant(double_value=random.random())
asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)]
putAssetPropertyValueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias="PropertyAlias", property_values=asset)
sequence_number = client.append_message(stream_name="StreamName", Util.validate_and_serialize_to_json_bytes(putAssetPropertyValueEntry))
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK 參考:end_message | PutAssetPropertyValueEntry
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
Random rand = new Random();
// Note: To create a new asset property data, you should use the classes defined in the
// com.amazonaws.greengrass.streammanager.model.sitewise package.
List<AssetPropertyValue> entries = new ArrayList<>() ;
// IoTSiteWise requires unique timestamps in all messages and also needs timestamps not earlier
// than 10 minutes in the past. Add some randomness to time and offset.
final int maxTimeRandomness = 60;
final int maxOffsetRandomness = 10000;
double randomValue = rand.nextDouble();
TimeInNanos timestamp = new TimeInNanos()
.withTimeInSeconds(Instant.now().getEpochSecond() - rand.nextInt(maxTimeRandomness))
.withOffsetInNanos((long) (rand.nextInt(maxOffsetRandomness)));
AssetPropertyValue entry = new AssetPropertyValue()
.withValue(new Variant().withDoubleValue(randomValue))
.withQuality(Quality.GOOD)
.withTimestamp(timestamp);
entries.add(entry);
PutAssetPropertyValueEntry putAssetPropertyValueEntry = new PutAssetPropertyValueEntry()
.withEntryId(UUID.randomUUID().toString())
.withPropertyAlias("PropertyAlias")
.withPropertyValues(entries);
long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java 開發套件參考:appendMessage | PutAssetPropertyValueEntry
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const maxTimeRandomness = 60;
const maxOffsetRandomness = 10000;
const randomValue = Math.random();
// Note: To create a new asset property data, you should use the classes defined in the
// aws-greengrass-core-sdk StreamManager module.
const timestamp = new TimeInNanos()
.withTimeInSeconds(Math.round(Date.now() / 1000) - Math.floor(Math.random() - maxTimeRandomness))
.withOffsetInNanos(Math.floor(Math.random() * maxOffsetRandomness));
const entry = new AssetPropertyValue()
.withValue(new Variant().withDoubleValue(randomValue))
.withQuality(Quality.GOOD)
.withTimestamp(timestamp);
const putAssetPropertyValueEntry = new PutAssetPropertyValueEntry()
.withEntryId(`${ENTRY_ID_PREFIX}${i}`)
.withPropertyAlias("PropertyAlias")
.withPropertyValues([entry]);
const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK 參考:appendMessage | PutAssetPropertyValueEntry
Amazon S3 匯出目的地
下列程式碼片段會將匯出任務附加至名為 的串流StreamName。對於 Amazon S3 目的地,您的 Greengrass 元件會附加序列化S3ExportTaskDefinition物件,其中包含來源輸入檔案和目標 Amazon S3 物件的相關資訊。如果指定的物件不存在,串流管理員會為您建立它。如需詳細資訊,請參閱匯出至 Amazon S3。
此程式碼片段有下列需求:
- Python
-
client = StreamManagerClient()
try:
# Append an Amazon S3 Task definition and print the sequence number.
s3_export_task_definition = S3ExportTaskDefinition(input_url="URLToFile", bucket="BucketName", key="KeyName")
sequence_number = client.append_message(stream_name="StreamName", Util.validate_and_serialize_to_json_bytes(s3_export_task_definition))
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK 參考:end_message | S3ExportTaskDefinition
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
// Append an Amazon S3 export task definition and print the sequence number.
S3ExportTaskDefinition s3ExportTaskDefinition = new S3ExportTaskDefinition()
.withBucket("BucketName")
.withKey("KeyName")
.withInputUrl("URLToFile");
long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(s3ExportTaskDefinition));
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java SDK 參考:appendMessage | S3ExportTaskDefinition
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
// Append an Amazon S3 export task definition and print the sequence number.
const taskDefinition = new S3ExportTaskDefinition()
.withBucket("BucketName")
.withKey("KeyName")
.withInputUrl("URLToFile");
const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(taskDefinition)));
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK 參考:appendMessage | S3ExportTaskDefinition
讀取訊息
從串流讀取訊息。
要求
此操作有下列需求:
範例
下列程式碼片段可從名為 StreamName 的串流讀取訊息。讀取方法需要一個選用的 ReadMessagesOptions 物件以指定序號,從要讀取的最小、最大數字和讀取訊息的逾時開始讀取。
- Python
-
client = StreamManagerClient()
try:
message_list = client.read_messages(
stream_name="StreamName",
# By default, if no options are specified, it tries to read one message from the beginning of the stream.
options=ReadMessagesOptions(
desired_start_sequence_number=100,
# Try to read from sequence number 100 or greater. By default, this is 0.
min_message_count=10,
# Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
max_message_count=100, # Accept up to 100 messages. By default this is 1.
read_timeout_millis=5000
# Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
)
)
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK 參考:Read_messages | ReadMessagesOptions
- Java
-
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
List<Message> messages = client.readMessages("StreamName",
// By default, if no options are specified, it tries to read one message from the beginning of the stream.
new ReadMessagesOptions()
// Try to read from sequence number 100 or greater. By default this is 0.
.withDesiredStartSequenceNumber(100L)
// Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
.withMinMessageCount(10L)
// Accept up to 100 messages. By default this is 1.
.withMaxMessageCount(100L)
// Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
.withReadTimeoutMillis(Duration.ofSeconds(5L).toMillis())
);
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java SDK 參考:readMessages | ReadMessagesOptions
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const messages = await client.readMessages("StreamName",
// By default, if no options are specified, it tries to read one message from the beginning of the stream.
new ReadMessagesOptions()
// Try to read from sequence number 100 or greater. By default this is 0.
.withDesiredStartSequenceNumber(100)
// Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is thrown. By default, this is 1.
.withMinMessageCount(10)
// Accept up to 100 messages. By default this is 1.
.withMaxMessageCount(100)
// Try to wait at most 5 seconds for the minMessageCount to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
.withReadTimeoutMillis(5 * 1000)
);
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK 參考:readMessages | ReadMessagesOptions
列出串流
取得串流管理員中的串流清單。
要求
此操作有下列需求:
範例
下列程式碼片段可獲取串流管理員中的串流清單 (依據名稱)。
- Python
-
client = StreamManagerClient()
try:
stream_names = client.list_streams()
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK 參考:list_streams
- Java
-
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
List<String> streamNames = client.listStreams();
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java SDK 參考:listStreams
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const streams = await client.listStreams();
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK 參考:listStreams
描述訊息串流
取得串流的中繼資料,包括串流定義、大小和匯出狀態。
要求
此操作有下列需求:
範例
下列程式碼片段可獲取名為 StreamName 的串流相關的中繼資料,包括串流定義、大小和匯出工具狀態。
- Python
-
client = StreamManagerClient()
try:
stream_description = client.describe_message_stream(stream_name="StreamName")
if stream_description.export_statuses[0].error_message:
# The last export of export destination 0 failed with some error
# Here is the last sequence number that was successfully exported
stream_description.export_statuses[0].last_exported_sequence_number
if (stream_description.storage_status.newest_sequence_number >
stream_description.export_statuses[0].last_exported_sequence_number):
pass
# The end of the stream is ahead of the last exported sequence number
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK 參考: describe_message_stream
- Java
-
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
MessageStreamInfo description = client.describeMessageStream("StreamName");
String lastErrorMessage = description.getExportStatuses().get(0).getErrorMessage();
if (lastErrorMessage != null && !lastErrorMessage.equals("")) {
// The last export of export destination 0 failed with some error.
// Here is the last sequence number that was successfully exported.
description.getExportStatuses().get(0).getLastExportedSequenceNumber();
}
if (description.getStorageStatus().getNewestSequenceNumber() >
description.getExportStatuses().get(0).getLastExportedSequenceNumber()) {
// The end of the stream is ahead of the last exported sequence number.
}
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java SDK 參考: describeMessageStream
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const description = await client.describeMessageStream("StreamName");
const lastErrorMessage = description.exportStatuses[0].errorMessage;
if (lastErrorMessage) {
// The last export of export destination 0 failed with some error.
// Here is the last sequence number that was successfully exported.
description.exportStatuses[0].lastExportedSequenceNumber;
}
if (description.storageStatus.newestSequenceNumber >
description.exportStatuses[0].lastExportedSequenceNumber) {
// The end of the stream is ahead of the last exported sequence number.
}
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK 參考: describeMessageStream
更新訊息串流
更新現有串流的屬性。如果您的需求在建立串流之後變更,建議您更新串流。例如:
您的 Greengrass 元件遵循此高階程序來更新串流:
-
取得串流的描述。
-
更新對應MessageStreamDefinition和次級物件的目標屬性。
-
在更新的 中傳遞 MessageStreamDefinition。請務必包含更新串流的完整物件定義。未定義的屬性會還原為預設值。
您可以指定要用作匯出中起始訊息的訊息序號。
要求
此操作有下列需求:
範例
下列程式碼片段會更新名為 的串流StreamName。它會更新匯出至 Kinesis Data Streams 之串流的多個屬性。
- Python
-
client = StreamManagerClient()
try:
message_stream_info = client.describe_message_stream(STREAM_NAME)
message_stream_info.definition.max_size=536870912
message_stream_info.definition.stream_segment_size=33554432
message_stream_info.definition.time_to_live_millis=3600000
message_stream_info.definition.strategy_on_full=StrategyOnFull.RejectNewData
message_stream_info.definition.persistence=Persistence.Memory
message_stream_info.definition.flush_on_write=False
message_stream_info.definition.export_definition.kinesis=
[KinesisConfig(
# Updating Export definition to add a Kinesis Stream configuration.
identifier=str(uuid.uuid4()), kinesis_stream_name=str(uuid.uuid4()))]
client.update_message_stream(message_stream_info.definition)
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK 參考:updateMessageStream | MessageStreamDefinition
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
MessageStreamInfo messageStreamInfo = client.describeMessageStream(STREAM_NAME);
// Update the message stream with new values.
client.updateMessageStream(
messageStreamInfo.getDefinition()
.withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data.
// Max Size update should be greater than initial Max Size defined in Create Message Stream request
.withMaxSize(536870912L) // Update Max Size to 512 MB.
.withStreamSegmentSize(33554432L) // Update Segment Size to 32 MB.
.withFlushOnWrite(true) // Update flush on write to true.
.withPersistence(Persistence.Memory) // Update the persistence to Memory.
.withTimeToLiveMillis(3600000L) // Update TTL to 1 hour.
.withExportDefinition(
// Optional. Choose where/how the stream is exported to the AWS 雲端.
messageStreamInfo.getDefinition().getExportDefinition().
// Updating Export definition to add a Kinesis Stream configuration.
.withKinesis(new ArrayList<KinesisConfig>() {{
add(new KinesisConfig()
.withIdentifier(EXPORT_IDENTIFIER)
.withKinesisStreamName("test"));
}})
);
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java SDK 參考:update_message_stream | MessageStreamDefinition
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const messageStreamInfo = await c.describeMessageStream(STREAM_NAME);
await client.updateMessageStream(
messageStreamInfo.definition
// Max Size update should be greater than initial Max Size defined in Create Message Stream request
.withMaxSize(536870912) // Default is 256 MB. Updating Max Size to 512 MB.
.withStreamSegmentSize(33554432) // Default is 16 MB. Updating Segment Size to 32 MB.
.withTimeToLiveMillis(3600000) // By default, no TTL is enabled. Update TTL to 1 hour.
.withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data.
.withPersistence(Persistence.Memory) // Default is File. Update the persistence to Memory
.withFlushOnWrite(true) // Default is false. Updating to true.
.withExportDefinition(
// Optional. Choose where/how the stream is exported to the AWS 雲端.
messageStreamInfo.definition.exportDefinition
// Updating Export definition to add a Kinesis Stream configuration.
.withKinesis([new KinesisConfig().withIdentifier(uuidv4()).withKinesisStreamName(uuidv4())])
)
);
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK 參考:updateMessageStream | MessageStreamDefinition
更新串流的限制
更新串流時,適用下列限制條件。除非下列清單中另有說明,否則更新會立即生效。
-
您無法更新串流的持久性。若要變更此行為,請刪除串流並建立定義新持久性政策的串流。
-
您只能在下列情況更新串流的大小上限:
-
您可以將串流區段大小更新為小於串流大小上限的值。更新的設定會套用至新的客群。
-
存留時間 (TTL) 屬性的更新適用於新的附加操作。如果您降低此值,串流管理員也可能會刪除超過 TTL 的現有區段。
-
完整屬性策略的更新適用於新的附加操作。如果您設定策略來覆寫最舊的資料,串流管理員也可能會根據新設定覆寫現有的區段。
-
對寫入屬性排清的更新會套用至新訊息。
-
匯出組態的更新適用於新的匯出。更新請求必須包含您要支援的所有匯出組態。否則,串流管理員會刪除它們。
-
當您更新匯出組態時,請指定目標匯出組態的識別符。
-
若要新增匯出組態,請指定新匯出組態的唯一識別符。
-
若要刪除匯出組態,請省略匯出組態。
-
若要更新串流中匯出組態的起始序號,您必須指定小於最新序號的值。若要尋找此資訊,請描述串流,然後檢查傳回MessageStreamInfo物件的儲存狀態。
刪除訊息串流
刪除串流。刪除串流時,磁碟中該串流的所有儲存資料都會刪除。
要求
此操作有下列需求:
範例
下列程式碼片段會刪除名為 StreamName 的串流。
- Python
-
client = StreamManagerClient()
try:
client.delete_message_stream(stream_name="StreamName")
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK 參考:deleteMessageStream
- Java
-
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
client.deleteMessageStream("StreamName");
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java SDK 參考:Delete_message_stream
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
await client.deleteMessageStream("StreamName");
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK 參考:deleteMessageStream
另請參閱