AWS IoT Greengrass Version 1 è entrato nella fase di estensione della vita utile il 30 giugno 2023. Per ulteriori informazioni, consulta la politica AWS IoT Greengrass V1 di manutenzione. Dopo questa data, AWS IoT Greengrass V1 non rilascerà aggiornamenti che forniscano funzionalità, miglioramenti, correzioni di bug o patch di sicurezza. I dispositivi che funzionano AWS IoT Greengrass V1 non subiranno interruzioni e continueranno a funzionare e a connettersi al cloud. Ti consigliamo vivamente di eseguire la migrazione a AWS IoT Greengrass Version 2, che aggiunge nuove importanti funzionalità e supporto per piattaforme aggiuntive.
Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Utilizzalo  StreamManagerClient  per lavorare con gli stream
Le funzioni Lambda definite dall'utente in esecuzione AWS IoT Greengrass sul core possono utilizzare StreamManagerClient l'oggetto nel AWS IoT Greengrass Core SDK per creare stream in stream manager e quindi interagire con gli stream. Quando una funzione Lambda crea un flusso, definisce le Cloud AWS destinazioni, l'assegnazione delle priorità e altre politiche di esportazione e conservazione dei dati per lo stream. Per inviare dati allo stream manager, le funzioni Lambda aggiungono i dati allo stream. Se viene definita una destinazione di esportazione per lo stream, stream manager esporta il flusso automaticamente.
In genere, i client di stream manager sono funzioni Lambda definite dall'utente. Se il tuo business case lo richiede, puoi anche consentire ai processi non Lambda in esecuzione sul core Greengrass (ad esempio, un contenitore Docker) di interagire con lo stream manager. Per ulteriori informazioni, consulta Autenticazione client.
Gli snippet di questo argomento mostrano come i client StreamManagerClient chiamano i metodi per lavorare con gli stream. Per i dettagli di implementazione sui metodi e i relativi argomenti, utilizzate i collegamenti al riferimento SDK elencati dopo ogni frammento. Per i tutorial che includono una funzione Lambda di Python completa, consulta o. Esportazione di flussi di dati su Cloud AWS
(console) Esportazione di flussi di dati su Cloud AWS (CLI)
La tua funzione Lambda dovrebbe creare un'istanza StreamManagerClient all'esterno del gestore delle funzioni. Se viene creata un'istanza nel gestore, la funzione crea un client e una connessione al gestore flussi ogni volta che viene richiamata.
Se si esegue un'istanza StreamManagerClient nel gestore, è necessario chiamare esplicitamente il metodo close() quando client completa il suo lavoro. In caso contrario, client mantiene la connessione aperta e un altro thread in esecuzione fino alla chiusura dello script.
StreamManagerClient supporta le seguenti operazioni:
        Creazione del flusso di messaggi
        Per creare uno stream, una funzione Lambda definita dall'utente chiama il metodo create e passa un oggetto. MessageStreamDefinition Questo oggetto specifica il nome univoco dello stream e definisce come lo stream manager deve gestire i nuovi dati quando viene raggiunta la dimensione massima del flusso. È possibile utilizzare MessageStreamDefinition e relativi tipi di dati (ad esempio ExportDefinition, StrategyOnFull e Persistence) per definire altre proprietà del flusso. Ciò include:
        
             
             
             
             
             
             
        - 
                L'obiettivo AWS IoT Analytics, Kinesis Data AWS IoT SiteWise Streams e le destinazioni Amazon S3 per le esportazioni automatiche. Per ulteriori informazioni, consulta Esporta le configurazioni per le destinazioni supportate Cloud AWS. 
- 
                Priorità di esportazione. Stream manager esporta i flussi con priorità più alta prima dei flussi con priorità più bassa. 
- 
                Dimensione e intervallo massimi del batch per AWS IoT Analytics Kinesis Data Streams e destinazioni. AWS IoT SiteWise
Stream manager esporta i messaggi quando viene soddisfatta una delle due condizioni. 
- 
                Time-to-live (TTL). Il tempo necessario per garantire che i dati del flusso siano disponibili per l'elaborazione. È necessario assicurarsi che i dati possano essere utilizzati entro questo periodo di tempo. Questa non è una policy di eliminazione. È possibile che i dati non vengano eliminati immediatamente dopo il periodo TTL. 
- 
                Persistenza del flusso. Scegliere di salvare i flussi nel file system per mantenere i dati tra riavvii core o salvare i flussi in memoria. 
