Gunakan SageMaker Processing untuk rekayasa fitur terdistribusi dari kumpulan data ML skala terabyte - AWS Prescriptive Guidance

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Gunakan SageMaker Processing untuk rekayasa fitur terdistribusi dari kumpulan data ML skala terabyte

Chris Boomhower, Amazon Web Services

Ringkasan

Banyak kumpulan data skala terabyte atau lebih besar sering terdiri dari struktur folder hierarkis, dan file dalam kumpulan data terkadang berbagi saling ketergantungan. Untuk alasan ini, insinyur pembelajaran mesin (ML) dan ilmuwan data harus membuat keputusan desain yang bijaksana untuk menyiapkan data tersebut untuk pelatihan model dan inferensi. Pola ini menunjukkan bagaimana Anda dapat menggunakan teknik macrosharding dan microsharding manual dalam kombinasi dengan Amazon SageMaker Processing dan paralelisasi CPU virtual (vCPU) untuk menskalakan proses rekayasa fitur secara efisien untuk kumpulan data BIG data MLyang rumit. 

Pola ini mendefinisikan macrosharding sebagai pemisahan direktori data di beberapa mesin untuk diproses, dan microsharding sebagai pemisahan data pada setiap mesin di beberapa thread pemrosesan. Pola menunjukkan teknik ini dengan menggunakan Amazon SageMaker dengan contoh catatan bentuk gelombang deret waktu dari kumpulan data MIMIC-III. PhysioNet Dengan menerapkan teknik dalam pola ini, Anda dapat meminimalkan waktu pemrosesan dan biaya untuk rekayasa fitur sambil memaksimalkan pemanfaatan sumber daya dan efisiensi throughput. Pengoptimalan ini bergantung pada SageMaker Pemrosesan terdistribusi pada instans Amazon Elastic Compute Cloud (Amazon EC2) dan v CPUs untuk kumpulan data besar yang serupa, terlepas dari tipe datanya.

Prasyarat dan batasan

Prasyarat

  • Akses ke instance SageMaker notebook atau SageMaker Studio, jika Anda ingin menerapkan pola ini untuk kumpulan data Anda sendiri. Jika Anda menggunakan Amazon SageMaker untuk pertama kalinya, lihat Memulai Amazon SageMaker di dokumentasi AWS.

  • SageMaker Studio, jika Anda ingin menerapkan pola ini dengan data sampel PhysioNet MIMIC-III

  • Pola menggunakan SageMaker Processing, tetapi tidak memerlukan pengalaman menjalankan pekerjaan SageMaker Processing.

Batasan

  • Pola ini sangat cocok untuk kumpulan data ML yang menyertakan file yang saling bergantung. Interdependensi ini paling diuntungkan dari macrosharding manual dan menjalankan beberapa pekerjaan Pemrosesan instance SageMaker tunggal secara paralel. Untuk kumpulan data di mana saling ketergantungan seperti itu tidak ada, ShardedByS3Key fitur dalam SageMaker Processing mungkin merupakan alternatif yang lebih baik untuk macrosharding, karena mengirimkan data sharded ke beberapa instance yang dikelola oleh pekerjaan Pemrosesan yang sama. Namun, Anda dapat menerapkan strategi microsharding pola ini di kedua skenario untuk memanfaatkan instance v. CPUs

Versi produk

  • Amazon SageMaker Python SDK versi 2

Arsitektur

Tumpukan teknologi target

  • Amazon Simple Storage Service (Amazon S3)

  • Amazon SageMaker

Arsitektur target

Macrosharding dan instance terdistribusi EC2

10 proses paralel yang diwakili dalam arsitektur ini mencerminkan struktur dataset MIMIC-III. (Proses diwakili oleh elips untuk penyederhanaan diagram.) Arsitektur serupa berlaku untuk kumpulan data apa pun saat Anda menggunakan macrosharding manual. Dalam kasus MIMIC-III, Anda dapat menggunakan struktur mentah kumpulan data untuk keuntungan Anda dengan memproses setiap folder grup pasien secara terpisah, dengan sedikit usaha. Dalam diagram berikut, blok grup rekaman muncul di sebelah kiri (1). Mengingat sifat data yang terdistribusi, masuk akal untuk membelah oleh kelompok pasien.

