from pyspark import SparkContext
= SparkContext("local", "MapExample")
sc = [1, 2, 3, 4, 5]
data = sc.parallelize(data)
rdd = rdd.map(lambda x: x * 2)
mapped_rdd
mapped_rdd.collect()
# Output:
2, 4, 6, 8, 10] [
Transformation & Optimization Techniques
Narrow
Narrow transformations in Spark work within partitions without shuffling data between them. They are applied locally to each partition, so no data is exchanged across the cluster.
- Narrow transformations can be compared to performing straightforward operations on separate chunks of data.
- Imagine having different types of data in separate containers. You can act on each container independently, without coordinating or moving data between containers.
- Examples of narrow transformations include modifying individual elements, selecting specific items, or appending one collection to another.
Map
- Applying a function to each element in the data set.
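Here is a minimal sketch in the same style as the other snippets (the app name and data are illustrative, not taken from the text above); it maps each string to a (value, length) pair:
from pyspark import SparkContext

sc = SparkContext("local", "MapKVExample")  # illustrative app name
data = ["apple", "banana", "cherry"]
rdd = sc.parallelize(data)
# map is narrow: each output partition depends only on its own input partition
kv_rdd = rdd.map(lambda fruit: (fruit, len(fruit)))
kv_rdd.collect()
# Output: [('apple', 5), ('banana', 6), ('cherry', 6)]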
Filter
- Selecting elements based on a specified condition.
from pyspark import SparkContext
= SparkContext("local", "FilterExample")
sc = [1, 2, 3, 4, 5]
data = sc.parallelize(data)
rdd = rdd.filter(lambda x: x % 2 == 0)
filtered_rdd
filtered_rdd.collect()
# Output:
2, 4] [
Union
- Combining two data sets with the same schema.
from pyspark import SparkContext
= SparkContext("local", "UnionExample")
sc = sc.parallelize([1, 2, 3])
rdd1 = sc.parallelize([4, 5, 6])
rdd2 = rdd1.union(rdd2)
union_rdd
union_rdd.collect()
# Output:
1, 2, 3, 4, 5, 6] [
Wide
Wide transformations in Spark redistribute and shuffle data across partitions, which generally makes them more resource-intensive and complex than narrow transformations.
- Wide transformations are like reshuffling and redistributing data between different groups. Imagine having several data sets that you want to combine or organize in a new way; this is not as straightforward as working with a single data set.
- You need to coordinate and move data between these sets, which adds complexity.
- For example, merging two data sets based on a common attribute requires rearranging data between them, making it a wide transformation.
GroupBy
- Aggregating data based on a specific key.
from pyspark import SparkContext
= SparkContext("local", "GroupByExample")
sc = [("apple", 2), ("banana", 3), ("apple", 5), ("banana", 1)]
data = sc.parallelize(data)
rdd = rdd.groupBy(lambda x: x[0])
grouped_rdd = grouped_rdd.mapValues(lambda values: sum([v[1] for v in values]))
sum_rdd # Output: [('apple', 7), ('banana', 4)] sum_rdd.collect()
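As a brief aside on optimization (not part of the original example): when the goal is a per-key aggregate, reduceByKey can replace groupBy plus a sum, because it combines values within each partition before the shuffle and therefore moves less data. A minimal sketch with the same data:
from pyspark import SparkContext

sc = SparkContext("local", "ReduceByKeyExample")  # illustrative app name
data = [("apple", 2), ("banana", 3), ("apple", 5), ("banana", 1)]
pair_rdd = sc.parallelize(data)
# reduceByKey pre-aggregates within each partition, so less data is shuffled
# than with groupBy followed by a sum
sum_rdd = pair_rdd.reduceByKey(lambda a, b: a + b)
sum_rdd.collect()
# Output (order may vary): [('apple', 7), ('banana', 4)]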
Join
- Combining two data sets based on a common key.
from pyspark import SparkContext
= SparkContext("local", "JoinExample")
sc = sc.parallelize([("apple", 2), ("banana", 3)])
rdd1 = sc.parallelize([("apple", 5), ("banana", 1)])
rdd2 = rdd1.join(rdd2)
joined_rdd # Output: [('apple', (2, 5)), ('banana', (3, 1))] joined_rdd.collect()
Sort
- Rearranging data based on a specific criterion.
from pyspark import SparkContext
= SparkContext("local", "SortExample")
sc = [4, 2, 1, 3, 5]
data = sc.parallelize(data)
rdd = rdd.sortBy(lambda x: x, ascending=True)
sorted_rdd # Output: [1, 2, 3, 4, 5] sorted_rdd.collect()
Rule-Based Transformations
The DataFrame API in PySpark benefits from various rule-based optimizations that Spark applies to the query plan. These transformations are designed to improve how queries are executed and boost overall performance. Let’s take a look at some common rule-based transformations.
Predicate pushdown
Pushing filtering conditions closer to the data source before processing to minimize data movement.
Constant folding
Evaluating constant expressions during query compilation to reduce computation during runtime.
Column pruning
Eliminating unnecessary columns from the query plan to enhance processing efficiency.
Join reordering
Rearranging join operations to minimize the intermediate data size and enhance the join performance.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Create a Spark session
= SparkSession.builder.appName("RuleBasedTransformations").getOrCreate()
spark
# Sample input data for DataFrame 1
= [
data1 "Alice", 25, "F"),
("Bob", 30, "M"),
("Charlie", 22, "M"),
("Diana", 28, "F")
(
]
# Sample input data for DataFrame 2
= [
data2 "Alice", "New York"),
("Bob", "San Francisco"),
("Charlie", "Los Angeles"),
("Eve", "Chicago")
(
]
# Create DataFrames
= ["name", "age", "gender"]
columns1 = spark.createDataFrame(data1, columns1)
df1
= ["name", "city"]
columns2 = spark.createDataFrame(data2, columns2)
df2
# Applying Predicate Pushdown (Filtering)
= df1.filter(col("age") > 25)
filtered_df
# Applying Constant Folding
= filtered_df.select(col("name"), col("age") + 2)
folded_df
# Applying Column Pruning
= folded_df.select(col("name"))
pruned_df
# Join Reordering
= df1.join(df2, on="name")
reordered_join
# Show the final results
print("Filtered DataFrame:")
filtered_df.show()
print("Folded DataFrame:")
folded_df.show()
print("Pruned DataFrame:")
pruned_df.show()
print("Reordered Join DataFrame:")
reordered_join.show()
# Stop the Spark session
spark.stop()
Optimization Techniques
Predicate pushdown
Apply a filter to DataFrame “df1” to only select rows where the “age” column is greater than 25.
Constant folding
Perform an arithmetic operation on the “age” column in the folded_df, adding a constant value of 2.
Column pruning
Select only the “name” column in the pruned_df, eliminating unnecessary columns from the query plan.
Join reordering
Perform a join between df1 and df2 on the “name” column, allowing Spark to potentially reorder the join for better performance.
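These rules are easiest to observe against a file-based source such as Parquet. The following is an illustrative sketch (the Parquet path and data are hypothetical, not taken from the example above): it filters and projects a Parquet-backed DataFrame and prints the physical plan, where the scan node’s PushedFilters and ReadSchema show predicate pushdown and column pruning at work.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("InspectOptimizations").getOrCreate()

# Write a tiny Parquet file so the sketch is self-contained
# (path and data are illustrative)
sample = [("Alice", 25, "F"), ("Bob", 30, "M"), ("Charlie", 22, "M")]
spark.createDataFrame(sample, ["name", "age", "gender"]) \
    .write.mode("overwrite").parquet("/tmp/people.parquet")

# Filter and project on the file-backed DataFrame
people = spark.read.parquet("/tmp/people.parquet")
adults = people.filter(col("age") > 25).select("name")

# In the physical plan, the Parquet scan node lists PushedFilters (predicate
# pushdown) and a ReadSchema limited to the selected column (column pruning)
adults.explain(True)

spark.stop()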
Cost-Based Optimization
Spark employs cost-based optimization techniques to make query execution more efficient. These methods estimate the costs of candidate query plans, typically from table and column statistics, so the optimizer can make more informed decisions that result in better performance.
Adaptive query execution
Dynamically adjusts the query plan during execution based on runtime statistics to optimize performance.
Cost-based join reordering
Optimizes join order based on estimated costs of different join paths.
Broadcast hash join
Optimizes small-table joins by broadcasting one table to all nodes, reducing data shuffling.
Shuffle partitioning and memory management
Efficiently manages data shuffling during operations like groupBy and aggregation and optimizes memory usage.
By using these methods, Spark aims to deliver efficient and scalable data processing. Understanding when and how to apply these transformations and optimizations is essential for achieving the best possible query performance and making good use of system resources.
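As a rough illustration of how some of these techniques surface in PySpark, the sketch below sets a few related configuration flags and hints a broadcast hash join; the flag values and sample data are arbitrary examples, not recommendations.
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.appName("CostBasedKnobs").getOrCreate()

# Cost-based optimization and join reordering rely on table/column statistics
# (collected with ANALYZE TABLE ... COMPUTE STATISTICS on catalog tables)
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")

# Number of partitions used when shuffling for joins and aggregations
# (200 is the default; the right value depends on data volume and cluster size)
spark.conf.set("spark.sql.shuffle.partitions", "200")

# Illustrative DataFrames
large_df = spark.createDataFrame([(i, i % 3) for i in range(1000)], ["id", "key"])
small_df = spark.createDataFrame([(0, "a"), (1, "b"), (2, "c")], ["key", "label"])

# Explicitly hint a broadcast hash join: the small table is shipped to every
# executor, avoiding a shuffle of the large table
joined = large_df.join(broadcast(small_df), on="key")
joined.explain()

spark.stop()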
Example
- We create two DataFrames (df1 and df2) with sample input data.
- We enable adaptive query execution by setting the configuration parameter “spark.sql.adaptive.enabled” to “true”.
- Adaptive query execution allows Spark to adjust the query plan during execution based on runtime statistics.
- The code performs a join between df1 and df2 on the “name” column.
- Because adaptive query execution is enabled, Spark can revise the plan for this join at runtime, which can result in improved performance.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Create a Spark session
= SparkSession.builder.appName("CostBasedOptimization").getOrCreate()
spark
# Sample input data for DataFrame 1
= [
data1 "Alice", 25),
("Bob", 30),
("Charlie", 22),
("Diana", 28)
(
]
# Sample input data for DataFrame 2
= [
data2 "Alice", "New York"),
("Bob", "San Francisco"),
("Charlie", "Los Angeles"),
("Eve", "Chicago")
(
]
# Create DataFrames
= ["name", "age"]
columns1 = spark.createDataFrame(data1, columns1)
df1
= ["name", "city"]
columns2 = spark.createDataFrame(data2, columns2)
df2
# Enable adaptive query execution
set("spark.sql.adaptive.enabled", "true")
spark.conf.
# Applying Adaptive Query Execution (Runtime adaptive optimization)
= df1.join(df2, on="name")
optimized_join
# Show the optimized join result
print("Optimized Join DataFrame:")
optimized_join.show()
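# (Optional) Inspect the physical plan: on Spark 3.x with
# spark.sql.adaptive.enabled set to "true", the plan is wrapped in an
# AdaptiveSparkPlan node, confirming that adaptive query execution is active.
optimized_join.explain()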
# Stop the Spark session
spark.stop()