Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Impor file dan pustaka Python ke Athena untuk Spark
Dokumen ini memberikan contoh cara mengimpor file dan pustaka Python ke Amazon Athena untuk Apache Spark.
Pertimbangan dan batasan
-
Versi Python - Saat ini, Athena untuk Spark menggunakan Python versi 3.9.16. Perhatikan bahwa paket Python sensitif terhadap versi Python minor.
-
Athena untuk arsitektur Spark — Athena untuk Spark menggunakan Amazon Linux 2 pada arsitektur. ARM64 Perhatikan bahwa beberapa pustaka Python tidak mendistribusikan binari untuk arsitektur ini.
-
Objek bersama biner (SOs) — Karena SparkContext addPyFile
metode ini tidak mendeteksi objek bersama biner, metode ini tidak dapat digunakan di Athena untuk Spark untuk menambahkan paket Python yang bergantung pada objek bersama. -
Kumpulan Data Terdistribusi Tangguh (RDDs) — RDDs
tidak didukung. -
DataFrame.forEach — Metode PySpark DataFrame.foreach
tidak didukung.
Contoh
Contoh menggunakan konvensi berikut.
-
Lokasi placeholder Amazon S3.
s3://amzn-s3-demo-bucket
Ganti ini dengan lokasi bucket S3 Anda sendiri. -
Semua blok kode yang mengeksekusi dari shell Unix ditampilkan sebagai
directory_name
$
. Misalnya, perintahls
dalam direktori/tmp
dan outputnya ditampilkan sebagai berikut:/tmp $ ls
Keluaran
file1 file2
Impor file teks untuk digunakan dalam perhitungan
Contoh di bagian ini menunjukkan cara mengimpor file teks untuk digunakan dalam perhitungan di buku catatan Anda di Athena untuk Spark.
Contoh berikut menunjukkan cara menulis file ke direktori sementara lokal, menambahkannya ke buku catatan, dan mengujinya.
import os from pyspark import SparkFiles tempdir = '/tmp/' path = os.path.join(tempdir, "test.txt") with open(path, "w") as testFile: _ = testFile.write("5") sc.addFile(path) def func(iterator): with open(SparkFiles.get("test.txt")) as testFile: fileVal = int(testFile.readline()) return [x * fileVal for x in iterator] #Test the file from pyspark.sql.functions import udf from pyspark.sql.functions import col udf_with_import = udf(func) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", udf_with_import(col('_2'))).show()
Keluaran
Calculation completed.
+---+---+-------+
| _1| _2| col|
+---+---+-------+
| 1| a|[aaaaa]|
| 2| b|[bbbbb]|
+---+---+-------+
Contoh berikut menunjukkan cara mengimpor file dari Amazon S3 ke notebook dan mengujinya.
Untuk mengimpor file dari Amazon S3 ke notebook
-
Buat file bernama
test.txt
yang memiliki satu baris yang berisi nilai5
. -
Tambahkan file ke ember di Amazon S3. Contoh ini menggunakan lokasi
s3://amzn-s3-demo-bucket
. -
Gunakan kode berikut untuk mengimpor file ke buku catatan Anda dan menguji file tersebut.
from pyspark import SparkFiles sc.addFile('s3://amzn-s3-demo-bucket/test.txt') def func(iterator): with open(SparkFiles.get("test.txt")) as testFile: fileVal = int(testFile.readline()) return [x * fileVal for x in iterator] #Test the file from pyspark.sql.functions import udf from pyspark.sql.functions import col udf_with_import = udf(func) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", udf_with_import(col('_2'))).show()
Keluaran
Calculation completed. +---+---+-------+ | _1| _2| col| +---+---+-------+ | 1| a|[aaaaa]| | 2| b|[bbbbb]| +---+---+-------+
Tambahkan file Python
Contoh di bagian ini menunjukkan cara menambahkan file dan pustaka Python ke buku catatan Spark Anda di Athena.
Contoh berikut menunjukkan cara menambahkan file Python dari Amazon S3 ke notebook Anda dan mendaftarkan UDF.
Untuk menambahkan file Python ke buku catatan Anda dan mendaftarkan UDF
-
Menggunakan lokasi Amazon S3 Anda sendiri, buat file
s3://amzn-s3-demo-bucket/file1.py
dengan konten berikut:def xyz(input): return 'xyz - udf ' + str(input);
-
Di lokasi S3 yang sama, buat file
s3://amzn-s3-demo-bucket/file2.py
dengan konten berikut:from file1 import xyz def uvw(input): return 'uvw -> ' + xyz(input);
-
Di notebook Athena for Spark Anda, jalankan perintah berikut.
sc.addPyFile('s3://amzn-s3-demo-bucket/file1.py') sc.addPyFile('s3://amzn-s3-demo-bucket/file2.py') def func(iterator): from file2 import uvw return [uvw(x) for x in iterator] from pyspark.sql.functions import udf from pyspark.sql.functions import col udf_with_import = udf(func) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", udf_with_import(col('_2'))).show(10)
Keluaran
Calculation started (calculation_id=1ec09e01-3dec-a096-00ea-57289cdb8ce7) in (session=c8c09e00-6f20-41e5-98bd-4024913d6cee). Checking calculation status... Calculation completed. +---+---+--------------------+ | _1| _2| col| +---+---+--------------------+ | 1 | a|[uvw -> xyz - ud... | | 2 | b|[uvw -> xyz - ud... | +---+---+--------------------+
Anda dapat menggunakan Python addPyFile
dan import
metode untuk mengimpor file.zip Python ke notebook Anda.
catatan
.zip
File yang Anda impor ke Athena Spark mungkin hanya menyertakan paket Python. Misalnya, termasuk paket dengan file berbasis C tidak didukung.
Untuk mengimpor .zip
file Python ke buku catatan
-
Di komputer lokal Anda, di direktori desktop seperti
\tmp
, buat direktori yang disebutmoduletest
. -
Di
moduletest
direktori, buat file bernamahello.py
dengan konten berikut:def hi(input): return 'hi ' + str(input);
-
Di direktori yang sama, tambahkan file kosong dengan nama
__init__.py
.Jika Anda mencantumkan isi direktori, mereka sekarang akan terlihat seperti berikut ini.
/tmp $ ls moduletest __init__.py hello.py
-
Gunakan
zip
perintah untuk menempatkan dua file modul ke dalam file bernamamoduletest.zip
.moduletest $ zip -r9 ../moduletest.zip *
-
Unggah
.zip
file ke bucket Anda di Amazon S3. -
Gunakan kode berikut untuk mengimpor
.zip
file Python ke buku catatan Anda.sc.addPyFile('s3://amzn-s3-demo-bucket/moduletest.zip') from moduletest.hello import hi from pyspark.sql.functions import udf from pyspark.sql.functions import col hi_udf = udf(hi) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", hi_udf(col('_2'))).show()
Keluaran
Calculation started (calculation_id=6ec09e8c-6fe0-4547-5f1b-6b01adb2242c) in (session=dcc09e8c-3f80-9cdc-bfc5-7effa1686b76). Checking calculation status... Calculation completed. +---+---+----+ | _1| _2| col| +---+---+----+ | 1| a|hi a| | 2| b|hi b| +---+---+----+
Contoh kode berikut menunjukkan cara menambahkan dan mengimpor dua versi pustaka Python yang berbeda dari lokasi di Amazon S3 sebagai dua modul terpisah. Kode menambahkan setiap file pustaka dari S3, mengimpornya, dan kemudian mencetak versi pustaka untuk memverifikasi impor.
sc.addPyFile('s3://amzn-s3-demo-bucket/python-third-party-libs-test/simplejson_v3_15.zip') sc.addPyFile('s3://amzn-s3-demo-bucket/python-third-party-libs-test/simplejson_v3_17_6.zip') import simplejson_v3_15 print(simplejson_v3_15.__version__)
Keluaran
3.15.0
import simplejson_v3_17_6 print(simplejson_v3_17_6.__version__)
Keluaran
3.17.6
Contoh ini menggunakan pip
perintah untuk men-download file.zip Python dari proyek bpabel/piglatin
Untuk mengimpor file.zip Python dari PyPI
-
Di desktop lokal Anda, gunakan perintah berikut untuk membuat direktori yang disebut
testpiglatin
dan membuat lingkungan virtual./tmp $ mkdir testpiglatin /tmp $ cd testpiglatin testpiglatin $ virtualenv .
Keluaran
created virtual environment CPython3.9.6.final.0-64 in 410ms creator CPython3Posix(dest=/private/tmp/testpiglatin, clear=False, no_vcs_ignore=False, global=False) seeder FromAppData(download=False, pip=bundle, setuptools=bundle, wheel=bundle, via=copy, app_data_dir=/Users/user1/Library/Application Support/virtualenv) added seed packages: pip==22.0.4, setuptools==62.1.0, wheel==0.37.1 activators BashActivator,CShellActivator,FishActivator,NushellActivator,PowerShellActivator,PythonActivator
-
Buat subdirektori bernama
unpacked
untuk menampung proyek.testpiglatin $ mkdir unpacked
-
Gunakan
pip
perintah untuk menginstal proyek keunpacked
direktori.testpiglatin $ bin/pip install -t $PWD/unpacked piglatin
Keluaran
Collecting piglatin Using cached piglatin-1.0.6-py2.py3-none-any.whl (3.1 kB) Installing collected packages: piglatin Successfully installed piglatin-1.0.6
-
Periksa isi direktori.
testpiglatin $ ls
Keluaran
bin lib pyvenv.cfg unpacked
-
Ubah ke
unpacked
direktori dan tampilkan isinya.testpiglatin $ cd unpacked unpacked $ ls
Keluaran
piglatin piglatin-1.0.6.dist-info
-
Gunakan
zip
perintah untuk menempatkan isi proyek piglatin ke dalam file bernamalibrary.zip
.unpacked $ zip -r9 ../library.zip *
Keluaran
adding: piglatin/ (stored 0%) adding: piglatin/__init__.py (deflated 56%) adding: piglatin/__pycache__/ (stored 0%) adding: piglatin/__pycache__/__init__.cpython-39.pyc (deflated 31%) adding: piglatin-1.0.6.dist-info/ (stored 0%) adding: piglatin-1.0.6.dist-info/RECORD (deflated 39%) adding: piglatin-1.0.6.dist-info/LICENSE (deflated 41%) adding: piglatin-1.0.6.dist-info/WHEEL (deflated 15%) adding: piglatin-1.0.6.dist-info/REQUESTED (stored 0%) adding: piglatin-1.0.6.dist-info/INSTALLER (stored 0%) adding: piglatin-1.0.6.dist-info/METADATA (deflated 48%)
-
(Opsional) Gunakan perintah berikut untuk menguji impor secara lokal.
-
Atur jalur Python ke lokasi
library.zip
file dan mulai Python./home $ PYTHONPATH=/tmp/testpiglatin/library.zip /home $ python3
Keluaran
Python 3.9.6 (default, Jun 29 2021, 06:20:32) [Clang 12.0.0 (clang-1200.0.32.29)] on darwin Type "help", "copyright", "credits" or "license" for more information.
-
Impor pustaka dan jalankan perintah uji.
>>> import piglatin >>> piglatin.translate('hello')
Keluaran
'ello-hay'
-
-
Gunakan perintah seperti berikut ini untuk menambahkan
.zip
file dari Amazon S3, mengimpornya ke notebook Anda di Athena, dan mengujinya.sc.addPyFile('s3://amzn-s3-demo-bucket/library.zip') import piglatin piglatin.translate('hello') from pyspark.sql.functions import udf from pyspark.sql.functions import col hi_udf = udf(piglatin.translate) df = spark.createDataFrame([(1, "hello"), (2, "world")]) df.withColumn("col", hi_udf(col('_2'))).show()
Keluaran
Calculation started (calculation_id=e2c0a06e-f45d-d96d-9b8c-ff6a58b2a525) in (session=82c0a06d-d60e-8c66-5d12-23bcd55a6457). Checking calculation status... Calculation completed. +---+-----+--------+ | _1| _2| col| +---+-----+--------+ | 1|hello|ello-hay| | 2|world|orld-way| +---+-----+--------+
Contoh ini mengimpor paket md2gemini
cjkwrap mistune wcwidth
Untuk mengimpor file.zip Python yang memiliki dependensi
-
Di komputer lokal Anda, gunakan perintah berikut untuk membuat direktori yang disebut
testmd2gemini
dan membuat lingkungan virtual./tmp $ mkdir testmd2gemini /tmp $ cd testmd2gemini testmd2gemini$ virtualenv .
-
Buat subdirektori bernama
unpacked
untuk menampung proyek.testmd2gemini $ mkdir unpacked
-
Gunakan
pip
perintah untuk menginstal proyek keunpacked
direktori./testmd2gemini $ bin/pip install -t $PWD/unpacked md2gemini
Keluaran
Collecting md2gemini Downloading md2gemini-1.9.0-py3-none-any.whl (31 kB) Collecting wcwidth Downloading wcwidth-0.2.5-py2.py3-none-any.whl (30 kB) Collecting mistune<3,>=2.0.0 Downloading mistune-2.0.2-py2.py3-none-any.whl (24 kB) Collecting cjkwrap Downloading CJKwrap-2.2-py2.py3-none-any.whl (4.3 kB) Installing collected packages: wcwidth, mistune, cjkwrap, md2gemini Successfully installed cjkwrap-2.2 md2gemini-1.9.0 mistune-2.0.2 wcwidth-0.2.5 ...
-
Ubah ke
unpacked
direktori dan periksa isinya.testmd2gemini $ cd unpacked unpacked $ ls -lah
Keluaran
total 16 drwxr-xr-x 13 user1 wheel 416B Jun 7 18:43 . drwxr-xr-x 8 user1 wheel 256B Jun 7 18:44 .. drwxr-xr-x 9 user1 staff 288B Jun 7 18:43 CJKwrap-2.2.dist-info drwxr-xr-x 3 user1 staff 96B Jun 7 18:43 __pycache__ drwxr-xr-x 3 user1 staff 96B Jun 7 18:43 bin -rw-r--r-- 1 user1 staff 5.0K Jun 7 18:43 cjkwrap.py drwxr-xr-x 7 user1 staff 224B Jun 7 18:43 md2gemini drwxr-xr-x 10 user1 staff 320B Jun 7 18:43 md2gemini-1.9.0.dist-info drwxr-xr-x 12 user1 staff 384B Jun 7 18:43 mistune drwxr-xr-x 8 user1 staff 256B Jun 7 18:43 mistune-2.0.2.dist-info drwxr-xr-x 16 user1 staff 512B Jun 7 18:43 tests drwxr-xr-x 10 user1 staff 320B Jun 7 18:43 wcwidth drwxr-xr-x 9 user1 staff 288B Jun 7 18:43 wcwidth-0.2.5.dist-info
-
Gunakan
zip
perintah untuk menempatkan isi proyek md2gemini ke dalam file bernama.md2gemini.zip
unpacked $ zip -r9 ../md2gemini *
Keluaran
adding: CJKwrap-2.2.dist-info/ (stored 0%) adding: CJKwrap-2.2.dist-info/RECORD (deflated 37%) .... adding: wcwidth-0.2.5.dist-info/INSTALLER (stored 0%) adding: wcwidth-0.2.5.dist-info/METADATA (deflated 62%)
-
(Opsional) Gunakan perintah berikut untuk menguji apakah perpustakaan berfungsi di komputer lokal Anda.
-
Atur jalur Python ke lokasi
md2gemini.zip
file dan mulai Python./home $ PYTHONPATH=/tmp/testmd2gemini/md2gemini.zip /home python3
-
Impor pustaka dan jalankan pengujian.
>>> from md2gemini import md2gemini >>> print(md2gemini('[abc](https://abc.def)'))
Keluaran
https://abc.def abc
-
-
Gunakan perintah berikut untuk menambahkan
.zip
file dari Amazon S3, mengimpornya ke notebook Anda di Athena, dan melakukan pengujian non UDF.# (non udf test) sc.addPyFile('s3://amzn-s3-demo-bucket/md2gemini.zip') from md2gemini import md2gemini print(md2gemini('[abc](https://abc.def)'))
Keluaran
Calculation started (calculation_id=0ac0a082-6c3f-5a8f-eb6e-f8e9a5f9bc44) in (session=36c0a082-5338-3755-9f41-0cc954c55b35). Checking calculation status... Calculation completed. => https://abc.def (https://abc.def/) abc
-
Gunakan perintah berikut untuk melakukan tes UDF.
# (udf test) from pyspark.sql.functions import udf from pyspark.sql.functions import col from md2gemini import md2gemini hi_udf = udf(md2gemini) df = spark.createDataFrame([(1, "[first website](https://abc.def)"), (2, "[second website](https://aws.com)")]) df.withColumn("col", hi_udf(col('_2'))).show()
Keluaran
Calculation started (calculation_id=60c0a082-f04d-41c1-a10d-d5d365ef5157) in (session=36c0a082-5338-3755-9f41-0cc954c55b35). Checking calculation status... Calculation completed. +---+--------------------+--------------------+ | _1| _2| col| +---+--------------------+--------------------+ | 1|[first website](h...|=> https://abc.de...| | 2|[second website](...|=> https://aws.co...| +---+--------------------+--------------------+