Verwenden des SAP-OData-Statusverwaltungsskripts
Gehen Sie wie folgt vor, um das SAP-OData-Statusverwaltungsskript in Ihrem AWS Glue-Auftrag zu verwenden:
Laden Sie das SAP-OData-Statusverwaltungsskript
s3://aws-blogs-artifacts-public/artifacts/BDB-4789/sap_odata_state_management.zipaus dem öffentlichen Amazon-S3-Bucket herunter.Laden Sie das Skript in einen Amazon-S3-Bucket hoch, für den Ihr AWS Glue-Auftrag Zugriffsberechtigungen besitzt.
Verweisen Sie in Ihrem AWS Glue-Auftrag auf das Skript: Wenn Sie Ihren AWS Glue-Auftrag erstellen oder aktualisieren, übergeben Sie die Option
'--extra-py-files', die auf den Skriptpfad in Ihrem Amazon-S3-Bucket verweist. Beispiel:--extra-py-files s3://your-bucket/path/to/sap_odata_state_management.pyImportieren und verwenden Sie die Bibliothek für die Statusverwaltung in Ihren AWS Glue-Auftragsskripts.
Beispiel für inkrementelle Übertragung auf Delta-Token-Basis
Hier ist ein Beispiel für die Verwendung des Statusverwaltungsskripts für inkrementelle Übertragungen auf Delta-Token-Basis:
from sap_odata_state_management import StateManagerFactory, StateManagerType, StateType # Initialize the state manager state_manager = StateManagerFactory.create_manager( manager_type=StateManagerType.JOB_TAG, state_type=StateType.DELTA_TOKEN, options={ "job_name": args['JOB_NAME'], "logger": logger } ) # Get connector options (including delta token if available) key = "SAPODataNode" connector_options = state_manager.get_connector_options(key) # Use the connector options in your Glue job df = glueContext.create_dynamic_frame.from_options( connection_type="SAPOData", connection_options={ "connectionName": "connectionName", "ENTITY_NAME": "entityName", "ENABLE_CDC": "true", **connector_options } ) # Process your data here... # Update the state after processing state_manager.update_state(key, sapodata_df.toDF())
Beispiel für auf Zeitstempeln basierende inkrementelle Übertragung
Hier ist ein Beispiel für die Verwendung des Statusverwaltungsskripts für inkrementelle Übertragungen auf Delta-Token-Basis:
from sap_odata_state_management import StateManagerFactory, StateManagerType, StateType # Initialize the state manager state_manager = StateManagerFactory.create_manager( manager_type=StateManagerType.JOB_TAG, state_type=StateType.DELTA_TOKEN, options={ "job_name": args['JOB_NAME'], "logger": logger } ) # Get connector options (including delta token if available) key = "SAPODataNode" connector_options = state_manager.get_connector_options(key) # Use the connector options in your Glue job df = glueContext.create_dynamic_frame.from_options( connection_type="SAPOData", connection_options={ "connectionName": "connectionName", "ENTITY_NAME": "entityName", "ENABLE_CDC": "true", **connector_options } ) # Process your data here... # Update the state after processing state_manager.update_state(key, sapodata_df.toDF())
In beiden Beispielen bewältigt das Statusverwaltungsskript die Komplexität der Speicherung des Status (entweder Delta-Token oder Zeitstempel) zwischen Auftragsausführungen. Es ruft beim Abrufen der Connector-Optionen automatisch den letzten bekannten Status ab und aktualisiert den Status nach der Verarbeitung. So wird sichergestellt, dass bei jeder Ausführung des Auftrags nur neue oder geänderte Daten verarbeitet werden.