Data Integration & Transformation - 2
Objectives
Create a DataFrame by loading data from a CSV file and apply transformations and actions using Spark SQL. This needs to be achieved by performing the following tasks:
- Task 1: Generate DataFrame from CSV data.
- Task 2: Define a schema for the data.
- Task 3: Display schema of DataFrame.
- Task 4: Create a temporary view.
- Task 5: Execute an SQL query.
- Task 6: Calculate Average Salary by Department.
- Task 7: Filter and Display IT Department Employees.
- Task 8: Add 10% Bonus to Salaries.
- Task 9: Find Maximum Salary by Age.
- Task 10: Self-Join on Employee Data.
- Task 11: Calculate Average Employee Age.
- Task 12: Calculate Total Salary by Department.
- Task 13: Sort Data by Age and Salary.
- Task 14: Count Employees in Each Department.
- Task 15: Filter Employees with the letter o in the Name.
Setup
# Installing required packages
!pip install pyspark findspark wget
Imports
# PySpark is the Spark API for Python. In this lab, we use PySpark to initialize the SparkContext.
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
Create SparkContext Object
# Creating a SparkContext object
sc = SparkContext.getOrCreate()
Create SparkSession
# Creating a SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
Download Data
# Download the CSV data first into a local `employees.csv` file
import wget
"https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/data/employees.csv")
wget.download(
# RESPONSE
100% [................................................................................] 1321 / 1321
'employees.csv'
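If the wget package is unavailable, the same download can be done with Python's standard library; a minimal sketch, assuming the same URL and local filename:
import urllib.request
# Download the CSV and save it locally as employees.csv
urllib.request.urlretrieve(
    "https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/data/employees.csv",
    "employees.csv")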
Task 1: Generate DataFrame from CSV data.
- Read data from the provided CSV file, employees.csv, and import it into a Spark DataFrame variable named employees_df.
- After viewing the data, it does appear to contain headers.
employees_df = spark.read.csv("employees.csv", header=True)
Preview Data
employees_df.show(5)
+------+--------+------+---+----------+
|Emp_No|Emp_Name|Salary|Age|Department|
+------+--------+------+---+----------+
| 198| Donald| 2600| 29| IT|
| 199| Douglas| 2600| 34| Sales|
| 200|Jennifer| 4400| 36| Marketing|
| 201| Michael| 13000| 32| IT|
| 202| Pat| 6000| 39| HR|
+------+--------+------+---+----------+
only showing top 5 rows
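Note that with header=True alone, every column is typed as string. As an alternative to the manual schema defined in Task 2 below, Spark can also sample the file and guess column types; a minimal sketch:
# Let Spark infer column types from the data (costs an extra pass over the file)
employees_df_inferred = spark.read.csv("employees.csv", header=True, inferSchema=True)
employees_df_inferred.printSchema()  # Emp_No, Salary and Age come back as numeric types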
Task 2: Define a schema for the data
- Construct a schema for the input data and then utilize the defined schema to read the CSV file to create a DataFrame named employees_df.
from pyspark.sql.types import StructType, IntegerType, LongType, StringType, StructField
schema = StructType([
    StructField("Emp_No", LongType(), False),
    StructField("Emp_Name", StringType(), False),
    StructField("Salary", IntegerType(), True),
    StructField("Age", IntegerType(), False),
    StructField("Department", StringType(), False)
])
employees_df = (spark.read.format("csv").schema(schema).option("header", "true").load("employees.csv"))
employees_df.show(5)
+------+--------+------+---+----------+
|Emp_No|Emp_Name|Salary|Age|Department|
+------+--------+------+---+----------+
| 198| Donald| 2600| 29| IT|
| 199| Douglas| 2600| 34| Sales|
| 200|Jennifer| 4400| 36| Marketing|
| 201| Michael| 13000| 32| IT|
| 202| Pat| 6000| 39| HR|
+------+--------+------+---+----------+
only showing top 5 rows
Task 3: Display schema of DataFrame
- Display the schema of the employees_df DataFrame, showing all columns and their respective data types.
employees_df.printSchema()
root
 |-- Emp_No: long (nullable = true)
 |-- Emp_Name: string (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Department: string (nullable = true)
Note that every column is reported as nullable even though the schema declared several fields as non-nullable; Spark's file-based readers treat all loaded columns as nullable.
Task 4: Create a temporary view
- Create a temporary view named employees for the employees_df DataFrame, enabling Spark SQL queries on the data.
spark.sql("DROP VIEW IF EXISTS employees")
employees_df.createTempView("employees")
Task 5: Execute an SQL query
- Compose and execute an SQL query to fetch the records from the employees view where the age of employees exceeds 30.
- Then, display the result of the SQL query, showcasing the filtered records.
result = spark.sql("SELECT * FROM employees WHERE age > 30")
result.show()
+------+-----------+------+---+----------+
|Emp_No| Emp_Name|Salary|Age|Department|
+------+-----------+------+---+----------+
| 199| Douglas| 2600| 34| Sales|
| 200| Jennifer| 4400| 36| Marketing|
| 201| Michael| 13000| 32| IT|
| 202| Pat| 6000| 39| HR|
| 203| Susan| 6500| 36| Marketing|
| 205| Shelley| 12008| 33| Finance|
| 206| William| 8300| 37| IT|
| 100| Steven| 24000| 39| IT|
| 102| Lex| 17000| 37| Marketing|
| 103| Alexander| 9000| 39| Marketing|
| 104| Bruce| 6000| 38| IT|
| 105| David| 4800| 39| IT|
| 106| Valli| 4800| 38| Sales|
| 107| Diana| 4200| 35| Sales|
| 109| Daniel| 9000| 35| HR|
| 110| John| 8200| 31| Marketing|
| 111| Ismael| 7700| 32| IT|
| 112|Jose Manuel| 7800| 34| HR|
| 113| Luis| 6900| 34| Sales|
| 116| Shelli| 2900| 37| Finance|
+------+-----------+------+---+----------+
only showing top 20 rows
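The same filter can also be expressed through the DataFrame API instead of SQL; a minimal sketch that should return the same rows:
from pyspark.sql.functions import col
employees_df.filter(col('Age') > 30).show()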
Task 6: Calculate Average Salary by Department
- Compose an SQL query to retrieve the average salary of employees grouped by department.
- Display the result.
employees_df.groupby(['Department']).agg({'Salary':'AVG'}).show()
+----------+-----------------+
|Department| avg(Salary)|
+----------+-----------------+
| Sales|5492.923076923077|
| HR| 5837.5|
| Finance| 5730.8|
| Marketing|6633.333333333333|
| IT| 7400.0|
+----------+-----------------+
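Since the task asks for an SQL query, the same aggregation can also run against the employees view created in Task 4; a sketch:
spark.sql("SELECT Department, AVG(Salary) AS avg_salary FROM employees GROUP BY Department").show()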
Task 7: Filter and Display IT Department Employees
- Apply a filter on the employees_df DataFrame to select records where the department is 'IT'.
- Display the filtered DataFrame.
employees_df.where(employees_df['Department'] == "IT").show()
+------+--------+------+---+----------+
|Emp_No|Emp_Name|Salary|Age|Department|
+------+--------+------+---+----------+
| 198| Donald| 2600| 29| IT|
| 201| Michael| 13000| 32| IT|
| 206| William| 8300| 37| IT|
| 100| Steven| 24000| 39| IT|
| 104| Bruce| 6000| 38| IT|
| 105| David| 4800| 39| IT|
| 111| Ismael| 7700| 32| IT|
| 129| Laura| 3300| 38| IT|
| 132| TJ| 2100| 34| IT|
| 136| Hazel| 2200| 29| IT|
+------+--------+------+---+----------+
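filter also accepts an SQL-style condition string, which reads closer to the task description; a minimal sketch:
# Same result using a SQL expression string
employees_df.filter("Department = 'IT'").show()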
Task 8: Add 10% Bonus to Salaries
- Perform a transformation to add a new column named “SalaryAfterBonus” to the DataFrame.
- Calculate the new salary by adding a 10% bonus to each employee’s salary.
from pyspark.sql.functions import col
# Add a new column "SalaryAfterBonus" with 10% bonus added to the original salary
employees_df.withColumn('SalaryAfterBonus', col('Salary') * 1.1).show()
+------+---------+------+---+----------+------------------+
|Emp_No| Emp_Name|Salary|Age|Department| SalaryAfterBonus|
+------+---------+------+---+----------+------------------+
| 198| Donald| 2600| 29| IT|2860.0000000000005|
| 199| Douglas| 2600| 34| Sales|2860.0000000000005|
| 200| Jennifer| 4400| 36| Marketing| 4840.0|
| 201| Michael| 13000| 32| IT|14300.000000000002|
| 202| Pat| 6000| 39| HR| 6600.000000000001|
| 203| Susan| 6500| 36| Marketing| 7150.000000000001|
| 204| Hermann| 10000| 29| Finance| 11000.0|
| 205| Shelley| 12008| 33| Finance|13208.800000000001|
| 206| William| 8300| 37| IT| 9130.0|
| 100| Steven| 24000| 39| IT|26400.000000000004|
| 101| Neena| 17000| 27| Sales| 18700.0|
| 102| Lex| 17000| 37| Marketing| 18700.0|
| 103|Alexander| 9000| 39| Marketing| 9900.0|
| 104| Bruce| 6000| 38| IT| 6600.000000000001|
| 105| David| 4800| 39| IT| 5280.0|
| 106| Valli| 4800| 38| Sales| 5280.0|
| 107| Diana| 4200| 35| Sales| 4620.0|
| 108| Nancy| 12008| 28| Sales|13208.800000000001|
| 109| Daniel| 9000| 35| HR| 9900.0|
| 110| John| 8200| 31| Marketing| 9020.0|
+------+---------+------+---+----------+------------------+
only showing top 20 rows
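The trailing floating-point noise (e.g. 2860.0000000000005) comes from multiplying by 1.1 in double precision; rounding keeps the column readable. A sketch using pyspark.sql.functions.round:
from pyspark.sql.functions import col, round  # note: shadows the built-in round
# Round the bonus-adjusted salary to 2 decimal places
employees_df.withColumn('SalaryAfterBonus', round(col('Salary') * 1.1, 2)).show()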
Task 9: Find Maximum Salary by Age
- Group the data by age and calculate the maximum salary for each age group.
- Display the result.
from pyspark.sql.functions import max
# Group data by age and calculate the maximum salary for each age group
employees_df.groupBy(['Age']).agg({'Salary':'max'}).show()
+---+-----------+
|Age|max(Salary)|
+---+-----------+
| 31| 8200|
| 34| 7800|
| 28| 12008|
| 27| 17000|
| 26| 3600|
| 37| 17000|
| 35| 9000|
| 39| 24000|
| 38| 6000|
| 29| 10000|
| 32| 13000|
| 33| 12008|
| 30| 8000|
| 36| 7900|
+---+-----------+
# Let's sort it by age group
employees_df.groupBy(['Age']).agg({'Salary':'max'}).sort('Age').show()
+---+-----------+
|Age|max(Salary)|
+---+-----------+
| 26| 3600|
| 27| 17000|
| 28| 12008|
| 29| 10000|
| 30| 8000|
| 31| 8200|
| 32| 13000|
| 33| 12008|
| 34| 7800|
| 35| 9000|
| 36| 7900|
| 37| 17000|
| 38| 6000|
| 39| 24000|
+---+-----------+
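The max function imported above was not actually used; the dictionary form of agg is equivalent, but the function form lets you name the output column. A minimal sketch:
# Same aggregation with an explicit column alias
employees_df.groupBy('Age').agg(max('Salary').alias('max_salary')).sort('Age').show()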
Task 10: Self-Join on Employee Data
- Join the “employees_df” DataFrame with itself based on the “Emp_No” column.
- Display the result.
# Join the DataFrame with itself based on the "Emp_No" column
joined_df = employees_df.join(employees_df, on='Emp_No', how='inner')
joined_df.show()
+------+---------+------+---+----------+---------+------+---+----------+
|Emp_No| Emp_Name|Salary|Age|Department| Emp_Name|Salary|Age|Department|
+------+---------+------+---+----------+---------+------+---+----------+
| 198| Donald| 2600| 29| IT| Donald| 2600| 29| IT|
| 199| Douglas| 2600| 34| Sales| Douglas| 2600| 34| Sales|
| 200| Jennifer| 4400| 36| Marketing| Jennifer| 4400| 36| Marketing|
| 201| Michael| 13000| 32| IT| Michael| 13000| 32| IT|
| 202| Pat| 6000| 39| HR| Pat| 6000| 39| HR|
| 203| Susan| 6500| 36| Marketing| Susan| 6500| 36| Marketing|
| 204| Hermann| 10000| 29| Finance| Hermann| 10000| 29| Finance|
| 205| Shelley| 12008| 33| Finance| Shelley| 12008| 33| Finance|
| 206| William| 8300| 37| IT| William| 8300| 37| IT|
| 100| Steven| 24000| 39| IT| Steven| 24000| 39| IT|
| 101| Neena| 17000| 27| Sales| Neena| 17000| 27| Sales|
| 102| Lex| 17000| 37| Marketing| Lex| 17000| 37| Marketing|
| 103|Alexander| 9000| 39| Marketing|Alexander| 9000| 39| Marketing|
| 104| Bruce| 6000| 38| IT| Bruce| 6000| 38| IT|
| 105| David| 4800| 39| IT| David| 4800| 39| IT|
| 106| Valli| 4800| 38| Sales| Valli| 4800| 38| Sales|
| 107| Diana| 4200| 35| Sales| Diana| 4200| 35| Sales|
| 108| Nancy| 12008| 28| Sales| Nancy| 12008| 28| Sales|
| 109| Daniel| 9000| 35| HR| Daniel| 9000| 35| HR|
| 110| John| 8200| 31| Marketing| John| 8200| 31| Marketing|
+------+---------+------+---+----------+---------+------+---+----------+
only showing top 20 rows
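Because both sides of the self-join carry the same column names, selecting e.g. Emp_Name afterwards would be ambiguous. Aliasing each side disambiguates the columns; a minimal sketch:
e1 = employees_df.alias('e1')
e2 = employees_df.alias('e2')
# Qualified names resolve the duplicate columns
e1.join(e2, on='Emp_No', how='inner').select('Emp_No', 'e1.Emp_Name', 'e2.Department').show()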
Task 11: Calculate Average Employee Age
- Calculate the average age of employees using the built-in aggregation function.
- Display the result.
# Calculate the average age of employees
from pyspark.sql.functions import avg
"DROP VIEW IF EXISTS T_view")
spark.sql("T_view")
employees_df.createTempView("SELECT * FROM T_view").show()
spark.sql(+------+---------+------+---+----------+
|Emp_No| Emp_Name|Salary|Age|Department|
+------+---------+------+---+----------+
| 198| Donald| 2600| 29| IT|
| 199| Douglas| 2600| 34| Sales|
| 200| Jennifer| 4400| 36| Marketing|
| 201| Michael| 13000| 32| IT|
| 202| Pat| 6000| 39| HR|
| 203| Susan| 6500| 36| Marketing|
| 204| Hermann| 10000| 29| Finance|
| 205| Shelley| 12008| 33| Finance|
| 206| William| 8300| 37| IT|
| 100| Steven| 24000| 39| IT|
| 101| Neena| 17000| 27| Sales|
| 102| Lex| 17000| 37| Marketing|
| 103|Alexander| 9000| 39| Marketing|
| 104| Bruce| 6000| 38| IT|
| 105| David| 4800| 39| IT|
| 106| Valli| 4800| 38| Sales|
| 107| Diana| 4200| 35| Sales|
| 108| Nancy| 12008| 28| Sales|
| 109| Daniel| 9000| 35| HR|
| 110| John| 8200| 31| Marketing|
+------+---------+------+---+----------+
only showing top 20 rows
# Average age of all employees
"SELECT avg(Age) FROM T_view").show()
spark.sql(+--------+
|avg(Age)|
+--------+
| 33.56|
+--------+
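The avg function imported above can also be applied directly on the DataFrame, without going through the temporary view; a sketch:
# Same result via the DataFrame API
employees_df.agg(avg('Age').alias('avg_age')).show()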
Task 12: Calculate Total Salary by Department
- Calculate the total salary for each department using the built-in aggregation function.
- Display the result.
# Calculate the total salary for each department. Hint - Use GroupBy and Aggregate functions
from pyspark.sql.functions import sum
employees_df.groupby(['Department']).agg({'Salary':'sum'}).show()
+----------+-----------+
|Department|sum(Salary)|
+----------+-----------+
| Sales| 71408|
| HR| 46700|
| Finance| 57308|
| Marketing| 59700|
| IT| 74000|
+----------+-----------+
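As in Task 9, the imported sum function gives the same totals with a friendlier column name; a minimal sketch:
employees_df.groupBy('Department').agg(sum('Salary').alias('total_salary')).show()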
Task 13: Sort Data by Age and Salary
- Apply a transformation to sort the DataFrame by age in ascending order and then by salary in descending order.
- Display the sorted DataFrame.
# Sort the DataFrame by age in ascending order and then by salary in descending order
employees_df.sort('Salary', ascending=False).sort('Age').show()
+------+---------+------+---+----------+
|Emp_No| Emp_Name|Salary|Age|Department|
+------+---------+------+---+----------+
| 137| Renske| 3600| 26| Marketing|
| 101| Neena| 17000| 27| Sales|
| 114| Den| 11000| 27| Finance|
| 108| Nancy| 12008| 28| Sales|
| 126| Irene| 2700| 28| HR|
| 130| Mozhe| 2800| 28| Marketing|
| 198| Donald| 2600| 29| IT|
| 204| Hermann| 10000| 29| Finance|
| 115|Alexander| 3100| 29| Finance|
| 134| Michael| 2900| 29| Sales|
| 136| Hazel| 2200| 29| IT|
| 140| Joshua| 2500| 29| Finance|
| 120| Matthew| 8000| 30| HR|
| 110| John| 8200| 31| Marketing|
| 127| James| 2400| 31| HR|
| 201| Michael| 13000| 32| IT|
| 111| Ismael| 7700| 32| IT|
| 119| Karen| 2500| 32| Finance|
| 205| Shelley| 12008| 33| Finance|
| 117| Sigal| 2800| 33| Sales|
+------+---------+------+---+----------+
only showing top 20 rows
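Note that the output above is not actually descending by salary within each age group (see age 29): chaining two sort calls simply re-sorts the whole DataFrame, and Spark does not guarantee the first ordering survives. Passing both columns to a single sort expresses the intent directly; a minimal sketch:
# One sort over both keys: age ascending, salary descending within each age
employees_df.sort(['Age', 'Salary'], ascending=[True, False]).show()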
- Both columns in ascending order:
employees_df.sort('Age', 'Salary').show()
+------+---------+------+---+----------+
|Emp_No| Emp_Name|Salary|Age|Department|
+------+---------+------+---+----------+
| 137| Renske| 3600| 26| Marketing|
| 114| Den| 11000| 27| Finance|
| 101| Neena| 17000| 27| Sales|
| 126| Irene| 2700| 28| HR|
| 130| Mozhe| 2800| 28| Marketing|
| 108| Nancy| 12008| 28| Sales|
| 136| Hazel| 2200| 29| IT|
| 140| Joshua| 2500| 29| Finance|
| 198| Donald| 2600| 29| IT|
| 134| Michael| 2900| 29| Sales|
| 115|Alexander| 3100| 29| Finance|
| 204| Hermann| 10000| 29| Finance|
| 120| Matthew| 8000| 30| HR|
| 127| James| 2400| 31| HR|
| 110| John| 8200| 31| Marketing|
| 119| Karen| 2500| 32| Finance|
| 111| Ismael| 7700| 32| IT|
| 201| Michael| 13000| 32| IT|
| 128| Steven| 2200| 33| Finance|
| 117| Sigal| 2800| 33| Sales|
+------+---------+------+---+----------+
only showing top 20 rows
Task 14: Count Employees in Each Department
- Calculate the number of employees in each department.
- Display the result.
from pyspark.sql.functions import count
# Calculate the number of employees in each department
employees_df.groupby(['Department']).agg({'Emp_No':'count'}).show()
+----------+-------------+
|Department|count(Emp_No)|
+----------+-------------+
| Sales| 13|
| HR| 8|
| Finance| 10|
| Marketing| 9|
| IT| 10|
+----------+-------------+
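groupBy also has a built-in count shortcut that avoids naming a specific column; a sketch:
# Count rows per department directly
employees_df.groupBy('Department').count().show()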
Task 15: Filter Employees with the letter o in the Name
- Apply a filter to select records where the employee's name contains the letter 'o'.
- Display the filtered DataFrame.
# Apply a filter to select records where the employee's name contains the letter 'o'
from pyspark.sql.functions import col
filtered = employees_df.filter(col('Emp_Name').rlike('o'))
filtered.show()
# ----- OR ----
from pyspark.sql.functions import col
filtered = employees_df.filter(col('Emp_Name').contains('o'))
filtered.show()
+------+-----------+------+---+----------+
|Emp_No| Emp_Name|Salary|Age|Department|
+------+-----------+------+---+----------+
| 198| Donald| 2600| 29| IT|
| 199| Douglas| 2600| 34| Sales|
| 110| John| 8200| 31| Marketing|
| 112|Jose Manuel| 7800| 34| HR|
| 130| Mozhe| 2800| 28| Marketing|
| 133| Jason| 3300| 38| Sales|
| 139| John| 2700| 36| Sales|
| 140| Joshua| 2500| 29| Finance|
+------+-----------+------+---+----------+
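The same filter in SQL uses LIKE against the employees view; a minimal sketch:
spark.sql("SELECT * FROM employees WHERE Emp_Name LIKE '%o%'").show()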
spark.stop()