JSON and XML Parsing in PySpark

Architecture Diagram
┌─────────────────────────────────────────────────────────────────────────┐
│                SEMI-STRUCTURED DATA PROCESSING PIPELINE                 │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│   ┌──────────────┐     ┌──────────────┐     ┌──────────────┐            │
│   │  Raw JSON/   │────▶│  Schema      │────▶│  Flattening  │            │
│   │  XML Strings │     │  Inference   │     │  & Extraction│            │
│   └──────────────┘     └──────┬───────┘     └──────┬───────┘            │
│                               │                    │                    │
│                               ▼                    ▼                    │
│                    ┌──────────────────┐   ┌──────────────────┐          │
│                    │  Schema Options  │   │  Extraction      │          │
│                    │  ─────────────   │   │  Methods         │          │
│                    │  PERMISSIVE      │   │  ─────────────   │          │
│                    │  DROPMALFORMED   │   │  from_json()     │          │
│                    │  FAILFAST        │   │  get_json_object │          │
│                    └────────┬─────────┘   │  explode()       │          │
│                             │             └────────┬─────────┘          │
│                             ▼                      │                    │
│                    ┌──────────────────┐            │                    │
│                    │  Type Mapping    │            │                    │
│                    │  ─────────────   │            │                    │
│                    │  JSON→Spark Type │            │                    │
│                    │  XML→Spark Type  │            │                    │
│                    │  Null Handling   │            │                    │
│                    └────────┬─────────┘            │                    │
│                             │                      │                    │
│                             ▼                      ▼                    │
│                    ┌──────────────────┐   ┌──────────────────┐          │
│                    │  Schema          │   │  Query           │          │
│                    │  Evolution       │   │  Optimization    │          │
│                    │  ─────────────   │   │  ─────────────   │          │
│                    │  mergeSchema     │   │  Predicate       │          │
│                    │  add columns     │   │  Pushdown        │          │
│                    │  type widening   │   │  Partition       │          │
│                    └──────────────────┘   └──────────────────┘          │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘
Architecture Diagram
┌─────────────────────────────────────────────────────────────────────────┐
│                  JSON STRUCTURE → SPARK SCHEMA MAPPING                  │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│   JSON Input:                                                           │
│   {                                                                     │
│     "order_id": "ORD-001",                                              │
│     "customer": {                                                       │
│       "id": "CUST-001",                                                 │
│       "name": "Alice",                                                  │
│       "address": {                                                      │
│         "city": "New York",                                             │
│         "state": "NY"                                                   │
│       }                                                                 │
│     },                                                                  │
│     "items": [                                                          │
│       {"sku": "A1", "qty": 5, "price": 29.99},                          │
│       {"sku": "B2", "qty": 3, "price": 49.99}                           │
│     ],                                                                  │
│     "metadata": {"source": "web", "campaign": "summer"}                 │
│   }                                                                     │
│                                                                         │
│   ┌───────────────────────────────────────────────────────────────┐     │
│   │  StructType(                                                  │     │
│   │    StructField("order_id", StringType),                       │     │
│   │    StructField("customer", StructType(                        │     │
│   │      StructField("id", StringType),                           │     │
│   │      StructField("name", StringType),                         │     │
│   │      StructField("address", StructType(                       │     │
│   │        StructField("city", StringType),                       │     │
│   │        StructField("state", StringType)                       │     │
│   │      ))                                                       │     │
│   │    )),                                                        │     │
│   │    StructField("items", ArrayType(                            │     │
│   │      StructType(                                              │     │
│   │        StructField("sku", StringType),                        │     │
│   │        StructField("qty", IntegerType),                       │     │
│   │        StructField("price", DoubleType)                       │     │
│   │      )                                                        │     │
│   │    )),                                                        │     │
│   │    StructField("metadata", MapType(                           │     │
│   │      StringType, StringType                                   │     │
│   │    ))                                                         │     │
│   │  )                                                            │     │
│   └───────────────────────────────────────────────────────────────┘     │
│                                                                         │
│   Flattened Output:                                                     │
│   ┌──────────┬──────────┬──────────┬──────┬────────┬──────┬────────┐    │
│   │ order_id │ cust_id  │ cust_name│ state│ sku    │ qty  │ price  │    │
│   ├──────────┼──────────┼──────────┼──────┼────────┼──────┼────────┤    │
│   │ ORD-001  │ CUST-001 │ Alice    │ NY   │ A1     │ 5    │ 29.99  │    │
│   │ ORD-001  │ CUST-001 │ Alice    │ NY   │ B2     │ 3    │ 49.99  │    │
│   └──────────┴──────────┴──────────┴──────┴────────┴──────┴────────┘    │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘
Architecture Diagram
┌─────────────────────────────────────────────────────────────────────────┐
│                       XML PROCESSING ARCHITECTURE                       │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│   XML Input:                                                            │
│   ┌───────────────────────────────────────────────────────────────┐     │
│   │  <orders>                                                     │     │
│   │    <order id="ORD-001">                                       │     │
│   │      <customer id="CUST-001">                                 │     │
│   │        <name>Alice</name>                                     │     │
│   │        <address>                                              │     │
│   │          <city>New York</city>                                │     │
│   │          <state>NY</state>                                    │     │
│   │        </address>                                             │     │
│   │      </customer>                                              │     │
│   │      <items>                                                  │     │
│   │        <item sku="A1" qty="5" price="29.99"/>                 │     │
│   │        <item sku="B2" qty="3" price="49.99"/>                 │     │
│   │      </items>                                                 │     │
│   │    </order>                                                   │     │
│   │  </orders>                                                    │     │
│   └───────────────────────────────────────────────────────────────┘     │
│                                                                         │
│   Processing Pipeline:                                                  │
│   ┌───────────────────────────────────────────────────────────────┐     │
│   │  Step 1: Parse XML → Row with nested elements                 │     │
│   │  Step 2: Extract attributes vs child elements                 │     │
│   │  Step 3: Flatten nested structure                             │     │
│   │  Step 4: Convert to typed DataFrame                           │     │
│   └───────────────────────────────────────────────────────────────┘     │
│                                                                         │
│   Attribute vs Element Handling:                                        │
│   ┌───────────────────────────────────────────────────────────────┐     │
│   │  <order id="ORD-001">        → @id attribute                  │     │
│   │    <name>Alice</name>        → element content                │     │
│   │    <item sku="A1"/>          → @sku attribute                 │     │
│   │  </order>                                                     │     │
│   │                                                               │     │
│   │  Attributes: Predefined, fixed set, no nesting                │     │
│   │  Elements: Variable, extensible, can nest deeply              │     │
│   └───────────────────────────────────────────────────────────────┘     │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

Detailed Explanation

JSON and XML are the predominant semi-structured data formats in modern data systems. PySpark provides robust support for parsing both formats, but the complexity lies in handling deeply nested structures, variable schemas, and performance optimization for large-scale processing. The fundamental challenge is converting semi-structured text into typed, queryable DataFrames while preserving the semantic meaning of the original structure.

Schema inference in PySpark determines the data types of JSON fields by scanning the input. By default Spark reads every record (samplingRatio defaults to 1.0); setting samplingRatio below 1.0 infers the schema from a sample, which is faster but can produce an incorrect schema if the sample is not representative, for example if numeric values are stored as strings in some records but not others. The mode option controls how Spark handles malformed records: PERMISSIVE (default) sets the fields of a corrupt record to null and keeps its raw text in the column named by columnNameOfCorruptRecord, DROPMALFORMED silently drops corrupt records, and FAILFAST throws an exception on the first corrupt record.

The from_json function requires an explicit schema, which is also the recommended approach for production workloads. Schema inference should only be used for exploration, not for production pipelines. The schema definition uses StructType and StructField to describe the structure, with support for nested StructType, ArrayType, and MapType. Defining schemas explicitly also removes the extra pass over the data that inference needs and keeps column types stable from run to run, which improves both performance and reliability.

Nested data flattening is the most common transformation for JSON/XML data. The explode function transforms array elements into separate rows, while select with dot notation (e.g., col("customer.name")) extracts nested fields. For deeply nested structures, multiple levels of explode and select operations may be required, but each explode operation increases the row count, which can lead to data explosion if not carefully managed.

XML processing in PySpark relies on the spark-xml library on Spark 3.x and earlier (Spark 4.0 ships a built-in XML data source), which provides an XML data source with capabilities similar to the JSON data source. XML has additional complexities due to attributes (key-value pairs in tags) vs child elements (nested tags), mixed content (text interspersed with elements), and namespace handling. The rowTag option specifies the element that defines each row, while attributePrefix distinguishes attributes from child elements.
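To make the attribute/element distinction concrete without needing a cluster, here is a plain-Python sketch using the standard library. It is not how spark-xml is implemented; `element_to_row` is a hypothetical helper that only mimics the idea of a row tag and an attribute prefix, and it ignores repeated tags and mixed content.

```python
import xml.etree.ElementTree as ET


def element_to_row(elem, attribute_prefix="_"):
    """Convert one XML element to a dict: attributes get a prefix, children keep their tag."""
    row = {f"{attribute_prefix}{name}": value for name, value in elem.attrib.items()}
    for child in elem:
        if len(child) or child.attrib:
            # Children with their own structure become nested dicts.
            row[child.tag] = element_to_row(child, attribute_prefix)
        else:
            # Plain leaf children contribute their text content.
            row[child.tag] = child.text
    return row


xml_text = """<orders>
  <order id="ORD-001">
    <customer id="CUST-001"><name>Alice</name></customer>
    <total>249.95</total>
  </order>
</orders>"""

root = ET.fromstring(xml_text)
# "order" plays the role of rowTag: each matching element becomes one row.
rows = [element_to_row(order) for order in root.iter("order")]
print(rows)
```

The prefix keeps the order's `id` attribute (`_id`) from colliding with any child element that happens to be called `id`, which is the purpose attributePrefix serves in the data source.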

Schema evolution is critical for semi-structured data sources that change over time. The mergeSchema option, available on columnar file sources such as Parquet and ORC, enables Spark to automatically merge schemas from different files or batches. When mergeSchema is enabled, Spark reads the schema from all files and creates a superset schema, filling in null values for missing fields. This works well for additive changes (new columns) but fails for incompatible changes, such as a column whose type differs between files.

Mathematical Foundations

Definition: Schema Evolution

Schema evolution transforms a record $R$ from schema $S_i$ to $S_j$ via a migration function $M_{i \rightarrow j}(R)$, where:

$$M_{i \rightarrow j}(R) = \begin{cases} R & \text{if } S_j \subseteq S_i \text{ (narrowing)} \\ R \cup \{d_k : k \in S_j \setminus S_i\} & \text{if additive} \\ \text{error} & \text{if incompatible} \end{cases}$$
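The migration function can be sketched in plain Python. The representation below is an assumption made for illustration: schemas are dicts from field name to Python type, `migrate` is a hypothetical helper, the defaults $d_k$ fall back to None, and the narrowing case is implemented as a projection onto the target fields.

```python
def migrate(record, source_fields, target_fields, defaults=None):
    """Apply M_{i->j}: keep shared fields, fill added fields, reject type changes."""
    defaults = defaults or {}
    for name, type_ in target_fields.items():
        # A field present in both schemas with a different type is incompatible.
        if name in source_fields and source_fields[name] is not type_:
            raise TypeError(f"incompatible type change for {name!r}")
    # Shared fields keep their value; new fields take d_k (None if unspecified).
    return {name: record.get(name, defaults.get(name)) for name in target_fields}


s1 = {"order_id": str, "qty": int}
s2 = {"order_id": str, "qty": int, "currency": str}
rec = {"order_id": "ORD-001", "qty": 5}

added = migrate(rec, s1, s2, defaults={"currency": "USD"})   # additive case
narrowed = migrate(added, s2, {"order_id": str})              # narrowing case

try:
    migrate(rec, s1, {"order_id": str, "qty": str})           # incompatible case
    incompatible_raised = False
except TypeError:
    incompatible_raised = True
```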

Parsing Complexity

For a JSON document of depth $d$ and branching factor $b$, parsing complexity is:

$$T_{\text{parse}} = O(b^d) \quad \text{(worst case)}, \qquad T_{\text{streaming}} = O(n) \text{ where } n = \text{token count}$$

Streaming parsers reduce space from $O(b^d)$ to $O(d)$ stack depth.

Schema Correctness Theorem

A schema $S$ is valid for document $D$ if and only if every value at path $p$ in $D$ matches the type constraint $S(p)$:

$$\forall p \in \text{paths}(D): \text{type}(D(p)) \in S(p).\text{allowed\_types}$$

Inference from sample $D_{\text{sample}}$ overestimates types: $S_{\text{inferred}} \supseteq S_{\text{true}}$.
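The validity condition translates almost directly into code. The sketch below is a plain-Python illustration under assumed conventions: a schema is a dict from leaf path to a tuple of allowed Python types, array elements share the path suffix `[]`, and `paths` and `is_valid` are hypothetical helper names.

```python
def paths(doc, prefix=""):
    """Yield (path, value) for every leaf value in a nested dict/list document."""
    if isinstance(doc, dict):
        for key, value in doc.items():
            yield from paths(value, f"{prefix}.{key}" if prefix else key)
    elif isinstance(doc, list):
        for item in doc:
            yield from paths(item, f"{prefix}[]")
    else:
        yield prefix, doc


def is_valid(doc, schema):
    """True iff every leaf path is declared and its value has an allowed type."""
    return all(
        path in schema and isinstance(value, schema[path])
        for path, value in paths(doc)
    )


schema = {
    "order_id": (str,),
    "items[].sku": (str,),
    "items[].qty": (int,),
    "items[].price": (int, float),
}
good = {"order_id": "ORD-001", "items": [{"sku": "A1", "qty": 5, "price": 29.99}]}
bad = {"order_id": "ORD-002", "items": [{"sku": "B2", "qty": "3", "price": 49.99}]}

good_ok = is_valid(good, schema)
bad_ok = is_valid(bad, schema)
```

The second document fails because `qty` arrives as the string "3", the same kind of mixed typing that makes sampled inference unreliable.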

Columnar Conversion Cost

Converting nested JSON to flat columns with $k$ levels and $n$ leaf fields:

$$C_{\text{flatten}} = n \times m \times k \quad \text{(total field extractions)}$$

where $m$ is the number of records.
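As a worked instance of this cost formula, take illustrative values (assumptions, not measurements) of 8 leaf fields, one million records, and 3 nesting levels:

```latex
C_{\text{flatten}} = n \times m \times k = 8 \times 10^{6} \times 3 = 2.4 \times 10^{7}
```

Doubling the nesting depth to 6 levels doubles the count to $4.8 \times 10^{7}$ extractions, which is the linear scaling in depth that the formula expresses.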

Compression Ratio

After schema enforcement, columnar storage compression improves by:

$$\text{Ratio} = \frac{|R_{\text{untyped}}|}{|R_{\text{typed}}|} \approx \frac{\sum_i \text{bits}(R_i.\text{string})}{\sum_i \text{bits}(R_i.\text{typed})}$$

Typical improvement: 3-10x for numeric columns.

Key Insight

Parsing with an explicit schema is 2-5x faster than relying on inference because it avoids the extra pass over the data that inference requires. For nested data, run schema_of_json once on a representative sample document to generate the schema, then hardcode it.

Summary

JSON/XML parsing on Spark benefits from streaming parsers for memory efficiency, explicit schemas for performance, and type enforcement for compression. Schema evolution requires careful compatibility analysis. Nested-to-flat conversion cost scales linearly with record count and nesting depth.

Key Concepts Table

| Concept | Description | Configuration |
| --- | --- | --- |
| Schema Inference | Automatically determine data types from the input or a sample of it | Default for JSON when no schema is supplied; samplingRatio sets the sample size |
| Explicit Schema | Define schema programmatically | schema=my_schema |
| PERMISSIVE Mode | Set fields of corrupt records to null and keep the raw text in the corrupt-record column | mode=PERMISSIVE (default) |
| DROPMALFORMED Mode | Silently drop corrupt records | mode=DROPMALFORMED |
| FAILFAST Mode | Throw exception on corrupt records | mode=FAILFAST |
| from_json | Parse JSON string column to struct | from_json(col, schema) |
| get_json_object | Extract value using JSON path | get_json_object(col, path) |
| explode | Transform array to multiple rows | explode(col("array_col")) |
| mergeSchema | Merge schemas across files/batches (Parquet and ORC sources) | mergeSchema=true |
| columnNameOfCorruptRecord | Store malformed records in column | columnNameOfCorruptRecord="_corrupt" |

Code Examples

JSON Parsing with Explicit Schema

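from pyspark.sql import SparkSession
# Explicit imports avoid shadowing Python builtins such as sum, min and max
from pyspark.sql.functions import (
    array_distinct, col, explode, expr, from_json, get_json_object,
    posexplode, size, struct, to_json, udf,
)
from pyspark.sql.types import (
    ArrayType, DoubleType, IntegerType, MapType,
    StringType, StructField, StructType,
)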

spark = SparkSession.builder \
    .appName("JSONXMLParsing") \
    .getOrCreate()

# Define explicit schema for JSON data
order_schema = StructType([
    StructField("order_id", StringType(), False),
    StructField("timestamp", StringType(), True),
    StructField("customer", StructType([
        StructField("id", StringType(), False),
        StructField("name", StringType(), True),
        StructField("email", StringType(), True),
        StructField("address", StructType([
            StructField("street", StringType(), True),
            StructField("city", StringType(), True),
            StructField("state", StringType(), True),
            StructField("zip", StringType(), True),
        ]), True),
    ]), False),
    StructField("items", ArrayType(StructType([
        StructField("sku", StringType(), False),
        StructField("name", StringType(), True),
        StructField("quantity", IntegerType(), True),
        StructField("price", DoubleType(), True),
        StructField("attributes", MapType(StringType(), StringType()), True),
    ])), True),
    StructField("payment", StructType([
        StructField("method", StringType(), True),
        StructField("amount", DoubleType(), True),
        StructField("currency", StringType(), True),
    ]), True),
    StructField("metadata", MapType(StringType(), StringType()), True),
])

# Read JSON with explicit schema
json_data = [
    ('{"order_id":"ORD-001","timestamp":"2024-01-15T10:30:00Z","customer":{"id":"CUST-001","name":"Alice","email":"alice@email.com","address":{"street":"123 Main St","city":"New York","state":"NY","zip":"10001"}},"items":[{"sku":"A1","name":"Widget","quantity":5,"price":29.99,"attributes":{"color":"blue","size":"M"}},{"sku":"B2","name":"Gadget","quantity":3,"price":49.99,"attributes":{"color":"red","size":"L"}}],"payment":{"method":"credit","amount":249.93,"currency":"USD"},"metadata":{"source":"web","campaign":"summer"}}',),
    ('{"order_id":"ORD-002","timestamp":"2024-01-15T11:00:00Z","customer":{"id":"CUST-002","name":"Bob","email":"bob@email.com","address":{"street":"456 Oak Ave","city":"Boston","state":"MA","zip":"02101"}},"items":[{"sku":"C3","name":"Gizmo","quantity":2,"price":99.99,"attributes":{"color":"black"}}],"payment":{"method":"debit","amount":199.98,"currency":"USD"},"metadata":{"source":"mobile","campaign":"winter"}}',),
]

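# PERMISSIVE mode keeps the raw text of a malformed record only if the schema
# contains a string field named after columnNameOfCorruptRecord, so add one.
read_schema = StructType(
    order_schema.fields + [StructField("_corrupt_record", StringType(), True)]
)

df = spark.read \
    .option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "_corrupt_record") \
    .schema(read_schema) \
    .json(spark.sparkContext.parallelize([row[0] for row in json_data]))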

# Flatten nested JSON structure
flattened_df = df \
    .select(
        col("order_id"),
        col("timestamp"),
        col("customer.id").alias("customer_id"),
        col("customer.name").alias("customer_name"),
        col("customer.email").alias("customer_email"),
        col("customer.address.city").alias("customer_city"),
        col("customer.address.state").alias("customer_state"),
        col("payment.method").alias("payment_method"),
        col("payment.amount").alias("payment_amount"),
        col("metadata").alias("order_metadata"),
    )

flattened_df.show(truncate=False)

# Explode items array
items_df = df \
    .select(
        col("order_id"),
        col("customer.id").alias("customer_id"),
        explode(col("items")).alias("item")
    ) \
    .select(
        col("order_id"),
        col("customer_id"),
        col("item.sku"),
        col("item.name"),
        col("item.quantity"),
        col("item.price"),
        (col("item.quantity") * col("item.price")).alias("line_total"),
    )

items_df.show(truncate=False)

Advanced JSON Operations

# Using from_json for string columns
json_strings_df = spark.createDataFrame([
    ('{"a": 1, "b": "hello"}',),
    ('{"a": 2, "b": "world", "c": 3.14}',),
], ["json_string"])

schema = StructType([
    StructField("a", IntegerType()),
    StructField("b", StringType()),
    StructField("c", DoubleType()),
])

parsed_df = json_strings_df \
    .withColumn("parsed", from_json(col("json_string"), schema)) \
    .select(
        col("parsed.a").alias("a"),
        col("parsed.b").alias("b"),
        col("parsed.c").alias("c"),
    )

parsed_df.show(truncate=False)

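# Using get_json_object for JSON path queries.
# get_json_object works on JSON strings, so serialize the parsed rows back to
# JSON first (in a real pipeline this would be the raw string column).
paths_df = df \
    .withColumn("json_string", to_json(struct("*"))) \
    .withColumn("customer_city",
        get_json_object(col("json_string"), "$.customer.address.city")
    )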

# Handle nested arrays with posexplode
items_with_position = df \
    .select(
        col("order_id"),
        posexplode(col("items")).alias("position", "item")
    ) \
    .select(
        col("order_id"),
        col("position"),
        col("item.sku"),
        col("item.quantity"),
    )

items_with_position.show(truncate=False)

# Aggregate nested data
order_summary = df \
    .withColumn("item_count", size(col("items"))) \
    .withColumn("total_item_value", 
        expr("aggregate(items, 0D, (acc, item) -> acc + item.quantity * item.price)")
    ) \
    .withColumn("unique_skus", 
        size(array_distinct(expr("transform(items, x -> x.sku)")))
    ) \
    .select(
        col("order_id"),
        col("customer.name"),
        col("item_count"),
        col("total_item_value"),
        col("unique_skus"),
    )

order_summary.show(truncate=False)

XML Parsing

# Spark 3.x: add the spark-xml package, e.g. spark.jars.packages=com.databricks:spark-xml_2.12:0.17.0
# Spark 4.0+: XML support is built in, so no extra package is needed

# Create sample XML data
xml_data = """<?xml version="1.0" encoding="UTF-8"?>
<catalog>
    <book id="bk101">
        <author>Gambardella, Matthew</author>
        <title>XML Developer's Guide</title>
        <genre>Computer</genre>
        <price>44.95</price>
        <publish_date>2000-10-01</publish_date>
        <description>An in-depth look at creating applications with XML.</description>
    </book>
    <book id="bk102">
        <author>Ralls, Kim</author>
        <title>Midnight Rain</title>
        <genre>Fantasy</genre>
        <price>5.95</price>
        <publish_date>2000-12-16</publish_date>
        <description>A former architect battles corporate zombies.</description>
    </book>
</catalog>"""

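# Write the XML document so Spark can read it back. saveAsTextFile creates a
# directory of part files at this path and fails if the path already exists.
xml_path = "/tmp/catalog.xml"
spark.sparkContext.parallelize([xml_data], 1).saveAsTextFile(xml_path)

# Read XML. format("xml") works with spark-xml on Spark 3.x and with the
# built-in XML source in Spark 4.0+.
xml_df = spark.read \
    .format("xml") \
    .option("rowTag", "book") \
    .option("attributePrefix", "attr_") \
    .option("valueTag", "_value") \
    .load(xml_path)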

xml_df.show(truncate=False)

# Parse XML from string column
xml_strings = spark.createDataFrame([
    ('<person><name>Alice</name><age>30</age></person>',),
    ('<person><name>Bob</name><age>25</age></person>',),
], ["xml_string"])

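# Using from_xml, which ships in pyspark.sql.functions as of Spark 4.0
# (spark-xml on Spark 3.x does not expose a Python from_xml directly)
from pyspark.sql.functions import from_xml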

person_schema = StructType([
    StructField("name", StringType()),
    StructField("age", IntegerType()),
])

xml_parsed = xml_strings \
    .withColumn("parsed", from_xml(col("xml_string"), person_schema)) \
    .select(
        col("parsed.name").alias("name"),
        col("parsed.age").alias("age"),
    )

xml_parsed.show(truncate=False)

# Handle XML attributes
xml_with_attrs = """<order id="ORD-001" status="complete">
    <customer id="CUST-001">Alice</customer>
    <total currency="USD">249.95</total>
</order>"""

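# Extract the root element's attributes with a regex-based UDF
@udf(returnType=MapType(StringType(), StringType()))
def extract_xml_attributes(xml_string):
    """Extract the attributes of the root element's opening tag."""
    import re
    if xml_string is None:
        return None
    # Match only the first opening tag, so attributes of child elements
    # (e.g. the customer id) cannot overwrite root attributes of the same name.
    root_tag = re.search(r'<[A-Za-z_][^>]*>', xml_string)
    if root_tag is None:
        return {}
    return dict(re.findall(r'([\w:.-]+)="([^"]*)"', root_tag.group(0)))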

# Combine with element extraction
xml_df = spark.createDataFrame([(xml_with_attrs,)], ["xml"])
result = xml_df \
    .withColumn("attributes", extract_xml_attributes(col("xml"))) \
    .withColumn("order_id", col("attributes")["id"]) \
    .withColumn("status", col("attributes")["status"])

result.show(truncate=False)

Performance Metrics

| Operation | 1K Records | 100K Records | 10M Records | 100M Records |
| --- | --- | --- | --- | --- |
| JSON Read (schema inference) | < 1 sec | 2-5 sec | 30-60 sec | 5-10 min |
| JSON Read (explicit schema) | < 1 sec | 1-3 sec | 10-20 sec | 2-5 min |
| JSON String Parse (from_json) | < 1 sec | 1-2 sec | 8-15 sec | 1-3 min |
| JSON Path Extract (get_json_object) | < 1 sec | 2-5 sec | 20-40 sec | 3-8 min |
| Array Explode | < 1 sec | 1-3 sec | 10-20 sec | 2-5 min |
| Deep Nesting Extraction (3+ levels) | < 1 sec | 3-8 sec | 45-90 sec | 8-15 min |
| XML Read | < 1 sec | 3-8 sec | 45-90 sec | 8-15 min |
| Schema Merge (mergeSchema) | < 1 sec | 2-5 sec | 20-40 sec | 3-8 min |
| Malformed Record Handling | < 1 sec | 1-3 sec | 10-20 sec | 2-5 min |
| Nested Aggregation | < 1 sec | 2-5 sec | 20-40 sec | 3-8 min |

Best Practices

  1. Always define explicit schemas for production JSON/XML parsing to avoid schema inference overhead and ensure type consistency
  2. Use PERMISSIVE mode with columnNameOfCorruptRecord to capture and investigate malformed records instead of losing data
  3. Limit schema inference samples with the samplingRatio read option when a schema must be inferred on large datasets
  4. Flatten incrementally: extract top-level fields first, then explode arrays, to maintain data lineage and avoid row explosion
  5. Cache parsed DataFrames when performing multiple operations on the same semi-structured data to avoid re-parsing
  6. Use get_json_object for targeted extraction of specific fields instead of parsing the entire JSON structure
  7. Enable mergeSchema cautiously: only use it when schema evolution is expected, and test with a small sample first
  8. Partition XML/JSON files by a logical key (date, region) to enable partition pruning and reduce scan volume
  9. Avoid explode on large arrays without filtering; use filter before explode to reduce the number of generated rows
  10. Use from_json instead of get_json_object when extracting multiple fields from the same JSON to avoid repeated parsing
  11. Implement custom UDFs for complex XML parsing when spark-xml doesn't support your specific XML structure
  12. Monitor corrupt record counts in production; high corruption rates indicate upstream data quality issues that should be addressed at the source

See also: Snowflake Time Travel (snowflake/02), Kafka CDC (kafka/04), Airflow DAGs (airflow/02)