Arsitektur untuk microsharding dan instance terdistribusi EC2

Namun, sharding secara manual oleh kelompok pasien berarti bahwa pekerjaan Pemrosesan terpisah diperlukan untuk setiap folder grup pasien, seperti yang Anda lihat di bagian tengah diagram (2), alih-alih satu pekerjaan Pemrosesan dengan beberapa EC2 instance. Karena data MIMIC-III mencakup file bentuk gelombang biner dan file header berbasis teks yang cocok, dan ada ketergantungan yang diperlukan pada pustaka wfdb untuk ekstraksi data biner, semua catatan untuk pasien tertentu harus tersedia pada contoh yang sama. Satu-satunya cara untuk memastikan bahwa setiap file header terkait bentuk gelombang biner juga ada adalah dengan menerapkan sharding manual untuk menjalankan setiap pecahan dalam pekerjaan Processing-nya sendiri, dan untuk menentukan s3_data_distribution_type='FullyReplicated' kapan Anda menentukan input pekerjaan Pemrosesan. Atau, jika semua data tersedia dalam satu direktori dan tidak ada dependensi di antara file, opsi yang lebih cocok mungkin meluncurkan pekerjaan Pemrosesan tunggal dengan beberapa EC2 instance dan ditentukan. s3_data_distribution_type='ShardedByS3Key' Menentukan ShardedByS3Key  sebagai tipe distribusi data Amazon S3 SageMaker mengarahkan untuk mengelola pembagian data secara otomatis di seluruh instance. 

Meluncurkan pekerjaan Pemrosesan untuk setiap folder adalah cara hemat biaya untuk memproses data sebelumnya, karena menjalankan beberapa instance secara bersamaan menghemat waktu. Untuk penghematan biaya dan waktu tambahan, Anda dapat menggunakan microsharding dalam setiap pekerjaan Pemrosesan. 

Microsharding dan parallel v CPUs

Dalam setiap pekerjaan Pemrosesan, data yang dikelompokkan dibagi lagi untuk memaksimalkan penggunaan semua v yang tersedia CPUs pada EC2 instance yang dikelola SageMaker sepenuhnya. Blok di bagian tengah diagram (2) menggambarkan apa yang terjadi dalam setiap pekerjaan Pemrosesan utama. Isi folder catatan pasien diratakan dan dibagi rata berdasarkan jumlah v yang tersedia CPUs pada instance. Setelah isi folder dibagi, kumpulan file berukuran merata didistribusikan di semua v CPUs untuk diproses. Saat pemrosesan selesai, hasil dari setiap vCPU digabungkan menjadi satu file data untuk setiap pekerjaan Pemrosesan. 

Dalam kode terlampir, konsep-konsep ini diwakili di bagian src/feature-engineering-pass1/preprocessing.py file berikut.

def chunks(lst, n):     """     Yield successive n-sized chunks from lst.          :param lst: list of elements to be divided     :param n: number of elements per chunk     :type lst: list     :type n: int     :return: generator comprising evenly sized chunks     :rtype: class 'generator'     """     for i in range(0, len(lst), n):         yield lst[i:i + n]     # Generate list of data files on machine data_dir = input_dir d_subs = next(os.walk(os.path.join(data_dir, '.')))[1] file_list = [] for ds in d_subs:     file_list.extend(os.listdir(os.path.join(data_dir, ds, '.'))) dat_list = [os.path.join(re.split('_|\.', f)[0].replace('n', ''), f[:-4]) for f in file_list if f[-4:] == '.dat']   # Split list of files into sub-lists cpu_count = multiprocessing.cpu_count() splits = int(len(dat_list) / cpu_count) if splits == 0: splits = 1 dat_chunks = list(chunks(dat_list, splits))   # Parallelize processing of sub-lists across CPUs ws_df_list = Parallel(n_jobs=-1, verbose=0)(delayed(run_process)(dc) for dc in dat_chunks)   # Compile and pickle patient group dataframe ws_df_group = pd.concat(ws_df_list) ws_df_group = ws_df_group.reset_index().rename(columns={'index': 'signal'}) ws_df_group.to_json(os.path.join(output_dir, group_data_out))

