Schema evolution in Delta Lake tables using AWS Glue ETL


I’m trying to configure schema evolution on Delta Lake tables in a Glue ETL job to perform a merge between a “ref” table and a “trusted” table, where the “trusted” table has an evolved schema (new columns). I’m using the following code:

(delta_object.delta_table.alias('ref').merge(
    source=delta_object.dataframe_upsert.alias('trusted'),
    condition=condition_nk)
    .whenNotMatchedInsert(
        condition=condition_insert,
        values=delta_object.dict_columns_insert)
    .whenMatchedUpdate(
        condition="ref.is_current = true",
        set={
            "is_current": "false",
            "eff_end_date": lit(delta_object.data)
        })
    .execute())

According to the Delta Lake documentation, schema evolution must be enabled by setting autoMerge to true:

spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
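One detail worth noting: unlike the extension and catalog settings, `autoMerge` is a runtime SQL conf, so it can also be set on an already-created session, right before the merge. A minimal sketch of how this could look in a Glue script (the surrounding setup is illustrative; the extension/catalog configs would still need to be supplied at session creation, e.g. via the `--conf` or `--datalake-formats` job parameters):

```python
from pyspark.context import SparkContext
from awsglue.context import GlueContext

# Obtain the Glue-provided Spark session. The Delta extension and catalog
# must already be configured at this point (e.g. via job parameters).
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# autoMerge is a runtime SQL conf: setting it here, after session creation
# but before calling .execute() on the merge, is sufficient.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
```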

I tried several ways to configure autoMerge within the Glue ETL job:

  1. SparkConf
conf = SparkConf()
conf.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
conf.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
conf.set("spark.sql.parquet.int96RebaseModeInWrite", "CORRECTED")
conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

sc = SparkContext(conf=conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session
  2. SparkSession builder
spark = SparkSession \
    .builder \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.databricks.delta.schema.autoMerge.enabled", "true") \
    .config("spark.sql.parquet.int96RebaseModeInWrite", "CORRECTED") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .getOrCreate()
  3. parquet.mergeSchema and autoMerge
spark = SparkSession \
    .builder \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.databricks.delta.schema.autoMerge.enabled", "true") \
    .config("spark.sql.parquet.int96RebaseModeInWrite", "CORRECTED") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .config("spark.sql.parquet.mergeSchema", "true") \
    .getOrCreate()
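A possible reason autoMerge appears to have no effect, per the Delta Lake docs: schema evolution in a merge only kicks in when the merge clauses actually reference the new source columns, which happens most directly with the `*All` variants. A hedged sketch reusing the names from the question (this assumes the new "trusted" columns should simply be copied into "ref" on insert):

```python
# insertAll() maps every source column, so columns that exist only in
# "trusted" are added to the "ref" schema when autoMerge is enabled.
(delta_object.delta_table.alias('ref')
    .merge(
        source=delta_object.dataframe_upsert.alias('trusted'),
        condition=condition_nk)
    .whenNotMatchedInsertAll(condition=condition_insert)
    .execute())
```

An explicit `set`/`values` dict, by contrast, only touches the columns it names, so a clause like the `whenMatchedUpdate` above would not by itself trigger evolution.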
