Hadoop Core Components
YARN Architecture
YARN consists of two major components:
- Resource Manager (Master)
- Node Manager (Worker / Slave)
Processes Involved in Invoking a Hadoop Job from Client Machine
To execute a Hadoop job:
hadoop jar <Jar-Name>
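For example (the JAR name, main class and paths below are placeholders):
hadoop jar wordcount.jar com.example.WordCount /data/input /data/output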
When you execute this command, the following steps occur:
- Request goes to the Resource Manager.
- The Resource Manager creates a Container in one of the Node Managers.
- Inside this Container, an Application Master Service is started.
- The Application Master is a local manager for the application.
- It negotiates resources with the Resource Manager and interacts with the Name Node to identify data locations based on the principle of Data Locality.
- Each application has its own Application Master. For instance, 20 applications will result in 20 Application Masters (they can be inspected with the YARN commands shown below).
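Once a job is submitted, the running applications, their Application Masters and container logs can be inspected from the command line (a minimal sketch; the application ID is a placeholder):
yarn application -list
yarn application -status application_1700000000000_0001
yarn logs -applicationId application_1700000000000_0001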
Uber Mode
- In Uber Mode, the application runs in the same container where the Application Master is running.
- This mode is suitable for small jobs that can be executed in a single container (the relevant MapReduce settings are sketched below).
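For MapReduce jobs, uber mode is controlled by a few job properties (a sketch with illustrative values, not a tuning recommendation):
mapreduce.job.ubertask.enable=true
mapreduce.job.ubertask.maxmaps=9
mapreduce.job.ubertask.maxreduces=1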
Two Ways of Running Spark Code
- Interactive Mode
- Use Notebooks or PySpark Shell for interactive Spark operations.
- Cluster Mode
- Package your code (a JAR for Scala/Java, or a .py file for PySpark) and submit it with spark-submit, as shown below.
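A quick illustration of the two styles (file and class names are placeholders):
# Interactive: start a PySpark shell on YARN
pyspark --master yarn

# Packaged: submit an application with spark-submit
spark-submit --master yarn my_etl_job.py
spark-submit --master yarn --class com.example.Main my_etl_job.jar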
Note:
- Every Spark application has one Driver. On YARN, the Application Master registers with the Resource Manager and, in cluster mode, also acts as the Driver.
- If the Driver crashes, the application terminates.
Two Modes in Which Spark Runs
- Client Mode
- Suitable for interactive operations to get results instantly.
- The Driver runs on the Client machine or Gateway node.
- Cluster Mode
- Recommended for production environments.
- Code is submitted to the cluster with spark-submit and the Driver runs inside the cluster, in the Application Master container (see the sketch below).
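A minimal sketch of how the deploy mode is chosen at submit time (the script name is a placeholder):
# Driver runs on the client/gateway machine
spark-submit --master yarn --deploy-mode client my_etl_job.py

# Driver runs inside the cluster, in the Application Master container
spark-submit --master yarn --deploy-mode cluster my_etl_job.py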
Resource Manager UI
The Resource Manager web UI (served by default on port 8088 of the Resource Manager host) provides insights into:
- Application status
- Memory usage
- VCores usage
Spark UI
The Spark UI is used to monitor and debug Spark applications; for a running application it is served by the Driver, on port 4040 by default.
Accessing Columns in PySpark
Different methods to access columns in a DataFrame (combined in a single example after this list):
- String Notation:
df.select("*").show()
- Prefixing with DataFrame Name:
df.order_date
- Array Notation:
df['order_date']
- Column Object Notation:
col('order_date')
column('order_date')
- Column Expression:
expr("order_date")
Why so many ways?
- Ambiguity arises when multiple DataFrames have columns with the same name. Prefixing with the DataFrame name avoids conflicts.
Example:
orders_df.select("*").where(col('order_status').like('PENDING%')).show()
orders_df.select("*").where("order_status like 'PENDING%'").show()
Aggregations in PySpark
Simple Aggregations
Example operations on the orders.csv dataset:
- Count total records
- Count distinct invoice IDs
- Calculate sum of quantities
- Find average unit price
Three Ways to Achieve This:
- Programmatic Style:
from pyspark.sql.functions import *
orders_df.select(
    count("*").alias("row_count"),
    countDistinct("invoiceno").alias("unique_invoice"),
    sum("quantity").alias("total_quantity"),
    avg("unitprice").alias("avg_price")
).show()
- Column Expression Style:
orders_df.selectExpr(
    "count(*) as row_count",
    "count(distinct(invoiceno)) as unique_invoice",
    "sum(quantity) as total_quantity",
    "avg(unitprice) as avg_price"
).show()
- Spark SQL Style (requires registering the DataFrame as a temporary view first):
orders_df.createOrReplaceTempView("orders")
spark.sql("""
    SELECT count(*) AS row_count,
           count(distinct(invoiceno)) AS unique_invoice,
           sum(quantity) AS total_quantity,
           avg(unitprice) AS avg_price
    FROM orders
""").show()
Grouping Aggregations
Example Use Case: Group data by invoice number and country.
Three Ways to Implement This:
- Programmatic Style:
summary_df = orders_df.groupBy("country", "invoiceno") \
    .agg(
        sum("quantity").alias("total_quantity"),
        sum(expr("quantity * unitprice")).alias("invoice_value")
    ).sort("invoiceno")
- Column Expression Style:
summary_df = orders_df.groupBy("country", "invoiceno") \
    .agg(
        expr("sum(quantity) as total_quantity"),
        expr("sum(quantity * unitprice) as invoice_value")
    ).sort("invoiceno")
- Spark SQL Style:
spark.sql("""
SELECT country, invoiceno,
sum(quantity) AS total_quantity,
sum(quantity * unitprice) AS invoice_value
FROM orders
GROUP BY country, invoiceno
ORDER BY invoiceno
""").show()
Windowing Aggregations
Example Use Case: Calculate the running total of invoice value (the DataFrame used below is assumed to have country, weeknum and invoicevalue columns).
- Create a Window Specification:
from pyspark.sql.window import Window
mywindow = Window.partitionBy("country") \
    .orderBy("weeknum") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
- Apply the Window:
result_df = orders_df.withColumn("running_total", sum("invoicevalue").over(mywindow))
result_df.show()
Windowing Functions
- Rank: Skips rank values when there are ties.
- Dense Rank: Does not skip rank values, even with ties.
- Row Number: Assigns a unique sequential number to each row.
- Lead: Accesses a value from a following row (compare the current row with the next one).
- Lag: Accesses a value from a preceding row (compare the current row with the previous one).
A short sketch of these functions is shown below.
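A minimal sketch applying these functions over a window (assuming, as in the running-total example, a DataFrame with country, weeknum and invoicevalue columns):
from pyspark.sql.functions import rank, dense_rank, row_number, lead, lag, desc
from pyspark.sql.window import Window

# Rank rows within each country by invoice value, highest first
rank_window = Window.partitionBy("country").orderBy(desc("invoicevalue"))

ranked_df = (orders_df
    .withColumn("rank", rank().over(rank_window))                      # gaps after ties
    .withColumn("dense_rank", dense_rank().over(rank_window))          # no gaps
    .withColumn("row_number", row_number().over(rank_window))          # unique sequence
    .withColumn("next_value", lead("invoicevalue").over(rank_window))  # value from the next row
    .withColumn("previous_value", lag("invoicevalue").over(rank_window)))  # value from the previous row

ranked_df.show()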
Log File Analysis
Example: Analyze log files to extract insights.
- Create a Spark Session and load sample data:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
logs_data = [("INFO", "2025-01-12 10:33:34"), ("WARN", "2025-01-12 20:53:34")]
log_df = spark.createDataFrame(logs_data).toDF('log_level', 'log_time')
- Convert log_time to timestamp and register a temporary view:
from pyspark.sql.functions import to_timestamp
new_log_df = log_df.withColumn("logtime", to_timestamp("log_time"))
new_log_df.createOrReplaceTempView("serverlogs")
- Query logs with Spark SQL:
spark.sql("""
SELECT loglevel,
date_format(logtime, 'MMMM') AS month,
COUNT(*) AS total_occurrence
FROM serverlogs
GROUP BY loglevel, month
""").show()
Pivot Table Optimization
Passing the list of pivot values up front spares Spark an extra pass over the data to discover the distinct values. Example (the month names match the 'MMMM' format used above):
month_list = ['January', 'February', 'March', 'April', 'May', 'June',
              'July', 'August', 'September', 'October', 'November', 'December']
spark.sql("""
    SELECT log_level,
           date_format(logtime, 'MMMM') AS month
    FROM serverlogs
""").groupBy('log_level').pivot('month', month_list).count().show()
Labels:
Spark, YARN Architecture, Hadoop Core Components, Spark on YARN, Spark Jobs, PySpark Aggregations, Log Analysis, Spark SQL, Windowing Aggregations, Pivot Table