JSON and XML Parsing in PySpark
Architecture Diagram
SEMI-STRUCTURED DATA PROCESSING PIPELINE

┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│  Raw JSON/   │────▶│    Schema    │────▶│  Flattening  │
│  XML Strings │     │  Inference   │     │ & Extraction │
└──────────────┘     └──────┬───────┘     └──────┬───────┘
                            │                    │
                            ▼                    ▼
                   ┌──────────────────┐ ┌──────────────────┐
                   │  Schema Options  │ │  Extraction      │
                   │  ──────────────  │ │  Methods         │
                   │  PERMISSIVE      │ │  ──────────────  │
                   │  DROPMALFORMED   │ │  from_json()     │
                   │  FAILFAST        │ │  get_json_object │
                   └────────┬─────────┘ │  explode()       │
                            │           └────────┬─────────┘
                            ▼                    │
                   ┌──────────────────┐          │
                   │  Type Mapping    │          │
                   │  ──────────────  │          │
                   │  JSON→Spark Type │          │
                   │  XML→Spark Type  │          │
                   │  Null Handling   │          │
                   └────────┬─────────┘          │
                            ▼                    ▼
                   ┌──────────────────┐ ┌──────────────────┐
                   │  Schema          │ │  Query           │
                   │  Evolution       │ │  Optimization    │
                   │  ──────────────  │ │  ──────────────  │
                   │  mergeSchema     │ │  Predicate       │
                   │  add columns     │ │  Pushdown        │
                   │  type widening   │ │  Partition       │
                   └──────────────────┘ └──────────────────┘
JSON STRUCTURE → SPARK SCHEMA MAPPING

JSON Input:
{
  "order_id": "ORD-001",
  "customer": {
    "id": "CUST-001",
    "name": "Alice",
    "address": {
      "city": "New York",
      "state": "NY"
    }
  },
  "items": [
    {"sku": "A1", "qty": 5, "price": 29.99},
    {"sku": "B2", "qty": 3, "price": 49.99}
  ],
  "metadata": {"source": "web", "campaign": "summer"}
}

Spark Schema (explicit):
StructType(
  StructField("order_id", StringType),
  StructField("customer", StructType(
    StructField("id", StringType),
    StructField("name", StringType),
    StructField("address", StructType(
      StructField("city", StringType),
      StructField("state", StringType)
    ))
  )),
  StructField("items", ArrayType(
    StructType(
      StructField("sku", StringType),
      StructField("qty", IntegerType),
      StructField("price", DoubleType)
    )
  )),
  StructField("metadata", MapType(
    StringType, StringType
  ))
)

Flattened Output (one row per item):
┌──────────┬──────────┬───────────┬──────────┬─────┬─────┬───────┐
│ order_id │ cust_id  │ cust_name │ city     │ sku │ qty │ price │
├──────────┼──────────┼───────────┼──────────┼─────┼─────┼───────┤
│ ORD-001  │ CUST-001 │ Alice     │ New York │ A1  │ 5   │ 29.99 │
│ ORD-001  │ CUST-001 │ Alice     │ New York │ B2  │ 3   │ 49.99 │
└──────────┴──────────┴───────────┴──────────┴─────┴─────┴───────┘
XML PROCESSING ARCHITECTURE

XML Input:
  <orders>
    <order id="ORD-001">
      <customer id="CUST-001">
        <name>Alice</name>
        <address>
          <city>New York</city>
          <state>NY</state>
        </address>
      </customer>
      <items>
        <item sku="A1" qty="5" price="29.99"/>
        <item sku="B2" qty="3" price="49.99"/>
      </items>
    </order>
  </orders>

Processing Pipeline:
  Step 1: Parse XML → Row with nested elements
  Step 2: Extract attributes vs child elements
  Step 3: Flatten nested structure
  Step 4: Convert to typed DataFrame

Attribute vs Element Handling:
  <order id="ORD-001">      → @id attribute
    <name>Alice</name>      → element content
    <item sku="A1"/>        → @sku attribute
  </order>

  Attributes: flat name/value pairs inside a start tag; each name at most once per element, no nesting
  Elements:   ordered, repeatable, extensible, can nest deeply
Detailed Explanation
JSON and XML are the predominant semi-structured data formats in modern data systems. PySpark can parse both. The difficulty lies in handling deeply nested structures and variable schemas, and in keeping performance acceptable at large scale. The core task is converting raw text into typed, queryable DataFrames while preserving the meaning of the original structure.
Without a user-supplied schema, PySpark infers one from the data. For JSON, the samplingRatio option sets the fraction of input rows used for inference. Its default of 1.0 means a full extra pass over the data before parsing starts. Lowering it speeds up inference but can produce an incorrect schema if the sample is not representative, for example when a field holds numbers in the sampled records but strings elsewhere.

The mode option controls how Spark handles malformed records:

- PERMISSIVE (default) keeps the row, sets its fields to null, and puts the raw text in the corrupt-record column. A user-supplied schema must declare that column; an inferred schema gains it automatically when corrupt records are found.
- DROPMALFORMED silently drops corrupt records.
- FAILFAST throws an exception on the first corrupt record.
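The three modes can be made concrete with a plain-Python model of a read over a batch that contains one truncated record. It imitates the behavior described above: PERMISSIVE keeps the row with null fields and the raw text in a corrupt-record column, DROPMALFORMED drops it, and FAILFAST raises. read_lines is a hypothetical helper written for illustration, not part of PySpark.

```python
import json


def read_lines(lines, fields, mode="PERMISSIVE", corrupt_col="_corrupt_record"):
    """Toy model of Spark's JSON parse modes (hypothetical helper, not a Spark API)."""
    rows = []
    for line in lines:
        try:
            record = json.loads(line)
            if not isinstance(record, dict):
                raise ValueError("top-level JSON value is not an object")
        except ValueError:
            if mode == "FAILFAST":
                # FAILFAST: abort the whole read on the first malformed record
                raise ValueError(f"Malformed record: {line!r}") from None
            if mode == "DROPMALFORMED":
                # DROPMALFORMED: skip the record without a trace
                continue
            # PERMISSIVE: keep the row, null every field, preserve the raw text
            row = dict.fromkeys(fields)
            row[corrupt_col] = line
            rows.append(row)
            continue
        # Well-formed record: project onto the expected fields
        row = {name: record.get(name) for name in fields}
        row[corrupt_col] = None
        rows.append(row)
    return rows


batch = [
    '{"order_id": "ORD-001"}',
    '{"order_id": "ORD-002"',  # missing closing brace
    '{"order_id": "ORD-003"}',
]
permissive = read_lines(batch, ["order_id"])
dropped = read_lines(batch, ["order_id"], mode="DROPMALFORMED")
```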
The from_json function always takes an explicit schema (for example a StructType or a DDL-formatted string), and explicit schemas are the recommended approach for production workloads. Schema inference is best kept for exploration. A schema is built from StructType and StructField, with nested StructType, ArrayType, and MapType for complex fields. Supplying the schema up front skips the inference pass and keeps column types stable from run to run. Most of the speedup comes from skipping that pass; Spark prunes unused columns either way.
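This minimal sketch shows what projecting well-formed JSON onto an explicit schema means. It is written in plain Python so it runs without a cluster. Declared fields are kept, undeclared keys are ignored, and declared-but-absent fields come back as null. parse_with_schema and its dict-based schema are hypothetical stand-ins for from_json with a StructType. Type-mismatch handling is deliberately left out, because Spark's behavior there depends on mode and version.

```python
import json


def parse_with_schema(json_string, schema):
    """Project a JSON object onto a declared schema (hypothetical toy model of from_json).

    `schema` maps field name -> type name, or a nested dict for a struct.
    Leaf values are passed through unchanged; type checking is out of scope.
    """
    def project(value, spec):
        if value is None:
            return None
        if isinstance(spec, dict):
            # Struct: keep declared fields only; undeclared keys are dropped
            return {name: project(value.get(name), sub) for name, sub in spec.items()}
        return value

    return project(json.loads(json_string), schema)


schema = {"a": "int", "b": "string", "c": "double"}
row1 = parse_with_schema('{"a": 1, "b": "hello"}', schema)            # c is absent -> None
row2 = parse_with_schema('{"a": 2, "b": "world", "c": 3.14, "d": true}', schema)  # d ignored
nested = parse_with_schema(
    '{"customer": {"id": "CUST-001", "tier": "gold"}}',
    {"customer": {"id": "string", "name": "string"}},
)
```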
Nested data flattening is the most common transformation for JSON/XML data. The explode function turns each array element into a separate row, while select with dot notation (e.g., col("customer.name")) extracts nested struct fields. Deeply nested structures may need several rounds of explode and select. Each explode multiplies the row count by the array length, and exploding two sibling arrays yields their Cartesian product per record, which can blow up data volume if not managed carefully. explode also drops rows whose array is null or empty, whereas explode_outer keeps them with a null element.
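The row-count effect of explode is easy to check with a plain-Python model. explode_rows is a hypothetical helper that assumes rows are dicts. It follows the documented semantics of explode (one row per element; rows with a null or empty array vanish) and, with outer=True, of explode_outer (such rows survive with a null element).

```python
def explode_rows(rows, array_field, outer=False):
    """Hypothetical toy model of explode / explode_outer over a list of dict rows."""
    out = []
    for row in rows:
        elements = row.get(array_field) or []
        if not elements:
            if outer:
                # explode_outer keeps the parent row with a null element
                out.append({**row, array_field: None})
            continue
        for element in elements:
            out.append({**row, array_field: element})
    return out


orders = [
    {"order_id": "ORD-001", "items": [{"sku": "A1"}, {"sku": "B2"}]},
    {"order_id": "ORD-002", "items": [{"sku": "C3"}]},
    {"order_id": "ORD-003", "items": []},
]
inner = explode_rows(orders, "items")               # 3 rows: ORD-003 disappears
outer = explode_rows(orders, "items", outer=True)   # 4 rows: ORD-003 kept with None

# Exploding two sibling arrays one after another multiplies the counts: 2 * 3 = 6 rows
promo = [{"order_id": "ORD-001", "items": ["A1", "B2"], "coupons": ["X", "Y", "Z"]}]
both = explode_rows(explode_rows(promo, "items"), "coupons")
```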
XML processing on Spark 3.x requires the spark-xml package, which provides an XML data source with capabilities similar to the JSON one; Spark 4.0 adds a built-in XML data source. XML brings extra complexity:

- attributes (name/value pairs inside a tag) versus child elements (nested tags)
- mixed content (text interleaved with elements)
- namespaces

The rowTag option names the element that becomes one row. attributePrefix (default "_") marks columns that come from attributes, and valueTag (default "_VALUE") holds the text of an element that also carries attributes.
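The attribute-versus-element split can be seen with Python's standard xml.etree.ElementTree. This sketch converts one row element into nested dicts using the naming that spark-xml applies by default:

- attributes are prefixed with "_"
- the text of an element that also has attributes is stored under "_VALUE"
- repeated child tags are collected into a list

element_to_value is a hypothetical helper. Unlike spark-xml, it leaves every value as a string instead of inferring types.

```python
import xml.etree.ElementTree as ET


def element_to_value(elem, attribute_prefix="_", value_tag="_VALUE"):
    """Convert an XML element to nested dicts with spark-xml-style names (hypothetical toy).

    Attributes become keys with `attribute_prefix`; child elements become keys
    named after their tag; repeated tags become lists; text next to attributes
    is stored under `value_tag`. All values stay strings (no type inference).
    """
    fields = {attribute_prefix + name: value for name, value in elem.attrib.items()}
    children = list(elem)
    text = (elem.text or "").strip()
    if not children and not fields:
        return text  # plain leaf element: just its text
    for child in children:
        value = element_to_value(child, attribute_prefix, value_tag)
        if child.tag not in fields:
            fields[child.tag] = value
        else:
            # Repeated child tag: collect values into a list (an array column in Spark)
            if not isinstance(fields[child.tag], list):
                fields[child.tag] = [fields[child.tag]]
            fields[child.tag].append(value)
    if text:
        fields[value_tag] = text
    return fields


order_xml = """<order id="ORD-001" status="complete">
  <customer id="CUST-001">Alice</customer>
  <total currency="USD">249.95</total>
  <items>
    <item sku="A1" qty="5"/>
    <item sku="B2" qty="3"/>
  </items>
</order>"""
row = element_to_value(ET.fromstring(order_xml))
```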
Schema evolution is critical for semi-structured sources that change over time. For Parquet and ORC, the mergeSchema option makes Spark read the schema of every file and build a superset, filling in nulls for fields a given file lacks. JSON inference already builds such a union across all the records it scans. This handles additive changes (new columns) and dropped columns, which simply read as null where they are absent. Incompatible type changes make the merge fail, for example a column that is a string in one file and an integer in another.
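The superset rule can be sketched with schemas modeled as plain dicts from column name to type name. merge_schemas is a hypothetical helper. It unions columns in first-seen order and treats any type disagreement as a conflict, which is a conservative simplification of what real engines do.

```python
def merge_schemas(*schemas):
    """Union the columns of several schemas (hypothetical toy model of schema merging).

    Columns keep first-seen order; a column whose type differs between
    schemas is treated as a conflict.
    """
    merged = {}
    for schema in schemas:
        for column, dtype in schema.items():
            if column in merged and merged[column] != dtype:
                raise TypeError(f"cannot merge {column!r}: {merged[column]} vs {dtype}")
            merged.setdefault(column, dtype)
    return merged


batch_v1 = {"order_id": "string", "amount": "double"}
batch_v2 = {"order_id": "string", "amount": "double", "channel": "string"}  # additive
batch_v3 = {"order_id": "string", "amount": "string"}                      # type change

superset = merge_schemas(batch_v1, batch_v2)
```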
Mathematical Foundations
Definition: Schema Evolution
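Schema evolution transforms a record $r$ from schema $S_1$ to schema $S_2$ via a migration function $f: S_1 \to S_2$, where, for each column $c$ of $S_2$:

$$f(r)[c] = \begin{cases} r[c] & \text{if } c \in S_1 \cap S_2 \\ \text{null} & \text{if } c \in S_2 \setminus S_1 \end{cases}$$

Columns in $S_1 \setminus S_2$ are dropped.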
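Here is a direct transcription of such a migration function. It assumes records are dicts and a target schema is just an ordered list of column names; both are simplifications for illustration. Shared columns are copied, columns new in the target are filled with None, and columns the target lacks are dropped. migrate is a hypothetical name.

```python
def migrate(record, target_columns):
    """Hypothetical migration f: map a dict record onto target_columns.

    Columns present in both schemas keep their value, columns that exist only
    in the target become None, and columns missing from the target are dropped.
    """
    return {column: record.get(column) for column in target_columns}


v1_record = {"order_id": "ORD-001", "amount": 249.93, "legacy_flag": "Y"}
v2_columns = ["order_id", "amount", "channel"]
migrated = migrate(v1_record, v2_columns)
```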
Parsing Complexity
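For a JSON document of depth $d$ and branching factor $b$, the tree has at most $\sum_{k=0}^{d} b^{k}$ nodes and a parser visits each once, so parsing complexity is:

$$T_{\text{parse}} = O\left(\sum_{k=0}^{d} b^{k}\right) = O(b^{d}) \quad (b \ge 2)$$

A tree-building parser also holds all $O(b^{d})$ nodes in memory; streaming parsers reduce space from $O(b^{d})$ to $O(d)$, the stack depth.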
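This sketch illustrates why a streaming parser needs memory proportional only to nesting depth. It scans JSON text once, character by character, and keeps only a stack of open containers instead of building a tree. max_json_depth is a hypothetical helper that assumes well-formed input and ignores brackets inside string literals.

```python
def max_json_depth(text):
    """Return the maximum nesting depth of well-formed JSON text (hypothetical helper).

    Only a stack of currently open containers is kept, so memory grows with
    nesting depth d, not with document size.
    """
    stack, deepest = [], 0
    in_string = escaped = False
    for ch in text:
        if in_string:
            if escaped:
                escaped = False          # character after a backslash is literal
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
        elif ch in "{[":
            stack.append(ch)
            deepest = max(deepest, len(stack))
        elif ch in "}]":
            opener = stack.pop()
            if (opener, ch) not in {("{", "}"), ("[", "]")}:
                raise ValueError(f"mismatched {opener!r} and {ch!r}")
    return deepest
```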
Schema Correctness Theorem
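A schema $S$ is valid for document $D$ if and only if every value at path $p$ in $D$ matches the type constraint $S(p)$:

$$\text{valid}(S, D) \iff \forall p \in \text{paths}(D):\ \text{type}(D[p]) \sqsubseteq S(p)$$

Inference assigns each path the least upper bound of the types it observes. A schema inferred from a sample $D' \subseteq D$ can therefore be as wide as, or narrower than, the schema inferred from all of $D$, but never wider:

$$S_{D'}(p) \sqsubseteq S_{D}(p)$$

A sampled type that is too narrow is how sampling produces an invalid schema.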
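The sample-versus-full relationship can be checked with a toy inference routine. Each scalar gets a type from a small lattice (null, then long, then double, then string), roughly how Spark's JSON inference widens conflicting types, and each field keeps the least upper bound of what it has seen. Because a sample sees a subset of the values, its inferred type is never wider than the full-data type and can be narrower. The lattice and the helpers are simplifications; booleans and nested values are not modeled.

```python
import json

# Toy type lattice, ordered from narrowest to widest (a simplification of Spark's rules)
LATTICE = ["null", "long", "double", "string"]


def value_type(value):
    """Map a scalar JSON value to a lattice type (booleans and nesting not modeled)."""
    if value is None:
        return "null"
    if isinstance(value, bool):
        raise TypeError("booleans are not modeled in this toy lattice")
    if isinstance(value, int):
        return "long"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    raise TypeError(f"unsupported value: {value!r}")


def widen(a, b):
    """Least upper bound of two lattice types."""
    return max(a, b, key=LATTICE.index)


def infer_schema(json_lines):
    schema = {}
    for line in json_lines:
        for field, value in json.loads(line).items():
            schema[field] = widen(schema.get(field, "null"), value_type(value))
    return schema


records = [
    '{"id": 1, "zip": 10001}',
    '{"id": 2, "zip": 2101}',
    '{"id": 3, "zip": "02101-1234"}',
]
full = infer_schema(records)        # zip widens to string
sample = infer_schema(records[:2])  # zip stays long: narrower than the full-data type
```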
Columnar Conversion Cost
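Converting nested JSON with $L$ nesting levels and $F$ leaf fields to flat columns costs:

$$C = O(N \cdot F \cdot L)$$

where $N$ is the number of records: each of the $F$ leaves is reached by descending at most $L$ levels.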
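For a concrete schema, the number of nesting levels and leaf fields in this cost model can be read off by walking the structure. This sketch models the order schema from the diagram as nested dicts and lists every leaf path with its depth. The same dotted paths are what a flat projection such as col("customer.address.city").alias("customer_address_city") is built from. leaf_paths is a hypothetical helper, and arrays and maps are treated as leaves because they need explode rather than dot notation.

```python
def leaf_paths(schema, prefix=()):
    """Yield (dotted_path, depth) for each leaf of a nested-dict schema (hypothetical helper)."""
    for name, child in schema.items():
        path = prefix + (name,)
        if isinstance(child, dict):
            yield from leaf_paths(child, path)  # struct: descend one level
        else:
            yield ".".join(path), len(path)     # leaf: scalar, array, or map


# Order schema from the diagram, with type names as plain strings
order_shape = {
    "order_id": "string",
    "customer": {
        "id": "string",
        "name": "string",
        "address": {"city": "string", "state": "string"},
    },
    "items": "array<struct<sku,qty,price>>",  # needs explode, so kept as a leaf
    "metadata": "map<string,string>",
}

paths = list(leaf_paths(order_shape))
leaf_count = len(paths)                          # 7 leaf fields
max_depth = max(depth for _, depth in paths)     # 3 levels (customer.address.city)
flat_aliases = [path.replace(".", "_") for path, _ in paths]
```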
Compression Ratio
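After schema enforcement, columnar storage compresses better than raw text. The improvement is the ratio:

$$\rho = \frac{\text{size}_{\text{raw JSON}}}{\text{size}_{\text{typed columnar}}}$$

Typical improvement: 3-10x for numeric columns.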
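This gives a rough feel for the ratio on one numeric column, using only the standard library. The same 10,000 synthetic prices are stored once as JSON lines and once as packed 8-byte doubles compressed with zlib. This is a toy stand-in for a columnar format such as Parquet (no dictionary or run-length encodings, one general-purpose codec), so the number it prints is illustrative rather than a benchmark.

```python
import json
import zlib
from array import array

# Synthetic numeric column (invented data for illustration)
prices = [round(10 + (i % 500) * 0.25, 2) for i in range(10_000)]

# Row-oriented raw JSON: one text record per row, field name repeated each time
raw_json = "\n".join(json.dumps({"price": p}) for p in prices).encode("utf-8")

# Column-oriented typed storage: contiguous 8-byte doubles, then a general codec
typed_column = array("d", prices).tobytes()
compressed_column = zlib.compress(typed_column, 6)

rho = len(raw_json) / len(compressed_column)
print(f"raw JSON: {len(raw_json)} bytes, columnar: {len(compressed_column)} bytes, rho = {rho:.1f}x")
```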
Key Insight
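Reading with an explicit schema (through .schema() or from_json) can be 2-5x faster than relying on inference, because it skips the extra pass spark.read.json otherwise makes over the input to infer types. schema_of_json itself does not scan the dataset; it derives a schema from a single JSON string. For nested data, run it once on a representative record, review the result, then hardcode the schema.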
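The generate-once-then-hardcode workflow can be sketched without a cluster. Derive a DDL-formatted schema string from one representative record, review it, and paste it into the pipeline; from_json and DataFrameReader.schema both accept DDL strings. schema_ddl is a hypothetical stand-in for schema_of_json. Its type mapping approximates Spark's inference: integers to BIGINT, floats to DOUBLE, objects to STRUCT, arrays typed by their first element, and everything else to STRING. Spark's own output differs in details such as field ordering.

```python
import json


def to_ddl_type(value):
    """Approximate Spark SQL DDL type for one JSON value (assumed mapping)."""
    if isinstance(value, bool):
        return "BOOLEAN"   # check bool before int: bool is a subclass of int
    if isinstance(value, int):
        return "BIGINT"
    if isinstance(value, float):
        return "DOUBLE"
    if isinstance(value, dict):
        inner = ", ".join(f"{k}: {to_ddl_type(v)}" for k, v in value.items())
        return f"STRUCT<{inner}>"
    if isinstance(value, list):
        element = to_ddl_type(value[0]) if value else "STRING"
        return f"ARRAY<{element}>"
    return "STRING"        # strings and nulls fall back to STRING


def schema_ddl(sample_json):
    """Hypothetical helper: DDL column list for one sample JSON object."""
    record = json.loads(sample_json)
    return ", ".join(f"{name} {to_ddl_type(v)}" for name, v in record.items())


sample = ('{"order_id": "ORD-001", "customer": {"id": "CUST-001"}, '
          '"items": [{"sku": "A1", "qty": 5, "price": 29.99}]}')
ddl = schema_ddl(sample)
```

The resulting string can then be passed as from_json(col("json_string"), ddl) or spark.read.schema(ddl).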
Summary
JSON/XML parsing on Spark benefits from streaming parsers for memory efficiency, explicit schemas for performance, and type enforcement for compression. Schema evolution requires careful compatibility analysis. Nested-to-flat conversion cost scales linearly with record count and nesting depth.
Key Concepts Table
| Concept | Description | Configuration |
|---|---|---|
| Schema Inference | Derive data types by scanning the input (all of it by default) | Omit .schema(); samplingRatio (default 1.0) limits the fraction scanned |
| Explicit Schema | Define schema programmatically | schema=my_schema |
| PERMISSIVE Mode | Keep malformed rows with null fields; raw text goes to the corrupt-record column | mode=PERMISSIVE |
| DROPMALFORMED Mode | Silently drop corrupt records | mode=DROPMALFORMED |
| FAILFAST Mode | Throw exception on corrupt records | mode=FAILFAST |
| from_json | Parse JSON string column to struct | from_json(col, schema) |
| get_json_object | Extract value using JSON path | get_json_object(col, path) |
| explode | Transform array to multiple rows | explode(col("array_col")) |
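| mergeSchema | Merge Parquet/ORC schemas across files into a superset | mergeSchema=true |
| columnNameOfCorruptRecord | Column that stores the raw text of malformed records (must be declared in a user-supplied schema) | columnNameOfCorruptRecord="_corrupt_record" (default name) |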
Code Examples
JSON Parsing with Explicit Schema
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import json
spark = SparkSession.builder \
.appName("JSONXMLParsing") \
.getOrCreate()
# Define explicit schema for JSON data
order_schema = StructType([
    StructField("order_id", StringType(), False),
    StructField("timestamp", StringType(), True),
    StructField("customer", StructType([
        StructField("id", StringType(), False),
        StructField("name", StringType(), True),
        StructField("email", StringType(), True),
        StructField("address", StructType([
            StructField("street", StringType(), True),
            StructField("city", StringType(), True),
            StructField("state", StringType(), True),
            StructField("zip", StringType(), True),
        ]), True),
    ]), False),
    StructField("items", ArrayType(StructType([
        StructField("sku", StringType(), False),
        StructField("name", StringType(), True),
        StructField("quantity", IntegerType(), True),
        StructField("price", DoubleType(), True),
        StructField("attributes", MapType(StringType(), StringType()), True),
    ])), True),
    StructField("payment", StructType([
        StructField("method", StringType(), True),
        StructField("amount", DoubleType(), True),
        StructField("currency", StringType(), True),
    ]), True),
    StructField("metadata", MapType(StringType(), StringType()), True),
    # PERMISSIVE mode only fills the corrupt-record column if the schema declares it
    StructField("_corrupt_record", StringType(), True),
])
# Read JSON with explicit schema
json_data = [
('{"order_id":"ORD-001","timestamp":"2024-01-15T10:30:00Z","customer":{"id":"CUST-001","name":"Alice","email":"alice@email.com","address":{"street":"123 Main St","city":"New York","state":"NY","zip":"10001"}},"items":[{"sku":"A1","name":"Widget","quantity":5,"price":29.99,"attributes":{"color":"blue","size":"M"}},{"sku":"B2","name":"Gadget","quantity":3,"price":49.99,"attributes":{"color":"red","size":"L"}}],"payment":{"method":"credit","amount":249.93,"currency":"USD"},"metadata":{"source":"web","campaign":"summer"}}',),
('{"order_id":"ORD-002","timestamp":"2024-01-15T11:00:00Z","customer":{"id":"CUST-002","name":"Bob","email":"bob@email.com","address":{"street":"456 Oak Ave","city":"Boston","state":"MA","zip":"02101"}},"items":[{"sku":"C3","name":"Gizmo","quantity":2,"price":99.99,"attributes":{"color":"black"}}],"payment":{"method":"debit","amount":199.98,"currency":"USD"},"metadata":{"source":"mobile","campaign":"winter"}}',),
]
df = spark.read \
.option("mode", "PERMISSIVE") \
.option("columnNameOfCorruptRecord", "_corrupt_record") \
.schema(order_schema) \
.json(spark.sparkContext.parallelize([row[0] for row in json_data]))
# Flatten nested JSON structure
flattened_df = df \
.select(
col("order_id"),
col("timestamp"),
col("customer.id").alias("customer_id"),
col("customer.name").alias("customer_name"),
col("customer.email").alias("customer_email"),
col("customer.address.city").alias("customer_city"),
col("customer.address.state").alias("customer_state"),
col("payment.method").alias("payment_method"),
col("payment.amount").alias("payment_amount"),
col("metadata").alias("order_metadata"),
)
flattened_df.show(truncate=False)
# Explode items array
items_df = df \
.select(
col("order_id"),
col("customer.id").alias("customer_id"),
explode(col("items")).alias("item")
) \
.select(
col("order_id"),
col("customer_id"),
col("item.sku"),
col("item.name"),
col("item.quantity"),
col("item.price"),
(col("item.quantity") * col("item.price")).alias("line_total"),
)
items_df.show(truncate=False)
Advanced JSON Operations
# Using from_json for string columns
json_strings_df = spark.createDataFrame([
('{"a": 1, "b": "hello"}',),
('{"a": 2, "b": "world", "c": 3.14}',),
], ["json_string"])
schema = StructType([
StructField("a", IntegerType()),
StructField("b", StringType()),
StructField("c", DoubleType()),
])
parsed_df = json_strings_df \
.withColumn("parsed", from_json(col("json_string"), schema)) \
.select(
col("parsed.a").alias("a"),
col("parsed.b").alias("b"),
col("parsed.c").alias("c"),
)
parsed_df.show(truncate=False)
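# Using get_json_object for JSON path queries on raw strings
# (each call re-parses the string, so keep it to one or two fields)
raw_df = spark.createDataFrame(json_data, ["raw_json"])
paths_df = raw_df \
    .withColumn("customer_city", get_json_object(col("raw_json"), "$.customer.address.city")) \
    .withColumn("first_sku", get_json_object(col("raw_json"), "$.items[0].sku"))
paths_df.select("customer_city", "first_sku").show(truncate=False)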
# Handle nested arrays with posexplode
items_with_position = df \
.select(
col("order_id"),
posexplode(col("items")).alias("position", "item")
) \
.select(
col("order_id"),
col("position"),
col("item.sku"),
col("item.quantity"),
)
items_with_position.show(truncate=False)
# Aggregate nested data
order_summary = df \
.withColumn("item_count", size(col("items"))) \
.withColumn("total_item_value",
expr("aggregate(items, 0D, (acc, item) -> acc + item.quantity * item.price)")
) \
.withColumn("unique_skus",
size(array_distinct(expr("transform(items, x -> x.sku)")))
) \
.select(
col("order_id"),
col("customer.name").alias("customer_name"),
col("item_count"),
col("total_item_value"),
col("unique_skus"),
)
order_summary.show(truncate=False)
XML Parsing
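# spark-xml (Spark 3.x) must be on the classpath when the session starts, e.g.
#   SparkSession.builder.config("spark.jars.packages", "com.databricks:spark-xml_2.12:0.17.0")
# Spark 4.0+ ships a built-in XML data source, so no extra package is needed there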
# Create sample XML data
xml_data = """<?xml version="1.0" encoding="UTF-8"?>
<catalog>
<book id="bk101">
<author>Gambardella, Matthew</author>
<title>XML Developer's Guide</title>
<genre>Computer</genre>
<price>44.95</price>
<publish_date>2000-10-01</publish_date>
<description>An in-depth look at creating applications with XML.</description>
</book>
<book id="bk102">
<author>Ralls, Kim</author>
<title>Midnight Rain</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-12-16</publish_date>
<description>A former architect battles corporate zombies.</description>
</book>
</catalog>"""
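# Write XML to storage for reading (saveAsTextFile fails if the path already exists)
xml_path = "/tmp/catalog.xml"
spark.sparkContext.parallelize([xml_data], 1).saveAsTextFile(xml_path)
# Read XML: each <book> element becomes one row; the id attribute becomes attr_id
# format("xml") works with spark-xml on 3.x and with the built-in source on 4.0+
xml_df = spark.read \
    .format("xml") \
    .option("rowTag", "book") \
    .option("attributePrefix", "attr_") \
    .option("valueTag", "_value") \
    .load(xml_path)
xml_df.show(truncate=False)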
# Parse XML from string column
xml_strings = spark.createDataFrame([
('<person><name>Alice</name><age>30</age></person>',),
('<person><name>Bob</name><age>25</age></person>',),
], ["xml_string"])
# from_xml is built into pyspark.sql.functions from Spark 4.0;
# spark-xml on Spark 3.x exposes from_xml only to Scala/Java
from pyspark.sql.functions import from_xml
person_schema = StructType([
StructField("name", StringType()),
StructField("age", IntegerType()),
])
xml_parsed = xml_strings \
.withColumn("parsed", from_xml(col("xml_string"), person_schema)) \
.select(
col("parsed.name").alias("name"),
col("parsed.age").alias("age"),
)
xml_parsed.show(truncate=False)
# Handle XML attributes
xml_with_attrs = """<order id="ORD-001" status="complete">
<customer id="CUST-001">Alice</customer>
<total currency="USD">249.95</total>
</order>"""
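# Fallback: extract root-element attributes with a regex UDF when no XML reader is available
import re

@udf(returnType=MapType(StringType(), StringType()))
def extract_xml_attributes(xml_string):
    """Extract attributes from the root element's start tag only."""
    if xml_string is None:
        return None
    # First start tag; an <?xml ...?> declaration or <!-- comment --> is skipped
    root = re.search(r"<([A-Za-z_][\w.:-]*)([^>]*)>", xml_string)
    if root is None:
        return {}
    # Scan only the root tag, so nested attributes such as
    # <customer id="CUST-001"> cannot overwrite the order's id
    return dict(re.findall(r'([\w.:-]+)="([^"]*)"', root.group(2)))

# Combine with element extraction
order_xml_df = spark.createDataFrame([(xml_with_attrs,)], ["xml"])
result = order_xml_df \
    .withColumn("attributes", extract_xml_attributes(col("xml"))) \
    .withColumn("order_id", col("attributes")["id"]) \
    .withColumn("status", col("attributes")["status"])
result.show(truncate=False)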
Performance Metrics
| Operation | 1K Records | 100K Records | 10M Records | 100M Records |
|---|---|---|---|---|
| JSON Read (schema inference) | < 1 sec | 2-5 sec | 30-60 sec | 5-10 min |
| JSON Read (explicit schema) | < 1 sec | 1-3 sec | 10-20 sec | 2-5 min |
| JSON String Parse (from_json) | < 1 sec | 1-2 sec | 8-15 sec | 1-3 min |
| JSON Path Extract (get_json_object) | < 1 sec | 2-5 sec | 20-40 sec | 3-8 min |
| Array Explode | < 1 sec | 1-3 sec | 10-20 sec | 2-5 min |
| Deep Nesting Extraction (3+ levels) | < 1 sec | 3-8 sec | 45-90 sec | 8-15 min |
| XML Read | < 1 sec | 3-8 sec | 45-90 sec | 8-15 min |
| Schema Merge (mergeSchema) | < 1 sec | 2-5 sec | 20-40 sec | 3-8 min |
| Malformed Record Handling | < 1 sec | 1-3 sec | 10-20 sec | 2-5 min |
| Nested Aggregation | < 1 sec | 2-5 sec | 20-40 sec | 3-8 min |
Best Practices
- Always define explicit schemas for production JSON/XML parsing to avoid schema inference overhead and ensure type consistency
- Use PERMISSIVE mode with columnNameOfCorruptRecord to capture and investigate malformed records instead of losing data
- Limit schema inference on large datasets with the samplingRatio option, or skip it entirely with an explicit schema
- Flatten incrementally: extract top-level fields first, then explode arrays, to keep lineage clear and avoid row explosion
- Cache parsed DataFrames when performing multiple operations on the same semi-structured data to avoid re-parsing
- Use get_json_object for targeted extraction of one or two fields instead of parsing the entire JSON structure
- Enable mergeSchema cautiously: only use it when schema evolution is expected, and test with a small sample first
- Partition XML/JSON files by a logical key (date, region) to enable partition pruning and reduce scan volume
- Avoid explode on large arrays without filtering: apply the higher-order filter function to the array before explode to reduce the number of generated rows
- Use from_json instead of get_json_object when extracting multiple fields from the same JSON to avoid repeated parsing
- Implement custom UDFs for complex XML parsing when spark-xml doesn't support your specific XML structure
- Monitor corrupt record counts in production: high corruption rates indicate upstream data quality issues that should be addressed at the source
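The filter-before-explode advice comes down to how many intermediate rows get materialized. In PySpark the pattern is explode(filter(col("items"), lambda x: x["quantity"] > 2)), using the higher-order filter function available since Spark 3.1. The plain-Python sketch below only counts rows for both orderings, using invented data: 100 orders of 20 items each, keeping items with qty above 18.

```python
# Invented data: 100 orders with 20 items each (qty 1..20)
orders = [
    {"order_id": f"ORD-{n:03d}", "items": [{"qty": q} for q in range(1, 21)]}
    for n in range(100)
]


def explode_then_filter(rows):
    # Every element becomes a row first, then most rows are thrown away
    exploded = [(r["order_id"], item) for r in rows for item in r["items"]]
    kept = [pair for pair in exploded if pair[1]["qty"] > 18]
    return len(exploded), kept


def filter_then_explode(rows):
    # Trim each array first (like the higher-order filter), then explode
    trimmed = [(r["order_id"], [i for i in r["items"] if i["qty"] > 18]) for r in rows]
    exploded = [(oid, item) for oid, items in trimmed for item in items]
    return len(exploded), exploded


rows_a, result_a = explode_then_filter(orders)   # 2000 intermediate rows
rows_b, result_b = filter_then_explode(orders)   # 200 rows
```

Both orderings return the same final rows; only the intermediate volume differs.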
See also: Snowflake Time Travel (snowflake/02), Kafka CDC (kafka/04), Airflow DAGs (airflow/02)