Batch Data Ingestion

This section covers common batch data ingestion methods in Apache Spark.

File-Based Ingestion


File-based ingestion refers to the process of importing data from various file formats into Apache Spark.

CSV

df_csv = spark.read.csv("file.csv", header=True)
df_csv.show()
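
For repeatable batch loads it is usually safer to pin the schema instead of relying on header and type inference; a minimal sketch, assuming file.csv has two placeholder columns col1 and col2:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Explicit schema (placeholder column names/types) avoids a second pass over the file and silent type drift
csv_schema = StructType([
    StructField("col1", StringType(), True),
    StructField("col2", IntegerType(), True),
])
df_csv_typed = spark.read.csv("file.csv", header=True, schema=csv_schema)
df_csv_typed.show()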

JSON

df_json = spark.read.json("file.json")
df_json.show()

Parquet

df_parquet = spark.read.parquet("file.parquet")
df_parquet.show()

XML

# Requires the spark-xml package (e.g. --packages com.databricks:spark-xml_2.12:<version>)
df_xml = (spark.read.format("com.databricks.spark.xml")
          .option("rowTag", "record")
          .load("file.xml"))
df_xml.show()

Replication & ETL


You can use database replication and ETL processes for continuous data ingestion and transformation.

Database Replication

# Assuming df_db is the DataFrame from the replicated database
df_db.write.mode("append").saveAsTable("database_table")
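
How df_db is produced depends on your replication setup; a minimal sketch, assuming the replica is reachable over JDBC (the URL, table, and credentials below are hypothetical placeholders):

# Read a snapshot of the replicated table over JDBC (connection details are placeholders)
df_db = (spark.read.format("jdbc")
         .option("url", "jdbc:postgresql://replica.example.com:5432/sales")
         .option("dbtable", "public.orders")
         .option("user", "ingest_user")
         .option("password", "********")
         .load())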

ETL Process

from pyspark.sql.functions import expr

# Assuming df_raw is the raw data DataFrame
df_transformed = df_raw.select("col1", "col2").withColumn("new_col", expr("col1 + col2"))
df_transformed.write.mode("append").saveAsTable("transformed_table")
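
For batch loads it is also common to partition the target table by a load date so that reprocessing and pruning stay cheap; a minimal sketch, assuming each batch is stamped at ingest time (the load_date column and table name are illustrative):

from pyspark.sql.functions import current_date

# Stamp the batch with its load date and partition the table by it (names are illustrative)
df_partitioned = df_transformed.withColumn("load_date", current_date())
(df_partitioned.write
 .mode("append")
 .partitionBy("load_date")
 .saveAsTable("transformed_table_partitioned"))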

Import from API


Integrating external APIs lets you retrieve data directly from upstream services and ingest it into Spark.

HTTP API

import json
import requests

response = requests.get("https://api.example.com/data")
response.raise_for_status()
json_data = response.json()
# spark.read.json expects JSON strings, so re-serialize the parsed payload
df_api = spark.read.json(spark.sparkContext.parallelize([json.dumps(json_data)]))
df_api.show()
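
Many APIs page their results; a minimal sketch that loops over pages before handing the combined payload to Spark (the page parameter and the list-of-records response shape are assumptions about a hypothetical API):

import json
import requests

# Collect all pages from a hypothetical paginated endpoint on the driver, then parallelize the records
records = []
page = 1
while True:
    resp = requests.get("https://api.example.com/data", params={"page": page})
    resp.raise_for_status()
    batch = resp.json()  # assumed to be a list of records
    if not batch:
        break
    records.extend(batch)
    page += 1

df_api_paged = spark.read.json(
    spark.sparkContext.parallelize([json.dumps(r) for r in records])
)
df_api_paged.show()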

Data Transfer Protocols


Data transfer protocols like File Transfer Protocol (FTP), Secure File Transfer Protocol (SFTP), and Hypertext Transfer Protocol (HTTP) are essential for efficient and secure data transfer.

SFTP Ingestion

# Uses the spark-sftp connector; the host is a placeholder and authentication options are omitted
df_sftp = (spark.read.format("com.springml.spark.sftp")
           .option("host", "sftp.example.com")
           .option("fileType", "csv")
           .load("/path/to/file.csv"))

HTTP Ingestion

# Spark cannot read http:// URLs directly, so fetch the file first and parallelize its lines
import requests

lines = requests.get("http://example.com/data.txt").text.splitlines()
df_http = spark.createDataFrame([(line,) for line in lines], ["value"])
df_http.show()

Source Assessment


You must evaluate data source characteristics and quality, as they are fundamental for effective data integration.

Characteristics Assessment

df_source.describe().show()

Quality Assessment

# Compare distinct and total row counts to gauge duplication
total_records = df_source.count()
unique_records = df_source.distinct().count()
print(f"{unique_records} unique records out of {total_records}")
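
Another quick quality check is counting nulls per column; a minimal sketch using only built-in functions:

from pyspark.sql.functions import col, count, when

# For each column, count the rows where that column is null
null_counts = df_source.select(
    [count(when(col(c).isNull(), c)).alias(c) for c in df_source.columns]
)
null_counts.show()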

Schema Mapping


Mapping and transforming data schemas can be challenging, but it is essential for fitting incoming data into the target model.

Schema Mapping

df_mapped = df_raw.selectExpr("col1 as new_col1", "col2 as new_col2")

Schema Transformation

from pyspark.sql.functions import expr

df_transformed = df_mapped.withColumn("new_col3", expr("new_col1 + new_col2"))
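
Schema transformation usually also means casting columns to the types the target model expects; a minimal sketch (the target types here are assumptions):

from pyspark.sql.functions import col

# Align column types with the target schema (int/double are assumed target types)
df_typed = (df_transformed
            .withColumn("new_col1", col("new_col1").cast("int"))
            .withColumn("new_col3", col("new_col3").cast("double")))
df_typed.printSchema()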

Data Validation


You must ensure high data quality during ingestion, as it is critical for downstream processes.

Data Validation

df_validated = df_raw.filter("col1 IS NOT NULL AND col2 > 0")
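
Rather than silently dropping rejected rows, it is often useful to route them to a quarantine table for inspection; a minimal sketch (the quarantine table name is illustrative):

# Keep every row that failed validation, duplicates included (table name is illustrative)
df_invalid = df_raw.exceptAll(df_validated)
df_invalid.write.mode("append").saveAsTable("quarantine_table")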

Data Cleansing

df_cleansed = df_raw.na.fill(0, ["col1"])

Data Deduplication


You must prevent duplicate records during ingestion, as it is vital to maintain data integrity downstream.

Remove Duplicates

df_deduplicated = df_raw.dropDuplicates(["col1", "col2"])
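
dropDuplicates keeps an arbitrary row per key, which is fine for exact duplicates but not when duplicates carry different versions of a record; a minimal sketch that keeps the latest row per key with a window function, assuming an updated_at timestamp column exists (the column name is an assumption):

from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

# Keep only the most recent row per (col1, col2) key; updated_at is an assumed column
w = Window.partitionBy("col1", "col2").orderBy(col("updated_at").desc())
df_latest = (df_raw
             .withColumn("rn", row_number().over(w))
             .filter("rn = 1")
             .drop("rn"))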

Unstructured Data


Unstructured data includes documents, images, logs, and more; techniques for ingesting it and extracting insights from it are crucial.

NLP

# Read the raw text; libraries such as Spark NLP can then be applied for tokenization, entity extraction, and other processing
df_text = spark.read.text("text_file.txt")
df_text.show()
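
Even without an NLP library, the built-in functions get you a long way; a minimal sketch that tokenizes the text and counts word frequencies:

from pyspark.sql.functions import col, explode, lower, split

# Lower-case, split on whitespace, and count word occurrences
df_words = (df_text
            .select(explode(split(lower(col("value")), "\\s+")).alias("word"))
            .filter("word != ''")
            .groupBy("word")
            .count()
            .orderBy(col("count").desc()))
df_words.show()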

Governance & Compliance


You should follow data governance practices and comply with regulatory requirements and data privacy laws; both are essential for responsible data handling.

Compliance Checks

# Example: GDPR scoping; keep only records flagged as EU (the country filter here is illustrative)
df_gdpr_compliant = df_raw.filter("country IN ('EU')")
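
Compliance usually also means protecting personal data before it lands in shared tables; a minimal sketch that pseudonymizes an email column with a one-way hash and drops the raw value (the column and table names are assumptions):

from pyspark.sql.functions import col, sha2

# Replace the raw identifier with a one-way hash before persisting (email and table name are assumed)
df_masked = (df_gdpr_compliant
             .withColumn("email_hash", sha2(col("email"), 256))
             .drop("email"))
df_masked.write.mode("append").saveAsTable("gdpr_compliant_table")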