Higher Level APIs in Apache Spark
1. Dataframes
2. Spark SQL
(Datasets are another language-specific high-level API, available in Scala and Java but not in Python.)
Previously, we covered RDDs and their functionality.
RDDs consist of raw data distributed across partitions without any associated structure (schema/metadata).
In the case of normal databases, where the data is stored in the form of tables, there are two things:
- Actual Data
- Schema / Metadata
In the case of Apache Spark:
- Spark SQL / Spark Table – Persistent across sessions. Data files are stored on disk (any storage such as a data lake/HDFS/S3), and the schema/metadata is kept in a metastore (database).
- Dataframes – RDDs with some structure/schema on top, but not persistent, as they are available only within that particular session. Data is held in memory, and metadata is saved in a temporary metadata catalog.
Note: Why are higher-level APIs more performant?
Because the Spark engine is given more context about the data (through the schema/metadata), it can plan and execute operations more effectively.
Working of Dataframes
- Load the data file and create a Spark Dataframe.
- Apply transformations.
- Write the results back to storage.
(Spark Session is an entry point to the Spark Cluster in the case of higher-level APIs. Spark Context is for lower-level RDDs.)
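A minimal sketch of creating a Spark Session (the application name and local master are illustrative; on a cluster you would typically rely on the existing configuration):
from pyspark.sql import SparkSession

# Entry point for the higher-level APIs; the SparkContext is available via spark.sparkContext
spark = SparkSession.builder \
    .appName("higher-level-apis-demo") \
    .master("local[*]") \
    .getOrCreate()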
Note: The inferSchema configuration option should generally be avoided, as the inferred data types may not always be correct, and inferring the schema takes additional execution time.
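Instead, a schema can be defined explicitly and passed to the reader. A sketch, assuming a hypothetical orders CSV file with the columns used later in this post:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

orders_schema = StructType([
    StructField("order_id", IntegerType()),
    StructField("order_date", StringType()),
    StructField("customer_id", IntegerType()),
    StructField("order_status", StringType())
])

# Explicit schema: no inference pass over the data, and the types are guaranteed
ordersdf = spark.read \
    .format("csv") \
    .schema(orders_schema) \
    .load("<file-path>")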
Dataframe Reader
Standard way to create a Dataframe:
df = spark.read \
.format("csv") \
.option("header", "true") \
.option("inferschema", "true") \
.load("<file-path>")
Alternate shortcuts to create a Dataframe for different file formats:
- CSV:
df = spark.read \
.csv("<file-path>", header="true", inferSchema="true")
- JSON:
df = spark.read \
.json("<file-path>")
- Parquet (a columnar file format with an embedded schema, and the format that works best with Spark):
df = spark.read \
.parquet("<file-path>")
- ORC:
df = spark.read \
.orc("<file-path>")
Some Transformations
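A few common Dataframe transformations, sketched on the hypothetical ordersdf read above (the column names are assumptions):
from pyspark.sql.functions import col, upper

# Select specific columns
selected_df = ordersdf.select("order_id", "customer_id", "order_status")

# Filter rows by a condition
closed_df = ordersdf.filter("order_status = 'CLOSED'")

# Add a derived column
upper_df = ordersdf.withColumn("order_status_upper", upper(col("order_status")))

# Group and aggregate
status_counts_df = ordersdf.groupBy("order_status").count()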
Dataframes and Spark SQL Tables
Dataframes and Spark SQL tables are interconvertible. To work in a normal SQL style (much more convenient for SQL developers), a Spark SQL table view is created from a Dataframe, and normal SQL queries can then be executed against that view.
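For example (the view name orders is illustrative):
# Register the Dataframe as a temporary view
ordersdf.createOrReplaceTempView("orders")

# Run a normal SQL query against the view; the result is again a Dataframe
spark.sql("SELECT order_status, COUNT(*) AS cnt FROM orders GROUP BY order_status").show()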
Converting from SparkSQL Table to a Dataframe
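A sketch of going in the other direction (the view/table name is illustrative):
# Read an existing Spark SQL table or view back into a Dataframe
orders_df_again = spark.table("orders")

# Equivalently, any spark.sql(...) query returns a Dataframe
orders_df_again = spark.sql("SELECT * FROM orders")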
The different methods to create a Spark SQL table view from a Dataframe:
- createOrReplaceTempView – Creates a temporary view. If a view with that name already exists, it is replaced without throwing any error.
- createTempView – Creates a temporary view. If the view already exists, it errors out, stating that the view already exists.
- createGlobalTempView – Creates a global temporary view that is visible to other Spark sessions within the same application (under the reserved global_temp database). If the view already exists, it errors out.
- createOrReplaceGlobalTempView – Replaces any existing global temporary view with the newly created one.
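A quick sketch of the global variants (the view name is illustrative):
# Global temp views live in the reserved global_temp database
ordersdf.createOrReplaceGlobalTempView("orders_global")

# They must be referenced with the global_temp prefix and remain visible
# to other Spark sessions of the same application
spark.sql("SELECT COUNT(*) FROM global_temp.orders_global").show()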
Creating a Spark Table
If a table is created without selecting the database in which it has to be created, it will be created under the default database.
To create a database:
spark.sql("CREATE DATABASE IF NOT EXISTS <Database-name-with-path>")
To view the database:
spark.sql("SHOW DATABASES")
To view the databases with a certain pattern in their name:
spark.sql("SHOW DATABASES").filter("namespace LIKE '<pattern>%'").show()
To view the tables:
spark.sql("SHOW TABLES").show()
To Create a Spark SQL Table
Scenario: We have a temporary table (created using createTempView) with some data, and we need to persist this data to a persistent table in the database.
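One possible sketch, assuming the temporary view orders created earlier and an illustrative target table name:
# Persist the data from the temporary view into a table in the metastore
spark.sql("CREATE TABLE IF NOT EXISTS itv005857_retail.orders_persistent AS SELECT * FROM orders")

# Alternatively, via the Dataframe writer API
spark.table("orders").write.mode("overwrite").saveAsTable("itv005857_retail.orders_persistent")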
To view the extended description of the table created, use the following command:
DESCRIBE EXTENDED <table-name>
The above table is a Managed Table. We know that the table consists of Data and Metadata.
- Metadata is in the metastore, and we can view it using the DESCRIBE TABLE command.
- Data / Actual data is stored in the table directory (shown as Location in the extended description).
Types of Tables:
- Managed Table
- External Table
Managed Table Example:
- Create an empty table.
- Load the data from a temporary view/table.
spark.sql("CREATE TABLE itv005857_retail.orders (order_id INTEGER, order_date STRING, customer_id INTEGER, order_status STRING) USING CSV")
External Table Example:
- The data is already present in a specific location, and a structure has to be created to get a tabular view.
spark.sql("CREATE TABLE itv005857_retail.orders (order_id INTEGER, order_date STRING, customer_id INTEGER, order_status STRING) USING CSV LOCATION '/public/trendytech/retail_db/orders'")
Managed Table vs External Table:
- When the data is owned by a single user/application, managed tables can be used. Dropping a managed table deletes both the metadata and the underlying data.
- However, if multiple users access data kept at a centralized location, it is best to use external tables. The data remains intact because users only apply a schema on top of the existing data; dropping an external table removes only the metadata, not the data itself.
DML Operations
DML operations are handled differently in open-source Apache Spark and in Databricks: file-based tables in open-source Spark mainly support inserts, whereas Databricks (with Delta Lake) supports the full set of DML operations such as UPDATE, DELETE, and MERGE.
Working with Spark SQL API vs Dataframe API
Use cases to understand the working of these higher-level APIs:
1. Top 15 customers who placed the most number of orders:
DataFrame way:
result = ordersdf.groupBy("customer_id").count().sort("count", ascending=False).limit(15)
SparkSQL way:
result = spark.sql("SELECT customer_id, COUNT(order_id) AS count FROM orders GROUP BY customer_id ORDER BY count DESC LIMIT 15")
2. Find the number of orders under each order status:
DataFrame way:
result = ordersdf.groupBy("order_status").count()
SparkSQL way:
result = spark.sql("SELECT order_status, COUNT(order_id) AS count FROM orders GROUP BY order_status")
3. Number of active customers (customers who have placed at least one order):
DataFrame way:
results = ordersdf.select("customer_id").distinct().count()
SparkSQL way:
results = spark.sql("SELECT COUNT(DISTINCT(customer_id)) AS active_customers FROM orders")
4. Customers with the most number of CLOSED orders:
DataFrame way:
results = ordersdf.filter("order_status = 'CLOSED'").groupBy("customer_id").count().sort("count", ascending=False)
SparkSQL way:
results = spark.sql("SELECT customer_id, COUNT(order_id) AS count FROM orders WHERE order_status = 'CLOSED' GROUP BY customer_id ORDER BY count DESC")
Spark Optimizations / Performance Tuning
Types of Optimizations:
- Application Code Level Optimization
  - Example: usage of cache, using reduceByKey instead of groupByKey (see the sketch after this list).
- Cluster Level Optimization / Resource Level Optimization
  - Containers / Executors
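A sketch of the reduceByKey vs groupByKey point, using a hypothetical pair RDD built from the ordersdf used earlier:
# Hypothetical pair RDD of (order_status, 1) tuples
pairs = ordersdf.rdd.map(lambda row: (row["order_status"], 1))

# Caching avoids recomputing the RDD when it is reused
pairs.cache()

# Less efficient: every individual value is shuffled, then summed on the reduce side
counts_gbk = pairs.groupByKey().mapValues(sum)

# More efficient: values are partially aggregated within each partition before the shuffle
counts_rbk = pairs.reduceByKey(lambda a, b: a + b)

print(counts_rbk.collect())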
Resource Level Optimization
In order for a job to run efficiently, the right amount of resources should be allocated. Resources include:
- Memory (RAM)
- CPU cores (Compute)
Scenario:
Consider a 10-node cluster (i.e., 10 worker nodes). Each machine has:
- 16 CPU cores
- 64GB RAM
Executor / Container / JVM: Is a container of resources (CPU & RAM). A single node can have more than one executor.
Strategies for Creating Containers
Right / Balanced Strategy for Creating Containers:
- Out of each node's 16 cores and 64 GB RAM, leave roughly 1 core and 1 GB for the OS and other daemons, leaving about 15 cores and 63 GB per node.
- With 5 CPU cores per executor (a commonly recommended balance), each worker node can have 15 / 5 = ~3 executors, each getting 63 GB / 3 = 21 GB.
- Overhead / off-heap memory = max(384 MB, 7% of executor memory) ≈ 1.5 GB, which is not part of the executor's heap.
- Therefore, each executor's heap = 21 GB - 1.5 GB ≈ 19 GB.
- Each executor holds 5 CPU cores and ~19 GB RAM, with ~3 executors per worker node.
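Based on this sizing, the resource configuration might be sketched as below (the property keys are standard Spark settings; the executor count assumes 10 nodes × 3 executors, with one container commonly kept aside for the driver/ApplicationMaster on YARN):
from pyspark.sql import SparkSession

# Request executors sized according to the calculation above
spark = SparkSession.builder \
    .appName("resource-sizing-sketch") \
    .config("spark.executor.cores", "5") \
    .config("spark.executor.memory", "19g") \
    .config("spark.executor.instances", "29") \
    .getOrCreate()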
Labels for Blogspot:
Apache Spark, Dataframes, Spark SQL, RDDs, SQL, Spark Table, Data Transformation, Spark Performance Tuning, Datasets, Data Processing, Big Data, Spark Optimization