November 23, 2020

Incremental Merge with Apache Spark

By Ben Sinchai

It is common to ingest a large amount of data into the Hadoop Distributed File System (HDFS) for analysis. And more often than not, we need to periodically update that data with new changes. For a long time, the most common way to achieve this was to use Apache Hive to incrementally merge new or updated records to an existing dataset. An incremental merge can also be performed using Apache Spark. In this post, I am going to review the Hive incremental merge and explore how to incrementally update data using Spark SQL and Spark DataFrame.

All of the code samples in this blog can be found here.

Incremental Merge with Hive

Table: orders

order_no customer_id quantity cost order_date last_updated_date
001 u1 1 $15.00 03/01/2020 03/01/2020
002 u2 1 $30.00 04/01/2020 04/01/2020

Consider the orders table above. Now, suppose we have received a cost update for order number “002” in the order_updates table below. We can run the following HQL to merge the two tables and produce the updated order_reconciled table.

Table: order_updates

order_no customer_id quantity cost order_date last_updated_date
002 u2 1 $20.00 04/01/2020 04/02/2020

File: reconcile_orders.hql

CREATE TABLE order_reconciled AS
SELECT unioned.*
FROM (
  SELECT * FROM orders x
  UNION ALL
  SELECT * FROM order_updates y
) unioned
JOIN
(
  SELECT
    order_no,
    max(last_updated_date) as max_date
  FROM (
    SELECT * FROM orders
    UNION ALL
    SELECT * FROM order_updates
  ) t
  GROUP BY
    order_no
) grouped
ON
  unioned.order_no = grouped.order_no AND
  unioned.last_updated_date = grouped.max_date;

Table: order_reconciled

order_no customer_id quantity cost order_date last_updated_date
001 u1 1 $15.00 03/01/2020 03/01/2020
002 u2 1 $20.00 04/01/2020 04/02/2020

The query first combines the orders and order_updates tables with UNION ALL and aliases the result as unioned. The same union is repeated inside the grouped subquery (aliased t), where grouping by order_no and taking MAX(last_updated_date) yields the latest version of each order. Finally, joining grouped back to unioned on order_no and on unioned.last_updated_date = grouped.max_date keeps only the row with the latest last_updated_date for each order_no.

The above example merges the two datasets on a single group-by key, order_no. To merge with multiple keys, SELECT the key columns in the grouped subquery and GROUP BY the keys. Then, update the join conditions to match the keys with their corresponding unioned columns as highlighted in the HQL below.

CREATE TABLE order_reconciled AS
SELECT unioned.*
FROM (...) unioned
JOIN
(
  SELECT
    order_no, customer_id,
    max(last_updated_date) as max_date
  FROM (...) t
  GROUP BY
    order_no, customer_id
) grouped
ON
  unioned.order_no = grouped.order_no AND
  unioned.customer_id = grouped.customer_id AND
  unioned.last_updated_date = grouped.max_date;

Please note that after HDP 2.6, the Hive MERGE statement can be used to achieve this instead. For more information, please see the HDP documentation here.

Now let’s see how we can do the same thing in Spark.

Incremental Merge with Apache Spark

Spark SQL lets you run SQL statements against structured data inside Spark programs. Here’s how we can use the same HQL above to update the data in Spark.

First, let’s create the data as DataFrame and register them as SQL temporary views.

import java.sql.Date
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types._

// Helper to build a DataFrame from a sequence of rows and a schema
def createDF(rows: Seq[Row], schema: StructType): DataFrame = {
  spark.createDataFrame(
    sc.parallelize(rows),
    schema
  )
}

val schema = StructType(
  List(
    StructField("order_no", StringType, true),
    StructField("customer_id", StringType, true),
    StructField("quantity", IntegerType, true),
    StructField("cost", DoubleType, true),
    StructField("order_date", DateType, true),
    StructField("last_updated_date", DateType, true)
  )
)

// Create orders dataframe
val orders = Seq(
  Row(
    "001", "u1", 1, 15.00,
    Date.valueOf("2020-03-01"), Date.valueOf("2020-03-01")
  ),
  Row(
    "002", "u2", 1, 30.00,
    Date.valueOf("2020-04-01"), Date.valueOf("2020-04-01")
  )
)
val ordersDF = createDF(orders, schema)

// Create order_updates dataframe
val orderUpdates = Seq(
  Row(
    "002", "u2", 1, 20.00,
    Date.valueOf("2020-04-01"), Date.valueOf("2020-04-02")
  )
)
val orderUpdatesDF = createDF(orderUpdates, schema)

// Register temporary views
ordersDF.createOrReplaceTempView("orders")
orderUpdatesDF.createOrReplaceTempView("order_updates")

Now, we can use the same HQL with Spark SQL to incrementally merge the two DataFrames.

val orderReconciledDF = spark.sql(
  """
  |SELECT unioned.*
  |FROM (
  |  SELECT * FROM orders x
  |  UNION ALL
  |  SELECT * FROM order_updates y
  |) unioned
  |JOIN
  |(
  |  SELECT
  |    order_no,
  |    max(last_updated_date) as max_date
  |  FROM (
  |    SELECT * FROM orders
  |    UNION ALL
  |    SELECT * FROM order_updates
  |  ) t
  |  GROUP BY
  |    order_no
  |) grouped
  |ON
  |  unioned.order_no = grouped.order_no AND
  |  unioned.last_updated_date = grouped.max_date
  """.stripMargin
)

Output:

scala> orderReconciledDF.show()
order_no customer_id quantity cost order_date last_updated_date
002 u2 1 20.0 2020-04-01 2020-04-02
001 u1 1 15.0 2020-03-01 2020-03-01

Using Spark DataFrame API

Spark also allows users to manipulate data with functional transformations through the DataFrame API. This approach is easier when dealing with multiple tables or composite keys, since the key columns can be passed in programmatically instead of being hardcoded in the HQL. In the code below, the unioned variable holds the result of the union step. The following Scala code shows how to incrementally merge the orders datasets.

import org.apache.spark.sql.functions.max

val keys = Seq("order_no", "customer_id")
val timestampCol = "last_updated_date"

val keysColumns = keys.map(ordersDF(_))

// Combine the base and update datasets
val unioned = ordersDF.union(orderUpdatesDF)

// Find the latest last_updated_date per key combination
val grouped = unioned.groupBy(keysColumns: _*)
  .agg(
    max(timestampCol).as(timestampCol)
  )

// Keep only the rows matching the latest timestamp for each key
val reconciled = grouped.join(unioned, keys :+ timestampCol)

Output:

scala> reconciled.show()
order_no customer_id last_updated_date quantity cost order_date
001 u1 2020-03-01 1 15.0 2020-03-01
002 u2 2020-04-02 1 20.0 2020-04-01

Note that the join places the key and timestamp columns first, so the column order differs from the original schema.

Writing the result back to Hive is simple and more efficient with Spark. When a query runs in Hive, it is transformed into MapReduce jobs that perform disk I/O and store intermediate results on disk. Spark processes data in memory across a distributed cluster of machines and uses a more optimized Directed Acyclic Graph (DAG) execution engine to perform its tasks.

import org.apache.spark.sql.SaveMode

reconciled.write
  .mode(SaveMode.Overwrite)
  .saveAsTable("order_reconciled")
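
As a quick check, assuming the same SparkSession is still active, the saved table can be read back from the metastore and inspected:

// Read the reconciled table back and display it (illustrative check)
spark.table("order_reconciled").show()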

Spark Outperforms Hive

Spark gives users the flexibility to express their data transformations in whichever way fits best, and its optimized execution engine outperforms Hive's. Spark SQL queries and the DataFrame API use the same execution engine to compute a result, so there should not be any performance difference between the two approaches. However, when dealing with more tables and keys, the programmatic, functional style of the DataFrame API makes it easier to generalize the merge logic, as sketched below.
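
For example, here is a minimal sketch of how the DataFrame merge above could be wrapped into a reusable helper. The function name incrementalMerge and its parameters are illustrative additions, not part of the original example.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, max}

// Merge `updates` into `existing`, keeping the latest row per key
// based on `timestampCol` (hypothetical helper, for illustration only)
def incrementalMerge(
    existing: DataFrame,
    updates: DataFrame,
    keys: Seq[String],
    timestampCol: String
): DataFrame = {
  val unioned = existing.union(updates)
  val latest = unioned
    .groupBy(keys.map(col): _*)
    .agg(max(timestampCol).as(timestampCol))
  latest.join(unioned, keys :+ timestampCol)
}

// Usage with the DataFrames defined earlier:
// val reconciled = incrementalMerge(ordersDF, orderUpdatesDF,
//   Seq("order_no", "customer_id"), "last_updated_date")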

Check out http://bitbucket.org/bsinchai/spark_incremental_merge for full code samples.

phData builds and optimizes Spark execution engines that make a noticeable impact on performance. For more information, read about our AWS Data Engineering services. Or, if you have specific questions about your Spark jobs, reach out to info@phdata.io and one of our experts will be happy to help out!

