Autoloader

 

Autoloader for Handling Streaming Data

Autoloader is a Databricks feature designed for streaming data workloads that land in cloud storage. It builds on Spark's Structured Streaming and adds functionality for efficiently picking up new data files as they arrive.


What is Autoloader?

Autoloader extends Spark Structured Streaming with features such as schema evolution, file state management, and better performance on large volumes of files. Compared with the standard file-based Structured Streaming source, it detects and processes new files arriving in cloud storage more efficiently, making it well suited to near-real-time data processing.

When Should You Use Autoloader?

Autoloader is especially useful when you need to process data continuously from cloud storage such as AWS S3, Azure Data Lake Storage Gen2, or Google Cloud Storage. Use cases include:

  • Continuous file arrival, such as logs or sensor data.
  • Streaming data workloads where new files are added to the storage frequently.

Example Scenario: Using Autoloader in Databricks

Suppose files arrive every minute and we want to process them in Databricks. The steps below first load the data with copy into, and the following sections show how Autoloader improves on that approach.

  1. Create Delta Table (orders):
create table orders (
    order_id int,
    order_date string,
    customer_id int,
    order_status string
) using delta
  2. Upload Data (CSV file):

Upload the orders.csv file into DBFS (Databricks File System) under the data folder.

%fs ls /FileStore/data

This will list the orders.csv file.

  3. Copy Data into Delta Table:
%sql
copy into orders
from (select order_id::int, order_date, customer_id::int, order_status from 'dbfs:/FileStore/data/*')
fileformat = csv
format_options('header' = 'true')
  4. Check Data in the Table:
%sql
select * from orders

Challenges with copy into

  • Batch-Oriented: copy into is idempotent (it does not reload already processed files), but it is built for batch loads rather than continuous streaming ingestion.
  • Schema Evolution: copy into doesn’t support schema evolution. If the schema changes (e.g., a new column is added or a data type changes), the load will either fail or silently drop the new data.
  • Handling Errors: You must use create table if not exists to avoid errors if the table already exists, as shown in the sketch below.
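
To avoid the "table already exists" error mentioned above, the table creation can be guarded. A minimal sketch in PySpark, reusing the orders table from the example (run it from a notebook cell):

# Create the orders Delta table only if it does not already exist,
# so that re-running the notebook does not fail.
spark.sql("""
    create table if not exists orders (
        order_id int,
        order_date string,
        customer_id int,
        order_status string
    ) using delta
""")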

Advantages of Autoloader Over copy into

Autoloader is designed for streaming data, and its primary advantages include:

  1. Schema Evolution: Automatically handles schema changes such as new columns or datatype changes.
  2. Resilient Data Processing: Keeps track of already processed files in its checkpoint, so a failed job can resume from the point of failure without reprocessing data that has already been loaded.
  3. Scalability: Works well with millions or billions of files, unlike copy into, which is suited for smaller data sets.
  4. Cloud Integration: Supports data from cloud storage like AWS S3, Azure Blob, and Google Cloud Storage.

Using Autoloader for Streaming Data

1. Set Up Variables:

landing_zone = "dbfs:/FileStore/retail_data"
orders_data = landing_zone + "/orders_data"
checkpoint_path = landing_zone + "/orders_checkpoint"

2. Create DataFrame with Autoloader:

Autoloader is used by specifying .format("cloudFiles") along with options for the incoming file format and schema handling.

orders_df = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "csv") \
    .option("cloudFiles.inferColumnTypes", "true") \
    .option("cloudFiles.schemaLocation", checkpoint_path) \
    .load(orders_data)
  • .option("cloudFiles.format", "csv"): Specifies the format of the incoming files.
  • .option("cloudFiles.inferColumnTypes", "true"): Infers column data types from the sampled files (for CSV, columns are otherwise inferred as strings).
  • .option("cloudFiles.schemaLocation", checkpoint_path): Specifies where Autoloader stores the inferred schema so it can be tracked and evolved across runs (here the same path as the stream checkpoint is reused).

3. Stream Data to Delta Table:

orders_df.writeStream \
    .format("delta") \
    .option("checkpointLocation", checkpoint_path) \
    .outputMode("append") \
    .toTable("orderdelta")
  • .option("checkpointLocation", checkpoint_path): Defines the checkpoint location for resuming failed jobs.
  • .outputMode("append"): Data is appended to the destination table.
  • .toTable("orderdelta"): Writes the processed data to the Delta table.
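
The same pipeline can also be run as an incremental batch job rather than an always-on stream. A hedged sketch, assuming a runtime with Spark 3.3+ where trigger(availableNow=True) is available:

# Process everything that has arrived since the last run, then stop.
# Checkpointing still guarantees each file is processed exactly once.
orders_df.writeStream \
    .format("delta") \
    .option("checkpointLocation", checkpoint_path) \
    .outputMode("append") \
    .trigger(availableNow=True) \
    .toTable("orderdelta")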

4. Query Data from Delta Table:

%sql
show tables
%sql
select * from orderdelta

Key Points to Remember

  • Schema Inference and Evolution: Autoloader automatically infers the schema and can adapt to changes such as new columns. In case of datatype mismatches, the offending values are captured in the _rescued_data column.
  • Checkpointing: Checkpoints track the progress of data processing, ensuring that jobs can resume from the point of failure without reprocessing all data.
  • File Listing Optimization: Autoloader performs incremental listing, which is more efficient than repeatedly scanning the full directory; for very large file volumes, file notification mode can be used instead (see the sketch after this list).
  • Cloud Storage Integration: It supports various cloud storage systems like AWS S3, Azure Blob Storage, and Google Cloud Storage.
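
For very large numbers of files, directory listing can be replaced with file notification mode, which subscribes to cloud storage events instead of listing paths. A sketch under the assumption that cloudFiles.useNotifications is available on your runtime and the required cloud permissions are configured:

# File notification mode: Autoloader sets up event notifications on the
# storage bucket/container instead of listing the directory each micro-batch.
orders_df = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "csv") \
    .option("cloudFiles.useNotifications", "true") \
    .option("cloudFiles.schemaLocation", checkpoint_path) \
    .load(orders_data)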

Handling Schema Evolution and Errors

  • Schema Evolution Example:

If a new column is added to the incoming CSV files, the first run that encounters it fails with an UnknownFieldException. On restarting the job, Autoloader updates the schema stored at the schema location and the stream continues with the new column (see the sketch after this list for controlling this behavior).

  • Datatype Mismatch:

If there is a datatype mismatch, such as an integer column receiving a string value, the mismatched values are stored in the special _rescued_data column for later analysis.
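
How Autoloader reacts to schema changes can be controlled explicitly. A minimal sketch using the cloudFiles.schemaEvolutionMode option (mode names as documented for Databricks Auto Loader):

# "addNewColumns" (the default) fails once when a new column appears and
# evolves the schema on restart; "rescue" keeps the original schema and routes
# unexpected columns and mismatched values into the _rescued_data column.
orders_df = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "csv") \
    .option("cloudFiles.schemaLocation", checkpoint_path) \
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
    .load(orders_data)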


Schema Inference and Hints

Schema inference reads a sample of the input files (by default up to the first 1000 files or 50 GB, whichever limit is reached first) to deduce the schema. The sample size can be tuned with the following Spark configuration settings:

spark.conf.set("spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes", "1048576")  # ~1 MB sample
spark.conf.set("spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles", "10")       # at most 10 files

If no data files are available initially, you can manually define the schema:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

orders_schema = StructType([
    StructField("order_id", IntegerType()),
    StructField("order_date", StringType()),
    StructField("customer_id", IntegerType()),
    StructField("order_status", StringType())
])

orders_df = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "csv") \
    .schema(orders_schema) \
    .option("cloudFiles.inferColumnTypes", "true") \
    .option("cloudFiles.schemaLocation", checkpoint_path) \
    .load(orders_data)
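
As the section title suggests, you can also keep inference enabled and pin only specific columns with schema hints instead of declaring the full schema. A minimal sketch using cloudFiles.schemaHints:

# Infer all other columns, but force these two to be integers.
orders_df = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "csv") \
    .option("cloudFiles.schemaHints", "order_id int, customer_id int") \
    .option("cloudFiles.schemaLocation", checkpoint_path) \
    .load(orders_data)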

Adding Additional Columns (File and Load Timestamp)

If you want to capture the file name and timestamp of when a record was processed, you can use the .withColumn() method:

from pyspark.sql.functions import input_file_name, current_timestamp

orders_df = orders_df.withColumn("file_name", input_file_name()) \
    .withColumn("time_of_load", current_timestamp())
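
On newer runtimes (Spark 3.2+ / recent Databricks runtimes) the hidden _metadata column is an alternative to input_file_name(); treat the exact field name as an assumption for your version. A sketch applied to the DataFrame returned by load(), in place of the snippet above:

from pyspark.sql.functions import col, current_timestamp

# Alternative: take the source file path from the hidden _metadata column.
orders_df = orders_df.withColumn("file_name", col("_metadata.file_path")) \
    .withColumn("time_of_load", current_timestamp())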

Comparison: copy into vs Autoloader

Feature            | copy into                              | Autoloader
Use Case           | Batch data (up to thousands of files)  | Streaming data (millions/billions of files)
Schema Evolution   | Not supported                          | Supported
Datatype Mismatch  | Data gets lost                         | Captured in the _rescued_data column
Scaling            | Limited to batch sizes                 | Scales easily to large datasets
File Listing       | Full file listing                      | Incremental file listing

Conclusion

Autoloader offers significant advantages over traditional batch data loading methods, particularly when dealing with large volumes of streaming data. It integrates seamlessly with Spark Structured Streaming, providing features like schema evolution, checkpointing, and optimized file listing, making it the ideal choice for processing real-time data from cloud storage systems.

Akash

I am working as a Data Engineer
