Python Basics: Normal Vs Lambda Functions
Defining a Normal Function
In Python, a normal function is defined using the def keyword. Here's an example:
def my_sum(x, y):
    return x + y
To call the function:
total = my_sum(5, 7)
Lambda Function
A lambda function is Python's equivalent of an anonymous function in languages like Scala. The function body is embedded inline and passed directly as an argument, rather than being defined first with def and then called by name.
Lambda functions are typically used for short, one-off functionality. Here's an example of a lambda function:
lambda x, y: x + y
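As a quick sketch, the same addition can be used just like my_sum above, either by giving the lambda a name (add is just an illustrative name here) or by passing it straight into another function:
# Assigning the lambda to a name, equivalent to my_sum above
add = lambda x, y: x + y
add(5, 7)  # returns 12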
Python map Function Vs Spark map Transformation
Suppose you want to sum the numbers in a list: my_list = [5, 7, 8, 2, 5, 9].
The Normal Python Way (Single Machine):
# Defining a normal function
def sumlist(x):
    total = 0
    for i in x:
        total += i
    return total

# Calling the normal function
sumlist(my_list)
This works on a single machine (local/gateway node).
The Distributed Spark Way (Cluster-Based):
In Spark, transformations such as map operate on data that is distributed across a cluster rather than on a single machine. The same lambda-plus-higher-order-function style already works in plain Python with reduce, and the distributed Spark equivalent is sketched right after this snippet:
from functools import reduce
# Defining lambda function to sum the numbers in-line
reduce(lambda x, y: x + y, my_list)
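For comparison, here is a minimal sketch of the distributed version, assuming a SparkSession named spark is already available (the boilerplate for creating one is shown later in this post). The list is parallelized into an RDD and the same lambda is passed to the RDD's reduce action:
# Distribute the list across the cluster as an RDD, then reduce with the same lambda
numbers_rdd = spark.sparkContext.parallelize(my_list)
numbers_rdd.reduce(lambda x, y: x + y)  # returns 36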
Higher-Order Functions
Higher-order functions are functions that accept other functions as parameters or return other functions. Examples include reduce and map.
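A quick sketch in plain Python: map takes a function (here a lambda) and applies it to every element, while reduce folds the elements into a single value.
from functools import reduce

my_list = [5, 7, 8, 2, 5, 9]
list(map(lambda x: x * 2, my_list))   # [10, 14, 16, 4, 10, 18]
reduce(lambda x, y: x + y, my_list)   # 36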
PySpark Use Case 1
Consider a business scenario where you need to extract the following information from the company's order data:
- Orders in each category (COMPLETE, PENDING_PAYMENT, CLOSED, etc.)
- Premium Customers (Top 10 customers who placed the most orders)
- Distinct customers who placed at least 1 order
- Customers having the maximum number of CLOSED orders
Creating a Spark Session (Boilerplate Code):
from pyspark.sql import SparkSession
import getpass

username = getpass.getuser()

spark = SparkSession.builder \
    .config('spark.ui.port', '0') \
    .config("spark.sql.warehouse.dir", f"/user/{username}/warehouse") \
    .enableHiveSupport() \
    .master('yarn') \
    .getOrCreate()
Loading Data into RDD:
orders_rdd = spark.sparkContext.textFile("/public/orders/*")
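Before applying transformations, it helps to peek at a few raw records. The field positions used in the code below (index 2 for customer id, index 3 for order status) assume each line is a comma-separated record of the form order_id,order_date,customer_id,order_status; adjust the indexes if your data is laid out differently.
# Inspect a few raw records to confirm the layout
# (assumed format: order_id,order_date,customer_id,order_status)
orders_rdd.take(5)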
Applying Transformations
Orders in Each Category:
mapped_rdd = orders_rdd.map(lambda x: (x.split(",")[3], 1))
reduced_rdd = mapped_rdd.reduceByKey(lambda x, y: x + y)
reduced_sorted = reduced_rdd.sortBy(lambda x: x[1], False)
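To trigger the computation and view the category-wise counts, an action such as collect can be called on the sorted RDD (safe here because the number of distinct order statuses is small):
# Trigger the job and bring the per-status counts to the driver
reduced_sorted.collect()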
Premium Customers:
customers_mapped = orders_rdd.map(lambda x: (x.split(",")[2], 1))
customers_aggregated = customers_mapped.reduceByKey(lambda x, y: x + y)
customers_aggregated.sortBy(lambda x: x[1], False).take(10)
Distinct Customers Who Placed at Least 1 Order:
distinct_customers = orders_rdd.map(lambda x: (x.split(",")[2])).distinct()
distinct_customers.count()
Customers Having Maximum Number of CLOSED Orders:
filtered_orders = orders_rdd.filter(lambda x: x.split(",")[3] == 'CLOSED')
filtered_mapped = filtered_orders.map(lambda x: (x.split(",")[2], 1))
filtered_aggregated = filtered_mapped.reduceByKey(lambda x, y: x + y)
filtered_sorted = filtered_aggregated.sortBy(lambda x: x[1], False)
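Since the RDD is sorted in descending order of CLOSED-order counts, the top customer can be fetched with first, or the top N with take:
# Customer id with the highest number of CLOSED orders
filtered_sorted.first()
# Or the top 5 such customers
filtered_sorted.take(5)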
Spark Core APIs: RDD
Alternate Spark Development Environments
Other than Jupyter Notebooks, Spark code can also be developed and executed from the terminal. You can run the following command to start an interactive PySpark shell (the exact command name, such as pyspark or pyspark3, depends on how Spark is installed in your environment):
pyspark3
Parallelizing Data
In the industry, you usually test functionality on a small dataset first. Once the desired results are achieved, the logic can be applied to large datasets.
Here's an example to find the frequency of each word:
words = ("big", "Data", "Is", "SUPER", "Interesting", "BIG", "data", "IS", "A", "Trending", "technology")
words_rdd = spark.sparkContext.parallelize(words)
words_normalized = words_rdd.map(lambda x: x.lower())
mapped_words = words_normalized.map(lambda x: (x, 1))
aggregated_result = mapped_words.reduceByKey(lambda x, y: x + y)
aggregated_result.collect()
Chaining Functions:
result = spark.sparkContext.parallelize(words) \
    .map(lambda x: x.lower()) \
    .map(lambda x: (x, 1)) \
    .reduceByKey(lambda x, y: x + y)
result.collect()
Checking Number of RDD Partitions
For a 1GB file in HDFS with a default block size of 128MB:
No. of blocks in HDFS = 1024MB / 128MB = 8 blocks
No. of RDD partitions = 8
You can use the getNumPartitions() function to check the number of partitions:
rdd.getNumPartitions()
Spark Transformations and Actions
countByValue
This is an action that combines the functionality of map and reduceByKey. It's efficient when you only need to count values without further processing. Because it is an action, it returns the counts to the driver as a dictionary, so it is best used when the number of distinct values is small.
countByValue = map + reduceByKey
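As a sketch, the word-count example above could be shortened with countByValue, reusing the words_normalized RDD from the parallelize example:
# Count each distinct word directly, without an explicit map + reduceByKey
words_normalized.countByValue()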
Categories of Transformations
- Narrow Transformation: No shuffling of data across partitions, e.g., map, filter.
- Wide Transformation: Involves shuffling data across partitions, e.g., reduceByKey, groupByKey (see the sketch after this list).
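A small sketch using the orders RDD from the earlier use case: filter is narrow because each output partition depends on exactly one input partition, while reduceByKey is wide because all records with the same key must be shuffled to the same partition.
# Narrow: filter works partition-by-partition, no shuffle
closed_orders = orders_rdd.filter(lambda x: x.split(",")[3] == 'CLOSED')

# Wide: reduceByKey shuffles records so that each key ends up on one partition
closed_per_customer = closed_orders.map(lambda x: (x.split(",")[2], 1)) \
    .reduceByKey(lambda x, y: x + y)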
Repartition Vs Coalesce
- Repartition: Can increase or decrease the number of partitions.
- Coalesce: Only decreases the number of partitions; it merges existing partitions rather than performing a full reshuffle, which makes it cheaper than repartition.
# Repartition Example
repartitioned_rdd = base_rdd.repartition(10)
# Coalesce Example
coalesced_rdd = base_rdd.coalesce(2)
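You can verify the effect with getNumPartitions (base_rdd stands for any existing RDD; coalesce never increases the count, so the result is at most the original number of partitions):
repartitioned_rdd.getNumPartitions()  # 10
coalesced_rdd.getNumPartitions()      # 2, or fewer if base_rdd had fewer partitions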
Caching in Spark
Caching is used when you want to store intermediate results and reuse them across multiple actions. This avoids recomputing the same lineage every time an action is triggered.
rdd.cache()
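A minimal sketch reusing the aggregated_result RDD from the word-count example: the first action materialises and caches the partitions, and subsequent actions reuse the cached data instead of recomputing the whole lineage.
aggregated_result.cache()
aggregated_result.count()    # first action: computes the RDD and stores it in memory
aggregated_result.collect()  # reuses the cached partitions, no recomputation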