- 
                Numero di sequenza iniziale. Specificate il numero di sequenza del messaggio da utilizzare come messaggio iniziale nell'esportazione. 
 
        Per ulteriori informazioni suMessageStreamDefinition, consultate il riferimento SDK per la lingua di destinazione:
        
        StreamManagerClientfornisce anche una destinazione di destinazione che è possibile utilizzare per esportare flussi su un server HTTP. Questo target è destinato esclusivamente a scopi di test. Non è stabile né è supportato per l'uso in ambienti di produzione.
Dopo aver creato uno stream, le funzioni Lambda possono aggiungere messaggi allo stream per inviare dati per l'esportazione e leggere i messaggi dallo stream per l'elaborazione locale. Il numero di flussi creati dipende dalle funzionalità hardware e dal business case. Una strategia consiste nel creare un flusso per ogni canale di destinazione nel AWS IoT Analytics nostro flusso di dati Kinesis, sebbene sia possibile definire più destinazioni per un flusso. Un flusso ha una lunga durata.
         
            Requisiti
            Questa operazione ha i seguenti requisiti:
            
            La creazione di stream con una destinazione AWS IoT SiteWise di esportazione Amazon S3 presenta i seguenti requisiti:
Esempi
            Il frammento di codice seguente crea un flusso denominato StreamName. Definisce le proprietà del flusso nei tipi di dati subordinati. MessageStreamDefinition
            
                - Python
- 
                        client = StreamManagerClient()
 
try:
    client.create_message_stream(MessageStreamDefinition(
        name="StreamName",  # Required.
        max_size=268435456,  # Default is 256 MB.
        stream_segment_size=16777216,  # Default is 16 MB.
        time_to_live_millis=None,  # By default, no TTL is enabled.
        strategy_on_full=StrategyOnFull.OverwriteOldestData,  # Required.
        persistence=Persistence.File,  # Default is File.
        flush_on_write=False,  # Default is false.
        export_definition=ExportDefinition(  # Optional. Choose where/how the stream is exported to the Cloud AWS.
            kinesis=None,
            iot_analytics=None,
            iot_sitewise=None,
            s3_task_executor=None
        )
    ))
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
 Riferimento all'SDK Python: create_message_stream | MessageStreamDefinition 
- Java
- 
                        try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    client.createMessageStream(
            new MessageStreamDefinition()
                    .withName("StreamName") // Required.
                    .withMaxSize(268435456L)  // Default is 256 MB.
                    .withStreamSegmentSize(16777216L)  // Default is 16 MB.
                    .withTimeToLiveMillis(null)  // By default, no TTL is enabled.
                    .withStrategyOnFull(StrategyOnFull.OverwriteOldestData)  // Required.
                    .withPersistence(Persistence.File)  // Default is File.
                    .withFlushOnWrite(false)  // Default is false.
                    .withExportDefinition(  // Optional. Choose where/how the stream is exported to the Cloud AWS.
                            new ExportDefinition()
                                    .withKinesis(null)
                                    .withIotAnalytics(null)
                                    .withIotSitewise(null)
                                    .withS3TaskExecutor(null)
                    )
 
    );
} catch (StreamManagerException e) {
    // Properly handle exception.
}
 Riferimento a Java createMessageStreamSDK: | MessageStreamDefinition 
- Node.js
- 
                        const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        await client.createMessageStream(
            new MessageStreamDefinition()
                .withName("StreamName") // Required.
                .withMaxSize(268435456)  // Default is 256 MB.
                .withStreamSegmentSize(16777216)  // Default is 16 MB.
                .withTimeToLiveMillis(null)  // By default, no TTL is enabled.
                .withStrategyOnFull(StrategyOnFull.OverwriteOldestData)  // Required.
                .withPersistence(Persistence.File)  // Default is File.
                .withFlushOnWrite(false)  // Default is false.
                .withExportDefinition(  // Optional. Choose where/how the stream is exported to the Cloud AWS.
                    new ExportDefinition()
                        .withKinesis(null)
                        .withIotAnalytics(null)
                        .withIotSitewise(null)
                        .withS3TaskExecutor(null)
                )
        );
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
 Riferimento all'SDK Node.js: | createMessageStreamMessageStreamDefinition 
