8. Spark on YARN Architecture

Hadoop Core Components

Hadoop is built on three core components: HDFS for distributed storage, YARN for cluster resource management, and MapReduce for distributed processing.

YARN Architecture

YARN consists of two major components:

  1. Resource Manager (Master)
  2. Node Manager (Worker / Slave)

Processes Involved in Invoking a Hadoop Job from Client Machine

To execute a Hadoop job:

hadoop jar <Jar-Name>

When you execute this command, the following steps occur:

  1. Request goes to the Resource Manager.
  2. The Resource Manager creates a Container in one of the Node Managers.
  3. Inside this Container, an Application Master Service is started.
    • The Application Master is a local manager for the application.
    • It negotiates resources with the Resource Manager and interacts with the Name Node to identify data locations based on the principle of Data Locality.
    • Each application has its own Application Master. For instance, 20 applications will result in 20 Application Masters.
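For example, the word-count job that ships with the Hadoop examples jar can be submitted this way (the jar path, version, and HDFS directories below are illustrative and depend on your installation):

hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-<version>.jar wordcount /input /output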

Uber Mode

  • In Uber Mode, the application runs in the same container where the Application Master is running.
  • This mode is suitable for small jobs that can be executed on a single container.
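Uber mode is governed by MapReduce job properties; a minimal sketch of the relevant keys (written as shorthand here, normally set in mapred-site.xml or passed as per-job overrides, with the threshold values matching the usual defaults):

mapreduce.job.ubertask.enable = true      # allow small jobs to run inside the Application Master's container
mapreduce.job.ubertask.maxmaps = 9        # the job must have at most this many map tasks
mapreduce.job.ubertask.maxreduces = 1     # and at most this many reduce tasks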

Two Ways of Running Spark Code

  1. Interactive Mode
    • Use Notebooks or PySpark Shell for interactive Spark operations.
  2. Cluster Mode
    • Package your code into a JAR and run it using Spark Submit.
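For the interactive route, a minimal sketch of launching the PySpark shell against YARN (launch options depend on your cluster setup):

pyspark --master yarn

>>> spark.range(5).show()   # the shell already provides a SparkSession named spark

A spark-submit example for the second route is sketched under the deploy modes below.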

Note:

  • Every Spark application has exactly one Driver. In cluster mode, the Application Master acts as the Driver and registers with the Resource Manager.
  • If the Driver crashes, the application terminates.

Two Modes in Which Spark Runs

  1. Client Mode
    • Suitable for interactive operations to get results instantly.
    • The Driver runs on the Client machine or Gateway node.
  2. Cluster Mode
    • Recommended for production environments.
    • Code is submitted to the cluster for execution using Spark Submit, and the Driver runs on the Cluster.
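A minimal spark-submit sketch for the two deploy modes (the application file name and resource settings are illustrative):

# Client mode: the Driver runs on the gateway/edge node you submit from
spark-submit --master yarn --deploy-mode client my_app.py

# Cluster mode: the Driver runs inside the Application Master container on the cluster
spark-submit --master yarn --deploy-mode cluster \
    --num-executors 4 --executor-memory 2g \
    my_app.py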

Resource Manager UI

The Resource Manager UI provides insights into:

  • Application status
  • Memory usage
  • VCores usage
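The same information can be pulled from the command line with the YARN client (the application id is a placeholder):

yarn application -list                        # applications with their state, queue, and progress
yarn application -status <application-id>    # details for a single application
yarn logs -applicationId <application-id>    # aggregated container logs once the application finishes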

Spark UI

The Spark UI is used to monitor and debug Spark applications.
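While an application is running, the Spark UI is served by the Driver (port 4040 by default); after the application finishes, the same pages can be viewed through the Spark History Server if event logging is enabled. A minimal spark-defaults.conf sketch, assuming a hypothetical HDFS directory /spark-logs:

spark.eventLog.enabled           true
spark.eventLog.dir               hdfs:///spark-logs
spark.history.fs.logDirectory    hdfs:///spark-logs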


Accessing Columns in PySpark

Different methods to access columns in a DataFrame:

  1. String Notation: df.select("order_date").show()
  2. Prefixing with DataFrame Name: df.order_date
  3. Array Notation: df['order_date']
  4. Column Object Notation: column('order_date') or col('order_date')
  5. Column Expression: expr("order_date")
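A quick sketch showing the notations side by side (assuming orders_df has an order_date column, as in the example below):

from pyspark.sql.functions import col, column, expr

orders_df.select("order_date").show()              # string notation
orders_df.select(orders_df.order_date).show()      # DataFrame-name prefix
orders_df.select(orders_df['order_date']).show()   # array notation
orders_df.select(column('order_date')).show()      # column object notation
orders_df.select(col('order_date')).show()         # col() is the shorter alias of column()
orders_df.select(expr('order_date')).show()        # column expression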

Why so many ways?

  • Ambiguity arises when multiple DataFrames have columns with the same name. Prefixing with the DataFrame name avoids conflicts.

Example:

orders_df.select("*").where(col('order_status').like('PENDING%')).show()
orders_df.select("*").where("order_status like 'PENDING%'").show()

Aggregations in PySpark

Simple Aggregations

Example operations on orders.csv dataset:

  • Count total records
  • Count distinct invoice IDs
  • Calculate sum of quantities
  • Find average unit price

Three Ways to Achieve This:

  1. Programmatic Style:
from pyspark.sql.functions import *
orders_df.select(
    count("*").alias("row_count"),
    countDistinct("invoiceno").alias("unique_invoice"),
    sum("quantity").alias("total_quantity"),
    avg("unitprice").alias("avg_price")
).show()
  2. Column Expression Style:
orders_df.selectExpr(
    "count(*) as row_count",
    "count(distinct(invoiceno)) as unique_invoice",
    "sum(quantity) as total_quantity",
    "avg(unitprice) as avg_price"
).show()
  3. Spark SQL Style (register the DataFrame as a temporary view first):
orders_df.createOrReplaceTempView("orders")
spark.sql("""
    SELECT count(*) AS row_count, 
           count(distinct(invoiceno)) AS unique_invoice, 
           sum(quantity) AS total_quantity, 
           avg(unitprice) AS avg_price 
    FROM orders
""").show()

Grouping Aggregations

Example Use Case: Group data by invoice number and country.

Three Ways to Implement This:

  1. Programmatic Style:
summary_df = orders_df.groupBy("country", "invoiceno") \
    .agg(
        sum("quantity").alias("total_quantity"),
        sum(expr("quantity * unitprice")).alias("invoice_value")
    ).sort("invoiceno")
  2. Column Expression Style:
summary_df = orders_df.groupBy("country", "invoiceno") \
    .agg(
        expr("sum(quantity) as total_quantity"),
        expr("sum(quantity * unitprice) as invoice_value")
    ).sort("invoiceno")
  3. Spark SQL Style:
spark.sql("""
    SELECT country, invoiceno, 
           sum(quantity) AS total_quantity, 
           sum(quantity * unitprice) AS invoice_value 
    FROM orders 
    GROUP BY country, invoiceno 
    ORDER BY invoiceno
""").show()

Windowing Aggregations

Example Use Case: Calculate the running total of invoice value (this example assumes a pre-aggregated dataset with country, weeknum, and invoicevalue columns).

  1. Create a Window Specification:
from pyspark.sql.window import Window
mywindow = Window.partitionBy("country") \
    .orderBy("weeknum") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
  2. Apply the Window:
result_df = orders_df.withColumn("running_total", sum("invoicevalue").over(mywindow))
result_df.show()

Windowing Functions

  1. Rank: Skips ranks after ties.
  2. Dense Rank: Does not skip ranks, even with ties.
  3. Row Number: Assigns a unique, sequential number to each row.
  4. Lead: Compares the current row with the value in the next row.
  5. Lag: Compares the current row with the value in the previous row.
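A short sketch applying these functions to the summary_df built in the grouping example above (the window definition itself is illustrative):

from pyspark.sql.functions import col, rank, dense_rank, row_number, lead, lag
from pyspark.sql.window import Window

rank_window = Window.partitionBy("country").orderBy(col("invoice_value").desc())

summary_df \
    .withColumn("rank", rank().over(rank_window)) \
    .withColumn("dense_rank", dense_rank().over(rank_window)) \
    .withColumn("row_number", row_number().over(rank_window)) \
    .withColumn("next_value", lead("invoice_value").over(rank_window)) \
    .withColumn("previous_value", lag("invoice_value").over(rank_window)) \
    .show()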

Log File Analysis

Example: Analyze log files to extract insights.

  1. Create a Spark Session and load sample data:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp

spark = SparkSession.builder.appName("LogAnalysis").getOrCreate()   # the app name is arbitrary
logs_data = [("INFO", "2025-01-12 10:33:34"), ("WARN", "2025-01-12 20:53:34")]
log_df = spark.createDataFrame(logs_data).toDF('loglevel', 'logtime')
  2. Convert the log time string to a proper timestamp:
new_log_df = log_df.withColumn("logtime", to_timestamp("logtime"))
new_log_df.createOrReplaceTempView("serverlogs")
  3. Query logs with Spark SQL:
spark.sql("""
    SELECT loglevel, 
           date_format(logtime, 'MMMM') AS month, 
           COUNT(*) AS total_occurrence 
    FROM serverlogs 
    GROUP BY loglevel, month
""").show()

Pivot Table Optimization

Passing the list of expected pivot values up front spares Spark the extra job it would otherwise run just to collect the distinct values of the pivot column. Note that date_format with 'MMM' returns abbreviated month names, which is the form used in month_list.

Example:

month_list = ['Jan', 'Feb', 'Mar', ..., 'Dec']
spark.sql("""
    SELECT loglevel, 
           date_format(logtime, 'MMM') AS month 
    FROM serverlogs
""").groupBy('loglevel').pivot('month', month_list).count().show()

Labels:

Spark, YARN Architecture, Hadoop Core Components, Spark on YARN, Spark Jobs, PySpark Aggregations, Log Analysis, Spark SQL, Windowing Aggregations, Pivot Table

