本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
將 Oracle 外部資料表遷移至 Amazon Aurora PostgreSQL 相容
anuradha chintha 和 Rakesh Raghav,Amazon Web Services
Summary
外部資料表可讓 Oracle 查詢以一般檔案存放在資料庫外部的資料。您可以使用 ORACLE_LOADER 驅動程式來存取以 SQL*Loader 公用程式可載入之任何格式儲存的任何資料。您無法在外部資料表上使用資料處理語言 (DML),但可以使用外部資料表進行查詢、聯結和排序操作。
Amazon Aurora PostgreSQL 相容版本不提供類似於 Oracle 中外部資料表的功能。反之,您必須使用現代化來開發符合功能需求的可擴展解決方案,而且是正式的。
此模式提供使用 aws_s3擴充功能,將不同類型的 Oracle 外部資料表遷移至 Amazon Web Services (AWS) 雲端上的 Aurora PostgreSQL 相容版本的步驟。
我們建議在生產環境中實作此解決方案之前,先徹底測試此解決方案。
先決條件和限制
先決條件
- 作用中的 AWS 帳戶 
- AWS 命令列界面 (AWS CLI) 
- 可用的 Aurora PostgreSQL 相容資料庫執行個體。 
- 具有外部資料表的現場部署 Oracle 資料庫 
- pg.Client API 
- 資料檔案 
限制
- 此模式不提供可取代 Oracle 外部資料表的功能。不過,您可以進一步增強步驟和範本程式碼,以實現資料庫現代化目標。 
- 檔案不應包含以分隔符號形式在 - aws_s3匯出和匯入函數中傳遞的字元。
產品版本
- 若要從 Amazon S3 匯入 RDS for PostgreSQL,資料庫必須執行 PostgreSQL 10.7 版或更新版本。 
架構
來源技術堆疊
- Oracle 
來源架構

目標技術堆疊
- Amazon Aurora PostgreSQL 相容 
- Amazon CloudWatch 
- AWS Lambda 
- AWS Secrets Manager 
- Amazon Simple Notification Service (Amazon SNS) 
- Amazon Simple Storage Service (Amazon S3) 
目標架構
下圖顯示解決方案的高階表示法。

