Testar um esquema
Enquanto você desenvolve seu código, você deve executar testes locais para verificar se o layout do fluxo de trabalho está correto.
Testes locais não geram trabalhos, crawlers ou acionadores do AWS Glue. Em vez disso, execute o script de layout localmente e use os métodos to_json() e validate() para imprimir objetos e encontrar erros. Esses métodos estão disponíveis em todas as três classes definidas nas bibliotecas.
Há duas maneiras de lidar com os argumentos user_params e system_params que o AWS Glue transmite para sua função de layout. Seu código de banco de teste pode criar um dicionário de valores de parâmetro de blueprint de exemplo e transmiti-lo para a função de layout como o argumento user_params. Ou você pode remover as referências a user_params e substituí-las por strings em código fixo.
Se o seu código usar as propriedades region e accountId no argumento system_params, você pode transmitir seu próprio dicionário para system_params.
Para testar um blueprint
-
Inicie um interpretador Python em um diretório com as bibliotecas ou carregue os arquivos de blueprint e as bibliotecas fornecidas em seu ambiente de desenvolvimento integrado (IDE) preferido.
-
Certifique-se de que o código importe as bibliotecas fornecidas.
-
Adicione código à sua função de layout para chamar
validate()outo_json()em qualquer entidade ou no objetoWorkflow. Por exemplo, se o seu código criar um objetoCrawlerchamadomycrawler, é possível chamarvalidate()como a seguir.mycrawler.validate()Você pode imprimir
mycrawlercomo a seguir:print(mycrawler.to_json())Se você chamar
to_jsonem um objeto, não há necessidade de também chamarvalidate(), poisto_json()chamavalidate().É mais útil chamar esses métodos no objeto de fluxo de trabalho. Supondo que seu script nomeie o objeto de fluxo de trabalho
my_workflow, valide e imprima o objeto de fluxo de trabalho da seguinte forma:print(my_workflow.to_json())Para obter mais informações sobre
to_json()evalidate(), consulte Classe Methods.Você também pode importar
pprinte formatar o estilo do objeto do fluxo de trabalho, conforme mostrado no exemplo mais adiante nesta seção. -
Execute o código, corrija erros e, finalmente, remova quaisquer chamadas para
validate()outo_json().
O exemplo a seguir mostra como construir um dicionário de parâmetros de blueprint de exemplo e transmiti-lo como o argumento user_params para a função de layout generate_compaction_workflow. Ele também mostra como formatar o estilo do objeto de fluxo de trabalho gerado.
from pprint import pprint from awsglue.blueprint.workflow import * from awsglue.blueprint.job import * from awsglue.blueprint.crawler import * USER_PARAMS = {"WorkflowName": "compaction_workflow", "ScriptLocation": "s3://amzn-s3-demo-bucket/scripts/threaded-compaction.py", "PassRole": "arn:aws:iam::111122223333:role/GlueRole-ETL", "DatabaseName": "cloudtrial", "TableName": "ct_cloudtrail", "CoalesceFactor": 4, "MaxThreadWorkers": 200} def generate_compaction_workflow(user_params: dict, system_params: dict) -> Workflow: compaction_job = Job(Name=f"{user_params['WorkflowName']}_etl_job", Command={"Name": "glueetl", "ScriptLocation": user_params['ScriptLocation'], "PythonVersion": "3"}, Role="arn:aws:iam::111122223333:role/AWSGlueServiceRoleDefault", DefaultArguments={"DatabaseName": user_params['DatabaseName'], "TableName": user_params['TableName'], "CoalesceFactor": user_params['CoalesceFactor'], "max_thread_workers": user_params['MaxThreadWorkers']}) catalog_target = {"CatalogTargets": [{"DatabaseName": user_params['DatabaseName'], "Tables": [user_params['TableName']]}]} compacted_files_crawler = Crawler(Name=f"{user_params['WorkflowName']}_post_crawl", Targets = catalog_target, Role=user_params['PassRole'], DependsOn={compaction_job: "SUCCEEDED"}, WaitForDependencies="AND", SchemaChangePolicy={"DeleteBehavior": "LOG"}) compaction_workflow = Workflow(Name=user_params['WorkflowName'], Entities=Entities(Jobs=[compaction_job], Crawlers=[compacted_files_crawler])) return compaction_workflow generated = generate_compaction_workflow(user_params=USER_PARAMS, system_params={}) gen_dict = generated.to_json() pprint(gen_dict)