Sebuah fungsichunks, pertama kali didefinisikan untuk mengkonsumsi daftar yang diberikan dengan membaginya menjadi potongan-potongan panjang yang berukuran sama dan dengan mengembalikan hasil ini sebagai generator. Selanjutnya, data diratakan di seluruh folder pasien dengan menyusun daftar semua file bentuk gelombang biner yang ada. Setelah ini selesai, jumlah v CPUs yang tersedia pada EC2 instance diperoleh. Daftar file bentuk gelombang biner dibagi secara merata di v ini CPUs dengan memanggilchunks, dan kemudian setiap sublis bentuk gelombang diproses pada vCPU-nya sendiri dengan menggunakan kelas Paralel joblib. Hasil secara otomatis digabungkan ke dalam satu daftar kerangka data oleh pekerjaan Pemrosesan, yang SageMaker kemudian diproses lebih lanjut sebelum menuliskannya ke Amazon S3 setelah pekerjaan selesai. Dalam contoh ini, ada 10 file yang ditulis ke Amazon S3 oleh pekerjaan Processing (satu untuk setiap pekerjaan).

Ketika semua pekerjaan Pemrosesan awal selesai, pekerjaan Pemrosesan sekunder, yang ditampilkan di blok di sebelah kanan diagram (3) menggabungkan file output yang dihasilkan oleh setiap pekerjaan Pemrosesan utama dan menulis output gabungan ke Amazon S3 (4).

Alat

Alat

  • Python - Contoh kode yang digunakan untuk pola ini adalah Python (versi 3).

  • SageMaker Studio — Amazon SageMaker Studio adalah lingkungan pengembangan terintegrasi (IDE) berbasis web untuk pembelajaran mesin yang memungkinkan Anda membuat, melatih, men-debug, menerapkan, dan memantau model pembelajaran mesin Anda. Anda menjalankan pekerjaan SageMaker Pemrosesan dengan menggunakan notebook Jupyter di dalam Studio. SageMaker

  • SageMaker Pemrosesan - Amazon SageMaker Processing menyediakan cara yang disederhanakan untuk menjalankan beban kerja pemrosesan data Anda. Dalam pola ini, kode rekayasa fitur diimplementasikan dalam skala besar dengan menggunakan pekerjaan SageMaker Pemrosesan.

Kode

File zip terlampir menyediakan kode lengkap untuk pola ini. Bagian berikut menjelaskan langkah-langkah untuk membangun arsitektur untuk pola ini. Setiap langkah diilustrasikan dengan kode sampel dari lampiran.

Epik

TugasDeskripsiKeterampilan yang dibutuhkan
Akses Amazon SageMaker Studio.

Onboard ke SageMaker Studio di akun AWS Anda dengan mengikuti petunjuk yang disediakan dalam SageMaker dokumentasi Amazon.

Ilmuwan data, insinyur ML
Instal utilitas wget.

Instal wget jika Anda onboard dengan konfigurasi SageMaker Studio baru atau jika Anda belum pernah menggunakan utilitas ini di SageMaker Studio sebelumnya. 

Untuk menginstal, buka jendela terminal di konsol SageMaker Studio dan jalankan perintah berikut:

sudo yum install wget
Ilmuwan data, insinyur ML
Unduh dan unzip kode sampel.

Unduh attachments.zip file di bagian Lampiran. Di jendela terminal, arahkan ke folder tempat Anda mengunduh file dan ekstrak isinya:

