Autoloader Schema Inference and Evolution
Schema Inference and Evolution in Auto Loader🔗
Auto Loader first samples first 50GB or 1000 files whichever limit is crossed first.
Auto Loader stores the schema information in a directory _schemas at the configured cloudFiles.schemaLocation to track schema changes to the input data over time.
spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes
and spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles
is used to change default configuration.
By default, Auto Loader schema inference seeks to avoid schema evolution issues due to type mismatches. For formats that don't encode data types (JSON, CSV, and XML), Auto Loader infers all columns as strings (including nested fields in JSON files). For formats with typed schema (Parquet and Avro), Auto Loader samples a subset of files and merges the schemas of individual files.
This is different from Spark DataFrameReader that infers the datatypes based on the sample data even if its JSON / CSV.
If we want to replicate the DataFrameReader schema evolution use cloudFiles.inferColumnTypes
to true.
When inferring the schema for CSV data, Auto Loader assumes that the files contain headers. If your CSV files do not contain headers, provide the option .option("header", "false"). In addition, Auto Loader merges the schemas of all the files in the sample to come up with a global schema. Auto Loader can then read each file according to its header and parse the CSV correctly.
What Happens when multiple parquet files have different types for same column?🔗
When a column has different data types in two Parquet files, Auto Loader chooses the widest type. You can use schemaHints to override this choice. When you specify schema hints, Auto Loader doesn't cast the column to the specified type, but rather tells the Parquet reader to read the column as the specified type. In the case of a mismatch, the column is rescued in the rescued data column.
When a column has different data types in two Parquet files, Auto Loader chooses the widest type. You can use schemaHints to override this choice. When you specify schema hints, Auto Loader doesn't cast the column to the specified type, but rather tells the Parquet reader to read the column as the specified type. In the case of a mismatch, the column is rescued in the rescued data column.
Schema Evolution in Auto Loader🔗
Auto Loader detects the addition of new columns as it processes your data. When Auto Loader detects a new column, the stream stops with an UnknownFieldException. Before your stream throws this error, Auto Loader performs schema inference on the latest micro-batch of data and updates the schema location with the latest schema by merging new columns to the end of the schema. The data types of existing columns remain unchanged.
Other Modes🔗
rescue
- Schema is never evolved and stream does not fail due to schema changes. All new columns are recorded in the rescued data column.
failOnNewColumns
- Stream fails. Stream does not restart unless the provided schema is updated, or the offending data file is removed.
none
- Does not evolve the schema, new columns are ignored, and data is not rescued unless the rescuedDataColumn option is set. Stream does not fail due to schema changes.
Default Mode🔗
addNewColumns
mode is the default when a schema is not provided, but none is the default when you provide a schema. addNewColumns is not allowed when the schema of the stream is provided, but does work if you provide your schema as a schema hint.
How Do Partitions Work in Auto Loader?🔗
Auto Loader attempts to infer partition columns from the underlying directory structure of the data if the data is laid out in Hive style partitioning. For example, the file path base_path/event=click/date=2021-04-01/f0.json results in the inference of date and event as partition columns. If the underlying directory structure contains conflicting Hive partitions or doesn't contain Hive style partitioning, partition columns are ignored.
Binary file (binaryFile) and text file formats have fixed data schemas, but support partition column inference. Databricks recommends setting cloudFiles.schemaLocation for these file formats. This avoids any potential errors or information loss and prevents inference of partitions columns each time an Auto Loader begins.
Partition columns are not considered for schema evolution. If you had an initial directory structure like base_path/event=click/date=2021-04-01/f0.json
, and then start receiving new files as base_path/event=click/date=2021-04-01/hour=01/f1.json
, Auto Loader ignores the hour column. To capture information for new partition columns, set cloudFiles.partitionColumns
to event,date,hour.
Some Caveats in Rescue Data Mode🔗
The JSON and CSV parsers support three modes when parsing records: PERMISSIVE, DROPMALFORMED, and FAILFAST
. When used together with rescuedDataColumn, data type mismatches do not cause records to be dropped in DROPMALFORMED mode or throw an error in FAILFAST mode. Only corrupt records are dropped or throw errors, such as incomplete or malformed JSON or CSV. If you use badRecordsPath when parsing JSON or CSV, data type mismatches are not considered as bad records when using the rescuedDataColumn. Only incomplete and malformed JSON or CSV records are stored in badRecordsPath
.
Defining Schema Hints in Auto Loader🔗
Inferred Schema:
|-- date: string
|-- quantity: int
|-- user_info: struct
|Â Â Â Â |-- id: string
|Â Â Â Â |-- name: string
|Â Â Â Â |-- dob: string
|-- purchase_options: struct
|Â Â Â Â |-- delivery_address: string
By Specifying Schema Hints:
.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")
We get
|-- date: string -> date
|-- quantity: int
|-- user_info: struct
|Â Â Â Â |-- id: string
|Â Â Â Â |-- name: string
|Â Â Â Â |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp
Schema hints are used only if you do not provide a schema to Auto Loader. You can use schema hints whether cloudFiles.inferColumnTypes is enabled or disabled.