Sono disponibili altri esempi per SDK AWS nel repository GitHub della documentazione degli esempi per SDK AWS
Esempi per l’API Servizio gestito per Apache Flink con SDK per Python (Boto3)
Gli esempi di codice seguenti mostrano come eseguire operazioni e implementare scenari comuni utilizzando AWS SDK per Python (Boto3) con Servizio gestito da Amazon per Apache Flink.
Le azioni sono estratti di codice da programmi più grandi e devono essere eseguite nel contesto. Sebbene le operazioni mostrino come richiamare le singole funzioni del servizio, è possibile visualizzarle contestualizzate negli scenari correlati.
Ogni esempio include un link al codice sorgente completo, in cui vengono fornite le istruzioni su come configurare ed eseguire il codice nel contesto.
Argomenti
Azioni
L’esempio di codice seguente mostra come utilizzare AddApplicationInput.
- SDK per Python (Boto3)
-
Nota
Ulteriori informazioni su GitHub. Trova l’esempio completo e scopri di più sulla configurazione e l’esecuzione nel Repository di esempi di codice AWS
. class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def add_input(self, input_prefix, stream_arn, input_schema): """ Adds an input stream to the application. The input stream data is mapped to an in-application stream that can be processed by your code running in Kinesis Data Analytics. :param input_prefix: The prefix prepended to in-application input stream names. :param stream_arn: The ARN of the input stream. :param input_schema: A schema that maps the data in the input stream to the runtime environment. This can be automatically generated by using `discover_input_schema` or you can create it yourself. :return: Metadata about the newly added input. """ try: response = self.analytics_client.add_application_input( ApplicationName=self.name, CurrentApplicationVersionId=self.version_id, Input={ "NamePrefix": input_prefix, "KinesisStreamsInput": {"ResourceARN": stream_arn}, "InputSchema": input_schema, }, ) self.version_id = response["ApplicationVersionId"] logger.info("Add input stream %s to application %s.", stream_arn, self.name) except ClientError: logger.exception( "Couldn't add input stream %s to application %s.", stream_arn, self.name ) raise else: return response-
Per informazioni dettagliate sull’API, consulta AddApplicationInput nella documentazione di riferimento dell’API AWS SDK per Python (Boto3).
-
L’esempio di codice seguente mostra come utilizzare AddApplicationOutput.
- SDK per Python (Boto3)
-
Nota
Ulteriori informazioni su GitHub. Trova l’esempio completo e scopri di più sulla configurazione e l’esecuzione nel Repository di esempi di codice AWS
. class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def add_output(self, in_app_stream_name, output_arn): """ Adds an output stream to the application. Kinesis Data Analytics maps data from the specified in-application stream to the output stream. :param in_app_stream_name: The name of the in-application stream to map to the output stream. :param output_arn: The ARN of the output stream. :return: A list of metadata about the output resources currently assigned to the application. """ try: response = self.analytics_client.add_application_output( ApplicationName=self.name, CurrentApplicationVersionId=self.version_id, Output={ "Name": in_app_stream_name, "KinesisStreamsOutput": {"ResourceARN": output_arn}, "DestinationSchema": {"RecordFormatType": "JSON"}, }, ) outputs = response["OutputDescriptions"] self.version_id = response["ApplicationVersionId"] logging.info( "Added output %s to %s, which now has %s outputs.", output_arn, self.name, len(outputs), ) except ClientError: logger.exception("Couldn't add output %s to %s.", output_arn, self.name) raise else: return outputs-
Per informazioni dettagliate sull’API, consulta AddApplicationOutput nella documentazione di riferimento dell’API AWS SDK per Python (Boto3).
-
L’esempio di codice seguente mostra come utilizzare CreateApplication.
- SDK per Python (Boto3)
-
Nota
Ulteriori informazioni su GitHub. Trova l’esempio completo e scopri di più sulla configurazione e l’esecuzione nel Repository di esempi di codice AWS
. class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def create(self, app_name, role_arn, env="SQL-1_0"): """ Creates a Kinesis Data Analytics application. :param app_name: The name of the application. :param role_arn: The ARN of a role that can be assumed by Kinesis Data Analytics and grants needed permissions. :param env: The runtime environment of the application, such as SQL. Code uploaded to the application runs in this environment. :return: Metadata about the newly created application. """ try: response = self.analytics_client.create_application( ApplicationName=app_name, RuntimeEnvironment=env, ServiceExecutionRole=role_arn, ) details = response["ApplicationDetail"] self._update_details(details) logger.info("Application %s created.", app_name) except ClientError: logger.exception("Couldn't create application %s.", app_name) raise else: return details-
Per informazioni dettagliate sull’API, consulta CreateApplication nella documentazione di riferimento dell’API AWS SDK per Python (Boto3).
-
L’esempio di codice seguente mostra come utilizzare DeleteApplication.
- SDK per Python (Boto3)
-
Nota
Ulteriori informazioni su GitHub. Trova l’esempio completo e scopri di più sulla configurazione e l’esecuzione nel Repository di esempi di codice AWS
. class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def delete(self): """ Deletes an application. """ try: self.analytics_client.delete_application( ApplicationName=self.name, CreateTimestamp=self.create_timestamp ) logger.info("Deleted application %s.", self.name) except ClientError: logger.exception("Couldn't delete application %s.", self.name) raise-
Per informazioni dettagliate sull’API, consulta DeleteApplication nella documentazione di riferimento dell’API AWS SDK per Python (Boto3).
-
L’esempio di codice seguente mostra come utilizzare DescribeApplication.
- SDK per Python (Boto3)
-
Nota
Ulteriori informazioni su GitHub. Trova l’esempio completo e scopri di più sulla configurazione e l’esecuzione nel Repository di esempi di codice AWS
. class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def describe(self, name): """ Gets metadata about an application. :param name: The name of the application to look up. :return: Metadata about the application. """ try: response = self.analytics_client.describe_application(ApplicationName=name) details = response["ApplicationDetail"] self._update_details(details) logger.info("Got metadata for application %s.", name) except ClientError: logger.exception("Couldn't get metadata for application %s.", name) raise else: return details-
Per informazioni dettagliate sull’API, consulta DescribeApplication nella documentazione di riferimento dell’API AWS SDK per Python (Boto3).
-
L’esempio di codice seguente mostra come utilizzare DescribeApplicationSnapshot.
- SDK per Python (Boto3)
-
Nota
Ulteriori informazioni su GitHub. Trova l’esempio completo e scopri di più sulla configurazione e l’esecuzione nel Repository di esempi di codice AWS
. class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def describe_snapshot(self, application_name, snapshot_name): """ Gets metadata about a previously saved application snapshot. :param application_name: The name of the application. :param snapshot_name: The name of the snapshot. :return: Metadata about the snapshot. """ try: response = self.analytics_client.describe_application_snapshot( ApplicationName=application_name, SnapshotName=snapshot_name ) snapshot = response["SnapshotDetails"] logger.info( "Got metadata for snapshot %s of application %s.", snapshot_name, application_name, ) except ClientError: logger.exception( "Couldn't get metadata for snapshot %s of application %s.", snapshot_name, application_name, ) raise else: return snapshot-
Per informazioni dettagliate sull’API, consulta DescribeApplicationSnapshot nella documentazione di riferimento dell’API AWS SDK per Python (Boto3).
-
L’esempio di codice seguente mostra come utilizzare DiscoverInputSchema.
- SDK per Python (Boto3)
-
Nota
Ulteriori informazioni su GitHub. Trova l’esempio completo e scopri di più sulla configurazione e l’esecuzione nel Repository di esempi di codice AWS
. class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def discover_input_schema(self, stream_arn, role_arn): """ Discovers a schema that maps data in a stream to a format that is usable by an application's runtime environment. The stream must be active and have enough data moving through it for the service to sample. The returned schema can be used when you add the stream as an input to the application or you can write your own schema. :param stream_arn: The ARN of the stream to map. :param role_arn: A role that lets Kinesis Data Analytics read from the stream. :return: The discovered schema of the data in the input stream. """ try: response = self.analytics_client.discover_input_schema( ResourceARN=stream_arn, ServiceExecutionRole=role_arn, InputStartingPositionConfiguration={"InputStartingPosition": "NOW"}, ) schema = response["InputSchema"] logger.info("Discovered input schema for stream %s.", stream_arn) except ClientError: logger.exception( "Couldn't discover input schema for stream %s.", stream_arn ) raise else: return schema-
Per informazioni dettagliate sull’API, consulta DiscoverInputSchema nella documentazione di riferimento dell’API AWS SDK per Python (Boto3).
-
L’esempio di codice seguente mostra come utilizzare StartApplication.
- SDK per Python (Boto3)
-
Nota
Ulteriori informazioni su GitHub. Trova l’esempio completo e scopri di più sulla configurazione e l’esecuzione nel Repository di esempi di codice AWS
. class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def start(self, input_id): """ Starts an application. After the application is running, it reads from the specified input stream and runs the application code on the incoming data. :param input_id: The ID of the input to read. """ try: self.analytics_client.start_application( ApplicationName=self.name, RunConfiguration={ "SqlRunConfigurations": [ { "InputId": input_id, "InputStartingPositionConfiguration": { "InputStartingPosition": "NOW" }, } ] }, ) logger.info("Started application %s.", self.name) except ClientError: logger.exception("Couldn't start application %s.", self.name) raise-
Per informazioni dettagliate sull’API, consulta StartApplication nella documentazione di riferimento dell’API AWS SDK per Python (Boto3).
-
L’esempio di codice seguente mostra come utilizzare StopApplication.
- SDK per Python (Boto3)
-
Nota
Ulteriori informazioni su GitHub. Trova l’esempio completo e scopri di più sulla configurazione e l’esecuzione nel Repository di esempi di codice AWS
. class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def stop(self): """ Stops an application. This stops the application from processing data but does not delete any resources. """ try: self.analytics_client.stop_application(ApplicationName=self.name) logger.info("Stopping application %s.", self.name) except ClientError: logger.exception("Couldn't stop application %s.", self.name) raise-
Per informazioni dettagliate sull’API, consulta StopApplication nella documentazione di riferimento dell’API AWS SDK per Python (Boto3).
-
L’esempio di codice seguente mostra come utilizzare UpdateApplication.
- SDK per Python (Boto3)
-
Nota
Ulteriori informazioni su GitHub. Trova l’esempio completo e scopri di più sulla configurazione e l’esecuzione nel Repository di esempi di codice AWS
. Questo esempio aggiorna il codice eseguito in un’applicazione esistente.
class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def update_code(self, code): """ Updates the code that runs in the application. The code must run in the runtime environment of the application, such as SQL. Application code typically reads data from in-application streams and transforms it in some way. :param code: The code to upload. This completely replaces any existing code in the application. :return: Metadata about the application. """ try: response = self.analytics_client.update_application( ApplicationName=self.name, CurrentApplicationVersionId=self.version_id, ApplicationConfigurationUpdate={ "ApplicationCodeConfigurationUpdate": { "CodeContentTypeUpdate": "PLAINTEXT", "CodeContentUpdate": {"TextContentUpdate": code}, } }, ) details = response["ApplicationDetail"] self.version_id = details["ApplicationVersionId"] logger.info("Update code for application %s.", self.name) except ClientError: logger.exception("Couldn't update code for application %s.", self.name) raise else: return details-
Per informazioni dettagliate sull’API, consulta UpdateApplication nella documentazione di riferimento dell’API AWS SDK per Python (Boto3).
-
Generatore di dati
L’esempio di codice seguente mostra come generare uno flusso Kinesis con un referrer.
- SDK per Python (Boto3)
-
Nota
Ulteriori informazioni su GitHub. Trova l’esempio completo e scopri di più sulla configurazione e l’esecuzione nel Repository di esempi di codice AWS
. import json import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return {"REFERRER": "http://www.amazon.com"} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
L’esempio di codice seguente mostra come generare uno flusso Kinesis con anomalie a livello di pressione sanguigna.
- SDK per Python (Boto3)
-
Nota
Ulteriori informazioni su GitHub. Trova l’esempio completo e scopri di più sulla configurazione e l’esecuzione nel Repository di esempi di codice AWS
. from enum import Enum import json import random import boto3 STREAM_NAME = "ExampleInputStream" class PressureType(Enum): low = "LOW" normal = "NORMAL" high = "HIGH" def get_blood_pressure(pressure_type): pressure = {"BloodPressureLevel": pressure_type.value} if pressure_type == PressureType.low: pressure["Systolic"] = random.randint(50, 80) pressure["Diastolic"] = random.randint(30, 50) elif pressure_type == PressureType.normal: pressure["Systolic"] = random.randint(90, 120) pressure["Diastolic"] = random.randint(60, 80) elif pressure_type == PressureType.high: pressure["Systolic"] = random.randint(130, 200) pressure["Diastolic"] = random.randint(90, 150) else: raise TypeError return pressure def generate(stream_name, kinesis_client): while True: rnd = random.random() pressure_type = ( PressureType.low if rnd < 0.005 else PressureType.high if rnd > 0.995 else PressureType.normal ) blood_pressure = get_blood_pressure(pressure_type) print(blood_pressure) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(blood_pressure), PartitionKey="partitionkey", ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
L’esempio di codice seguente mostra come generare uno flusso Kinesis con dati nelle colonne.
- SDK per Python (Boto3)
-
Nota
Ulteriori informazioni su GitHub. Trova l’esempio completo e scopri di più sulla configurazione e l’esecuzione nel Repository di esempi di codice AWS
. import json import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return {"Col_A": "a", "Col_B": "b", "Col_C": "c", "Col_E_Unstructured": "x,y,z"} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
L’esempio di codice seguente mostra come generare uno flusso Kinesis con anomalie a livello di frequenza cardiaca.
- SDK per Python (Boto3)
-
Nota
Ulteriori informazioni su GitHub. Trova l’esempio completo e scopri di più sulla configurazione e l’esecuzione nel Repository di esempi di codice AWS
. from enum import Enum import json import random import boto3 STREAM_NAME = "ExampleInputStream" class RateType(Enum): normal = "NORMAL" high = "HIGH" def get_heart_rate(rate_type): if rate_type == RateType.normal: rate = random.randint(60, 100) elif rate_type == RateType.high: rate = random.randint(150, 200) else: raise TypeError return {"heartRate": rate, "rateType": rate_type.value} def generate(stream_name, kinesis_client, output=True): while True: rnd = random.random() rate_type = RateType.high if rnd < 0.01 else RateType.normal heart_rate = get_heart_rate(rate_type) if output: print(heart_rate) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(heart_rate), PartitionKey="partitionkey", ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
L’esempio di codice seguente mostra come generare uno flusso Kinesis con hotspot.
- SDK per Python (Boto3)
-
Nota
Ulteriori informazioni su GitHub. Trova l’esempio completo e scopri di più sulla configurazione e l’esecuzione nel Repository di esempi di codice AWS
. import json from pprint import pprint import random import time import boto3 STREAM_NAME = "ExampleInputStream" def get_hotspot(field, spot_size): hotspot = { "left": field["left"] + random.random() * (field["width"] - spot_size), "width": spot_size, "top": field["top"] + random.random() * (field["height"] - spot_size), "height": spot_size, } return hotspot def get_record(field, hotspot, hotspot_weight): rectangle = hotspot if random.random() < hotspot_weight else field point = { "x": rectangle["left"] + random.random() * rectangle["width"], "y": rectangle["top"] + random.random() * rectangle["height"], "is_hot": "Y" if rectangle is hotspot else "N", } return {"Data": json.dumps(point), "PartitionKey": "partition_key"} def generate( stream_name, field, hotspot_size, hotspot_weight, batch_size, kinesis_client ): """ Generates points used as input to a hotspot detection algorithm. With probability hotspot_weight (20%), a point is drawn from the hotspot; otherwise, it is drawn from the base field. The location of the hotspot changes for every 1000 points generated. """ points_generated = 0 hotspot = None while True: if points_generated % 1000 == 0: hotspot = get_hotspot(field, hotspot_size) records = [ get_record(field, hotspot, hotspot_weight) for _ in range(batch_size) ] points_generated += len(records) pprint(records) kinesis_client.put_records(StreamName=stream_name, Records=records) time.sleep(0.1) if __name__ == "__main__": generate( stream_name=STREAM_NAME, field={"left": 0, "width": 10, "top": 0, "height": 10}, hotspot_size=1, hotspot_weight=0.2, batch_size=10, kinesis_client=boto3.client("kinesis"), )
L’esempio di codice seguente mostra come generare uno flusso Kinesis con voci di log.
- SDK per Python (Boto3)
-
Nota
Ulteriori informazioni su GitHub. Trova l’esempio completo e scopri di più sulla configurazione e l’esecuzione nel Repository di esempi di codice AWS
. import json import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "LOGENTRY": "203.0.113.24 - - [25/Mar/2018:15:25:37 -0700] " '"GET /index.php HTTP/1.1" 200 125 "-" ' '"Mozilla/5.0 [en] Gecko/20100101 Firefox/52.0"' } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
L’esempio di codice seguente mostra come generare uno flusso Kinesis con dati sfalsati.
- SDK per Python (Boto3)
-
Nota
Ulteriori informazioni su GitHub. Trova l’esempio completo e scopri di più sulla configurazione e l’esecuzione nel Repository di esempi di codice AWS
. import datetime import json import random import time import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): event_time = datetime.datetime.utcnow() - datetime.timedelta(seconds=10) return { "EVENT_TIME": event_time.isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), } def generate(stream_name, kinesis_client): while True: data = get_data() # Send six records, ten seconds apart, with the same event time and ticker for _ in range(6): print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey", ) time.sleep(10) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
L’esempio di codice seguente mostra come generare uno flusso Kinesis con dati dei ticker azionari.
- SDK per Python (Boto3)
-
Nota
Ulteriori informazioni su GitHub. Trova l’esempio completo e scopri di più sulla configurazione e l’esecuzione nel Repository di esempi di codice AWS
. import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "EVENT_TIME": datetime.datetime.now().isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), "PRICE": round(random.random() * 100, 2), } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
L’esempio di codice seguente mostra come generare uno flusso Kinesis con due tipi di dati.
- SDK per Python (Boto3)
-
Nota
Ulteriori informazioni su GitHub. Trova l’esempio completo e scopri di più sulla configurazione e l’esecuzione nel Repository di esempi di codice AWS
. import json import random import boto3 STREAM_NAME = "OrdersAndTradesStream" PARTITION_KEY = "partition_key" def get_order(order_id, ticker): return { "RecordType": "Order", "Oid": order_id, "Oticker": ticker, "Oprice": random.randint(500, 10000), "Otype": "Sell", } def get_trade(order_id, trade_id, ticker): return { "RecordType": "Trade", "Tid": trade_id, "Toid": order_id, "Tticker": ticker, "Tprice": random.randint(0, 3000), } def generate(stream_name, kinesis_client): order_id = 1 while True: ticker = random.choice(["AAAA", "BBBB", "CCCC"]) order = get_order(order_id, ticker) print(order) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(order), PartitionKey=PARTITION_KEY ) for trade_id in range(1, random.randint(0, 6)): trade = get_trade(order_id, trade_id, ticker) print(trade) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(trade), PartitionKey=PARTITION_KEY, ) order_id += 1 if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
L’esempio di codice seguente mostra come generare uno flusso Kinesis con dati dei log web.
- SDK per Python (Boto3)
-
Nota
Ulteriori informazioni su GitHub. Trova l’esempio completo e scopri di più sulla configurazione e l’esecuzione nel Repository di esempi di codice AWS
. import json import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "log": "192.168.254.30 - John [24/May/2004:22:01:02 -0700] " '"GET /icons/apache_pb.gif HTTP/1.1" 304 0' } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))