

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Pemrosesan Fitur
<a name="feature-store-feature-processing"></a>

Amazon SageMaker Feature Store Feature Processing adalah kemampuan yang dapat digunakan untuk mengubah data mentah menjadi fitur machine learning (ML). Ini memberi Anda SDK Prosesor Fitur yang dengannya Anda dapat mengubah dan menyerap data dari sumber data batch ke dalam grup fitur Anda. Dengan kemampuan ini, Feature Store menangani infrastruktur yang mendasarinya termasuk menyediakan lingkungan komputasi dan membuat serta memelihara Pipelines untuk memuat dan menyerap data. Dengan cara ini Anda dapat fokus pada definisi prosesor fitur Anda yang mencakup fungsi transformasi (misalnya, jumlah tampilan produk, rata-rata nilai transaksi), sumber (tempat menerapkan transformasi ini), dan sink (tempat menulis nilai fitur yang dihitung ke).

Fitur Pipa prosesor adalah pipa Pipelines. Sebagai Pipelines, Anda juga dapat melacak pipeline Prosesor Fitur terjadwal dengan garis keturunan SageMaker AI di konsol. Untuk informasi selengkapnya tentang SageMaker AI Lineage, lihat [Pelacakan SageMaker Silsilah Amazon](lineage-tracking.md) Ini termasuk melacak eksekusi terjadwal, memvisualisasikan garis keturunan untuk melacak fitur kembali ke sumber data mereka, dan melihat prosesor fitur bersama dalam satu lingkungan. Untuk informasi tentang penggunaan Feature Store dengan konsol, lihat[Lihat eksekusi pipeline dari konsol](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-executions-studio).

**Topics**
+ [SDK Prosesor Fitur Toko Fitur](feature-store-feature-processor-sdk.md)
+ [Menjalankan Prosesor Fitur Toko Fitur dari jarak jauh](feature-store-feature-processor-execute-remotely.md)
+ [Membuat dan menjalankan saluran pipa Prosesor Fitur Toko Fitur](feature-store-feature-processor-create-execute-pipeline.md)
+ [Eksekusi terjadwal dan berbasis acara untuk pipeline Prosesor Fitur](feature-store-feature-processor-schedule-pipeline.md)
+ [Pantau SageMaker Pipeline Prosesor Fitur Fitur Amazon Feature Store](feature-store-feature-processor-monitor-pipeline.md)
+ [Izin IAM dan peran eksekusi](feature-store-feature-processor-iam-permissions.md)
+ [Fitur Pembatasan, batas, dan kuota prosesor](feature-store-feature-processor-quotas.md)
+ [Sumber data](feature-store-feature-processor-data-sources.md)
+ [Contoh kode Pemrosesan Fitur untuk kasus penggunaan umum](feature-store-feature-processor-examples.md)

# SDK Prosesor Fitur Toko Fitur
<a name="feature-store-feature-processor-sdk"></a>

