Data Integration & Transformation - 2

Objectives


Create a DataFrame by loading data from a CSV file, then apply transformations and actions using Spark SQL. This is achieved by performing the following tasks:

  • Task 1: Generate DataFrame from CSV data.
  • Task 2: Define a schema for the data.
  • Task 3: Display schema of DataFrame.
  • Task 4: Create a temporary view.
  • Task 5: Execute an SQL query.
  • Task 6: Calculate Average Salary by Department.
  • Task 7: Filter and Display IT Department Employees.
  • Task 8: Add 10% Bonus to Salaries.
  • Task 9: Find Maximum Salary by Age.
  • Task 10: Self-Join on Employee Data.
  • Task 11: Calculate Average Employee Age.
  • Task 12: Calculate Total Salary by Department.
  • Task 13: Sort Data by Age and Salary.
  • Task 14: Count Employees in Each Department.
  • Task 15: Filter Employees with the letter o in the Name.

Setup


Imports

# Installing required packages
!pip install pyspark findspark wget

import findspark
findspark.init()

# PySpark is the Spark API for Python. In this lab, we use PySpark to initialize the SparkContext.   

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

Create SparkContext Object

# Creating a SparkContext object  
sc = SparkContext.getOrCreate()

Create SparkSession

# Creating a SparkSession
# (the spark.some.config.option line is a placeholder from the Spark docs and can be omitted)
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

Download Data

# Download the CSV data first into a local `employees.csv` file
import wget
wget.download("https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/data/employees.csv")

# RESPONSE
100% [................................................................................] 1321 / 1321
'employees.csv'

Task 1: Generate DataFrame from CSV data.

  • Read data from the provided CSV file, employees.csv, and import it into a Spark DataFrame variable named employees_df.
  • A preview of the file shows that it contains a header row, so we read it with header=True.
employees_df = spark.read.csv("employees.csv", header=True)
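Note that without an explicit schema, spark.read.csv reads every column as a string. Before we define a schema in Task 2, one alternative is to let Spark infer the column types; a minimal sketch:

# Let Spark sample the file and infer column types (costs an extra pass over the data)
employees_df = spark.read.csv("employees.csv", header=True, inferSchema=True)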

Preview Data

employees_df.show(5)

+------+--------+------+---+----------+
|Emp_No|Emp_Name|Salary|Age|Department|
+------+--------+------+---+----------+
|   198|  Donald|  2600| 29|        IT|
|   199| Douglas|  2600| 34|     Sales|
|   200|Jennifer|  4400| 36| Marketing|
|   201| Michael| 13000| 32|        IT|
|   202|     Pat|  6000| 39|        HR|
+------+--------+------+---+----------+
only showing top 5 rows

Task 2: Define a schema for the data

  • Construct a schema for the input data and then utilize the defined schema to read the CSV file to create a DataFrame named employees_df
from pyspark.sql.types import StructType, IntegerType, LongType, StringType, StructField
schema = StructType([
        StructField("Emp_No", LongType(), False),
        StructField("Emp_Name", StringType(), False),
        StructField("Salary", IntegerType(), True),
        StructField("Age", IntegerType(), False),
        StructField("Department", StringType(), False)
])

employees_df = (spark.read.format("csv").schema(schema).option("header", "true").load("employees.csv"))
employees_df.show(5)
+------+--------+------+---+----------+
|Emp_No|Emp_Name|Salary|Age|Department|
+------+--------+------+---+----------+
|   198|  Donald|  2600| 29|        IT|
|   199| Douglas|  2600| 34|     Sales|
|   200|Jennifer|  4400| 36| Marketing|
|   201| Michael| 13000| 32|        IT|
|   202|     Pat|  6000| 39|        HR|
+------+--------+------+---+----------+
only showing top 5 rows

Task 3: Display schema of DataFrame

  • Display the schema of the employees_df DataFrame, showing all columns and their respective data types.
  • Note: although several fields were declared non-nullable in our schema, Spark's file-based sources treat all columns as nullable on read, which is why every field below shows nullable = true.
employees_df.printSchema()
root
 |-- Emp_No: long (nullable = true)
 |-- Emp_Name: string (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Department: string (nullable = true)

Task 4: Create a temporary view

  • Create a temporary view named employees for the employees_df DataFrame, enabling Spark SQL queries on the data.
spark.sql("DROP VIEW IF EXISTS employees")
employees_df.createTempView("employees")
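Equivalently, createOrReplaceTempView overwrites any existing view of the same name, so the DROP VIEW step is not needed:

# Creates the view, replacing it if it already exists
employees_df.createOrReplaceTempView("employees")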

Task 5: Execute an SQL query

  • Compose and execute an SQL query to fetch the records from the employees view where the age of employees exceeds 30.
  • Then, display the result of the SQL query, showcasing the filtered records.
result = spark.sql("SELECT * FROM employees WHERE age > 30")
result.show()

+------+-----------+------+---+----------+
|Emp_No|   Emp_Name|Salary|Age|Department|
+------+-----------+------+---+----------+
|   199|    Douglas|  2600| 34|     Sales|
|   200|   Jennifer|  4400| 36| Marketing|
|   201|    Michael| 13000| 32|        IT|
|   202|        Pat|  6000| 39|        HR|
|   203|      Susan|  6500| 36| Marketing|
|   205|    Shelley| 12008| 33|   Finance|
|   206|    William|  8300| 37|        IT|
|   100|     Steven| 24000| 39|        IT|
|   102|        Lex| 17000| 37| Marketing|
|   103|  Alexander|  9000| 39| Marketing|
|   104|      Bruce|  6000| 38|        IT|
|   105|      David|  4800| 39|        IT|
|   106|      Valli|  4800| 38|     Sales|
|   107|      Diana|  4200| 35|     Sales|
|   109|     Daniel|  9000| 35|        HR|
|   110|       John|  8200| 31| Marketing|
|   111|     Ismael|  7700| 32|        IT|
|   112|Jose Manuel|  7800| 34|        HR|
|   113|       Luis|  6900| 34|     Sales|
|   116|     Shelli|  2900| 37|   Finance|
+------+-----------+------+---+----------+
only showing top 20 rows
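The same filter can also be expressed through the DataFrame API; a minimal equivalent sketch:

from pyspark.sql.functions import col
employees_df.filter(col("Age") > 30).show()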

Task 6: Calculate Average Salary by Department

  • Compose an SQL query to retrieve the average salary of employees grouped by department.
  • Display the result.
# The DataFrame API version; an equivalent SQL query is shown below the output
employees_df.groupby(['Department']).agg({'Salary':'avg'}).show()

+----------+-----------------+
|Department|      avg(Salary)|
+----------+-----------------+
|     Sales|5492.923076923077|
|        HR|           5837.5|
|   Finance|           5730.8|
| Marketing|6633.333333333333|
|        IT|           7400.0|
+----------+-----------------+
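Since the task asks for an SQL query, here is an equivalent query against the employees view created in Task 4; it returns the same figures (the avg_salary alias is illustrative):

spark.sql("SELECT Department, AVG(Salary) AS avg_salary FROM employees GROUP BY Department").show()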

Task 7: Filter and Display IT Department Employees

  • Apply a filter on the employees_df DataFrame to select records where the department is 'IT'.
  • Display the filtered DataFrame.
employees_df.where(employees_df['Department'] == "IT").show()

+------+--------+------+---+----------+
|Emp_No|Emp_Name|Salary|Age|Department|
+------+--------+------+---+----------+
|   198|  Donald|  2600| 29|        IT|
|   201| Michael| 13000| 32|        IT|
|   206| William|  8300| 37|        IT|
|   100|  Steven| 24000| 39|        IT|
|   104|   Bruce|  6000| 38|        IT|
|   105|   David|  4800| 39|        IT|
|   111|  Ismael|  7700| 32|        IT|
|   129|   Laura|  3300| 38|        IT|
|   132|      TJ|  2100| 34|        IT|
|   136|   Hazel|  2200| 29|        IT|
+------+--------+------+---+----------+

Task 8: Add 10% Bonus to Salaries

  • Perform a transformation to add a new column named “SalaryAfterBonus” to the DataFrame.
  • Calculate the new salary by adding a 10% bonus to each employee’s salary.
from pyspark.sql.functions import col
# Add a new column "SalaryAfterBonus" with 10% bonus added to the original salary
employees_df.withColumn('SalaryAfterBonus', col('Salary') * 1.1).show()

+------+---------+------+---+----------+------------------+
|Emp_No| Emp_Name|Salary|Age|Department|  SalaryAfterBonus|
+------+---------+------+---+----------+------------------+
|   198|   Donald|  2600| 29|        IT|2860.0000000000005|
|   199|  Douglas|  2600| 34|     Sales|2860.0000000000005|
|   200| Jennifer|  4400| 36| Marketing|            4840.0|
|   201|  Michael| 13000| 32|        IT|14300.000000000002|
|   202|      Pat|  6000| 39|        HR| 6600.000000000001|
|   203|    Susan|  6500| 36| Marketing| 7150.000000000001|
|   204|  Hermann| 10000| 29|   Finance|           11000.0|
|   205|  Shelley| 12008| 33|   Finance|13208.800000000001|
|   206|  William|  8300| 37|        IT|            9130.0|
|   100|   Steven| 24000| 39|        IT|26400.000000000004|
|   101|    Neena| 17000| 27|     Sales|           18700.0|
|   102|      Lex| 17000| 37| Marketing|           18700.0|
|   103|Alexander|  9000| 39| Marketing|            9900.0|
|   104|    Bruce|  6000| 38|        IT| 6600.000000000001|
|   105|    David|  4800| 39|        IT|            5280.0|
|   106|    Valli|  4800| 38|     Sales|            5280.0|
|   107|    Diana|  4200| 35|     Sales|            4620.0|
|   108|    Nancy| 12008| 28|     Sales|13208.800000000001|
|   109|   Daniel|  9000| 35|        HR|            9900.0|
|   110|     John|  8200| 31| Marketing|            9020.0|
+------+---------+------+---+----------+------------------+
only showing top 20 rows
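The trailing floating-point noise (e.g. 2860.0000000000005) comes from multiplying an integer column by the double 1.1. A sketch that uses pyspark.sql.functions.round to keep two decimal places (spark_round is a local rename to avoid shadowing Python's built-in round):

from pyspark.sql.functions import col, round as spark_round
# Round the bonus-adjusted salary to two decimals
employees_df.withColumn('SalaryAfterBonus', spark_round(col('Salary') * 1.1, 2)).show(5)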

Task 9: Find Maximum Salary by Age

  • Group the data by age and calculate the maximum salary for each age group.
  • Display the result.
from pyspark.sql.functions import max  # shadows Python's built-in max in this session
# Group data by age and calculate the maximum salary for each age group
employees_df.groupBy('Age').agg(max('Salary')).show()

+---+-----------+
|Age|max(Salary)|
+---+-----------+
| 31|       8200|
| 34|       7800|
| 28|      12008|
| 27|      17000|
| 26|       3600|
| 37|      17000|
| 35|       9000|
| 39|      24000|
| 38|       6000|
| 29|      10000|
| 32|      13000|
| 33|      12008|
| 30|       8000|
| 36|       7900|
+---+-----------+

# Let's sort it by age group
employees_df.groupBy('Age').agg(max('Salary')).sort('Age').show()

+---+-----------+
|Age|max(Salary)|
+---+-----------+
| 26|       3600|
| 27|      17000|
| 28|      12008|
| 29|      10000|
| 30|       8000|
| 31|       8200|
| 32|      13000|
| 33|      12008|
| 34|       7800|
| 35|       9000|
| 36|       7900|
| 37|      17000|
| 38|       6000|
| 39|      24000|
+---+-----------+

Task 10: Self-Join on Employee Data

  • Join the “employees_df” DataFrame with itself based on the “Emp_No” column.
  • Display the result.
# Join the DataFrame with itself based on the "Emp_No" column
joined_df = employees_df.join(employees_df, on = 'Emp_No', how = 'inner')
joined_df.show()

+------+---------+------+---+----------+---------+------+---+----------+
|Emp_No| Emp_Name|Salary|Age|Department| Emp_Name|Salary|Age|Department|
+------+---------+------+---+----------+---------+------+---+----------+
|   198|   Donald|  2600| 29|        IT|   Donald|  2600| 29|        IT|
|   199|  Douglas|  2600| 34|     Sales|  Douglas|  2600| 34|     Sales|
|   200| Jennifer|  4400| 36| Marketing| Jennifer|  4400| 36| Marketing|
|   201|  Michael| 13000| 32|        IT|  Michael| 13000| 32|        IT|
|   202|      Pat|  6000| 39|        HR|      Pat|  6000| 39|        HR|
|   203|    Susan|  6500| 36| Marketing|    Susan|  6500| 36| Marketing|
|   204|  Hermann| 10000| 29|   Finance|  Hermann| 10000| 29|   Finance|
|   205|  Shelley| 12008| 33|   Finance|  Shelley| 12008| 33|   Finance|
|   206|  William|  8300| 37|        IT|  William|  8300| 37|        IT|
|   100|   Steven| 24000| 39|        IT|   Steven| 24000| 39|        IT|
|   101|    Neena| 17000| 27|     Sales|    Neena| 17000| 27|     Sales|
|   102|      Lex| 17000| 37| Marketing|      Lex| 17000| 37| Marketing|
|   103|Alexander|  9000| 39| Marketing|Alexander|  9000| 39| Marketing|
|   104|    Bruce|  6000| 38|        IT|    Bruce|  6000| 38|        IT|
|   105|    David|  4800| 39|        IT|    David|  4800| 39|        IT|
|   106|    Valli|  4800| 38|     Sales|    Valli|  4800| 38|     Sales|
|   107|    Diana|  4200| 35|     Sales|    Diana|  4200| 35|     Sales|
|   108|    Nancy| 12008| 28|     Sales|    Nancy| 12008| 28|     Sales|
|   109|   Daniel|  9000| 35|        HR|   Daniel|  9000| 35|        HR|
|   110|     John|  8200| 31| Marketing|     John|  8200| 31| Marketing|
+------+---------+------+---+----------+---------+------+---+----------+
only showing top 20 rows
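Because both sides of the join carry the same column names, every non-key column appears twice and cannot be referenced unambiguously afterwards. A sketch that aliases the two sides so individual columns stay addressable (the e1/e2 alias names are arbitrary):

from pyspark.sql.functions import col

e1 = employees_df.alias("e1")
e2 = employees_df.alias("e2")
# Joining on the column name collapses Emp_No into a single column;
# the remaining columns are reached through their side's alias
e1.join(e2, on="Emp_No", how="inner") \
  .select("Emp_No", col("e1.Emp_Name"), col("e2.Department")) \
  .show(5)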

Task 11: Calculate Average Employee Age

  • Calculate the average age of employees using the built-in aggregation function.
  • Display the result
# Calculate the average age of employees
from pyspark.sql.functions import avg
spark.sql("DROP VIEW IF EXISTS T_view")
employees_df.createTempView("T_view")
spark.sql("SELECT * FROM T_view").show()
+------+---------+------+---+----------+
|Emp_No| Emp_Name|Salary|Age|Department|
+------+---------+------+---+----------+
|   198|   Donald|  2600| 29|        IT|
|   199|  Douglas|  2600| 34|     Sales|
|   200| Jennifer|  4400| 36| Marketing|
|   201|  Michael| 13000| 32|        IT|
|   202|      Pat|  6000| 39|        HR|
|   203|    Susan|  6500| 36| Marketing|
|   204|  Hermann| 10000| 29|   Finance|
|   205|  Shelley| 12008| 33|   Finance|
|   206|  William|  8300| 37|        IT|
|   100|   Steven| 24000| 39|        IT|
|   101|    Neena| 17000| 27|     Sales|
|   102|      Lex| 17000| 37| Marketing|
|   103|Alexander|  9000| 39| Marketing|
|   104|    Bruce|  6000| 38|        IT|
|   105|    David|  4800| 39|        IT|
|   106|    Valli|  4800| 38|     Sales|
|   107|    Diana|  4200| 35|     Sales|
|   108|    Nancy| 12008| 28|     Sales|
|   109|   Daniel|  9000| 35|        HR|
|   110|     John|  8200| 31| Marketing|
+------+---------+------+---+----------+
only showing top 20 rows

# Average age of all employees

spark.sql("SELECT avg(Age) FROM T_view").show()
+--------+
|avg(Age)|
+--------+
|   33.56|
+--------+
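The avg function imported above computes the same figure directly on the DataFrame, with no view required:

# Built-in aggregation via the DataFrame API
employees_df.agg(avg('Age')).show()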

Task 12: Calculate Total Salary by Department

  • Calculate the total salary for each department using the built-in aggregation function.
  • Display the result.
# Calculate the total salary for each department. Hint: use groupBy and aggregate functions
from pyspark.sql.functions import sum  # shadows Python's built-in sum in this session
employees_df.groupby('Department').agg(sum('Salary')).show()
+----------+-----------+
|Department|sum(Salary)|
+----------+-----------+
|     Sales|      71408|
|        HR|      46700|
|   Finance|      57308|
| Marketing|      59700|
|        IT|      74000|
+----------+-----------+
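Several aggregates can also be computed in a single pass over the grouped data; a sketch combining total, average, and head count per department (the alias names are illustrative):

from pyspark.sql.functions import sum, avg, count
employees_df.groupBy('Department').agg(
    sum('Salary').alias('total_salary'),
    avg('Salary').alias('avg_salary'),
    count('Emp_No').alias('num_employees')
).show()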

Task 13: Sort Data by Age and Salary

  • Apply a transformation to sort the DataFrame by age in ascending order and
  • then by salary in descending order.
  • Display the sorted DataFrame.
# Sort the DataFrame by age in ascending order and then by salary in descending order
# Note: chaining two .sort() calls does NOT combine the orderings -- the second sort
# reorders the whole DataFrame, so within each age the salaries below are unordered
employees_df.sort('Salary', ascending=False).sort('Age').show()
+------+---------+------+---+----------+
|Emp_No| Emp_Name|Salary|Age|Department|
+------+---------+------+---+----------+
|   137|   Renske|  3600| 26| Marketing|
|   101|    Neena| 17000| 27|     Sales|
|   114|      Den| 11000| 27|   Finance|
|   108|    Nancy| 12008| 28|     Sales|
|   126|    Irene|  2700| 28|        HR|
|   130|    Mozhe|  2800| 28| Marketing|
|   198|   Donald|  2600| 29|        IT|
|   204|  Hermann| 10000| 29|   Finance|
|   115|Alexander|  3100| 29|   Finance|
|   134|  Michael|  2900| 29|     Sales|
|   136|    Hazel|  2200| 29|        IT|
|   140|   Joshua|  2500| 29|   Finance|
|   120|  Matthew|  8000| 30|        HR|
|   110|     John|  8200| 31| Marketing|
|   127|    James|  2400| 31|        HR|
|   201|  Michael| 13000| 32|        IT|
|   111|   Ismael|  7700| 32|        IT|
|   119|    Karen|  2500| 32|   Finance|
|   205|  Shelley| 12008| 33|   Finance|
|   117|    Sigal|  2800| 33|     Sales|
+------+---------+------+---+----------+
only showing top 20 rows
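As the comment above warns, the chained sorts leave salaries unordered within each age (see the age-29 rows above). A single sort with both keys produces the ordering the task asks for:

from pyspark.sql.functions import col
# One sort, two keys: age ascending, then salary descending within each age
employees_df.sort(col('Age').asc(), col('Salary').desc()).show()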
  • For comparison, sorting both keys in ascending order:
employees_df.sort('Age','Salary').show()

+------+---------+------+---+----------+
|Emp_No| Emp_Name|Salary|Age|Department|
+------+---------+------+---+----------+
|   137|   Renske|  3600| 26| Marketing|
|   114|      Den| 11000| 27|   Finance|
|   101|    Neena| 17000| 27|     Sales|
|   126|    Irene|  2700| 28|        HR|
|   130|    Mozhe|  2800| 28| Marketing|
|   108|    Nancy| 12008| 28|     Sales|
|   136|    Hazel|  2200| 29|        IT|
|   140|   Joshua|  2500| 29|   Finance|
|   198|   Donald|  2600| 29|        IT|
|   134|  Michael|  2900| 29|     Sales|
|   115|Alexander|  3100| 29|   Finance|
|   204|  Hermann| 10000| 29|   Finance|
|   120|  Matthew|  8000| 30|        HR|
|   127|    James|  2400| 31|        HR|
|   110|     John|  8200| 31| Marketing|
|   119|    Karen|  2500| 32|   Finance|
|   111|   Ismael|  7700| 32|        IT|
|   201|  Michael| 13000| 32|        IT|
|   128|   Steven|  2200| 33|   Finance|
|   117|    Sigal|  2800| 33|     Sales|
+------+---------+------+---+----------+
only showing top 20 rows

Task 14: Count Employees in Each Department

  • Calculate the number of employees in each department.
  • Display the result.
from pyspark.sql.functions import count
# Calculate the number of employees in each department
employees_df.groupby('Department').agg(count('Emp_No')).show()

+----------+-------------+
|Department|count(Emp_No)|
+----------+-------------+
|     Sales|           13|
|        HR|            8|
|   Finance|           10|
| Marketing|            9|
|        IT|           10|
+----------+-------------+
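For this particular aggregate, grouped data also has a built-in count() shortcut (the result column is then named count rather than count(Emp_No)):

employees_df.groupBy('Department').count().show()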

Task 15: Filter Employees with the letter o in the Name

  • Apply a filter to select records where the employee’s name contains the letter 'o'.
  • Display the filtered DataFrame.
# Apply a filter to select records where the employee's name contains the letter 'o'
from pyspark.sql.functions import col
filtered = employees_df.filter(col('Emp_Name').rlike('o'))
filtered.show()

# ----- OR -----
# contains() matches a literal substring, while rlike() treats its argument as a
# regular expression; for a single letter the two are equivalent
from pyspark.sql.functions import col
filtered = employees_df.filter(col('Emp_Name').contains('o'))
filtered.show()

+------+-----------+------+---+----------+
|Emp_No|   Emp_Name|Salary|Age|Department|
+------+-----------+------+---+----------+
|   198|     Donald|  2600| 29|        IT|
|   199|    Douglas|  2600| 34|     Sales|
|   110|       John|  8200| 31| Marketing|
|   112|Jose Manuel|  7800| 34|        HR|
|   130|      Mozhe|  2800| 28| Marketing|
|   133|      Jason|  3300| 38|     Sales|
|   139|       John|  2700| 36|     Sales|
|   140|     Joshua|  2500| 29|   Finance|
+------+-----------+------+---+----------+

# Stop the Spark session
spark.stop()