unzip attachment.zip

Arahkan ke folder tempat Anda mengekstrak file.zip, dan ekstrak konten file. Scaled-Processing.zip

unzip Scaled-Processing.zip
Ilmuwan data, insinyur ML
Unduh kumpulan data sampel dari physionet.org dan unggah ke Amazon S3.

Jalankan get_data.ipynb notebook Jupyter di dalam folder yang berisi file. Scaled-Processing Notebook ini mengunduh contoh kumpulan data MIMIC-III dari physionet.org dan mengunggahnya ke bucket sesi Studio Anda di Amazon S3. SageMaker

Ilmuwan data, insinyur ML
TugasDeskripsiKeterampilan yang dibutuhkan
Ratakan hierarki file di semua subdirektori.

Dalam kumpulan data besar seperti MIMIC-III, file sering didistribusikan di beberapa subdirektori bahkan dalam kelompok induk logis. Skrip Anda harus dikonfigurasi untuk meratakan semua file grup di semua subdirektori, seperti yang ditunjukkan oleh kode berikut.

# Generate list of .dat files on machine data_dir = input_dir d_subs = next(os.walk(os.path.join(data_dir, '.')))[1] file_list = [] for ds in d_subs:     file_list.extend(os.listdir(os.path.join(data_dir, ds, '.'))) dat_list = [os.path.join(re.split('_|\.', f)[0].replace('n', ''), f[:-4]) for f in file_list if f[-4:] == '.dat']
catatan

    Contoh cuplikan kode dalam epik ini berasal dari src/feature-engineering-pass1/preprocessing.py file, yang disediakan dalam lampiran.

Ilmuwan data, insinyur ML
Bagilah file menjadi subkelompok berdasarkan jumlah vCPU.

File harus dibagi menjadi subkelompok berukuran merata, atau potongan, tergantung pada jumlah v yang CPUs ada pada instance yang menjalankan skrip. Untuk langkah ini, Anda dapat menerapkan kode yang mirip dengan berikut ini.

# Split list of files into sub-lists cpu_count = multiprocessing.cpu_count() splits = int(len(dat_list) / cpu_count) if splits == 0: splits = 1 dat_chunks = list(chunks(dat_list, splits))
Ilmuwan data, insinyur ML
Paralelisasi pemrosesan subkelompok di seluruh v. CPUs

Logika skrip harus dikonfigurasi untuk memproses semua subkelompok secara paralel. Untuk melakukan ini, gunakan Parallel  kelas dan delayed  metode perpustakaan Joblib sebagai berikut. 

# Parallelize processing of sub-lists across CPUs ws_df_list = Parallel(n_jobs=-1, verbose=0)(delayed(run_process)(dc) for dc in dat_chunks)
Ilmuwan data, insinyur ML
Simpan output grup file tunggal ke Amazon S3.

Ketika pemrosesan vCPU paralel selesai, hasil dari setiap vCPU harus digabungkan dan diunggah ke jalur bucket S3 grup file. Untuk langkah ini, Anda dapat menggunakan kode yang mirip dengan berikut ini.

# Compile and pickle patient group dataframe ws_df_group = pd.concat(ws_df_list) ws_df_group = ws_df_group.reset_index().rename(columns={'index': 'signal'}) ws_df_group.to_json(os.path.join(output_dir, group_data_out))
Ilmuwan data, insinyur ML
TugasDeskripsiKeterampilan yang dibutuhkan
Gabungkan file data yang dihasilkan di semua pekerjaan Pemrosesan yang menjalankan skrip pertama.

Script sebelumnya mengeluarkan satu file untuk setiap pekerjaan SageMaker Processing yang memproses sekelompok file dari dataset.  Selanjutnya, Anda perlu menggabungkan file output ini menjadi satu objek dan menulis satu set data output ke Amazon S3. Ini ditunjukkan dalam src/feature-engineering-pass1p5/preprocessing.py file, yang disediakan dalam lampiran, sebagai berikut.

