

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Utiliser SageMaker le traitement pour l'ingénierie des fonctionnalités distribuées d'ensembles de données ML à l'échelle du téraoctet
<a name="use-sagemaker-processing-for-distributed-feature-engineering-of-terabyte-scale-ml-datasets"></a>

*Chris Boomhower, Amazon Web Services*

## Résumé
<a name="use-sagemaker-processing-for-distributed-feature-engineering-of-terabyte-scale-ml-datasets-summary"></a>

De nombreux ensembles de données de plusieurs téraoctets ou plus se composent souvent d'une structure de dossiers hiérarchique, et les fichiers du jeu de données partagent parfois des interdépendances. C'est pourquoi les ingénieurs en apprentissage automatique (ML) et les scientifiques des données doivent prendre des décisions de conception réfléchies afin de préparer ces données pour l'entraînement et l'inférence des modèles. Ce modèle montre comment vous pouvez utiliser des techniques manuelles de macrosharding et de microsharding en combinaison avec Amazon SageMaker Processing et la parallélisation des processeurs virtuels (vCPU) pour adapter efficacement les processus d'ingénierie des fonctionnalités aux ensembles de données Big Data ML complexes. 

Ce modèle définit le *macrosharding* comme la division de répertoires de données sur plusieurs machines pour le traitement, et le *microsharding* comme le partage des données de chaque machine sur plusieurs threads de traitement. Le modèle illustre ces techniques en utilisant Amazon SageMaker avec des exemples d'enregistrements de formes d'onde de séries chronologiques issus du jeu de données [PhysioNet MIMIC-III](https://physionet.org/content/mimic3wdb/1.0/). En mettant en œuvre les techniques de ce modèle, vous pouvez minimiser le temps de traitement et les coûts liés à l'ingénierie des fonctionnalités tout en maximisant l'utilisation des ressources et l'efficacité du débit. Ces optimisations reposent sur le SageMaker traitement distribué sur les instances Amazon Elastic Compute Cloud EC2 (Amazon) et v CPUs pour des ensembles de données volumineux similaires, quel que soit le type de données.

## Conditions préalables et limitations
<a name="use-sagemaker-processing-for-distributed-feature-engineering-of-terabyte-scale-ml-datasets-prereqs"></a>

**Conditions préalables**
+ Accès aux instances de SageMaker bloc-notes ou à SageMaker Studio, si vous souhaitez implémenter ce modèle pour votre propre ensemble de données. Si vous utilisez Amazon SageMaker pour la première fois, consultez la section [Commencer avec Amazon SageMaker](https://docs.aws.amazon.com/sagemaker/latest/dg/gs.html) dans la documentation AWS.
+ SageMaker Studio, si vous souhaitez implémenter ce modèle avec les exemples de données [PhysioNet MIMIC-III](https://physionet.org/content/mimic3wdb/1.0/). 
+ Le modèle utilise le SageMaker traitement, mais ne nécessite aucune expérience dans l'exécution de tâches SageMaker de traitement.

**Limites**
+ Ce modèle convient parfaitement aux ensembles de données ML qui incluent des fichiers interdépendants. Ces interdépendances tirent le meilleur parti du macrosharding manuel et de l'exécution en parallèle de plusieurs SageMaker tâches de traitement en instance unique. Pour les ensembles de données où de telles interdépendances n'existent pas, la `ShardedByS3Key` fonctionnalité de SageMaker Processing peut constituer une meilleure alternative au macrosharding, car elle envoie des données fragmentées à plusieurs instances gérées par la même tâche de traitement. Cependant, vous pouvez implémenter la stratégie de microsharding de ce modèle dans les deux scénarios afin d'utiliser au mieux l'instance v. CPUs

**Versions du produit**
+ Kit de développement logiciel Amazon SageMaker Python version 2

## Architecture
<a name="use-sagemaker-processing-for-distributed-feature-engineering-of-terabyte-scale-ml-datasets-architecture"></a>

**Pile technologique cible**
+ Amazon Simple Storage Service (Amazon S3)
+ Amazon SageMaker

**Architecture cible**

*Macrosharding et instances distribuées EC2 *

Les 10 processus parallèles représentés dans cette architecture reflètent la structure du jeu de données MIMIC-III. (Les processus sont représentés par des ellipses pour simplifier les diagrammes.) Une architecture similaire s'applique à n'importe quel ensemble de données lorsque vous utilisez le macrosharding manuel. Dans le cas de MIMIC-III, vous pouvez utiliser la structure brute de l'ensemble de données à votre avantage en traitant chaque dossier de groupe de patients séparément, avec un minimum d'effort. Dans le schéma suivant, le bloc des groupes d'enregistrements apparaît sur la gauche (1). Compte tenu de la nature distribuée des données, il est logique de les partager par groupe de patients.

![Architecture pour le microsharding et les instances distribuées EC2](http://docs.aws.amazon.com/fr_fr/prescriptive-guidance/latest/patterns/images/pattern-img/e7a90b31-de8f-41fd-bb3f-c7c6100fc306/images/c19a8f87-ac59-458e-89cb-50be17ca4a0c.png)


Toutefois, le découpage manuel par groupe de patients signifie qu'une tâche de traitement distincte est requise pour chaque dossier de groupe de patients, comme vous pouvez le voir dans la partie centrale du diagramme (2), au lieu d'une tâche de traitement unique avec plusieurs EC2 instances. Étant donné que les données de MIMIC-III incluent à la fois des fichiers de formes d'onde binaires et des fichiers d'en-tête textuels correspondants, et que l'extraction de données binaires nécessite une dépendance à la [bibliothèque wfdb](https://wfdb.readthedocs.io/en/latest/), tous les dossiers d'un patient spécifique doivent être disponibles sur la même instance. La seule façon de s'assurer que le fichier d'en-tête associé à chaque fichier de forme d'onde binaire est également présent est d'implémenter le sharding manuel pour exécuter chaque partition dans le cadre de sa propre tâche de traitement, et de spécifier `s3_data_distribution_type='FullyReplicated'` quand vous définissez l'entrée de la tâche de traitement. Sinon, si toutes les données étaient disponibles dans un seul répertoire et qu'aucune dépendance n'existait entre les fichiers, une option plus appropriée pourrait être de lancer une seule tâche de traitement avec plusieurs EC2 instances `s3_data_distribution_type='ShardedByS3Key'` spécifiées. Spécifier `ShardedByS3Key ` comme le type de distribution de données Amazon S3 indique SageMaker de gérer automatiquement le partitionnement des données entre les instances. 

Le lancement d'une tâche de traitement pour chaque dossier est un moyen rentable de prétraiter les données, car l'exécution simultanée de plusieurs instances permet de gagner du temps. Pour économiser du temps et des coûts supplémentaires, vous pouvez utiliser le microsharding dans chaque tâche de traitement. 

*Microsharding et parallel v CPUs*

Au sein de chaque tâche de traitement, les données groupées sont ensuite divisées afin de maximiser l'utilisation de tous les v disponibles CPUs sur l' EC2 instance SageMaker entièrement gérée. Les blocs situés dans la partie centrale du diagramme (2) décrivent ce qui se passe dans le cadre de chaque tâche de traitement principale. Le contenu des dossiers des patients est aplati et divisé de manière égale en fonction du nombre de v disponibles CPUs sur l'instance. Une fois le contenu du dossier divisé, l'ensemble de fichiers de taille uniforme est réparti sur tous les v CPUs pour être traité. Une fois le traitement terminé, les résultats de chaque vCPU sont combinés dans un seul fichier de données pour chaque tâche de traitement. 

Dans le code joint, ces concepts sont représentés dans la section suivante du `src/feature-engineering-pass1/preprocessing.py` fichier.

```
def chunks(lst, n):
    """
    Yield successive n-sized chunks from lst.
    
    :param lst: list of elements to be divided
    :param n: number of elements per chunk
    :type lst: list
    :type n: int
    :return: generator comprising evenly sized chunks
    :rtype: class 'generator'
    """
    for i in range(0, len(lst), n):
        yield lst[i:i + n]
 
 
# Generate list of data files on machine
data_dir = input_dir
d_subs = next(os.walk(os.path.join(data_dir, '.')))[1]
file_list = []
for ds in d_subs:
    file_list.extend(os.listdir(os.path.join(data_dir, ds, '.')))
dat_list = [os.path.join(re.split('_|\.', f)[0].replace('n', ''), f[:-4]) for f in file_list if f[-4:] == '.dat']
 
# Split list of files into sub-lists
cpu_count = multiprocessing.cpu_count()
splits = int(len(dat_list) / cpu_count)
if splits == 0: splits = 1
dat_chunks = list(chunks(dat_list, splits))
 
# Parallelize processing of sub-lists across CPUs
ws_df_list = Parallel(n_jobs=-1, verbose=0)(delayed(run_process)(dc) for dc in dat_chunks)
 
# Compile and pickle patient group dataframe
ws_df_group = pd.concat(ws_df_list)
ws_df_group = ws_df_group.reset_index().rename(columns={'index': 'signal'})
ws_df_group.to_json(os.path.join(output_dir, group_data_out))
```

Une fonction est d'abord définie pour consommer une liste donnée en la divisant en morceaux de taille égale `n ` et en renvoyant ces résultats sous forme de générateur. `chunks` Ensuite, les données sont aplaties dans les dossiers des patients en compilant une liste de tous les fichiers de formes d'onde binaires présents. Une fois cela fait, le nombre de v CPUs disponibles sur l' EC2 instance est obtenu. [La liste des fichiers de formes d'onde binaires est répartie uniformément sur ces v CPUs par appel`chunks`, puis chaque sous-liste de formes d'onde est traitée sur son propre vCPU en utilisant la classe Parallel de joblib.](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html) Les résultats sont automatiquement combinés dans une liste unique de dataframes par la tâche de traitement, qui poursuit SageMaker ensuite le traitement avant de les écrire dans Amazon S3 une fois la tâche terminée. Dans cet exemple, 10 fichiers ont été écrits sur Amazon S3 par les tâches de traitement (un pour chaque tâche).

Lorsque toutes les tâches de traitement initiales sont terminées, une tâche de traitement secondaire, illustrée dans le bloc à droite du diagramme (3), combine les fichiers de sortie produits par chaque tâche de traitement principale et écrit la sortie combinée sur Amazon S3 (4).

## Outils
<a name="use-sagemaker-processing-for-distributed-feature-engineering-of-terabyte-scale-ml-datasets-tools"></a>

**Outils**
+ [Python](https://www.python.org/) — L'exemple de code utilisé pour ce modèle est Python (version 3).
+ [SageMaker Studio](https://docs.aws.amazon.com/sagemaker/latest/dg/studio.html) — Amazon SageMaker Studio est un environnement de développement intégré (IDE) basé sur le Web pour l'apprentissage automatique qui vous permet de créer, de former, de déboguer, de déployer et de surveiller vos modèles d'apprentissage automatique. Vous exécutez SageMaker des tâches de traitement en utilisant des blocs-notes Jupyter dans Studio. SageMaker 
+ [SageMaker Traitement](https://docs.aws.amazon.com/sagemaker/latest/dg/processing-job.html) — Amazon SageMaker Processing fournit un moyen simplifié d'exécuter vos charges de travail de traitement des données. Dans ce modèle, le code d'ingénierie des fonctionnalités est implémenté à grande échelle à l'aide de tâches SageMaker de traitement.

**Code**

Le fichier .zip joint fournit le code complet de ce modèle. La section suivante décrit les étapes à suivre pour créer l'architecture de ce modèle. Chaque étape est illustrée par un exemple de code extrait de la pièce jointe.

## Épopées
<a name="use-sagemaker-processing-for-distributed-feature-engineering-of-terabyte-scale-ml-datasets-epics"></a>

### Configuration de votre environnement SageMaker Studio
<a name="set-up-your-sagemaker-studio-environment"></a>


| Sous-tâche | Description | Compétences requises | 
| --- | --- | --- | 
| Accédez à Amazon SageMaker Studio. | Accédez à SageMaker Studio depuis votre compte AWS en suivant les instructions fournies dans la [ SageMaker documentation Amazon](https://docs.aws.amazon.com/sagemaker/latest/dg/onboard-quick-start.html). | Scientifique des données, ingénieur ML | 
| Installez l'utilitaire wget. | Installez *wget* si vous avez intégré une nouvelle configuration de SageMaker Studio ou si vous n'avez jamais utilisé ces utilitaires dans SageMaker Studio auparavant. <br />Pour l'installer, ouvrez une fenêtre de terminal dans la console SageMaker Studio et exécutez la commande suivante :<pre>sudo yum install wget</pre> | Scientifique des données, ingénieur ML | 
| Téléchargez et décompressez l'exemple de code. | Téléchargez le `attachments.zip` fichier dans la section *Pièces jointes*. Dans une fenêtre de terminal, accédez au dossier dans lequel vous avez téléchargé le fichier et extrayez son contenu :<pre>unzip attachment.zip</pre><br />Accédez au dossier dans lequel vous avez extrait le fichier .zip et extrayez le contenu du `Scaled-Processing.zip` fichier.<pre>unzip Scaled-Processing.zip</pre> | Scientifique des données, ingénieur ML | 
| Téléchargez l'exemple de jeu de données sur physionet.org et chargez-le sur Amazon S3. | Exécutez le bloc-notes `get_data.ipynb` Jupyter dans le dossier contenant les `Scaled-Processing` fichiers. Ce bloc-notes télécharge un exemple de jeu de données MIMIC-III depuis [physionet.org](https://physionet.org) et le charge dans votre SageMaker compartiment de session Studio dans Amazon S3. | Scientifique des données, ingénieur ML | 

### Configuration du premier script de prétraitement
<a name="configure-the-first-preprocessing-script"></a>


| Sous-tâche | Description | Compétences requises | 
| --- | --- | --- | 
| Aplatissez la hiérarchie des fichiers dans tous les sous-répertoires. | Dans les grands ensembles de données tels que MIMIC-III, les fichiers sont souvent répartis dans plusieurs sous-répertoires, même au sein d'un groupe parent logique. Votre script doit être configuré pour aplatir tous les fichiers de groupe dans tous les sous-répertoires, comme le montre le code suivant.<pre># Generate list of .dat files on machine<br />data_dir = input_dir<br />d_subs = next(os.walk(os.path.join(data_dir, '.')))[1]<br />file_list = []<br />for ds in d_subs:<br />    file_list.extend(os.listdir(os.path.join(data_dir, ds, '.')))<br />dat_list = [os.path.join(re.split('_|\.', f)[0].replace('n', ''), f[:-4]) for f in file_list if f[-4:] == '.dat']</pre>    Les exemples d'extraits de code présentés dans cette épopée proviennent du `src/feature-engineering-pass1/preprocessing.py` fichier fourni en pièce jointe. | Scientifique des données, ingénieur ML | 
| Divisez les fichiers en sous-groupes en fonction du nombre de vCPU. | Les fichiers doivent être divisés en sous-groupes ou en morceaux de taille égale, en fonction du nombre de v CPUs présents sur l'instance qui exécute le script. Pour cette étape, vous pouvez implémenter un code similaire au suivant.<pre># Split list of files into sub-lists<br />cpu_count = multiprocessing.cpu_count()<br />splits = int(len(dat_list) / cpu_count)<br />if splits == 0: splits = 1<br />dat_chunks = list(chunks(dat_list, splits))</pre> | Scientifique des données, ingénieur ML | 
| Paralléliser le traitement des sous-groupes dans v. CPUs | La logique du script doit être configurée pour traiter tous les sous-groupes en parallèle. Pour ce faire, utilisez la `Parallel ` classe et la `delayed ` méthode de la bibliothèque Joblib comme suit. <pre># Parallelize processing of sub-lists across CPUs<br />ws_df_list = Parallel(n_jobs=-1, verbose=0)(delayed(run_process)(dc) for dc in dat_chunks)</pre> | Scientifique des données, ingénieur ML | 
| Enregistrez la sortie d'un seul groupe de fichiers sur Amazon S3. | Lorsque le traitement parallèle des vCPU est terminé, les résultats de chaque vCPU doivent être combinés et téléchargés dans le chemin du compartiment S3 du groupe de fichiers. Pour cette étape, vous pouvez utiliser un code similaire au suivant.<pre># Compile and pickle patient group dataframe<br />ws_df_group = pd.concat(ws_df_list)<br />ws_df_group = ws_df_group.reset_index().rename(columns={'index': 'signal'})<br />ws_df_group.to_json(os.path.join(output_dir, group_data_out))</pre> | Scientifique des données, ingénieur ML | 

### Configuration du deuxième script de prétraitement
<a name="configure-the-second-preprocessing-script"></a>


| Sous-tâche | Description | Compétences requises | 
| --- | --- | --- | 
| Combinez les fichiers de données produits dans toutes les tâches de traitement qui ont exécuté le premier script. | Le script précédent génère un fichier unique pour chaque tâche de SageMaker traitement qui traite un groupe de fichiers de l'ensemble de données.  Ensuite, vous devez combiner ces fichiers de sortie en un seul objet et écrire un ensemble de données de sortie unique sur Amazon S3. Cela est démontré dans le `src/feature-engineering-pass1p5/preprocessing.py` fichier, qui est fourni en pièce jointe, comme suit.<pre>def write_parquet(wavs_df, path):<br />    """<br />    Write waveform summary dataframe to S3 in parquet format.<br />    <br />    :param wavs_df: waveform summary dataframe<br />    :param path: S3 directory prefix<br />    :type wavs_df: pandas dataframe<br />    :type path: str<br />    :return: None<br />    """<br />    extra_args = {"ServerSideEncryption": "aws:kms"}<br />    wr.s3.to_parquet(<br />        df=wavs_df,<br />        path=path,<br />        compression='snappy',<br />        s3_additional_kwargs=extra_args)<br /> <br /> <br />def combine_data():<br />    """<br />    Get combined data and write to parquet.<br />    <br />    :return: waveform summary dataframe<br />    :rtype: pandas dataframe<br />    """<br />    wavs_df = get_data()<br />    wavs_df = normalize_signal_names(wavs_df)<br />    write_parquet(wavs_df, "s3://{}/{}/{}".format(bucket_xform, dataset_prefix, pass1p5out_data))<br /> <br />    return wavs_df<br /> <br /> <br />wavs_df = combine_data()</pre> | Scientifique des données, ingénieur ML | 

### Exécuter des tâches de traitement
<a name="run-processing-jobs"></a>


| Sous-tâche | Description | Compétences requises | 
| --- | --- | --- | 
| Exécutez la première tâche de traitement. | Pour effectuer le macrosharding, exécutez une tâche de traitement distincte pour chaque groupe de fichiers. Le microsharding est effectué dans chaque tâche de traitement, car chaque tâche exécute votre premier script. Le code suivant montre comment lancer une tâche de traitement pour chaque répertoire de groupe de fichiers dans l'extrait suivant (inclus dans`notebooks/FeatExtract_Pass1.ipynb`).<pre>pat_groups = list(range(30,40))<br />ts = str(int(time.time()))<br /> <br />for group in pat_groups:<br />    sklearn_processor = SKLearnProcessor(framework_version='0.20.0',<br />                                     role=role,<br />                                     instance_type='ml.m5.4xlarge',<br />                                     instance_count=1,<br />                                     volume_size_in_gb=5)<br />    sklearn_processor.run(<br />        code='../src/feature-engineering-pass1/preprocessing.py',<br />        job_name='-'.join(['scaled-processing-p1', str(group), ts]),<br />        arguments=[<br />            "input_path", "/opt/ml/processing/input",<br />            "output_path", "/opt/ml/processing/output",<br />            "group_data_out", "ws_df_group.json"<br />        ],<br />        inputs=<br />        [<br />            ProcessingInput(<br />                source=f's3://{sess.default_bucket()}/data_inputs/{group}',<br />                destination='/opt/ml/processing/input',<br />                s3_data_distribution_type='FullyReplicated'<br />            )<br />        ],<br />        outputs=<br />        [<br />            ProcessingOutput(<br />                source='/opt/ml/processing/output',<br />                destination=f's3://{sess.default_bucket()}/data_outputs/{group}'<br />            )<br />        ],<br />        wait=False<br />    )</pre> | Scientifique des données, ingénieur ML | 
| Exécutez le deuxième travail de traitement. | Pour combiner les sorties générées par le premier ensemble de tâches de traitement et effectuer des calculs supplémentaires pour le prétraitement, vous devez exécuter votre deuxième script à l'aide d'une seule tâche de SageMaker traitement. Le code suivant illustre cela (inclus dans`notebooks/FeatExtract_Pass1p5.ipynb`).<pre>ts = str(int(time.time()))<br />bucket = sess.default_bucket()<br />     <br />sklearn_processor = SKLearnProcessor(framework_version='0.20.0',<br />                                 role=role,<br />                                 instance_type='ml.t3.2xlarge',<br />                                 instance_count=1,<br />                                 volume_size_in_gb=5)<br />sklearn_processor.run(<br />    code='../src/feature-engineering-pass1p5/preprocessing.py',<br />    job_name='-'.join(['scaled-processing', 'p1p5', ts]),<br />    arguments=['bucket', bucket,<br />               'pass1out_prefix', 'data_outputs',<br />               'pass1out_data', 'ws_df_group.json',<br />               'pass1p5out_data', 'waveform_summary.parquet',<br />               'statsdata_name', 'signal_stats.csv'],<br />    wait=True<br />)</pre> | Scientifique des données, ingénieur ML | 

## Ressources connexes
<a name="use-sagemaker-processing-for-distributed-feature-engineering-of-terabyte-scale-ml-datasets-resources"></a>
+ [Intégration à Amazon SageMaker Studio à l'aide de Quick Start](https://docs.aws.amazon.com/sagemaker/latest/dg/onboard-quick-start.html) (SageMaker documentation)
+ [Données de processus](https://docs.aws.amazon.com/sagemaker/latest/dg/processing-job.html) (SageMaker documentation) 
+ [Traitement des données avec scikit-learn (documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/use-scikit-learn-processing-container.html)) SageMaker  
+ [Documentation JobLib.Parallel](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html)
+ Moody, B., Moody, G., Villarroel, M., Clifford, G.D., & Silva, I. (2020). [Base de données de formes d'onde MIMIC-III](https://doi.org/10.13026/c2607m) (version 1.0). *PhysioNet*.
+ Johnson, A.E.W., Pollard, T.J., Shen, L., Lehman, L.H., Feng, M., Ghassemi, M., Moody, B., Szolovits, P., Celi, L.A., et Mark, R.G. (2016). [MIMIC-III, une base de données sur les soins intensifs accessible gratuitement](https://dx.doi.org/10.1038/sdata.2016.35). Données scientifiques, 3, 160035.
+ [Licence de base de données de formes d'onde MIMIC-III](https://physionet.org/content/mimic3wdb/1.0/LICENSE.txt)

## Pièces jointes
<a name="attachments-e7a90b31-de8f-41fd-bb3f-c7c6100fc306"></a>

[Pour accéder au contenu supplémentaire associé à ce document, décompressez le fichier suivant : attachment.zip](samples/p-attach/e7a90b31-de8f-41fd-bb3f-c7c6100fc306/attachments/attachment.zip)