This example shows how to rename a column, change a column's data type, add a new column with no value, drop a column, and print the DataFrame and target-table schemas for debugging when a write fails. It assumes SOURCE_FOLDER_FILE_API_PATH, source_folder_abfs_path, and TARGET_LAKEHOUSE_NAME are defined earlier.
import os

from pyspark.sql.types import DecimalType, StringType, TimestampType
from pyspark.sql.functions import current_timestamp, lit, col
# List the table folders via the local File API path, but read each Delta
# table through its ABFS path.
for table in os.listdir(SOURCE_FOLDER_FILE_API_PATH):
    tableFolderABFSPath = os.path.join(source_folder_abfs_path, table)
    df = spark.read.format("delta").load(tableFolderABFSPath)
    if table == "EmployeeLocation":
        # Change a column's data type.
        df = df.withColumn("LocationId", df["LocationId"].cast("string"))
    elif table == "EmployeeRelatedParty":
        # Rename columns, add new columns with no value, and drop a column.
        # NOTE: an untyped lit(None) yields a NullType column, which Delta
        # may reject on write; casting it, as in the Location branch below,
        # avoids that.
        df = df.withColumnRenamed("RelationshipPeriodStartTimestamp", "PeriodStartTimestamp") \
            .withColumnRenamed("RelationshipPeriodEndTimestamp", "PeriodEndTimestamp") \
            .withColumn("SourceModifiedOn", lit(None)) \
            .withColumn("SourceTable", lit(None)) \
            .drop("RelationshipEstablishedDate")
    elif table == "GeographicArea":
        df = df.withColumnRenamed("GeographicAreaUomId", "GeographicAreaUnitOfMeasureId") \
            .withColumnRenamed("GeographicAreaUnits", "GeographicArea")
    elif table == "Incident":
        # Cast first, then rename, so the cast targets the original column name.
        df = df.withColumn("TotalIncidentCost", df["TotalIncidentCost"].cast("double")) \
            .withColumn("CurrencyId", df["CurrencyId"].cast("string")) \
            .withColumnRenamed("CurrencyId", "IsoCurrencyCode") \
            .withColumn("SourceModifiedOn", lit(None)) \
            .withColumn("SourceTable", lit(None))
    elif table == "IncidentCost":
        df = df.withColumn("CurrencyId", df["CurrencyId"].cast("string")) \
            .withColumnRenamed("CurrencyId", "IsoCurrencyCode") \
            .drop("IncidentCostNote")
    elif table == "Location":
        df = df.withColumn("LocationElevation", col("LocationElevation").cast(DecimalType(22, 10))) \
            .withColumn("LocationAddressLine3", lit(None).cast(StringType())) \
            .withColumn("LocationLatitude", col("LocationLatitude").cast(DecimalType(10, 7))) \
            .withColumn("LocationLongitude", col("LocationLongitude").cast(DecimalType(10, 7))) \
            .withColumn("LocationZipCode", col("LocationZipCode").cast(StringType())) \
            .withColumnRenamed("LocationZipCode", "LocationPostalCode") \
            .drop("LocationStateId") \
            .withColumn("SourceModifiedOn", lit(None).cast(TimestampType())) \
            .withColumn("SourceTable", lit(None).cast(StringType()))
    elif table == "PartyBusinessMetric":
        df = df.withColumn("CurrencyId", col("CurrencyId").cast(StringType())) \
            .withColumnRenamed("CurrencyId", "IsoCurrencyCode") \
            .withColumnRenamed("PartyBusinessMetricUomId", "PartyBusinessMetricUnitOfMeasureId") \
            .withColumn("PartyBusinessMetricValue", col("PartyBusinessMetricValue").cast(DecimalType(22, 10))) \
            .withColumn("SourceModifiedOn", lit(None).cast(TimestampType())) \
            .withColumn("SourceTable", lit(None).cast(StringType()))
    elif table == "StorageContainerProductActualStorage":
        df = df.withColumn("ActualStorageUnits", col("ActualStorageUnits").cast(DecimalType(22, 10))) \
            .withColumnRenamed("ActualStorageUnits", "ActualStorage") \
            .withColumnRenamed("ActualStorageUomId", "ActualStorageUnitOfMeasureId") \
            .withColumn("SourceModifiedOn", lit(None).cast(TimestampType())) \
            .withColumn("SourceTable", lit(None).cast(StringType()))
    elif table == "UnitOfMeasure":
        df = df.withColumn("NumberOfBaseUnits", col("NumberOfBaseUnits").cast(DecimalType(22, 10)))
    try:
        df.write.format("delta").mode("overwrite").saveAsTable(TARGET_LAKEHOUSE_NAME + "." + table)
    except Exception as e:
        # Debugging output: compare the DataFrame schema with the existing
        # target-table schema, print the context, then re-raise.
        df.printSchema()
        spark.table(TARGET_LAKEHOUSE_NAME + "." + table).printSchema()
        print(f"TARGET_LAKEHOUSE_NAME: {TARGET_LAKEHOUSE_NAME}")
        print(f"table: {table}")
        print(f"tableFolderABFSPath: {tableFolderABFSPath}")
        print(f"Exception: {e}")
        raise
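One caveat with the write above: if the target Delta table already exists with its old schema, mode("overwrite") alone raises a schema-mismatch error, which is exactly when the except block's output is useful. A minimal sketch of one way around this, assuming you intend the new schema to replace the old one, is Delta Lake's overwriteSchema write option:

# Sketch: allow the overwrite to replace the table's schema as well as its
# data. Only use this when discarding the old schema is intended.
df.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(TARGET_LAKEHOUSE_NAME + "." + table)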