Databricks Performance: Fixing the Small File Problem with Delta Lake

Delta Lake Performance

A common Databricks performance problem we see in enterprise data lakes is the “Small Files” issue. One of our customers is a great example – we ingest 0.5TB of JSON and CSV data per day, made up of 5KB files, which equates to millions of files a week in the data lake Raw zone.

Not only does this make it costly to extract analytics, it is also incredibly time consuming to read all that data with Spark when you are using a file format that was made for transmission, not for Big Data analytics.

Even with the most efficient code you WILL experience Databricks performance issues with many small files. Delta Lake to the rescue!

This blog post covers a common pattern where small datasets are created in cloud storage services such as Amazon S3, Azure Blob Storage or Azure Data Lake Storage (ADLS Gen2), and how we use Delta Lake to address the resulting performance issues.

Architecture Overview

The use case comprises a set of near real-time APIs whose data needs to be ingested into a Data Lake for advanced analytics and reporting. The following architecture diagram is an abstract of the proposed architecture.

Note: In this case there is no real-time streaming consumption use case, although Delta Lake could still be used for streaming.

Delta Lake Architecture

Use Case Summary

Assume there are 50 APIs and each API is read (pulled) at a regular interval (every 10 seconds). The HTTP response is written in its original format (JSON) to the Data Lake RAW zone (ADLS Gen2 – hierarchical folders). We have used a folder partitioning structure down to minute granularity, which helps with future reads:

raw/source api/endpoint/api_type/yyyy/mm/dd/hh/mm
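
As an illustration, here is a minimal sketch of how such a path could be built from the API response timestamp – the source_api, endpoint and api_type values below are hypothetical placeholders, not the real API names:

from datetime import datetime, timezone

def build_raw_path(source_api, endpoint, api_type, ts):
    # Folder partitioning down to minute granularity, matching the RAW zone layout above
    return (f"raw/{source_api}/{endpoint}/{api_type}/"
            f"{ts:%Y}/{ts:%m}/{ts:%d}/{ts:%H}/{ts:%M}")

# e.g. 'raw/weather_api/observations/hourly/2021/03/15/09/42'
print(build_raw_path("weather_api", "observations", "hourly",
                     datetime(2021, 3, 15, 9, 42, tzinfo=timezone.utc)))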

In this use case we were writing up to 300 files per minute across all the APIs (with a lot more to be added soon), which is around half a million small (under 1MB) JSON files per day.

An option here might be to “roll-your-own” conversion into a Big Data format like Parquet. However, why reinvent the wheel when Delta Lake was designed to solve this issue and brings a lot of other benefits like transactions, governance and reliability, not to mention performance!

Processing the Files

We have created a set of Databricks notebooks orchestrated via Azure Data Factory (ADF) to regularly parse the JSON data into Delta Lake. The high-level steps are as follows:

  • Create a list of delta files (new files created since the last run) – see the sketch after this list
  • Use the Spark JSON parser and explode the JSON

spark.read.option("multiline", "true").schema(jsonSchema).json(jsonFile)

  • Write the data to Delta Lake.
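
A minimal sketch of the first two steps, under the assumption that new files are picked up by enumerating the minute-level folders created since the last run (lastRunTime, jsonSchema and the base path are illustrative placeholders, not the exact notebook code):

from datetime import datetime, timedelta

# Hypothetical helper: because the RAW zone is partitioned down to the minute,
# "files created since the last run" can be expressed as a list of minute-level folders.
def folders_since(last_run, now, base):
    folders, ts = [], last_run.replace(second=0, microsecond=0)
    while ts <= now:
        folders.append(f"{base}/{ts:%Y/%m/%d/%H/%M}")
        ts += timedelta(minutes=1)
    return folders

jsonFolders = folders_since(lastRunTime, datetime.utcnow(), "raw/source_api/endpoint/api_type")

# Parse the multiline JSON with the explicit schema, as in the snippet above
rawDf = spark.read.option("multiline", "true").schema(jsonSchema).json(jsonFolders)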

Partitioning

With big data, the physical data layout has a huge impact on query performance, and partitioning is one of the key considerations. The partitioning strategy largely depends on the data and the consumption use cases. In the above scenario we have used the header timestamp of the JSON payload to partition the data.

from pyspark.sql.functions import col, from_unixtime

# Derive date partition columns from the payload's header timestamp
df = deduplicatedDf.withColumn("year", from_unixtime(col("header_TimeStamp"), "yyyy")) \
    .withColumn("month", from_unixtime(col("header_TimeStamp"), "MM")) \
    .withColumn("day", from_unixtime(col("header_TimeStamp"), "dd"))

df.repartition(deltaLakePartitionNum) \
    .write.partitionBy("year", "month", "day") \
    .format("delta") \
    .mode("append") \
    .save(destinationPath)

Partitioning the Delta Lake

Choose the right partition column

You can partition a Delta table by a column. The most commonly used partition column is date. Follow these two rules of thumb for deciding on what column to partition by:

If the cardinality of a column will be very high, do not use that column for partitioning. For example, if you partition by a column userId and if there can be 1M distinct user IDs, then that is a bad partitioning strategy.

Amount of data in each partition: You can partition by a column if you expect data in that partition to be at least 1 GB.

https://docs.delta.io/latest/best-practices.html#choose-the-right-partition-column
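
A quick, illustrative way to sanity-check a candidate partition scheme against these two rules of thumb (using the year/month/day columns from the earlier snippet):

from pyspark.sql.functions import count, countDistinct

# Rule 1: how many distinct partition values would this scheme create?
df.select(countDistinct("year", "month", "day").alias("distinct_partitions")).show()

# Rule 2: roughly how much data lands in each partition? Tiny or heavily skewed
# partitions suggest a coarser partitioning column.
df.groupBy("year", "month", "day").agg(count("*").alias("rows")).orderBy("rows").show()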

Re-partition

We have re-partitioned the DataFrame in memory during the creation of the table, which enforces a small number of large files rather than a large number of small files.

 df.repartition(deltaLakePartitionNum)

Later in the post I will be implementing Auto Optimize, which removes the need for this in-memory re-partitioning.

De-duplicating the data

Given the APIs are called on a predefined schedule, there is a high chance of receiving redundant data, so we have used a few techniques to de-duplicate the data during writes.

Within the DataFrame we have filtered to the distinct data set to remove duplicates in the current batch. However, there can still be scenarios where the previously written data set contains redundant records.
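
A minimal sketch of that in-batch filter, assuming the flattened DataFrame from the JSON parse is named flattenedDf (the name also referenced in the hashing snippet below):

# Drop exact duplicate rows within the current batch before deriving partition columns
deduplicatedDf = flattenedDf.dropDuplicates()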

For generic duplicate identification we have created a new column containing a hash of each row, which acts as the row's unique identifier.

from pyspark.sql.functions import concat_ws, sha2

# Hash every column of the flattened row to create a deterministic row identifier
hashedDf = df.withColumn("row_sha2", sha2(concat_ws("||", *flattenedDf.columns), 256))

We have then used MERGE INTO to merge the data back into the Delta Lake, which performs an update where the keys match and an insert otherwise.

Delta Lake MERGE INTO queries using partition pruning

In our use case, only the recent writes can contain duplicate data, and it is resource intensive to query all the partitions during the writes. With partition pruning we have limited the partitions that need to be read during each ingestion.

from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, destinationPath)

# Merge only against the last day of partitions (partition pruning via the timestamp predicate)
deltaTable.alias("currentDF").merge(
        df.alias("newDF"),
        "currentDF.row_sha2 = newDF.row_sha2 AND from_unixtime(currentDF.header_TimeStamp) > current_date() - INTERVAL 1 DAYS") \
    .whenNotMatchedInsertAll("from_unixtime(newDF.header_TimeStamp) > current_date() - INTERVAL 1 DAYS") \
    .execute()

In the above code snippet we are only reading and merging the last day of data during ingestion.

from_unixtime(currentDF.header_TimeStamp) > current_date() - INTERVAL 1 DAYS

Optimize performance

Delta Lake supports two layout algorithms to optimize query performance:

  • Compaction (bin-packing)
    • Bin-packing improves read performance by coalescing small files into larger files.
    • If you have a larger data set, you can optimize just a subset of the data.
    • Bin-packing is idempotent, so running it twice on the same dataset has no additional effect.
  • Z-Ordering (multi-dimensional clustering)
    • Z-Ordering, on the other hand, collocates related information into the same set of files.

If you expect a column to be commonly used in query predicates and if that column has high cardinality (that is, a large number of distinct values), then use ZORDER BY.

https://docs.databricks.com/delta/optimizations/file-mgmt.html#delta-optimize

For the above use case the data is already partitioned by date and the reads are mostly at the same granularity. Therefore, bin-packing is sufficient.
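
For reference, a manual bin-packing run scoped to recent partitions could look like the following – the partition filter is illustrative, and a ZORDER BY clause could be appended if a high-cardinality predicate column were ever needed:

# Compact only the partitions that are still receiving writes (illustrative filter values)
spark.sql(f"""
    OPTIMIZE delta.`{destinationPath}`
    WHERE year = '2021' AND month = '03'
""")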

Auto Optimize

If your Databricks Runtime is above 5.5, then you can get the benefits of the Auto Optimize feature, where small files automatically get compacted during each write.

 Auto Optimize is particularly useful in the following scenarios:

  • Streaming use cases where latency in the order of minutes is acceptable
  • MERGE INTO is the preferred method of writing into Delta Lake
  • CREATE TABLE AS SELECT or INSERT INTO are commonly used operations

Auto Optimize comprises two features:

  • Optimized writes
    • Azure Databricks automatically optimizes the partition size based on the actual data and tries to write 128 MB files for each table partition. When this is enabled, repartition(deltaLakePartitionNum) is not required.
  • Auto Compaction
    • After individual writes, Databricks checks whether the files can be compacted further and, if so, runs a quick OPTIMIZE job.

A few limitations of Auto Optimize:

Azure Databricks does not support Z-Ordering with Auto Compaction as Z-Ordering is significantly more expensive than just compaction.

Auto Compaction generates smaller files (128 MB) than OPTIMIZE (1 GB).

Auto Compaction greedily chooses a limited set of partitions that would best leverage compaction. The number of partitions selected will vary depending on the size of cluster it is launched on. If your cluster has more CPUs, more partitions can be optimized.

Analyzing the nature of the data and the use case, we identified Auto Optimize as the better approach for this scenario.

While you can enable Auto Optimize at the notebook or Spark session level, we have decided to enable it at the table level for better control.

%sql
ALTER TABLE {tableName} SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true, delta.autoOptimize.autoCompact = true)

You can validate the table properties by running:

DESCRIBE DETAIL {tableName};
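
For comparison, enabling it for Delta writes in the current Spark session (the notebook/session-level option mentioned above) would look something like this, assuming the standard Databricks configuration keys:

# Session-level alternative to the table property – affects Delta writes made in this session
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")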

Validation

After the OPTIMIZE, the following query completed within 31 seconds. This data set has close to 500 million records, originally distributed across 2 million small files, ingested into the Delta Lake and optimized. The graph below was created by aggregating the daily partitions to understand the daily data volume and distribution in each month. This was not possible with the RAW JSON files, even with a 12 hour read.

Validation of the Delta Lake
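
The daily aggregation behind that graph can be produced with a query along these lines (illustrative – the actual reporting query may differ):

from pyspark.sql.functions import count

# Record count per daily partition, read straight from the optimized Delta table
dailyVolumes = (spark.read.format("delta").load(destinationPath)
                .groupBy("year", "month", "day")
                .agg(count("*").alias("record_count"))
                .orderBy("year", "month", "day"))
dailyVolumes.show()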

Conclusion

This is an advanced topic and it's important to get your design right if you are performing analytics on IoT or Big Data. There are numerous articles on the benefits of Delta Lake; however, your design will vary according to your data model, consumption use cases and business requirements.

With millions of small files being written to Azure Data Lake Storage, the above solution has proven Delta Lake's ability to enable sub-second query performance and solve the “small file problem” with the correct partitioning and optimization strategy.

Contact us if you need help building or optimising your Enterprise Data Lake! 

Anjana Rupasinghege