Per ulteriori informazioni sulla configurazione delle destinazioni di esportazione, consulta. Esporta le configurazioni per le destinazioni supportate Cloud AWS
         
     
        Aggiunta di un messaggio
        Per inviare dati allo stream manager per l'esportazione, le funzioni Lambda aggiungono i dati allo stream di destinazione. La destinazione di esportazione determina il tipo di dati da passare a questo metodo.
         
            Requisiti
            Questa operazione presenta i seguenti requisiti:
            
            L'aggiunta di messaggi con una destinazione AWS IoT SiteWise di esportazione Amazon S3 presenta i seguenti requisiti:
Esempi
             
                AWS IoT Analytics o destinazioni di esportazione Kinesis Data Streams
                Il frammento di codice seguente aggiunge un messaggio al flusso denominato StreamName. Per le AWS IoT Analytics nostre destinazioni Kinesis Data Streams, le funzioni Lambda aggiungono un blob di dati.
                Questo frammento ha i seguenti requisiti:
                
                
                    - Python
- 
                            client = StreamManagerClient()
 
try:
    sequence_number = client.append_message(stream_name="StreamName", data=b'Arbitrary bytes data')
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
 Riferimento Python SDK: append_message 
- Java
- 
                            try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    long sequenceNumber = client.appendMessage("StreamName", "Arbitrary byte array".getBytes());
} catch (StreamManagerException e) {
    // Properly handle exception.
}
 Riferimento Java SDK: AppendMessage 
- Node.js
- 
                            const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const sequenceNumber = await client.appendMessage("StreamName", Buffer.from("Arbitrary byte array"));
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
 Riferimento SDK Node.js: appendMessage 
AWS IoT SiteWise destinazioni di esportazione
                Il frammento di codice seguente aggiunge un messaggio al flusso denominato StreamName. Per le AWS IoT SiteWise destinazioni, le funzioni Lambda aggiungono un oggetto serializzato. PutAssetPropertyValueEntry Per ulteriori informazioni, consulta Esportazione in AWS IoT SiteWise.
                Quando si inviano dati a AWS IoT SiteWise, i dati devono soddisfare i requisiti dell'azione. BatchPutAssetPropertyValue Per ulteriori informazioni, consulta BatchPutAssetPropertyValue nella documentazione di riferimento dell'API AWS IoT SiteWise .
Questo frammento presenta i seguenti requisiti:
                
                
                    
                    - Python
- 
                            client = StreamManagerClient()
 
try:
    # SiteWise requires unique timestamps in all messages. Add some randomness to time and offset.
    # Note: To create a new asset property data, you should use the classes defined in the
    # greengrasssdk.stream_manager module.
    time_in_nanos = TimeInNanos(
        time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000)
    )
    variant = Variant(double_value=random.random())
    asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)]
    putAssetPropertyValueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias="PropertyAlias", property_values=asset)
    sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(putAssetPropertyValueEntry))
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
 Riferimento all'SDK Python: append_message | PutAssetPropertyValueEntry 
- Java
- 
                            try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    Random rand = new Random();
    // Note: To create a new asset property data, you should use the classes defined in the
    // com.amazonaws.greengrass.streammanager.model.sitewise package.
    List<AssetPropertyValue> entries = new ArrayList<>() ;
    // IoTSiteWise requires unique timestamps in all messages. Add some randomness to time and offset.
    final int maxTimeRandomness = 60;
    final int maxOffsetRandomness = 10000;
    double randomValue = rand.nextDouble();
    TimeInNanos timestamp = new TimeInNanos()
            .withTimeInSeconds(Instant.now().getEpochSecond() - rand.nextInt(maxTimeRandomness))
            .withOffsetInNanos((long) (rand.nextInt(maxOffsetRandomness)));
    AssetPropertyValue entry = new AssetPropertyValue()
            .withValue(new Variant().withDoubleValue(randomValue))
            .withQuality(Quality.GOOD)
            .withTimestamp(timestamp);
    entries.add(entry);
    PutAssetPropertyValueEntry putAssetPropertyValueEntry = new PutAssetPropertyValueEntry()
            .withEntryId(UUID.randomUUID().toString())
            .withPropertyAlias("PropertyAlias")
            .withPropertyValues(entries);
    long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
} catch (StreamManagerException e) {
    // Properly handle exception.
}
 Riferimento Java SDK: appendMessage | PutAssetPropertyValueEntry 
