Data Integration & Transformation - 1

Objectives


In this example, we will work with two datasets and perform the following operations:

  • add columns
  • rename columns
  • drop unnecessary columns
  • join dataframes
  • write results to both a Hive warehouse and the HDFS file system

Setup


Install wget, pyspark, and findspark.
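The original install command is not shown here; in a typical pip-based notebook environment it would look something like the following, which matches the install output shown below.

# Install the libraries used in this lab (assumes a pip-based notebook environment)
!pip install wget pyspark findspark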

Output:
Successfully built pyspark
Installing collected packages: py4j, findspark, pyspark
Successfully installed findspark-2.0.1 py4j-0.10.9.7 pyspark-3.4.4

Initiate Spark Session

import findspark
findspark.init()

Initialize SparkContext

# PySpark is the Spark API for Python. In this lab, we use PySpark to initialize the SparkContext. 

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

Create SparkContext Object

sc = SparkContext.getOrCreate()

Create Spark Session

spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
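Later steps write tables with saveAsTable(). With the session above, those tables go into Spark's built-in catalog and local warehouse directory; if an actual Hive metastore is configured in your environment, the session would need Hive support enabled. A minimal sketch, assuming such a metastore exists:

# Optional: build the session with Hive support so saveAsTable() writes through the Hive metastore
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .enableHiveSupport() \
    .getOrCreate()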

Load Data into DataFrames


Download Data

The datasets are available at the following URLs; download them with wget:

import wget

link_to_data1 = 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset1.csv'
wget.download(link_to_data1)

link_to_data2 = 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset2.csv'
wget.download(link_to_data2)

Data to DF

#load the data into a pyspark dataframe
    
df1 = spark.read.csv("dataset1.csv", header=True, inferSchema=True)
df2 = spark.read.csv("dataset2.csv", header=True, inferSchema=True)
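As a quick sanity check (an addition to the original steps), the first few rows of each DataFrame can be previewed before transforming them:

# Preview a few rows of each DataFrame to confirm the CSVs loaded as expected
df1.show(5)
df2.show(5)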

Display Schema

  • Let's look at the schemas of both DataFrames to better understand their structure
#print the schema of df1 and df2
    
df1.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- date_column: string (nullable = true)
 |-- amount: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- location: string (nullable = true)
 
df2.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- value: integer (nullable = true)
 |-- notes: string (nullable = true)

Transform


Add New Column

  • Add a new column named year to df1 and quarter to df2 representing the year and quarter of the data.
  • Convert the date columns, which are currently strings, to date type before extracting the needed information
from pyspark.sql.functions import year, quarter, to_date

#Add new column year to df1
df1 = df1.withColumn('year', year(to_date('date_column','dd/MM/yyyy')))
    
#Add new column quarter to df2    
df2 = df2.withColumn('quarter', quarter(to_date('transaction_date','dd/MM/yyyy')))
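To confirm the date parsing worked, the original string columns and the derived columns can be inspected side by side. This spot check is an addition to the lab steps:

# Spot-check the parsed year and quarter against the original date strings
df1.select('date_column', 'year').show(5)
df2.select('transaction_date', 'quarter').show(5)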

Rename Columns

  • Rename the column amount in df1 to transaction_amount
  • Rename the column value in df2 to transaction_value
#Rename df1 column amount to transaction_amount
df1 = df1.withColumnRenamed('amount', 'transaction_amount')
    
#Rename df2 column value to transaction_value
df2 = df2.withColumnRenamed('value', 'transaction_value')

Drop Columns

  • Drop the columns description and location from df1
  • Drop notes from df2
#Drop columns description and location from df1
df1 = df1.drop('description', 'location')
    
#Drop column notes from df2
df2 = df2.drop('notes')

Join

  • Join df1 and df2 based on the common column customer_id into joined_df
#join df1 and df2 based on common column customer_id
joined_df = df1.join(df2, 'customer_id', 'inner')

joined_df.printSchema()
root
 |-- customer_id: integer (nullable = true)
 |-- date_column: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- transaction_value: integer (nullable = true)
 |-- quarter: integer (nullable = true)

Filter by Condition

  • Filter joined_df to include only transactions where transaction_amount is greater than 1000
  • Store the result in a new DataFrame named filtered_df
# filter the dataframe for transaction amount > 1000
filtered_df = joined_df.filter("transaction_amount > 1000") 
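The same filter can also be written with a column expression instead of a SQL string; both forms are equivalent, and the choice is a matter of style:

from pyspark.sql.functions import col

# Equivalent column-expression form of the same filter
filtered_df = joined_df.filter(col("transaction_amount") > 1000)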

Aggregate By Customer

  • Calculate the total transaction amount for each customer and display it in a column named total_amount
  • Use sum from pyspark.sql.functions
from pyspark.sql.functions import sum
   
# group by customer_id and aggregate the sum of transaction amount

total_amount_per_customer = filtered_df.groupBy('customer_id').agg(sum('transaction_amount').alias('total_amount'))

#display the result
total_amount_per_customer.show()

+-----------+------------+
|customer_id|total_amount|
+-----------+------------+
|         31|        3200|
|         85|        1800|
|         78|        1500|
|         34|        1200|
|         81|        5500|
|         28|        2600|
|         76|        2600|
|         27|        4200|
|         91|        3200|
|         22|        1200|
|         93|        5500|
|          1|        5000|
|         52|        2600|
|         13|        4800|
|          6|        4500|
|         16|        2600|
|         40|        2600|
|         94|        1200|
|         57|        5500|
|         54|        1500|
+-----------+------------+
only showing top 20 rows

Write Out


Write to Hive Table

  • Write the total_amount_per_customer to a Hive table named customer_totals
# Write total_amount_per_customer to a Hive table named customer_totals
total_amount_per_customer.write.mode("overwrite").saveAsTable("customer_totals")
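To verify the write, the saved table can be queried back through the Spark catalog. This check is an addition, not part of the original lab:

# Read the saved table back and display a few rows
spark.sql("SELECT * FROM customer_totals").show(5)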

Write to HDFS

  • Write filtered_df to HDFS in parquet format to a file named filtered_data.parquet
#Write filtered_df to HDFS in parquet format file filtered_data
filtered_df.write.mode("overwrite").parquet("filtered_data.parquet")
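The parquet output can likewise be read back to confirm it was written correctly (an additional check):

# Read the parquet file back into a DataFrame to confirm the write succeeded
spark.read.parquet("filtered_data.parquet").show(5)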

Transform


Add Column Conditionally

  • Add a new column named high_value to df1 indicating whether the transaction_amount is greater than 5000.
  • When the value is greater than 5000, the value of the column should be Yes.
  • When the value is less than or equal to 5000, the value of the column should be No.
  • Use when and lit from pyspark.sql.functions
from pyspark.sql.functions import when, lit

# Add new column with value indicating whether transaction amount is > 5000 or not
df1 = df1.withColumn("high_value", when(df1.transaction_amount > 5000, lit("Yes")).otherwise(lit("No")))

Average per Quarter

  • Calculate and display the average transaction value for each quarter in df2
  • Create a new DataFrame named average_value_per_quarter with a column named avg_trans_val
  • Use avg from pyspark.sql.functions
from pyspark.sql.functions import avg

#calculate the average transaction value for each quarter in df2
average_value_per_quarter = df2.groupBy('quarter').agg(avg("transaction_value").alias("avg_trans_val"))

    
#show the average transaction value for each quarter in df2    
average_value_per_quarter.show()
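Because show() does not guarantee row order after an aggregation, sorting by quarter makes the output easier to read. This ordering is an optional addition:

# Optional: display the quarterly averages in quarter order
average_value_per_quarter.orderBy('quarter').show()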

Write Out


Write to Hive Table

  • Write average_value_per_quarter to a Hive table named quarterly_averages.
#Write average_value_per_quarter to a Hive table named quarterly_averages

average_value_per_quarter.write.mode("overwrite").saveAsTable("quarterly_averages")

Transform


Transaction Value per Year

  • Calculate and display the total transaction value for each year in df1
  • Create a new DataFrame named total_value_per_year with a column named total_transaction_val.

Note: The DataFrame df1 does not have a year column to begin with. However, in the Add New Column step above, a year column was added to df1 by extracting the year from the date column, and in the Rename Columns step the column amount was renamed to transaction_amount.

# calculate the total transaction value for each year in df1.
total_value_per_year = df1.groupBy('year').agg(sum("transaction_amount").alias("total_transaction_val"))

# show the total transaction value for each year in df1.
total_value_per_year.show()

Write Out


Write to HDFS

  • Write total_value_per_year to HDFS in CSV format to a file named total_value_per_year.
#Write total_value_per_year to HDFS in the CSV format

total_value_per_year.write.mode("overwrite").csv("total_value_per_year.csv")
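Written this way, the CSV files contain no header row. If the column names should be preserved in the export, a header option can be added; this variant is an optional assumption about the desired output, not part of the original lab:

# Optional: include a header row so the column names survive the CSV export
total_value_per_year.write.mode("overwrite").option("header", True).csv("total_value_per_year.csv")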