

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Migrasikan tabel eksternal Oracle ke Amazon Aurora PostgreSQL yang kompatibel
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible"></a>

*anuradha chintha dan Rakesh Raghav, Amazon Web Services*

## Ringkasan
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible-summary"></a>

Tabel eksternal memberikan Oracle kemampuan untuk query data yang disimpan di luar database dalam file datar. Anda dapat menggunakan driver ORACLE\$1LOADER untuk mengakses data apa pun yang disimpan dalam format apa pun yang dapat dimuat oleh utilitas SQL\$1 Loader. Anda tidak dapat menggunakan Data Manipulation Language (DHTML) pada tabel eksternal, tetapi Anda dapat menggunakan tabel eksternal untuk operasi kueri, bergabung, dan mengurutkan.

Amazon Aurora PostgreSQL Compatible Edition tidak menyediakan fungsionalitas yang mirip dengan tabel eksternal di Oracle. Sebaliknya, Anda harus menggunakan modernisasi untuk mengembangkan solusi terukur yang memenuhi persyaratan fungsional dan hemat.

Pola ini menyediakan langkah-langkah untuk memigrasikan berbagai jenis tabel eksternal Oracle ke Aurora PostgreSQL Compatible Edition di Amazon Web Services (AWS) Cloud dengan menggunakan ekstensi. `aws_s3`

Kami merekomendasikan pengujian solusi ini secara menyeluruh sebelum menerapkannya di lingkungan produksi.

## Prasyarat dan batasan
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible-prereqs"></a>

**Prasyarat**
+ Akun AWS yang aktif
+ Antarmuka Baris Perintah AWS (AWS CLI)
+ Sebuah instance database Aurora PostgreSQL yang kompatibel dengan Aurora.
+ Database Oracle lokal dengan tabel eksternal
+ PG.Client API
+ File data 

**Batasan**
+ Pola ini tidak menyediakan fungsionalitas untuk bertindak sebagai pengganti tabel eksternal Oracle. Namun, langkah-langkah dan kode sampel dapat ditingkatkan lebih lanjut untuk mencapai tujuan modernisasi database Anda.
+ File tidak boleh berisi karakter yang diteruskan sebagai pembatas dalam fungsi `aws_s3` ekspor dan impor.

**Versi produk**
+ Untuk mengimpor dari Amazon S3 ke RDS untuk PostgreSQL database harus menjalankan PostgreSQL versi 10.7 atau yang lebih baru.

## Arsitektur
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible-architecture"></a>

**Tumpukan teknologi sumber**
+ Oracle

**Arsitektur sumber**