- Node.js
- 
                            const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const maxTimeRandomness = 60;
        const maxOffsetRandomness = 10000;
        const randomValue = Math.random();
        // Note: To create a new asset property data, you should use the classes defined in the
        // aws-greengrass-core-sdk StreamManager module.
        const timestamp = new TimeInNanos()
            .withTimeInSeconds(Math.round(Date.now() / 1000) - Math.floor(Math.random() - maxTimeRandomness))
            .withOffsetInNanos(Math.floor(Math.random() * maxOffsetRandomness));
        const entry = new AssetPropertyValue()
            .withValue(new Variant().withDoubleValue(randomValue))
            .withQuality(Quality.GOOD)
            .withTimestamp(timestamp);
        const putAssetPropertyValueEntry =  new PutAssetPropertyValueEntry()
            .withEntryId(`${ENTRY_ID_PREFIX}${i}`)
            .withPropertyAlias("PropertyAlias")
            .withPropertyValues([entry]);
        const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
 Riferimento all'SDK Node.js: appendMessage | PutAssetPropertyValueEntry 
Destinazioni di esportazione Amazon S3
                Il seguente frammento aggiunge un'attività di esportazione allo stream denominato. StreamName Per le destinazioni Amazon S3, le funzioni Lambda aggiungono un oggetto serializzato che contiene informazioni sul file di input di origine e sull'S3ExportTaskDefinitionoggetto Amazon S3 di destinazione. Se l'oggetto specificato non esiste, Stream Manager lo crea per te. Per ulteriori informazioni, consulta Esportazione su Amazon S3.
                Questo frammento ha i seguenti requisiti:
                
                
                    
                    - Python
- 
                            client = StreamManagerClient()
 
try:
    # Append an Amazon S3 Task definition and print the sequence number.
    s3_export_task_definition = S3ExportTaskDefinition(input_url="URLToFile", bucket="BucketName", key="KeyName")
    sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(s3_export_task_definition))
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
 Riferimento all'SDK Python: append_message | S3 ExportTaskDefinition 
- Java
- 
                            try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    // Append an Amazon S3 export task definition and print the sequence number.
    S3ExportTaskDefinition s3ExportTaskDefinition = new S3ExportTaskDefinition()
        .withBucket("BucketName")
        .withKey("KeyName")
        .withInputUrl("URLToFile");
    long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(s3ExportTaskDefinition));
} catch (StreamManagerException e) {
    // Properly handle exception.
}
 Riferimento a Java SDK: appendMessage | S3 ExportTaskDefinition 
- Node.js
- 
                            const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
     // Append an Amazon S3 export task definition and print the sequence number.
     const taskDefinition = new S3ExportTaskDefinition()
        .withBucket("BucketName")
        .withKey("KeyName")
        .withInputUrl("URLToFile");
        const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(taskDefinition)));
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
 Riferimento all'SDK Node.js: appendMessage | S3 ExportTaskDefinition 
 
        Lettura di messaggi
        Leggi i messaggi da uno stream.
         
            Requisiti
            Questa operazione ha i seguenti requisiti:
            
         
         
            Esempi
            Il frammento di codice seguente legge i messaggi dal flusso denominato StreamName. Il metodo di lettura accetta un oggetto ReadMessagesOptions facoltativo che specifica il numero di sequenza da cui iniziare la lettura, i numeri minimo e massimo da leggere e un timeout per la lettura dei messaggi.
            
                - Python
- 
                        client = StreamManagerClient()
 
try:
    message_list = client.read_messages(
        stream_name="StreamName",
        # By default, if no options are specified, it tries to read one message from the beginning of the stream.
        options=ReadMessagesOptions(
            desired_start_sequence_number=100,
            # Try to read from sequence number 100 or greater. By default, this is 0.
            min_message_count=10,
            # Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
            max_message_count=100,  # Accept up to 100 messages. By default this is 1.
            read_timeout_millis=5000
            # Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
        )
    )
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
 Riferimento all'SDK Python: read_messages | ReadMessagesOptions 
- Java
- 
                        try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    List<Message> messages = client.readMessages("StreamName",
            // By default, if no options are specified, it tries to read one message from the beginning of the stream.
            new ReadMessagesOptions()
                    // Try to read from sequence number 100 or greater. By default this is 0.
                    .withDesiredStartSequenceNumber(100L)
                    // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
                    .withMinMessageCount(10L)
                    // Accept up to 100 messages. By default this is 1.
                    .withMaxMessageCount(100L)
                    // Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
                    .withReadTimeoutMillis(Duration.ofSeconds(5L).toMillis())
    );
} catch (StreamManagerException e) {
    // Properly handle exception.
}
 Riferimento a Java SDK: ReadMessages | ReadMessagesOptions 
