# Installing required packages
pip install pyspark
pip install findspark
pip install pyarrow==0.14.1
pip install pandas
pip install numpy==1.19.5
import findspark
findspark.init()
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
UDF
User Defined Functions
Apache Spark has become the de-facto standard in processing big data. To enable data scientists to leverage the value of big data, Spark added a Python API in version 0.7, with support for user-defined functions (UDF). These user-defined functions operate one-row-at-a-time, and thus suffer from high serialization and invocation overhead. As a result, many data pipelines define UDFs in Java and Scala and then invoke them from Python.
Pandas UDFs built on top of Apache Arrow bring you the _best of both worlds_—the ability to define low-overhead, high-performance UDFs entirely in Python.
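To make the contrast concrete, here is a minimal pandas-only sketch (no Spark involved) of the two calling styles. The function names to_metric_row and to_metric_batch are hypothetical, and the three sample values are taken from the wt column of the mtcars preview in this notebook. A row-at-a-time UDF is invoked once per value, whereas the body of a scalar Pandas UDF receives a whole pandas Series per batch and returns a Series of the same length.

```python
import pandas as pd

def to_metric_row(wt: float) -> float:
    # Row-at-a-time style: called once for every single value.
    return wt * 0.45

def to_metric_batch(s: pd.Series) -> pd.Series:
    # Vectorized style: called once for a whole batch of values.
    return s * 0.45

# Sample wt values (hypothetical stand-in for one batch of a Spark column).
weights = pd.Series([2.620, 2.875, 2.320])

row_result = weights.apply(to_metric_row)   # three Python function calls
batch_result = to_metric_batch(weights)     # one call over the whole Series
```

Both calls produce the same numbers; the difference is how many times Python code is entered, which is where the per-row overhead described above comes from.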
In addition, Pandas UDFs can be registered and invoked in SQL out of the box: decorate a regular Python function with @pandas_udf() and register it with spark.udf.register(). We can then apply this UDF to our wt column.
Let's start with the setup.
Setup
Spark Session
Create Spark Session & Context
# Creating a spark context class
sc = SparkContext()
# Creating a spark session
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
Initialize Spark
spark
# OUTPUT
SparkSession - in-memory
SparkContext
Spark UI
Version: v2.4.3
Master: local[*]
AppName: pyspark-shell
Load Data & Create View
In this section, you will first read the CSV file into a pandas DataFrame and then load it into a Spark DataFrame. pandas is a library used for data manipulation and analysis.
# Read the file using the `read_csv` function in pandas
mtcars = pd.read_csv('https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/mtcars.csv')
# preview some records
mtcars.head()
          Unnamed: 0   mpg  cyl   disp   hp  drat     wt   qsec  vs  am  gear  carb
0          Mazda RX4  21.0    6  160.0  110  3.90  2.620  16.46   0   1     4     4
1      Mazda RX4 Wag  21.0    6  160.0  110  3.90  2.875  17.02   0   1     4     4
2         Datsun 710  22.8    4  108.0   93  3.85  2.320  18.61   1   1     4     1
3     Hornet 4 Drive  21.4    6  258.0  110  3.08  3.215  19.44   1   0     3     1
4  Hornet Sportabout  18.7    8  360.0  175  3.15  3.440  17.02   0   0     3     2
Rename Column
mtcars.rename(columns={'Unnamed: 0': 'name'}, inplace=True)
mtcars.head()
                name   mpg  cyl   disp   hp  drat     wt   qsec  vs  am  gear  carb
0          Mazda RX4  21.0    6  160.0  110  3.90  2.620  16.46   0   1     4     4
1      Mazda RX4 Wag  21.0    6  160.0  110  3.90  2.875  17.02   0   1     4     4
2         Datsun 710  22.8    4  108.0   93  3.85  2.320  18.61   1   1     4     1
3     Hornet 4 Drive  21.4    6  258.0  110  3.08  3.215  19.44   1   0     3     1
4  Hornet Sportabout  18.7    8  360.0  175  3.15  3.440  17.02   0   0     3     2
Load Data to SparkDF
CreateDataFrame
sdf = spark.createDataFrame(mtcars)
# preview schema
sdf.printSchema()
root
 |-- name: string (nullable = true)
 |-- mpg: double (nullable = true)
 |-- cyl: long (nullable = true)
 |-- disp: double (nullable = true)
 |-- hp: long (nullable = true)
 |-- drat: double (nullable = true)
 |-- wt: double (nullable = true)
 |-- qsec: double (nullable = true)
 |-- vs: long (nullable = true)
 |-- am: long (nullable = true)
 |-- gear: long (nullable = true)
 |-- carb: long (nullable = true)
Rename Column
- We do not modify the original DataFrame; withColumnRenamed returns a new DataFrame in which vs is renamed to versus.
sdf_new = sdf.withColumnRenamed("vs", "versus")
sdf_new.show(5)
+-----------------+----+---+-----+---+----+-----+-----+------+---+----+----+
|             name| mpg|cyl| disp| hp|drat|   wt| qsec|versus| am|gear|carb|
+-----------------+----+---+-----+---+----+-----+-----+------+---+----+----+
|        Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|     0|  1|   4|   4|
|    Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|     0|  1|   4|   4|
|       Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|     1|  1|   4|   1|
|   Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|     1|  0|   3|   1|
|Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|     0|  0|   3|   2|
+-----------------+----+---+-----+---+----+-----+-----+------+---+----+----+
only showing top 5 rows
Create Table View
Creating a table view in Spark SQL is required to run SQL queries programmatically on a DataFrame. A view is a temporary table against which SQL queries can be run. A temporary view is scoped to the current Spark session. In this example we create a temporary view using the createTempView() function.
sdf.createTempView("cars")
SQL to Query & Aggregate
Once we have a table view, we can query it just as we would query a SQL table. We perform operations similar to the ones in the DataFrames notebook; the difference is that here we write the SQL queries directly.
# Showing the top of the table
spark.sql("SELECT * FROM cars").show(5)
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|             name| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|        Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|    Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|       Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|   Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|
|Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 5 rows
# Showing a specific column
spark.sql("SELECT mpg FROM cars").show(5)
# Basic filtering query to determine cars that have a high mileage and low cylinder count
spark.sql("SELECT * FROM cars where mpg>20 AND cyl < 6").show(5)
# Use the where method to list cars whose miles per gallon is less than 18
sdf.where(sdf['mpg'] < 18).show(3)
# Aggregating data and grouping by cylinders
spark.sql("SELECT count(*), cyl from cars GROUP BY cyl").show()
+--------+---+
|count(1)|cyl|
+--------+---+
|       7|  6|
|      14|  8|
|      11|  4|
+--------+---+
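As a sanity check on what GROUP BY with count(*) computes, the same aggregation can be sketched in plain pandas. This sketch assumes only the five preview rows shown earlier in this notebook rather than the full dataset, so its counts differ from the Spark output above; the variable names are illustrative.

```python
import pandas as pd

# The five preview rows from mtcars.head(), reduced to the columns we need.
preview = pd.DataFrame({
    "name": ["Mazda RX4", "Mazda RX4 Wag", "Datsun 710",
             "Hornet 4 Drive", "Hornet Sportabout"],
    "cyl": [6, 6, 4, 6, 8],
})

# Equivalent of: SELECT count(*), cyl FROM cars GROUP BY cyl
counts = preview.groupby("cyl").size()

# Convert to plain Python ints for easy inspection.
counts_dict = {int(cyl): int(n) for cyl, n in counts.items()}
print(counts_dict)
```

Running the same groupby on the full mtcars DataFrame is one way to cross-check the counts returned by the SQL query.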
Create UDF 1
In this simple example, we will build a scalar Pandas UDF to convert the wt column from imperial units (1000 lbs) to metric units (metric tons).
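The UDF below multiplies by 0.45, which is a rounded conversion factor. A quick arithmetic check, assuming the international pound of exactly 0.45359237 kg, shows how far the rounded factor is from the exact one. The variable names are illustrative, and the sample weight is the Mazda RX4 value from the preview.

```python
LB_TO_KG = 0.45359237      # exact definition of the international pound
KG_PER_METRIC_TON = 1000

# One unit of wt is 1000 lbs; convert that to metric tons.
exact_factor = 1000 * LB_TO_KG / KG_PER_METRIC_TON
approx_factor = 0.45       # rounded factor used in this notebook

wt = 2.62                  # Mazda RX4, in units of 1000 lbs
approx_tons = wt * approx_factor
exact_tons = wt * exact_factor

# Relative error introduced by rounding the factor (a little under 1%).
relative_error = (exact_factor - approx_factor) / exact_factor
print(approx_tons, exact_tons, relative_error)
```

The rounded factor slightly underestimates the metric weight, which is acceptable for a teaching example but worth knowing if the result feeds further calculations.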
Register UDF 1
# import the Pandas UDF function
from pyspark.sql.functions import pandas_udf, PandasUDFType
@pandas_udf("float")
def convert_wt(s: pd.Series) -> pd.Series:
    # Convert weight from 1000 lbs to metric tons (approximate factor of 0.45)
    return s * 0.45
spark.udf.register("convert_weight", convert_wt)
# OUTPUT
<function __main__.convert_wt(s: pandas.core.series.Series) -> pandas.core.series.Series>
Apply UDF to TableView
We can now apply the convert_weight user-defined function to our wt column from the cars table view.
- This is done very simply using the SQL query shown below.
- In the example below we show both the original weight (in 1000 lbs) and the converted weight (in metric tons).
spark.sql("SELECT *, wt AS weight_imperial, convert_weight(wt) as weight_metric FROM cars").show()
+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+---------------+-------------+
|               name| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|weight_imperial|weight_metric|
+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+---------------+-------------+
|          Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|           2.62|        1.179|
|      Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|          2.875|      1.29375|
|         Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|           2.32|        1.044|
|     Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|          3.215|      1.44675|
|  Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|           3.44|        1.548|
|            Valiant|18.1|  6|225.0|105|2.76| 3.46|20.22|  1|  0|   3|   1|           3.46|        1.557|
|         Duster 360|14.3|  8|360.0|245|3.21| 3.57|15.84|  0|  0|   3|   4|           3.57|       1.6065|
|          Merc 240D|24.4|  4|146.7| 62|3.69| 3.19| 20.0|  1|  0|   4|   2|           3.19|       1.4355|
|           Merc 230|22.8|  4|140.8| 95|3.92| 3.15| 22.9|  1|  0|   4|   2|           3.15|       1.4175|
|           Merc 280|19.2|  6|167.6|123|3.92| 3.44| 18.3|  1|  0|   4|   4|           3.44|        1.548|
|          Merc 280C|17.8|  6|167.6|123|3.92| 3.44| 18.9|  1|  0|   4|   4|           3.44|        1.548|
|         Merc 450SE|16.4|  8|275.8|180|3.07| 4.07| 17.4|  0|  0|   3|   3|           4.07|       1.8315|
|         Merc 450SL|17.3|  8|275.8|180|3.07| 3.73| 17.6|  0|  0|   3|   3|           3.73|       1.6785|
|        Merc 450SLC|15.2|  8|275.8|180|3.07| 3.78| 18.0|  0|  0|   3|   3|           3.78|        1.701|
| Cadillac Fleetwood|10.4|  8|472.0|205|2.93| 5.25|17.98|  0|  0|   3|   4|           5.25|       2.3625|
|Lincoln Continental|10.4|  8|460.0|215| 3.0|5.424|17.82|  0|  0|   3|   4|          5.424|       2.4408|
|  Chrysler Imperial|14.7|  8|440.0|230|3.23|5.345|17.42|  0|  0|   3|   4|          5.345|      2.40525|
|           Fiat 128|32.4|  4| 78.7| 66|4.08|  2.2|19.47|  1|  1|   4|   1|            2.2|         0.99|
|        Honda Civic|30.4|  4| 75.7| 52|4.93|1.615|18.52|  1|  1|   4|   2|          1.615|      0.72675|
|     Toyota Corolla|33.9|  4| 71.1| 65|4.22|1.835| 19.9|  1|  1|   4|   1|          1.835|      0.82575|
+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+---------------+-------------+
only showing top 20 rows
Create UDF 2
We will create a Pandas UDF to convert the mpg column to kmpl (kilometers per liter). You can use the conversion factor of 0.425.
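The factor 0.425 can be derived from the unit definitions. The sketch below assumes US gallons (3.785411784 liters) and the international mile (1.609344 km); if the data were in imperial gallons the factor would differ. The variable names are illustrative, and the sample value is the Mazda RX4 mpg from the preview.

```python
KM_PER_MILE = 1.609344              # international mile, exact
LITERS_PER_US_GALLON = 3.785411784  # US liquid gallon, exact (assumed unit)

# miles/gallon -> km/liter: multiply by km per mile, divide by liters per gallon.
factor = KM_PER_MILE / LITERS_PER_US_GALLON
rounded_factor = round(factor, 3)   # the 0.425 used in this notebook

mpg = 21.0                          # Mazda RX4
kmpl = mpg * rounded_factor
print(factor, rounded_factor, kmpl)
```

With the rounded factor, 21.0 mpg converts to 8.925 kmpl, which is the value the UDF is expected to produce for that row.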
Register UDF 2
# Code block for learners to answer
from pyspark.sql.functions import pandas_udf
@pandas_udf("float")
def convert_mileage(s: pd.Series) -> pd.Series:
    # Convert miles per gallon to kilometers per liter
    return s * 0.425
spark.udf.register("convert_mileage", convert_mileage)
Apply UDF 2 to TableView
spark.sql("SELECT *, mpg AS mpg, convert_mileage(mpg) as kmpl FROM cars").show()
+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+----+-------+
|               name| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb| mpg|   kmpl|
+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+----+-------+
|          Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|21.0|  8.925|
|      Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|21.0|  8.925|
|         Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|22.8|   9.69|
|     Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|21.4|  9.095|
|  Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|18.7| 7.9475|
|            Valiant|18.1|  6|225.0|105|2.76| 3.46|20.22|  1|  0|   3|   1|18.1| 7.6925|
|         Duster 360|14.3|  8|360.0|245|3.21| 3.57|15.84|  0|  0|   3|   4|14.3| 6.0775|
|          Merc 240D|24.4|  4|146.7| 62|3.69| 3.19| 20.0|  1|  0|   4|   2|24.4|  10.37|
|           Merc 230|22.8|  4|140.8| 95|3.92| 3.15| 22.9|  1|  0|   4|   2|22.8|   9.69|
|           Merc 280|19.2|  6|167.6|123|3.92| 3.44| 18.3|  1|  0|   4|   4|19.2|   8.16|
|          Merc 280C|17.8|  6|167.6|123|3.92| 3.44| 18.9|  1|  0|   4|   4|17.8|  7.565|
|         Merc 450SE|16.4|  8|275.8|180|3.07| 4.07| 17.4|  0|  0|   3|   3|16.4|   6.97|
|         Merc 450SL|17.3|  8|275.8|180|3.07| 3.73| 17.6|  0|  0|   3|   3|17.3| 7.3525|
|        Merc 450SLC|15.2|  8|275.8|180|3.07| 3.78| 18.0|  0|  0|   3|   3|15.2|   6.46|
| Cadillac Fleetwood|10.4|  8|472.0|205|2.93| 5.25|17.98|  0|  0|   3|   4|10.4|   4.42|
|Lincoln Continental|10.4|  8|460.0|215| 3.0|5.424|17.82|  0|  0|   3|   4|10.4|   4.42|
|  Chrysler Imperial|14.7|  8|440.0|230|3.23|5.345|17.42|  0|  0|   3|   4|14.7| 6.2475|
|           Fiat 128|32.4|  4| 78.7| 66|4.08|  2.2|19.47|  1|  1|   4|   1|32.4|  13.77|
|        Honda Civic|30.4|  4| 75.7| 52|4.93|1.615|18.52|  1|  1|   4|   2|30.4|  12.92|
|     Toyota Corolla|33.9|  4| 71.1| 65|4.22|1.835| 19.9|  1|  1|   4|   1|33.9|14.4075|
+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+----+-------+
only showing top 20 rows