pip install pyspark
# used to find Spark
pip install findspark
pip install pandas
...
Import Libraries
Spark Basics
General Python Review
pip
pip searches for the requested package in the Python Package Index (PyPI), resolves any dependencies, and installs everything in your current Python environment.
pip list
pip install
The pip install <package> command looks for the latest version of the package and installs it.
Used to make code from one module accessible in another. Python imports are crucial for a successful code structure. You may reuse code and keep your projects manageable by using imports effectively, which can increase your productivity.
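As a minimal sketch of the common import forms, using only standard-library modules chosen here for illustration (they are not part of the Spark setup):

```python
# Three common import forms, shown with standard-library modules.
import math                    # import a whole module
from statistics import mean    # import a single name from a module
import datetime as dt          # import a module under a shorter alias

radius = 2.0
area = math.pi * radius ** 2   # access through the module name
average = mean([10, 20, 30])   # use the imported name directly
epoch = dt.date(1970, 1, 1)    # access through the alias

print(round(area, 2), average, epoch.isoformat())
```

The same three forms appear throughout this page, for example `import findspark`, `from pyspark.sql import SparkSession`, and `import pandas as pd`.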
# Import types to set Schema
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, DateType
import pandas as pd ...
Spark Session
Find Spark
- Initialize findspark to locate Spark
import findspark
findspark.init()
getOrCreate - Create Session
Gets an existing SparkSession or, if there is no existing one, creates a new one based on the options set in this builder.
AppName
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("MyApp").getOrCreate()
Pandas
Import CSV Data
We covered Pandas in Python here. Let’s say we are importing a CSV file into a pandas DataFrame
vaccination_data = pd.read_csv('https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/KpHDlIzdtR63BdTofl1mOg/owid-covid-latest.csv')
Functions
def
Used to define a function. It is placed before a function name that is provided by the user to create a user-defined function.
# This function takes a name as a parameter and prints a greeting
def saysomething(name):
    print(f"Hello, {name}!")
# This is how you call a function named saysomething
saysomething("You")
print for
We can combine a for loop with print to loop through a list
fruits = ["apple", "banana", "cherry"]
for fruit in fruits:
    print(f"I like {fruit}s")
DataFrames
Operators
Create DF from Pandas
- After defining and creating the Schema in the Schema section below, we can create the DF
- We can include any columns we want, either one by one,
- by creating a list of columns,
- or by using all of them
# Filter out unwanted columns
spark_df = spark.createDataFrame(vaccination_data[schema.fieldNames()])  # Use only the specified fields, in this case all fieldNames()
# Show the Spark DataFrame
spark_df.show()
# Here is a simple example
import pandas as pd
mtcars = pd.read_csv('mtcars.csv')
sdf = spark.createDataFrame(mtcars)
Cache
cache() is often used on a DataFrame, data set, or RDD when you want to perform multiple actions on it. It marks the specified DataFrame, data set, or RDD to be kept in the memory of your cluster’s workers. cache() is lazy, like a transformation: the caching operation takes place only when a Spark action (for example, count(), show(), take(), or write()) runs on the same DataFrame, data set, or RDD.
df = spark.read.csv("customer.csv")
df.cache()
Count
Returns the number of rows in the DataFrame.
count = df.count()
print(count)
len - row count
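Returns the number of items in an object. When the object is a string, the len() function returns the number of characters in the string. Here it counts the rows returned by collect(); because collect() brings every row to the driver, df.count() is preferable for large DataFrames.
row_count = len(df.collect())
print(row_count)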
map
Returns a map object (an iterator) of the results after applying the given function to each item of a given iterable (list, tuple, etc.)
rdd = df.rdd.map(lambda row: (row['name'], row['age']))
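The description above matches Python's built-in map(); the RDD method of the same name applies a function to every element in a similar way. A minimal sketch of the built-in, with an illustrative list of names:

```python
names = ["alice", "bob", "carol"]

# map() returns a lazy iterator; nothing is computed until it is consumed
lengths = map(len, names)
lengths_list = list(lengths)    # consume the iterator into a list

# Any callable works, including methods and lambdas
capitalized = list(map(str.capitalize, names))
pairs = list(map(lambda name: (name, len(name)), names))

print(lengths_list, capitalized, pairs)
```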
time
Returns the current time in the number of seconds since the Unix Epoch.
result = spark.sql("SELECT name, age FROM cust_tbl WHERE age > 30")
result.show()
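A minimal sketch of using time.time() to measure how long a step takes. The summation is a placeholder workload; in practice the timed step could be a Spark action such as show() or count():

```python
import time

start = time.time()                            # seconds since the Unix epoch, as a float
total = sum(i * i for i in range(100_000))     # placeholder workload
elapsed = time.time() - start                  # difference between two readings, in seconds

print(f"Computed {total} in {elapsed:.4f} seconds")
```

For short intervals, time.perf_counter() is usually a better fit, since it is designed for measuring durations rather than reporting the wall-clock time.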
selectExpr
- Used to project a set of SQL expressions and return a new DataFrame. It is a variant of the select() method, but accepts SQL expressions as strings instead of Column objects.
- Notice below how a new df is created
- This will create a new DataFrame with two columns: username (renamed from name) and next_age (calculated as age + 1).
# Simple column selection
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('example').getOrCreate()
data = [('Alice', 25), ('Bob', 30)]
df = spark.createDataFrame(data, schema=['name', 'age'])
# show() returns None, so keep the DataFrame in result and display it separately
result = df.selectExpr('name AS username', 'age + 1 AS next_age')
result.show()
- A more complex chunk
- This will create a new column distance_from_25 calculated using a SQL-like expression.
result = df.selectExpr('SQRT(POW((age - 25), 2)) AS distance_from_25')
Column
withColumn
- Transformation function of DataFrame used to change the value, convert the data type of an existing column, create a new column, and many more.
- Here we create a new column that’s a calculation of another
from pyspark.sql.functions import col
new_df = df.withColumn("age_squared", col("age") ** 2)
Rename Column
- Rename a column from vs to versus
sdf_new = sdf.withColumnRenamed("vs", "versus")
sdf_new.where(sdf['mpg'] < 18).show(3)
+----------+----+---+-----+---+----+----+-----+------+---+----+----+
|Unnamed: 0| mpg|cyl| disp| hp|drat| wt| qsec|versus| am|gear|carb|
+----------+----+---+-----+---+----+----+-----+------+---+----+----+
|Duster 360|14.3| 8|360.0|245|3.21|3.57|15.84| 0| 0| 3| 4|
| Merc 280C|17.8| 6|167.6|123|3.92|3.44| 18.9| 1| 0| 4| 4|
|Merc 450SE|16.4| 8|275.8|180|3.07|4.07| 17.4| 0| 0| 3| 3|
+----------+----+---+-----+---+----+----+-----+------+---+----+----+
only showing top 3 rows
Add Column
# Create df
# col() and when() come from pyspark.sql.functions
from pyspark.sql.functions import col, when

data = [("John", 25), ("Peter", 30), ("David", 35)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)

# Create new column and change values
# Create a new column "DoubleAge" by doubling the "Age" column
updated_df = df \
    .withColumn("DoubleAge", col("Age") * 2)
# Create a new column "AgeGroup" based on conditions
updated_df = updated_df \
    .withColumn("AgeGroup", when(col("Age") <= 30, "Young")
        .when((col("Age") > 30) & (col("Age") <= 40), "Middle-aged")
        .otherwise("Old"))
updated_df.show()
Display
Prints the specified message to the screen or other standard output device. The message can be a string or any other object; the object will be converted into a string before being written to the screen.
- We can also print a variable using an f-string with {variable}
- Or we can print columns
print("Hello, PySpark!")
name = "PySpark"  # any variable
print(f"Hello, {name}")
print(vaccination_data[columns_to_display].head())
show
Spark DataFrame show() is used to display the contents of the DataFrame in a table row and column format. By default, it shows only twenty rows, and the column values are truncated at twenty characters.
df.show()
- Here we select columns and show
# List the names of the columns you want to display
columns_to_display = ['continent', 'total_cases', 'total_deaths', 'total_vaccinations', 'population']
# Display the first 5 records of the specified columns
spark_df.select(columns_to_display).show(5)
# Another example: select two columns and show
spark_df.select('continent', 'total_cases').show(5)
Filter
Select
See SparkSQL section
Used to select one or multiple columns, nested columns, column by index, all columns from the list, by regular expression from a DataFrame. select() is a transformation function in Spark and returns a new DataFrame with the selected columns.
- Or we can create a list of columns to display and use the list
selected_df = df.select('name', 'age')

# List the names of the columns you want to display
columns_to_display = ['continent', 'total_cases', 'total_deaths', 'total_vaccinations', 'population']
# Display the first 5 records of the specified columns
spark_df.select(columns_to_display).show(5)
Conditional
sdf.where(sdf['mpg'] < 18).show(3)
+----------+----+---+-----+---+----+----+-----+---+---+----+----+
|Unnamed: 0| mpg|cyl| disp| hp|drat| wt| qsec| vs| am|gear|carb|
+----------+----+---+-----+---+----+----+-----+---+---+----+----+
|Duster 360|14.3| 8|360.0|245|3.21|3.57|15.84| 0| 0| 3| 4|
| Merc 280C|17.8| 6|167.6|123|3.92|3.44| 18.9| 1| 0| 4| 4|
|Merc 450SE|16.4| 8|275.8|180|3.07|4.07| 17.4| 0| 0| 3| 3|
+----------+----+---+-----+---+----+----+-----+---+---+----+----+
only showing top 3 rows
Where
- The function “where()” filters the DataFrame rows based on the given condition. It returns a new DataFrame containing the rows that satisfy the given condition.
sdf.where(sdf['mpg'] < 18).show(3)
filtered_df = df.where(df['age'] > 30)
Join
- Let’s combine two DFs based on specific conditions
Create DF1
# define sample DataFrame 1
data = [("A101", "John"), ("A102", "Peter"), ("A103", "Charlie")]
columns = ["emp_id", "emp_name"]
dataframe_1 = spark.createDataFrame(data, columns)
+------+--------+
|emp_id|emp_name|
+------+--------+
| A101| John|
| A102| Peter|
| A103| Charlie|
+------+--------+
Create DF2
# define sample DataFrame 2
data = [("A101", 1000), ("A102", 2000), ("A103", 3000)]
columns = ["emp_id", "salary"]
dataframe_2 = spark.createDataFrame(data, columns)
+------+------+
|emp_id|salary|
+------+------+
| A101| 1000|
| A102| 2000|
| A103| 3000|
+------+------+
Join
# create a new DataFrame, "combined_df" by performing an inner join
combined_df = dataframe_1.join(dataframe_2, on="emp_id", how="inner")
combined_df.show()
+------+--------+------+
|emp_id|emp_name|salary|
+------+--------+------+
| A103| Charlie| 3000|
| A102| Peter| 2000|
| A101| John| 1000|
+------+--------+------+
Fill NAs
- We can fill missing values with a specified value using the “fillna()” function (also available as “na.fill()”).
- Create df first
# define sample DataFrame 1
data = [("A101", 1000), ("A102", 2000), ("A103", None)]
columns = ["emp_id", "salary"]
dataframe_1 = spark.createDataFrame(data, columns)
dataframe_1.show()
+------+------+
|emp_id|salary|
+------+------+
| A101| 1000|
| A102| 2000|
| A103| null|
+------+------+
Fillna
filled_df = dataframe_1.fillna({"salary": 3000})
filled_df.head(3)
[Row(emp_id='A101', salary=1000),
 Row(emp_id='A102', salary=2000),
 Row(emp_id='A103', salary=3000)]
# OR
filled_df.show()
+------+------+
|emp_id|salary|
+------+------+
| A101| 1000|
| A102| 2000|
| A103| 3000|
+------+------+
Aggregation
Agg
Used to get the aggregate values like count, sum, avg, min, and max for each group.
agg_df = df.groupBy("column_name").agg({"column_to_aggregate": "sum"})
Average
Spark DataFrames support a number of commonly used functions to aggregate data after grouping.
- In this example we compute the average weight of cars grouped by their cylinders as shown below.
sdf.groupby(['cyl'])\
    .agg({"wt": "AVG"})\
    .show(5)
+---+-----------------+
|cyl| avg(wt)|
+---+-----------------+
| 6|3.117142857142857|
| 8|3.999214285714286|
| 4|2.285727272727273|
+---+-----------------+
Sort Aggregation
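- Sort in descending order to highlight the most popular cars
# show() returns None, so keep the sorted DataFrame in car_counts and display it separately
car_counts = sdf.groupby(['cyl'])\
    .agg({"wt": "count"})\
    .sort("count(wt)", ascending=False)
car_counts.show(5)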
+---+---------+
|cyl|count(wt)|
+---+---------+
| 8| 14|
| 4| 11|
| 6| 7|
+---+---------+
GroupBy
Used to collect the identical data into groups on DataFrame and perform count, sum, avg, min, max functions on the grouped data.
import pandas as pd
data = {'Category': ['A', 'B', 'A', 'B', 'A', 'B'],
        'Value': [10, 20, 15, 25, 30, 35]}
df = pd.DataFrame(data)

grouped = df.groupby('Category').agg({'Value': ['count', 'sum', 'mean', 'min', 'max']})
print(grouped)
Repartition
repartition
Used to increase or decrease the RDD or DataFrame partitions by number of partitions or by a single column name or multiple column names.
# Create df
data = [("John", 25), ("Peter", 30), ("Julie", 35), ("David", 40), ("Eva", 45)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)

# Show current number of partitions
print("Number of partitions before repartitioning: ", df.rdd.getNumPartitions())
# Repartition the DF to 2 partitions
df_repartitioned = df.repartition(2)

# Show number of partitions
print("Number of partitions after repartitioning: ", df_repartitioned.rdd.getNumPartitions())
# Stop Spark session
spark.stop()
Schema
Spark provides a structured data processing framework that can define and enforce schemas for various data sources, including CSV files.
CSV
Let’s look at the steps to define and use a user-defined schema for a CSV file in PySpark:
Import the required libraries.
from pyspark.sql.types import StructType, IntegerType, FloatType, StringType, StructField
Define Schema
- Let’s say we just imported data in a file format
- We can either allow Spark to guess the schema or define it
- Explore the data: Understand the different data types present in each column.
- Column data types: Determine the appropriate data types for each column based on your observed values.
- Define the schema: Use the ‘StructType’ class in Spark and create a ‘StructField’ for each column, mentioning the column name, data type, and other properties.
- ‘False’ indicates null values are NOT allowed for the column.
# Example
schema = StructType([
    StructField("Emp_Id", StringType(), False),
    StructField("Emp_Name", StringType(), False),
    StructField("Department", StringType(), False),
    StructField("Salary", IntegerType(), False),
    StructField("Phone", IntegerType(), True),
])
# Here is the data for the schema above
emp_id,emp_name,dept,salary,phone
A101,jhon,computer science,1000,+1 (701) 846 958
A102,Peter,Electronics,2000,
A103,Micheal,IT,2500,
# Another example
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, DateType
import pandas as pd
# Define the schema
schema = StructType([
    StructField("continent", StringType(), True),
    StructField("total_cases", LongType(), True),
    StructField("total_deaths", LongType(), True),
    StructField("total_vaccinations", LongType(), True),
    StructField("population", LongType(), True)
])
Create DF from CSV & Schema
- For the simple example of employee.csv shown above, after
- We’ve imported the libraries
- Explored the data
- Defined the Schema
- Now we can create the DF from the CSV file
# create a dataframe on top of a csv file
df = (spark.read
    .format("csv")
    .schema(schema)
    .option("header", "true")
    .load("employee.csv")
)
# display the dataframe content
df.show()
# print Schema if we want
df.printSchema()
convert datatypes
- At times it is necessary to convert the columns to the appropriate data types
# Convert the columns to the appropriate data types
vaccination_data['continent'] = vaccination_data['continent'].astype(str)  # Ensures continent is a string
vaccination_data['total_cases'] = vaccination_data['total_cases'].fillna(0).astype('int64')  # Fill NaNs and convert to int
vaccination_data['total_deaths'] = vaccination_data['total_deaths'].fillna(0).astype('int64')  # Fill NaNs and convert to int
vaccination_data['total_vaccinations'] = vaccination_data['total_vaccinations'].fillna(0).astype('int64')  # Fill NaNs and convert to int
vaccination_data['population'] = vaccination_data['population'].fillna(0).astype('int64')  # Fill NaNs and convert to int
printSchema
Used to print or display the schema of the DataFrame or data set in tree format along with the column name and data type. If you have a DataFrame or data set with a nested structure, it displays the schema in a nested tree format.
print("Schema of the Spark DataFrame:")
spark_df.printSchema()  # Print the structure of the DataFrame (columns and types)
# OUTPUT
Schema of the Spark DataFrame:
root
 |-- continent: string (nullable = true)
 |-- total_cases: long (nullable = true)
 |-- total_deaths: long (nullable = true)
 |-- total_vaccinations: long (nullable = true)
 |-- population: long (nullable = true)
define Schema
If we don’t want to leave it to Spark to guess the schema of the imported data, we can define it manually
# Define the schema
schema = StructType([
    StructField("continent", StringType(), True),
    StructField("total_cases", LongType(), True),
    StructField("total_deaths", LongType(), True),
    StructField("total_vaccinations", LongType(), True),
    StructField("population", LongType(), True)
])
JSON
spark.read.json Schema
Spark SQL can automatically infer the schema of a JSON data set and load it as a DataFrame. The read.json() function loads data from a directory of JSON files where each line of the files is a JSON object. Note that the file offered as a JSON file is not a typical JSON file: each line must contain a separate, self-contained JSON object.
json_df = spark.read.json("customer.json")
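A minimal sketch of the one-object-per-line layout that read.json() expects by default, written with Python's standard json module. The records and the temporary file location are illustrative:

```python
import json
import os
import tempfile

records = [{"name": "Alice", "age": 34}, {"name": "Bob", "age": 28}]

# Write one self-contained JSON object per line (often called JSON Lines)
path = os.path.join(tempfile.mkdtemp(), "customer.json")
with open(path, "w", encoding="utf-8") as f:
    for record in records:
        f.write(json.dumps(record) + "\n")

# Every line parses on its own, so the file can be read line by line
with open(path, encoding="utf-8") as f:
    parsed = [json.loads(line) for line in f]

print(parsed)
```

For a conventional JSON file, where a single document spans many lines, Spark's JSON reader offers a multiLine option, for example spark.read.option("multiLine", True).json(path).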
Views
TempView
Creates a temporary view cust_tbl that can later be used to query the data. The only required parameter is the name of the view, cust_tbl.
createTempView raises an error if a view with the same name already exists; createOrReplaceTempView replaces the existing view with the current DataFrame instead
df.createTempView("cust_tbl")
df.createOrReplaceTempView("cust_tbl")
Parallelize Processing
parallelize
To break up our data for parallel processing, we can distribute a local Python collection to form an RDD. For performance, using range is recommended if the input represents a range.
rdd = sc.parallelize([1, 2, 3, 4, 5])
SparkSQL
To issue any SQL query, use the sql() method on the SparkSession instance. All spark.sql queries executed in this manner return a DataFrame on which you may perform further Spark operations if required.
See SparkSQL page for more
result = spark.sql("SELECT name, age FROM cust_tbl WHERE age > 30")
result.show()
General
docker-compose
Tool for defining and running multicontainer Docker applications. It uses the YAML file to configure the services and enables us to create and start all the services from just one configuration file.
# docker-compose.yml
version: '3'
services:
  web:
    image: nginx:latest
    ports:
      - "80:80"
  db:
    image: postgres:latest
docker exec
Runs a new command in a running container. Only runs while the container’s primary process is running, and it is not restarted if the container is restarted.
docker exec -it container_name command_to_run
docker exec -it my_container /bin/bash
docker rm
Used to remove one or more containers
# To remove multiple containers
docker rm container1_name_or_id container2_name_or_id
# To remove ALL stopped containers
docker rm $(docker ps -aq)
docker run
This runs a command in a new container, getting the image and starting the container if needed
docker run [OPTIONS] IMAGE [COMMAND] [ARG...]
git-clone
You can create a copy of a specific repository or branch within a repository.
git clone REPOSITORY_URL [DESTINATION_DIRECTORY]
sc.setLogLevel
Using this method, you can change the log level to the desired level. Valid log levels include ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, and WARN.
from pyspark import SparkContext
# Create sparkContext
sc = SparkContext("local", "LogLevelExample")
# Set log level to desired level (INFO, ERROR)
sc.setLogLevel("INFO")
setMaster
Denotes where to run your Spark application, local or cluster. When you run on a cluster, you need to specify the address of the Spark master or Driver URL for a distributed cluster. We usually assign a local[*] value to setMaster() in Spark while doing internal testing.
from pyspark import SparkContext
# Create a SparkContext with a local master
sc = SparkContext("local[*]", "MyApp")
source
Used to execute a script file in the current shell environment, allowing you to modify the current shell environment in the same way you would if you had typed commands manually.
# Assuming a Bash shell
source myscript.sh
virtualenv
Primarily a command-line application. It modifies the environment variables in a shell to create an isolated Python environment, so you’ll need to have a shell to run it. You can type in virtualenv (name of the application), followed by flags that control its behavior.
# Create a virtual environment named "myenv"
virtualenv myenv