11. Memory Optimization in Spark

Memory Management in Apache Spark

1. Requesting Executor Memory

When submitting a Spark job using spark-submit, you can configure executor memory using the --executor-memory option.

Example:
Requesting 4GB memory per executor:

spark-submit --executor-memory 4G --class <main-class> <application-jar>

2. Memory Allocation Segments

  1. Heap Memory / JVM Memory
    The memory you request with --executor-memory becomes the JVM heap, used for Spark's internal operations (RDD transformations, actions, etc.).
    By default, 60% of the heap (after subtracting ~300MB of reserved memory) becomes unified memory (spark.memory.fraction = 0.6).

    Example:
    2GB executor memory → Unified memory = (2048MB - 300MB) × 0.6 ≈ 1.05GB (a worked calculation follows this list).

  2. Overhead Memory
    Memory outside the JVM heap, used for OS threads, native overheads, and container bookkeeping.
    Formula: max(10% of executor memory, 384MB).

    Example:
    For 2GB executor memory → Overhead = max(204.8MB, 384MB) = 384MB (see the submit-time override after this list).

  3. Off-Heap Memory
    Memory outside the JVM used for caching or specific data structures. You must explicitly enable and size it. Note that these settings are read when executors start, so in practice supply them at session creation (SparkSession builder or spark-submit --conf) rather than at runtime.

    Example:

    spark.conf.set("spark.memory.offHeap.enabled", "true")
    spark.conf.set("spark.memory.offHeap.size", "2g")
    

    With 2GB executor memory and 2GB off-heap memory:
    Unified memory ≈ 1.05GB (on-heap) + 2GB (off-heap) ≈ 3.05GB.
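
Putting the segments together, a rough sketch of the arithmetic (reserved memory is fixed at 300MB; the fractions are Spark's defaults):

RESERVED_MB = 300
MEMORY_FRACTION = 0.6  # spark.memory.fraction default

for executor_mb in (2048, 4096, 8192):
    unified_mb = (executor_mb - RESERVED_MB) * MEMORY_FRACTION
    overhead_mb = max(0.10 * executor_mb, 384)
    print(f"{executor_mb}MB executor -> ~{unified_mb:.0f}MB unified, ~{overhead_mb:.0f}MB overhead")

The overhead can also be raised explicitly at submit time via spark.executor.memoryOverhead (the 512m value here is illustrative):

spark-submit --executor-memory 4G --conf spark.executor.memoryOverhead=512m --class <main-class> <application-jar>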


3. Memory Types

  1. Overhead Memory: Memory for VM-related operations.
    Configuration: spark.executor.memoryOverhead.

  2. Reserved Memory: A fixed 300MB set aside for the Spark engine itself.

  3. Unified Memory:
    Used for Storage Memory (caching) and Execution Memory (joins, aggregations).

    • Default split: 60% for Unified memory, 40% for User memory.
    • Storage and Execution memory are flexible and can borrow from each other.

Example Configuration (like the off-heap settings, these are read at startup, so supply them at session creation):

spark.conf.set("spark.memory.fraction", 0.6)          # 60% of (heap - reserved) for unified memory
spark.conf.set("spark.memory.storageFraction", 0.5)   # 50% of unified memory protected for storage

Hash Aggregate vs. Sort Aggregate

  1. Hash Aggregate:
    Uses a hash table for aggregation. Faster but requires additional memory.
    Time Complexity: O(n).

    Example:

    df.groupBy("category").sum("sales").show()
    

    Spark prefers Hash Aggregate when memory is sufficient and the aggregation buffer contains only mutable, fixed-width types; explain() confirms which operator is chosen (see the sketch after this list).

  2. Sort Aggregate:
    Sorts the data first, then aggregates; slower but does not need a large hash table. Spark falls back to it when Hash Aggregate cannot be applied, e.g., when the aggregation buffer must hold an immutable type.
    Time Complexity: O(n log n).

    Example: If the aggregation buffer holds a String (an immutable type), as with first() on a string column, Spark falls back to Sort Aggregate.

    df.groupBy("category").agg({"name": "first"}).show()
    

Apache Spark Logical and Physical Plans

  1. Parsed Logical Plan (Unresolved):
    Validates syntax correctness (e.g., missing commas or brackets).

    Example:

    SELECT name, age FROM employees WHERE department = 'HR'
    

    The parsed plan only confirms that the syntax is valid; table and column names are not yet checked.

  2. Analyzed Logical Plan (Resolved):
    Validates table and column existence using Spark’s catalog.

    Example:
    If employees table or department column is missing, Spark throws an AnalysisException.

  3. Optimized Logical Plan:
    Applies optimization rules:

    • Predicate Pushdown: Apply filters early to reduce data size.
    • Projection Pruning: Read only the columns the query needs, collapsing consecutive projections.

    Example:

    df.select("name").filter("age > 30")
    

    Spark pushes the age > 30 filter down to the data source (visible in the physical plan; see the sketch after this list).

  4. Physical Plan:
    Decides the best execution strategy:

    • Join type: Broadcast Hash Join vs. Sort-Merge Join.
    • Aggregation: Hash vs. Sort Aggregate.

    Example:
    Broadcast Hash Join for small tables:

    from pyspark.sql.functions import broadcast

    df1.join(broadcast(df2), "id").show()
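
All four stages can be printed with explain(True) (extended mode). A minimal sketch, assuming a Parquet source with name and age columns:

df = spark.read.parquet("/path/to/people")
df.select("name").filter("age > 30").explain(True)
# Prints the Parsed, Analyzed, and Optimized Logical Plans, then the Physical
# Plan; for Parquet the scan node lists PushedFilters, confirming that the
# age > 30 predicate was pushed down to the source.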
    

File Formats

  1. Row-Based Formats (CSV, JSON):

    • Faster writes, slower reads.
    • Not space-efficient.

    Example:

    df.write.csv("/path/to/output")
    
  2. Column-Based Formats (Parquet, ORC):

    • Faster reads for specific columns.
    • Good compression.

    Example:

    df.write.parquet("/path/to/output")
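
Because Parquet is columnar, reading back a subset of columns scans just those columns on disk. A small sketch (path and column names are illustrative):

df = spark.read.parquet("/path/to/output")
df.select("category", "sales").show()  # only these two columns are read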
    

Compression Techniques

  1. Dictionary Encoding:
    Large repeated values are mapped to small integer keys (see the sketch after this list).

    Example:

    • Original: ["India", "India", "USA"]
    • Encoded: {1: "India", 2: "USA"} → [1, 1, 2].
  2. Bit Packing:
    Stores integers with fewer bits.

    Example:
    Store 17 (binary: 10001) using 5 bits instead of 32.

  3. Delta Encoding:
    Stores differences between sequential values.

    Example:
    Original: [100, 101, 102] → Encoded: [100, 1, 1].

  4. Run-Length Encoding:
    Compresses repetitive sequences.

    Example:
    Original: aaaabbbbcc → Encoded: a4b4c2.
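
A minimal Python sketch of dictionary, delta, and run-length encoding over plain lists and strings (illustrative only, not Parquet's actual implementation):

from itertools import groupby

def dictionary_encode(values):
    # Map each distinct value to a small integer key.
    keys = {v: i for i, v in enumerate(dict.fromkeys(values))}
    return keys, [keys[v] for v in values]

def delta_encode(nums):
    # Keep the first value, then store successive differences.
    return nums[:1] + [b - a for a, b in zip(nums, nums[1:])]

def run_length_encode(text):
    # Collapse each run of identical characters into char + count.
    return "".join(f"{ch}{len(list(run))}" for ch, run in groupby(text))

print(dictionary_encode(["India", "India", "USA"]))  # ({'India': 0, 'USA': 1}, [0, 0, 1])
print(delta_encode([100, 101, 102]))                 # [100, 1, 1]
print(run_length_encode("aaaabbbbcc"))               # a4b4c2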

------------------------------------------------------------------------------------------------------------------

Apache Spark: Memory Management, Aggregations, and Performance Optimizations

1. Memory Management in Spark

  • Executor Memory Configuration: Allocating memory for Spark executors (--executor-memory).
  • Memory Allocation Segments: Understanding heap, overhead, and off-heap memory.
  • Memory Types: Configuring Unified, Storage, and Execution memory for optimal performance.

2. Hash Aggregate vs. Sort Aggregate

  • Hash Aggregate: Efficient for smaller data, faster performance, requires more memory.
  • Sort Aggregate: Used when hash table memory is insufficient, slower but scalable.

3. Spark Logical & Physical Plans

  • Logical Plans: Parsing, analyzing, and optimizing SQL queries.
  • Physical Plans: Decision-making on join types and execution strategies.
  • Optimization: Predicate pushdown, projection pruning, and efficient execution strategies.

4. File Formats in Spark

  • Row-Based Formats: Examples like CSV and JSON, slower reads but faster writes.
  • Column-Based Formats: Parquet and ORC, faster reads, and better compression.

5. Compression Techniques

  • Dictionary Encoding: Compressing repeated strings by mapping to integers.
  • Bit Packing: Storing integers with fewer bits.
  • Delta Encoding: Storing differences between values.
  • Run-Length Encoding: Compressing sequences of repeated values.