- 檔案會上傳至 S3 儲存貯體。 
- Lambda 函數已啟動。 
- Lambda 函數會啟動資料庫函數呼叫。 
- Secrets Manager 提供資料庫存取的登入資料。 
- 根據資料庫函數,會建立 SNS 警示。 
自動化和擴展
外部資料表的任何新增或變更都可以使用中繼資料維護來處理。
工具
- Amazon Aurora PostgreSQL 相容 – Amazon Aurora PostgreSQL 相容版本是全受管、PostgreSQL 相容和 ACID 相容的關係資料庫引擎,結合了高階商業資料庫的速度和可靠性,以及開放原始碼資料庫的成本效益。 
- AWS CLI – AWS 命令列界面 (AWS CLI) 是管理 AWS 服務的統一工具。只需下載和設定一個工具,您就可以從命令列控制多個 AWS 服務,並透過指令碼將其自動化。 
- Amazon CloudWatch – Amazon CloudWatch 會監控 Amazon S3 資源和使用率。 
- AWS Lambda – AWS Lambda 是一種無伺服器運算服務,支援執行程式碼,無需佈建或管理伺服器、建立工作負載感知叢集擴展邏輯、維護事件整合,或管理執行時間。在此模式中,每當檔案上傳至 Amazon S3 時,Lambda 都會執行資料庫函數。 
- AWS Secrets Manager – AWS Secrets Manager 是一種用於憑證儲存和擷取的服務。使用 Secrets Manager,您可以使用以程式設計方式呼叫 Secrets Manager 擷取秘密的 API,取代程式碼中的硬式編碼登入資料,包括密碼。 
- Amazon S3 – Amazon Simple Storage Service (Amazon S3) 提供儲存層,用於接收和存放檔案,以供取用和往返 Aurora PostgreSQL 相容叢集傳輸。 
- aws_s3 – - aws_s3延伸模組整合 Amazon S3 和 Aurora PostgreSQL 相容。
- Amazon SNS – Amazon Simple Notification Service (Amazon SNS) 會協調和管理發佈者和用戶端之間的訊息傳遞或傳送。在此模式中,Amazon SNS 用於傳送通知。 
Code
每當檔案放入 S3 儲存貯體時,都必須從處理應用程式或 Lambda 函數建立和呼叫資料庫函數。如需詳細資訊,請參閱程式碼 (已連接)。
史詩
| 任務 | 描述 | 所需的技能 | 
|---|---|---|
| 將外部檔案新增至來源資料庫。 | 建立外部檔案,並將其移至  | DBA | 
| 任務 | 描述 | 所需的技能 | 
|---|---|---|
| 建立 Aurora PostgreSQL 資料庫。 | 在 Amazon Aurora PostgreSQL 相容叢集中建立資料庫執行個體。 | DBA | 
| 建立結構描述、aws_s3 延伸模組和資料表。 | 在其他資訊區段 | DBA、開發人員 | 
| 建立 資料庫函數。 | 若要建立資料庫函數,請使用其他資訊區段中函數下的 | DBA、開發人員 | 
| 任務 | 描述 | 所需的技能 | 
|---|---|---|
| 建立角色。 | 建立具有存取 Amazon S3 和 Amazon Relational Database Service (Amazon RDS) 許可的角色。此角色將指派給 Lambda 以執行模式。 | DBA | 
| 建立 Lambda 函數。 | 建立從 Amazon S3 讀取檔案名稱的 Lambda 函數 (例如  根據函數呼叫結果,將會啟動 SNS 通知 (例如  根據您的業務需求,您可以視需要建立具有額外程式碼的 Lambda 函數。如需詳細資訊,請參閱 Lambda 文件。 | DBA | 
| 設定 S3 儲存貯體事件觸發。 | 設定機制來呼叫 S3 儲存貯體中所有物件建立事件的 Lambda 函數。 | DBA | 
| 建立秘密。 | 使用 Secrets Manager 建立資料庫登入資料的秘密名稱。在 Lambda 函數中傳遞秘密。 | DBA | 
| 上傳 Lambda 支援檔案。 | 上傳 .zip 檔案,其中包含 Lambda 支援套件和連接的 Python 指令碼,以連線至 Aurora PostgreSQL 相容。Python 程式碼會呼叫您在資料庫中建立的 函數。 | DBA | 
| 建立 SNS 主題。 | 建立 SNS 主題以傳送郵件,確保資料載入成功或失敗。 | DBA | 
| 任務 | 描述 | 所需的技能 | 
|---|---|---|
| 建立 S3 儲存貯體。 | 在 Amazon S3 主控台上,使用不包含正斜線的唯一名稱建立 S3 儲存貯體。S3 儲存貯體名稱全域唯一,且命名空間由所有 AWS 帳戶共用。 | DBA | 
| 建立 IAM 政策。 | 若要建立 AWS Identity and Access Management (IAM) 政策,請使用其他資訊區段 | DBA | 
| 建立角色。 | 為 Aurora PostgreSQL 相容建立兩個角色,一個用於匯入的角色,另一個用於匯出的角色。將對應的政策指派給角色。 | DBA | 
| 將角色連接至 Aurora PostgreSQL 相容叢集。 | 在管理角色下,將匯入和匯出角色連接至 Aurora PostgreSQL 叢集。 | DBA | 
| 為 Aurora PostgreSQL 相容建立支援物件。 | 對於資料表指令碼,請在其他資訊區段 對於自訂函數,請在其他資訊區段 | DBA | 
| 任務 | 描述 | 所需的技能 | 
|---|---|---|
| 將檔案上傳至 S3 儲存貯體。 | 若要將測試檔案上傳至 S3 儲存貯體,請使用主控台或 AWS CLI 中的下列命令。 
 一旦上傳檔案,儲存貯體事件就會啟動 Lambda 函數,執行 Aurora PostgreSQL 相容函數。 | DBA | 
| 檢查資料以及日誌和錯誤檔案。 | Aurora PostgreSQL 相容函數會將檔案載入主資料表,並在 S3 儲存貯體中建立  | DBA | 
| 監控解決方案。 | 在 Amazon CloudWatch 主控台中,監控 Lambda 函數。 | DBA | 
相關資源
其他資訊
ext_table_scripts
CREATE EXTENSION aws_s3 CASCADE; CREATE TABLE IF NOT EXISTS meta_EXTERNAL_TABLE ( table_name_stg character varying(100) , table_name character varying(100) , col_list character varying(1000) , data_type character varying(100) , col_order numeric, start_pos numeric, end_pos numeric, no_position character varying(100) , date_mask character varying(100) , delimeter character(1) , directory character varying(100) , file_name character varying(100) , header_exist character varying(5) ); CREATE TABLE IF NOT EXISTS ext_tbl_stg ( col1 text ); CREATE TABLE IF NOT EXISTS error_table ( error_details text, file_name character varying(100), processed_time timestamp without time zone ); CREATE TABLE IF NOT EXISTS log_table ( file_name character varying(50) COLLATE pg_catalog."default", processed_date timestamp without time zone, tot_rec_count numeric, proc_rec_count numeric, error_rec_count numeric ); sample insert scripts of meta data: INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'source_filename', 'character varying', 2, 8, 27, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO'); INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'record_type_identifier', 'character varying', 3, 28, 30, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO'); INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'fad_code', 'numeric', 4, 31, 36, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO'); INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'session_sequence_number', 'numeric', 5, 37, 42, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO'); INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'transaction_sequence_number', 'numeric', 6, 43, 48, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
s3bucketpolicy_for 匯入
---Import role policy --Create an IAM policy to allow, Get, and list actions on S3 bucket { "Version": "2012-10-17", "Statement": [ { "Sid": "s3import", "Action": [ "s3:GetObject", "s3:ListBucket" ], "Effect": "Allow", "Resource": [ "arn:aws:s3:::s3importtest", "arn:aws:s3:::s3importtest/*" ] } ] } --Export Role policy --Create an IAM policy to allow, put, and list actions on S3 bucket { "Version": "2012-10-17", "Statement": [ { "Sid": "s3export", "Action": [ "S3:PutObject", "s3:ListBucket" ], "Effect": "Allow", "Resource": [ "arn:aws:s3:::s3importtest/*" ] } ] }
資料庫函數 load_external_tables_latest 範例
CREATE OR REPLACE FUNCTION public.load_external_tables(pi_filename text) RETURNS character varying LANGUAGE plpgsql AS $function$ /* Loading data from S3 bucket into a APG table */ DECLARE v_final_sql TEXT; pi_ext_table TEXT; r refCURSOR; v_sqlerrm text; v_chunk numeric; i integer; v_col_list TEXT; v_postion_list CHARACTER VARYING(1000); v_len integer; v_delim varchar; v_file_name CHARACTER VARYING(1000); v_directory CHARACTER VARYING(1000); v_table_name_stg CHARACTER VARYING(1000); v_sql_col TEXT; v_sql TEXT; v_sql1 TEXT; v_sql2 TEXT; v_sql3 TEXT; v_cnt integer; v_sql_dynamic TEXT; v_sql_ins TEXT; proc_rec_COUNT integer; error_rec_COUNT integer; tot_rec_COUNT integer; v_rec_val integer; rec record; v_col_cnt integer; kv record; v_val text; v_header text; j integer; ERCODE VARCHAR(5); v_region text; cr CURSOR FOR SELECT distinct DELIMETER, FILE_NAME, DIRECTORY FROM meta_EXTERNAL_TABLE WHERE table_name = pi_ext_table AND DELIMETER IS NOT NULL; cr1 CURSOR FOR SELECT col_list, data_type, start_pos, END_pos, concat_ws('',' ',TABLE_NAME_STG) as TABLE_NAME_STG, no_position,date_mask FROM meta_EXTERNAL_TABLE WHERE table_name = pi_ext_table order by col_order asc; cr2 cursor FOR SELECT distinct table_name,table_name_stg FROM meta_EXTERNAL_TABLE WHERE upper(file_name) = upper(pi_filename); BEGIN -- PERFORM utl_file_utility.init(); v_region := 'us-east-1'; /* find tab details from file name */ --DELETE FROM ERROR_TABLE WHERE file_name= pi_filename; -- DELETE FROM log_table WHERE file_name= pi_filename; BEGIN SELECT distinct table_name,table_name_stg INTO strict pi_ext_table,v_table_name_stg FROM meta_EXTERNAL_TABLE WHERE upper(file_name) = upper(pi_filename); EXCEPTION WHEN NO_DATA_FOUND THEN raise notice 'error 1,%',sqlerrm; pi_ext_table := null; v_table_name_stg := null; RAISE USING errcode = 'NTFIP' ; when others then raise notice 'error others,%',sqlerrm; END; j :=1 ; for rec in cr2 LOOP pi_ext_table := rec.table_name; v_table_name_stg := rec.table_name_stg; v_col_list := null; IF pi_ext_table IS NOT NULL THEN --EXECUTE concat_ws('','truncate table ' ,pi_ext_table) ; EXECUTE concat_ws('','truncate table ' ,v_table_name_stg) ; SELECT distinct DELIMETER INTO STRICT v_delim FROM meta_EXTERNAL_TABLE WHERE table_name = pi_ext_table; IF v_delim IS NOT NULL THEN SELECT distinct DELIMETER, FILE_NAME, DIRECTORY , concat_ws('',' ',table_name_stg), case header_exist when 'YES' then 'CSV HEADER' else 'CSV' end as header_exist INTO STRICT v_delim,v_file_name,v_directory,v_table_name_stg,v_header FROM meta_EXTERNAL_TABLE WHERE table_name = pi_ext_table AND DELIMETER IS NOT NULL; IF upper(v_delim) = 'CSV' THEN v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3 ( ''', v_table_name_stg,''','''', ''DELIMITER '''','''' CSV HEADER QUOTE ''''"'''''', aws_commons.create_s3_uri ( ''', v_directory,''',''',v_file_name,''', ''',v_region,'''))'); ELSE v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3(''', v_table_name_stg, ''','''', ''DELIMITER AS ''''^''''',''',',' aws_commons.create_s3_uri ( ''',v_directory, ''',''', v_file_name, ''',', '''',v_region,''') )'); raise notice 'v_sql , %',v_sql; begin EXECUTE v_sql; EXCEPTION WHEN OTHERS THEN raise notice 'error 1'; RAISE USING errcode = 'S3IMP' ; END; select count(col_list) INTO v_col_cnt from meta_EXTERNAL_TABLE where table_name = pi_ext_table; -- raise notice 'v_sql 2, %',concat_ws('','update ',v_table_name_stg, ' set col1 = col1||''',v_delim,''''); execute concat_ws('','update ',v_table_name_stg, ' set col1 = col1||''',v_delim,''''); i :=1; FOR rec in cr1 loop v_sql1 := concat_ws('',v_sql1,'split_part(col1,''',v_delim,''',', i,')',' as ',rec.col_list,','); v_sql2 := concat_ws('',v_sql2,rec.col_list,','); -- v_sql3 := concat_ws('',v_sql3,'rec.',rec.col_list,'::',rec.data_type,','); case WHEN upper(rec.data_type) = 'NUMERIC' THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0 THEN null ELSE coalesce((trim(split_part(col1,''',v_delim,''',', i,')))::NUMERIC,0)::',rec.data_type,' END as ',rec.col_list,',') ; WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDD' THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0 THEN null ELSE to_date(coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),''99990101''),''YYYYMMDD'')::',rec.data_type,' END as ',rec.col_list,','); WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'MM/DD/YYYY hh24:mi:ss' THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0 THEN null ELSE to_date(coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),''01/01/9999 0024:00:00''),''MM/DD/YYYY hh24:mi:ss'')::',rec.data_type,' END as ',rec.col_list,','); ELSE v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0 THEN null ELSE coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),'''')::',rec.data_type,' END as ',rec.col_list,',') ; END case; i :=i+1; end loop; -- raise notice 'v_sql 3, %',v_sql3; SELECT trim(trailing ' ' FROM v_sql1) INTO v_sql1; SELECT trim(trailing ',' FROM v_sql1) INTO v_sql1; SELECT trim(trailing ' ' FROM v_sql2) INTO v_sql2; SELECT trim(trailing ',' FROM v_sql2) INTO v_sql2; SELECT trim(trailing ' ' FROM v_sql3) INTO v_sql3; SELECT trim(trailing ',' FROM v_sql3) INTO v_sql3; END IF; raise notice 'v_delim , %',v_delim; EXECUTE concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg) INTO v_cnt; raise notice 'stg cnt , %',v_cnt; /* if upper(v_delim) = 'CSV' then v_sql_ins := concat_ws('', ' SELECT * from ' ,v_table_name_stg ); else -- v_sql_ins := concat_ws('',' SELECT ',v_sql1,' from (select col1 from ' ,v_table_name_stg , ')sub '); v_sql_ins := concat_ws('',' SELECT ',v_sql3,' from (select col1 from ' ,v_table_name_stg , ')sub '); END IF;*/ v_chunk := v_cnt/100; for i in 1..101 loop BEGIN -- raise notice 'v_sql , %',v_sql; -- raise notice 'Chunk number , %',i; v_sql_ins := concat_ws('',' SELECT ',v_sql3,' from (select col1 from ' ,v_table_name_stg , ' offset ',v_chunk*(i-1), ' limit ',v_chunk,') sub '); v_sql := concat_ws('','insert into ', pi_ext_table ,' ', v_sql_ins); -- raise notice 'select statement , %',v_sql_ins; -- v_sql := null; -- EXECUTE concat_ws('','insert into ', pi_ext_table ,' ', v_sql_ins, 'offset ',v_chunk*(i-1), ' limit ',v_chunk ); --v_sql := concat_ws('','insert into ', pi_ext_table ,' ', v_sql_ins ); -- raise notice 'insert statement , %',v_sql; raise NOTICE 'CHUNK START %',v_chunk*(i-1); raise NOTICE 'CHUNK END %',v_chunk; EXECUTE v_sql; EXCEPTION WHEN OTHERS THEN -- v_sql_ins := concat_ws('',' SELECT ',v_sql1, ' from (select col1 from ' ,v_table_name_stg , ' )sub '); -- raise notice 'Chunk number for cursor , %',i; raise NOTICE 'Cursor - CHUNK START %',v_chunk*(i-1); raise NOTICE 'Cursor - CHUNK END %',v_chunk; v_sql_ins := concat_ws('',' SELECT ',v_sql3, ' from (select col1 from ' ,v_table_name_stg , ' )sub '); v_final_sql := REPLACE (v_sql_ins, ''''::text, ''''''::text); -- raise notice 'v_final_sql %',v_final_sql; v_sql :=concat_ws('','do $a$ declare r refcursor;v_sql text; i numeric;v_conname text; v_typ ',pi_ext_table,'[]; v_rec ','record','; begin open r for execute ''select col1 from ',v_table_name_stg ,' offset ',v_chunk*(i-1), ' limit ',v_chunk,'''; loop begin fetch r into v_rec; EXIT WHEN NOT FOUND; v_sql := concat_ws('''',''insert into ',pi_ext_table,' SELECT ',REPLACE (v_sql3, ''''::text, ''''''::text) , ' from ( select '''''',v_rec.col1,'''''' as col1) v''); execute v_sql; exception when others then v_sql := ''INSERT INTO ERROR_TABLE VALUES (concat_ws('''''''',''''Error Name: '''',$$''||SQLERRM||''$$,''''Error State: '''',''''''||SQLSTATE||'''''',''''record : '''',$$''||v_rec.col1||''$$),'''''||pi_filename||''''',now())''; execute v_sql; continue; end ; end loop; close r; exception when others then raise; end ; $a$'); -- raise notice ' inside excp v_sql %',v_sql; execute v_sql; -- raise notice 'v_sql %',v_sql; END; END LOOP; ELSE SELECT distinct DELIMETER,FILE_NAME,DIRECTORY ,concat_ws('',' ',table_name_stg), case header_exist when 'YES' then 'CSV HEADER' else 'CSV' end as header_exist INTO STRICT v_delim,v_file_name,v_directory,v_table_name_stg,v_header FROM meta_EXTERNAL_TABLE WHERE table_name = pi_ext_table ; v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3(''', v_table_name_stg, ''','''', ''DELIMITER AS ''''#'''' ',v_header,' '',',' aws_commons.create_s3_uri ( ''',v_directory, ''',''', v_file_name, ''',', '''',v_region,''') )'); EXECUTE v_sql; FOR rec in cr1 LOOP IF rec.start_pos IS NULL AND rec.END_pos IS NULL AND rec.no_position = 'recnum' THEN v_rec_val := 1; ELSE case WHEN upper(rec.data_type) = 'NUMERIC' THEN v_sql1 := concat_ws('',' case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0 THEN null ELSE coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1)))::NUMERIC,0)::',rec.data_type,' END as ',rec.col_list,',') ; WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDD' THEN v_sql1 := concat_ws('','case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0 THEN null ELSE to_date(coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),''99990101''),''YYYYMMDD'')::',rec.data_type,' END as ',rec.col_list,','); WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDDHH24MISS' THEN v_sql1 := concat_ws('','case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0 THEN null ELSE to_date(coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),''9999010100240000''),''YYYYMMDDHH24MISS'')::',rec.data_type,' END as ',rec.col_list,','); ELSE v_sql1 := concat_ws('',' case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0 THEN null ELSE coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),'''')::',rec.data_type,' END as ',rec.col_list,',') ; END case; END IF; v_col_list := concat_ws('',v_col_list ,v_sql1); END LOOP; SELECT trim(trailing ' ' FROM v_col_list) INTO v_col_list; SELECT trim(trailing ',' FROM v_col_list) INTO v_col_list; v_sql_col := concat_ws('',trim(trailing ',' FROM v_col_list) , ' FROM ',v_table_name_stg,' WHERE col1 IS NOT NULL AND length(col1)>0 '); v_sql_dynamic := v_sql_col; EXECUTE concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg) INTO v_cnt; IF v_rec_val = 1 THEN v_sql_ins := concat_ws('',' select row_number() over(order by ctid) as line_number ,' ,v_sql_dynamic) ; ELSE v_sql_ins := concat_ws('',' SELECT' ,v_sql_dynamic) ; END IF; BEGIN EXECUTE concat_ws('','insert into ', pi_ext_table ,' ', v_sql_ins); EXCEPTION WHEN OTHERS THEN IF v_rec_val = 1 THEN v_final_sql := ' select row_number() over(order by ctid) as line_number ,col1 from '; ELSE v_final_sql := ' SELECT col1 from'; END IF; v_sql :=concat_ws('','do $a$ declare r refcursor;v_rec_val numeric := ',coalesce(v_rec_val,0),';line_number numeric; col1 text; v_typ ',pi_ext_table,'[]; v_rec ',pi_ext_table,'; begin open r for execute ''',v_final_sql, ' ',v_table_name_stg,' WHERE col1 IS NOT NULL AND length(col1)>0 '' ; loop begin if v_rec_val = 1 then fetch r into line_number,col1; else fetch r into col1; end if; EXIT WHEN NOT FOUND; if v_rec_val = 1 then select line_number,',trim(trailing ',' FROM v_col_list) ,' into v_rec; else select ',trim(trailing ',' FROM v_col_list) ,' into v_rec; end if; insert into ',pi_ext_table,' select v_rec.*; exception when others then INSERT INTO ERROR_TABLE VALUES (concat_ws('''',''Error Name: '',SQLERRM,''Error State: '',SQLSTATE,''record : '',v_rec),''',pi_filename,''',now()); continue; end ; end loop; close r; exception when others then raise; end ; $a$'); execute v_sql; END; END IF; EXECUTE concat_ws('','SELECT COUNT(*) FROM ' ,pi_ext_table) INTO proc_rec_COUNT; EXECUTE concat_ws('','SELECT COUNT(*) FROM error_table WHERE file_name =''',pi_filename,''' and processed_time::date = clock_timestamp()::date') INTO error_rec_COUNT; EXECUTE concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg) INTO tot_rec_COUNT; INSERT INTO log_table values(pi_filename,now(),tot_rec_COUNT,proc_rec_COUNT, error_rec_COUNT); raise notice 'v_directory, %',v_directory; raise notice 'pi_filename, %',pi_filename; raise notice 'v_region, %',v_region; perform aws_s3.query_export_to_s3('SELECT replace(trim(substring(error_details,position(''('' in error_details)+1),'')''),'','','';''),file_name,processed_time FROM error_table WHERE file_name = '''||pi_filename||'''', aws_commons.create_s3_uri(v_directory, pi_filename||'.bad', v_region), options :='FORmat csv, header, delimiter $$,$$' ); raise notice 'v_directory, %',v_directory; raise notice 'pi_filename, %',pi_filename; raise notice 'v_region, %',v_region; perform aws_s3.query_export_to_s3('SELECT * FROM log_table WHERE file_name = '''||pi_filename||'''', aws_commons.create_s3_uri(v_directory, pi_filename||'.log', v_region), options :='FORmat csv, header, delimiter $$,$$' ); END IF; j := j+1; END LOOP; RETURN 'OK'; EXCEPTION WHEN OTHERS THEN raise notice 'error %',sqlerrm; ERCODE=SQLSTATE; IF ERCODE = 'NTFIP' THEN v_sqlerrm := concat_Ws('',sqlerrm,'No data for the filename'); ELSIF ERCODE = 'S3IMP' THEN v_sqlerrm := concat_Ws('',sqlerrm,'Error While exporting the file from S3'); ELSE v_sqlerrm := sqlerrm; END IF; select distinct directory into v_directory from meta_EXTERNAL_TABLE; raise notice 'exc v_directory, %',v_directory; raise notice 'exc pi_filename, %',pi_filename; raise notice 'exc v_region, %',v_region; perform aws_s3.query_export_to_s3('SELECT * FROM error_table WHERE file_name = '''||pi_filename||'''', aws_commons.create_s3_uri(v_directory, pi_filename||'.bad', v_region), options :='FORmat csv, header, delimiter $$,$$' ); RETURN null; END; $function$