![\[Diagram file data yang masuk ke direktori dan tabel di database Oracle lokal.\]](http://docs.aws.amazon.com/id_id/prescriptive-guidance/latest/patterns/images/pattern-img/555e69af-36fc-4ff5-b66c-af22b4cf262a/images/3fbc507d-b0fa-4e05-b999-043dc7327ed7.png)


**Tumpukan teknologi target**
+ Kompatibel dengan Amazon Aurora PostgreSQL
+ Amazon CloudWatch
+ AWS Lambda
+ AWS Secrets Manager
+ Amazon Simple Notification Service (Amazon SNS)
+ Amazon Simple Storage Service (Amazon S3)

**Arsitektur target**

Diagram berikut menunjukkan representasi tingkat tinggi dari solusi.

![\[Deskripsi adalah setelah diagram.\]](http://docs.aws.amazon.com/id_id/prescriptive-guidance/latest/patterns/images/pattern-img/555e69af-36fc-4ff5-b66c-af22b4cf262a/images/5421540e-d2e3-4361-89cc-d8415fcb21fd.png)


1. File diunggah ke bucket S3.

1. Fungsi Lambda dimulai.

1. Fungsi Lambda memulai panggilan fungsi DB.

1. Secrets Manager menyediakan kredensyal untuk akses database.

1. Tergantung pada fungsi DB, alarm SNS dibuat.

**Otomatisasi dan skala**

Setiap penambahan atau perubahan pada tabel eksternal dapat ditangani dengan pemeliharaan metadata.

## Alat
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible-tools"></a>
+ [Kompatibel dengan Amazon Aurora PostgreSQL](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/CHAP_AuroraOverview.html) — Amazon Aurora PostgreSQL Compatible Edition adalah mesin basis data relasional relasional yang dikelola sepenuhnya, kompatibel dengan PostgreSQL, dan sesuai dengan Asam yang menggabungkan kecepatan dan keandalan database komersial kelas atas dengan efektivitas biaya database sumber terbuka.
+ [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-welcome.html) — AWS Command Line Interface (AWS CLI) Command Line Interface (AWS CLI) adalah alat terpadu untuk mengelola layanan AWS Anda. Dengan hanya satu alat untuk mengunduh dan mengonfigurasi, Anda dapat mengontrol beberapa layanan AWS dari baris perintah dan mengotomatiskannya melalui skrip.
+ [Amazon CloudWatch](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/WhatIsCloudWatch.html) — Amazon CloudWatch memantau sumber daya dan pemanfaatan Amazon S3.
+ [AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/welcome.html) — AWS Lambda adalah layanan komputasi tanpa server yang mendukung menjalankan kode tanpa menyediakan atau mengelola server, membuat logika penskalaan klaster yang sadar beban kerja, mempertahankan integrasi peristiwa, atau mengelola runtime. Dalam pola ini, Lambda menjalankan fungsi database setiap kali file diunggah ke Amazon S3.
+ [AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html) — AWS Secrets Manager adalah layanan untuk penyimpanan dan pengambilan kredensyal. Dengan menggunakan Secrets Manager, Anda dapat mengganti kredensi hardcode dalam kode Anda, termasuk kata sandi, dengan panggilan API ke Secrets Manager untuk mengambil rahasia secara terprogram.
+ [Amazon S3](https://docs.aws.amazon.com/AmazonS3/latest/userguide/Welcome.html) - Amazon Simple Storage Service (Amazon S3) Simple Storage Service menyediakan lapisan penyimpanan untuk menerima dan menyimpan file untuk konsumsi dan transmisi ke dan dari cluster yang kompatibel dengan Aurora PostgreSQL.
+ [aws\$1s3 - Ekstensi `aws_s3` mengintegrasikan Amazon S3](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/PostgreSQL.Procedural.Importing.html#aws_s3.table_import_from_s3) dan Aurora PostgreSQL kompatibel.
+ [Amazon SNS](https://docs.aws.amazon.com/sns/latest/dg/welcome.html) - Amazon Simple Notification Service (Amazon SNS) mengoordinasikan dan mengelola pengiriman atau pengiriman pesan antara penerbit dan klien. Dalam pola ini, Amazon SNS digunakan untuk mengirim notifikasi.

**Kode**

Setiap kali file ditempatkan di bucket S3, fungsi DB harus dibuat dan dipanggil dari aplikasi pemrosesan atau fungsi Lambda. Untuk detailnya, lihat kode (terlampir).

## Epik
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible-epics"></a>

### Buat file eksternal
<a name="create-an-external-file"></a>


| Tugas | Deskripsi | Keterampilan yang dibutuhkan | 
| --- | --- | --- | 
| Tambahkan file eksternal ke database sumber. | Buat file eksternal, dan pindahkan ke `oracle` direktori. | DBA | 

### Konfigurasikan target (kompatibel dengan Aurora PostgreSQL)
<a name="configure-the-target-aurora-postgresql-compatible"></a>


| Tugas | Deskripsi | Keterampilan yang dibutuhkan | 
| --- | --- | --- | 
| Buat database Aurora PostgreSQL. | Buat instans DB di klaster yang kompatibel dengan Amazon Aurora PostgreSQL. | DBA | 
| Buat skema, ekstensi aws\$1s3, dan tabel. | Gunakan kode di bawah `ext_tbl_scripts` di bagian *Informasi tambahan*. Tabel termasuk tabel aktual, tabel pementasan, kesalahan dan tabel log, dan metatable. | DBA, Pengembang | 
| Buat fungsi DB. | Untuk membuat fungsi DB, gunakan kode di bawah `load_external_table_latest` fungsi di bagian *Informasi tambahan*. | DBA, Pengembang | 

### Buat fungsi Lambda dengan konsol
<a name="create-and-configure-the-lambda-function"></a>


| Tugas | Deskripsi | Keterampilan yang dibutuhkan | 
| --- | --- | --- | 
| Buat peran. | Buat peran dengan izin untuk mengakses Amazon S3 dan Amazon Relational Database Service (Amazon RDS). Peran ini akan ditugaskan ke Lambda untuk menjalankan pola. | DBA | 
| Buat fungsi Lambda. | Buat fungsi Lambda yang membaca nama file dari Amazon S3 (misalnya`file_key = info.get('object', {}).get('key')`,) dan memanggil fungsi DB (misalnya`curs.callproc("load_external_tables", [file_key])`,) dengan nama file sebagai parameter input.Bergantung pada hasil pemanggilan fungsi, pemberitahuan SNS akan dimulai (misalnya,`client.publish(TopicArn='arn:',Message='fileloadsuccess',Subject='fileloadsuccess')`).Berdasarkan kebutuhan bisnis Anda, Anda dapat membuat fungsi Lambda dengan kode tambahan jika diperlukan. Untuk informasi selengkapnya, lihat dokumentasi [Lambda](https://docs.aws.amazon.com/lambda/latest/dg/welcome.html). | DBA | 
| Konfigurasikan pemicu peristiwa bucket S3. | Konfigurasikan mekanisme untuk memanggil fungsi Lambda untuk semua peristiwa pembuatan objek di bucket S3. | DBA | 
| Buat rahasia. | Buat nama rahasia untuk kredensyal database dengan menggunakan Secrets Manager. Lewati rahasia dalam fungsi Lambda. | DBA | 
| Unggah file pendukung Lambda. | Unggah file.zip yang berisi paket dukungan Lambda dan skrip Python terlampir untuk menghubungkan ke Aurora PostgreSQL yang kompatibel. Kode Python memanggil fungsi yang Anda buat dalam database. | DBA | 
| Membuat sebuah topik SNS. | Buat topik SNS untuk mengirim email untuk keberhasilan atau kegagalan pemuatan data. | DBA | 

### Tambahkan integrasi dengan Amazon S3
<a name="add-integration-with-amazon-s3"></a>


| Tugas | Deskripsi | Keterampilan yang dibutuhkan | 
| --- | --- | --- | 
| Buat ember S3. | Di konsol Amazon S3, buat bucket S3 dengan nama unik yang tidak mengandung garis miring di depan. Nama bucket S3 unik secara global, dan namespace dibagikan oleh semua akun AWS. | DBA | 
| Buat kebijakan IAM. | Untuk membuat kebijakan AWS Identity and Access Management (IAM), gunakan kode `s3bucketpolicy_for_import` di bagian *Informasi tambahan*. | DBA | 
| Buat peran. | Buat dua peran untuk Aurora PostgreSQL kompatibel, satu peran untuk Impor dan satu peran untuk Ekspor. Tetapkan kebijakan yang sesuai untuk peran. | DBA | 
| Lampirkan peran ke cluster yang kompatibel dengan Aurora PostgreSQL. | Di bawah **Kelola peran**, lampirkan peran Impor dan Ekspor ke klaster PostgreSQL Aurora. | DBA | 
| Buat objek pendukung untuk Aurora PostgreSQL kompatibel. | Untuk skrip tabel, gunakan kode di bawah `ext_tbl_scripts` di bagian *Informasi tambahan*.Untuk fungsi kustom, gunakan kode di bawah `load_external_Table_latest` di bagian *Informasi tambahan*. | DBA | 

### Memproses file uji
<a name="process-a-test-file"></a>


| Tugas | Deskripsi | Keterampilan yang dibutuhkan | 
| --- | --- | --- | 
| Unggah file ke bucket S3. | Untuk mengunggah file pengujian ke bucket S3, gunakan konsol atau perintah berikut di AWS CLI. <pre>aws s3 cp /Users/Desktop/ukpost/exttbl/"testing files"/aps s3://s3importtest/inputext/aps</pre>Segera setelah file diunggah, peristiwa bucket memulai fungsi Lambda, yang menjalankan fungsi yang kompatibel dengan Aurora PostgreSQL. | DBA | 
| Periksa data dan file log dan kesalahan. | Fungsi Aurora PostgreSQL yang kompatibel memuat file ke dalam tabel utama, dan itu membuat `.log` dan `.bad` file di bucket S3. | DBA | 
| Pantau solusinya. | Di CloudWatch konsol Amazon, pantau fungsi Lambda. | DBA | 

## Sumber daya terkait
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible-resources"></a>
+ [Integrasi Amazon S3](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/oracle-s3-integration.html)
+ [Amazon S3](https://aws.amazon.com/s3/)
+ [Bekerja dengan Amazon Aurora PostgreSQL Edisi yang kompatibel](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/Aurora.AuroraPostgreSQL.html)
+ [AWS Lambda](https://aws.amazon.com/lambda/)
+ [Amazon CloudWatch](https://aws.amazon.com/cloudwatch/)
+ [AWS Secrets Manager](https://aws.amazon.com/secrets-manager/)
+ [Menyiapkan notifikasi Amazon SNS](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/US_SetupSNS.html)

## Informasi tambahan
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible-additional"></a>

**ext\$1table\$1scripts**

```
CREATE EXTENSION aws_s3 CASCADE;
CREATE TABLE IF NOT EXISTS meta_EXTERNAL_TABLE
(
    table_name_stg character varying(100) ,
    table_name character varying(100)  ,
    col_list character varying(1000)  ,
    data_type character varying(100)  ,
    col_order numeric,
    start_pos numeric,
    end_pos numeric,
    no_position character varying(100)  ,
    date_mask character varying(100)  ,
    delimeter character(1)  ,
    directory character varying(100)  ,
    file_name character varying(100)  ,
    header_exist character varying(5)
);
CREATE TABLE IF NOT EXISTS ext_tbl_stg
(
    col1 text
);
CREATE TABLE IF NOT EXISTS error_table
(
    error_details text,
    file_name character varying(100),
    processed_time timestamp without time zone
);
CREATE TABLE IF NOT EXISTS log_table
(
    file_name character varying(50) COLLATE pg_catalog."default",
    processed_date timestamp without time zone,
    tot_rec_count numeric,
    proc_rec_count numeric,
    error_rec_count numeric
);
sample insert scripts of meta data:
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'source_filename', 'character varying', 2, 8, 27, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'record_type_identifier', 'character varying', 3, 28, 30, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'fad_code', 'numeric', 4, 31, 36, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'session_sequence_number', 'numeric', 5, 37, 42, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'transaction_sequence_number', 'numeric', 6, 43, 48, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
```

**s3bucketpolicy\$1untuk impor**

```
---Import role policy
--Create an IAM policy to allow, Get,  and list actions on S3 bucket
 {
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "s3import",
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:s3:::s3importtest",
                "arn:aws:s3:::s3importtest/*"
            ]
        }
    ]
}
--Export Role policy
--Create an IAM policy to allow, put,  and list actions on S3 bucket
{
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "s3export",
            "Action": [
                "S3:PutObject",
                "s3:ListBucket"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:s3:::s3importtest/*"
            ]
        }
    ]
}
```

**Contoh fungsi DB load\$1external\$1tables\$1latest**

```
CREATE OR REPLACE FUNCTION public.load_external_tables(pi_filename text)
 RETURNS character varying
 LANGUAGE plpgsql
AS $function$
/* Loading data from S3 bucket into a APG table */
DECLARE
 v_final_sql TEXT;
 pi_ext_table TEXT;
 r refCURSOR;
 v_sqlerrm text;
 v_chunk numeric;
 i integer;
 v_col_list TEXT;
 v_postion_list CHARACTER VARYING(1000);
 v_len  integer;
 v_delim varchar;
 v_file_name CHARACTER VARYING(1000);
 v_directory CHARACTER VARYING(1000);
 v_table_name_stg CHARACTER VARYING(1000);
 v_sql_col TEXT;
 v_sql TEXT;
 v_sql1 TEXT;
 v_sql2 TEXT;
 v_sql3 TEXT;
 v_cnt integer;
 v_sql_dynamic TEXT;
 v_sql_ins TEXT;
 proc_rec_COUNT integer;
 error_rec_COUNT integer;
 tot_rec_COUNT integer;
 v_rec_val integer;
 rec record;
 v_col_cnt integer;
 kv record;
 v_val text;
 v_header text;
 j integer;
 ERCODE VARCHAR(5);
 v_region text;
 cr CURSOR FOR
 SELECT distinct DELIMETER,
   FILE_NAME,
   DIRECTORY
 FROM  meta_EXTERNAL_TABLE
 WHERE table_name = pi_ext_table
   AND DELIMETER IS NOT NULL;


 cr1 CURSOR FOR
   SELECT   col_list,
   data_type,
   start_pos,
   END_pos,
   concat_ws('',' ',TABLE_NAME_STG) as TABLE_NAME_STG,
   no_position,date_mask
 FROM  meta_EXTERNAL_TABLE
 WHERE table_name = pi_ext_table
 order by col_order asc;
cr2 cursor FOR
SELECT  distinct table_name,table_name_stg
   FROM  meta_EXTERNAL_TABLE
   WHERE upper(file_name) = upper(pi_filename);


BEGIN
 -- PERFORM utl_file_utility.init();
   v_region := 'us-east-1';
   /* find tab details from file name */


   --DELETE FROM  ERROR_TABLE WHERE file_name= pi_filename;
  -- DELETE FROM  log_table WHERE file_name= pi_filename;


 BEGIN


   SELECT distinct table_name,table_name_stg INTO strict pi_ext_table,v_table_name_stg
   FROM  meta_EXTERNAL_TABLE
   WHERE upper(file_name) = upper(pi_filename);
 EXCEPTION
   WHEN NO_DATA_FOUND THEN
    raise notice 'error 1,%',sqlerrm;
    pi_ext_table := null;
    v_table_name_stg := null;
      RAISE USING errcode = 'NTFIP' ;
    when others then
        raise notice 'error others,%',sqlerrm;
 END;
 j :=1 ;
  
for rec in  cr2
 LOOP




  pi_ext_table     := rec.table_name;
  v_table_name_stg := rec.table_name_stg;
  v_col_list := null;


 IF pi_ext_table IS NOT NULL
  THEN
    --EXECUTE concat_ws('','truncate table  ' ,pi_ext_table) ;
   EXECUTE concat_ws('','truncate table  ' ,v_table_name_stg) ;




       SELECT distinct DELIMETER INTO STRICT v_delim
       FROM  meta_EXTERNAL_TABLE
       WHERE table_name = pi_ext_table;


       IF v_delim IS NOT NULL THEN
     SELECT distinct DELIMETER,
       FILE_NAME,
       DIRECTORY ,
       concat_ws('',' ',table_name_stg),
       case  header_exist when 'YES' then 'CSV HEADER' else 'CSV' end as header_exist
     INTO STRICT v_delim,v_file_name,v_directory,v_table_name_stg,v_header
     FROM  meta_EXTERNAL_TABLE
     WHERE table_name = pi_ext_table
       AND DELIMETER IS NOT NULL;


     IF    upper(v_delim) = 'CSV'
     THEN
       v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3 ( ''',
       v_table_name_stg,''','''',
       ''DELIMITER '''','''' CSV HEADER QUOTE ''''"'''''', aws_commons.create_s3_uri ( ''',
       v_directory,''',''',v_file_name,''', ''',v_region,'''))');
       ELSE
       v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3(''',
           v_table_name_stg, ''','''', ''DELIMITER AS ''''^''''',''',','
          aws_commons.create_s3_uri
           ( ''',v_directory, ''',''',
           v_file_name, ''',',
            '''',v_region,''')
          )');
          raise notice 'v_sql , %',v_sql;
       begin
        EXECUTE  v_sql;
       EXCEPTION
         WHEN OTHERS THEN
           raise notice 'error 1';
         RAISE USING errcode = 'S3IMP' ;
       END;


       select count(col_list) INTO v_col_cnt
       from  meta_EXTERNAL_TABLE where table_name = pi_ext_table;






        -- raise notice 'v_sql 2, %',concat_ws('','update ',v_table_name_stg, ' set col1 = col1||''',v_delim,'''');


       execute concat_ws('','update ',v_table_name_stg, ' set col1 = col1||''',v_delim,'''');




       i :=1;
       FOR rec in cr1
       loop
       v_sql1 := concat_ws('',v_sql1,'split_part(col1,''',v_delim,''',', i,')',' as ',rec.col_list,',');
       v_sql2 := concat_ws('',v_sql2,rec.col_list,',');
   --    v_sql3 := concat_ws('',v_sql3,'rec.',rec.col_list,'::',rec.data_type,',');


       case
         WHEN upper(rec.data_type) = 'NUMERIC'
         THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0
                THEN null
                 ELSE
                 coalesce((trim(split_part(col1,''',v_delim,''',', i,')))::NUMERIC,0)::',rec.data_type,' END as ',rec.col_list,',') ;
         WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDD'
         THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0
                THEN null
                 ELSE
                 to_date(coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),''99990101''),''YYYYMMDD'')::',rec.data_type,' END as ',rec.col_list,',');
         WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask =  'MM/DD/YYYY hh24:mi:ss'
         THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0
                THEN null
                 ELSE
                 to_date(coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),''01/01/9999 0024:00:00''),''MM/DD/YYYY hh24:mi:ss'')::',rec.data_type,' END as ',rec.col_list,',');
          ELSE
        v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0
                THEN null
                 ELSE
                  coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),'''')::',rec.data_type,' END as ',rec.col_list,',') ;
       END case;


       i :=i+1;
       end loop;


         -- raise notice 'v_sql 3, %',v_sql3;


       SELECT trim(trailing ' ' FROM v_sql1) INTO v_sql1;
       SELECT trim(trailing ',' FROM v_sql1) INTO v_sql1;


       SELECT trim(trailing ' ' FROM v_sql2) INTO v_sql2;
       SELECT trim(trailing ',' FROM v_sql2) INTO v_sql2;


       SELECT trim(trailing ' ' FROM v_sql3) INTO v_sql3;
       SELECT trim(trailing ',' FROM v_sql3) INTO v_sql3;


       END IF;
      raise notice 'v_delim , %',v_delim;


     EXECUTE concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg)  INTO v_cnt;


    raise notice 'stg cnt , %',v_cnt;


    /* if upper(v_delim) = 'CSV' then
       v_sql_ins := concat_ws('', ' SELECT * from ' ,v_table_name_stg );
     else
      -- v_sql_ins := concat_ws('',' SELECT ',v_sql1,'  from (select col1 from ' ,v_table_name_stg , ')sub ');
       v_sql_ins := concat_ws('',' SELECT ',v_sql3,'  from (select col1 from ' ,v_table_name_stg , ')sub ');
       END IF;*/


v_chunk := v_cnt/100;




for i in 1..101
loop
     BEGIN
    -- raise notice 'v_sql , %',v_sql;
       -- raise notice 'Chunk number , %',i;
       v_sql_ins := concat_ws('',' SELECT ',v_sql3,'  from (select col1 from ' ,v_table_name_stg , ' offset ',v_chunk*(i-1), ' limit ',v_chunk,') sub ');


     v_sql := concat_ws('','insert into  ', pi_ext_table ,' ', v_sql_ins);
     -- raise notice 'select statement , %',v_sql_ins;
          -- v_sql := null;
     -- EXECUTE concat_ws('','insert into  ', pi_ext_table ,' ', v_sql_ins, 'offset ',v_chunk*(i-1), ' limit ',v_chunk );
     --v_sql := concat_ws('','insert into  ', pi_ext_table ,' ', v_sql_ins );


     -- raise notice 'insert statement , %',v_sql;


    raise NOTICE 'CHUNK START %',v_chunk*(i-1);
   raise NOTICE 'CHUNK END %',v_chunk;


     EXECUTE v_sql;


  EXCEPTION
       WHEN OTHERS THEN
       -- v_sql_ins := concat_ws('',' SELECT ',v_sql1, '  from (select col1 from ' ,v_table_name_stg , ' )sub ');
         -- raise notice 'Chunk number for cursor , %',i;


    raise NOTICE 'Cursor - CHUNK START %',v_chunk*(i-1);
   raise NOTICE 'Cursor -  CHUNK END %',v_chunk;
         v_sql_ins := concat_ws('',' SELECT ',v_sql3, '  from (select col1 from ' ,v_table_name_stg , ' )sub ');


         v_final_sql := REPLACE (v_sql_ins, ''''::text, ''''''::text);
        -- raise notice 'v_final_sql %',v_final_sql;
         v_sql :=concat_ws('','do $a$ declare  r refcursor;v_sql text; i numeric;v_conname text;  v_typ  ',pi_ext_table,'[]; v_rec  ','record',';
           begin






           open r for execute ''select col1 from ',v_table_name_stg ,'  offset ',v_chunk*(i-1), ' limit ',v_chunk,''';
           loop
           begin
           fetch r into v_rec;
           EXIT WHEN NOT FOUND;




           v_sql := concat_ws('''',''insert into  ',pi_ext_table,' SELECT ',REPLACE (v_sql3, ''''::text, ''''''::text) , '  from ( select '''''',v_rec.col1,'''''' as col1) v'');
            execute v_sql;


           exception
            when others then
          v_sql := ''INSERT INTO  ERROR_TABLE VALUES (concat_ws('''''''',''''Error Name: '''',$$''||SQLERRM||''$$,''''Error State: '''',''''''||SQLSTATE||'''''',''''record : '''',$$''||v_rec.col1||''$$),'''''||pi_filename||''''',now())'';


               execute v_sql;
             continue;
           end ;
           end loop;
           close r;
           exception
           when others then
         raise;
           end ; $a$');
      -- raise notice ' inside excp v_sql %',v_sql;
          execute v_sql;
      --  raise notice 'v_sql %',v_sql;
       END;
  END LOOP;
     ELSE


     SELECT distinct DELIMETER,FILE_NAME,DIRECTORY ,concat_ws('',' ',table_name_stg),
       case  header_exist when 'YES' then 'CSV HEADER' else 'CSV' end as header_exist
       INTO STRICT v_delim,v_file_name,v_directory,v_table_name_stg,v_header
     FROM  meta_EXTERNAL_TABLE
     WHERE table_name = pi_ext_table                  ;
     v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3(''',
       v_table_name_stg, ''','''', ''DELIMITER AS ''''#'''' ',v_header,' '',','
      aws_commons.create_s3_uri
       ( ''',v_directory, ''',''',
       v_file_name, ''',',
        '''',v_region,''')
      )');
         EXECUTE  v_sql;


     FOR rec in cr1
     LOOP


      IF rec.start_pos IS NULL AND rec.END_pos IS NULL AND rec.no_position = 'recnum'
      THEN
        v_rec_val := 1;
      ELSE


       case
         WHEN upper(rec.data_type) = 'NUMERIC'
         THEN v_sql1 := concat_ws('',' case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0
                THEN null
                 ELSE
                 coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1)))::NUMERIC,0)::',rec.data_type,' END as ',rec.col_list,',') ;
         WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDD'
         THEN v_sql1 := concat_ws('','case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0
                THEN null
                 ELSE
                 to_date(coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),''99990101''),''YYYYMMDD'')::',rec.data_type,' END as ',rec.col_list,',');
         WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDDHH24MISS'
         THEN v_sql1 := concat_ws('','case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0
                THEN null
                 ELSE
                 to_date(coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),''9999010100240000''),''YYYYMMDDHH24MISS'')::',rec.data_type,' END as ',rec.col_list,',');
          ELSE
        v_sql1 := concat_ws('',' case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0
                THEN null
                 ELSE
                  coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),'''')::',rec.data_type,' END as ',rec.col_list,',') ;
       END case;


      END IF;
      v_col_list := concat_ws('',v_col_list ,v_sql1);
     END LOOP;




           SELECT trim(trailing ' ' FROM v_col_list) INTO v_col_list;
           SELECT trim(trailing ',' FROM v_col_list) INTO v_col_list;


           v_sql_col   :=  concat_ws('',trim(trailing ',' FROM v_col_list) , ' FROM  ',v_table_name_stg,' WHERE col1 IS NOT NULL AND length(col1)>0 ');




           v_sql_dynamic := v_sql_col;


           EXECUTE  concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg) INTO v_cnt;




         IF v_rec_val = 1 THEN
             v_sql_ins := concat_ws('',' select row_number() over(order by ctid) as line_number ,' ,v_sql_dynamic) ;


         ELSE
               v_sql_ins := concat_ws('',' SELECT' ,v_sql_dynamic) ;
           END IF;


     BEGIN
       EXECUTE concat_ws('','insert into  ', pi_ext_table ,' ', v_sql_ins);
           EXCEPTION
              WHEN OTHERS THEN
          IF v_rec_val = 1 THEN
                  v_final_sql := ' select row_number() over(order by ctid) as line_number ,col1 from ';
                ELSE
                 v_final_sql := ' SELECT col1 from';
               END IF;
       v_sql :=concat_ws('','do $a$ declare  r refcursor;v_rec_val numeric := ',coalesce(v_rec_val,0),';line_number numeric; col1 text; v_typ  ',pi_ext_table,'[]; v_rec  ',pi_ext_table,';
             begin
             open r for execute ''',v_final_sql, ' ',v_table_name_stg,' WHERE col1 IS NOT NULL AND length(col1)>0 '' ;
             loop
             begin
             if   v_rec_val = 1 then
             fetch r into line_number,col1;
             else
             fetch r into col1;
             end if;


             EXIT WHEN NOT FOUND;
              if v_rec_val = 1 then
              select line_number,',trim(trailing ',' FROM v_col_list) ,' into v_rec;
              else
                select ',trim(trailing ',' FROM v_col_list) ,' into v_rec;
              end if;


             insert into  ',pi_ext_table,' select v_rec.*;
              exception
              when others then
               INSERT INTO  ERROR_TABLE VALUES (concat_ws('''',''Error Name: '',SQLERRM,''Error State: '',SQLSTATE,''record : '',v_rec),''',pi_filename,''',now());
               continue;
              end ;
               end loop;
             close r;
              exception
              when others then
              raise;
              end ; $a$');
         execute v_sql;


     END;


         END IF;


   EXECUTE concat_ws('','SELECT COUNT(*) FROM  ' ,pi_ext_table)   INTO proc_rec_COUNT;


   EXECUTE concat_ws('','SELECT COUNT(*) FROM  error_table WHERE file_name =''',pi_filename,''' and processed_time::date = clock_timestamp()::date')  INTO error_rec_COUNT;


   EXECUTE concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg)   INTO tot_rec_COUNT;


   INSERT INTO  log_table values(pi_filename,now(),tot_rec_COUNT,proc_rec_COUNT, error_rec_COUNT);


   raise notice 'v_directory, %',v_directory;


   raise notice 'pi_filename, %',pi_filename;


   raise notice 'v_region, %',v_region;


  perform aws_s3.query_export_to_s3('SELECT replace(trim(substring(error_details,position(''('' in error_details)+1),'')''),'','','';''),file_name,processed_time FROM  error_table WHERE file_name = '''||pi_filename||'''',
   aws_commons.create_s3_uri(v_directory, pi_filename||'.bad', v_region),
   options :='FORmat csv, header, delimiter $$,$$'
   );


raise notice 'v_directory, %',v_directory;


   raise notice 'pi_filename, %',pi_filename;


   raise notice 'v_region, %',v_region;


  perform aws_s3.query_export_to_s3('SELECT * FROM  log_table WHERE file_name = '''||pi_filename||'''',
   aws_commons.create_s3_uri(v_directory, pi_filename||'.log', v_region),
   options :='FORmat csv, header, delimiter $$,$$'
   );




   END IF;
 j := j+1;
 END LOOP;


       RETURN 'OK';
EXCEPTION
    WHEN  OTHERS THEN
  raise notice 'error %',sqlerrm;
   ERCODE=SQLSTATE;
   IF ERCODE = 'NTFIP' THEN
     v_sqlerrm := concat_Ws('',sqlerrm,'No data for the filename');
   ELSIF ERCODE = 'S3IMP' THEN
    v_sqlerrm := concat_Ws('',sqlerrm,'Error While exporting the file from S3');
   ELSE
      v_sqlerrm := sqlerrm;
   END IF;


 select distinct directory into v_directory from  meta_EXTERNAL_TABLE;




 raise notice 'exc v_directory, %',v_directory;


   raise notice 'exc pi_filename, %',pi_filename;


   raise notice 'exc v_region, %',v_region;


  perform aws_s3.query_export_to_s3('SELECT * FROM  error_table WHERE file_name = '''||pi_filename||'''',
   aws_commons.create_s3_uri(v_directory, pi_filename||'.bad', v_region),
   options :='FORmat csv, header, delimiter $$,$$'
   );
    RETURN null;
END;
$function$
```