- Node.js
- 
                        const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const messages = await client.readMessages("StreamName",
            // By default, if no options are specified, it tries to read one message from the beginning of the stream.
            new ReadMessagesOptions()
                // Try to read from sequence number 100 or greater. By default this is 0.
                .withDesiredStartSequenceNumber(100)
                // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is thrown. By default, this is 1.
                .withMinMessageCount(10)
                // Accept up to 100 messages. By default this is 1.
                .withMaxMessageCount(100)
                // Try to wait at most 5 seconds for the minMessageCount to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
                .withReadTimeoutMillis(5 * 1000)
        );
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
 Riferimento all'SDK Node.js: ReadMessages | ReadMessagesOptions 
 
        Visualizzazione dell'elenco di flussi
        Ottieni l'elenco degli stream nello stream manager.
         
            Requisiti
            Questa operazione ha i seguenti requisiti:
            
         
         
            Esempi
            Il frammento di codice seguente ottiene un elenco dei flussi (per nome) in stream manager.
            
                - Python
- 
                        client = StreamManagerClient()
 
try:
    stream_names = client.list_streams()
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
 Riferimento all'SDK Python: list_streams 
- Java
- 
                        try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    List<String> streamNames = client.listStreams();
} catch (StreamManagerException e) {
    // Properly handle exception.
}
 Riferimento Java SDK: ListStreams 
- Node.js
- 
                        const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const streams = await client.listStreams();
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
 Riferimento all'SDK Node.js: ListStreams 
 
        Descrizione del flusso di messaggi
        Ottieni i metadati relativi a uno stream, tra cui la definizione, la dimensione e lo stato dell'esportazione.
         
            Requisiti
            Questa operazione ha i seguenti requisiti:
            
         
         
            Esempi
            Il frammento di codice seguente ottiene i metadati relativi al flusso denominato StreamName, inclusi la definizione, le dimensioni e gli stati di esportatore del flusso.
            
                - Python
- 
                        client = StreamManagerClient()
 
try:
    stream_description = client.describe_message_stream(stream_name="StreamName")
    if stream_description.export_statuses[0].error_message:
        # The last export of export destination 0 failed with some error
        # Here is the last sequence number that was successfully exported
        stream_description.export_statuses[0].last_exported_sequence_number
 
    if (stream_description.storage_status.newest_sequence_number >
            stream_description.export_statuses[0].last_exported_sequence_number):
        pass
        # The end of the stream is ahead of the last exported sequence number
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
 Riferimento all'SDK Python: describe_message_stream 
- Java
- 
                        try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    MessageStreamInfo description = client.describeMessageStream("StreamName");
    String lastErrorMessage = description.getExportStatuses().get(0).getErrorMessage();
    if (lastErrorMessage != null && !lastErrorMessage.equals("")) {
        // The last export of export destination 0 failed with some error.
        // Here is the last sequence number that was successfully exported.
        description.getExportStatuses().get(0).getLastExportedSequenceNumber();
    }
 
    if (description.getStorageStatus().getNewestSequenceNumber() >
            description.getExportStatuses().get(0).getLastExportedSequenceNumber()) {
        // The end of the stream is ahead of the last exported sequence number.
    }
} catch (StreamManagerException e) {
    // Properly handle exception.
}
 Riferimento a Java SDK: describeMessageStream 
- Node.js
- 
                        const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const description = await client.describeMessageStream("StreamName");
        const lastErrorMessage = description.exportStatuses[0].errorMessage;
        if (lastErrorMessage) {
            // The last export of export destination 0 failed with some error.
            // Here is the last sequence number that was successfully exported.
            description.exportStatuses[0].lastExportedSequenceNumber;
        }
 
        if (description.storageStatus.newestSequenceNumber >
            description.exportStatuses[0].lastExportedSequenceNumber) {
            // The end of the stream is ahead of the last exported sequence number.
        }
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
 Riferimento all'SDK Node.js: describeMessageStream 
 
        Aggiorna il flusso di messaggi
        Aggiorna le proprietà di uno stream esistente. Potresti voler aggiornare uno stream se i tuoi requisiti cambiano dopo la creazione dello stream. Per esempio:
        
             
             
             
        - 
                Aggiungi una nuova configurazione di esportazione per una Cloud AWS destinazione. 
