ETL Retail Data - Spark
RetailX, a prominent retail chain with numerous stores across Mars, faces the challenge of processing and analyzing substantial volumes of daily sales data.
- With real-time data streaming in from multiple sources, RetailX needs to clean, transform, and aggregate this data to derive actionable insights such as:
- Total Sales and Revenue per Product
- Total Sales and Revenue per Store
- Sales and Revenue per Promotion Type
- Stock Analysis per Product
Challenge
Traditional data processing tools are inadequate for handling the velocity and volume of incoming sales data, leading to delays in analysis and decision-making. These delays hinder RetailX’s ability to respond swiftly to market demands and optimize inventory and sales strategies.
Solution
To address these challenges, RetailX requires a scalable and efficient solution. Apache Spark, with its distributed computing architecture and robust processing capabilities, is the ideal solution for RetailX’s data analytics needs.
- Spark’s ability to parallelize data processing tasks across a cluster of nodes enables rapid aggregation and analysis of large datasets.
- Additionally, its fault-tolerant design ensures reliability and resilience against failures, making it a dependable choice for RetailX’s critical data processing tasks.
Objectives
- Understand the Distributed Architecture of Spark in the context of a Real Time Problem
- Perform Data Parsing and Cleaning of Data
- Perform various aggregations to derive insights from the cleaned data
- Save the aggregated results to HDFS (Hadoop Distributed File System) for further storage and processing.
Data
This dataset is a modified sales dataset taken from the Kaggle website. The data was collected from a retail company on Mars, covering the period from the beginning of 2017 to the end of 2019. It consists of 1,033,435 records.
Description
product_id: This attribute represents the unique identifier for each product in the dataset. Each product is assigned a specific ID (e.g., P0001).
store_id: This attribute represents the unique identifier for each store where the product is sold. Each store is assigned a specific ID (e.g., S0002).
date: This attribute represents the date of sales data. It indicates when the sales, revenue, stock, and other information were recorded for a particular product in a specific store.
sales: This attribute represents the number of units of the product sold on a given date in a particular store. It indicates the quantity of the product that was purchased.
revenue: This attribute represents the total revenue generated from the sales of the product on a given date in a specific store. It is calculated by multiplying the number of units sold (sales) by the price per unit (price).
stock: This attribute represents the quantity of the product available in stock at the beginning of the day on the specified date in the given store.
price: This attribute represents the price per unit of the product on a given date in a specific store. It indicates the amount charged to the customer for each unit of the product.
promo_type_1: This attribute represents the type of promotion (if any) applied to the product. It indicates the first type of promotional activity associated with the product, such as discounts, special offers, or marketing campaigns.
promo_bin_1: This attribute represents the specific promotional bin (if any) associated with the first type of promotion. It provides additional details about the nature or category of the promotion.
promo_type_2: This attribute represents the type of secondary promotion (if any) applied to the product. It indicates another type of promotional activity associated with the product, similar to promo_type_1 but potentially different in nature or timing.
These attributes collectively provide detailed information about the sales, revenue, pricing, and promotional activities associated with each product in various stores over time.
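For orientation, the columns are assumed to appear in the order implied by the descriptions above (and by the parsing code later in this project). The row below is purely hypothetical, with invented values, and is shown only to illustrate that layout.
product_id,store_id,date,sales,revenue,stock,price,promo_type_1,promo_bin_1,promo_type_2
P0001,S0002,2017-01-02,4.0,27.96,42.0,6.99,PR14,low,PR03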
Setup
Install Libraries
pip install pyspark
pip install findspark
Suppress Warnings
# You can also use this section to suppress warnings generated by your code:
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')
# Findspark simplifies the process of using Spark with Python
import findspark
findspark.init()
# import sparksession
from pyspark.sql import SparkSession
Initialize SparkContext
# SparkConf can be used to configure the Spark application, but it is not needed this time
from pyspark import SparkContext, SparkConf
from datetime import datetime
# Initialize Spark Context
sc = SparkContext(appName="RetailStoreAnalysis")
Load Data
Download Data
import wget
"https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/XXlNzqYcxqkTbllc-tL_0w/Retailsales.csv") wget.download (
Load from CSV
- Load the data from a CSV file named "Retailsales.csv" using SparkContext's textFile function.
- The data is loaded as an RDD (Resilient Distributed Dataset) named raw_data.
raw_data = sc.textFile("Retailsales.csv")
# Preview data
for line in raw_data.take(10):
    print(line)
Transform Data
Parse & Clean
- The header line is removed from the RDD.
- The parse_line function is defined to parse each line of the CSV file into a structured format, extracting fields like product ID, store ID, date, sales, revenue, etc.
- The parsed data is filtered to remove records with missing or invalid data, such as zero or negative sales or price.
# Parse and Clean Data function
def parse_line(line):
    # Split the line by comma to get fields
    fields = line.split(",")
    try:
        # Return a dictionary with parsed fields
        return {
            'product_id': fields[0],
            'store_id': fields[1],
            'date': fields[2],
            'sales': float(fields[3]),
            'revenue': float(fields[4]),
            'stock': float(fields[5]),
            'price': float(fields[6]),
            'promo_type_1': fields[7],
            'promo_type_2': fields[9]
        }
    except (ValueError, IndexError):
        # Malformed lines are returned as None so they can be filtered out below
        return None

# Remove the header line
header = raw_data.first()
raw_data_no_header = raw_data.filter(lambda line: line != header)

# Parse the lines into a structured format
parsed_data = raw_data_no_header.map(parse_line)
parsed_data = parsed_data.filter(lambda x: x is not None)

# Filter out records with missing or invalid data
cleaned_data = parsed_data.filter(lambda x: x['sales'] > 0 and x['price'] > 0)
Partition
The function count_in_partition is defined to count the number of records in each partition of the RDD. This function is applied using mapPartitionsWithIndex to get the count of records in each partition, and the results are printed.
# Function to count the number of records in each partition
def count_in_partition(index, iterator):
    count = sum(1 for _ in iterator)
    yield (index, count)

# Get the count of records in each partition
partitions_info = cleaned_data.mapPartitionsWithIndex(count_in_partition).collect()
print("Number of records in each partition:")
for partition, count in partitions_info:
    print(f"Partition {partition}: {count} records")
# OUTPUT
Number of records in each partition:
Partition 0: 97534 records
Partition 1: 99127 records
Aggregation
To solve the issues raised at the start of the project, several aggregations will be performed:
- Total sales and revenue per product.
- Total sales and revenue per store.
- Average price per product.
- Sales and revenue per promotion type 1 and promotion type 2.
- Stock analysis per product.
- Each aggregation is performed using map to transform the data into key-value pairs and reduceByKey to aggregate the values for each key (a toy sketch of this pattern is shown below).
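Before turning to the individual aggregations, here is a minimal, self-contained sketch of that map / reduceByKey pattern on a few invented records; it is not part of the RetailX pipeline and assumes the sc SparkContext initialized earlier.
# Toy illustration of the map -> reduceByKey pattern used in the aggregations below.
# The records here are invented; `sc` is assumed to be the SparkContext created earlier.
toy_records = [
    {'product_id': 'P0001', 'sales': 2.0, 'revenue': 5.0},
    {'product_id': 'P0001', 'sales': 1.0, 'revenue': 2.5},
    {'product_id': 'P0002', 'sales': 4.0, 'revenue': 8.0},
]
toy_rdd = sc.parallelize(toy_records)

# map keys each record by product_id with a (sales, revenue) tuple as the value;
# reduceByKey sums the tuples element-wise for records that share a key
toy_totals = toy_rdd.map(lambda x: (x['product_id'], (x['sales'], x['revenue']))) \
                    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

print(toy_totals.collect())
# Expected (order may vary): [('P0001', (3.0, 7.5)), ('P0002', (4.0, 8.0))]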
1 - Total Sales & Revenue/Product
This aggregation calculates the total sales and revenue for each product.
- It first maps each record in cleaned_data to a key-value pair, where the key is the product ID and the value is a tuple containing sales and revenue.
- It then uses reduceByKey to aggregate the sales and revenue values for each product ID.
# Aggregation 1: Total Sales and Revenue per Product
sales_revenue_per_product = cleaned_data.map(lambda x: (x['product_id'], (x['sales'], x['revenue']))) \
                                         .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

print(f"Number of partitions in cleaned_data: {cleaned_data.getNumPartitions()}")
# OUTPUT
Number of partitions in cleaned_data: 2
2 - Total Sales & Revenue/Store
This aggregation calculates the total sales and revenue for each store.
- Similar to the first aggregation, it maps each record to a key-value pair with the store ID as the key and a tuple containing sales and revenue as the value.
- It then uses reduceByKey to aggregate the sales and revenue values for each store ID.
# Aggregation 2: Total Sales and Revenue per Store
sales_revenue_per_store = cleaned_data.map(lambda x: (x['store_id'], (x['sales'], x['revenue']))) \
                                       .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
3 - Average Price/Product
This aggregation calculates the average price for each product.
- It first maps each record to a key-value pair with the product ID as the key and a tuple containing the price and a count of 1.
- Then, it uses reduceByKey to aggregate the total price and count of prices for each product.
- Finally, it calculates the average price by dividing the total price by the count.
# Aggregation 3: Average Price per Product
total_price_count_per_product = cleaned_data.map(lambda x: (x['product_id'], (x['price'], 1))) \
                                             .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
average_price_per_product = total_price_count_per_product.mapValues(lambda x: x[0] / x[1])
4 - Sales & Revenue/Promotion
These aggregations calculate the total sales and revenue for each promotion type (promo_type_1 and promo_type_2).
- Each record is mapped to a key-value pair with the promotion type as the key and a tuple containing sales and revenue as the value.
- Then, reduceByKey is used to aggregate the sales and revenue values for each promotion type.
# Aggregation 4: Sales and Revenue per Promotion Type
sales_revenue_per_promo_1 = cleaned_data.map(lambda x: (x['promo_type_1'], (x['sales'], x['revenue']))) \
                                         .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
sales_revenue_per_promo_2 = cleaned_data.map(lambda x: (x['promo_type_2'], (x['sales'], x['revenue']))) \
                                         .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
5 - Stock Analysis/Product
This aggregation calculates the total stock for each product.
- Each record is mapped to a key-value pair with the product ID as the key and the stock as the value.
- Then, reduceByKey is used to aggregate the stock values for each product.
# Aggregation 5: Stock Analysis per Product
stock_per_product = cleaned_data.map(lambda x: (x['product_id'], x['stock'])) \
                                .reduceByKey(lambda a, b: a + b)
Save to HDFS
Save the results of each aggregation to HDFS using saveAsTextFile
# Save results to HDFS
"sales_revenue_per_product")
sales_revenue_per_product.saveAsTextFile("sales_revenue_per_store")
sales_revenue_per_store.saveAsTextFile("average_price_per_product")
average_price_per_product.saveAsTextFile("sales_revenue_per_promo_1")
sales_revenue_per_promo_1.saveAsTextFile("sales_revenue_per_promo_2")
sales_revenue_per_promo_2.saveAsTextFile("stock_per_product") stock_per_product.saveAsTextFile(
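Note that saveAsTextFile writes each result as a directory of part files (one per partition), with every (key, value) pair stored as its string form. As an optional sanity check, a saved result can be read back with textFile; the sketch below assumes the same relative paths resolve against the default filesystem.
# Optional check: read one saved result back.
# "sales_revenue_per_product" is a directory of part files; each line is a stringified (key, value) tuple
saved_check = sc.textFile("sales_revenue_per_product")
for line in saved_check.take(5):
    print(line)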
Print Aggregations
# Print results
print("Total Sales and Revenue per Product:")
print("=" * 35)
for product in sales_revenue_per_product.collect():
    # Create the format string with appropriate padding
    format_string = "{:<5} | {:<9} | {:<9}"
    # Print the values using the format string
    print(format_string.format(str(product[0]), str(round(product[1][0], 2)), str(round(product[1][1], 2))))
# OUTPUT
Total Sales and Revenue per Product:
===================================
P0016 | 60.0 | 111.08
P0051 | 26381.0 | 16782.56
P0055 | 1136.0 | 3650.86
P0057 | 233.0 | 2478.77
P0067 | 286.0 | 3799.19
P0068 | 255.0 | 1188.18
P0070 | 1366.0 | 6895.11
P0071 | 63.0 | 484.85
P0079 | 5464.0 | 10949.09
print("\n\nTotal Sales and Revenue per Store:")
print("=" * 35)
for store in sales_revenue_per_store.collect():
= f"{{:<5}} | {{:<9}} | {{:<9}}"
format_string print(format_string.format(str(store[0]), str(round(store[1][0],2)), str(round(store[1][1],2))))
# OUTPUT
Total Sales and Revenue per Store:
===================================
S0013 | 3969.6 | 16489.44
S0106 | 5526.99 | 13176.46
S0044 | 1378.0 | 2432.52
S0001 | 6375.7 | 27373.84
S0032 | 2655.77 | 5713.52
S0040 | 9843.92 | 33434.98
S0068 | 1387.01 | 3096.19
print("\n\nAverage Price per Product:")
print("=" * 30)
for product in average_price_per_product.collect():
= f"{{:<5}} | {{:<9}}"
format_string print(format_string.format(str(product[0]), str(round(product[1],2))))
# OUTPUT
Average Price per Product:
==============================
P0016 | 2.0
P0051 | 0.7
P0055 | 3.49
P0057 | 13.92
P0067 | 16.37
P0068 | 5.31
P0070 | 6.22
print("\n\nSales and Revenue per Promotion Type 1:")
print("=" * 40)
for promo in sales_revenue_per_promo_1.collect():
= f"{{:<5}} | {{:<9}} | {{:<9}}"
format_string print(format_string.format(str(promo[0]), str(round(promo[1][0],2)), str(round(promo[1][1],2))))
# OUTPUT
Sales and Revenue per Promotion Type 1:
========================================
PR05 | 13645.0 | 81005.96
PR03 | 7957.81 | 53616.67
PR06 | 3800.0 | 7620.63
PR07 | 838.0 | 4939.66
PR12 | 4319.0 | 20421.66
PR08 | 1007.0 | 12547.35
PR09 | 3233.0 | 13896.21
print("\n\nSales and Revenue per Promotion Type 2:")
print("=" * 40)
for promo in sales_revenue_per_promo_2.collect():
= f"{{:<5}} | {{:<9}} | {{:<9}}"
format_string
print(format_string.format(str(promo[0]), str(round(promo[1][0],2)), str(round(promo[1][1],2))))
# OUTPUT
Sales and Revenue per Promotion Type 2:
========================================
PR03 | 612642.47 | 1968846.77
print("\n\nStock per Product:")
print("=" * 20)
for product in stock_per_product.collect():
= f"{{:<5}} | {{:<9}}"
format_string print(format_string.format(str(product[0]), str(round(product[1],2))))
# OUTPUT
Stock per Product:
====================
P0016 | 1032.0
P0051 | 476816.0
P0055 | 14777.0
P0057 | 1317.0
P0067 | 1501.0
P0068 | 1970.0
P0070 | 6191.0
P0071 | 225.0
P0079 | 161374.0
Cleanup
# Stop the SparkContext
sc.stop()
The Jupyter notebook can be found at spark/spark_etl_retail_store.ipynb.