Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Importer des fichiers et des bibliothèques Python dans Athena pour Spark
Ce document fournit des exemples sur la manière d'importer des fichiers et des bibliothèques Python vers Amazon Athena pour Apache Spark.
Considérations et restrictions
-
Version Python : Athena pour Spark utilise actuellement la version 3.9.16 de Python. Notez que les packages Python sont sensibles aux versions mineures de Python.
-
Athena pour l'architecture Spark — Athena pour Spark utilise Amazon Linux 2 pour l'architecture. ARM64 Notez que certaines bibliothèques Python ne distribuent pas de fichiers binaires pour cette architecture.
-
Objets partagés binaires (SOs) : comme la SparkContext addPyFile
méthode ne détecte pas les objets partagés binaires, elle ne peut pas être utilisée dans Athena for Spark pour ajouter des packages Python qui dépendent d'objets partagés. -
Ensembles de données distribués résilients (RDDs) : ne RDDs
sont pas pris en charge. -
DataFrame.forEach — La méthode .foreach n'est pas prise en PySpark DataFramecharge.
Exemples
Les exemples utilisent les conventions suivantes.
-
Emplacement Amazon S3 réservé
s3://amzn-s3-demo-bucket. Remplacez-le par l'emplacement de votre propre compartiment S3. -
Tous les blocs de code qui s'exécutent à partir d'un shell Unix sont représentés sous
directory_name$la forme. Par exemple, la commandelsdans le répertoire/tmpet sa sortie s'affichent comme suit :/tmp $ lsSortie
file1 file2
Importation de fichiers texte à utiliser dans les calculs
Les exemples de cette rubrique montrent comment importer des fichiers texte pour les utiliser dans les calculs de vos carnets dans Athena pour Spark.
L'exemple suivant montre comment écrire un fichier dans un répertoire temporaire local, l'ajouter à un bloc-notes et le tester.
import os from pyspark import SparkFiles tempdir = '/tmp/' path = os.path.join(tempdir, "test.txt") with open(path, "w") as testFile: _ = testFile.write("5") sc.addFile(path) def func(iterator): with open(SparkFiles.get("test.txt")) as testFile: fileVal = int(testFile.readline()) return [x * fileVal for x in iterator] #Test the file from pyspark.sql.functions import udf from pyspark.sql.functions import col udf_with_import = udf(func) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", udf_with_import(col('_2'))).show()
Sortie
Calculation completed.
+---+---+-------+
| _1| _2| col|
+---+---+-------+
| 1| a|[aaaaa]|
| 2| b|[bbbbb]|
+---+---+-------+L'exemple suivant montre comment importer un fichier à partir d'Amazon S3 dans un bloc-notes et le tester.
Importation d'un fichier à partir d'Amazon S3 dans un bloc-notes
-
Créez un fichier
test.txtdont le nom comporte une seule ligne contenant la valeur5. -
Ajoutez le fichier à un compartiment dans Amazon S3. Cet exemple utilise l'emplacement
s3://amzn-s3-demo-bucket. -
Utilisez le code suivant pour importer le fichier dans votre bloc-notes et le tester.
from pyspark import SparkFiles sc.addFile('s3://amzn-s3-demo-bucket/test.txt') def func(iterator): with open(SparkFiles.get("test.txt")) as testFile: fileVal = int(testFile.readline()) return [x * fileVal for x in iterator] #Test the file from pyspark.sql.functions import udf from pyspark.sql.functions import col udf_with_import = udf(func) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", udf_with_import(col('_2'))).show()Sortie
Calculation completed. +---+---+-------+ | _1| _2| col| +---+---+-------+ | 1| a|[aaaaa]| | 2| b|[bbbbb]| +---+---+-------+
Ajouter des fichiers Python
Les exemples de cette rubrique montrent comment ajouter des fichiers et des bibliothèques Python à vos blocs-notes Spark dans Athena.
L'exemple suivant montre comment ajouter des fichiers Python à partir d'Amazon S3 à votre bloc-notes et enregistrer un UDF.
Ajout de fichiers Python à votre bloc-notes et enregistrement d'un UDF
-
En utilisant votre propre emplacement Amazon S3, créez le fichier
s3://amzn-s3-demo-bucket/file1.pyavec le contenu suivant :def xyz(input): return 'xyz - udf ' + str(input); -
Dans le même emplacement S3, créez le fichier
s3://amzn-s3-demo-bucket/file2.pyavec le contenu suivant :from file1 import xyz def uvw(input): return 'uvw -> ' + xyz(input); -
Dans votre bloc-notes Athena pour Spark, exécutez les commandes suivantes.
sc.addPyFile('s3://amzn-s3-demo-bucket/file1.py') sc.addPyFile('s3://amzn-s3-demo-bucket/file2.py') def func(iterator): from file2 import uvw return [uvw(x) for x in iterator] from pyspark.sql.functions import udf from pyspark.sql.functions import col udf_with_import = udf(func) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", udf_with_import(col('_2'))).show(10)Sortie
Calculation started (calculation_id=1ec09e01-3dec-a096-00ea-57289cdb8ce7) in (session=c8c09e00-6f20-41e5-98bd-4024913d6cee). Checking calculation status... Calculation completed. +---+---+--------------------+ | _1| _2| col| +---+---+--------------------+ | 1 | a|[uvw -> xyz - ud... | | 2 | b|[uvw -> xyz - ud... | +---+---+--------------------+
Vous pouvez utiliser Python addPyFile et ses méthodes import pour importer un fichier .zip Python dans votre bloc-notes.
Note
Les fichiers .zip que vous importez dans Athena Spark peuvent inclure uniquement des packages Python. Par exemple, l'inclusion de packages contenant des fichiers basés sur le langage C n'est pas prise en charge.
Pour importer un fichier .zip Python dans votre bloc-notes
-
Sur votre ordinateur local, dans un répertoire de bureau tel que
\tmp, créez un répertoire appelémoduletest. -
Dans le répertoire
moduletest, créez un fichier nomméhello.pyavec les contenus suivants :def hi(input): return 'hi ' + str(input); -
Dans le même répertoire, ajoutez un fichier vide portant le nom
__init__.py.Si vous listez le contenu du répertoire, il devrait maintenant ressembler à ce qui suit.
/tmp $ ls moduletest __init__.py hello.py -
Utilisez la commande
zippour placer les deux fichiers du module dans un fichier appelémoduletest.zip.moduletest $ zip -r9 ../moduletest.zip * -
Chargez les fichiers
.zipdans votre compartiment Amazon S3. -
Utilisez le code suivant pour importer le fichier
.zipPython dans votre bloc-notes.sc.addPyFile('s3://amzn-s3-demo-bucket/moduletest.zip') from moduletest.hello import hi from pyspark.sql.functions import udf from pyspark.sql.functions import col hi_udf = udf(hi) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", hi_udf(col('_2'))).show()Sortie
Calculation started (calculation_id=6ec09e8c-6fe0-4547-5f1b-6b01adb2242c) in (session=dcc09e8c-3f80-9cdc-bfc5-7effa1686b76). Checking calculation status... Calculation completed. +---+---+----+ | _1| _2| col| +---+---+----+ | 1| a|hi a| | 2| b|hi b| +---+---+----+
Les exemples de code suivants montrent comment ajouter et importer deux versions différentes d'une bibliothèque Python à partir d'un emplacement dans Amazon S3 en tant que deux modules distincts. Le code ajoute chaque fichier de la bibliothèque à partir de S3, l'importe, puis imprime la version de la bibliothèque pour vérifier l'importation.
sc.addPyFile('s3://amzn-s3-demo-bucket/python-third-party-libs-test/simplejson_v3_15.zip') sc.addPyFile('s3://amzn-s3-demo-bucket/python-third-party-libs-test/simplejson_v3_17_6.zip') import simplejson_v3_15 print(simplejson_v3_15.__version__)
Sortie
3.15.0import simplejson_v3_17_6 print(simplejson_v3_17_6.__version__)
Sortie
3.17.6Cet exemple utilise la commande pip pour télécharger un fichier .zip Python du projet bpabel/piglatin
Importer un fichier .zip Python à partir de PyPI
-
Sur votre bureau local, utilisez les commandes suivantes pour créer un répertoire appelé
testpiglatinet un environnement virtuel./tmp $ mkdir testpiglatin /tmp $ cd testpiglatin testpiglatin $ virtualenv .Sortie
created virtual environment CPython3.9.6.final.0-64 in 410ms creator CPython3Posix(dest=/private/tmp/testpiglatin, clear=False, no_vcs_ignore=False, global=False) seeder FromAppData(download=False, pip=bundle, setuptools=bundle, wheel=bundle, via=copy, app_data_dir=/Users/user1/Library/Application Support/virtualenv) added seed packages: pip==22.0.4, setuptools==62.1.0, wheel==0.37.1 activators BashActivator,CShellActivator,FishActivator,NushellActivator,PowerShellActivator,PythonActivator -
Créez un sous-répertoire appelé
unpackedpour contenir le projet.testpiglatin $ mkdir unpacked -
Utilisez la commande
pippour installer le projet dans le répertoireunpacked.testpiglatin $ bin/pip install -t $PWD/unpacked piglatinSortie
Collecting piglatin Using cached piglatin-1.0.6-py2.py3-none-any.whl (3.1 kB) Installing collected packages: piglatin Successfully installed piglatin-1.0.6 -
Vérifiez le contenu du répertoire.
testpiglatin $ lsSortie
bin lib pyvenv.cfg unpacked -
Accédez au répertoire
unpackedet affichez le contenu.testpiglatin $ cd unpacked unpacked $ lsSortie
piglatin piglatin-1.0.6.dist-info -
Utilisez la commande
zippour placer le contenu du projet piglatin dans un fichier appelélibrary.zip.unpacked $ zip -r9 ../library.zip *Sortie
adding: piglatin/ (stored 0%) adding: piglatin/__init__.py (deflated 56%) adding: piglatin/__pycache__/ (stored 0%) adding: piglatin/__pycache__/__init__.cpython-39.pyc (deflated 31%) adding: piglatin-1.0.6.dist-info/ (stored 0%) adding: piglatin-1.0.6.dist-info/RECORD (deflated 39%) adding: piglatin-1.0.6.dist-info/LICENSE (deflated 41%) adding: piglatin-1.0.6.dist-info/WHEEL (deflated 15%) adding: piglatin-1.0.6.dist-info/REQUESTED (stored 0%) adding: piglatin-1.0.6.dist-info/INSTALLER (stored 0%) adding: piglatin-1.0.6.dist-info/METADATA (deflated 48%) -
(Facultatif) Utilisez les commandes suivantes pour tester l'importation localement.
-
Définissez le chemin Python vers l'emplacement du fichier
library.zipet démarrez Python./home $ PYTHONPATH=/tmp/testpiglatin/library.zip /home $ python3Sortie
Python 3.9.6 (default, Jun 29 2021, 06:20:32) [Clang 12.0.0 (clang-1200.0.32.29)] on darwin Type "help", "copyright", "credits" or "license" for more information. -
Importez la bibliothèque et exécutez une commande de test.
>>> import piglatin >>> piglatin.translate('hello')Sortie
'ello-hay'
-
-
Utilisez des commandes telles que les suivantes pour ajouter le fichier
.zipà partir d'Amazon S3, l'importer dans votre bloc-notes dans Athena et le tester.sc.addPyFile('s3://amzn-s3-demo-bucket/library.zip') import piglatin piglatin.translate('hello') from pyspark.sql.functions import udf from pyspark.sql.functions import col hi_udf = udf(piglatin.translate) df = spark.createDataFrame([(1, "hello"), (2, "world")]) df.withColumn("col", hi_udf(col('_2'))).show()Sortie
Calculation started (calculation_id=e2c0a06e-f45d-d96d-9b8c-ff6a58b2a525) in (session=82c0a06d-d60e-8c66-5d12-23bcd55a6457). Checking calculation status... Calculation completed. +---+-----+--------+ | _1| _2| col| +---+-----+--------+ | 1|hello|ello-hay| | 2|world|orld-way| +---+-----+--------+
Cet exemple importe le package md2gemini
cjkwrap mistune wcwidth
Importation d'un fichier .zip Python comportant des dépendances
-
Sur votre ordinateur local, utilisez les commandes suivantes pour créer un répertoire appelé
testmd2geminiet un environnement virtuel./tmp $ mkdir testmd2gemini /tmp $ cd testmd2gemini testmd2gemini$ virtualenv . -
Créez un sous-répertoire appelé
unpackedpour contenir le projet.testmd2gemini $ mkdir unpacked -
Utilisez la commande
pippour installer le projet dans le répertoireunpacked./testmd2gemini $ bin/pip install -t $PWD/unpacked md2geminiSortie
Collecting md2gemini Downloading md2gemini-1.9.0-py3-none-any.whl (31 kB) Collecting wcwidth Downloading wcwidth-0.2.5-py2.py3-none-any.whl (30 kB) Collecting mistune<3,>=2.0.0 Downloading mistune-2.0.2-py2.py3-none-any.whl (24 kB) Collecting cjkwrap Downloading CJKwrap-2.2-py2.py3-none-any.whl (4.3 kB) Installing collected packages: wcwidth, mistune, cjkwrap, md2gemini Successfully installed cjkwrap-2.2 md2gemini-1.9.0 mistune-2.0.2 wcwidth-0.2.5 ... -
Accédez au répertoire
unpackedet vérifiez le contenu.testmd2gemini $ cd unpacked unpacked $ ls -lahSortie
total 16 drwxr-xr-x 13 user1 wheel 416B Jun 7 18:43 . drwxr-xr-x 8 user1 wheel 256B Jun 7 18:44 .. drwxr-xr-x 9 user1 staff 288B Jun 7 18:43 CJKwrap-2.2.dist-info drwxr-xr-x 3 user1 staff 96B Jun 7 18:43 __pycache__ drwxr-xr-x 3 user1 staff 96B Jun 7 18:43 bin -rw-r--r-- 1 user1 staff 5.0K Jun 7 18:43 cjkwrap.py drwxr-xr-x 7 user1 staff 224B Jun 7 18:43 md2gemini drwxr-xr-x 10 user1 staff 320B Jun 7 18:43 md2gemini-1.9.0.dist-info drwxr-xr-x 12 user1 staff 384B Jun 7 18:43 mistune drwxr-xr-x 8 user1 staff 256B Jun 7 18:43 mistune-2.0.2.dist-info drwxr-xr-x 16 user1 staff 512B Jun 7 18:43 tests drwxr-xr-x 10 user1 staff 320B Jun 7 18:43 wcwidth drwxr-xr-x 9 user1 staff 288B Jun 7 18:43 wcwidth-0.2.5.dist-info -
Utilisez la commande
zippour placer le contenu du projet md2gemini dans un fichier appelémd2gemini.zip.unpacked $ zip -r9 ../md2gemini *Sortie
adding: CJKwrap-2.2.dist-info/ (stored 0%) adding: CJKwrap-2.2.dist-info/RECORD (deflated 37%) .... adding: wcwidth-0.2.5.dist-info/INSTALLER (stored 0%) adding: wcwidth-0.2.5.dist-info/METADATA (deflated 62%) -
(Facultatif) Utilisez les commandes suivantes pour vérifier que la bibliothèque fonctionne sur votre ordinateur local.
-
Définissez le chemin Python vers l'emplacement du fichier
md2gemini.zipet démarrez Python./home $ PYTHONPATH=/tmp/testmd2gemini/md2gemini.zip /home python3 -
Importez la bibliothèque et exécutez un test.
>>> from md2gemini import md2gemini >>> print(md2gemini('[abc](https://abc.def)'))Sortie
https://abc.def abc
-
-
Utilisez les commandes suivantes pour ajouter le fichier
.zipà partir d'Amazon S3, l'importer dans votre bloc-notes dans Athena et effectuer un test non UDF.# (non udf test) sc.addPyFile('s3://amzn-s3-demo-bucket/md2gemini.zip') from md2gemini import md2gemini print(md2gemini('[abc](https://abc.def)'))Sortie
Calculation started (calculation_id=0ac0a082-6c3f-5a8f-eb6e-f8e9a5f9bc44) in (session=36c0a082-5338-3755-9f41-0cc954c55b35). Checking calculation status... Calculation completed. => https://abc.def (https://abc.def/) abc -
Utilisez les commandes suivantes pour effectuer un test UDF.
# (udf test) from pyspark.sql.functions import udf from pyspark.sql.functions import col from md2gemini import md2gemini hi_udf = udf(md2gemini) df = spark.createDataFrame([(1, "[first website](https://abc.def)"), (2, "[second website](https://aws.com)")]) df.withColumn("col", hi_udf(col('_2'))).show()Sortie
Calculation started (calculation_id=60c0a082-f04d-41c1-a10d-d5d365ef5157) in (session=36c0a082-5338-3755-9f41-0cc954c55b35). Checking calculation status... Calculation completed. +---+--------------------+--------------------+ | _1| _2| col| +---+--------------------+--------------------+ | 1|[first website](h...|=> https://abc.de...| | 2|[second website](...|=> https://aws.co...| +---+--------------------+--------------------+