DataFrame from CSV

Objectives


A DataFrame is a two-dimensional, tabular data structure. Columns can hold different data types. DataFrames accept many kinds of data input, including Series and other DataFrames. You can pass indexes (row labels) and columns (column labels); indexes can be numbers, dates, or strings/tuples.
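
For illustration, here is a minimal pandas DataFrame with string row labels and mixed column types (the column names and values below are made up for this sketch):

import pandas as pd

# A small DataFrame with mixed column types and explicit row labels (indexes)
df = pd.DataFrame(
    {"model": ["Mazda RX4", "Datsun 710"], "mpg": [21.0, 22.8], "cyl": [6, 4]},
    index=["car_a", "car_b"],  # row labels can be numbers, dates, or strings
)
print(df)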

In this example we will:

  • Load a data file into a DataFrame
  • View the data schema of a DataFrame
  • Perform basic data manipulation
  • Aggregate data in a DataFrame

Setup


If this is your first time using Spark, install the required packages:

pip install pyspark
pip install findspark
pip install pandas

# import findspark and initialize the Spark environment
import findspark
findspark.init()

# import other libraries
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

Start Spark Session


Create Session

# Creating a spark session
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
    
# OUTPUT
xxxxx WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
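
As the startup message suggests, you can reduce log verbosity through the SparkContext; for example:

# Quiet the logs (per the hint in the startup message)
spark.sparkContext.setLogLevel("ERROR")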

View Session

spark

# OUTPUT
SparkSession - in-memory
SparkContext
Spark UI
Version: v2.4.3
Master: local[*]
AppName: Python Spark DataFrames basic example

Load CSV into Pandas


  • We will use the mtcars.csv file to load the data into a pandas DataFrame
  • Here is a look at the structure of the data
colIndex  colName  units/description
[, 1]     mpg      Miles per gallon
[, 2]     cyl      Number of cylinders
[, 3]     disp     Displacement (cu. in.)
[, 4]     hp       Gross horsepower
[, 5]     drat     Rear axle ratio
[, 6]     wt       Weight (1000 lb)
[, 7]     qsec     1/4 mile time
[, 8]     vs       Engine shape (0 = V-shaped, 1 = straight)
[, 9]     am       Transmission (0 = automatic, 1 = manual)
[,10]     gear     Number of forward gears
[,11]     carb     Number of carburetors
# Read the file using `read_csv` function in pandas
mtcars = pd.read_csv('https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/mtcars.csv')
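
As an aside, Spark can also read CSV files directly, skipping pandas entirely. A minimal sketch, assuming a local copy of the file (spark.read does not fetch HTTP URLs):

# Alternative: read the CSV straight into a Spark DataFrame
sdf_direct = spark.read.csv('mtcars.csv', header=True, inferSchema=True)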

Preview data

# Preview a few records
mtcars.head()


   Unnamed: 0          mpg   cyl  disp   hp   drat  wt     qsec   vs  am  gear  carb
0  Mazda RX4           21.0    6  160.0  110  3.90  2.620  16.46   0   1     4     4
1  Mazda RX4 Wag       21.0    6  160.0  110  3.90  2.875  17.02   0   1     4     4
2  Datsun 710          22.8    4  108.0   93  3.85  2.320  18.61   1   1     4     1
3  Hornet 4 Drive      21.4    6  258.0  110  3.08  3.215  19.44   1   0     3     1
4  Hornet Sportabout   18.7    8  360.0  175  3.15  3.440  17.02   0   0     3     2

Load Pandas DataFrame into Spark


# We use the `createDataFrame` function to load the data into a Spark DataFrame
sdf = spark.createDataFrame(mtcars)

View Schema

sdf.printSchema()

# OUTPUT
root
 |-- Unnamed: 0: string (nullable = true)
 |-- mpg: double (nullable = true)
 |-- cyl: long (nullable = true)
 |-- disp: double (nullable = true)
 |-- hp: long (nullable = true)
 |-- drat: double (nullable = true)
 |-- wt: double (nullable = true)
 |-- qsec: double (nullable = true)
 |-- vs: long (nullable = true)
 |-- am: long (nullable = true)
 |-- gear: long (nullable = true)
 |-- carb: long (nullable = true)
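
If an inferred type is not what you want, columns can be cast explicitly; a small illustrative sketch (the cast here is purely for demonstration, since the inferred types above are fine):

from pyspark.sql.functions import col

# Cast the long `cyl` column to integer and check the new schema
sdf_cast = sdf.withColumn("cyl", col("cyl").cast("integer"))
sdf_cast.printSchema()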

Transform & Analyze


Preview Data

sdf.show(5)

+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|       Unnamed: 0| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|        Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|    Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|       Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|   Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|
|Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 5 rows

Select Columns

sdf.select('mpg').show(5)

+----+
| mpg|
+----+
|21.0|
|21.0|
|22.8|
|21.4|
|18.7|
+----+
only showing top 5 rows
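
select() also accepts several columns at once:

# Select multiple columns
sdf.select('mpg', 'cyl', 'hp').show(5)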

Columnar Operations

  • We first filter to retain only rows with mpg < 18. We use the filter() function for this.
sdf.filter(sdf['mpg'] < 18).show(5)
+-----------+----+---+-----+---+----+----+-----+---+---+----+----+
| Unnamed: 0| mpg|cyl| disp| hp|drat|  wt| qsec| vs| am|gear|carb|
+-----------+----+---+-----+---+----+----+-----+---+---+----+----+
| Duster 360|14.3|  8|360.0|245|3.21|3.57|15.84|  0|  0|   3|   4|
|  Merc 280C|17.8|  6|167.6|123|3.92|3.44| 18.9|  1|  0|   4|   4|
| Merc 450SE|16.4|  8|275.8|180|3.07|4.07| 17.4|  0|  0|   3|   3|
| Merc 450SL|17.3|  8|275.8|180|3.07|3.73| 17.6|  0|  0|   3|   3|
|Merc 450SLC|15.2|  8|275.8|180|3.07|3.78| 18.0|  0|  0|   3|   3|
+-----------+----+---+-----+---+----+----+-----+---+---+----+----+
only showing top 5 rows
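
Conditions can be combined with & (and) and | (or); note that each condition needs its own parentheses:

# Combine conditions: low-mpg, 8-cylinder cars
sdf.filter((sdf['mpg'] < 18) & (sdf['cyl'] == 8)).show(5)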

Convert Column Data

  • Use basic arithmetic to convert the weight values to metric tons. Since wt is recorded in units of 1000 lb, multiplying by 0.45 (1000 lb ≈ 0.4536 metric tons) gives an approximate weight in metric tons. We create a new column called wtTon holding the converted values.
sdf.withColumn('wtTon', sdf['wt'] * 0.45).show(5)

+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+-------+
|       Unnamed: 0| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|  wtTon|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+-------+
|        Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|  1.179|
|    Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|1.29375|
|       Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|  1.044|
|   Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|1.44675|
|Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|  1.548|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+-------+
only showing top 5 rows
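
For tidier output, the conversion can be rounded with round() from pyspark.sql.functions; a small sketch:

from pyspark.sql.functions import round as spark_round

# Same conversion, rounded to 2 decimal places
sdf.withColumn('wtTon', spark_round(sdf['wt'] * 0.45, 2)).show(5)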

Rename Column

  • Rename the vs column to versus
  • Note that withColumnRenamed() returns a new Spark DataFrame, here named sdf_new; the original sdf is unchanged
sdf_new = sdf.withColumnRenamed("vs", "versus")
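
Several columns can be renamed by chaining calls (the name trans below is just illustrative):

# Chain withColumnRenamed to rename more than one column
sdf_renamed = sdf.withColumnRenamed("vs", "versus").withColumnRenamed("am", "trans")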

Conditional Filter

  • The where() function filters the DataFrame rows based on a given condition. It returns a new DataFrame containing only the rows that satisfy that condition.
sdf.where(sdf['mpg'] < 18).show(3) 
+----------+----+---+-----+---+----+----+-----+---+---+----+----+
|Unnamed: 0| mpg|cyl| disp| hp|drat|  wt| qsec| vs| am|gear|carb|
+----------+----+---+-----+---+----+----+-----+---+---+----+----+
|Duster 360|14.3|  8|360.0|245|3.21|3.57|15.84|  0|  0|   3|   4|
| Merc 280C|17.8|  6|167.6|123|3.92|3.44| 18.9|  1|  0|   4|   4|
|Merc 450SE|16.4|  8|275.8|180|3.07|4.07| 17.4|  0|  0|   3|   3|
+----------+----+---+-----+---+----+----+-----+---+---+----+----+
only showing top 3 rows
  • The renamed column appears in sdf_new as well
sdf_new.where(sdf_new['mpg'] < 18).show(3)
+----------+----+---+-----+---+----+----+-----+------+---+----+----+
|Unnamed: 0| mpg|cyl| disp| hp|drat|  wt| qsec|versus| am|gear|carb|
+----------+----+---+-----+---+----+----+-----+------+---+----+----+
|Duster 360|14.3|  8|360.0|245|3.21|3.57|15.84|     0|  0|   3|   4|
| Merc 280C|17.8|  6|167.6|123|3.92|3.44| 18.9|     1|  0|   4|   4|
|Merc 450SE|16.4|  8|275.8|180|3.07|4.07| 17.4|     0|  0|   3|   3|
+----------+----+---+-----+---+----+----+-----+------+---+----+----+
only showing top 3 rows
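
where() is an alias of filter(), and both also accept a SQL-style string expression:

# The same condition expressed as a SQL string
sdf.where("mpg < 18").show(3)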

Join

  • Let’s combine two DataFrames on a common key column

Create DF1

# define sample DataFrame 1 
data = [("A101", "John"), ("A102", "Peter"), ("A103", "Charlie")] 
columns = ["emp_id", "emp_name"] 
dataframe_1 = spark.createDataFrame(data, columns)
dataframe_1.show()

+------+--------+
|emp_id|emp_name|
+------+--------+
|  A101|    John|
|  A102|   Peter|
|  A103| Charlie|
+------+--------+

Create DF2

# define sample DataFrame 2 
data = [("A101", 1000), ("A102", 2000), ("A103", 3000)]
columns = ["emp_id", "salary"]
dataframe_2 = spark.createDataFrame(data, columns)
dataframe_2.show()

+------+------+
|emp_id|salary|
+------+------+
|  A101|  1000|
|  A102|  2000|
|  A103|  3000|
+------+------+

Join

# create a new DataFrame, "combined_df" by performing an inner join
combined_df = dataframe_1.join(dataframe_2, on="emp_id", how="inner")

combined_df.show()
+------+--------+------+
|emp_id|emp_name|salary|
+------+--------+------+
|  A103| Charlie|  3000|
|  A102|   Peter|  2000|
|  A101|    John|  1000|
+------+--------+------+
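
The how parameter also supports other join types such as "left", "right", and "outer"; for example:

# A left outer join keeps every row of dataframe_1, with nulls where no salary matches
left_df = dataframe_1.join(dataframe_2, on="emp_id", how="left")
left_df.show()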

Fill NAs


  • We can fill missing values with the fillna() function (or its alias na.fill()), which replaces nulls with a specified value
  • Create a sample DataFrame with a missing value first
# define sample DataFrame 1
data = [("A101", 1000), ("A102", 2000), ("A103",None)]
columns = ["emp_id", "salary"]
dataframe_1 = spark.createDataFrame(data, columns)
dataframe_1.show()

+------+------+
|emp_id|salary|
+------+------+
|  A101|  1000|
|  A102|  2000|
|  A103|  null|
+------+------+

Fillna

filled_df = dataframe_1.fillna({"salary": 3000}) 
filled_df.head(3)

[Row(emp_id='A101', salary=1000),
 Row(emp_id='A102', salary=2000),
 Row(emp_id='A103', salary=3000)]
 
# OR
filled_df.show()

+------+------+
|emp_id|salary|
+------+------+
|  A101|  1000|
|  A102|  2000|
|  A103|  3000|
+------+------+
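
If dropping incomplete rows suits the analysis better than filling them, dropna() does the opposite:

# Drop rows that contain any null values instead of filling them
dataframe_1.dropna().show()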

Group & Aggregate


Average

Spark DataFrames support a number of commonly used functions to aggregate data after grouping.

  • In this example we compute the average weight of cars grouped by the number of cylinders, as shown below.
sdf.groupby(['cyl'])\
        .agg({"wt": "AVG"})\
        .show(5)
        
+---+-----------------+
|cyl|          avg(wt)|
+---+-----------------+
|  6|3.117142857142857|
|  8|3.999214285714286|
|  4|2.285727272727273|
+---+-----------------+
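
agg() can also take explicit functions from pyspark.sql.functions, which lets you compute several aggregates at once and control the output column names; a sketch:

from pyspark.sql.functions import avg, max as spark_max

# Several aggregates with explicit output names
sdf.groupby('cyl') \
    .agg(avg('wt').alias('avg_wt'), spark_max('mpg').alias('max_mpg')) \
    .show()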

Sort Aggregation

  • Sort in descending order to highlight the most common cylinder counts
  • Note that show() only prints and returns None, so we do not assign its result
sdf.groupby(['cyl'])\
        .agg({"wt": "count"})\
        .sort("count(wt)", ascending=False)\
        .show(5)
                        
+---+---------+
|cyl|count(wt)|
+---+---------+
|  8|       14|
|  4|       11|
|  6|        7|
+---+---------+