Memory Management in Apache Spark
1. Requesting Executor Memory
When submitting a Spark job with spark-submit, you can configure executor memory using the --executor-memory option.
Example:
Requesting 4GB of memory per executor:
spark-submit --executor-memory 4G --class <main-class> <application-jar>
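The same request can also be made programmatically when building the session; a minimal sketch, assuming a PySpark application (the app name is hypothetical; spark.executor.memory is the config key behind --executor-memory):
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("memory-demo")                  # hypothetical app name
    .config("spark.executor.memory", "4g")   # equivalent of --executor-memory 4G
    .getOrCreate()
)
Note that executor memory must be fixed before executors launch, which is why it belongs at session-build (or spark-submit) time.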
2. Memory Allocation Segments
- Heap Memory / JVM Memory: Allocated for Spark's internal operations (RDD transformations, actions, etc.). This is the memory you request with --executor-memory; by default, 60% of it becomes unified memory. Example: 2GB executor memory → unified memory ≈ 1.2GB (60% of 2GB; strictly, Spark first subtracts ~300MB of reserved memory).
- Overhead Memory: Memory outside the JVM used for OS threads and Spark task bookkeeping. Formula: max(10% of executor memory, 384MB). Example: for 2GB executor memory, 10% is only ~205MB, so overhead = 384MB.
- Off-Heap Memory: Memory outside the JVM used for caching or specific data structures. You must explicitly enable and configure it. Example:
spark.conf.set("spark.memory.offHeap.enabled", True)
spark.conf.set("spark.memory.offHeap.size", "2G")
With 2GB executor memory and 2GB off-heap memory, the unified pool grows to 1.2GB + 2GB = 3.2GB. A sketch of the total per-executor footprint follows this list.
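Putting the three segments together: a minimal sketch of the per-executor memory footprint, using a hypothetical helper (not a Spark API) and the simplified rules above:
def executor_footprint_mb(executor_mb, offheap_mb=0):
    # Overhead: max(10% of executor memory, 384MB), per the formula above.
    overhead_mb = max(0.10 * executor_mb, 384)
    # Total the cluster manager must grant: heap + overhead + off-heap.
    return executor_mb + overhead_mb + offheap_mb

print(executor_footprint_mb(2048, offheap_mb=2048))  # 4480.0 MB = 2GB + 384MB + 2GB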
3. Memory Types
- Overhead Memory: Memory for VM-related operations. Configuration: spark.executor.memoryOverhead.
- Reserved Memory: A fixed amount (300MB by default) set aside for the Spark engine itself.
- Unified Memory: Used for Storage Memory (caching) and Execution Memory (joins, aggregations). Default split: 60% of the usable heap for unified memory, 40% for user memory. Storage and execution memory share a soft boundary and can borrow from each other (a worked split is sketched after the configuration below).
Example Configuration:
spark.conf.set("spark.memory.fraction", 0.6) # 60% unified memory
spark.conf.set("spark.memory.storageFraction", 0.5) # 50% split of unified memory
Hash Aggregate vs. Sort Aggregate
- Hash Aggregate: Uses an in-memory hash table for aggregation. Faster, but requires additional memory for the table. Time complexity: O(n). Example:
df.groupBy("category").sum("sales").show()
Spark uses Hash Aggregate if memory is sufficient.
- Sort Aggregate: Sorts the data first, then aggregates. Used when the data is too large for a hash table. Time complexity: O(n log n), dominated by the sort. Example: when the aggregation involves immutable types such as String, Spark may fall back to Sort Aggregate:
df.groupBy("category").agg({"sales": "sum"}).show()
You can confirm which strategy Spark chose from the physical plan, as sketched below.
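A minimal sketch of that check, assuming a toy DataFrame (names and values are illustrative):
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("agg-demo").getOrCreate()
df = spark.createDataFrame(
    [("books", 100), ("books", 50), ("toys", 75)],
    ["category", "sales"],
)
# The printed physical plan contains either a HashAggregate
# or a SortAggregate node for this query.
df.groupBy("category").sum("sales").explain()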
Apache Spark Logical and Physical Plans
- Parsed Logical Plan (Unresolved): Validates syntax correctness (e.g., missing commas or brackets). Example:
SELECT name, age FROM employees WHERE department = 'HR'
The parsed plan only checks that the SQL syntax is well formed.
- Analyzed Logical Plan (Resolved): Validates table and column existence against Spark's catalog. Example: if the employees table or the department column is missing, Spark throws an AnalysisException.
- Optimized Logical Plan: Applies rule-based optimizations:
- Predicate Pushdown: apply filters as early as possible to reduce data size.
- Projection Pruning: combine consecutive projections and read only the required columns.
Example:
df.select("name").filter("age > 30")
Spark pushes the age > 30 filter down to the data source.
- Physical Plan: Decides the actual execution strategy:
- Join type: Broadcast Hash Join vs. Sort-Merge Join.
- Aggregation: Hash vs. Sort Aggregate.
Example: Broadcast Hash Join for small tables (broadcast comes from pyspark.sql.functions):
df1.join(broadcast(df2), "id").show()
All four plans can be printed at once, as sketched below.
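A minimal sketch of inspecting every stage of planning, assuming toy DataFrames (names are illustrative):
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.appName("plan-demo").getOrCreate()
df1 = spark.createDataFrame([(1, "HR"), (2, "IT")], ["id", "department"])
df2 = spark.createDataFrame([(1, "Alice")], ["id", "name"])
# extended=True prints the Parsed, Analyzed, and Optimized Logical plans,
# followed by the final Physical plan.
df1.join(broadcast(df2), "id").explain(extended=True)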
File Formats
- Row-Based Formats (CSV, JSON):
- Faster writes, slower reads.
- Not space-efficient.
Example:
df.write.csv("/path/to/output")
- Column-Based Formats (Parquet, ORC):
- Faster reads when only specific columns are needed.
- Good compression.
Example (a column-pruned read is sketched below):
df.write.parquet("/path/to/output")
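Column pruning is where the columnar layout pays off on reads; a sketch assuming the Parquet output written above contains a name column:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("formats-demo").getOrCreate()
# With Parquet, only the 'name' column chunks are read from disk;
# a row-based format would have to scan every full row for the same query.
spark.read.parquet("/path/to/output").select("name").show()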
Compression Techniques
- Dictionary Encoding: Large repeated values are mapped to compact numeric keys. Example:
- Original: ["India", "India", "USA"]
- Dictionary: {1: "India", 2: "USA"} → encoded as [1, 1, 2].
- Bit Packing: Stores integers using only as many bits as they need. Example: store 17 (binary: 10001) in 5 bits instead of 32.
- Delta Encoding: Stores differences between consecutive values. Example: original [100, 101, 102] → encoded [100, 1, 1].
- Run-Length Encoding: Compresses runs of repeated values. Example: original aaaabbbbcc → encoded a4b4c2 (sketched in code below).
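A minimal pure-Python sketch of run-length encoding to make the last example concrete (illustrative only; not how Parquet/ORC implement it internally):
from itertools import groupby

def rle_encode(s):
    # Collapse each run of repeated characters into char + run length.
    return "".join(f"{ch}{len(list(run))}" for ch, run in groupby(s))

print(rle_encode("aaaabbbbcc"))  # a4b4c2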
Apache Spark: Memory Management, Aggregations, and Performance Optimizations
1. Memory Management in Spark
- Executor Memory Configuration: Allocating memory for Spark executors (--executor-memory).
- Memory Allocation Segments: Understanding heap, overhead, and off-heap memory.
- Memory Types: Configuring Unified, Storage, and Execution memory for optimal performance.
2. Hash Aggregate vs. Sort Aggregate
- Hash Aggregate: Efficient for smaller data, faster performance, requires more memory.
- Sort Aggregate: Used when hash table memory is insufficient, slower but scalable.
3. Spark Logical & Physical Plans
- Logical Plans: Parsing, analyzing, and optimizing SQL queries.
- Physical Plans: Decision-making on join types and execution strategies.
- Optimization: Predicate pushdown, projection pruning, and efficient execution strategies.
4. File Formats in Spark
- Row-Based Formats: CSV and JSON; faster writes but slower reads.
- Column-Based Formats: Parquet and ORC; faster reads and better compression.
5. Compression Techniques
- Dictionary Encoding: Compressing repeated strings by mapping to integers.
- Bit Packing: Storing integers with fewer bits.
- Delta Encoding: Storing differences between values.
- Run-Length Encoding: Compressing sequences of repeated values.