Import Libraries

Spark Basics

General Python Review


pip

When you install a package (for example, requests), pip searches for it in the Python Package Index (PyPI), resolves any dependencies, and installs everything into your current Python environment. pip list shows the packages already installed in that environment.
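  • A quick check of what is already installed in the current environment:
pip list   # lists installed packages and their versions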

pip install

The pip install <package> command looks for the latest version of the package and installs it.

pip install pyspark
pip install findspark  # used to find Spark
pip install pandas...

import

The import statement makes code from one module accessible in another. Imports are central to a well-structured Python project: used effectively, they let you reuse code and keep your projects manageable.

# Import types to set Schema
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, DateType
import pandas as pd ...

Spark Session


Find Spark

  • Initialize findspark to locate Spark
import findspark
findspark.init()

getOrCreate - Create Session

Gets an existing SparkSession or, if there is no existing one, creates a new one based on the options set in this builder.

AppName

from pyspark.sql import SparkSession 
spark = SparkSession.builder.appName("MyApp").getOrCreate()

Pandas


Import CSV Data

Pandas is covered in the Python section; here, let’s say we are importing a CSV file into a pandas DataFrame.

vaccination_data = pd.read_csv('https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/KpHDlIzdtR63BdTofl1mOg/owid-covid-latest.csv')

Functions


def

Used to define a function. It is placed before the function name you choose, creating a user-defined function.

# This function takes a name as a parameter and prints a greeting
def saysomething(name):
    print(f"Hello, {name}!")

# This is how you call the function
saysomething("You")

DataFrames


Operators

Create DF from Pandas

  • After defining the schema (see the Schema section below), we can create the DataFrame
  • We can include columns one by one, build a list of just the columns we want, or use all of them
# Filter out unwanted columns
spark_df = spark.createDataFrame(vaccination_data[schema.fieldNames()])  # Use only the specified fields in this case all fieldNames()
# Show the Spark DataFrame
spark_df.show()

# Here is a simple example
import pandas as pd
mtcars = pd.read_csv('mtcars.csv')
sdf = spark.createDataFrame(mtcars)

Cache

An Apache Spark transformation often used on a DataFrame, data set, or RDD when you want to perform multiple actions. cache() stores the specified DataFrame, data set, or RDD in the memory of your cluster’s workers. Since cache() is a transformation, the caching takes place only when a Spark action (for example, count(), show(), take(), or write()) is subsequently called on the same DataFrame, data set, or RDD.

df = spark.read.csv("customer.csv")
df.cache()
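  • Because caching is lazy, the data is only materialized when an action runs:
df.count()  # first action materializes the cache in worker memory
df.show()   # later actions reuse the cached data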

Count

Returns the number of rows in the DataFrame (for an RDD, the number of elements).

count = df.count()
print(count)

len - row count

Returns the number of items in an object. When the object is a string, len() returns the number of characters. Here, df.collect() brings the rows back as a Python list, so len() gives the row count.

row_count = len(df.collect())
print(row_count)

map

Returns a map object (an iterator) of the results after applying the given function to each item of a given iterable (list, tuple, etc.). The RDD counterpart, rdd.map(), applies the function to every element and returns a new RDD.

rdd = df.rdd.map(lambda row: (row['name'], row['age']))
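  • To materialize a few of the mapped results (a small sketch; output depends on the data):
print(rdd.take(2))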

time

Returns the current time in the number of seconds since the Unix Epoch.

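  • A minimal sketch using Python's built-in time module:
import time
print(time.time())  # current time in seconds since the Unix Epoch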

selectExpr

  • Used to project a set of SQL expressions and return a new DataFrame. It is a variant of the select() method, but accepts SQL expressions as strings instead of Column objects.
  • Notice below how a new df is created
  • This will create a new DataFrame with two columns: username (renamed from name) and next_age (calculated as age + 1).
# Simple column selection
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('example').getOrCreate()
data = [('Alice', 25), ('Bob', 30)]
df = spark.createDataFrame(data, schema=['name', 'age'])
result = df.selectExpr('name AS username', 'age + 1 AS next_age')
result.show()
  • A more complex chunk
  • This will create a new column distance_from_25 calculated using a SQL-like expression.
result = df.selectExpr('SQRT(POW((age - 25), 2)) AS distance_from_25')

Column


withColumn

  • A DataFrame transformation used to change a column’s values, convert the data type of an existing column, create a new column, and more.
  • Here we create a new column that’s a calculation of another
from pyspark.sql.functions import col
new_df = df.withColumn("age_squared", col("age") ** 2)

Rename Column

  • rename a column from vs to versus
sdf_new = sdf.withColumnRenamed("vs", "versus")
sdf_new.where(sdf_new['mpg'] < 18).show(3)

+----------+----+---+-----+---+----+----+-----+------+---+----+----+
|Unnamed: 0| mpg|cyl| disp| hp|drat|  wt| qsec|versus| am|gear|carb|
+----------+----+---+-----+---+----+----+-----+------+---+----+----+
|Duster 360|14.3|  8|360.0|245|3.21|3.57|15.84|     0|  0|   3|   4|
| Merc 280C|17.8|  6|167.6|123|3.92|3.44| 18.9|     1|  0|   4|   4|
|Merc 450SE|16.4|  8|275.8|180|3.07|4.07| 17.4|     0|  0|   3|   3|
+----------+----+---+-----+---+----+----+-----+------+---+----+----+
only showing top 3 rows

Add Column

from pyspark.sql.functions import col, when

# Create df
data = [("John", 25), ("Peter", 30), ("David", 35)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)

# Create a new column "DoubleAge" by doubling the "Age" column
updated_df = df.withColumn("DoubleAge", col("Age") * 2)

# Create a new column "AgeGroup" based on conditions
updated_df = updated_df.withColumn(
    "AgeGroup",
    when(col("Age") <= 30, "Young")
    .when((col("Age") > 30) & (col("Age") <= 40), "Middle-aged")
    .otherwise("Old"))
updated_df.show()

Display


print

Prints the specified message to the screen or other standard output device. The message can be a string or any other object; the object will be converted into a string before being written to the screen.

  • We can also print a variable using an f-string and {variable}
  • Or we can print selected columns
print("Hello, PySpark!")
print(f"Hello, {input_variable}")
print(vaccination_data[columns_to_display].head())

show

Spark DataFrame show() is used to display the contents of the DataFrame in a table row and column format. By default, it shows only twenty rows, and the column values are truncated at twenty characters.

df.show()
  • Here we filter and show
# List the names of the columns you want to display
columns_to_display = ['continent', 'total_cases', 'total_deaths', 'total_vaccinations', 'population']
# Display the first 5 records of the specified columns
spark_df.select(columns_to_display).show(5)

# another example of filter df and show
spark_df.select('continent', 'total_cases').show(5)
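  • Both defaults can be overridden; show() takes a row count and a truncate flag:
spark_df.show(5, truncate=False)  # first 5 rows, column values not truncated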

Filter


Select

See SparkSQL section

Used to select one or multiple columns, nested columns, column by index, all columns from the list, by regular expression from a DataFrame. select() is a transformation function in Spark and returns a new DataFrame with the selected columns.

  • Or we can create a list of columns to display and use the list
selected_df = df.select('name', 'age')

# List the names of the columns you want to display
columns_to_display = ['continent', 'total_cases', 'total_deaths', 'total_vaccinations', 'population']
# Display the first 5 records of the specified columns
spark_df.select(columns_to_display).show(5)

Conditional

sdf.where(sdf['mpg'] < 18).show(3) 

+----------+----+---+-----+---+----+----+-----+---+---+----+----+
|Unnamed: 0| mpg|cyl| disp| hp|drat|  wt| qsec| vs| am|gear|carb|
+----------+----+---+-----+---+----+----+-----+---+---+----+----+
|Duster 360|14.3|  8|360.0|245|3.21|3.57|15.84|  0|  0|   3|   4|
| Merc 280C|17.8|  6|167.6|123|3.92|3.44| 18.9|  1|  0|   4|   4|
|Merc 450SE|16.4|  8|275.8|180|3.07|4.07| 17.4|  0|  0|   3|   3|
+----------+----+---+-----+---+----+----+-----+---+---+----+----+
only showing top 3 rows

Where

  • The where() function filters the DataFrame rows based on the given condition. It returns a new DataFrame containing the rows that satisfy the given condition.
sdf.where(sdf['mpg'] < 18).show(3) 

filtered_df = df.where(df['age'] > 30)

Join


  • Let’s combine two DFs based on specific conditions

Create DF1

# define sample DataFrame 1 
data = [("A101", "John"), ("A102", "Peter"), ("A103", "Charlie")] 
columns = ["emp_id", "emp_name"] 
dataframe_1 = spark.createDataFrame(data, columns) 

+------+--------+
|emp_id|emp_name|
+------+--------+
|  A101|    John|
|  A102|   Peter|
|  A103| Charlie|
+------+--------+

Create DF2

# define sample DataFrame 2 
data = [("A101", 1000), ("A102", 2000), ("A103", 3000)]
columns = ["emp_id", "salary"]
dataframe_2 = spark.createDataFrame(data, columns)

+------+------+
|emp_id|salary|
+------+------+
|  A101|  1000|
|  A102|  2000|
|  A103|  3000|
+------+------+

Join

# create a new DataFrame, "combined_df" by performing an inner join
combined_df = dataframe_1.join(dataframe_2, on="emp_id", how="inner")

combined_df.show()
+------+--------+------+
|emp_id|emp_name|salary|
+------+--------+------+
|  A103| Charlie|  3000|
|  A102|   Peter|  2000|
|  A101|    John|  1000|
+------+--------+------+
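  • Other join types can be selected through the how argument, for example a left join of the same DataFrames:
left_df = dataframe_1.join(dataframe_2, on="emp_id", how="left")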

Fill NAs


  • We can fill missing values with the fillna() or fill() function, which replaces them with a specified value.
  • Create df first
# define sample DataFrame 1
data = [("A101", 1000), ("A102", 2000), ("A103",None)]
columns = ["emp_id", "salary"]
dataframe_1 = spark.createDataFrame(data, columns)
dataframe_1.show()

+------+------+
|emp_id|salary|
+------+------+
|  A101|  1000|
|  A102|  2000|
|  A103|  null|
+------+------+

Fillna

filled_df = dataframe_1.fillna({"salary": 3000}) 
filled_df.head(3)

[Row(emp_id='A101', salary=1000),
 Row(emp_id='A102', salary=2000),
 Row(emp_id='A103', salary=3000)]
 
# OR
filled_df.show()

+------+------+
|emp_id|salary|
+------+------+
|  A101|  1000|
|  A102|  2000|
|  A103|  3000|
+------+------+

Aggregation


Agg

Used to get the aggregate values like count, sum, avg, min, and max for each group.

agg_df = df.groupBy("column_name").agg({"column_to_aggregate": "sum"}) 

Average

Spark DataFrames support a number of commonly used functions to aggregate data after grouping.

  • In this example, we compute the average weight of cars grouped by their cylinders, as shown below.
sdf.groupby(['cyl'])\
        .agg({"wt": "AVG"})\
        .show(5)
        
+---+-----------------+
|cyl|          avg(wt)|
+---+-----------------+
|  6|3.117142857142857|
|  8|3.999214285714286|
|  4|2.285727272727273|
+---+-----------------+

Sort Aggregation

  • Sort by Descending order to highlight the most popular cars
car_counts = sdf.groupby(['cyl'])\
                        .agg({"wt": "count"})\
                        .sort("count(wt)", ascending=False)
car_counts.show(5)
                        
+---+---------+
|cyl|count(wt)|
+---+---------+
|  8|       14|
|  4|       11|
|  6|        7|
+---+---------+

GroupBy

Used to collect the identical data into groups on DataFrame and perform count, sum, avg, min, max functions on the grouped data.

import pandas as pd 
data = {'Category': ['A', 'B', 'A', 'B', 'A', 'B'], 
        'Value': [10, 20, 15, 25, 30, 35]}
df = pd.DataFrame(data) 

grouped = df.groupby('Category').agg({'Value': ['count', 'sum', 'mean', 'min', 'max']}) 
print(grouped) 
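  • The same idea in Spark, sketched against the mtcars DataFrame sdf from earlier:
sdf.groupby('cyl')\
        .agg({"mpg": "avg", "hp": "max"})\
        .show()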

Repartition


repartition

Used to increase or decrease the RDD or DataFrame partitions by number of partitions or by a single column name or multiple column names.

#Create df
data = [("John", 25), ("Peter", 30), ("Julie", 35), ("David", 40), ("Eva", 45)] 
columns = ["Name", "Age"] 
df = spark.createDataFrame(data, columns) 

# Show current number of partitions
print("Number of partitions before repartitioning: ", df.rdd.getNumPartitions()) 

# Repartition the DF to 2 partitions
df_repartitioned = df.repartition(2)

# Show number of partitions
print("Number of partitions after repartitioning: ", df_repartitioned.rdd.getNumPartitions())

# Stop Spark session
spark.stop() 

Schema


Spark provides a structured data processing framework that can define and enforce schemas for various data sources, including CSV files.

CSV

Let’s look at the steps to define and use a user-defined schema for a CSV file in PySpark:

Import the required libraries.

from pyspark.sql.types import StructType, IntegerType, FloatType, StringType, StructField

Define Schema

  • Let’s say we just imported data from a file
  • We can either let Spark infer the schema or define it ourselves
  • Explore the data: Understand the different data types present in each column.
  • Column data types: Determine the appropriate data types for each column based on your observed values.
  • Define the schema: Use the ‘StructType’ class in Spark and create a ‘StructField’ for each column, mentioning the column name, data type, and other properties.
  • ‘False’ indicates null values are NOT allowed for the column.
# Example
schema = StructType([
    StructField("Emp_Id", StringType(), False),
    StructField("Emp_Name", StringType(), False),
    StructField("Department", StringType(), False),
    StructField("Salary", IntegerType(), False),
    StructField("Phone", IntegerType(), True),
])

# Here is the data for the schema above
emp_id,emp_name,dept,salary,phone
A101,jhon,computer science,1000,+1 (701) 846 958
A102,Peter,Electronics,2000,
A103,Micheal,IT,2500,

# Another example
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, DateType
import pandas as pd  
# Define the schema
schema = StructType([
    StructField("continent", StringType(), True),
    StructField("total_cases", LongType(), True),
    StructField("total_deaths", LongType(), True),
    StructField("total_vaccinations", LongType(), True),
    StructField("population", LongType(), True)
])

Create DF from CSV & Schema

  • For the simple example of employee.csv shown above, after
  • We’ve imported the libraries
  • Explored the data
  • Defined the Schema
  • Now we can create the DF from the CSV file
# create a dataframe on top of a csv file
df = (spark.read
  .format("csv")
  .schema(schema)
  .option("header", "true")
  .load("employee.csv")
)
# display the dataframe content
df.show()

# print Schema if we want
df.printSchema()

Convert Data Types

  • At times it is necessary to convert the columns to the appropriate data types
# Convert the columns to the appropriate data types
vaccination_data['continent'] = vaccination_data['continent'].astype(str)  # Ensures continent is a string
vaccination_data['total_cases'] = vaccination_data['total_cases'].fillna(0).astype('int64')  # Fill NaNs and convert to int
vaccination_data['total_deaths'] = vaccination_data['total_deaths'].fillna(0).astype('int64')  # Fill NaNs and convert to int
vaccination_data['total_vaccinations'] = vaccination_data['total_vaccinations'].fillna(0).astype('int64')  # Fill NaNs and convert to int
vaccination_data['population'] = vaccination_data['population'].fillna(0).astype('int64')  # Fill NaNs and convert to int

printSchema

Used to print or display the schema of the DataFrame or data set in tree format along with the column name and data type. If you have a DataFrame or data set with a nested structure, it displays the schema in a nested tree format.

print("Schema of the Spark DataFrame:")
spark_df.printSchema()
# Print the structure of the DataFrame (columns and types)

# OUTPUT
Schema of the Spark DataFrame:
root
 |-- continent: string (nullable = true)
 |-- total_cases: long (nullable = true)
 |-- total_deaths: long (nullable = true)
 |-- total_vaccinations: long (nullable = true)
 |-- population: long (nullable = true)

define Schema

If we don’t want to leave it to Spark to guess the schema of the imported data, we can define it manually.

# Define the schema
schema = StructType([
    StructField("continent", StringType(), True),
    StructField("total_cases", LongType(), True),
    StructField("total_deaths", LongType(), True),
    StructField("total_vaccinations", LongType(), True),
    StructField("population", LongType(), True)
])

JSON

spark.read.json Schema

Spark SQL can automatically infer the schema of a JSON data set and load it as a DataFrame. The read.json() function loads data from a directory of JSON files where each line of the files is a JSON object. Note that the expected input is JSON Lines format (one complete JSON object per line), not a typical multi-line JSON file.

json_df = spark.read.json("customer.json")
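  • A hypothetical customer.json in this line-delimited format might look like:
{"name": "Alice", "age": 25}
{"name": "Bob", "age": 30}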

Views


TempView

Creates a temporary view cust_tbl that can later be used to query the data. The only required parameter is the name of the view.

createOrReplaceTempView creates the view or replaces an existing temporary view with the same name.

df.createTempView("cust_tbl")
df.createOrReplaceTempView("cust_tbl")
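  • Once registered, the view can be queried with spark.sql():
spark.sql("SELECT * FROM cust_tbl").show()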

Parallelize Processing


parallelize

To break our data up into parallel partitions, we can distribute a local Python collection to form an RDD. For performance, using range is recommended if the input represents a range of numbers.

rdd = sc.parallelize([1, 2, 3, 4, 5])
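  • For a numeric range, a sketch of the recommended form (assuming sc is the active SparkContext, e.g. spark.sparkContext):
rdd = sc.parallelize(range(1, 1001), numSlices=4)  # 1..1000 split across 4 partitions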

SparkSQL


To issue any SQL query, use the sql() method on the SparkSession instance. All spark.sql queries executed in this manner return a DataFrame on which you may perform further Spark operations if required.

See SparkSQL page for more

result = spark.sql("SELECT name, age FROM cust_tbl WHERE age > 30")
result.show()

General


docker-compose

Tool for defining and running multicontainer Docker applications. It uses a YAML file to configure the services and enables us to create and start all the services from a single configuration file.

# docker-compose.yml
version: '3'
services:
  web:
    image: nginx:latest
    ports:
      - "80:80"
  db:
    image: postgres:latest 
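  • With the file above in the current directory, start all the services:
docker compose up -d   # or docker-compose up -d with the standalone tool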

docker exec

Runs a new command in a running container. Only runs while the container’s primary process is running, and it is not restarted if the container is restarted. 

docker exec -it container_name command_to_run
docker exec -it my_container /bin/bash 

docker rm

Used to remove one or more containers

# To remove multiple containers
docker rm container1_name_or_id container2_name_or_id 

# To remove ALL stopped containers
docker rm $(docker ps -aq) 

docker run

This runs a command in a new container, getting the image and starting the container if needed

docker run [OPTIONS] IMAGE [COMMAND] [ARG...] 
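  • A concrete sketch (container name and port mapping are illustrative):
docker run -d -p 8080:80 --name my_web nginx:latest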

git-clone

You can create a copy of a specific repository or branch within a repository.

git clone REPOSITORY_URL [DESTINATION_DIRECTORY]
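  • For example (the repository URL is a placeholder):
git clone https://github.com/example/project.git my-project
git clone -b dev https://github.com/example/project.git   # clone a specific branch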

sc.setLogLevel

Using this method, you can change the log level to the desired level. Valid log levels include ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, and WARN.

from pyspark import SparkContext 
# Create sparkContext
sc = SparkContext("local", "LogLevelExample") 
# Set log level to desired level (INFO, ERROR)
sc.setLogLevel("INFO")

setMaster

Denotes where to run your Spark application, local or cluster. When you run on a cluster, you need to specify the address of the Spark master or Driver URL for a distributed cluster. We usually assign a local[*] value to setMaster() in Spark while doing internal testing.

from pyspark import SparkContext

# Create a SparkContext with a local master
sc = SparkContext("local[*]", "MyApp")

source

Used to execute a script file in the current shell environment, allowing you to modify the current shell environment in the same way you would if you had typed commands manually.

# Assuming a Bash shell
source myscript.sh 

virtualenv

Primarily a command-line application. It modifies the environment variables in a shell to create an isolated Python environment, so you’ll need to have a shell to run it. You type virtualenv (the name of the application), followed by any flags that control its behavior and the name of the environment to create.

# Create a virtual environment named "myenv"
virtualenv myenv
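  • Then activate it in the current shell (Linux/macOS path shown):
source myenv/bin/activate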