Deklarasikan definisi Prosesor Fitur Toko Fitur dengan mendekorasi fungsi transformasi Anda dengan dekorator. `@feature_processor` SageMaker AI SDK for Python (Boto3) secara otomatis memuat data dari sumber data input yang dikonfigurasi, menerapkan fungsi transformasi yang didekorasi, dan kemudian menyerap data yang diubah ke grup fitur target. Fungsi transformasi yang didekorasi harus sesuai dengan tanda tangan yang diharapkan dari `@feature_processor` dekorator. Untuk informasi selengkapnya tentang `@feature_processor` dekorator, lihat [@feature\$1processor Decorator](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#feature-processor-decorator) di Amazon SageMaker Feature Store Read the Docs. 

Dengan `@feature_processor` dekorator, fungsi transformasi Anda berjalan di lingkungan runtime Spark di mana argumen input yang diberikan ke fungsi Anda dan nilai pengembaliannya adalah Spark. DataFrames Jumlah parameter input dalam fungsi transformasi Anda harus sesuai dengan jumlah input yang dikonfigurasi di `@feature_processor` dekorator. 

Untuk informasi selengkapnya tentang `@feature_processor` dekorator, lihat [Feature Processor Feature Store SDK for Python (Boto3)](https://github.com/aws/sagemaker-python-sdk/tree/master/src/sagemaker/feature_store/feature_processor).

Kode berikut adalah contoh dasar tentang cara menggunakan `@feature_processor` dekorator. Untuk contoh kasus penggunaan yang lebih spesifik, lihat[Contoh kode Pemrosesan Fitur untuk kasus penggunaan umum](feature-store-feature-processor-examples.md).

Feature Processor SDK dapat diinstal dari SageMaker Python SDK dan tambahannya menggunakan perintah berikut. 

```
pip install sagemaker[feature-processor]
```

Dalam contoh berikut, `us-east-1` adalah wilayah sumber daya, `111122223333` adalah ID akun pemilik sumber daya, dan `your-feature-group-name` merupakan nama grup fitur.

Berikut ini adalah definisi prosesor fitur dasar, di mana `@feature_processor` dekorator mengonfigurasi input CSV dari Amazon S3 untuk dimuat dan disediakan ke fungsi transformasi Anda (misalnya,`transform`), dan menyiapkannya untuk dikonsumsi ke grup fitur. Baris terakhir menjalankannya.

```
from sagemaker.feature_store.feature_processor import CSVDataSource, feature_processor

CSV_DATA_SOURCE = CSVDataSource('s3://your-bucket/prefix-to-csv/')
OUTPUT_FG = 'arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'

@feature_processor(inputs=[CSV_DATA_SOURCE], output=OUTPUT_FG)
def transform(csv_input_df):
   return csv_input_df
   
transform()
```

`@feature_processor`Parameternya meliputi:
+ `inputs`(List [str]): Daftar sumber data yang digunakan dalam Prosesor Fitur Toko Fitur Anda. Jika sumber data Anda adalah grup fitur atau disimpan di Amazon S3, Anda mungkin dapat menggunakan definisi sumber data yang disediakan Toko Fitur untuk pemroses fitur. Untuk daftar lengkap definisi sumber data yang disediakan Toko Fitur, lihat [Sumber Data Prosesor SageMaker Fitur](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#feature-processor-data-source) di Amazon Feature Store Baca Dokumen.
+ `output`(str): ARN dari grup fitur untuk menelan output dari fungsi yang didekorasi.
+ `target_stores`(Opsional [List [str]]): Daftar toko (misalnya, `OnlineStore` atau`OfflineStore`) untuk dicerna ke output. Jika tidak ditentukan, data dicerna ke semua penyimpanan yang diaktifkan grup fitur keluaran.
+ `parameters`(Dict [str, Any]): Kamus yang akan disediakan untuk fungsi transformasi Anda. 
+ `enable_ingestion`(bool): Bendera untuk menunjukkan apakah output fungsi transformasi dicerna ke grup fitur keluaran. Bendera ini berguna selama fase pengembangan. Jika tidak ditentukan, konsumsi diaktifkan.

Parameter fungsi dibungkus opsional (disediakan sebagai argumen jika disediakan dalam tanda tangan fungsi) meliputi:
+ `params`(Dict [str, Any]): Kamus didefinisikan dalam parameter. `@feature_processor` Ini juga berisi parameter yang dikonfigurasi sistem yang dapat direferensikan dengan kunci`system`, seperti `scheduled_time` parameter.
+ `spark`(SparkSession): Referensi ke SparkSession instance yang diinisialisasi untuk Aplikasi Spark.

Kode berikut adalah contoh penggunaan `spark` parameter `params` dan.

```
from sagemaker.feature_store.feature_processor import CSVDataSource, feature_processor

CSV_DATA_SOURCE = CSVDataSource('s3://your-bucket/prefix-to-csv/')
OUTPUT_FG = 'arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name' 

@feature_processor(inputs=[CSV_DATA_SOURCE], output=OUTPUT_FG)
def transform(csv_input_df, params, spark):
   
   scheduled_time = params['system']['scheduled_time']
   csv_input_df.createOrReplaceTempView('csv_input_df')
   return spark.sql(f'''
        SELECT *
        FROM csv_input_df
        WHERE date_add(event_time, 1) >= {scheduled_time}
   ''')
   
transform()
```

Parameter `scheduled_time` sistem (disediakan dalam `params` argumen untuk fungsi Anda) adalah nilai penting untuk mendukung percobaan ulang setiap eksekusi. Nilai dapat membantu mengidentifikasi eksekusi Prosesor Fitur secara unik dan dapat digunakan sebagai titik referensi untuk input berbasis daterange (misalnya, hanya memuat data 24 jam terakhir) untuk menjamin rentang input yang independen dari waktu eksekusi aktual kode. Jika Prosesor Fitur berjalan sesuai jadwal (lihat[Eksekusi terjadwal dan berbasis acara untuk pipeline Prosesor Fitur](feature-store-feature-processor-schedule-pipeline.md)) maka nilainya ditetapkan ke waktu yang dijadwalkan untuk dijalankan. Argumen dapat diganti selama eksekusi sinkron menggunakan API eksekusi SDK untuk mendukung kasus penggunaan seperti pengisian ulang data atau menjalankan kembali eksekusi masa lalu yang tidak terjawab. Nilainya adalah waktu saat ini jika Prosesor Fitur berjalan dengan cara lain.

Untuk informasi tentang penulisan kode Spark, lihat Panduan Pemrograman [SQL Spark](https://spark.apache.org/docs/latest/sql-programming-guide.html).

Untuk contoh kode lainnya untuk kasus penggunaan umum, lihat. [Contoh kode Pemrosesan Fitur untuk kasus penggunaan umum](feature-store-feature-processor-examples.md) 

Perhatikan bahwa fungsi transformasi yang didekorasi dengan `@feature_processor` tidak mengembalikan nilai. Untuk menguji fungsi Anda secara terprogram, Anda dapat menghapus atau menambal `@feature_processor` dekorator sedemikian rupa sehingga berfungsi sebagai pass-through ke fungsi yang dibungkus. Untuk detail selengkapnya tentang `@feature_processor` dekorator, lihat [Amazon SageMaker Feature Store Python SDK](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_featurestore.html). 

# Menjalankan Prosesor Fitur Toko Fitur dari jarak jauh
<a name="feature-store-feature-processor-execute-remotely"></a>

Untuk menjalankan Prosesor Fitur Anda pada kumpulan data besar yang membutuhkan perangkat keras yang lebih kuat daripada yang tersedia secara lokal, Anda dapat menghias kode Anda dengan `@remote` dekorator untuk menjalankan kode Python lokal Anda sebagai pekerjaan pelatihan terdistribusi tunggal atau multi-node. SageMaker Untuk informasi selengkapnya tentang menjalankan kode Anda sebagai pekerjaan SageMaker pelatihan, lihat[Jalankan kode lokal Anda sebagai pekerjaan SageMaker pelatihan](train-remote-decorator.md). 

Berikut ini adalah contoh penggunaan `@remote` dekorator bersama dengan `@feature_processor` dekorator.

```
from sagemaker.remote_function.spark_config import SparkConfig
from sagemaker.remote_function import remote
from sagemaker.feature_store.feature_processor import CSVDataSource, feature_processor

CSV_DATA_SOURCE = CSVDataSource('s3://bucket/prefix-to-csv/')
OUTPUT_FG = 'arn:aws:sagemaker:us-east-1:123456789012:feature-group/feature-group'

@remote(
    spark_config=SparkConfig(), 
    instance_type="ml.m5.2xlarge",
    dependencies="/local/requirements.txt"
)
@feature_processor(
    inputs=[CSV_DATA_SOURCE], 
    output=OUTPUT_FG,
)
def transform(csv_input_df):
   return csv_input_df
   
transform()
```

`spark_config`Parameter menunjukkan bahwa pekerjaan jarak jauh berjalan sebagai aplikasi Spark. `SparkConfig`Instance ini dapat digunakan untuk mengkonfigurasi Konfigurasi Spark dan memberikan dependensi tambahan ke aplikasi Spark seperti file Python,, dan file. JARs

Untuk iterasi yang lebih cepat saat mengembangkan kode pemrosesan fitur, Anda dapat menentukan `keep_alive_period_in_seconds` argumen di `@remote` dekorator untuk mempertahankan sumber daya yang dikonfigurasi di kolam hangat untuk pekerjaan pelatihan berikutnya. Untuk informasi selengkapnya tentang kolam hangat, lihat `[KeepAlivePeriodInSeconds](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_ResourceConfig.html#sagemaker-Type-ResourceConfig-KeepAlivePeriodInSeconds)` di panduan Referensi API.

Kode berikut adalah contoh lokal `requirements.txt:`

```
sagemaker>=2.167.0
```

Ini akan menginstal versi SageMaker SDK yang sesuai dalam pekerjaan jarak jauh yang diperlukan untuk mengeksekusi metode yang dijelaskan oleh. `@feature-processor` 

# Membuat dan menjalankan saluran pipa Prosesor Fitur Toko Fitur
<a name="feature-store-feature-processor-create-execute-pipeline"></a>

Feature Processor SDK menyediakan APIs untuk mempromosikan Definisi Prosesor Fitur Anda ke dalam Pipeline SageMaker AI yang dikelola sepenuhnya. Untuk informasi lebih lanjut tentang Pipelines, lihat[Ikhtisar jaringan pipa](pipelines-overview.md). Untuk mengonversi Definisi Prosesor Fitur menjadi Pipeline SageMaker AI, gunakan `to_pipeline` API dengan definisi Prosesor Fitur Anda. Anda dapat menjadwalkan eksekusi Definisi Prosesor Fitur dapat dijadwalkan, memantau secara operasional dengan CloudWatch metrik, dan mengintegrasikannya EventBridge untuk bertindak sebagai sumber acara atau pelanggan. Untuk informasi selengkapnya tentang pemantauan jaringan pipa yang dibuat dengan Pipelines, lihat. [Pantau SageMaker Pipeline Prosesor Fitur Fitur Amazon Feature Store](feature-store-feature-processor-monitor-pipeline.md)

Untuk melihat pipeline Prosesor Fitur, lihat[Lihat eksekusi pipeline dari konsol](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-executions-studio).

Jika fungsi Anda juga dihiasi dengan `@remote` dekorator, maka konfigurasinya dibawa ke pipa Prosesor Fitur. Anda dapat menentukan konfigurasi lanjutan seperti jenis dan hitungan instans komputasi, dependensi runtime, konfigurasi jaringan dan keamanan menggunakan dekorator. `@remote`

Contoh berikut menggunakan `to_pipeline` dan `execute` APIs.

```
from sagemaker.feature_store.feature_processor import (
    execute, to_pipeline, describe, TransformationCode
)

pipeline_name="feature-processor-pipeline"
pipeline_arn = to_pipeline(
    pipeline_name=pipeline_name,
    step=transform,
    transformation_code=TransformationCode(s3_uri="s3://bucket/prefix"),
)

pipeline_execution_arn = execute(
    pipeline_name=pipeline_name
)
```

`to_pipeline`API secara semantik merupakan operasi upsert. Ini memperbarui pipa jika sudah ada; jika tidak, itu membuat pipa.

`to_pipeline`API secara opsional menerima URI Amazon S3 yang mereferensikan file yang berisi definisi Prosesor Fitur untuk mengaitkannya dengan pipeline Prosesor Fitur untuk melacak fungsi transformasi dan SageMaker versinya dalam garis keturunan pembelajaran mesin AI.

Untuk mengambil daftar setiap pipeline Prosesor Fitur di akun Anda, Anda dapat menggunakan `list_pipelines` API. Permintaan berikutnya ke `describe` API menampilkan detail yang terkait dengan pipeline Prosesor Fitur termasuk, namun tidak terbatas pada, Pipelines dan detail jadwal.

Contoh berikut menggunakan `list_pipelines` dan `describe` APIs.

```
from sagemaker.feature_store.feature_processor import list_pipelines, describe

feature_processor_pipelines = list_pipelines()

pipeline_description = describe(
    pipeline_name = feature_processor_pipelines[0]
)
```

# Eksekusi terjadwal dan berbasis acara untuk pipeline Prosesor Fitur
<a name="feature-store-feature-processor-schedule-pipeline"></a>

Eksekusi pipeline Pemrosesan SageMaker Fitur Amazon Feature Store dapat dikonfigurasi untuk memulai secara otomatis dan asinkron berdasarkan jadwal yang telah dikonfigurasi sebelumnya atau sebagai hasil dari peristiwa layanan lain. AWS Misalnya, Anda dapat menjadwalkan pipeline Pemrosesan Fitur untuk dieksekusi pada bulan pertama setiap bulan atau menghubungkan dua saluran pipa bersama-sama sehingga pipeline target dijalankan secara otomatis setelah eksekusi pipa sumber selesai.

**Topics**
+ [Jadwal eksekusi berdasarkan](#feature-store-feature-processor-schedule-pipeline-schedule-based)
+ [Eksekusi berbasis acara](#feature-store-feature-processor-schedule-pipeline-event-based)

## Jadwal eksekusi berdasarkan
<a name="feature-store-feature-processor-schedule-pipeline-schedule-based"></a>

Feature Processor SDK menyediakan [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule)API untuk menjalankan pipeline Prosesor Fitur secara berulang dengan integrasi Amazon EventBridge Scheduler. Jadwal dapat ditentukan dengan`at`,`rate`, atau `cron` ekspresi menggunakan [https://docs.aws.amazon.com/scheduler/latest/APIReference/API_CreateSchedule.html#scheduler-CreateSchedule-request-ScheduleExpression](https://docs.aws.amazon.com/scheduler/latest/APIReference/API_CreateSchedule.html#scheduler-CreateSchedule-request-ScheduleExpression)parameter dengan ekspresi yang sama didukung oleh Amazon EventBridge. API jadwal secara semantik merupakan operasi upsert karena memperbarui jadwal jika sudah ada; jika tidak, itu membuatnya. Untuk informasi selengkapnya tentang EventBridge ekspresi dan contoh, lihat [Jenis jadwal pada EventBridge Penjadwal](https://docs.aws.amazon.com/scheduler/latest/UserGuide/schedule-types.html) di Panduan Pengguna EventBridge Penjadwal.

Contoh berikut menggunakan [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule)API Processor Fitur, menggunakan`at`,`rate`, dan `cron` ekspresi.

```
from sagemaker.feature_store.feature_processor import schedule
pipeline_name='feature-processor-pipeline'

event_bridge_schedule_arn = schedule(
    pipeline_name=pipeline_name, 
    schedule_expression="at(2020-11-30T00:00:00)"
)

event_bridge_schedule_arn = schedule(
    pipeline_name=pipeline_name, 
    schedule_expression="rate(24 hours)"
)

event_bridge_schedule_arn = schedule(
    pipeline_name=pipeline_name, 
    schedule_expression="cron(0 0-23/1 ? * * 2023-2024)"
)
```

Zona waktu default untuk input tanggal dan waktu di `schedule` API ada di UTC. Untuk informasi selengkapnya tentang ekspresi jadwal EventBridge Scheduler, lihat [https://docs.aws.amazon.com/scheduler/latest/APIReference/API_CreateSchedule.html#scheduler-CreateSchedule-request-ScheduleExpression](https://docs.aws.amazon.com/scheduler/latest/APIReference/API_CreateSchedule.html#scheduler-CreateSchedule-request-ScheduleExpression)di dokumentasi Referensi API EventBridge Scheduler.

Eksekusi pipeline Prosesor Fitur Terjadwal menyediakan fungsi transformasi Anda dengan waktu eksekusi terjadwal, untuk digunakan sebagai token idempotensi atau titik referensi tetap untuk input berbasis rentang tanggal. Untuk menonaktifkan (yaitu, menjeda) atau mengaktifkan kembali jadwal, gunakan `state` parameter [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule)API dengan `‘DISABLED’` atau`‘ENABLED’`, masing-masing.

Untuk informasi tentang Prosesor Fitur, lihat[Fitur Sumber data SDK Prosesor](feature-store-feature-processor-data-sources-sdk.md). 

## Eksekusi berbasis acara
<a name="feature-store-feature-processor-schedule-pipeline-event-based"></a>

Pipeline Pemrosesan Fitur dapat dikonfigurasi untuk mengeksekusi secara otomatis ketika suatu AWS peristiwa terjadi. Feature Processing SDK menyediakan [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.put_trigger](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.put_trigger)fungsi yang menerima daftar peristiwa sumber dan pipeline target. Peristiwa sumber harus berupa contoh [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.FeatureProcessorPipelineEvent](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.FeatureProcessorPipelineEvent), yang menentukan peristiwa [status pipeline dan eksekusi](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribePipelineExecution.html#sagemaker-DescribePipelineExecution-response-PipelineExecutionStatus). 

`put_trigger`Fungsi ini mengonfigurasi EventBridge aturan Amazon dan menargetkan untuk merutekan peristiwa dan memungkinkan Anda menentukan pola EventBridge peristiwa untuk merespons AWS peristiwa apa pun. Untuk informasi tentang konsep ini, lihat EventBridge [aturan](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-rules.html) Amazon, [target](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-targets.html), dan [pola peristiwa](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html).

Pemicu dapat diaktifkan atau dinonaktifkan. EventBridge akan memulai eksekusi pipeline target menggunakan peran yang disediakan dalam `role_arn` parameter `put_trigger` API. Peran eksekusi digunakan secara default jika SDK digunakan di lingkungan Amazon SageMaker Studio Classic atau Notebook. Untuk informasi tentang cara mendapatkan peran eksekusi Anda, lihat[Dapatkan peran eksekusi Anda](sagemaker-roles.md#sagemaker-roles-get-execution-role).

Contoh berikut mengatur:
+ Pipeline SageMaker AI menggunakan `to_pipeline` API, yang menggunakan nama pipeline target (`target-pipeline`) dan fungsi transformasi Anda (`transform`). Untuk informasi tentang Prosesor Fitur dan fungsi transformasi, lihat[Fitur Sumber data SDK Prosesor](feature-store-feature-processor-data-sources-sdk.md).
+ Pemicu menggunakan `put_trigger` API, yang digunakan `FeatureProcessorPipelineEvent` untuk acara dan nama pipeline target Anda (`target-pipeline`). 

  `FeatureProcessorPipelineEvent`Mendefinisikan pemicu kapan status source pipeline (`source-pipeline`) Anda menjadi`Succeeded`. Untuk informasi tentang fungsi acara Pipeline Prosesor Fitur, lihat [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.FeatureProcessorPipelineEvent](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.FeatureProcessorPipelineEvent)di Toko Fitur Baca Dokumen. 

```
from sagemaker.feature_store.feature_processor import put_trigger, to_pipeline, FeatureProcessorPipelineEvent

to_pipeline(pipeline_name="target-pipeline", step=transform)

put_trigger(
    source_pipeline_events=[
        FeatureProcessorPipelineEvent(
            pipeline_name="source-pipeline",
            status=["Succeeded"]
        )
    ],
    target_pipeline="target-pipeline"
)
```

Untuk contoh penggunaan pemicu berbasis peristiwa untuk membuat eksekusi berkelanjutan dan percobaan ulang otomatis untuk pipeline Prosesor Fitur Anda, lihat. [Eksekusi berkelanjutan dan percobaan ulang otomatis menggunakan pemicu berbasis peristiwa](feature-store-feature-processor-examples.md#feature-store-feature-processor-examples-continuous-execution-automatic-retries)

Untuk contoh menggunakan pemicu berbasis peristiwa untuk membuat *streaming* berkelanjutan dan percobaan ulang otomatis menggunakan pemicu berbasis peristiwa, lihat. [Contoh sumber data khusus streaming](feature-store-feature-processor-data-sources-custom-examples.md#feature-store-feature-processor-data-sources-custom-examples-streaming) 

# Pantau SageMaker Pipeline Prosesor Fitur Fitur Amazon Feature Store
<a name="feature-store-feature-processor-monitor-pipeline"></a>

AWS menyediakan alat pemantauan untuk mengawasi sumber daya dan aplikasi Amazon SageMaker AI Anda secara real time, melaporkan kapan terjadi kesalahan, dan mengambil tindakan otomatis bila perlu. Feature Store Feature Processor pipelines adalah Pipelines, sehingga tersedia mekanisme dan integrasi pemantauan standar. Metrik operasional seperti kegagalan eksekusi dapat dipantau melalui CloudWatch metrik Amazon dan peristiwa Amazon. EventBridge 

Untuk informasi selengkapnya tentang cara memantau dan mengoperasionalkan Prosesor Fitur Toko Fitur, lihat sumber daya berikut:
+ [Memantau AWS sumber daya di Amazon SageMaker AI](monitoring-overview.md)- Panduan umum tentang aktivitas pemantauan dan audit untuk sumber daya SageMaker AI.
+ [SageMaker metrik saluran pipa](monitoring-cloudwatch.md#cloudwatch-metrics-pipelines)- CloudWatch Metrik yang dipancarkan oleh Pipelines.
+ [SageMaker perubahan status eksekusi pipa](automating-sagemaker-with-eventbridge.md#eventbridge-pipeline)- EventBridge peristiwa yang dipancarkan untuk Pipelines dan eksekusi.
+ [Memecahkan Masalah Pipa Amazon SageMaker](pipelines-troubleshooting.md)- Kiat debugging dan pemecahan masalah umum untuk Pipelines.

Fitur Toko Fitur Log eksekusi prosesor dapat ditemukan di Amazon CloudWatch Logs di bawah grup `/aws/sagemaker/TrainingJobs` log, di mana Anda dapat menemukan aliran log eksekusi menggunakan konvensi pencarian. Untuk eksekusi yang dibuat dengan langsung menjalankan fungsi yang `@feature_processor` didekorasi, Anda dapat menemukan log di konsol lingkungan eksekusi lokal Anda. Untuk eksekusi yang ` @remote` didekorasi, nama aliran CloudWatch Log berisi nama fungsi dan stempel waktu eksekusi. Untuk eksekusi pipeline Prosesor Fitur, aliran CloudWatch Log untuk langkah tersebut berisi `feature-processor` string dan ID eksekusi pipeline.

Saluran pipa Prosesor Fitur Toko Fitur dan status eksekusi terbaru dapat ditemukan di Amazon SageMaker Studio Classic untuk grup fitur tertentu di UI Toko Fitur. Grup fitur yang terkait dengan pipeline Prosesor Fitur sebagai input atau output ditampilkan di UI. Selain itu, tampilan garis keturunan dapat memberikan konteks ke dalam eksekusi hulu, seperti jaringan pipa Prosesor Fitur yang memproduksi data dan sumber data, untuk debugging lebih lanjut. Untuk informasi selengkapnya tentang penggunaan tampilan garis keturunan menggunakan Studio Classic, lihat. [Lihat silsilah dari konsol](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-lineage-studio)

# Izin IAM dan peran eksekusi
<a name="feature-store-feature-processor-iam-permissions"></a>

Untuk menggunakan Amazon SageMaker Python SDK memerlukan izin untuk berinteraksi dengannya. Layanan AWS Kebijakan berikut diperlukan untuk fungsionalitas Prosesor Fitur lengkap. Anda dapat melampirkan [AmazonSageMakerFullAccess](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AmazonSageMakerFullAccess.html)dan Kebijakan [AmazonEventBridgeSchedulerFullAccess](https://docs.aws.amazon.com/scheduler/latest/UserGuide/security_iam_id-based-policy-examples.html#security_iam_id-based-policies-managed-policies) AWS Terkelola yang dilampirkan ke peran IAM Anda. Untuk informasi tentang melampirkan kebijakan ke peran IAM Anda, lihat. [Menambahkan kebijakan ke peran IAM Anda](feature-store-adding-policies.md) Lihat contoh berikut untuk detailnya.

Kebijakan kepercayaan dari peran yang diterapkan kebijakan ini harus memungkinkan prinsip “scheduler.amazonaws.com”, “sagemaker.amazonaws.com”, dan “glue.amazonaws.com”.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "",
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "scheduler.amazonaws.com",
                    "sagemaker.amazonaws.com",
                    "glue.amazonaws.com"
                ]
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
```

------

# Fitur Pembatasan, batas, dan kuota prosesor
<a name="feature-store-feature-processor-quotas"></a>

Amazon SageMaker Feature Store Feature Processing mengandalkan pelacakan garis keturunan SageMaker AI machine learning (ML). Prosesor Fitur Toko Fitur menggunakan konteks garis keturunan untuk mewakili dan melacak Pipa Pemrosesan Fitur dan versi Pipa. Setiap Prosesor Fitur Toko Fitur mengkonsumsi setidaknya dua konteks garis keturunan (satu untuk Pipeline Pemrosesan Fitur dan satu lagi untuk versi). Jika sumber data input atau output dari Pipeline Pemrosesan Fitur berubah, konteks garis keturunan tambahan akan dibuat. Anda dapat memperbarui batas garis keturunan SageMaker AI dengan menjangkau AWS dukungan untuk peningkatan batas. Batas default untuk sumber daya yang digunakan oleh Prosesor Fitur Toko Fitur adalah sebagai berikut. Untuk informasi tentang pelacakan garis keturunan SageMaker AI ML, lihat. [Pelacakan SageMaker Silsilah Amazon](lineage-tracking.md)

Untuk informasi lebih lanjut tentang kuota SageMaker AI, lihat [titik akhir dan kuota Amazon SageMaker AI](https://docs.aws.amazon.com/general/latest/gr/sagemaker.html).

Batas garis keturunan per Wilayah
+ Konteks - 500 (batas lunak)
+ Artefak - 6.000 (batas lunak)
+ Asosiasi - 6.000 (batas lunak)

Batas Pelatihan per Wilayah
+ Waktu lari terpanjang untuk pekerjaan pelatihan — 432.000 detik
+ Jumlah maksimum instans per pekerjaan pelatihan - 20
+ Jumlah maksimum `CreateTrainingJob` permintaan yang dapat Anda buat, per detik, di akun ini di Wilayah saat ini - 1 TPS
+ Pertahankan periode hidup untuk penggunaan kembali cluster — 3.600 detik

Jumlah maksimum Pipa dan eksekusi pipa bersamaan per Wilayah
+ Jumlah maksimum saluran pipa yang diizinkan per akun - 500
+ Jumlah maksimum eksekusi pipa bersamaan yang diizinkan per akun — 20
+ Waktu di mana waktu eksekusi pipa habis - 672 jam

# Sumber data
<a name="feature-store-feature-processor-data-sources"></a>

Amazon SageMaker Feature Store Feature Processing mendukung beberapa sumber data. Feature Processor SDK for Python (Boto3) menyediakan konstruksi untuk memuat data dari grup fitur atau objek yang disimpan di Amazon S3. Selain itu, Anda dapat membuat sumber data khusus untuk memuat data dari sumber data lain. Untuk informasi tentang sumber data yang disediakan Toko Fitur, lihat [Sumber data Prosesor Fitur Feature Store Python](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/feature_store/feature_processor/_data_source.py) SDK. 

**Topics**
+ [Fitur Sumber data SDK Prosesor](feature-store-feature-processor-data-sources-sdk.md)
+ [Sumber data kustom](feature-store-feature-processor-data-sources-custom.md)
+ [Contoh sumber data kustom](feature-store-feature-processor-data-sources-custom-examples.md)

# Fitur Sumber data SDK Prosesor
<a name="feature-store-feature-processor-data-sources-sdk"></a>

Amazon SageMaker Feature Store Feature Processor SDK for Python (Boto3) menyediakan konstruksi untuk memuat data dari grup fitur atau objek yang disimpan di Amazon S3. Untuk daftar lengkap definisi sumber data yang disediakan Toko Fitur, lihat [Sumber data Prosesor Fitur Feature Store Python](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/feature_store/feature_processor/_data_source.py) SDK. 

Untuk contoh tentang cara menggunakan definisi sumber data SDK Python Toko Fitur, lihat. [Contoh kode Pemrosesan Fitur untuk kasus penggunaan umum](feature-store-feature-processor-examples.md)

## FeatureGroupDataSource
<a name="feature-store-feature-processor-data-sources-sdk-featuregroup"></a>

`FeatureGroupDataSource`Ini digunakan untuk menentukan grup fitur sebagai sumber data input untuk Prosesor Fitur. Data dapat dimuat dari grup fitur toko offline. Mencoba memuat data Anda dari grup fitur toko online akan menghasilkan kesalahan validasi. Anda dapat menentukan offset awal dan akhir untuk membatasi data yang dimuat ke rentang waktu tertentu. Misalnya, Anda dapat menentukan offset awal '14 hari' untuk memuat hanya dua minggu terakhir data, dan Anda juga dapat menentukan offset akhir '7 hari' untuk membatasi input ke data minggu sebelumnya.

## Fitur Store menyediakan definisi sumber data
<a name="feature-store-feature-processor-data-sources-sdk-provided-sources"></a>

Feature Store Python SDK berisi definisi sumber data yang dapat digunakan untuk menentukan berbagai sumber data input untuk Prosesor Fitur. Ini termasuk sumber tabel CSV, Parket, dan Gunung Es. Untuk daftar lengkap definisi sumber data yang disediakan Toko Fitur, lihat [Sumber data Prosesor Fitur Feature Store Python](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/feature_store/feature_processor/_data_source.py) SDK. 

# Sumber data kustom
<a name="feature-store-feature-processor-data-sources-custom"></a>

Pada halaman ini kita akan menjelaskan cara membuat kelas sumber data kustom dan menunjukkan beberapa contoh penggunaan. Dengan sumber data khusus, Anda dapat menggunakan SageMaker AI SDK for Python ( APIs Boto3) yang disediakan dengan cara yang sama seperti jika Anda menggunakan sumber data yang disediakan Amazon Feature Store. SageMaker 

Untuk menggunakan sumber data khusus untuk mengubah dan menyerap data ke dalam grup fitur menggunakan Pemrosesan Fitur, Anda perlu memperluas `PySparkDataSource` kelas dengan anggota dan fungsi kelas berikut.
+ `data_source_name`(str): nama arbitrer untuk sumber data. Misalnya, Amazon Redshift, Snowflake, atau Glue Catalog ARN.
+ `data_source_unique_id`(str): pengenal unik yang mengacu pada sumber daya tertentu yang diakses. Misalnya, nama tabel, DDB Tabel ARN, awalan Amazon S3. Semua penggunaan yang sama `data_source_unique_id` dalam sumber data kustom akan dikaitkan dengan sumber data yang sama dalam tampilan garis keturunan. Lineage mencakup informasi tentang kode eksekusi alur kerja pemrosesan fitur, sumber data apa yang digunakan, dan bagaimana mereka dimasukkan ke dalam grup fitur atau fitur. Untuk informasi tentang melihat silsilah grup fitur di **Studio**, lihat. [Lihat silsilah dari konsol](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-lineage-studio)
+ `read_data`(func): metode yang digunakan untuk terhubung dengan prosesor fitur. Mengembalikan frame data Spark. Sebagai contoh, lihat [Contoh sumber data kustom](feature-store-feature-processor-data-sources-custom-examples.md).

Keduanya `data_source_name` dan `data_source_unique_id` digunakan untuk mengidentifikasi entitas garis keturunan Anda secara unik. Berikut ini adalah contoh untuk kelas sumber data kustom bernama`CustomDataSource`.

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
from pyspark.sql import DataFrame

class CustomDataSource(PySparkDataSource):
    
    data_source_name = "custom-data-source-name"
    data_source_unique_id = "custom-data-source-id"
    
    def read_data(self, parameter, spark) -> DataFrame:
        your own code here to read data into a Spark dataframe
        return dataframe
```

# Contoh sumber data kustom
<a name="feature-store-feature-processor-data-sources-custom-examples"></a>

Bagian ini memberikan contoh implementasi sumber data kustom untuk Prosesor Fitur. Untuk informasi selengkapnya tentang sumber data kustom, lihat[Sumber data kustom](feature-store-feature-processor-data-sources-custom.md).

Keamanan adalah tanggung jawab bersama antara AWS dan pelanggan kami. AWS bertanggung jawab untuk melindungi infrastruktur yang menjalankan layanan di AWS Cloud. Pelanggan bertanggung jawab atas semua konfigurasi keamanan dan tugas manajemen yang diperlukan. Misalnya, rahasia seperti kredensyal akses ke penyimpanan data tidak boleh dikodekan keras dalam sumber data kustom Anda. Anda dapat menggunakan AWS Secrets Manager untuk mengelola kredensional ini. Untuk informasi tentang Secrets Manager, lihat [Apa itu AWS Secrets Manager?](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html) dalam panduan AWS Secrets Manager pengguna. Contoh berikut akan menggunakan Secrets Manager untuk kredensialmu.

**Topics**
+ [Contoh sumber data kustom Amazon Redshift Clusters (JDBC)](#feature-store-feature-processor-data-sources-custom-examples-redshift)
+ [Contoh sumber data kustom kepingan salju](#feature-store-feature-processor-data-sources-custom-examples-snowflake)
+ [Contoh sumber data kustom Databricks (JDBC)](#feature-store-feature-processor-data-sources-custom-examples-databricks)
+ [Contoh sumber data khusus streaming](#feature-store-feature-processor-data-sources-custom-examples-streaming)

## Contoh sumber data kustom Amazon Redshift Clusters (JDBC)
<a name="feature-store-feature-processor-data-sources-custom-examples-redshift"></a>

Amazon Redshift menawarkan driver JDBC yang dapat digunakan untuk membaca data dengan Spark. Untuk informasi tentang cara mengunduh driver Amazon Redshift JDBC, lihat Mengunduh driver [Amazon Redshift JDBC, versi 2.1](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-download-driver.html). 

Untuk membuat kelas sumber data Amazon Redshift kustom, Anda harus menimpa `read_data` metode dari. [Sumber data kustom](feature-store-feature-processor-data-sources-custom.md) 

Untuk terhubung dengan cluster Amazon Redshift, Anda memerlukan:
+ URL JDBC Amazon Redshift () `jdbc-url`

  Untuk informasi tentang mendapatkan URL Amazon Redshift JDBC, lihat [Mendapatkan URL JDBC di Panduan Pengembang Database](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-obtain-url.html) Amazon Redshift.
+ Nama pengguna Amazon Redshift (`redshift-user`) dan kata sandi () `redshift-password`

  Untuk informasi tentang cara membuat dan mengelola pengguna database menggunakan perintah Amazon Redshift SQL, lihat [Pengguna](https://docs.aws.amazon.com/redshift/latest/dg/r_Users.html) di Panduan Pengembang Database Amazon Redshift.
+ Nama tabel Amazon Redshift () `redshift-table-name`

  Untuk informasi tentang cara membuat tabel dengan beberapa contoh, lihat [MEMBUAT TABEL di Panduan](https://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_TABLE_NEW.html) Pengembang Database Amazon Redshift.
+ (Opsional) Jika menggunakan Secrets Manager, Anda memerlukan nama rahasia (`secret-redshift-account-info`) tempat menyimpan nama pengguna dan kata sandi akses Amazon Redshift di Secrets Manager.

  Untuk informasi tentang Secrets Manager, lihat [Menemukan rahasia AWS Secrets Manager di](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html) Panduan AWS Secrets Manager Pengguna. 
+ Wilayah AWS (`your-region`)

  [Untuk informasi tentang mendapatkan nama wilayah sesi Anda saat ini menggunakan SDK for Python (Boto3), lihat region\$1name dalam dokumentasi Boto3.](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name)

Contoh berikut menunjukkan cara mengambil URL JDBC dan token akses pribadi dari Secrets Manager dan mengganti `read_data` untuk kelas sumber data kustom Anda,. `DatabricksDataSource`

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3


class RedshiftDataSource(PySparkDataSource):
    
    data_source_name = "Redshift"
    data_source_unique_id = "redshift-resource-arn"
    
    def read_data(self, spark, params):
        url = "jdbc-url?user=redshift-user&password=redshift-password"
        aws_iam_role_arn = "redshift-command-access-role"
        secret_name = "secret-redshift-account-info"
        region_name = "your-region"
        
        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        jdbc_url = url.replace("jdbc-url", secrets["jdbcurl"]).replace("redshift-user", secrets['username']).replace("redshift-password", secrets['password'])
        
        return spark.read \
             .format("jdbc") \
             .option("url", url) \
             .option("driver", "com.amazon.redshift.Driver") \
             .option("dbtable", "redshift-table-name") \
             .option("tempdir", "s3a://your-bucket-name/your-bucket-prefix") \
             .option("aws_iam_role", aws_iam_role_arn) \
             .load()
```

Contoh berikut menunjukkan bagaimana menghubungkan `RedshiftDataSource` ke `feature_processor` dekorator Anda.

```
from sagemaker.feature_store.feature_processor import feature_processor
    
@feature_processor(
    inputs=[RedshiftDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"}
)
def transform(input_df):
    return input_df
```

Untuk menjalankan pekerjaan prosesor fitur dari jarak jauh, Anda perlu menyediakan driver jdbc dengan mendefinisikan `SparkConfig` dan meneruskannya ke dekorator. `@remote`

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[RedshiftDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Contoh sumber data kustom kepingan salju
<a name="feature-store-feature-processor-data-sources-custom-examples-snowflake"></a>

Snowflake menyediakan konektor Spark yang dapat digunakan untuk dekorator Anda. `feature_processor` Untuk informasi tentang konektor Snowflake untuk Spark, lihat Snowflake [Connector for Spark di dokumentasi Snowflake](https://docs.snowflake.com/en/user-guide/spark-connector).

Untuk membuat kelas sumber data Snowflake kustom, Anda harus mengganti `read_data` metode dari [Sumber data kustom](feature-store-feature-processor-data-sources-custom.md) dan menambahkan paket konektor Spark ke classpath Spark. 

Untuk terhubung dengan sumber data Snowflake yang Anda butuhkan:
+ URL kepingan salju () `sf-url`

  Untuk informasi tentang cara URLs mengakses antarmuka web Snowflake, lihat [Pengenal Akun](https://docs.snowflake.com/en/user-guide/admin-account-identifier) di dokumentasi Snowflake.
+ Database kepingan salju () `sf-database` 

  Untuk informasi tentang mendapatkan nama database Anda menggunakan Snowflake, lihat [CURRENT\$1DATABASE](https://docs.snowflake.com/en/sql-reference/functions/current_database) dalam dokumentasi Snowflake.
+ Skema basis data kepingan salju () `sf-schema` 

  Untuk informasi tentang mendapatkan nama skema Anda menggunakan Snowflake, lihat [CURRENT\$1SCHEMA](https://docs.snowflake.com/en/sql-reference/functions/current_schema) di dokumentasi Snowflake.
+ Gudang kepingan salju () `sf-warehouse`

  Untuk informasi tentang mendapatkan nama gudang Anda menggunakan Snowflake, lihat [CURRENT\$1WAREHOUSE](https://docs.snowflake.com/en/sql-reference/functions/current_warehouse) di dokumentasi Snowflake.
+ Nama tabel kepingan salju () `sf-table-name`
+ (Opsional) Jika menggunakan Secrets Manager, Anda akan memerlukan nama rahasia (`secret-snowflake-account-info`) tempat Anda menyimpan nama pengguna dan kata sandi akses Snowflake di Secrets Manager. 

  Untuk informasi tentang Secrets Manager, lihat [Menemukan rahasia AWS Secrets Manager di](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html) Panduan AWS Secrets Manager Pengguna. 
+ Wilayah AWS (`your-region`)

  [Untuk informasi tentang mendapatkan nama wilayah sesi Anda saat ini menggunakan SDK for Python (Boto3), lihat region\$1name dalam dokumentasi Boto3.](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name)

Contoh berikut menunjukkan cara mengambil nama pengguna dan kata sandi Snowflake dari Secrets Manager dan mengganti `read_data` fungsi untuk kelas sumber data kustom Anda. `SnowflakeDataSource`

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
from sagemaker.feature_store.feature_processor import feature_processor
import json
import boto3


class SnowflakeDataSource(PySparkDataSource):
    
    sf_options = { 
        "sfUrl" : "sf-url",
        "sfDatabase" : "sf-database",
        "sfSchema" : "sf-schema",
        "sfWarehouse" : "sf-warehouse",
    }

    data_source_name = "Snowflake"
    data_source_unique_id = "sf-url"
    
    def read_data(self, spark, params):
        secret_name = "secret-snowflake-account-info"
        region_name = "your-region"

        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        self.sf_options["sfUser"] = secrets.get("username")
        self.sf_options["sfPassword"] = secrets.get("password")
        
        return spark.read.format("net.snowflake.spark.snowflake") \
                        .options(**self.sf_options) \
                        .option("dbtable", "sf-table-name") \
                        .load()
```

Contoh berikut menunjukkan bagaimana menghubungkan `SnowflakeDataSource` ke `feature_processor` dekorator Anda.

```
from sagemaker.feature_store.feature_processor import feature_processor

@feature_processor(
    inputs=[SnowflakeDataSource()],
    output=feature-group-arn,
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"}
)
def transform(input_df):
    return input_df
```

Untuk menjalankan pekerjaan prosesor fitur dari jarak jauh, Anda perlu menyediakan paket melalui mendefinisikan `SparkConfig` dan meneruskannya ke `@remote` dekorator. Paket Spark dalam contoh berikut sedemikian rupa sehingga `spark-snowflake_2.12` merupakan versi Feature Processor Scala, `2.12.0` adalah versi Snowflake yang ingin Anda gunakan, dan `spark_3.3` merupakan versi Feature Processor Spark. 

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[SnowflakeDataSource()],
    output="feature-group-arn>",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Contoh sumber data kustom Databricks (JDBC)
<a name="feature-store-feature-processor-data-sources-custom-examples-databricks"></a>

Spark dapat membaca data dari Databricks dengan menggunakan driver Databricks JDBC. Untuk informasi tentang driver JDBC Databricks, lihat [Mengkonfigurasi driver Databricks ODBC dan JDBC dalam dokumentasi Databricks](https://docs.databricks.com/en/integrations/jdbc-odbc-bi.html#configure-the-databricks-odbc-and-jdbc-drivers).

**catatan**  
Anda dapat membaca data dari database lain dengan memasukkan driver JDBC yang sesuai di Spark classpath. Untuk informasi selengkapnya, lihat [JDBC To Other Databases](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html) di Spark SQL Guide.

Untuk membuat kelas sumber data Databricks kustom, Anda harus mengganti `read_data` metode dari [Sumber data kustom](feature-store-feature-processor-data-sources-custom.md) dan menambahkan jar JDBC ke classpath Spark. 

Untuk terhubung dengan sumber data Databricks yang Anda butuhkan:
+ URL Databricks () `databricks-url`

  Untuk informasi tentang URL Databricks Anda, lihat [Membangun URL koneksi untuk driver Databricks dalam dokumentasi Databricks](https://docs.databricks.com/en/integrations/jdbc-odbc-bi.html#building-the-connection-url-for-the-databricks-driver).
+ Databricks token akses pribadi () `personal-access-token`

  Untuk informasi tentang token akses Databricks Anda, lihat [otentikasi token akses pribadi Databricks](https://docs.databricks.com/en/dev-tools/auth.html#pat) dalam dokumentasi Databricks.
+ Nama katalog data (`db-catalog`) 

  Untuk informasi tentang nama katalog Databricks Anda, lihat [Nama katalog](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#catalog-name) dalam dokumentasi Databricks.
+ Nama skema () `db-schema`

  [Untuk informasi tentang nama skema Databricks Anda, lihat Nama skema dalam dokumentasi Databricks.](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#schema-name)
+ Nama tabel (`db-table-name`)

  Untuk informasi tentang nama tabel Databricks Anda, lihat [Nama tabel](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#table-name) dalam dokumentasi Databricks.
+ (Opsional) Jika menggunakan Secrets Manager, Anda memerlukan nama rahasia (`secret-databricks-account-info`) tempat menyimpan nama pengguna dan kata sandi akses Databricks di Secrets Manager. 

  Untuk informasi tentang Secrets Manager, lihat [Menemukan rahasia AWS Secrets Manager di](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html) Panduan AWS Secrets Manager Pengguna. 
+ Wilayah AWS (`your-region`)

  [Untuk informasi tentang mendapatkan nama wilayah sesi Anda saat ini menggunakan SDK for Python (Boto3), lihat region\$1name dalam dokumentasi Boto3.](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name)

Contoh berikut menunjukkan cara mengambil URL JDBC dan token akses pribadi dari Secrets Manager dan menimpa `read_data` untuk kelas sumber data kustom Anda,. `DatabricksDataSource`

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3


class DatabricksDataSource(PySparkDataSource):
    
    data_source_name = "Databricks"
    data_source_unique_id = "databricks-url"
    
    def read_data(self, spark, params):
        secret_name = "secret-databricks-account-info"
        region_name = "your-region"

        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        jdbc_url = secrets["jdbcurl"].replace("personal-access-token", secrets['pwd'])
         
        return spark.read.format("jdbc") \
                        .option("url", jdbc_url) \
                        .option("dbtable","`db-catalog`.`db-schema`.`db-table-name`") \
                        .option("driver", "com.simba.spark.jdbc.Driver") \
                        .load()
```

Contoh berikut menunjukkan cara mengunggah jar driver JDBC,`jdbc-jar-file-name.jar`, ke Amazon S3 untuk menambahkannya ke classpath Spark. Untuk informasi tentang mengunduh driver Spark JDBC (`jdbc-jar-file-name.jar`) dari Databricks, lihat [Mengunduh Driver JDBC](https://www.databricks.com/spark/jdbc-drivers-download) di situs web Databricks.

```
from sagemaker.feature_store.feature_processor import feature_processor
    
@feature_processor(
    inputs=[DatabricksDataSource()],
    output=feature-group-arn,
    target_stores=["OfflineStore"],
    spark_config={"spark.jars": "s3://your-bucket-name/your-bucket-prefix/jdbc-jar-file-name.jar"}
)
def transform(input_df):
    return input_df
```

Untuk menjalankan pekerjaan prosesor fitur dari jarak jauh, Anda perlu menyediakan stoples dengan mendefinisikan `SparkConfig` dan meneruskannya ke dekorator. `@remote`

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars": "s3://your-bucket-name/your-bucket-prefix/jdbc-jar-file-name.jar"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[DatabricksDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Contoh sumber data khusus streaming
<a name="feature-store-feature-processor-data-sources-custom-examples-streaming"></a>

Anda dapat terhubung ke sumber data streaming seperti Amazon Kinesis, dan penulis mengubah dengan Spark Structured Streaming untuk membaca dari sumber data streaming. Untuk informasi tentang konektor Kinesis, lihat Konektor [Kinesis untuk Streaming Terstruktur Spark](https://github.com/roncemer/spark-sql-kinesis) di. GitHub Untuk informasi tentang Amazon Kinesis, lihat [Apa Itu Amazon Kinesis](https://docs.aws.amazon.com/streams/latest/dev/introduction.html) Data Streams? di Panduan Pengembang Amazon Kinesis.

Untuk membuat kelas sumber data Amazon Kinesis kustom, Anda perlu memperluas `BaseDataSource` kelas dan mengganti metode dari`read_data`. [Sumber data kustom](feature-store-feature-processor-data-sources-custom.md)

Untuk terhubung ke aliran data Amazon Kinesis, Anda memerlukan:
+ Kinesis ARN () `kinesis-resource-arn` 

  Untuk informasi tentang aliran data Kinesis ARNs, lihat [Amazon Resource Names (ARNs) untuk Kinesis Data Streams di Panduan Pengembang Amazon Kinesis](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html#kinesis-using-iam-arn-format).
+ Nama aliran data Kinesis () `kinesis-stream-name`
+ Wilayah AWS (`your-region`)

  [Untuk informasi tentang mendapatkan nama wilayah sesi Anda saat ini menggunakan SDK for Python (Boto3), lihat region\$1name dalam dokumentasi Boto3.](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name)

```
from sagemaker.feature_store.feature_processor import BaseDataSource
from sagemaker.feature_store.feature_processor import feature_processor

class KinesisDataSource(BaseDataSource):

    data_source_name = "Kinesis"
    data_source_unique_id = "kinesis-resource-arn"
    
    def read_data(self, spark, params): 
        return spark.readStream.format("kinesis") \
            .option("streamName", "kinesis-stream-name") \
            .option("awsUseInstanceProfile", "false") \
            .option("endpointUrl", "https://kinesis.your-region.amazonaws.com")
            .load()
```

Contoh berikut menunjukkan bagaimana menghubungkan `KinesisDataSource` ke `feature_processor` dekorator Anda. 

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig
import feature_store_pyspark.FeatureStoreManager as fsm

def ingest_micro_batch_into_fg(input_df, epoch_id):
    feature_group_arn = "feature-group-arn"
    fsm.FeatureStoreManager().ingest_data(
        input_data_frame = input_df,
        feature_group_arn = feature_group_arn
    )

@remote(
    spark_config=SparkConfig(
        configuration={
            "Classification": "spark-defaults", 
            "Properties":{
                "spark.sql.streaming.schemaInference": "true",
                "spark.jars.packages": "com.roncemer.spark/spark-sql-kinesis_2.13/1.2.2_spark-3.2"
            }
        }
    ),
    instance_type="ml.m5.2xlarge",
    max_runtime_in_seconds=2419200 # 28 days
)
@feature_processor(
    inputs=[KinesisDataSource()],
    output="feature-group-arn"
)
def transform(input_df):    
    output_stream = (
        input_df.selectExpr("CAST(rand() AS STRING) as partitionKey", "CAST(data AS STRING)")
        .writeStream.foreachBatch(ingest_micro_batch_into_fg)
        .trigger(processingTime="1 minute")
        .option("checkpointLocation", "s3a://checkpoint-path")
        .start()
    )
    output_stream.awaitTermination()
```

Dalam contoh kode di atas, kami menggunakan beberapa opsi Streaming Terstruktur Spark saat mengalirkan batch mikro ke grup fitur Anda. Untuk daftar lengkap opsi, lihat [Panduan Pemrograman Streaming Terstruktur](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) di dokumentasi Apache Spark. 
+ Mode `foreachBatch` wastafel adalah fitur yang memungkinkan Anda menerapkan operasi dan menulis logika pada data keluaran setiap batch mikro dari kueri streaming. 

  Untuk informasi tentang`foreachBatch`, lihat [Menggunakan Foreach dan ForeachBatch](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch) di Panduan Pemrograman Streaming Terstruktur Apache Spark. 
+ `checkpointLocation`Opsi ini secara berkala menyimpan keadaan aplikasi streaming. Log streaming disimpan di lokasi `s3a://checkpoint-path` pos pemeriksaan.

  Untuk informasi tentang `checkpointLocation` opsi, lihat [Memulihkan dari Kegagalan dengan Checkpointing](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing) di Panduan Pemrograman Streaming Terstruktur Apache Spark. 
+ `trigger`Pengaturan menentukan seberapa sering pemrosesan batch mikro dipicu dalam aplikasi streaming. Dalam contoh, jenis pemicu waktu pemrosesan digunakan dengan interval batch mikro satu menit, yang ditentukan oleh. `trigger(processingTime="1 minute")` Untuk mengisi ulang dari sumber aliran, Anda dapat menggunakan tipe pemicu yang tersedia sekarang, yang ditentukan oleh. `trigger(availableNow=True)`

  Untuk daftar lengkap `trigger` jenis, lihat [Pemicu](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers) dalam Panduan Pemrograman Streaming Terstruktur Apache Spark.

**Streaming berkelanjutan dan percobaan ulang otomatis menggunakan pemicu berbasis peristiwa**

Prosesor Fitur menggunakan SageMaker Pelatihan sebagai infrastruktur komputasi dan memiliki batas waktu proses maksimum 28 hari. Anda dapat menggunakan pemicu berbasis peristiwa untuk memperpanjang streaming berkelanjutan Anda untuk jangka waktu yang lebih lama dan pulih dari kegagalan sementara. Untuk informasi selengkapnya tentang eksekusi berdasarkan jadwal dan acara, lihat[Eksekusi terjadwal dan berbasis acara untuk pipeline Prosesor Fitur](feature-store-feature-processor-schedule-pipeline.md).

Berikut ini adalah contoh pengaturan pemicu berbasis peristiwa untuk menjaga saluran Prosesor Fitur streaming tetap berjalan terus menerus. Ini menggunakan fungsi transformasi streaming yang didefinisikan dalam contoh sebelumnya. Pipeline target dapat dikonfigurasi untuk dipicu ketika `FAILED` peristiwa `STOPPED` atau terjadi untuk eksekusi pipeline sumber. Perhatikan bahwa pipeline yang sama digunakan sebagai sumber dan target sehingga berjalan terus menerus.

```
import sagemaker.feature_store.feature_processor as fp
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineEvent
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineExecutionStatus

streaming_pipeline_name = "streaming-pipeline"
streaming_pipeline_arn = fp.to_pipeline(
    pipeline_name = streaming_pipeline_name,
    step = transform # defined in previous section
)

fp.put_trigger(
    source_pipeline_events=FeatureProcessorPipelineEvents(
        pipeline_name=source_pipeline_name, 
        pipeline_execution_status=[
            FeatureProcessorPipelineExecutionStatus.STOPPED,
            FeatureProcessorPipelineExecutionStatus.FAILED]
    ),
    target_pipeline=target_pipeline_name
)
```

# Contoh kode Pemrosesan Fitur untuk kasus penggunaan umum
<a name="feature-store-feature-processor-examples"></a>

Contoh berikut memberikan contoh kode Pemrosesan Fitur untuk kasus penggunaan umum. Untuk contoh notebook yang lebih detail yang menampilkan kasus penggunaan tertentu, lihat [Notebook Pemrosesan SageMaker Fitur Amazon Feature Store](https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-featurestore/feature_store_feature_processor.ipynb).

Dalam contoh berikut, `us-east-1` adalah wilayah sumber daya, `111122223333` adalah ID akun pemilik sumber daya, dan `your-feature-group-name` merupakan nama grup fitur.

Kumpulan `transactions` data yang digunakan dalam contoh berikut memiliki skema berikut:

```
'FeatureDefinitions': [
  {'FeatureName': 'txn_id', 'FeatureType': 'String'},
  {'FeatureName': 'txn_time', 'FeatureType': 'String'},
  {'FeatureName': 'credit_card_num', 'FeatureType': 'String'},
  {'FeatureName': 'txn_amount', 'FeatureType': 'Fractional'}
]
```

**Topics**
+ [Menggabungkan data dari berbagai sumber data](#feature-store-feature-processor-examples-joining-multiple-sources)
+ [Agregat jendela geser](#feature-store-feature-processor-examples-sliding-window-aggregates)
+ [Agregat jendela jatuh](#feature-store-feature-processor-examples-tumbling-window-aggregates)
+ [Promosi dari toko offline ke toko online](#feature-store-feature-processor-examples-promotion-offline-to-online-store)
+ [Transformasi dengan perpustakaan Pandas](#feature-store-feature-processor-examples-transforms-with-pandas-library)
+ [Eksekusi berkelanjutan dan percobaan ulang otomatis menggunakan pemicu berbasis peristiwa](#feature-store-feature-processor-examples-continuous-execution-automatic-retries)

## Menggabungkan data dari berbagai sumber data
<a name="feature-store-feature-processor-examples-joining-multiple-sources"></a>

```
@feature_processor(
    inputs=[
        CSVDataSource('s3://bucket/customer'), 
        FeatureGroupDataSource('transactions')
    ],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'
)
def join(transactions_df, customer_df):
  '''Combine two data sources with an inner join on a common column'''

  return transactions_df.join(
    customer_df, transactions_df.customer_id == customer_df.customer_id, "inner"
  )
```

## Agregat jendela geser
<a name="feature-store-feature-processor-examples-sliding-window-aggregates"></a>

```
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'
)
def sliding_window_aggregates(transactions_df):
    '''Aggregates over 1-week windows, across 1-day sliding windows.'''
    from pyspark.sql.functions import window, avg, count
    
    return (
        transactions_df
            .groupBy("credit_card_num", window("txn_time", "1 week", "1 day"))
            .agg(avg("txn_amount").alias("avg_week"), count("*").alias("count_week")) 
            .orderBy("window.start")
            .select("credit_card_num", "window.start", "avg_week", "count_week")
    )
```

## Agregat jendela jatuh
<a name="feature-store-feature-processor-examples-tumbling-window-aggregates"></a>

```
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'
)
def tumbling_window_aggregates(transactions_df, spark):
    '''Aggregates over 1-week windows, across 1-day tumbling windows, as a SQL query.'''

    transactions_df.createOrReplaceTempView('transactions')
    return spark.sql(f'''
        SELECT credit_card_num, window.start, AVG(amount) AS avg, COUNT(*) AS count  
        FROM transactions
        GROUP BY credit_card_num, window(txn_time, "1 week")  
        ORDER BY window.start
    ''')
```

## Promosi dari toko offline ke toko online
<a name="feature-store-feature-processor-examples-promotion-offline-to-online-store"></a>

```
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    target_stores=['OnlineStore'],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/transactions'
)
def offline_to_online():
    '''Move data from the offline store to the online store of the same feature group.'''

    transactions_df.createOrReplaceTempView('transactions')
    return spark.sql(f'''
        SELECT txn_id, txn_time, credit_card_num, amount
        FROM
            (SELECT *,
            row_number()
            OVER
                (PARTITION BY txn_id
                ORDER BY "txn_time" DESC, Api_Invocation_Time DESC, write_time DESC)
            AS row_number
            FROM transactions)
        WHERE row_number = 1
    ''')
```

## Transformasi dengan perpustakaan Pandas
<a name="feature-store-feature-processor-examples-transforms-with-pandas-library"></a>

**Transformasi dengan perpustakaan Pandas**

```
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    target_stores=['OnlineStore'],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/transactions'
)
def pandas(transactions_df):
    '''Author transformations using the Pandas interface.
    
    Requires PyArrow to be installed via pip.
    For more details: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark
    '''
    import pyspark.pandas as ps
    
    # PySpark DF to Pandas-On-Spark DF (Distributed DF with Pandas interface).
    pandas_on_spark_df = transactions_df.pandas_api()
    # Pandas-On-Spark DF to Pandas DF (Single Machine Only).
    pandas_df = pandas_on_spark_df.to_pandas()
    
    # Reverse: Pandas DF to Pandas-On-Spark DF
    pandas_on_spark_df = ps.from_pandas(pandas_df)
    # Reverse: Pandas-On-Spark DF to PySpark DF
    spark_df = pandas_on_spark_df.to_spark()
    
    return spark_df
```

## Eksekusi berkelanjutan dan percobaan ulang otomatis menggunakan pemicu berbasis peristiwa
<a name="feature-store-feature-processor-examples-continuous-execution-automatic-retries"></a>

```
from sagemaker.feature_store.feature_processor import put_trigger, to_pipeline, FeatureProcessorPipelineEvent
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineExecutionStatus

streaming_pipeline_name = "target-pipeline"

to_pipeline(
    pipeline_name=streaming_pipeline_name,
    step=transform
)

put_trigger(
    source_pipeline_events=[
        FeatureProcessorPipelineEvent(
            pipeline_name=streaming_pipeline_name, 
            pipeline_execution_status=[
            FeatureProcessorPipelineExecutionStatus.STOPPED,
            FeatureProcessorPipelineExecutionStatus.FAILED]
        )
    ],
    target_pipeline=streaming_pipeline_name
)
```