Data Integration & Transformation - 2
Objectives
Create a DataFrame by loading data from a CSV file and apply transformations and actions using Spark SQL. This needs to be achieved by performing the following tasks:
- Task 1: Generate DataFrame from CSV data.
- Task 2: Define a schema for the data.
- Task 3: Display schema of DataFrame.
- Task 4: Create a temporary view.
- Task 5: Execute an SQL query.
- Task 6: Calculate Average Salary by Department.
- Task 7: Filter and Display IT Department Employees.
- Task 8: Add 10% Bonus to Salaries.
- Task 9: Find Maximum Salary by Age.
- Task 10: Self-Join on Employee Data.
- Task 11: Calculate Average Employee Age.
- Task 12: Calculate Total Salary by Department.
- Task 13: Sort Data by Age and Salary.
- Task 14: Count Employees in Each Department.
- Task 15: Filter Employees with the letter o in the Name.
Setup
Imports
# Installing required packages
!pip install pyspark findspark wget

import findspark
findspark.init()

# PySpark is the Spark API for Python. In this lab, we use PySpark to initialize the SparkContext.
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

Create SparkContext Object

# Creating a SparkContext object
sc = SparkContext.getOrCreate()
Create SparkSession
# Creating a SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
Download Data
# Download the CSV data first into a local `employees.csv` file
import wget
"https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/data/employees.csv")
wget.download(
# RESPONSE
100% [................................................................................] 1321 / 1321
'employees.csv'
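If the wget package is not available in the environment, the standard library can fetch the same file; a minimal sketch:

# Standard-library alternative to the wget package
import urllib.request
urllib.request.urlretrieve(
    "https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/data/employees.csv",
    "employees.csv")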
Task 1: Generate DataFrame from CSV data.
- Read data from the provided CSV file, employees.csv, and import it into a Spark DataFrame variable named employees_df.
- After viewing the data, it does appear to contain headers.
employees_df = spark.read.csv("employees.csv", header=True)
Preview Data
employees_df.show(5)
+------+--------+------+---+----------+
|Emp_No|Emp_Name|Salary|Age|Department|
+------+--------+------+---+----------+
| 198| Donald| 2600| 29| IT|
| 199| Douglas| 2600| 34| Sales|
| 200|Jennifer| 4400| 36| Marketing|
| 201| Michael| 13000| 32| IT|
| 202| Pat| 6000| 39| HR|
+------+--------+------+---+----------+
only showing top 5 rows
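With header=True and no schema, every column is read as a string. If defining a schema by hand (Task 2) is not wanted, Spark can infer the types at the cost of an extra pass over the file; a sketch (employees_df_inferred is an illustrative name):

# Let Spark infer column types from the data (extra pass over the file)
employees_df_inferred = spark.read.csv("employees.csv", header=True, inferSchema=True)
employees_df_inferred.printSchema()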
Task 2: Define a schema for the data
- Construct a schema for the input data and then utilize the defined schema to read the CSV file to create a DataFrame named employees_df.
from pyspark.sql.types import StructType, IntegerType, LongType, StringType, StructField
schema = StructType([
    StructField("Emp_No", LongType(), False),
    StructField("Emp_Name", StringType(), False),
    StructField("Salary", IntegerType(), True),
    StructField("Age", IntegerType(), False),
    StructField("Department", StringType(), False)
])
employees_df = spark.read.format("csv").schema(schema).option("header", "true").load("employees.csv")

employees_df.show(5)

+------+--------+------+---+----------+
|Emp_No|Emp_Name|Salary|Age|Department|
+------+--------+------+---+----------+
| 198| Donald| 2600| 29| IT|
| 199| Douglas| 2600| 34| Sales|
| 200|Jennifer| 4400| 36| Marketing|
| 201| Michael| 13000| 32| IT|
| 202| Pat| 6000| 39| HR|
+------+--------+------+---+----------+
only showing top 5 rows
Task 3: Display schema of DataFrame
- Display the schema of the employees_df DataFrame, showing all columns and their respective data types.
employees_df.printSchema()
root
 |-- Emp_No: long (nullable = true)
 |-- Emp_Name: string (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Department: string (nullable = true)

Note that every column is reported as nullable = true even though the schema marked most fields non-nullable: when reading from file-based sources such as CSV, Spark treats all columns as nullable regardless of the schema supplied.
Task 4: Create a temporary view
- Create a temporary view named employees for the employees_df DataFrame, enabling Spark SQL queries on the data.
"DROP VIEW IF EXISTS employees")
spark.sql("employees") employees_df.createTempView(
Task 5: Execute an SQL query
- Compose and execute an SQL query to fetch the records from the employees view where the age of employees exceeds 30.
- Then, display the result of the SQL query, showcasing the filtered records.
result = spark.sql("SELECT * FROM employees WHERE age > 30")
result.show()
+------+-----------+------+---+----------+
|Emp_No| Emp_Name|Salary|Age|Department|
+------+-----------+------+---+----------+
| 199| Douglas| 2600| 34| Sales|
| 200| Jennifer| 4400| 36| Marketing|
| 201| Michael| 13000| 32| IT|
| 202| Pat| 6000| 39| HR|
| 203| Susan| 6500| 36| Marketing|
| 205| Shelley| 12008| 33| Finance|
| 206| William| 8300| 37| IT|
| 100| Steven| 24000| 39| IT|
| 102| Lex| 17000| 37| Marketing|
| 103| Alexander| 9000| 39| Marketing|
| 104| Bruce| 6000| 38| IT|
| 105| David| 4800| 39| IT|
| 106| Valli| 4800| 38| Sales|
| 107| Diana| 4200| 35| Sales|
| 109| Daniel| 9000| 35| HR|
| 110| John| 8200| 31| Marketing|
| 111| Ismael| 7700| 32| IT|
| 112|Jose Manuel| 7800| 34| HR|
| 113| Luis| 6900| 34| Sales|
| 116| Shelli| 2900| 37| Finance|
+------+-----------+------+---+----------+
only showing top 20 rows
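The same filter can also be expressed through the DataFrame API without SQL; a minimal equivalent sketch:

# DataFrame-API equivalent of the SQL query above
employees_df.filter(employees_df['Age'] > 30).show()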
Task 6: Calculate Average Salary by Department
- Compose an SQL query to retrieve the average salary of employees grouped by department.
- Display the result.
employees_df.groupby(['Department']).agg({'Salary':'avg'}).show()

+----------+-----------------+
|Department| avg(Salary)|
+----------+-----------------+
| Sales|5492.923076923077|
| HR| 5837.5|
| Finance| 5730.8|
| Marketing|6633.333333333333|
| IT| 7400.0|
+----------+-----------------+
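Since the task asks for an SQL query, the same aggregation can also be run against the employees view from Task 4; a sketch:

# Equivalent Spark SQL form using the temporary view from Task 4
spark.sql("SELECT Department, AVG(Salary) AS avg_salary FROM employees GROUP BY Department").show()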
Task 7: Filter and Display IT Department Employees
- Apply a filter on the employees_df DataFrame to select records where the department is 'IT'.
- Display the filtered DataFrame.
'Department'] == "IT").show()
employees_df.where(employees_df[
+------+--------+------+---+----------+
|Emp_No|Emp_Name|Salary|Age|Department|
+------+--------+------+---+----------+
| 198| Donald| 2600| 29| IT|
| 201| Michael| 13000| 32| IT|
| 206| William| 8300| 37| IT|
| 100| Steven| 24000| 39| IT|
| 104| Bruce| 6000| 38| IT|
| 105| David| 4800| 39| IT|
| 111| Ismael| 7700| 32| IT|
| 129| Laura| 3300| 38| IT|
| 132| TJ| 2100| 34| IT|
| 136| Hazel| 2200| 29| IT|
+------+--------+------+---+----------+
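In PySpark, where() and filter() are interchangeable aliases; an SQL form against the Task 4 view would be:

# Equivalent SQL form
spark.sql("SELECT * FROM employees WHERE Department = 'IT'").show()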
Task 8: Add 10% Bonus to Salaries
- Perform a transformation to add a new column named “SalaryAfterBonus” to the DataFrame.
- Calculate the new salary by adding a 10% bonus to each employee’s salary.
from pyspark.sql.functions import col
# Add a new column "SalaryAfterBonus" with 10% bonus added to the original salary
employees_df.withColumn('SalaryAfterBonus', col('Salary') * 1.1).show()
+------+---------+------+---+----------+------------------+
|Emp_No| Emp_Name|Salary|Age|Department| SalaryAfterBonus|
+------+---------+------+---+----------+------------------+
| 198| Donald| 2600| 29| IT|2860.0000000000005|
| 199| Douglas| 2600| 34| Sales|2860.0000000000005|
| 200| Jennifer| 4400| 36| Marketing| 4840.0|
| 201| Michael| 13000| 32| IT|14300.000000000002|
| 202| Pat| 6000| 39| HR| 6600.000000000001|
| 203| Susan| 6500| 36| Marketing| 7150.000000000001|
| 204| Hermann| 10000| 29| Finance| 11000.0|
| 205| Shelley| 12008| 33| Finance|13208.800000000001|
| 206| William| 8300| 37| IT| 9130.0|
| 100| Steven| 24000| 39| IT|26400.000000000004|
| 101| Neena| 17000| 27| Sales| 18700.0|
| 102| Lex| 17000| 37| Marketing| 18700.0|
| 103|Alexander| 9000| 39| Marketing| 9900.0|
| 104| Bruce| 6000| 38| IT| 6600.000000000001|
| 105| David| 4800| 39| IT| 5280.0|
| 106| Valli| 4800| 38| Sales| 5280.0|
| 107| Diana| 4200| 35| Sales| 4620.0|
| 108| Nancy| 12008| 28| Sales|13208.800000000001|
| 109| Daniel| 9000| 35| HR| 9900.0|
| 110| John| 8200| 31| Marketing| 9020.0|
+------+---------+------+---+----------+------------------+
only showing top 20 rows
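The long decimals above (e.g. 2860.0000000000005) are ordinary floating-point artifacts from multiplying by 1.1; if clean values are wanted, the result can be rounded. A sketch:

from pyspark.sql.functions import col, round
# Round the bonus salary to 2 decimal places to avoid floating-point artifacts
employees_df.withColumn('SalaryAfterBonus', round(col('Salary') * 1.1, 2)).show(5)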
Task 9: Find Maximum Salary by Age
- Group the data by age and calculate the maximum salary for each age group.
- Display the result.
from pyspark.sql.functions import max
# Group data by age and calculate the maximum salary for each age group
employees_df.groupBy(['Age']).agg({'Salary':'max'}).show()
+---+-----------+
|Age|max(Salary)|
+---+-----------+
| 31| 8200|
| 34| 7800|
| 28| 12008|
| 27| 17000|
| 26| 3600|
| 37| 17000|
| 35| 9000|
| 39| 24000|
| 38| 6000|
| 29| 10000|
| 32| 13000|
| 33| 12008|
| 30| 8000|
| 36| 7900|
+---+-----------+
# Let's sort it by age group
employees_df.groupBy(['Age']).agg({'Salary':'max'}).sort('Age').show()
+---+-----------+
|Age|max(Salary)|
+---+-----------+
| 26| 3600|
| 27| 17000|
| 28| 12008|
| 29| 10000|
| 30| 8000|
| 31| 8200|
| 32| 13000|
| 33| 12008|
| 34| 7800|
| 35| 9000|
| 36| 7900|
| 37| 17000|
| 38| 6000|
| 39| 24000|
+---+-----------+
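The max function imported above goes unused by the dictionary-style agg; it can express the same aggregation with an explicit output column name. A sketch:

from pyspark.sql.functions import max
# Same aggregation, but with a named result column
employees_df.groupBy('Age').agg(max('Salary').alias('max_salary')).sort('Age').show()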
Task 10: Self-Join on Employee Data
- Join the “employees_df” DataFrame with itself based on the “Emp_No” column.
- Display the result.
# Join the DataFrame with itself based on the "Emp_No" column
joined_df = employees_df.join(employees_df, on='Emp_No', how='inner')
joined_df.show()
+------+---------+------+---+----------+---------+------+---+----------+
|Emp_No| Emp_Name|Salary|Age|Department| Emp_Name|Salary|Age|Department|
+------+---------+------+---+----------+---------+------+---+----------+
| 198| Donald| 2600| 29| IT| Donald| 2600| 29| IT|
| 199| Douglas| 2600| 34| Sales| Douglas| 2600| 34| Sales|
| 200| Jennifer| 4400| 36| Marketing| Jennifer| 4400| 36| Marketing|
| 201| Michael| 13000| 32| IT| Michael| 13000| 32| IT|
| 202| Pat| 6000| 39| HR| Pat| 6000| 39| HR|
| 203| Susan| 6500| 36| Marketing| Susan| 6500| 36| Marketing|
| 204| Hermann| 10000| 29| Finance| Hermann| 10000| 29| Finance|
| 205| Shelley| 12008| 33| Finance| Shelley| 12008| 33| Finance|
| 206| William| 8300| 37| IT| William| 8300| 37| IT|
| 100| Steven| 24000| 39| IT| Steven| 24000| 39| IT|
| 101| Neena| 17000| 27| Sales| Neena| 17000| 27| Sales|
| 102| Lex| 17000| 37| Marketing| Lex| 17000| 37| Marketing|
| 103|Alexander| 9000| 39| Marketing|Alexander| 9000| 39| Marketing|
| 104| Bruce| 6000| 38| IT| Bruce| 6000| 38| IT|
| 105| David| 4800| 39| IT| David| 4800| 39| IT|
| 106| Valli| 4800| 38| Sales| Valli| 4800| 38| Sales|
| 107| Diana| 4200| 35| Sales| Diana| 4200| 35| Sales|
| 108| Nancy| 12008| 28| Sales| Nancy| 12008| 28| Sales|
| 109| Daniel| 9000| 35| HR| Daniel| 9000| 35| HR|
| 110| John| 8200| 31| Marketing| John| 8200| 31| Marketing|
+------+---------+------+---+----------+---------+------+---+----------+
only showing top 20 rows
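Joining with on='Emp_No' deduplicates the key column, but every other column now appears twice under the same name, which makes them ambiguous to reference later. Aliasing the two sides is one way to disambiguate; a sketch (the left/right names and l/r aliases are illustrative):

from pyspark.sql.functions import col
# Alias each side of the self-join so duplicated columns can be referenced unambiguously
left = employees_df.alias('l')
right = employees_df.alias('r')
left.join(right, col('l.Emp_No') == col('r.Emp_No'), 'inner') \
    .select(col('l.Emp_No'), col('l.Emp_Name'), col('r.Salary')).show(5)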
Task 11: Calculate Average Employee Age
- Calculate the average age of employees using the built-in aggregation function.
- Display the result
# Calculate the average age of employees
from pyspark.sql.functions import avg
"DROP VIEW IF EXISTS T_view")
spark.sql("T_view")
employees_df.createTempView("SELECT * FROM T_view").show()
spark.sql(+------+---------+------+---+----------+
|Emp_No| Emp_Name|Salary|Age|Department|
+------+---------+------+---+----------+
| 198| Donald| 2600| 29| IT|
| 199| Douglas| 2600| 34| Sales|
| 200| Jennifer| 4400| 36| Marketing|
| 201| Michael| 13000| 32| IT|
| 202| Pat| 6000| 39| HR|
| 203| Susan| 6500| 36| Marketing|
| 204| Hermann| 10000| 29| Finance|
| 205| Shelley| 12008| 33| Finance|
| 206| William| 8300| 37| IT|
| 100| Steven| 24000| 39| IT|
| 101| Neena| 17000| 27| Sales|
| 102| Lex| 17000| 37| Marketing|
| 103|Alexander| 9000| 39| Marketing|
| 104| Bruce| 6000| 38| IT|
| 105| David| 4800| 39| IT|
| 106| Valli| 4800| 38| Sales|
| 107| Diana| 4200| 35| Sales|
| 108| Nancy| 12008| 28| Sales|
| 109| Daniel| 9000| 35| HR|
| 110| John| 8200| 31| Marketing|
+------+---------+------+---+----------+
only showing top 20 rows
# Average age of all employees
"SELECT avg(Age) FROM T_view").show()
spark.sql(+--------+
|avg(Age)|
+--------+
| 33.56|
+--------+
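The avg function imported above also works directly on the DataFrame, without the detour through a temporary view; a sketch:

from pyspark.sql.functions import avg
# DataFrame-API equivalent of the SQL average
employees_df.agg(avg('Age').alias('avg_age')).show()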
Task 12: Calculate Total Salary by Department
- Calculate the total salary for each department using the built-in aggregation function.
- Display the result.
# Calculate the total salary for each department. Hint - Use GroupBy and Aggregate functions
from pyspark.sql.functions import sum
employees_df.groupby(['Department']).agg({'Salary':'sum'}).show()

+----------+-----------+
|Department|sum(Salary)|
+----------+-----------+
| Sales| 71408|
| HR| 46700|
| Finance| 57308|
| Marketing| 59700|
| IT| 74000|
+----------+-----------+
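As in Task 9, the imported sum column function (which shadows Python's builtin sum) can replace the dictionary-style agg and name the output; a sketch:

from pyspark.sql.functions import sum
# Same total with an explicit output column name
employees_df.groupBy('Department').agg(sum('Salary').alias('total_salary')).show()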
Task 13: Sort Data by Age and Salary
- Apply a transformation to sort the DataFrame by age in ascending order and then by salary in descending order.
- Display the sorted DataFrame.
# Sort the DataFrame by age in ascending order and then by salary in descending order
employees_df.sort('Salary', ascending=False).sort('Age').show()

+------+---------+------+---+----------+
|Emp_No| Emp_Name|Salary|Age|Department|
+------+---------+------+---+----------+
| 137| Renske| 3600| 26| Marketing|
| 101| Neena| 17000| 27| Sales|
| 114| Den| 11000| 27| Finance|
| 108| Nancy| 12008| 28| Sales|
| 126| Irene| 2700| 28| HR|
| 130| Mozhe| 2800| 28| Marketing|
| 198| Donald| 2600| 29| IT|
| 204| Hermann| 10000| 29| Finance|
| 115|Alexander| 3100| 29| Finance|
| 134| Michael| 2900| 29| Sales|
| 136| Hazel| 2200| 29| IT|
| 140| Joshua| 2500| 29| Finance|
| 120| Matthew| 8000| 30| HR|
| 110| John| 8200| 31| Marketing|
| 127| James| 2400| 31| HR|
| 201| Michael| 13000| 32| IT|
| 111| Ismael| 7700| 32| IT|
| 119| Karen| 2500| 32| Finance|
| 205| Shelley| 12008| 33| Finance|
| 117| Sigal| 2800| 33| Sales|
+------+---------+------+---+----------+
only showing top 20 rows
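Note that chaining two sort() calls does not produce a compound ordering: the second sort reorders the whole DataFrame by Age alone, so rows within an age group are not guaranteed to be in descending salary order (visible above for age 29). A single orderBy with both keys expresses the intent directly; a sketch:

from pyspark.sql.functions import col
# Age ascending, with Salary descending within each age group
employees_df.orderBy(col('Age').asc(), col('Salary').desc()).show()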
- Both in ascending order
employees_df.sort('Age', 'Salary').show()
+------+---------+------+---+----------+
|Emp_No| Emp_Name|Salary|Age|Department|
+------+---------+------+---+----------+
| 137| Renske| 3600| 26| Marketing|
| 114| Den| 11000| 27| Finance|
| 101| Neena| 17000| 27| Sales|
| 126| Irene| 2700| 28| HR|
| 130| Mozhe| 2800| 28| Marketing|
| 108| Nancy| 12008| 28| Sales|
| 136| Hazel| 2200| 29| IT|
| 140| Joshua| 2500| 29| Finance|
| 198| Donald| 2600| 29| IT|
| 134| Michael| 2900| 29| Sales|
| 115|Alexander| 3100| 29| Finance|
| 204| Hermann| 10000| 29| Finance|
| 120| Matthew| 8000| 30| HR|
| 127| James| 2400| 31| HR|
| 110| John| 8200| 31| Marketing|
| 119| Karen| 2500| 32| Finance|
| 111| Ismael| 7700| 32| IT|
| 201| Michael| 13000| 32| IT|
| 128| Steven| 2200| 33| Finance|
| 117| Sigal| 2800| 33| Sales|
+------+---------+------+---+----------+
only showing top 20 rows
Task 14: Count Employees in Each Department
- Calculate the number of employees in each department.
- Display the result.
from pyspark.sql.functions import count
# Calculate the number of employees in each department
employees_df.groupby(['Department']).agg({'Emp_No':'count'}).show()
+----------+-------------+
|Department|count(Emp_No)|
+----------+-------------+
| Sales| 13|
| HR| 8|
| Finance| 10|
| Marketing| 9|
| IT| 10|
+----------+-------------+
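Here too the imported count function can name the result column; a sketch:

from pyspark.sql.functions import count
# Same count with an explicit output column name
employees_df.groupBy('Department').agg(count('Emp_No').alias('employee_count')).show()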
Task 15: Filter Employees with the letter o in the Name
- Apply a filter to select records where the employee's name contains the letter 'o'.
- Display the filtered DataFrame.
# Apply a filter to select records where the employee's name contains the letter 'o'
from pyspark.sql.functions import col
filtered = employees_df.filter(col('Emp_Name').rlike('o'))
filtered.show()

# ----- OR -----

filtered = employees_df.filter(col('Emp_Name').contains('o'))
filtered.show()
+------+-----------+------+---+----------+
|Emp_No| Emp_Name|Salary|Age|Department|
+------+-----------+------+---+----------+
| 198| Donald| 2600| 29| IT|
| 199| Douglas| 2600| 34| Sales|
| 110| John| 8200| 31| Marketing|
| 112|Jose Manuel| 7800| 34| HR|
| 130| Mozhe| 2800| 28| Marketing|
| 133| Jason| 3300| 38| Sales|
| 139| John| 2700| 36| Sales|
| 140| Joshua| 2500| 29| Finance|
+------+-----------+------+---+----------+
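rlike matches a regular expression while contains checks for a literal substring, so for a single letter the two are interchangeable; SQL-style LIKE patterns are a third option. A sketch:

from pyspark.sql.functions import col
# SQL-style LIKE pattern as a third alternative
employees_df.filter(col('Emp_Name').like('%o%')).show()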
spark.stop()