- 
                Aumenta la dimensione massima di uno stream per modificare il modo in cui i dati vengono esportati o conservati. Ad esempio, la dimensione dello stream in combinazione con la tua strategia sulle impostazioni complete potrebbe comportare l'eliminazione o il rifiuto dei dati prima che lo stream manager possa elaborarli. 
- 
                Metti in pausa e riprendi le esportazioni, ad esempio se le attività di esportazione richiedono molto tempo e desideri razionare i dati di caricamento. 
 
        Le tue funzioni Lambda seguono questo processo di alto livello per aggiornare uno stream:
        
             
             
             
        - 
                Ottieni la descrizione dello stream. 
- 
                Aggiorna le proprietà di destinazione sugli oggetti corrispondenti MessageStreamDefinitione subordinati.
 
- 
                Passa l'aggiornamentoMessageStreamDefinition. Assicurati di includere le definizioni complete degli oggetti per lo stream aggiornato. Le proprietà non definite tornano ai valori predefiniti.
 È possibile specificare il numero di sequenza del messaggio da utilizzare come messaggio iniziale nell'esportazione. 
 
         
            Requisiti
            Questa operazione presenta i seguenti requisiti:
            
         
         
            Esempi
            Il seguente frammento aggiorna lo stream denominato. StreamName Aggiorna più proprietà di un flusso che esporta in Kinesis Data Streams.
            
                
                - Python
- 
                        client = StreamManagerClient()
 
try:
    message_stream_info = client.describe_message_stream(STREAM_NAME)
    message_stream_info.definition.max_size=536870912
    message_stream_info.definition.stream_segment_size=33554432
    message_stream_info.definition.time_to_live_millis=3600000
    message_stream_info.definition.strategy_on_full=StrategyOnFull.RejectNewData
    message_stream_info.definition.persistence=Persistence.Memory
    message_stream_info.definition.flush_on_write=False
    message_stream_info.definition.export_definition.kinesis=
        [KinesisConfig(  
            # Updating Export definition to add a Kinesis Stream configuration.
            identifier=str(uuid.uuid4()), kinesis_stream_name=str(uuid.uuid4()))]
    client.update_message_stream(message_stream_info.definition)
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
 Riferimento a Python SDK: | updateMessageStreamMessageStreamDefinition 
- Java
- 
                        try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    MessageStreamInfo messageStreamInfo = client.describeMessageStream(STREAM_NAME);
    // Update the message stream with new values.
    client.updateMessageStream(
        messageStreamInfo.getDefinition()
            .withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data.
            // Max Size update should be greater than initial Max Size defined in Create Message Stream request
            .withMaxSize(536870912L) // Update Max Size to 512 MB.
            .withStreamSegmentSize(33554432L) // Update Segment Size to 32 MB.
            .withFlushOnWrite(true) // Update flush on write to true.
            .withPersistence(Persistence.Memory) // Update the persistence to Memory.
            .withTimeToLiveMillis(3600000L)  // Update TTL to 1 hour.
            .withExportDefinition(
                // Optional. Choose where/how the stream is exported to the Cloud AWS.
                messageStreamInfo.getDefinition().getExportDefinition().
                    // Updating Export definition to add a Kinesis Stream configuration.
                    .withKinesis(new ArrayList<KinesisConfig>() {{
                        add(new KinesisConfig()
                            .withIdentifier(EXPORT_IDENTIFIER)
                            .withKinesisStreamName("test"));
                        }})
            );
} catch (StreamManagerException e) {
    // Properly handle exception.
}
 Riferimento a Java SDK: update_message_stream | MessageStreamDefinition 
