10. Spark Optimization

1. How groupBy() Works in Spark

When you use groupBy() in PySpark, it triggers a wide transformation. This involves shuffling data across partitions because records with the same keys must be grouped together.

Example: Grouping Orders by order_status

Suppose you have an orders dataset with the following data:

order_id  order_status
1         COMPLETE
2         PENDING_PAYMENT
3         COMPLETE
4         CANCELLED
5         COMPLETE

If you execute:

orders_df.groupBy("order_status").count().show()
  1. Spark first performs a local (partial) aggregation, counting within each partition.
    • If you have 3 partitions, each partition produces a partial count for every unique order_status value it contains.
  2. Spark then shuffles these partial results so that all counts for the same order_status land in the same partition. For example:
    • The partial counts for COMPLETE from different partitions all move to the same partition.
  3. Finally, Spark merges the partial counts from all partitions to produce the final result (you can confirm this two-stage plan with explain(), as sketched after the output below).

Output:

order_status     count
COMPLETE         3
PENDING_PAYMENT  1
CANCELLED        1
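You can confirm the two-stage (partial, then final) aggregation by inspecting the physical plan. A minimal sketch, assuming orders_df is already loaded:

orders_df.groupBy("order_status").count().explain()

# The plan typically shows a partial HashAggregate, an Exchange (the shuffle),
# and a final HashAggregate, roughly:
#   HashAggregate(keys=[order_status], functions=[count(1)])
#   +- Exchange hashpartitioning(order_status, 200)
#      +- HashAggregate(keys=[order_status], functions=[partial_count(1)])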

2. Broadcast Join vs Shuffle-Sort-Merge Join

Broadcast Join

This join is used when one of the DataFrames is small enough to fit in memory. Spark broadcasts the smaller DataFrame to all the executors, avoiding expensive shuffling.

Example:
# Small DataFrame: Customers
customers_df = spark.createDataFrame(
    [(1, "Alice"), (2, "Bob"), (3, "Charlie")],
    ["customer_id", "name"]
)

# Large DataFrame: Orders
orders_df = spark.createDataFrame(
    [(1, "COMPLETE"), (2, "PENDING"), (3, "COMPLETE"), (4, "CANCELLED")],
    ["customer_id", "order_status"]
)

# Perform a Broadcast Join
result = orders_df.join(customers_df.hint("broadcast"), "customer_id")
result.show()

Output:

customer_id  order_status  name
1            COMPLETE      Alice
2            PENDING       Bob
3            COMPLETE      Charlie

Here, the smaller customers_df is broadcast to every executor, so the join happens locally without shuffling the large orders_df.
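As an alternative to the hint, PySpark also provides a broadcast() helper, and Spark broadcasts small tables automatically when their estimated size is below spark.sql.autoBroadcastJoinThreshold (10 MB by default). A minimal sketch:

from pyspark.sql.functions import broadcast

# Equivalent to the hint-based version above
result = orders_df.join(broadcast(customers_df), "customer_id")

# Optionally raise the automatic broadcast threshold to 50 MB (value is in bytes)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)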


Shuffle-Sort-Merge Join

When both DataFrames are too large to broadcast, Spark shuffles both sides on the join key, sorts the data within each partition, and then merges the sorted streams.

Example:
result = orders_df.join(customers_df, "customer_id")
result.show()

In this case, Spark shuffles data across partitions so that records with the same customer_id are colocated, sorts them by the join key, and then merges the two sides.
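Note that with the tiny customers_df from the previous example, Spark would normally still pick a broadcast join on its own. To actually observe a shuffle-sort-merge join, you can disable automatic broadcasting; a minimal sketch:

# Disable automatic broadcast joins so Spark falls back to sort-merge join
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

result = orders_df.join(customers_df, "customer_id")
result.explain()  # the plan should now show SortMergeJoin instead of BroadcastHashJoin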


3. Handling Partition Skew

Problem:

Partition skew occurs when one partition has significantly more data than others, leading to performance bottlenecks. For example, if most orders have order_status = "COMPLETE", that partition will become overloaded.

Solution: Salting

To distribute the data more evenly, you can add a salt key.

Example:
from pyspark.sql.functions import lit, concat, col

# Add a salt suffix derived from order_id (order_id % 3) so each skewed key splits into 3 sub-keys
orders_df = orders_df.withColumn(
    "salted_key",
    concat(col("order_status"), lit("_"), (col("order_id") % 3).cast("string"))
)

# GroupBy using the salted key instead of the raw order_status
result = orders_df.groupBy("salted_key").count()
result.show()

This breaks the skewed key into multiple smaller keys (COMPLETE_0, COMPLETE_1, etc.), distributing the data more evenly across partitions.
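Because the counts are now split across the salted sub-keys, a second aggregation is needed to recover the totals per original order_status. A minimal sketch, assuming the salted_key column built above (the trailing "_<salt>" digit is stripped with a regex):

from pyspark.sql.functions import regexp_replace, col, sum as spark_sum

# Stage 1: aggregate on the salted key (spreads the hot key across partitions)
partial = orders_df.groupBy("salted_key").count()

# Stage 2: strip the trailing "_<salt>" suffix and sum the partial counts per original status
final = (
    partial
    .withColumn("order_status", regexp_replace(col("salted_key"), r"_\d+$", ""))
    .groupBy("order_status")
    .agg(spark_sum("count").alias("count"))
)
final.show()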


4. Partitioning vs Bucketing

Partitioning

Partitioning divides the data into folders based on a column. It is useful for filtering operations.

Example:

Partition the orders DataFrame by order_status:

orders_df.write.partitionBy("order_status").parquet("orders_partitioned")

Folder Structure:

orders_partitioned/
  order_status=COMPLETE/
  order_status=CANCELLED/
  order_status=PENDING/

When you query data with a filter:

spark.read.parquet("orders_partitioned").filter("order_status = 'COMPLETE'").show()

Only the order_status=COMPLETE folder is scanned, which improves performance; this is known as partition pruning.
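You can check that pruning actually happens by inspecting the physical plan: the file scan node lists the partition filters that were pushed down. A minimal sketch:

df = spark.read.parquet("orders_partitioned").filter("order_status = 'COMPLETE'")
df.explain()
# The FileScan node should show something like:
#   PartitionFilters: [isnotnull(order_status), (order_status = COMPLETE)]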


Bucketing

Bucketing is used for evenly distributing data across a fixed number of buckets. It is especially useful for joins and equality-based queries.

Example:

Bucket the orders DataFrame by order_id:

orders_df.write.bucketBy(8, "order_id").saveAsTable("bucketed_orders")

When joining two tables that are bucketed on the join key with the same number of buckets (e.g., orders and customers both bucketed by customer_id), Spark can skip the shuffle because matching keys already sit in corresponding buckets.
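A minimal sketch of such a shuffle-free bucketed join, assuming both sides are bucketed on customer_id with the same bucket count (the table names here are only illustrative):

# Write both sides bucketed (and sorted) by the join key into the metastore
orders_df.write.bucketBy(8, "customer_id").sortBy("customer_id").saveAsTable("orders_bucketed")
customers_df.write.bucketBy(8, "customer_id").sortBy("customer_id").saveAsTable("customers_bucketed")

# Join the bucketed tables; the physical plan should contain no Exchange on either side
o = spark.table("orders_bucketed")
c = spark.table("customers_bucketed")
o.join(c, "customer_id").explain()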


5. Adaptive Query Execution (AQE)

Adaptive Query Execution is available in Spark 3.0+ and improves performance by dynamically optimizing queries at runtime.

Benefits:

  1. Coalescing Shuffle Partitions:
    • If many shuffle partitions are small or empty after a shuffle, AQE merges them into fewer, larger partitions.
  2. Handling Partition Skew:
    • AQE detects skewed partitions and splits them dynamically.
  3. Switching Join Strategies:
    • If runtime statistics show that one side of a join is small, Spark switches to a broadcast join.

Enabling AQE:

spark.conf.set("spark.sql.adaptive.enabled", "true")
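Once AQE is on, its individual features are controlled by their own flags (both are enabled by default in Spark 3.x; shown here only for illustration):

spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")  # merge small shuffle partitions
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")            # split skewed join partitions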

Example:

orders_df.groupBy("order_status").count().write.format("noop").mode("overwrite").save()

With AQE enabled, Spark coalesces the shuffle partitions down to roughly the number of distinct order_status values, avoiding the overhead of many near-empty tasks.


Summary Table:

Concept             Use Case                                 Optimization Technique
groupBy()           Aggregations with wide transformations   Shuffle data efficiently
Broadcast Join      One small, one large DataFrame           Avoids shuffle by broadcasting
Shuffle-Sort-Merge  Joining large DataFrames                 Sort and merge after shuffling
Partitioning        Queries on partition columns             Partition pruning
Bucketing           Joins on specific columns                Pre-bucket data to avoid shuffling
Salting             Fix partition skew                       Add salt keys to split skewed keys
AQE                 Optimize queries dynamically             Coalesce partitions, fix skew

