4. Core API

Python Basics: Normal Vs Lambda Functions

Defining a Normal Function

In Python, a normal function is defined using the def keyword. Here's an example:

def my_sum(x, y):
    return x + y

To call the function:

total = my_sum(5, 7)

Lambda Function

A lambda function is Python's equivalent of an anonymous function in languages like Scala. The function body is written inline and passed directly as an argument, rather than being defined first and then called.

Lambda functions are typically used for short, one-off logic that does not need a separate named function. Here's an example of a lambda function:

lambda x, y: x + y
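
A lambda can be assigned to a name and called like a normal function, although it is more commonly passed directly to another function. A quick sketch (my_sum_lambda is just an illustrative name):

my_sum_lambda = lambda x, y: x + y   # illustrative name for the lambda
total = my_sum_lambda(5, 7)          # 12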

Python map Function Vs Spark map Transformation

Suppose you want to sum the numbers in a list: my_list = [5,7,8,2,5,9].

The Normal Python Way (Single Machine):

# Defining a normal function
def sumlist(x):
    total = 0
    for i in x:
        total += i
    return total

# Calling the normal function
sumlist(my_list)

This works on a single machine (local/gateway node).

The Functional Lambda Way (Still Single Machine):

The same sum can also be expressed by passing a lambda to the higher-order reduce function instead of writing an explicit loop:

from functools import reduce

# Summing the numbers with an in-line lambda
reduce(lambda x, y: x + y, my_list)
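
The Distributed Spark Way (Cluster-Based):

In Spark, transformations such as map and actions such as reduce run on a distributed cluster rather than a single machine. A minimal sketch of the same sum in PySpark (assuming an existing SparkSession named spark, as created in the use case below):

numbers_rdd = spark.sparkContext.parallelize(my_list)

# reduce here is a Spark action that aggregates the partitions across the cluster
numbers_rdd.reduce(lambda x, y: x + y)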

Higher Order Functions

Higher-order functions are functions that accept other functions as parameters or return other functions. Examples include map and reduce.
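
For instance, Python's built-in map is a higher-order function: it takes another function (here a lambda) as a parameter and applies it to every element:

list(map(lambda x: x * 2, my_list))  # [10, 14, 16, 4, 10, 18]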


PySpark Use Case 1

Consider a business scenario where you need to extract the following information from a company's order data:

  1. Orders in each category (COMPLETE, PENDING_PAYMENT, CLOSED, etc.)
  2. Premium Customers (Top 10 customers who placed the most orders)
  3. Distinct customers who placed at least 1 order
  4. Customers having the maximum number of CLOSED orders

Creating a Spark Session (Boilerplate Code):

from pyspark.sql import SparkSession
import getpass

username = getpass.getuser()

spark = SparkSession.builder \
    .config('spark.ui.port', '0') \
    .config("spark.sql.warehouse.dir", f"/user/{username}/warehouse") \
    .enableHiveSupport() \
    .master('yarn') \
    .getOrCreate()

Loading Data into RDD:

orders_rdd = spark.sparkContext.textFile("/public/orders/*")
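
Each element of orders_rdd is one comma-separated line. Based on the field indexes used in the transformations below, the assumed layout is order_id, order_date, customer_id, order_status, so a record looks something like this:

1,2013-07-25 00:00:00.0,11599,CLOSED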

Applying Transformations

Orders in Each Category:

mapped_rdd = orders_rdd.map(lambda x: (x.split(",")[3], 1))
reduced_rdd = mapped_rdd.reduceByKey(lambda x, y: x + y)
reduced_sorted = reduced_rdd.sortBy(lambda x: x[1], False)
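
The transformations above are lazy; trigger an action on the final RDD to see the result (collect is safe here because there are only a handful of distinct order statuses):

reduced_sorted.collect()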

Premium Customers:

customers_mapped = orders_rdd.map(lambda x: (x.split(",")[2], 1))
customers_aggregated = customers_mapped.reduceByKey(lambda x, y: x + y)
customers_aggregated.sortBy(lambda x: x[1], False).take(10)

Distinct Customers Who Placed at Least 1 Order:

distinct_customers = orders_rdd.map(lambda x: x.split(",")[2]).distinct()
distinct_customers.count()

Customers Having Maximum Number of CLOSED Orders:

filtered_orders = orders_rdd.filter(lambda x: x.split(",")[3] == 'CLOSED')
filtered_mapped = filtered_orders.map(lambda x: (x.split(",")[2], 1))
filtered_aggregated = filtered_mapped.reduceByKey(lambda x, y: x + y)
filtered_sorted = filtered_aggregated.sortBy(lambda x: x[1], False)
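
As before, an action is needed to pull the result; the first record of the sorted RDD is the customer with the most CLOSED orders:

filtered_sorted.take(1)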

Spark Core APIs: RDD

Alternate Spark Development Environments

Besides Jupyter Notebooks, Spark code can also be developed and executed from the terminal. Run the following command to start an interactive PySpark shell:

pyspark3
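
Once the shell starts, a SparkSession named spark and a SparkContext named sc are already available, so no boilerplate is required. For example:

# Inside the interactive shell
spark.version                      # confirm the session is up
sc.parallelize([1, 2, 3]).sum()    # quick sanity check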

Parallelizing Data

In industry, you usually test your logic on a small dataset first. Once the desired results are achieved, the same logic can be applied to large datasets.

Here's an example to find the frequency of each word:

words = ("big", "Data", "Is", "SUPER", "Interesting", "BIG", "data", "IS", "A", "Trending", "technology")
words_rdd = spark.sparkContext.parallelize(words)
words_normalized = words_rdd.map(lambda x: x.lower())
mapped_words = words_normalized.map(lambda x: (x, 1))
aggregated_result = mapped_words.reduceByKey(lambda x, y: x + y)
aggregated_result.collect()

Chaining Functions:

result = spark.sparkContext.parallelize(words) \
    .map(lambda x: x.lower()) \
    .map(lambda x: (x, 1)) \
    .reduceByKey(lambda x, y: x + y)
result.collect()

Checking Number of RDD Partitions

By default, textFile creates one RDD partition per HDFS block. For a 1GB file in HDFS with a default block size of 128MB:

No. of blocks in HDFS = 1024MB / 128MB = 8 blocks
No. of RDD partitions = 8

You can use the getNumPartitions() method to check the number of partitions of an RDD:

rdd.getNumPartitions()

Spark Transformations and Actions

countByValue

This is an action that combines the functionality of map and reduceByKey. It’s efficient when you only need to count values without further processing.

countByValue = map + reduceByKey
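
For example, the word-count logic above can be collapsed into a single call on the normalized words RDD; countByValue is an action that returns the counts to the driver as a dictionary-like object:

words_normalized.countByValue()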

Categories of Transformations

  1. Narrow Transformation: No shuffling of data, e.g., map, filter.
  2. Wide Transformation: Involves shuffling, e.g., reduceByKey, groupByKey.
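
A quick illustration, reusing orders_rdd from the use case above:

# Narrow: each output partition depends on exactly one input partition (no shuffle)
status_pairs = orders_rdd.map(lambda x: (x.split(",")[3], 1))

# Wide: records with the same key must be shuffled to the same partition
status_counts = status_pairs.reduceByKey(lambda x, y: x + y)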

Repartition Vs Coalesce

  • Repartition: Can increase or decrease the number of partitions; it always performs a full shuffle of the data.
  • Coalesce: Can only decrease the number of partitions; it merges existing partitions and avoids a full shuffle, which makes it cheaper than repartition.

# Repartition Example
repartitioned_rdd = base_rdd.repartition(10)

# Coalesce Example
coalesced_rdd = base_rdd.coalesce(2)
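
You can confirm the effect with getNumPartitions() (coalesced_rdd ends up with 2 partitions only if base_rdd had at least that many to begin with):

repartitioned_rdd.getNumPartitions()  # 10
coalesced_rdd.getNumPartitions()      # 2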

Caching in Spark

Caching is used when you want to store intermediate results and reuse them across multiple actions. This avoids recomputing the same RDD every time a subsequent action needs it.

rdd.cache()
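
A minimal sketch of where caching helps, reusing orders_rdd: the first action materializes the cache, and later actions reuse it instead of re-reading and re-filtering the data.

closed_orders = orders_rdd.filter(lambda x: x.split(",")[3] == 'CLOSED').cache()

closed_orders.count()   # first action: computes the RDD and stores it in memory
closed_orders.take(5)   # later action: served from the cache, no recomputation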