def write_parquet(wavs_df, path):     """     Write waveform summary dataframe to S3 in parquet format.          :param wavs_df: waveform summary dataframe     :param path: S3 directory prefix     :type wavs_df: pandas dataframe     :type path: str     :return: None     """     extra_args = {"ServerSideEncryption": "aws:kms"}     wr.s3.to_parquet(         df=wavs_df,         path=path,         compression='snappy',         s3_additional_kwargs=extra_args)     def combine_data():     """     Get combined data and write to parquet.          :return: waveform summary dataframe     :rtype: pandas dataframe     """     wavs_df = get_data()     wavs_df = normalize_signal_names(wavs_df)     write_parquet(wavs_df, "s3://{}/{}/{}".format(bucket_xform, dataset_prefix, pass1p5out_data))       return wavs_df     wavs_df = combine_data()
Ilmuwan data, insinyur ML
TugasDeskripsiKeterampilan yang dibutuhkan
Jalankan pekerjaan Processing pertama.

Untuk melakukan macrosharding, jalankan pekerjaan Pemrosesan terpisah untuk setiap grup file. Microsharding dilakukan di dalam setiap pekerjaan Pemrosesan, karena setiap pekerjaan menjalankan skrip pertama Anda. Kode berikut menunjukkan cara meluncurkan pekerjaan Processing untuk setiap direktori grup file dalam cuplikan berikut (termasuk dalam). notebooks/FeatExtract_Pass1.ipynb

pat_groups = list(range(30,40)) ts = str(int(time.time()))   for group in pat_groups:     sklearn_processor = SKLearnProcessor(framework_version='0.20.0',                                      role=role,                                      instance_type='ml.m5.4xlarge',                                      instance_count=1,                                      volume_size_in_gb=5)     sklearn_processor.run(         code='../src/feature-engineering-pass1/preprocessing.py',         job_name='-'.join(['scaled-processing-p1', str(group), ts]),         arguments=[             "input_path", "/opt/ml/processing/input",             "output_path", "/opt/ml/processing/output",             "group_data_out", "ws_df_group.json"         ],         inputs=         [             ProcessingInput(                 source=f's3://{sess.default_bucket()}/data_inputs/{group}',                 destination='/opt/ml/processing/input',                 s3_data_distribution_type='FullyReplicated'             )         ],         outputs=         [             ProcessingOutput(                 source='/opt/ml/processing/output',                 destination=f's3://{sess.default_bucket()}/data_outputs/{group}'             )         ],         wait=False     )
Ilmuwan data, insinyur ML
Jalankan pekerjaan Pemrosesan kedua.

Untuk menggabungkan output yang dihasilkan oleh set pertama pekerjaan pemrosesan dan melakukan perhitungan tambahan untuk preprocessing, Anda menjalankan skrip kedua Anda dengan menggunakan satu pekerjaan Processing. SageMaker Kode berikut menunjukkan ini (termasuk dalamnotebooks/FeatExtract_Pass1p5.ipynb).

ts = str(int(time.time())) bucket = sess.default_bucket()       sklearn_processor = SKLearnProcessor(framework_version='0.20.0',                                  role=role,                                  instance_type='ml.t3.2xlarge',                                  instance_count=1,                                  volume_size_in_gb=5) sklearn_processor.run(     code='../src/feature-engineering-pass1p5/preprocessing.py',     job_name='-'.join(['scaled-processing', 'p1p5', ts]),     arguments=['bucket', bucket,                'pass1out_prefix', 'data_outputs',                'pass1out_data', 'ws_df_group.json',                'pass1p5out_data', 'waveform_summary.parquet',                'statsdata_name', 'signal_stats.csv'],     wait=True )
Ilmuwan data, insinyur ML

Sumber daya terkait

Lampiran

Untuk mengakses konten tambahan yang terkait dengan dokumen ini, unzip file berikut: attachment.zip