# Installing required packages (in a notebook cell, prefix each command with !)
pip install pyspark
pip install findspark
pip install pyarrow==0.14.1
pip install pandas
pip install numpy==1.19.5
import findspark
findspark.init()
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
Spark Session
UDF: User Defined Functions
Apache Spark has become the de facto standard for processing big data. To enable data scientists to leverage the value of big data, Spark added a Python API in version 0.7, with support for user-defined functions (UDFs). These user-defined functions operate one row at a time, and thus suffer from high serialization and invocation overhead. As a result, many data pipelines define UDFs in Java or Scala and then invoke them from Python.
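Pandas UDFs, built on top of Apache Arrow, bring you the _best of both worlds_: the ability to define low-overhead, high-performance UDFs entirely in Python. They achieve this by operating on whole batches of rows as pandas Series instead of one row at a time.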
In addition, Pandas UDFs can be registered and invoked from SQL out of the box: we write a regular Python function, decorate it with @pandas_udf(), and register it under a SQL name with spark.udf.register(). We can then apply this UDF to our wt column.
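As a rough illustration of the difference, the sketch below runs the same conversion two ways in plain pandas, without Spark: once per value, the way a classic row-at-a-time Python UDF is called, and once on a whole pandas Series, the way a Pandas UDF body is called. The function names are hypothetical, and the data are the first five wt values from the mtcars table used below.

```python
import pandas as pd

# Hypothetical row-at-a-time function: called once per value, like a classic Python UDF
def convert_wt_row(x: float) -> float:
    return x * 0.45

# Hypothetical vectorized function: called once per batch (a pd.Series), like a Pandas UDF body
def convert_wt_vectorized(s: pd.Series) -> pd.Series:
    return s * 0.45

# First five wt values (thousands of pounds) from the mtcars preview
wt = pd.Series([2.620, 2.875, 2.320, 3.215, 3.440])

# Row-at-a-time: one Python function call per element
row_result = pd.Series([convert_wt_row(x) for x in wt])

# Vectorized: a single call for the whole batch; the arithmetic runs inside pandas/NumPy
vec_result = convert_wt_vectorized(wt)

print(vec_result.tolist())
```

Both approaches produce the same numbers. In Spark, the vectorized form is cheaper because Arrow hands Python a whole batch at once, so the per-call overhead is paid per batch rather than per row.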
Let’s start with the setup.
Setup
Create Spark Session & Context
# Creating a spark context class
sc = SparkContext()
# Creating a spark session
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
Initialize Spark
spark
# OUTPUT
SparkSession - in-memory
SparkContext
Spark UI
Version v2.4.3
Master local[*]
AppName pyspark-shell
Load Data & Create View
In this section, you will first read the CSV file into a Pandas DataFrame and then load it into a Spark DataFrame. Pandas is a library used for data manipulation and analysis.
# Read the file using the `read_csv` function in pandas
mtcars = pd.read_csv('https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/mtcars.csv')
# preview some records
mtcars.head()
          Unnamed: 0   mpg  cyl   disp   hp  drat     wt   qsec  vs  am  gear  carb
0          Mazda RX4  21.0    6  160.0  110  3.90  2.620  16.46   0   1     4     4
1      Mazda RX4 Wag  21.0    6  160.0  110  3.90  2.875  17.02   0   1     4     4
2         Datsun 710  22.8    4  108.0   93  3.85  2.320  18.61   1   1     4     1
3     Hornet 4 Drive  21.4    6  258.0  110  3.08  3.215  19.44   1   0     3     1
4  Hornet Sportabout  18.7    8  360.0  175  3.15  3.440  17.02   0   0     3     2
Rename Column
mtcars.rename(columns={'Unnamed: 0': 'name'}, inplace=True)
mtcars.head()
                name   mpg  cyl   disp   hp  drat     wt   qsec  vs  am  gear  carb
0          Mazda RX4  21.0    6  160.0  110  3.90  2.620  16.46   0   1     4     4
1      Mazda RX4 Wag  21.0    6  160.0  110  3.90  2.875  17.02   0   1     4     4
2         Datsun 710  22.8    4  108.0   93  3.85  2.320  18.61   1   1     4     1
3     Hornet 4 Drive  21.4    6  258.0  110  3.08  3.215  19.44   1   0     3     1
4  Hornet Sportabout  18.7    8  360.0  175  3.15  3.440  17.02   0   0     3     2
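The Unnamed: 0 column appears because the first header cell of the CSV is empty (presumably the car names were saved as an unlabeled index column), and pandas labels an unnamed header column Unnamed: <position>. The sketch below reproduces this with a tiny in-memory CSV holding two rows and three columns copied from the preview, then applies the same rename as above.

```python
import io

import pandas as pd

# Two rows copied from the preview; the first header cell is deliberately empty
csv_text = """,mpg,cyl,wt
Mazda RX4,21.0,6,2.620
Datsun 710,22.8,4,2.320
"""

df = pd.read_csv(io.StringIO(csv_text))
original_columns = df.columns.tolist()
print(original_columns)  # ['Unnamed: 0', 'mpg', 'cyl', 'wt']

# Same fix as in the lab: give the column a meaningful name
df.rename(columns={"Unnamed: 0": "name"}, inplace=True)
print(df.columns.tolist())  # ['name', 'mpg', 'cyl', 'wt']
```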
Load Data to SparkDF
CreateDataFrame
sdf = spark.createDataFrame(mtcars)
# preview schema
sdf.printSchema()
root
|-- name: string (nullable = true)
|-- mpg: double (nullable = true)
|-- cyl: long (nullable = true)
|-- disp: double (nullable = true)
|-- hp: long (nullable = true)
|-- drat: double (nullable = true)
|-- wt: double (nullable = true)
|-- qsec: double (nullable = true)
|-- vs: long (nullable = true)
|-- am: long (nullable = true)
|-- gear: long (nullable = true)
|-- carb: long (nullable = true)
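When spark.createDataFrame receives a pandas DataFrame, it infers each column's Spark type from the pandas dtype; for the columns in this schema, int64 becomes long, float64 becomes double, and object columns of Python strings become string. The sketch below reads the dtypes of a two-row pandas frame and maps them with SPARK_TYPE_FOR_DTYPE, a hypothetical lookup table covering only these three cases (it is not a Spark API). The dtypes are set explicitly to match what read_csv produced for these columns in this lab.

```python
import pandas as pd

# Hypothetical lookup table summarizing the three cases seen in the schema above
SPARK_TYPE_FOR_DTYPE = {
    "int64": "long",
    "float64": "double",
    "object": "string",
}

# Explicit dtypes matching the pandas columns that produced the schema above
df = pd.DataFrame({
    "name": pd.Series(["Mazda RX4", "Datsun 710"], dtype="object"),
    "mpg": pd.Series([21.0, 22.8], dtype="float64"),
    "cyl": pd.Series([6, 4], dtype="int64"),
})

# Map each pandas dtype to the Spark type createDataFrame infers for it
inferred = {col: SPARK_TYPE_FOR_DTYPE[str(dtype)] for col, dtype in df.dtypes.items()}
print(inferred)  # {'name': 'string', 'mpg': 'double', 'cyl': 'long'}
```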
Rename Column
- withColumnRenamed does not modify sdf; it returns a new DataFrame, sdf_new.
- In sdf_new, the vs column is renamed to versus.
sdf_new = sdf.withColumnRenamed("vs", "versus")
sdf_new.show(5)
+-----------------+----+---+-----+---+----+-----+-----+------+---+----+----+
| name| mpg|cyl| disp| hp|drat| wt| qsec|versus| am|gear|carb|
+-----------------+----+---+-----+---+----+-----+-----+------+---+----+----+
| Mazda RX4|21.0| 6|160.0|110| 3.9| 2.62|16.46| 0| 1| 4| 4|
| Mazda RX4 Wag|21.0| 6|160.0|110| 3.9|2.875|17.02| 0| 1| 4| 4|
| Datsun 710|22.8| 4|108.0| 93|3.85| 2.32|18.61| 1| 1| 4| 1|
| Hornet 4 Drive|21.4| 6|258.0|110|3.08|3.215|19.44| 1| 0| 3| 1|
|Hornet Sportabout|18.7| 8|360.0|175|3.15| 3.44|17.02| 0| 0| 3| 2|
+-----------------+----+---+-----+---+----+-----+-----+------+---+----+----+
only showing top 5 rows
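pandas behaves the same way when rename is called without inplace=True: a new DataFrame comes back and the original is untouched. The pandas-only sketch below mirrors that behavior; it is an analogy, not Spark code.

```python
import pandas as pd

df = pd.DataFrame({"name": ["Datsun 710"], "vs": [1]})

# Like withColumnRenamed, rename() without inplace=True returns a new DataFrame
df_new = df.rename(columns={"vs": "versus"})

print(df.columns.tolist())      # ['name', 'vs']      (original is unchanged)
print(df_new.columns.tolist())  # ['name', 'versus']  (new frame has the renamed column)
```

The same reasoning explains the next section: the cars view is created from sdf, not sdf_new, so its output still shows a vs column.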
Create Table View
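Creating a table view in Spark SQL is required to run SQL queries programmatically on a DataFrame. A view is a temporary table that SQL queries can run against. A temporary view is scoped to the current Spark session and is dropped when the session ends. In this example we create a temporary view named cars using the createTempView() function. Note that createTempView() raises an error if a view with that name already exists; createOrReplaceTempView() replaces it instead.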
sdf.createTempView("cars")
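Python's built-in sqlite3 module has a similar notion: a TEMP view is visible only to the connection that created it, much as a Spark temporary view belongs to its session. The sketch below is a small-scale analogue in SQLite, not Spark. It loads three rows copied from the preview into an in-memory table named mtcars (a name chosen for illustration), creates a temporary view named cars on top of it, and queries the view.

```python
import sqlite3

# Three rows (name, mpg, cyl) copied from the preview above
rows = [
    ("Mazda RX4", 21.0, 6),
    ("Datsun 710", 22.8, 4),
    ("Hornet Sportabout", 18.7, 8),
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE mtcars (name TEXT, mpg REAL, cyl INTEGER)")
conn.executemany("INSERT INTO mtcars VALUES (?, ?, ?)", rows)

# A TEMP view lives only as long as this connection, like a Spark temp view in its session
conn.execute("CREATE TEMP VIEW cars AS SELECT * FROM mtcars")

# Query the view exactly like a table
result = conn.execute("SELECT name FROM cars WHERE mpg > 20 AND cyl < 6").fetchall()
print(result)  # [('Datsun 710',)]

conn.close()
```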
SQL to Query & Aggregate
Once we have a table view, we can query it just as we would a SQL table. We perform operations similar to those in the DataFrames notebook; the difference here is that we write the SQL queries directly.
# Showing the top of the table
spark.sql("SELECT * FROM cars").show(5)
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
| name| mpg|cyl| disp| hp|drat| wt| qsec| vs| am|gear|carb|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
| Mazda RX4|21.0| 6|160.0|110| 3.9| 2.62|16.46| 0| 1| 4| 4|
| Mazda RX4 Wag|21.0| 6|160.0|110| 3.9|2.875|17.02| 0| 1| 4| 4|
| Datsun 710|22.8| 4|108.0| 93|3.85| 2.32|18.61| 1| 1| 4| 1|
| Hornet 4 Drive|21.4| 6|258.0|110|3.08|3.215|19.44| 1| 0| 3| 1|
|Hornet Sportabout|18.7| 8|360.0|175|3.15| 3.44|17.02| 0| 0| 3| 2|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 5 rows
# Showing a specific column
spark.sql("SELECT mpg FROM cars").show(5)
# Basic filtering query to find cars with high mileage and a low cylinder count
spark.sql("SELECT * FROM cars WHERE mpg > 20 AND cyl < 6").show(5)
# Use the where method to list cars whose miles per gallon is less than 18
sdf.where(sdf['mpg'] < 18).show(3)
# Aggregating data and grouping by cylinders
spark.sql("SELECT count(*), cyl FROM cars GROUP BY cyl").show()
+--------+---+
|count(1)|cyl|
+--------+---+
| 7| 6|
| 14| 8|
| 11| 4|
+--------+---+
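For comparison, the filter and aggregation queries above can be expressed with the pandas DataFrame API. This sketch uses only the 20 rows (name, mpg, cyl) printed by .show() later in this notebook, not the full 32-row dataset, so its group counts are smaller than those in the table above.

```python
import pandas as pd

# The 20 cars (name, mpg, cyl) shown by .show() in this notebook
cars = pd.DataFrame(
    [
        ("Mazda RX4", 21.0, 6), ("Mazda RX4 Wag", 21.0, 6), ("Datsun 710", 22.8, 4),
        ("Hornet 4 Drive", 21.4, 6), ("Hornet Sportabout", 18.7, 8), ("Valiant", 18.1, 6),
        ("Duster 360", 14.3, 8), ("Merc 240D", 24.4, 4), ("Merc 230", 22.8, 4),
        ("Merc 280", 19.2, 6), ("Merc 280C", 17.8, 6), ("Merc 450SE", 16.4, 8),
        ("Merc 450SL", 17.3, 8), ("Merc 450SLC", 15.2, 8), ("Cadillac Fleetwood", 10.4, 8),
        ("Lincoln Continental", 10.4, 8), ("Chrysler Imperial", 14.7, 8), ("Fiat 128", 32.4, 4),
        ("Honda Civic", 30.4, 4), ("Toyota Corolla", 33.9, 4),
    ],
    columns=["name", "mpg", "cyl"],
)

# SELECT * FROM cars WHERE mpg > 20 AND cyl < 6
high_mileage = cars[(cars["mpg"] > 20) & (cars["cyl"] < 6)]

# sdf.where(sdf['mpg'] < 18)
low_mileage = cars[cars["mpg"] < 18]

# SELECT count(*), cyl FROM cars GROUP BY cyl
counts = cars.groupby("cyl").size()

print(high_mileage["name"].tolist())
print(len(low_mileage))    # 8
print(counts.to_dict())    # {4: 6, 6: 6, 8: 8}
```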
Create UDF 1
In this simple example, we will build a Scalar Pandas UDF to convert the wt column from imperial units (thousands of pounds) to metric units (metric tons).
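The factor 0.45 used below is a rounded value: one pound is defined as exactly 0.45359237 kg, so 1,000 lb is 0.45359237 metric tons. The sketch below, in plain pandas with no Spark, compares the rounded and exact factors on the first five wt values. Checking the UDF body this way is a quick test before registering it with Spark.

```python
import pandas as pd

EXACT_TONNES_PER_1000_LB = 0.45359237  # 1 lb = 0.45359237 kg by definition
APPROX_FACTOR = 0.45                   # rounded factor used by convert_wt in this lab

# First five wt values, in thousands of pounds
wt = pd.Series([2.620, 2.875, 2.320, 3.215, 3.440])

approx = wt * APPROX_FACTOR
exact = wt * EXACT_TONNES_PER_1000_LB

# The rounded factor underestimates every row by the same relative amount (about 0.8%)
rel_err = ((exact - approx) / exact).abs()
print(approx.tolist())
print(rel_err.round(4).unique())
```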
Register UDF 1
# import the Pandas UDF function
from pyspark.sql.functions import pandas_udf
@pandas_udf("float")
def convert_wt(s: pd.Series) -> pd.Series:
    # Convert thousands of pounds to metric tons (1,000 lb is roughly 0.45 t)
    return s * 0.45
spark.udf.register("convert_weight", convert_wt)
# OUTPUT
<function __main__.convert_wt(s: pandas.core.series.Series) -> pandas.core.series.Series>
Apply UDF to TableView
We can now apply the convert_weight user-defined function to the wt column of the cars table view.
- This is done with the SQL query shown below.
- The query returns both the original weight (in thousands of pounds) and the converted weight (in metric tons).
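Behind this query, Spark passes the wt column to convert_weight in batches, and it does not guarantee where batch boundaries fall. A Scalar Pandas UDF therefore has to return exactly one output value per input value and must not depend on how rows are grouped into batches. The plain-pandas sketch below simulates this: it splits the first five wt values into chunks (the chunk size of 2 is arbitrary, chosen for illustration), applies the same arithmetic as convert_wt to each chunk, and compares the result with applying it to the whole column.

```python
import pandas as pd

def convert_wt_body(s: pd.Series) -> pd.Series:
    # Same arithmetic as the convert_wt UDF registered above
    return s * 0.45

wt = pd.Series([2.620, 2.875, 2.320, 3.215, 3.440])

# Simulate Spark handing the column to Python in batches (batch size is arbitrary here)
batch_size = 2
batches = [wt.iloc[i:i + batch_size] for i in range(0, len(wt), batch_size)]
batched = pd.concat([convert_wt_body(b) for b in batches])

# Apply the same function to the whole column at once
whole = convert_wt_body(wt)

# An element-wise body gives identical results however the column is split
print(batched.tolist() == whole.tolist())  # True
```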
spark.sql("SELECT *, wt AS weight_imperial, convert_weight(wt) AS weight_metric FROM cars").show()
+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+---------------+-------------+
| name| mpg|cyl| disp| hp|drat| wt| qsec| vs| am|gear|carb|weight_imperial|weight_metric|
+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+---------------+-------------+
| Mazda RX4|21.0| 6|160.0|110| 3.9| 2.62|16.46| 0| 1| 4| 4| 2.62| 1.179|
| Mazda RX4 Wag|21.0| 6|160.0|110| 3.9|2.875|17.02| 0| 1| 4| 4| 2.875| 1.29375|
| Datsun 710|22.8| 4|108.0| 93|3.85| 2.32|18.61| 1| 1| 4| 1| 2.32| 1.044|
| Hornet 4 Drive|21.4| 6|258.0|110|3.08|3.215|19.44| 1| 0| 3| 1| 3.215| 1.44675|
| Hornet Sportabout|18.7| 8|360.0|175|3.15| 3.44|17.02| 0| 0| 3| 2| 3.44| 1.548|
| Valiant|18.1| 6|225.0|105|2.76| 3.46|20.22| 1| 0| 3| 1| 3.46| 1.557|
| Duster 360|14.3| 8|360.0|245|3.21| 3.57|15.84| 0| 0| 3| 4| 3.57| 1.6065|
| Merc 240D|24.4| 4|146.7| 62|3.69| 3.19| 20.0| 1| 0| 4| 2| 3.19| 1.4355|
| Merc 230|22.8| 4|140.8| 95|3.92| 3.15| 22.9| 1| 0| 4| 2| 3.15| 1.4175|
| Merc 280|19.2| 6|167.6|123|3.92| 3.44| 18.3| 1| 0| 4| 4| 3.44| 1.548|
| Merc 280C|17.8| 6|167.6|123|3.92| 3.44| 18.9| 1| 0| 4| 4| 3.44| 1.548|
| Merc 450SE|16.4| 8|275.8|180|3.07| 4.07| 17.4| 0| 0| 3| 3| 4.07| 1.8315|
| Merc 450SL|17.3| 8|275.8|180|3.07| 3.73| 17.6| 0| 0| 3| 3| 3.73| 1.6785|
| Merc 450SLC|15.2| 8|275.8|180|3.07| 3.78| 18.0| 0| 0| 3| 3| 3.78| 1.701|
| Cadillac Fleetwood|10.4| 8|472.0|205|2.93| 5.25|17.98| 0| 0| 3| 4| 5.25| 2.3625|
|Lincoln Continental|10.4| 8|460.0|215| 3.0|5.424|17.82| 0| 0| 3| 4| 5.424| 2.4408|
| Chrysler Imperial|14.7| 8|440.0|230|3.23|5.345|17.42| 0| 0| 3| 4| 5.345| 2.40525|
| Fiat 128|32.4| 4| 78.7| 66|4.08| 2.2|19.47| 1| 1| 4| 1| 2.2| 0.99|
| Honda Civic|30.4| 4| 75.7| 52|4.93|1.615|18.52| 1| 1| 4| 2| 1.615| 0.72675|
| Toyota Corolla|33.9| 4| 71.1| 65|4.22|1.835| 19.9| 1| 1| 4| 1| 1.835| 0.82575|
+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+---------------+-------------+
only showing top 20 rows
Create UDF 2
We will create a pandas UDF to convert the mpg column to kmpl (kilometers per liter), using a conversion factor of 0.425.
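The factor 0.425 is a rounded value. A mile is defined as exactly 1.609344 km and a US gallon as exactly 3.785411784 L, so, assuming US gallons, 1 mpg = 1.609344 / 3.785411784 ≈ 0.4251 km/L. The sketch below computes that factor and checks the s * 0.425 arithmetic used for the UDF, in plain pandas, against three mpg values from this notebook's output.

```python
import pandas as pd

KM_PER_MILE = 1.609344               # exact by definition
LITERS_PER_US_GALLON = 3.785411784   # exact by definition (assumes US gallons)

factor = KM_PER_MILE / LITERS_PER_US_GALLON
print(round(factor, 4))  # 0.4251

def convert_mileage_body(s: pd.Series) -> pd.Series:
    # mpg -> km/L using the rounded factor from the lab
    return s * 0.425

# mpg values for Mazda RX4, Datsun 710 and Toyota Corolla
mpg = pd.Series([21.0, 22.8, 33.9])
kmpl = convert_mileage_body(mpg)
print(kmpl.tolist())
```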
Register UDF 2
# Code block for learners to answer
from pyspark.sql.functions import pandas_udf
@pandas_udf("float")
def convert_mileage(s: pd.Series) -> pd.Series:
    # Convert miles per gallon to kilometers per liter (1 mpg is roughly 0.425 km/L)
    return s * 0.425
spark.udf.register("convert_mileage", convert_mileage)
Apply UDF 2 to TableView
spark.sql("SELECT *, mpg AS mpg, convert_mileage(mpg) AS kmpl FROM cars").show()
+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+----+-------+
| name| mpg|cyl| disp| hp|drat| wt| qsec| vs| am|gear|carb| mpg| kmpl|
+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+----+-------+
| Mazda RX4|21.0| 6|160.0|110| 3.9| 2.62|16.46| 0| 1| 4| 4|21.0| 8.925|
| Mazda RX4 Wag|21.0| 6|160.0|110| 3.9|2.875|17.02| 0| 1| 4| 4|21.0| 8.925|
| Datsun 710|22.8| 4|108.0| 93|3.85| 2.32|18.61| 1| 1| 4| 1|22.8| 9.69|
| Hornet 4 Drive|21.4| 6|258.0|110|3.08|3.215|19.44| 1| 0| 3| 1|21.4| 9.095|
| Hornet Sportabout|18.7| 8|360.0|175|3.15| 3.44|17.02| 0| 0| 3| 2|18.7| 7.9475|
| Valiant|18.1| 6|225.0|105|2.76| 3.46|20.22| 1| 0| 3| 1|18.1| 7.6925|
| Duster 360|14.3| 8|360.0|245|3.21| 3.57|15.84| 0| 0| 3| 4|14.3| 6.0775|
| Merc 240D|24.4| 4|146.7| 62|3.69| 3.19| 20.0| 1| 0| 4| 2|24.4| 10.37|
| Merc 230|22.8| 4|140.8| 95|3.92| 3.15| 22.9| 1| 0| 4| 2|22.8| 9.69|
| Merc 280|19.2| 6|167.6|123|3.92| 3.44| 18.3| 1| 0| 4| 4|19.2| 8.16|
| Merc 280C|17.8| 6|167.6|123|3.92| 3.44| 18.9| 1| 0| 4| 4|17.8| 7.565|
| Merc 450SE|16.4| 8|275.8|180|3.07| 4.07| 17.4| 0| 0| 3| 3|16.4| 6.97|
| Merc 450SL|17.3| 8|275.8|180|3.07| 3.73| 17.6| 0| 0| 3| 3|17.3| 7.3525|
| Merc 450SLC|15.2| 8|275.8|180|3.07| 3.78| 18.0| 0| 0| 3| 3|15.2| 6.46|
| Cadillac Fleetwood|10.4| 8|472.0|205|2.93| 5.25|17.98| 0| 0| 3| 4|10.4| 4.42|
|Lincoln Continental|10.4| 8|460.0|215| 3.0|5.424|17.82| 0| 0| 3| 4|10.4| 4.42|
| Chrysler Imperial|14.7| 8|440.0|230|3.23|5.345|17.42| 0| 0| 3| 4|14.7| 6.2475|
| Fiat 128|32.4| 4| 78.7| 66|4.08| 2.2|19.47| 1| 1| 4| 1|32.4| 13.77|
| Honda Civic|30.4| 4| 75.7| 52|4.93|1.615|18.52| 1| 1| 4| 2|30.4| 12.92|
| Toyota Corolla|33.9| 4| 71.1| 65|4.22|1.835| 19.9| 1| 1| 4| 1|33.9|14.4075|
+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+----+-------+
only showing top 20 rows