Databricks DLT Code Walkthrough
Delta Live Tables Code Walkthrough🔗
- Create Streaming Table for Orders
import dlt
from pyspark.sql.functions import current_timestamp, count, sum, expr

@dlt.table(
    table_properties = {"quality": "bronze"},
    comment = "Orders Bronze Table"
)
def orders_bronze():
    df = spark.readStream.table("dev.bronze.orders_raw")
    return df
- Create Materialized View for Customers
@dlt.table(
    table_properties = {"quality": "bronze"},
    comment = "Customers Materialized View"
)
def customers_bronze():
    df = spark.read.table("dev.bronze.customers_raw")
    return df
- Create a view that joins the above streaming table and materialized view
@dlt.view(
    comment = "Joined View"
)
def joined_vw():
    df_c = spark.read.table("LIVE.customers_bronze")
    df_o = spark.read.table("LIVE.orders_bronze")
    df_join = df_o.join(df_c, on = df_c.c_custkey == df_o.o_custkey, how = "left_outer")
    return df_join
- Add a new column to the view
@dlt.table(
    table_properties = {"quality": "silver"},
    comment = "joined table",
    name = "joined_silver"
)
def joined_silver():
    df = spark.read.table("LIVE.joined_vw").withColumn("_insertdate", current_timestamp())
    return df
- Create gold level aggregation
@dlt.table(
    table_properties = {"quality": "gold"},
    comment = "orders aggregated table",
    name = "orders_aggregated_gold"
)
def orders_aggregated_gold():
    df = spark.read.table("LIVE.joined_silver")
    df_final = (
        df.groupBy("c_mktsegment")
          .agg(count("o_orderkey").alias("sum_orders"))
          .withColumn("_insertdate", current_timestamp())
    )
    return df_final
Deleting DLT Pipeline🔗
The tables / datasets in DLT are managed and linked to their DLT pipeline, so if we delete a pipeline, all of them get dropped.
Incremental Load in DLT🔗
When we inserted 10,000 new records into the raw orders source, only those records were ingested into orders_bronze; the table was not reprocessed in full.
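A minimal sketch of simulating this incremental load, assuming a hypothetical staging table dev.bronze.orders_staging as the source of the new rows:

# Hypothetical: append 10k new rows to the raw source, then trigger another
# pipeline update. Only these rows flow into orders_bronze.
spark.sql("""
    INSERT INTO dev.bronze.orders_raw
    SELECT * FROM dev.bronze.orders_staging LIMIT 10000
""")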
Adding New Column🔗
@dlt.table(
    table_properties = {"quality": "gold"},
    comment = "orders aggregated table",
    name = "orders_aggregated_gold"
)
def orders_aggregated_gold():
    df = spark.read.table("LIVE.joined_silver")
    df_final = (
        df.groupBy("c_mktsegment")
          .agg(count("o_orderkey").alias("sum_orders"), sum("o_totalprice").alias("sum_price"))
          .withColumn("_insertdate", current_timestamp())
    )
    return df_final
We don't have to change any DDL; the DLT pipeline auto-detects the addition of the new column.
Renaming Tables🔗
We just change the name of the function in the table declaration and the table is renamed. The catalog also reflects this change.
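A minimal sketch reusing the orders bronze table from above; the new name is purely illustrative, and an explicit name argument works the same way:

# Renaming: changing the function name (or setting `name` explicitly) renames the table.
# "orders_bronze_renamed" is an illustrative new name, not from the original pipeline.
@dlt.table(
    table_properties = {"quality": "bronze"},
    comment = "Orders Bronze Table"
)
def orders_bronze_renamed():
    return spark.readStream.table("dev.bronze.orders_raw")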
DLT Internals🔗
Every streaming table and materialized view is backed by an underlying table in the _databricks_internal schema, and each has a table_id associated with it.
If we browse to these tables in the storage account, we can see checkpoints that keep track of incremental data changes.
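A quick way to see the id and backing storage location of a DLT dataset from a regular notebook; the fully qualified table name below is an assumption about this pipeline's target schema:

# Inspect the backing Delta table; the `id` and `location` columns point at the
# internal storage that holds the data and checkpoints (table name assumed).
display(spark.sql("DESCRIBE DETAIL dev.silver.joined_silver"))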
Data Lineage🔗
DLT Append Flow and Autoloader🔗
@dlt.table(
    table_properties = {"quality": "bronze"},
    comment = "orders autoloader",
    name = "orders_autoloader_bronze"
)
def func():
    df = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.schemaLocation", "...")
        .option("pathGlobFilter", "*.csv")
        .option("cloudFiles.schemaEvolutionMode", "none")
        .load("/Volumes/etl/landing/files")
    )
    return df
dlt.create_streaming_table("orders_union_bronze")

@dlt.append_flow(
    target = "orders_union_bronze"
)
def order_delta_append():
    df = spark.readStream.table("LIVE.orders_bronze")
    return df

@dlt.append_flow(
    target = "orders_union_bronze"
)
def order_autoloader_append():
    df = spark.readStream.table("LIVE.orders_autoloader_bronze")
    return df
@dlt.view(
    comment = "Joined View"
)
def joined_vw():
    df_c = spark.read.table("LIVE.customers_bronze")
    df_o = spark.read.table("LIVE.orders_union_bronze")
    df_join = df_o.join(df_c, on = df_c.c_custkey == df_o.o_custkey, how = "left_outer")
    return df_join
Custom Configuration🔗
Use this parameter in the pipeline code.
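A minimal sketch of reading the value from the pipeline configuration; the key name custom.order_status and the fallback value are assumptions for illustration:

# Read the comma-separated list of order statuses from the pipeline configuration.
# The key name and default value here are assumed, not from the original pipeline.
_order_status = spark.conf.get("custom.order_status", "O,F,P")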
# Wrap the table definition in a helper so each iteration captures its own status
# value; defining the table directly inside the loop would bind every table to the
# last value when the pipeline actually runs.
def create_orders_agg_gold(status):
    # create gold table for one order status
    @dlt.table(
        table_properties = {"quality": "gold"},
        comment = "order aggregated table",
        name = f"orders_agg_{status}_gold"
    )
    def orders_aggregated_gold():
        df = spark.read.table("LIVE.joined_silver")
        df_final = (
            df.where(f"o_orderstatus = '{status}'")
              .groupBy("c_mktsegment")
              .agg(count("o_orderkey").alias("count_of_orders"), sum("o_totalprice").alias("sum_totalprice"))
              .withColumn("_insert_date", current_timestamp())
        )
        return df_final

for _status in _order_status.split(","):
    create_orders_agg_gold(_status)
DLT SCD1 and SCD2🔗
Prerequisites
Input Source Table
@dlt.view(
    comment = "Customer Bronze streaming view"
)
def customer_bronze_vw():
    df = spark.readStream.table("dev.bronze.customers_raw")
    return df
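The source feed is expected to carry the CDC columns __src_action (e.g. 'D' for delete, 'T' for truncate) and __src_insert_dt used by apply_changes below. A purely hypothetical insert illustrating that shape; the column list and values are assumptions:

# Hypothetical row showing the expected CDC columns in the raw source table.
spark.sql("""
    INSERT INTO dev.bronze.customers_raw (c_custkey, c_mktsegment, __src_action, __src_insert_dt)
    VALUES (99999, 'AUTOMOBILE', 'I', current_timestamp())
""")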
SCD Type 1 Table
dlt.create_streaming_table("customer_scd1_bronze")

dlt.apply_changes(
    target = "customer_scd1_bronze",
    source = "customer_bronze_vw",
    keys = ["c_custkey"],
    stored_as_scd_type = 1,
    apply_as_deletes = expr("__src_action = 'D'"),
    apply_as_truncates = expr("__src_action = 'T'"),
    sequence_by = "__src_insert_dt"
)
SCD Type 2 Table
dlt.create_streaming_table("customer_scd2_bronze")

dlt.apply_changes(
    target = "customer_scd2_bronze",
    source = "customer_bronze_vw",
    keys = ["c_custkey"],
    stored_as_scd_type = 2,
    except_column_list = ["__src_action", "__src_insert_dt"],
    sequence_by = "__src_insert_dt"
)
Changes in the joined view so it uses the SCD2 table (current records only, where __END_AT is null)
@dlt.view(
    comment = "Joined View"
)
def joined_vw():
    df_c = spark.read.table("LIVE.customer_scd2_bronze").where("__END_AT is null")
    df_o = spark.read.table("LIVE.orders_union_bronze")
    df_join = df_o.join(df_c, on = df_c.c_custkey == df_o.o_custkey, how = "left_outer")
    return df_join
After inserting an updated record, the __END_AT column of the previous version is populated, while for the new version it is null, signifying that it is the latest record.
In SCD Type 1, only the update is captured and the old value is overwritten.
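A quick check of the version history for one key; the catalog and schema in the table name are assumptions about this pipeline's target:

# Inspect SCD2 history for a single customer key; the row with __END_AT IS NULL
# is the current version (fully qualified table name assumed for illustration).
display(spark.sql("""
    SELECT c_custkey, c_mktsegment, __START_AT, __END_AT
    FROM dev.etl.customer_scd2_bronze
    WHERE c_custkey = 99999
    ORDER BY __START_AT
"""))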
Insert Old Timestamp Record🔗
SCD Type 1 vs SCD Type 2 Delete Records🔗
Rules for Data Quality: Warn, Drop and Fail🔗
Defining the Rules
__order_rules = {
    "Valid Order Status": "o_orderstatus in ('O','F','P')",
    "Valid Order Price": "o_totalprice > 0"
}

__customer_rules = {
    "Valid Market Segment": "c_mktsegment is not null"
}
Adding the rules
@dlt.table(
    table_properties = {"quality": "bronze"},
    comment = "Orders Bronze Table"
)
@dlt.expect_all(__order_rules)  # warn
def orders_bronze():
    df = spark.readStream.table("dev.bronze.orders_raw")
    return df

@dlt.table(
    table_properties = {"quality": "bronze"},
    comment = "Customers Materialized View"
)
@dlt.expect_all(__customer_rules)  # warn
def customers_bronze():
    df = spark.read.table("dev.bronze.customers_raw")
    return df
Edge Case🔗
The number of failed records shows as 2, even though only one record in the source table was flawed; because there are two consumers of that source, the same failure is counted twice.
Using Expectations on the view🔗
Even though upstream we can see that the market segment is null, the joined view has no details for customer 99999 (that record failed the expectation and was dropped), and since we are doing a left join, there were no failed records on the view at all.
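A minimal sketch of attaching the same customer rule to the joined view, reusing the __customer_rules dictionary defined earlier (shown here with the warn-only expect_all; the drop variant expect_all_or_drop is applied the same way):

# Apply the customer rule on the joined view itself.
@dlt.view(
    comment = "Joined View"
)
@dlt.expect_all(__customer_rules)  # warn
def joined_vw():
    df_c = spark.read.table("LIVE.customers_bronze")
    df_o = spark.read.table("LIVE.orders_union_bronze")
    return df_o.join(df_c, on = df_c.c_custkey == df_o.o_custkey, how = "left_outer")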