- Node.js
- 
                        const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const messageStreamInfo = await c.describeMessageStream(STREAM_NAME);
        await client.updateMessageStream(
            messageStreamInfo.definition
                // Max Size update should be greater than initial Max Size defined in Create Message Stream request
                .withMaxSize(536870912)  // Default is 256 MB. Updating Max Size to 512 MB.
                .withStreamSegmentSize(33554432)  // Default is 16 MB. Updating Segment Size to 32 MB.
                .withTimeToLiveMillis(3600000)  // By default, no TTL is enabled. Update TTL to 1 hour.
                .withStrategyOnFull(StrategyOnFull.RejectNewData)  // Required. Updating Strategy on full to reject new data.
                .withPersistence(Persistence.Memory)  // Default is File. Update the persistence to Memory
                .withFlushOnWrite(true)  // Default is false. Updating to true.
                .withExportDefinition(  
                    // Optional. Choose where/how the stream is exported to the Cloud AWS.
                    messageStreamInfo.definition.exportDefinition
                        // Updating Export definition to add a Kinesis Stream configuration.
                        .withKinesis([new KinesisConfig().withIdentifier(uuidv4()).withKinesisStreamName(uuidv4())])
                )
        );
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
 Riferimento all'updateMessageStreamSDK Node.js: | MessageStreamDefinition 
Vincoli per l'aggiornamento degli stream
            I seguenti vincoli si applicano all'aggiornamento dei flussi. A meno che non sia indicato nell'elenco seguente, gli aggiornamenti hanno effetto immediato.
            
                 
                 
                 
                 
                 
                 
                 
                 
            - 
                    Non puoi aggiornare la persistenza di uno stream. Per modificare questo comportamento, elimina lo stream e crea uno stream che definisca la nuova politica di persistenza. 
- 
                    Puoi aggiornare la dimensione massima di uno stream solo nelle seguenti condizioni: 
                         
                         
                    - 
                            La dimensione massima deve essere maggiore o uguale alla dimensione corrente dello stream. Per trovare queste informazioni, descrivi lo stream e poi controlla lo stato di archiviazione dell'MessageStreamInfooggetto restituito.
 
- 
                            La dimensione massima deve essere maggiore o uguale alla dimensione del segmento dello stream. 
 
 
- 
                    Puoi aggiornare la dimensione del segmento di stream a un valore inferiore alla dimensione massima dello stream. L'impostazione aggiornata si applica ai nuovi segmenti. 
- 
                    Gli aggiornamenti alla proprietà time to live (TTL) si applicano alle nuove operazioni di aggiunta. Se riduci questo valore, stream manager potrebbe anche eliminare i segmenti esistenti che superano il TTL. 
- 
                    Gli aggiornamenti alla strategia sulla proprietà completa si applicano alle nuove operazioni di aggiunta. Se imposti la strategia per sovrascrivere i dati più vecchi, stream manager potrebbe anche sovrascrivere i segmenti esistenti in base alla nuova impostazione. 
- 
                    Gli aggiornamenti alla proprietà flush on write si applicano ai nuovi messaggi. 
- 
                    Gli aggiornamenti alle configurazioni di esportazione si applicano alle nuove esportazioni. La richiesta di aggiornamento deve includere tutte le configurazioni di esportazione che si desidera supportare. Altrimenti, stream manager le elimina. 
                         
                         
                         
                    - 
                            Quando aggiorni una configurazione di esportazione, specifica l'identificatore della configurazione di esportazione di destinazione. 
- 
                            Per aggiungere una configurazione di esportazione, specificate un identificatore univoco per la nuova configurazione di esportazione. 
- 
                            Per eliminare una configurazione di esportazione, omettete la configurazione di esportazione. 
 
 
- 
                    Per aggiornare il numero di sequenza iniziale di una configurazione di esportazione in uno stream, è necessario specificare un valore inferiore al numero di sequenza più recente. Per trovare queste informazioni, descrivi lo stream e quindi controlla lo stato di archiviazione dell'MessageStreamInfooggetto restituito.
 
 
         
     
        Eliminazione del flusso di messaggi
        Elimina un flusso. Quando si elimina un flusso, tutti i dati memorizzati per il flusso vengono eliminati dal disco.
         
            Requisiti
            Questa operazione ha i seguenti requisiti:
            
         
         
            Esempi
            Il frammento di codice seguente elimina il flusso denominato StreamName.
            
                - Python
- 
                        client = StreamManagerClient()
 
try:
    client.delete_message_stream(stream_name="StreamName")
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
 Riferimento all'SDK Python: deleteMessageStream 
- Java
- 
                        try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    client.deleteMessageStream("StreamName");
} catch (StreamManagerException e) {
    // Properly handle exception.
}
 Riferimento Java SDK: delete_message_stream 
- Node.js
- 
                        const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        await client.deleteMessageStream("StreamName");
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
 Riferimento all'SDK Node.js: deleteMessageStream 
Consulta anche