UDF

User Defined Functions


Apache Spark has become the de facto standard for processing big data. To enable data scientists to leverage the value of big data, Spark added a Python API in version 0.7, with support for user-defined functions (UDFs). These user-defined functions operate one row at a time, and thus suffer from high serialization and invocation overhead. As a result, many data pipelines define UDFs in Java and Scala and then invoke them from Python.

Pandas UDFs, built on top of Apache Arrow, bring you the _best of both worlds_: the ability to define low-overhead, high-performance UDFs entirely in Python.
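To make the overhead argument concrete, the following sketch counts Python function invocations for the two styles using plain pandas. It does not run Spark: the names to_metric_row and to_metric_batch, the call counter, and the sample values are illustrative assumptions, and a real Pandas UDF receives Arrow-backed batches from Spark rather than a hand-built Series.

```python
import pandas as pd

# Counts how often each style of function is invoked from Python.
calls = {"row": 0, "batch": 0}


def to_metric_row(x: float) -> float:
    """Row-at-a-time style: called once for every value."""
    calls["row"] += 1
    return x * 0.45


def to_metric_batch(s: pd.Series) -> pd.Series:
    """Vectorized style: called once for a whole batch of values."""
    calls["batch"] += 1
    return s * 0.45


# A few illustrative weight values (in units of 1000 lbs).
wt = pd.Series([2.62, 2.875, 2.32, 3.215, 3.44])

row_result = wt.map(to_metric_row)    # one Python call per element
batch_result = to_metric_batch(wt)    # one Python call for the whole Series

print(calls)
```

Both styles produce the same values; the difference is how many times the interpreter has to enter the function, which is the invocation overhead described above.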

In addition, a Pandas UDF can be invoked from SQL: decorate a regular Python function with @pandas_udf() and register it with spark.udf.register(). Later in this walkthrough we apply such a UDF to the wt (weight) column of the mtcars dataset.

Let's start with the setup.

Setup


# Install the required packages (run these in a shell, or prefix each with ! in a notebook cell)
pip install pyspark
pip install findspark
pip install pyarrow==0.14.1
pip install pandas
pip install numpy==1.19.5

import findspark
findspark.init()

import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

Spark Session


Create Spark Session & Context

# Creating a SparkContext instance
sc = SparkContext()

# Creating a spark session
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

Initialize Spark

spark

# OUTPUT
SparkSession - in-memory
SparkContext
Spark UI
Version: v2.4.3
Master: local[*]
AppName: pyspark-shell

Load Data & Create View


In this section, you will first read the CSV file into a pandas DataFrame and then load it into a Spark DataFrame. pandas is a library used for data manipulation and analysis.

# Read the file using `read_csv` function in pandas
mtcars = pd.read_csv('https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/mtcars.csv')

# preview some records
mtcars.head()

          Unnamed: 0   mpg  cyl   disp   hp  drat     wt   qsec  vs  am  gear  carb
0          Mazda RX4  21.0    6  160.0  110  3.90  2.620  16.46   0   1     4     4
1      Mazda RX4 Wag  21.0    6  160.0  110  3.90  2.875  17.02   0   1     4     4
2         Datsun 710  22.8    4  108.0   93  3.85  2.320  18.61   1   1     4     1
3     Hornet 4 Drive  21.4    6  258.0  110  3.08  3.215  19.44   1   0     3     1
4  Hornet Sportabout  18.7    8  360.0  175  3.15  3.440  17.02   0   0     3     2

Rename Column

mtcars.rename( columns={'Unnamed: 0':'name'}, inplace=True )
mtcars.head()


                name   mpg  cyl   disp   hp  drat     wt   qsec  vs  am  gear  carb
0          Mazda RX4  21.0    6  160.0  110  3.90  2.620  16.46   0   1     4     4
1      Mazda RX4 Wag  21.0    6  160.0  110  3.90  2.875  17.02   0   1     4     4
2         Datsun 710  22.8    4  108.0   93  3.85  2.320  18.61   1   1     4     1
3     Hornet 4 Drive  21.4    6  258.0  110  3.08  3.215  19.44   1   0     3     1
4  Hornet Sportabout  18.7    8  360.0  175  3.15  3.440  17.02   0   0     3     2

Load Data to SparkDF

CreateDataFrame

sdf = spark.createDataFrame(mtcars)

# preview schema
sdf.printSchema()

root
 |-- name: string (nullable = true)
 |-- mpg: double (nullable = true)
 |-- cyl: long (nullable = true)
 |-- disp: double (nullable = true)
 |-- hp: long (nullable = true)
 |-- drat: double (nullable = true)
 |-- wt: double (nullable = true)
 |-- qsec: double (nullable = true)
 |-- vs: long (nullable = true)
 |-- am: long (nullable = true)
 |-- gear: long (nullable = true)
 |-- carb: long (nullable = true)

Rename Column

  • withColumnRenamed() does not modify the existing DataFrame; it returns a new one.
  • Here we rename the vs column to versus.
sdf_new = sdf.withColumnRenamed("vs", "versus")
sdf_new.show(5)
+-----------------+----+---+-----+---+----+-----+-----+------+---+----+----+
|             name| mpg|cyl| disp| hp|drat|   wt| qsec|versus| am|gear|carb|
+-----------------+----+---+-----+---+----+-----+-----+------+---+----+----+
|        Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|     0|  1|   4|   4|
|    Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|     0|  1|   4|   4|
|       Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|     1|  1|   4|   1|
|   Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|     1|  0|   3|   1|
|Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|     0|  0|   3|   2|
+-----------------+----+---+-----+---+----+-----+-----+------+---+----+----+
only showing top 5 rows

Create Table View

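To run SQL queries programmatically against a DataFrame, you first register it as a table view. A temporary view behaves like a table for the purpose of SQL queries and is scoped to the current Spark session. In this example we create a temporary view using the createTempView() function. Note that createTempView() raises an error if a view with the same name already exists, for example when the cell is re-run; createOrReplaceTempView() avoids that.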

sdf.createTempView("cars")

SQL to Query & Aggregate


Once we have a table view, we can query it just like a SQL table. The operations below mirror the ones in the DataFrames notebook; the difference is that here we write the SQL queries directly.

# Showing the top of the table
spark.sql("SELECT * FROM cars").show(5)
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|             name| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|        Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|    Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|       Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|   Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|
|Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 5 rows

# Showing a specific column
spark.sql("SELECT mpg FROM cars").show(5)

# Basic filtering query to determine cars that have a high mileage and low cylinder count
spark.sql("SELECT * FROM cars where mpg>20 AND cyl < 6").show(5)

# Use the where method to list cars whose fuel economy is below 18 miles per gallon
sdf.where(sdf['mpg'] < 18).show(3)

# Aggregating data and grouping by cylinders
spark.sql("SELECT count(*), cyl from cars GROUP BY cyl").show()
+--------+---+
|count(1)|cyl|
+--------+---+
|       7|  6|
|      14|  8|
|      11|  4|
+--------+---+
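As a cross-check on what the GROUP BY query computes, here is a minimal pandas sketch that performs the same aggregation on only the five preview rows shown earlier. The small hand-built DataFrame is an assumption for illustration, so its counts cover those five rows only and will not match the full-table output above.

```python
import pandas as pd

# The five preview rows shown earlier (only the columns needed here).
preview = pd.DataFrame(
    {
        "name": [
            "Mazda RX4",
            "Mazda RX4 Wag",
            "Datsun 710",
            "Hornet 4 Drive",
            "Hornet Sportabout",
        ],
        "cyl": [6, 6, 4, 6, 8],
    }
)

# pandas counterpart of: SELECT count(*), cyl FROM cars GROUP BY cyl
counts = preview.groupby("cyl").size().reset_index(name="count")
print(counts)
```

On the Spark DataFrame itself, the corresponding DataFrame API call is sdf.groupBy("cyl").count().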

Create UDF 1


In this simple example, we will build a scalar Pandas UDF to convert the wt column from imperial units (1000 lbs) to metric units (metric tons).
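The arithmetic behind the conversion can be sketched in plain pandas. The pound is defined as exactly 0.45359237 kg, so 1000 lbs is about 0.4536 metric tons; this walkthrough rounds that to 0.45. The constant names and the three sample values below are assumptions made for illustration.

```python
import pandas as pd

LB_TO_KG = 0.45359237                   # definition of the avoirdupois pound
EXACT_FACTOR = 1000 * LB_TO_KG / 1000   # 1000 lbs -> kg -> metric tons
ROUNDED_FACTOR = 0.45                   # rounded factor used in this walkthrough

# First three wt values from the preview (units of 1000 lbs).
wt = pd.Series([2.62, 2.875, 2.32], name="wt")

rounded = wt * ROUNDED_FACTOR
exact = wt * EXACT_FACTOR
relative_error = (exact - rounded) / exact

print(pd.DataFrame({"wt": wt, "rounded": rounded, "exact": exact}))
print(f"relative error of the rounded factor: {relative_error.iloc[0]:.2%}")
```

The rounded factor understates the weight by a little under one percent, which is acceptable for a demonstration but worth knowing before reusing the numbers.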

Register UDF 1

# Import the Pandas UDF decorator
from pyspark.sql.functions import pandas_udf

@pandas_udf("float")
def convert_wt(s: pd.Series) -> pd.Series:
    # Convert weight from units of 1000 lbs to metric tons (1000 lbs is roughly 0.45 t)
    return s * 0.45

spark.udf.register("convert_weight", convert_wt)

# OUTPUT
<function __main__.convert_wt(s: pandas.core.series.Series) -> pandas.core.series.Series>

Apply UDF to TableView

We can now apply the convert_weight user-defined-function to our wt column from the cars table view.

  • This is done with the SQL query shown below.
  • The query returns both the original weight (in units of 1000 lbs) and the converted weight (in metric tons).
spark.sql("SELECT *, wt AS weight_imperial, convert_weight(wt) as weight_metric FROM cars").show()

+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+---------------+-------------+
|               name| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|weight_imperial|weight_metric|
+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+---------------+-------------+
|          Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|           2.62|        1.179|
|      Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|          2.875|      1.29375|
|         Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|           2.32|        1.044|
|     Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|          3.215|      1.44675|
|  Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|           3.44|        1.548|
|            Valiant|18.1|  6|225.0|105|2.76| 3.46|20.22|  1|  0|   3|   1|           3.46|        1.557|
|         Duster 360|14.3|  8|360.0|245|3.21| 3.57|15.84|  0|  0|   3|   4|           3.57|       1.6065|
|          Merc 240D|24.4|  4|146.7| 62|3.69| 3.19| 20.0|  1|  0|   4|   2|           3.19|       1.4355|
|           Merc 230|22.8|  4|140.8| 95|3.92| 3.15| 22.9|  1|  0|   4|   2|           3.15|       1.4175|
|           Merc 280|19.2|  6|167.6|123|3.92| 3.44| 18.3|  1|  0|   4|   4|           3.44|        1.548|
|          Merc 280C|17.8|  6|167.6|123|3.92| 3.44| 18.9|  1|  0|   4|   4|           3.44|        1.548|
|         Merc 450SE|16.4|  8|275.8|180|3.07| 4.07| 17.4|  0|  0|   3|   3|           4.07|       1.8315|
|         Merc 450SL|17.3|  8|275.8|180|3.07| 3.73| 17.6|  0|  0|   3|   3|           3.73|       1.6785|
|        Merc 450SLC|15.2|  8|275.8|180|3.07| 3.78| 18.0|  0|  0|   3|   3|           3.78|        1.701|
| Cadillac Fleetwood|10.4|  8|472.0|205|2.93| 5.25|17.98|  0|  0|   3|   4|           5.25|       2.3625|
|Lincoln Continental|10.4|  8|460.0|215| 3.0|5.424|17.82|  0|  0|   3|   4|          5.424|       2.4408|
|  Chrysler Imperial|14.7|  8|440.0|230|3.23|5.345|17.42|  0|  0|   3|   4|          5.345|      2.40525|
|           Fiat 128|32.4|  4| 78.7| 66|4.08|  2.2|19.47|  1|  1|   4|   1|            2.2|         0.99|
|        Honda Civic|30.4|  4| 75.7| 52|4.93|1.615|18.52|  1|  1|   4|   2|          1.615|      0.72675|
|     Toyota Corolla|33.9|  4| 71.1| 65|4.22|1.835| 19.9|  1|  1|   4|   1|          1.835|      0.82575|
+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+---------------+-------------+
only showing top 20 rows
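The same conversion can also be applied through the DataFrame API instead of SQL. The sketch below is one possible arrangement, not the notebook's own code: convert_wt_logic and add_metric_weight are hypothetical names, the return type is declared as "double" rather than "float", and the pyspark import is placed inside the helper so that the conversion logic can be checked with pandas alone.

```python
import pandas as pd


def convert_wt_logic(s: pd.Series) -> pd.Series:
    """Plain-pandas body of the UDF, so it can be checked without Spark."""
    return s * 0.45


def add_metric_weight(sdf):
    """Return a new Spark DataFrame with a weight_metric column (hypothetical helper)."""
    # Imported lazily so the pandas-only check below works without a Spark install.
    from pyspark.sql.functions import col, pandas_udf

    convert_wt = pandas_udf(convert_wt_logic, returnType="double")
    return sdf.withColumn("weight_metric", convert_wt(col("wt")))


# Local check of the conversion logic on a few wt values from the table above.
sample = pd.Series([2.62, 2.875, 2.32])
print(convert_wt_logic(sample).tolist())
```

With the session from this walkthrough, add_metric_weight(sdf).select("name", "wt", "weight_metric").show(5) would display the converted column next to the original one.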

Create UDF 2


We will create a pandas UDF to convert the mpg column to kmpl (kilometers per liter). You can use the conversion factor of 0.425.
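The factor 0.425 can be derived from the unit definitions. The sketch below assumes that the mpg figures use US liquid gallons (an assumption, since the dataset description is not part of this walkthrough); with imperial gallons the factor would differ. The function name mpg_to_kmpl is illustrative.

```python
KM_PER_MILE = 1.609344               # international mile in kilometers
LITERS_PER_US_GALLON = 3.785411784   # US liquid gallon in liters (assumed unit)


def mpg_to_kmpl(mpg: float) -> float:
    """Convert miles per US gallon to kilometers per liter."""
    return mpg * KM_PER_MILE / LITERS_PER_US_GALLON


factor = mpg_to_kmpl(1.0)
print(f"1 mpg = {factor:.6f} km/l")
print(f"21.0 mpg = {mpg_to_kmpl(21.0):.3f} km/l")
```

Rounded to three decimals, the derived factor agrees with the 0.425 suggested above.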

Register UDF 2

# Code block for learners to answer
from pyspark.sql.functions import pandas_udf

@pandas_udf("float")
def convert_mileage(s: pd.Series) -> pd.Series:
    # Convert miles per gallon to kilometers per liter
    return s * 0.425

spark.udf.register("convert_mileage", convert_mileage)

Apply UDF 2 to TableView

spark.sql("SELECT *, mpg AS mpg, convert_mileage(mpg) as kmpl FROM cars").show()
+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+----+-------+
|               name| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb| mpg|   kmpl|
+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+----+-------+
|          Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|21.0|  8.925|
|      Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|21.0|  8.925|
|         Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|22.8|   9.69|
|     Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|21.4|  9.095|
|  Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|18.7| 7.9475|
|            Valiant|18.1|  6|225.0|105|2.76| 3.46|20.22|  1|  0|   3|   1|18.1| 7.6925|
|         Duster 360|14.3|  8|360.0|245|3.21| 3.57|15.84|  0|  0|   3|   4|14.3| 6.0775|
|          Merc 240D|24.4|  4|146.7| 62|3.69| 3.19| 20.0|  1|  0|   4|   2|24.4|  10.37|
|           Merc 230|22.8|  4|140.8| 95|3.92| 3.15| 22.9|  1|  0|   4|   2|22.8|   9.69|
|           Merc 280|19.2|  6|167.6|123|3.92| 3.44| 18.3|  1|  0|   4|   4|19.2|   8.16|
|          Merc 280C|17.8|  6|167.6|123|3.92| 3.44| 18.9|  1|  0|   4|   4|17.8|  7.565|
|         Merc 450SE|16.4|  8|275.8|180|3.07| 4.07| 17.4|  0|  0|   3|   3|16.4|   6.97|
|         Merc 450SL|17.3|  8|275.8|180|3.07| 3.73| 17.6|  0|  0|   3|   3|17.3| 7.3525|
|        Merc 450SLC|15.2|  8|275.8|180|3.07| 3.78| 18.0|  0|  0|   3|   3|15.2|   6.46|
| Cadillac Fleetwood|10.4|  8|472.0|205|2.93| 5.25|17.98|  0|  0|   3|   4|10.4|   4.42|
|Lincoln Continental|10.4|  8|460.0|215| 3.0|5.424|17.82|  0|  0|   3|   4|10.4|   4.42|
|  Chrysler Imperial|14.7|  8|440.0|230|3.23|5.345|17.42|  0|  0|   3|   4|14.7| 6.2475|
|           Fiat 128|32.4|  4| 78.7| 66|4.08|  2.2|19.47|  1|  1|   4|   1|32.4|  13.77|
|        Honda Civic|30.4|  4| 75.7| 52|4.93|1.615|18.52|  1|  1|   4|   2|30.4|  12.92|
|     Toyota Corolla|33.9|  4| 71.1| 65|4.22|1.835| 19.9|  1|  1|   4|   1|33.9|14.4075|
+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+----+-------+
only showing top 20 rows