

# Hudi
<a name="emr-hudi"></a>

[Apache Hudi](https://hudi.apache.org/) is an open-source data management framework used to simplify incremental data processing and data pipeline development by providing record-level insert, update, upsert, and delete capabilities. *Upsert* refers to the ability to insert records into an existing dataset if they do not already exist or to update them if they do. By efficiently managing how data is laid out in Amazon S3, Hudi allows data to be ingested and updated in near real time. Hudi carefully maintains metadata of the actions performed on the dataset to help ensure that the actions are atomic and consistent.

Hudi is integrated with [Apache Spark](https://aws.amazon.com/emr/features/spark/), [Apache Hive](https://hive.apache.org/), and [Presto](https://prestodb.github.io). In Amazon EMR release versions 6.1.0 and later, Hudi is also integrated with [Trino (PrestoSQL)](https://trino.io/). 

With Amazon EMR release version 5.28.0 and later, EMR installs Hudi components by default when Spark, Hive, Presto, or Flink are installed. You can use Spark or the Hudi DeltaStreamer utility to create or update Hudi datasets. You can use Hive, Spark, Presto, or Flink to query a Hudi dataset interactively or build data processing pipelines using *incremental pull*. Incremental pull refers to the ability to pull only the data that changed between two actions.

These features make Hudi suitable for the following use cases:
+ Working with streaming data from sensors and other Internet of Things (IoT) devices that require specific data insertion and update events.
+ Complying with data privacy regulations in applications where users might choose to be forgotten or modify their consent for how their data can be used.
+ Implementing a [change data capture (CDC) system](https://en.wikipedia.org/wiki/Change_data_capture) that allows you to apply changes to a dataset over time.

The following table lists the version of Hudi included in the latest release of the Amazon EMR 7.x series, along with the components that Amazon EMR installs with Hudi.

For the version of components installed with Hudi in this release, see [Release 7.12.0 Component Versions](emr-7120-release.md).


**Hudi version information for emr-7.12.0**  

| Amazon EMR Release Label | Hudi Version | Components Installed With Hudi | 
| --- | --- | --- | 
| emr-7.12.0 | Hudi 1.0.2-amzn-1 | Not available. | 

The following table lists the version of Hudi included in the latest release of the Amazon EMR 6.x series, along with the components that Amazon EMR installs with Hudi.

For the version of components installed with Hudi in this release, see [Release 6.15.0 Component Versions](emr-6150-release.md).


**Hudi version information for emr-6.15.0**  

| Amazon EMR Release Label | Hudi Version | Components Installed With Hudi | 
| --- | --- | --- | 
| emr-6.15.0 | Hudi 0.14.0-amzn-0 | Not available. | 

**Note**  
Amazon EMR release 6.8.0 comes with [Apache Hudi](https://hudi.apache.org/) 0.11.1; however, Amazon EMR 6.8.0 clusters are also compatible with the open-source `hudi-spark3.3-bundle_2.12` from Hudi 0.12.0.

The following table lists the version of Hudi included in the latest release of the Amazon EMR 5.x series, along with the components that Amazon EMR installs with Hudi.

For the version of components installed with Hudi in this release, see [Release 5.36.2 Component Versions](emr-5362-release.md).


**Hudi version information for emr-5.36.2**  

| Amazon EMR Release Label | Hudi Version | Components Installed With Hudi | 
| --- | --- | --- | 
| emr-5.36.2 | Hudi 0.10.1-amzn-1 | Not available. | 

**Topics**
+ [How Hudi works](emr-hudi-how-it-works.md)
+ [Considerations and limitations for using Hudi on Amazon EMR](emr-hudi-considerations.md)
+ [Create a cluster with Hudi installed](emr-hudi-installation-and-configuration.md)
+ [Work with a Hudi dataset](emr-hudi-work-with-dataset.md)
+ [Use the Hudi CLI](emr-hudi-cli.md)
+ [Hudi release history](Hudi-release-history.md)

# How Hudi works
<a name="emr-hudi-how-it-works"></a>

When using Hudi with Amazon EMR, you can write data to the dataset using the Spark Data Source API or the Hudi DeltaStreamer utility. Hudi organizes a dataset into a partitioned directory structure under a `basepath` that is similar to a traditional Hive table. The specifics of how the data is laid out as files in these directories depend on the dataset type that you choose. You can choose either Copy on Write (CoW) or Merge on Read (MoR).

Regardless of the dataset type, each partition in a dataset is uniquely identified by its `partitionpath` relative to the `basepath`. Within each partition, records are distributed into multiple data files. For more information, see [File management](https://hudi.apache.org/docs/concepts.html#file-management) in the Apache Hudi documentation.

Each action in Hudi has a corresponding commit, identified by a monotonically increasing timestamp known as an *Instant*. Hudi keeps a series of all actions performed on the dataset as a timeline. Hudi relies on the timeline to provide snapshot isolation between readers and writers, and to enable roll back to a previous point in time. For more information about the actions that Hudi records and the state of actions, see [Timeline](https://hudi.apache.org/docs/concepts.html#timeline) in the Apache Hudi documentation.

## Understanding dataset storage types: Copy on write vs. merge on read
<a name="emr-hudi-data-files"></a>

When you create a Hudi dataset, you specify that the dataset is either copy on write or merge on read.
+ **Copy on Write (CoW)** – Data is stored in a columnar format (Parquet), and each update creates a new version of files during a write. CoW is the default storage type. 
+ **Merge on Read (MoR)** – Data is stored using a combination of columnar (Parquet) and row-based (Avro) formats. Updates are logged to row-based *delta* files and are compacted as needed to create new versions of the columnar files.

With CoW datasets, each time there is an update to a record, the file that contains the record is rewritten with the updated values. With a MoR dataset, each time there is an update, Hudi writes only the row for the changed record. MoR is better suited for write- or change-heavy workloads with fewer reads. CoW is better suited for read-heavy workloads on data that changes less frequently.

Hudi provides three logical views for accessing the data:
+ **Read-optimized view** – Provides the latest committed dataset from CoW tables and the latest compacted dataset from MoR tables.
+ **Incremental view** – Provides a change stream between two actions out of a CoW dataset to feed downstream jobs and extract, transform, load (ETL) workflows.
+ **Real-time view** – Provides the latest committed data from a MoR table by merging the columnar and row-based files inline.

When you query the read-optimized view, the query returns all compacted data but does not include the latest delta commits. Querying this data provides good read performance but omits the freshest data. When you query the real-time view, Hudi merges the compacted data with the delta commits on read. The freshest data is available to query, but the computational overhead of merging makes the query less performant. The ability to query either compacted data or real-time data allows you to choose between performance and flexibility when you query.

For more information about the tradeoffs between storage types, see [Storage types & views](https://hudi.apache.org/docs/concepts.html#storage-types--views) in Apache Hudi documentation.

Hudi creates two tables in the Hive metastore for MoR: a table with the name that you specified, which is a read-optimized view, and a table with the same name appended with `_rt`, which is a real-time view. You can query both tables.

## Registering a Hudi dataset with your metastore
<a name="emr-hudi-hive-metastore"></a>

When you register a Hudi table with the Hive metastore, you can query Hudi tables using Hive, Spark SQL or Presto as you would any other table. In addition, you can integrate Hudi with AWS Glue by configuring Hive and Spark to use the AWS Glue Data Catalog as the metastore. For MoR tables, Hudi registers the dataset as two tables in the Metastore: a table with the name that you specified, which is a read-optimized view, and a table with the same name appended with `_rt`, which is a real-time view.

You register a Hudi table with the Hive metastore when you use Spark to create a Hudi dataset by setting the `HIVE_SYNC_ENABLED_OPT_KEY` option to `"true"` and providing other required properties. For more information, see [Work with a Hudi dataset](emr-hudi-work-with-dataset.md). In addition, you can use the hive\$1sync\$1tool command line utility to register a Hudi dataset as a table in your metastore, separately. 

# Considerations and limitations for using Hudi on Amazon EMR
<a name="emr-hudi-considerations"></a>
+ **Record key field cannot be null or empty** – The field that you specify as the record key field cannot have `null` or empty values.
+ **Schema updated by default on upsert and insert** – Hudi provides an interface, `HoodieRecordPayload` that determines how the input DataFrame and existing Hudi dataset are merged to produce a new, updated dataset. Hudi provides a default implementation of this class, `OverwriteWithLatestAvroPayload`, that overwrites existing records and updates the schema as specified in the input DataFrame. To customize this logic for implementing merge and partial updates, you can provide an implementation of the `HoodieRecordPayload` interface using the `DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY` parameter.
+ **Deletion requires schema** – When deleting, you must specify the record key, the partition key, and the pre-combine key fields. Other columns can be made `null` or empty, but the full schema is required.
+ **MoR table limitations** – MoR tables do not support savepointing. You can query MoR tables using the read-optimized view or the real-time view (`tableName_rt`) from Spark SQL, Presto, or Hive. Using the read-optimized view only exposes base file data, and does not expose a merged view of base and log data.
+ **Hive**
  + For registering tables in the Hive metastore, Hudi expects the Hive Thrift server to be running at the default port `10000`. If you override this port with a custom port, pass the `HIVE_URL_OPT_KEY` option as shown in the following example.

    ```
    .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://localhost:override-port-number
    ```
  + The `timestamp` data type in Spark is registerd as `long` data type in Hive, and not as Hive's `timestamp` type.
+ **Presto**
  + Presto does not support reading MoR real time tables in Hudi versions below 0.6.0. 
  + Presto only supports snapshot queries.
  + For Presto to correctly interpret Hudi dataset columns, set the `hive.parquet_use_column_names` value to `true`.
    + To set the value for a session, in the Presto shell, run the following command:

      ```
      set session hive.parquet_use_column_names=true
      ```
    + To set the value at the cluster level, use the `presto-connector-hive` configuration classification to set `hive.parquet.use_column_names` to `true`, as shown in the following example. For more information, see [Configure applications](emr-configure-apps.md).

      ```
      [
        {
          "Classification": "presto-connector-hive",
          "Properties": {
            "hive.parquet.use-column-names": "true"
          }
        }
      ]
      ```
+ **HBase Index**
  + The HBase version used to *build* Hudi might be different from what is listed in the EMR Release Guide. To pull in the correct dependencies for your Spark session, run the following command.

    ```
    spark-shell \
    --jars /usr/lib/spark/external/lib/spark-avro.jar,/usr/lib/hudi/cli/lib/*.jar \
    --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
    --conf "spark.sql.hive.convertMetastoreParquet=false"
    ```
+ **Settings for best performance** – For EMR 7.3\$1/Hudi 0.15\$1, customers are recommended to set this config to reduce Kryo serialization overhead: 

  ```
  --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
  ```
**Note**  
If you are using fine-grained access controle (FGAC) on EMR Serverless, this configuration isn't needed, because users must use JavaSerializer rather than KryoSerializer.

# Create a cluster with Hudi installed
<a name="emr-hudi-installation-and-configuration"></a>

With Amazon EMR release version 5.28.0 and later, Amazon EMR installs Hudi components by default when Spark, Hive, or Presto is installed. To use Hudi on Amazon EMR, create a cluster with one or more of the following applications installed:
+ Hadoop
+ Hive
+ Spark
+ Presto
+ Flink

You can create a cluster using the AWS Management Console, the AWS CLI, or the Amazon EMR API.

## To create a cluster with Hudi using the AWS Management Console
<a name="emr-hudi-create-cluster-console"></a>

1. Navigate to the new Amazon EMR console and select **Switch to the old console** from the side navigation. For more information on what to expect when you switch to the old console, see [Using the old console](https://docs.aws.amazon.com/emr/latest/ManagementGuide/whats-new-in-console.html#console-opt-in).

1. Choose **Create cluster**, **Go to advanced options**.

1. Under Software Configuration, choose **emr-5.28.0** or later for **Release** and select **Hadoop**, **Hive**, **Spark**, **Presto**, and **Tez** along with other applications that your cluster requires.

1. Configure other options as required for your application, and then choose **Next**.

1. Configure options for **Hardware** and **General cluster settings** as desired.

1. For **Security Options**, we recommend that you select an **EC2 key pair** that you can use to connect to the master node command line using SSH. This allows you to run the Spark shell commands, Hive CLI commands, and Hudi CLI commands described in this guide.

1. Choose other security options as desired, and then choose **Create cluster**.

# Work with a Hudi dataset
<a name="emr-hudi-work-with-dataset"></a>

Hudi supports inserting, updating, and deleting data in Hudi datasets through Spark. For more information, see [Writing Hudi tables](https://hudi.apache.org/docs/writing_data.html) in Apache Hudi documentation.

The following examples demonstrate how to launch the interactive Spark shell, use Spark submit, or use Amazon EMR Notebooks to work with Hudi on Amazon EMR. You can also use the Hudi DeltaStreamer utility or other tools to write to a dataset. Throughout this section, the examples demonstrate working with datasets using the Spark shell while connected to the master node using SSH as the default `hadoop` user.

## Launch the Spark shell using Amazon EMR 6.7 and later
<a name="hudi-datasets-67"></a>

When running `spark-shell`, `spark-submit`, or `spark-sql` using Amazon EMR 6.7.0 or later, pass the following commands.

**Note**  
Amazon EMR 6.7.0 uses [Apache Hudi](https://hudi.apache.org/) 0.11.0-amzn-0, which contains significant improvements over previous Hudi versions. For more information, see the [Apache Hudi 0.11.0 Migration Guide](https://hudi.apache.org/releases/release-0.11.0/#migration-guide). The examples on this tab reflect these changes.

**To open the Spark shell on the primary node**

1. Connect to the primary node using SSH. For more information, see [Connect to the primary node using SSH](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html) in the *Amazon EMR Management Guide*.

1. Enter the following command to launch the Spark shell. To use the PySpark shell, replace *spark-shell* with *pyspark*.

   ```
   spark-shell --jars /usr/lib/hudi/hudi-spark-bundle.jar \
   --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \    
   --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog"  \
   --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
   ```

## Launch the Spark shell using Amazon EMR 6.6 and earlier
<a name="hudi-datasets-67"></a>

When running `spark-shell`, `spark-submit`, or `spark-sql` using Amazon EMR 6.6.x or earlier, pass the following commands.

**Note**  
Amazon EMR 6.2 and 5.31 and later (Hudi 0.6.x and later) can omit the `spark-avro.jar` from the configuration.
Amazon EMR 6.5 and 5.35 and later (Hudi 0.9.x and later) can omit `spark.sql.hive.convertMetastoreParquet=false` from the configuration.
Amazon EMR 6.6 and 5.36 and later (Hudi 0.10.x and later) must include the `HoodieSparkSessionExtension` config as described in the [Version: 0.10.0 Spark Guide](https://hudi.apache.org/docs/0.10.0/quick-start-guide/):   

  ```
  --conf  "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
  ```

**To open the Spark shell on the primary node**

1. Connect to the primary node using SSH. For more information, see [Connect to the primary node using SSH](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html) in the *Amazon EMR Management Guide*.

1. Enter the following command to launch the Spark shell. To use the PySpark shell, replace *spark-shell* with *pyspark*.

   ```
   spark-shell \
   --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
   --conf "spark.sql.hive.convertMetastoreParquet=false" \
   --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
   ```

## Use Hudi with Amazon EMR Notebooks using Amazon EMR 6.7 and later
<a name="hudi-datasets-notebooks"></a>

To use Hudi with Amazon EMR Notebooks, you must first copy the Hudi jar files from the local file system to HDFS on the master node of the notebook cluster. You then use the notebook editor to configure your EMR notebook to use Hudi.

**To use Hudi with Amazon EMR Notebooks**

1. Create and launch a cluster for Amazon EMR Notebooks. For more information, see [Creating Amazon EMR clusters for notebooks](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-managed-notebooks-cluster.html) in the *Amazon EMR Management Guide*.

1. Connect to the master node of the cluster using SSH and then copy the jar files from the local filesystem to HDFS as shown in the following examples. In the example, we create a directory in HDFS for clarity of file management. You can choose your own destination in HDFS, if desired.

   ```
   hdfs dfs -mkdir -p /apps/hudi/lib
   ```

   ```
   hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
   ```

1. Open the notebook editor, enter the code from the following example, and run it.

   ```
   %%configure
   { "conf": {
               "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar",
               "spark.serializer":"org.apache.spark.serializer.KryoSerializer",
               "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
               "spark.sql.extensions":"org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
             }}
   ```

## Use Hudi with Amazon EMR Notebooks using Amazon EMR 6.6 and earlier
<a name="hudi-datasets-notebooks-66"></a>

To use Hudi with Amazon EMR Notebooks, you must first copy the Hudi jar files from the local file system to HDFS on the master node of the notebook cluster. You then use the notebook editor to configure your EMR notebook to use Hudi.

**To use Hudi with Amazon EMR Notebooks**

1. Create and launch a cluster for Amazon EMR Notebooks. For more information, see [Creating Amazon EMR clusters for notebooks](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-managed-notebooks-cluster.html) in the *Amazon EMR Management Guide*.

1. Connect to the master node of the cluster using SSH and then copy the jar files from the local filesystem to HDFS as shown in the following examples. In the example, we create a directory in HDFS for clarity of file management. You can choose your own destination in HDFS, if desired.

   ```
   hdfs dfs -mkdir -p /apps/hudi/lib
   ```

   ```
   hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
   ```

   ```
   hdfs dfs -copyFromLocal /usr/lib/spark/external/lib/spark-avro.jar /apps/hudi/lib/spark-avro.jar
   ```

1. Open the notebook editor, enter the code from the following example, and run it.

   ```
   { "conf": {
               "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar",
               "spark.serializer":"org.apache.spark.serializer.KryoSerializer",
               "spark.sql.hive.convertMetastoreParquet":"false"
             }}
   ```

## Initialize a Spark session for Hudi
<a name="emr-hudi-initialize-session"></a>

When you use Scala, you must import the following classes in your Spark session. This needs to be done once per Spark session.

```
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.hive.HiveSyncConfig
import org.apache.hudi.sync.common.HoodieSyncConfig
```

## Write to a Hudi dataset
<a name="emr-hudi-dataframe"></a>

The following examples show how to create a DataFrame and write it as a Hudi dataset.

**Note**  
To paste code samples into the Spark shell, type **:paste** at the prompt, paste the example, and then press **CTRL** \$1 **D**.

Each time you write a DataFrame to a Hudi dataset, you must specify `DataSourceWriteOptions`. Many of these options are likely to be identical between write operations. The following example specifies common options using the `hudiOptions` variable, which subsequent examples use.

### Write using Scala with Amazon EMR 6.7 and later
<a name="scala-examples-67"></a>

**Note**  
Amazon EMR 6.7.0 uses [Apache Hudi](https://hudi.apache.org/) 0.11.0-amzn-0, which contains significant improvements over previous Hudi versions. For more information, see the [Apache Hudi 0.11.0 Migration Guide](https://hudi.apache.org/releases/release-0.11.0/#migration-guide). The examples on this tab reflect these changes.

```
// Create a DataFrame
val inputDF = Seq(
 ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
 ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
 ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
 ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
 ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
 ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z")
 ).toDF("id", "creation_date", "last_update_time")

//Specify common DataSourceWriteOptions in the single hudiOptions variable 
val hudiOptions = Map[String,String](
  HoodieWriteConfig.TBL_NAME.key -> "tableName",
  DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE", 
  DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
  DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date",
  DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time",
  DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
  DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName",
  DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date",
  HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> "org.apache.hudi.hive.MultiPartKeysValueExtractor",
  HoodieSyncConfig.META_SYNC_ENABLED.key -> "true",
  HiveSyncConfig.HIVE_SYNC_MODE.key -> "hms",
  HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> "tableName",
  HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> "creation_date"
)

// Write the DataFrame as a Hudi dataset
(inputDF.write
    .format("hudi")
    .options(hudiOptions)
    .option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert")
    .mode(SaveMode.Overwrite)
    .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
```

### Write using Scala with Amazon EMR 6.6 and earlier
<a name="scala-examples-66"></a>

```
// Create a DataFrame
val inputDF = Seq(
 ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
 ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
 ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
 ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
 ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
 ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z")
 ).toDF("id", "creation_date", "last_update_time")

//Specify common DataSourceWriteOptions in the single hudiOptions variable 
val hudiOptions = Map[String,String](
  HoodieWriteConfig.TABLE_NAME -> "tableName",
  DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE", 
  DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
  DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date",
  DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time",
  DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
  DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName",
  DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date",
  DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName
)

// Write the DataFrame as a Hudi dataset
(inputDF.write
    .format("org.apache.hudi")
    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
    .options(hudiOptions)
    .mode(SaveMode.Overwrite)
    .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
```

### Write using PySpark
<a name="pyspark-examples"></a>

```
# Create a DataFrame
inputDF = spark.createDataFrame(
    [
        ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
        ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
        ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
        ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
        ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
        ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"),
    ],
    ["id", "creation_date", "last_update_time"]
)

# Specify common DataSourceWriteOptions in the single hudiOptions variable
hudiOptions = {
'hoodie.table.name': 'tableName',
'hoodie.datasource.write.recordkey.field': 'id',
'hoodie.datasource.write.partitionpath.field': 'creation_date',
'hoodie.datasource.write.precombine.field': 'last_update_time',
'hoodie.datasource.hive_sync.enable': 'true',
'hoodie.datasource.hive_sync.table': 'tableName',
'hoodie.datasource.hive_sync.partition_fields': 'creation_date',
'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
}

# Write a DataFrame as a Hudi dataset
inputDF.write \
.format('org.apache.hudi') \
.option('hoodie.datasource.write.operation', 'insert') \
.options(**hudiOptions) \
.mode('overwrite') \
.save('s3://amzn-s3-demo-bucket/myhudidataset/')
```

**Note**  
You might see "hoodie" instead of Hudi in code examples and notifications. The Hudi codebase widely uses the old "hoodie" spelling.


**DataSourceWriteOptions reference for Hudi**  

| Option | Description | 
| --- | --- | 
|  TABLE\$1NAME  |  The table name under which to register the dataset.  | 
|  TABLE\$1TYPE\$1OPT\$1KEY  |  Optional. Specifies whether the dataset is created as `"COPY_ON_WRITE"` or `"MERGE_ON_READ"`. The default is `"COPY_ON_WRITE"`.  | 
|  RECORDKEY\$1FIELD\$1OPT\$1KEY  |  The record key field whose value will be used as the `recordKey` component of `HoodieKey`. Actual value will be obtained by invoking `.toString()` on the field value. Nested fields can be specified using the dot notation, for example, `a.b.c`.   | 
|  PARTITIONPATH\$1FIELD\$1OPT\$1KEY  |  The partition path field whose value will be used as the `partitionPath` component of `HoodieKey`. The actual value will be obtained by invoking `.toString()` on the field value.  | 
|  PRECOMBINE\$1FIELD\$1OPT\$1KEY  |  The field used in pre-combining before actual write. When two records have the same key value, Hudi picks the one with the largest value for the precombine field as determined by `Object.compareTo(..)`.  | 

The following options are required only to register the Hudi dataset table in your metastore. If you do not register your Hudi dataset as a table in the Hive metastore, these options are not required.


**DataSourceWriteOptions reference for Hive**  

| Option | Description | 
| --- | --- | 
|  HIVE\$1DATABASE\$1OPT\$1KEY  |  The Hive database to sync to. The default is `"default"`.  | 
|  HIVE\$1PARTITION\$1EXTRACTOR\$1CLASS\$1OPT\$1KEY  |  The class used to extract partition field values into Hive partition columns.   | 
|  HIVE\$1PARTITION\$1FIELDS\$1OPT\$1KEY  |  The field in the dataset to use for determining Hive partition columns.  | 
|  HIVE\$1SYNC\$1ENABLED\$1OPT\$1KEY  |  When set to `"true"`, registers the dataset with the Apache Hive metastore. The default is `"false"`.  | 
|  HIVE\$1TABLE\$1OPT\$1KEY  |  Required. The name of the table in Hive to sync to. For example, `"my_hudi_table_cow"`.  | 
|  HIVE\$1USER\$1OPT\$1KEY  |  Optional. The Hive user name to use when syncing. For example, `"hadoop"`.  | 
|  HIVE\$1PASS\$1OPT\$1KEY  |  Optional. The Hive password for the user specified by `HIVE_USER_OPT_KEY`.  | 
|  HIVE\$1URL\$1OPT\$1KEY  |  The Hive metastore URL.  | 

## Upsert data
<a name="emr-hudi-upsert-to-datasets"></a>

The following example demonstrates how to upsert data by writing a DataFrame. Unlike the previous insert example, the `OPERATION_OPT_KEY` value is set to `UPSERT_OPERATION_OPT_VAL`. In addition, `.mode(SaveMode.Append)` is specified to indicate that the record should be appended.

### Upsert using Scala with Amazon EMR 6.7 and later
<a name="scala-upsert-67"></a>

**Note**  
Amazon EMR 6.7.0 uses [Apache Hudi](https://hudi.apache.org/) 0.11.0-amzn-0, which contains significant improvements over previous Hudi versions. For more information, see the [Apache Hudi 0.11.0 Migration Guide](https://hudi.apache.org/releases/release-0.11.0/#migration-guide). The examples on this tab reflect these changes.

```
// Create a new DataFrame from the first row of inputDF with a different creation_date value
val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value"))

(updateDF.write
    .format("hudi")
    .options(hudiOptions)
    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert")
    .mode(SaveMode.Append)
    .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
```

### Upsert using Scala with Amazon EMR 6.6 and earlier
<a name="scala-upsert-66"></a>

```
// Create a new DataFrame from the first row of inputDF with a different creation_date value
val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value"))

(updateDF.write
    .format("org.apache.hudi")
    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
    .options(hudiOptions)
    .mode(SaveMode.Append)
    .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
```

### Upsert using PySpark
<a name="pyspark-upsert"></a>

```
from pyspark.sql.functions import lit

# Create a new DataFrame from the first row of inputDF with a different creation_date value
updateDF = inputDF.limit(1).withColumn('creation_date', lit('new_value'))

updateDF.write \
    .format('org.apache.hudi') \
    .option('hoodie.datasource.write.operation', 'upsert') \
    .options(**hudiOptions) \
    .mode('append') \
    .save('s3://amzn-s3-demo-bucket/myhudidataset/')
```

## Delete a record
<a name="emr-hudi-delete-from-datasets"></a>

To hard delete a record, you can upsert an empty payload. In this case, the `PAYLOAD_CLASS_OPT_KEY` option specifies the `EmptyHoodieRecordPayload` class. The example uses the same DataFrame, `updateDF`, used in the upsert example to specify the same record.

### Delete using Scala with Amazon EMR 6.7 and later
<a name="scala-delete-67"></a>

**Note**  
Amazon EMR 6.7.0 uses [Apache Hudi](https://hudi.apache.org/) 0.11.0-amzn-0, which contains significant improvements over previous Hudi versions. For more information, see the [Apache Hudi 0.11.0 Migration Guide](https://hudi.apache.org/releases/release-0.11.0/#migration-guide). The examples on this tab reflect these changes.

```
(updateDF.write
    .format("hudi")
    .options(hudiOptions)
    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "delete")
    .mode(SaveMode.Append)
    .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
```

### Delete using Scala with Amazon EMR 6.6 and earlier
<a name="scala-delete-66"></a>

```
(updateDF.write
    .format("org.apache.hudi")
    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
    .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload")
    .mode(SaveMode.Append)
    .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
```

### Delete using PySpark
<a name="pyspark-delete"></a>

```
updateDF.write \
    .format('org.apache.hudi') \
    .option('hoodie.datasource.write.operation', 'upsert') \
    .option('hoodie.datasource.write.payload.class', 'org.apache.hudi.common.model.EmptyHoodieRecordPayload') \
    .options(**hudiOptions) \
    .mode('append') \
    .save('s3://amzn-s3-demo-bucket/myhudidataset/')
```

You can also hard delete data by setting `OPERATION_OPT_KEY `to `DELETE_OPERATION_OPT_VAL` to remove all records in the dataset you submit. For instructions on performing soft deletes, and for more information about deleting data stored in Hudi tables, see [Deletes](https://hudi.apache.org/docs/writing_data.html#deletes) in the Apache Hudi documentation.

## Read from a Hudi dataset
<a name="emr-hudi-read-dataset"></a>

To retrieve data at the present point in time, Hudi performs snapshot queries by default. Following is an example of querying the dataset written to S3 in [Write to a Hudi dataset](#emr-hudi-dataframe). Replace *s3://amzn-s3-demo-bucket/myhudidataset* with your table path, and add wildcard asterisks for each partition level, *plus one additional asterisk*. In this example, there is one partition level, so we've added two wildcard symbols.

### Read using Scala with Amazon EMR 6.7 and later
<a name="scala-read-67"></a>

**Note**  
Amazon EMR 6.7.0 uses [Apache Hudi](https://hudi.apache.org/) 0.11.0-amzn-0, which contains significant improvements over previous Hudi versions. For more information, see the [Apache Hudi 0.11.0 Migration Guide](https://hudi.apache.org/releases/release-0.11.0/#migration-guide). The examples on this tab reflect these changes.

```
val snapshotQueryDF = spark.read
    .format("hudi")
    .load("s3://amzn-s3-demo-bucket/myhudidataset") 
    .show()
```

### Read using Scala with Amazon EMR 6.6 and earlier
<a name="scala-read-66"></a>

```
(val snapshotQueryDF = spark.read
    .format("org.apache.hudi")
    .load("s3://amzn-s3-demo-bucket/myhudidataset" + "/*/*"))

snapshotQueryDF.show()
```

### Read using PySpark
<a name="pyspark-read"></a>

```
snapshotQueryDF = spark.read \
    .format('org.apache.hudi') \
    .load('s3://amzn-s3-demo-bucket/myhudidataset' + '/*/*')
    
snapshotQueryDF.show()
```

### Incremental queries
<a name="emr-hudi-incremental-query"></a>

You can also perform incremental queries with Hudi to get a stream of records that have changed since a given commit timestamp. To do so, set the `QUERY_TYPE_OPT_KEY` field to `QUERY_TYPE_INCREMENTAL_OPT_VAL`. Then, add a value for `BEGIN_INSTANTTIME_OPT_KEY` to obtain all records written since the specified time. Incremental queries are typically ten times more efficient than their batch counterparts since they only process changed records.

When you perform incremental queries, use the root (base) table path without the wildcard asterisks used for Snapshot queries.

**Note**  
Presto does not support incremental queries.

#### Incremental queries using Scala
<a name="scala-incremental-queries"></a>

```
val incQueryDF = spark.read
    .format("org.apache.hudi")
    .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
    .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, <beginInstantTime>)
    .load("s3://amzn-s3-demo-bucket/myhudidataset")
     
incQueryDF.show()
```

#### Incremental queries using PySpark
<a name="pyspark-incremental-queries"></a>

```
readOptions = {
  'hoodie.datasource.query.type': 'incremental',
  'hoodie.datasource.read.begin.instanttime': <beginInstantTime>,
}

incQueryDF = spark.read \
    .format('org.apache.hudi') \
    .options(**readOptions) \
    .load('s3://amzn-s3-demo-bucket/myhudidataset')
    
incQueryDF.show()
```

For more information about reading from Hudi datasets, see [Querying Hudi tables](https://hudi.apache.org/docs/querying_data.html) in the Apache Hudi documentation.

# Use the Hudi CLI
<a name="emr-hudi-cli"></a>

You can use the Hudi CLI to administer Hudi datasets to view information about commits, the filesystem, statistics, and more. You can also use the CLI to manually perform compactions, schedule compactions, or cancel scheduled compactions. For more information, see [Interacting via CLI](https://hudi.apache.org/docs/cli/) in the Apache Hudi documentation.

**To start the Hudi CLI and connect to a dataset**

1. Connect to the master node using SSH. For more information, see [Connect to the master node using SSH](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html) in the *Amazon EMR Management Guide*.

1. At the command line, type `/usr/lib/hudi/cli/bin/hudi-cli.sh`.

   The command prompt changes to `hudi->`.

1. Use the following command to connect to a dataset. Replace *s3://amzn-s3-demo-bucket/myhudidataset* with the path to the dataset that you want to work with. The value we use is the same as the value established in earlier examples.

   ```
   connect --path s3://amzn-s3-demo-bucket/myhudidataset
   ```

   The command prompt changes to include the dataset that you're connected to, as shown in the following example.

   ```
   hudi:myhudidataset->
   ```

By default, the `hudi-cli.sh` script in Amazon EMR release 7.3.0 to 7.8.0 uses `hudi-cli-bundle.jar`. If you run into issues, you can switch back to the classic Hudi CLI with the following command:

```
/usr/lib/hudi/cli/bin/hudi-cli.sh --cliBundle false
```

This command runs the `hudi-cli.sh` script, sets the `--cliBundle` flag, and instructs the CLI to use the individual JAR files instead of the bundled JAR. By default, the `--cliBundle` is set to true, which means the CLI uses the bundled JAR instead.

## Using Amazon EMR 7.9.0 and higher releases
<a name="emr-hudi-cli-start"></a>

**Note**  
 The **hudi-cli.sh** script has been deprecated in EMR release 7.9.0 and higher. Amazon EMR release 7.9.0 and higher uses **hudi-cli-bundle.jar**. 

**To start the Hudi CLI and connect to a dataset:**

1. Connect to the master node using SSH. For more information, see [Connect to the master node using SSH](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html) in the *Amazon EMR Management Guide*.

1. At the command line, type **/usr/lib/hudi/cli-bundle/bin/hudi-cli-with-bundle.sh** or simply type **hudi-cli-with-bundle** or **>hudi-cli**.

   The command prompt changes to **hudi- >**.

1. Use the following command to connect to a dataset. Replace **s3://amzn-s3-demo-bucket/myhudidataset** with the path to the dataset that you want to work with. The value we use is the same as the value established in earlier examples.

   ```
   connect --path s3://amzn-s3-demo-bucket/myhudidataset
   ```

1. The command prompt changes to include the dataset that you're connected to, as shown in the following example.

   ```
   hudi:myhudidataset->
   ```

# Hudi release history
<a name="Hudi-release-history"></a>

The following table lists the version of Hudi included in each release version of Amazon EMR, along with the components installed with the application. For component versions in each release, see the Component Version section for your release in [Amazon EMR 7.x release versions](emr-release-7x.md), [Amazon EMR 6.x release versions](emr-release-6x.md), or [Amazon EMR 5.x release versions](emr-release-5x.md).


**Hudi version information**  

| Amazon EMR Release Label | Hudi Version | Components Installed With Hudi | 
| --- | --- | --- | 
| emr-7.12.0 | 1.0.2-amzn-1 | Not available. | 
| emr-7.11.0 | 1.0.2-amzn-0 | Not available. | 
| emr-7.10.0 | 0.15.0-amzn-7 | Not available. | 
| emr-7.9.0 | 0.15.0-amzn-6 | Not available. | 
| emr-7.8.0 | 0.15.0-amzn-5 | Not available. | 
| emr-7.7.0 | 0.15.0-amzn-4 | Not available. | 
| emr-7.6.0 | 0.15.0-amzn-3 | Not available. | 
| emr-7.5.0 | 0.15.0-amzn-2 | Not available. | 
| emr-7.4.0 | 0.15.0-amzn-1 | Not available. | 
| emr-7.3.0 | 0.15.0-amzn-0 | Not available. | 
| emr-7.2.0 | 0.14.1-amzn-1 | Not available. | 
| emr-5.36.2 | 0.10.1-amzn-1 | Not available. | 
| emr-7.1.0 | 0.14.1-amzn-0 | Not available. | 
| emr-7.0.0 | 0.14.0-amzn-1 | Not available. | 
| emr-6.15.0 | 0.14.0-amzn-0 | Not available. | 
| emr-6.14.0 | 0.13.1-amzn-2 | Not available. | 
| emr-6.13.0 | 0.13.1-amzn-1 | Not available. | 
| emr-6.12.0 | 0.13.1-amzn-0 | Not available. | 
| emr-6.11.1 | 0.13.0-amzn-0 | Not available. | 
| emr-6.11.0 | 0.13.0-amzn-0 | Not available. | 
| emr-6.10.1 | 0.12.2-amzn-0 | Not available. | 
| emr-6.10.0 | 0.12.2-amzn-0 | Not available. | 
| emr-6.9.1 | 0.12.1-amzn-0 | Not available. | 
| emr-6.9.0 | 0.12.1-amzn-0 | Not available. | 
| emr-6.8.1 | 0.11.1-amzn-0 | Not available. | 
| emr-6.8.0 | 0.11.1-amzn-0 | Not available. | 
| emr-6.7.0 | 0.11.0-amzn-0 | Not available. | 
| emr-5.36.1 | 0.10.1-amzn-1 | Not available. | 
| emr-5.36.0 | 0.10.1-amzn-1 | Not available. | 
| emr-6.6.0 | 0.10.1-amzn-0 | Not available. | 
| emr-5.35.0 | 0.9.0-amzn-2 | Not available. | 
| emr-6.5.0 | 0.9.0-amzn-1 | Not available. | 
| emr-6.4.0 | 0.8.0-amzn-0 | Not available. | 
| emr-6.3.1 | 0.7.0-amzn-0 | Not available. | 
| emr-6.3.0 | 0.7.0-amzn-0 | Not available. | 
| emr-6.2.1 | 0.6.0-amzn-1 | Not available. | 
| emr-6.2.0 | 0.6.0-amzn-1 | Not available. | 
| emr-6.1.1 | 0.5.2-incubating-amzn-2 | Not available. | 
| emr-6.1.0 | 0.5.2-incubating-amzn-2 | Not available. | 
| emr-6.0.1 | 0.5.0-incubating-amzn-1 | Not available. | 
| emr-6.0.0 | 0.5.0-incubating-amzn-1 | Not available. | 
| emr-5.34.0 | 0.9.0-amzn-0 | Not available. | 
| emr-5.33.1 | 0.7.0-amzn-1 | Not available. | 
| emr-5.33.0 | 0.7.0-amzn-1 | Not available. | 
| emr-5.32.1 | 0.6.0-amzn-0 | Not available. | 
| emr-5.32.0 | 0.6.0-amzn-0 | Not available. | 
| emr-5.31.1 | 0.6.0-amzn-0 | Not available. | 
| emr-5.31.0 | 0.6.0-amzn-0 | Not available. | 
| emr-5.30.2 | 0.5.2-incubating | Not available. | 
| emr-5.30.1 | 0.5.2-incubating | Not available. | 
| emr-5.30.0 | 0.5.2-incubating | Not available. | 
| emr-5.29.0 | 0.5.0-incubating | Not available. | 
| emr-5.28.1 | 0.5.0-incubating | Not available. | 
| emr-5.28.0 | 0.5.0-incubating | Not available. | 