Install wget, pyspark and findspark
pip install wget pyspark findspark
# Response
Successfully built pyspark
Installing collected packages: py4j, findspark, pyspark
Successfully installed findspark-2.0.1 py4j-0.10.9.7 pyspark-3.4.4
Data Integration & Transformation - 1
Objectives
In this example we will work with two datasets and perform the following:
- add columns
- rename columns
- drop unnecessary columns
- join dataframes
- write results into both a Hive warehouse and HDFS file system
Setup
Initiate Spark Session
import findspark
findspark.init()
Initialize SparkContext
# PySpark is the Spark API for Python. In this lab, we use PySpark to initialize the SparkContext.
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
Create SparkContext Object
sc = SparkContext.getOrCreate()
Create Spark Session
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
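Since the lab later writes results into a Hive warehouse with saveAsTable, it can help to build the session with Hive support enabled. This is an optional variant, assuming a Hive metastore is configured in the environment:
# Optional: enable Hive support so that saveAsTable writes managed tables
# into the Hive warehouse (assumes a configured Hive metastore)
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .enableHiveSupport() \
    .getOrCreate()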
Load Data into DataFrames
Download Data
Find the data at:
- https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset1.csv
- https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset2.csv
import wget
link_to_data1 = 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset1.csv'
wget.download(link_to_data1)

link_to_data2 = 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset2.csv'
wget.download(link_to_data2)
Data to DF
#load the data into a pyspark dataframe
= spark.read.csv("dataset1.csv", header=True, inferSchema=True)
df1 = spark.read.csv("dataset2.csv", header=True, inferSchema=True) df2
Display Schema
- Let's look at the schema of both dataframes to better understand their structure
#print the schema of df1 and df2
df1.printSchema()
root
 |-- customer_id: integer (nullable = true)
 |-- date_column: string (nullable = true)
 |-- amount: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- location: string (nullable = true)
df2.printSchema()
root
 |-- customer_id: integer (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- value: integer (nullable = true)
 |-- notes: string (nullable = true)
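Beyond the schema, it can be useful to preview a few rows of each dataframe. This is an optional check not part of the original lab steps (output not shown here):
# Preview the first few rows of each dataframe
df1.show(5, truncate=False)
df2.show(5, truncate=False)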
Transform
Add New Column
- Add a new column named year to df1 and quarter to df2, representing the year and quarter of the data.
- Convert the date columns, which are currently strings, to date type before extracting the needed information.
from pyspark.sql.functions import year, quarter, to_date
#Add new column year to df1
df1 = df1.withColumn('year', year(to_date('date_column','dd/MM/yyyy')))

#Add new column quarter to df2
df2 = df2.withColumn('quarter', quarter(to_date('transaction_date','dd/MM/yyyy')))
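As an optional sanity check, you can confirm that the date strings parsed as expected; a null year or quarter would indicate a value that did not match the dd/MM/yyyy pattern (output not shown):
# Spot-check the derived columns; nulls indicate dates that failed to parse
df1.select('date_column', 'year').show(5)
df2.select('transaction_date', 'quarter').show(5)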
Print Schema Again
df1.printSchema()
root
 |-- customer_id: integer (nullable = true)
 |-- date_column: string (nullable = true)
 |-- amount: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- location: string (nullable = true)
 |-- year: integer (nullable = true)
df2.printSchema()
root
 |-- customer_id: integer (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- value: integer (nullable = true)
 |-- notes: string (nullable = true)
 |-- quarter: integer (nullable = true)
Rename Columns
- Rename the column amount in df1 to transaction_amount
- Rename the column value in df2 to transaction_value
#Rename df1 column amount to transaction_amount
df1 = df1.withColumnRenamed('amount', 'transaction_amount')

#Rename df2 column value to transaction_value
df2 = df2.withColumnRenamed('value', 'transaction_value')
Drop Columns
- Drop the columns description and location from df1
- Drop notes from df2
#Drop columns description and location from df1
df1 = df1.drop('description', 'location')

#Drop column notes from df2
df2 = df2.drop('notes')
Join
- Join df1 and df2 based on the common column customer_id into joined_df
#join df1 and df2 based on common column customer_id
joined_df = df1.join(df2, 'customer_id', 'inner')
joined_df.printSchema()
root
 |-- customer_id: integer (nullable = true)
 |-- date_column: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- transaction_value: integer (nullable = true)
 |-- quarter: integer (nullable = true)
Filter Conditional
- Filter joined_df to include only transactions where transaction_amount is greater than 1000
- Store the result in a new dataframe named filtered_df
# filter the dataframe for transaction amount > 1000
filtered_df = joined_df.filter("transaction_amount > 1000")
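The filter above uses a SQL expression string; an equivalent form with a column expression, which some prefer when composing conditions programmatically, would look like this:
from pyspark.sql.functions import col

# Equivalent filter written as a column expression
filtered_df = joined_df.filter(col('transaction_amount') > 1000)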
Aggregate By Customer
- Calculate the total transaction amount for each customer and display the results as total_amount in a new dataframe total_amount_per_customer
- Use sum from pyspark.sql.functions
from pyspark.sql.functions import sum
# group by customer_id and aggregate the sum of transaction amount
total_amount_per_customer = filtered_df.groupBy('customer_id').agg(sum('transaction_amount').alias('total_amount'))
#display the result
total_amount_per_customer.show()
+-----------+------------+
|customer_id|total_amount|
+-----------+------------+
| 31| 3200|
| 85| 1800|
| 78| 1500|
| 34| 1200|
| 81| 5500|
| 28| 2600|
| 76| 2600|
| 27| 4200|
| 91| 3200|
| 22| 1200|
| 93| 5500|
| 1| 5000|
| 52| 2600|
| 13| 4800|
| 6| 4500|
| 16| 2600|
| 40| 2600|
| 94| 1200|
| 57| 5500|
| 54| 1500|
+-----------+------------+
only showing top 20 rows
Write Out
Write to Hive Table
- Write the total_amount_per_customer to a Hive table named customer_totals
# Write total_amount_per_customer to a Hive table named customer_totals
"overwrite").saveAsTable("customer_totals") total_amount_per_customer.write.mode(
Write to HDFS
- Write the filtered_df to HDFS in parquet format to a file named filtered_data
#Write filtered_df to HDFS in parquet format file filtered_data
"overwrite").parquet("filtered_data.parquet") filtered_df.write.mode(
Transform
Add Column Conditionally
- Add a new column named high_value to df1 indicating whether the transaction_amount is greater than 5000.
- When the value is greater than 5000, the value of the column should be Yes.
- When the value is less than or equal to 5000, the value of the column should be No.
- When the value is less than or equal to 5000, the value of the column should be No.
- Use when and lit from pyspark.sql.functions
from pyspark.sql.functions import when, lit
# Add new column with value indicating whether transaction amount is > 5000 or not
= df1.withColumn("high_value", when(df1.transaction_amount > 5000, lit("Yes")).otherwise(lit("No"))) df1
Average per Quarter
- Calculate and display the average transaction value for each quarter in df2.
- Create a new dataframe named average_value_per_quarter with column avg_trans_val.
- Use avg from pyspark.sql.functions
from pyspark.sql.functions import avg
#calculate the average transaction value for each quarter in df2
average_value_per_quarter = df2.groupBy('quarter').agg(avg("transaction_value").alias("avg_trans_val"))
#show the average transaction value for each quarter in df2
average_value_per_quarter.show()
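Because groupBy does not guarantee any particular output order, you may want to sort by quarter when displaying the result; an optional variant:
# Display the quarterly averages in quarter order
average_value_per_quarter.orderBy('quarter').show()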
Write Out
Write to Hive Table
- Write average_value_per_quarter to a Hive table named quarterly_averages.
#Write average_value_per_quarter to a Hive table named quarterly_averages
"overwrite").saveAsTable("quarterly_averages") average_value_per_quarter.write.mode(
Transform
Transaction Value per Year
- Calculate and display the total transaction value for each year in df1.
- Create a new dataframe named total_value_per_year with column total_transaction_val.
Note: The provided DataFrame df1 does not explicitly have a year column initially. However, a year column was added to df1 in the Add New Column step by extracting the year from the date column, and the column amount was renamed to transaction_amount in the Rename Columns step.
# calculate the total transaction value for each year in df1.
total_value_per_year = df1.groupBy('year').agg(sum("transaction_amount").alias("total_transaction_val"))
# show the total transaction value for each year in df1.
total_value_per_year.show()
Write Out
Write to HDFS
- Write total_value_per_year to HDFS in CSV format to a file named total_value_per_year.
#Write total_value_per_year to HDFS in the CSV format
"overwrite").csv("total_value_per_year.csv") total_value_per_year.write.mode(