pip install pyspark
pip install findspark
pip install pandas
# import findspark and initialize it
import findspark
findspark.init()
# import other libraries
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
DataFrame from CSV
Objectives
A DataFrame is two-dimensional. Columns can be of different data types. DataFrames accept many data inputs including series and other DataFrames. You can pass indexes (row labels) and columns (column labels). Indexes can be numbers, dates, or strings/tuples.
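As a quick illustration of those inputs, here is a minimal pandas sketch that builds a DataFrame from a Series and a list, with string row labels (all names and values below are made up for illustration):
# a Series supplies one column; its labels become the row index
prices = pd.Series([3.5, 4.0], index=["apple", "pear"])
df_example = pd.DataFrame({"price": prices, "stock": [10, 25]})
# df_example now has rows "apple" and "pear" and columns "price" and "stock"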
In this example we will:
- Load a data file into a DataFrame
- View the data schema of a DataFrame
- Perform basic data manipulation
- Aggregate data in a DataFrame
Setup
If this is the first time you are using Spark, run the installation and import steps above first.
Start Spark Session
Create Session
# Creating a spark session
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
# OUTPUT
xxxxx WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
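As the log message itself suggests, you can quiet Spark's console output through the SparkContext; "ERROR" below is just one of the valid levels:
# reduce log verbosity; other levels include "WARN" and "INFO"
spark.sparkContext.setLogLevel("ERROR")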
Initialize Session
spark
# OUTPUT
SparkSession - in-memory
SparkContext
Spark UI
Version: v2.4.3
Master: local[*]
AppName: Python Spark DataFrames basic example
Load CSV into Pandas
- We will use the mtcars.csv file to load the data into a pandas DataFrame
- Here is a look at the structure of the data
colIndex | colName | units/description |
---|---|---|
[, 1] | mpg | Miles per gallon |
[, 2] | cyl | Number of cylinders |
[, 3] | disp | Displacement (cu.in.) |
[, 4] | hp | Gross horsepower |
[, 5] | drat | Rear axle ratio |
[, 6] | wt | Weight (lb/1000) |
[, 7] | qsec | 1/4 mile time |
[, 8] | vs | V/S |
[, 9] | am | Transmission (0 = automatic, 1 = manual) |
[,10] | gear | Number of forward gears |
[,11] | carb | Number of carburetors |
# Read the file using the `read_csv` function in pandas
mtcars = pd.read_csv('https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/mtcars.csv')
Preview data
# Preview a few records
mtcars.head()
 | | mpg | cyl | disp | hp | drat | wt | qsec | vs | am | gear | carb |
---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | Mazda RX4 | 21.0 | 6 | 160.0 | 110 | 3.90 | 2.620 | 16.46 | 0 | 1 | 4 | 4 |
1 | Mazda RX4 Wag | 21.0 | 6 | 160.0 | 110 | 3.90 | 2.875 | 17.02 | 0 | 1 | 4 | 4 |
2 | Datsun 710 | 22.8 | 4 | 108.0 | 93 | 3.85 | 2.320 | 18.61 | 1 | 1 | 4 | 1 |
3 | Hornet 4 Drive | 21.4 | 6 | 258.0 | 110 | 3.08 | 3.215 | 19.44 | 1 | 0 | 3 | 1 |
4 | Hornet Sportabout | 18.7 | 8 | 360.0 | 175 | 3.15 | 3.440 | 17.02 | 0 | 0 | 3 | 2 |
Load Pandas to Spark DF
# We use the `createDataFrame` function to load the data into a spark dataframe
sdf = spark.createDataFrame(mtcars)
View Schema
sdf.printSchema()
# OUTPUT
root
 |-- Unnamed: 0: string (nullable = true)
|-- mpg: double (nullable = true)
|-- cyl: long (nullable = true)
|-- disp: double (nullable = true)
|-- hp: long (nullable = true)
|-- drat: double (nullable = true)
|-- wt: double (nullable = true)
|-- qsec: double (nullable = true)
|-- vs: long (nullable = true)
|-- am: long (nullable = true)
|-- gear: long (nullable = true)
|-- carb: long (nullable = true)
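Here the schema was inferred from the pandas dtypes. If you prefer explicit control, you can pass a StructType when creating the Spark DataFrame. The sketch below covers only three of the columns (sdf_typed is an illustrative name); extend the field list for all of them:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType

# explicit schema for a subset of the mtcars columns
schema = StructType([
    StructField("Unnamed: 0", StringType(), True),
    StructField("mpg", DoubleType(), True),
    StructField("cyl", LongType(), True),
])
sdf_typed = spark.createDataFrame(mtcars[["Unnamed: 0", "mpg", "cyl"]], schema)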
Transform & Analyze
Preview Data
sdf.show(5)
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
| Unnamed: 0| mpg|cyl| disp| hp|drat| wt| qsec| vs| am|gear|carb|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
| Mazda RX4|21.0| 6|160.0|110| 3.9| 2.62|16.46| 0| 1| 4| 4|
| Mazda RX4 Wag|21.0| 6|160.0|110| 3.9|2.875|17.02| 0| 1| 4| 4|
| Datsun 710|22.8| 4|108.0| 93|3.85| 2.32|18.61| 1| 1| 4| 1|
| Hornet 4 Drive|21.4| 6|258.0|110|3.08|3.215|19.44| 1| 0| 3| 1|
|Hornet Sportabout|18.7| 8|360.0|175|3.15| 3.44|17.02| 0| 0| 3| 2|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 5 rows
Filter View
sdf.select('mpg').show(5)
+----+
| mpg|
+----+
|21.0|
|21.0|
|22.8|
|21.4|
|18.7|
+----+
only showing top 5 rows
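select() also accepts several columns at once; for example, to preview fuel economy alongside weight:
sdf.select('mpg', 'wt').show(5)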
Columnar Operations
- We first filter to only retain rows with mpg below 18. We use the filter() function for this.
sdf.filter(sdf['mpg'] < 18).show(5)
+-----------+----+---+-----+---+----+----+-----+---+---+----+----+
| Unnamed: 0| mpg|cyl| disp| hp|drat| wt| qsec| vs| am|gear|carb|
+-----------+----+---+-----+---+----+----+-----+---+---+----+----+
| Duster 360|14.3| 8|360.0|245|3.21|3.57|15.84| 0| 0| 3| 4|
| Merc 280C|17.8| 6|167.6|123|3.92|3.44| 18.9| 1| 0| 4| 4|
| Merc 450SE|16.4| 8|275.8|180|3.07|4.07| 17.4| 0| 0| 3| 3|
| Merc 450SL|17.3| 8|275.8|180|3.07|3.73| 17.6| 0| 0| 3| 3|
|Merc 450SLC|15.2| 8|275.8|180|3.07|3.78| 18.0| 0| 0| 3| 3|
+-----------+----+---+-----+---+----+----+-----+---+---+----+----+
only showing top 5 rows
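Conditions can be combined with & (and) and | (or); note that each comparison needs its own parentheses. A sketch with arbitrarily chosen thresholds:
# heavy, low-mpg cars; parentheses around each clause are required
sdf.filter((sdf['mpg'] < 18) & (sdf['wt'] > 3.5)).show(5)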
Convert Column Data
- Use basic arithmetic to convert the weight values from lb to metric ton. We create a new column called wtTon that has the weight from the wt column converted to metric tons.
sdf.withColumn('wtTon', sdf['wt'] * 0.45).show(5)
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+-------+
| Unnamed: 0| mpg|cyl| disp| hp|drat| wt| qsec| vs| am|gear|carb| wtTon|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+-------+
| Mazda RX4|21.0| 6|160.0|110| 3.9| 2.62|16.46| 0| 1| 4| 4| 1.179|
| Mazda RX4 Wag|21.0| 6|160.0|110| 3.9|2.875|17.02| 0| 1| 4| 4|1.29375|
| Datsun 710|22.8| 4|108.0| 93|3.85| 2.32|18.61| 1| 1| 4| 1| 1.044|
| Hornet 4 Drive|21.4| 6|258.0|110|3.08|3.215|19.44| 1| 0| 3| 1|1.44675|
|Hornet Sportabout|18.7| 8|360.0|175|3.15| 3.44|17.02| 0| 0| 3| 2| 1.548|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+-------+
only showing top 5 rows
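Keep in mind that withColumn() returns a new DataFrame rather than modifying sdf in place; to keep the computed column, assign the result (sdf_ton is an illustrative name):
sdf_ton = sdf.withColumn('wtTon', sdf['wt'] * 0.45)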
Rename Column
- Change vs to versus
- Note that we created a new Spark DataFrame named sdf_new
sdf_new = sdf.withColumnRenamed("vs", "versus")
Conditional Filter
- The where() function filters the DataFrame rows based on a given condition. It returns a new DataFrame containing only the rows that satisfy the condition.
sdf.where(sdf['mpg'] < 18).show(3)
+----------+----+---+-----+---+----+----+-----+---+---+----+----+
|Unnamed: 0| mpg|cyl| disp| hp|drat| wt| qsec| vs| am|gear|carb|
+----------+----+---+-----+---+----+----+-----+---+---+----+----+
|Duster 360|14.3| 8|360.0|245|3.21|3.57|15.84| 0| 0| 3| 4|
| Merc 280C|17.8| 6|167.6|123|3.92|3.44| 18.9| 1| 0| 4| 4|
|Merc 450SE|16.4| 8|275.8|180|3.07|4.07| 17.4| 0| 0| 3| 3|
+----------+----+---+-----+---+----+----+-----+---+---+----+----+
only showing top 3 rows
- Look at the DataFrame with the renamed column as well
sdf_new.where(sdf['mpg'] < 18).show(3)
+----------+----+---+-----+---+----+----+-----+------+---+----+----+
|Unnamed: 0| mpg|cyl| disp| hp|drat| wt| qsec|versus| am|gear|carb|
+----------+----+---+-----+---+----+----+-----+------+---+----+----+
|Duster 360|14.3| 8|360.0|245|3.21|3.57|15.84| 0| 0| 3| 4|
| Merc 280C|17.8| 6|167.6|123|3.92|3.44| 18.9| 1| 0| 4| 4|
|Merc 450SE|16.4| 8|275.8|180|3.07|4.07| 17.4| 0| 0| 3| 3|
+----------+----+---+-----+---+----+----+-----+------+---+----+----+
only showing top 3 rows
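where() is an alias for filter(), so the two forms below are interchangeable:
# equivalent ways to express the same row filter
sdf.where(sdf['mpg'] < 18)
sdf.filter(sdf['mpg'] < 18)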
Join
- Let’s combine two DFs based on specific conditions
Create DF1
# define sample DataFrame 1
= [("A101", "John"), ("A102", "Peter"), ("A103", "Charlie")]
data = ["emp_id", "emp_name"]
columns = spark.createDataFrame(data, columns)
dataframe_1
+------+--------+
|emp_id|emp_name|
+------+--------+
| A101| John|
| A102| Peter|
| A103| Charlie|
+------+--------+
Create DF2
# define sample DataFrame 2
= [("A101", 1000), ("A102", 2000), ("A103", 3000)]
data = ["emp_id", "salary"]
columns = spark.createDataFrame(data, columns)
dataframe_2
+------+------+
|emp_id|salary|
+------+------+
| A101| 1000|
| A102| 2000|
| A103| 3000|
+------+------+
Join
# create a new DataFrame, "combined_df" by performing an inner join
combined_df = dataframe_1.join(dataframe_2, on="emp_id", how="inner")
combined_df.show()
+------+--------+------+
|emp_id|emp_name|salary|
+------+--------+------+
| A103| Charlie| 3000|
| A102| Peter| 2000|
| A101| John| 1000|
+------+--------+------+
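The how argument also accepts other join types such as "left", "right", and "outer". For example, a left join keeps every row of dataframe_1 even when no salary matches (left_df is an illustrative name):
# unmatched emp_ids would get null salaries
left_df = dataframe_1.join(dataframe_2, on="emp_id", how="left")
left_df.show()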
Fill NAs
- We can fill missing values with the fillna() or fill() function, which replaces them with a specified value.
- Create df first
# define sample DataFrame 1
= [("A101", 1000), ("A102", 2000), ("A103",None)]
data = ["emp_id", "salary"]
columns = spark.createDataFrame(data, columns)
dataframe_1
dataframe_1.show()
+------+------+
|emp_id|salary|
+------+------+
| A101| 1000|
| A102| 2000|
| A103| null|
+------+------+
Fillna
filled_df = dataframe_1.fillna({"salary": 3000})
filled_df.head(3)
# OUTPUT
[Row(emp_id='A101', salary=1000),
 Row(emp_id='A102', salary=2000),
 Row(emp_id='A103', salary=3000)]
# OR
filled_df.show()
+------+------+
|emp_id|salary|
+------+------+
| A101| 1000|
| A102| 2000|
| A103| 3000|
+------+------+
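fillna() can also take a single scalar instead of the per-column dict, filling nulls in every column of a compatible type:
# fills the null salary with 0 (numeric columns only)
dataframe_1.fillna(0).show()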
Group & Aggregate
Average
Spark DataFrames support a number of commonly used functions to aggregate data after grouping.
- In this example we compute the average weight of cars grouped by their cylinders, as shown below.
sdf.groupby(['cyl'])\
    .agg({"wt": "AVG"})\
    .show(5)
+---+-----------------+
|cyl| avg(wt)|
+---+-----------------+
| 6|3.117142857142857|
| 8|3.999214285714286|
| 4|2.285727272727273|
+---+-----------------+
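The dict passed to agg() may name several column/function pairs, so one pass can compute multiple summaries (the aggregate names are standard Spark SQL functions):
# average weight and maximum horsepower per cylinder count
sdf.groupby(['cyl'])\
    .agg({"wt": "avg", "hp": "max"})\
    .show(5)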
Sort Aggregation
- Sort in descending order to highlight the most common cylinder counts
car_counts = sdf.groupby(['cyl'])\
    .agg({"wt": "count"})\
    .sort("count(wt)", ascending=False)
car_counts.show(5)
+---+---------+
|cyl|count(wt)|
+---+---------+
| 8| 14|
| 4| 11|
| 6| 7|
+---+---------+
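The same aggregation can also be written with the pyspark.sql.functions helpers, which let you alias the output column for a cleaner header (count_cars is an illustrative alias):
from pyspark.sql import functions as F

sdf.groupby('cyl')\
    .agg(F.count('wt').alias('count_cars'))\
    .sort(F.desc('count_cars'))\
    .show(5)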