
ETL Retail Data - Spark

RetailX, a prominent retail chain with numerous stores across Mars, faces the challenge of processing and analyzing substantial volumes of daily sales data.

With real-time data streaming in from multiple sources, RetailX needs to clean, transform, and aggregate this data to derive actionable insights such as:

  • Total Sales and Revenue per Product
  • Total Sales and Revenue per Store
  • Sales and Revenue per Promotion Type
  • Stock Analysis per Product

Challenge

Traditional data processing tools are inadequate for handling the velocity and volume of incoming sales data, leading to delays in analysis and decision-making. These delays hinder RetailX’s ability to respond swiftly to market demands and optimize inventory and sales strategies.

Solution

To address these challenges, RetailX requires a scalable and efficient solution. Apache Spark, with its distributed computing architecture and robust processing capabilities, is the ideal solution for RetailX’s data analytics needs.

  • Spark’s ability to parallelize data processing tasks across a cluster of nodes enables rapid aggregation and analysis of large datasets.
  • Additionally, its fault-tolerant design ensures reliability and resilience against failures, making it a dependable choice for RetailX’s critical data processing tasks.

Objectives


  1. Understand the distributed architecture of Spark in the context of a real-time problem
  2. Parse and clean the raw sales data
  3. Perform various aggregations to derive insights from the cleaned data
  4. Save the aggregated results to HDFS (Hadoop Distributed File System) for further storage and processing

Data


This dataset is a modified sales dataset taken from the Kaggle website. The data was collected from a retail company on Mars and covers the period from the beginning of 2017 to the end of 2019. It currently consists of 1,033,435 records.

Description

product_id: This attribute represents the unique identifier for each product in the dataset. Each product is assigned a specific ID (e.g., P0001).

store_id: This attribute represents the unique identifier for each store where the product is sold. Each store is assigned a specific ID (e.g., S0002).

date: This attribute represents the date of sales data. It indicates when the sales, revenue, stock, and other information were recorded for a particular product in a specific store.

sales: This attribute represents the number of units of the product sold on a given date in a particular store. It indicates the quantity of the product that was purchased.

revenue: This attribute represents the total revenue generated from the sales of the product on a given date in a specific store. It is calculated by multiplying the number of units sold (sales) by the price per unit (price).

stock: This attribute represents the quantity of the product available in stock at the beginning of the day on the specified date in the given store.

price: This attribute represents the price per unit of the product on a given date in a specific store. It indicates the amount charged to the customer for each unit of the product.

promo_type_1: This attribute represents the type of promotion (if any) applied to the product. It indicates the first type of promotional activity associated with the product, such as discounts, special offers, or marketing campaigns.

promo_bin_1: This attribute represents the specific promotional bin (if any) associated with the first type of promotion. It provides additional details about the nature or category of the promotion.

promo_type_2: This attribute represents the type of secondary promotion (if any) applied to the product. It indicates another type of promotional activity associated with the product, similar to promo_type_1 but potentially different in nature or timing.

These attributes collectively provide detailed information about the sales, revenue, pricing, and promotional activities associated with each product in various stores over time.
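
For reference, the raw CSV is expected to follow the column order below, matching the field indexes used in the parsing code later on. The sample row is purely illustrative (made-up values), not an actual record from the dataset:

product_id,store_id,date,sales,revenue,stock,price,promo_type_1,promo_bin_1,promo_type_2
P0001,S0002,2017-01-02,3.0,18.90,45.0,6.30,PR14,low,PR03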

Setup


Install Libraries

pip install pyspark
pip install findspark
# You can also use this section to suppress warnings generated by your code:
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')
# Findspark simplifies the process of using Spark with Python
import findspark
findspark.init()

# Import SparkSession (the analysis below uses the lower-level RDD API via SparkContext)
from pyspark.sql import SparkSession

Initialize SparkContext


# SparkConf can be used to configure the application, but only SparkContext is needed here
from pyspark import SparkContext, SparkConf
from datetime import datetime

# Initialize Spark Context
sc = SparkContext(appName="RetailStoreAnalysis")
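
As an optional sanity check, the running context can be inspected; the exact values depend on the environment:

# Confirm the SparkContext is up and see how many tasks run in parallel by default
print(f"Spark version: {sc.version}")
print(f"Default parallelism: {sc.defaultParallelism}")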

Load Data


Download Data

import wget
wget.download("https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/XXlNzqYcxqkTbllc-tL_0w/Retailsales.csv")

Load from CSV

  • Load the data from a CSV file named "Retailsales.csv" using SparkContext's textFile function.
  • The data is loaded as an RDD (Resilient Distributed Dataset) named raw_data.
raw_data = sc.textFile("Retailsales.csv")
# Preview the first 10 lines
for line in raw_data.take(10):
    print(line)

Transform Data


Parse & Clean

  • The header line is removed from the RDD.
  • The parse_line function is defined to parse each line of the CSV file into a structured format, extracting fields like product ID, store ID, date, sales, revenue, etc.
  • The parsed data is filtered to remove records with missing or invalid data, such as zero or negative sales or price.
# Parse and Clean Data function
def parse_line(line):
    # Split the line by comma to get fields
    fields = line.split(",")
    try:
        # Return a dictionary with parsed fields (promo_bin_1 at index 8 is not needed here)
        return {
            'product_id': fields[0],
            'store_id': fields[1],
            'date': fields[2],
            'sales': float(fields[3]),
            'revenue': float(fields[4]),
            'stock': float(fields[5]),
            'price': float(fields[6]),
            'promo_type_1': fields[7],
            'promo_type_2': fields[9]
        }
    except (IndexError, ValueError):
        # Malformed or incomplete lines are dropped by the `is not None` filter below
        return None

# Remove the header line
header = raw_data.first()

raw_data_no_header = raw_data.filter(lambda line: line != header)

# Parse the lines into a structured format
parsed_data = raw_data_no_header.map(parse_line)
parsed_data = parsed_data.filter(lambda x: x is not None)


# Filter out records with missing or invalid data
cleaned_data = parsed_data.filter(lambda x: x['sales'] > 0 and x['price'] > 0)
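
A simple way to see how many records the cleaning step removes is to compare counts before and after filtering. This is a sketch; count() triggers a full pass over the data, so it can take a while on large files:

# Compare record counts before and after cleaning (both actions scan the full dataset)
print(f"Parsed records:  {parsed_data.count()}")
print(f"Cleaned records: {cleaned_data.count()}")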

Partition


The count_in_partition function counts the number of records in each partition of the RDD. It is applied with mapPartitionsWithIndex to get the count of records in each partition, and the results are printed.

# Function to count the number of records in each partition
def count_in_partition(index, iterator):
    count = sum(1 for _ in iterator)
    yield (index, count)

# Get the count of records in each partition
partitions_info = cleaned_data.mapPartitionsWithIndex(count_in_partition).collect()
print("Number of records in each partition:")
for partition, count in partitions_info:
    print(f"Partition {partition}: {count} records")
    
# OUTPUT
Number of records in each partition:
Partition 0: 97534 records
Partition 1: 99127 records
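
If the default split yields too few partitions for the available cluster, the partition count can be raised either when the file is read or afterwards. The sketch below assumes a target of 4 partitions; the right number depends on the cluster size:

# Option 1: request more partitions when the file is first read
raw_data_more_parts = sc.textFile("Retailsales.csv", minPartitions=4)

# Option 2: repartition an existing RDD (this triggers a shuffle)
cleaned_data_more_parts = cleaned_data.repartition(4)
print(f"Partitions after repartition: {cleaned_data_more_parts.getNumPartitions()}")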

Aggregation


In order to solve the issues raised at the start of the project, several aggregations will be performed:

  1. Total sales and revenue per product.
  2. Total sales and revenue per store.
  3. Average price per product.
  4. Sales and revenue per promotion type 1 and promotion type 2.
  5. Stock analysis per product.

Each aggregation is performed using map to transform the data into key-value pairs and reduceByKey to aggregate the values for each key.

1 - Total Sales & Revenue/Product

This aggregation calculates the total sales and revenue for each product.

  • It first maps each record in cleaned_data to a key-value pair, where the key is the product ID and the value is a tuple containing sales and revenue.
  • It then uses reduceByKey to aggregate the sales and revenue values for each product ID.
# Aggregation 1: Total Sales and Revenue per Product
sales_revenue_per_product = cleaned_data.map(lambda x: (x['product_id'], (x['sales'], x['revenue']))) \
                                        .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
print(f"Number of partitions in cleaned_data: {cleaned_data.getNumPartitions()}")

#OUTPUT
Number of partitions in cleaned_data: 2
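
Before moving on, a few aggregated pairs can be pulled back to the driver for a spot check; the actual output values depend on the data:

# Preview a few (product_id, (total_sales, total_revenue)) pairs
for product_id, (total_sales, total_revenue) in sales_revenue_per_product.take(5):
    print(f"{product_id}: sales={total_sales:.0f}, revenue={total_revenue:.2f}")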

2 - Total Sales & Revenue/Store

This aggregation calculates the total sales and revenue for each store.

  • Similar to the first aggregation, it maps each record to a key-value pair with the store ID as the key and a tuple containing sales and revenue as the value.
  • It then uses reduceByKey to aggregate the sales and revenue values for each store ID.
# Aggregation 2: Total Sales and Revenue per Store
sales_revenue_per_store = cleaned_data.map(lambda x: (x['store_id'], (x['sales'], x['revenue']))) \
                                      .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

3 - Average Price/Product

This aggregation calculates the average price for each product.

  • It first maps each record to a key-value pair with the product ID as the key and a tuple containing the price and a count of 1.
  • Then, it uses reduceByKey to aggregate the total price and count of prices for each product.
  • Finally, it calculates the average price by dividing the total price by the count.
# Aggregation 3: Average Price per Product
total_price_count_per_product = cleaned_data.map(lambda x: (x['product_id'], (x['price'], 1))) \
                                            .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
average_price_per_product = total_price_count_per_product.mapValues(lambda x: x[0] / x[1])

4 - Sales & Revenue/Promotion

These aggregations calculate the total sales and revenue for each promotion type (promo_type_1 and promo_type_2).

  • Each record is mapped to a key-value pair with the promotion type as the key and a tuple containing sales and revenue as the value.
  • Then, reduceByKey is used to aggregate the sales and revenue values for each promotion type.
# Aggregation 4: Sales and Revenue per Promotion Type
sales_revenue_per_promo_1 = cleaned_data.map(lambda x: (x['promo_type_1'], (x['sales'], x['revenue']))) \
                                        .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
sales_revenue_per_promo_2 = cleaned_data.map(lambda x: (x['promo_type_2'], (x['sales'], x['revenue']))) \
                                        .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

5 - Stock Analysis/Product

This aggregation calculates the total stock for each product.

  • Each record is mapped to a key-value pair with the product ID as the key and the stock as the value.
  • Then, reduceByKey is used to aggregate the stock values for each product.
# Aggregation 5: Stock Analysis per Product
stock_per_product = cleaned_data.map(lambda x: (x['product_id'], x['stock'])) \
                                .reduceByKey(lambda a, b: a + b)

Save to HDFS


Save the results of each aggregation to HDFS using saveAsTextFile.

# Save results to HDFS
sales_revenue_per_product.saveAsTextFile("sales_revenue_per_product")
sales_revenue_per_store.saveAsTextFile("sales_revenue_per_store")
average_price_per_product.saveAsTextFile("average_price_per_product")
sales_revenue_per_promo_1.saveAsTextFile("sales_revenue_per_promo_1")
sales_revenue_per_promo_2.saveAsTextFile("sales_revenue_per_promo_2")
stock_per_product.saveAsTextFile("stock_per_product")
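
Each saveAsTextFile call writes a directory of part-XXXXX files (one per partition) rather than a single file, with each tuple stored as its string representation. A minimal sketch of reading one result back for inspection:

# Each output is a directory of part files; textFile can read the whole directory.
# The records come back as strings (e.g. "('P0001', (123.0, 456.78))") and would
# need to be re-parsed for further processing.
reloaded = sc.textFile("sales_revenue_per_product")
print(reloaded.take(3))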

Cleanup


# Stop the SparkContext
sc.stop()

The Jupyter notebook can be found at spark/spark_etl_retail_store.ipynb