Spark examples

Posted by John Liu on Monday, May 19, 2025

This example shows how to rename a column, change a column's data type, add a new column with a null value, drop a column, and print the DataFrame/table schema for debugging.

import os

from pyspark.sql.functions import col, current_timestamp, lit
from pyspark.sql.types import DecimalType, StringType, TimestampType

# Copy every Delta table under the source folder into the target lakehouse,
# applying per-table schema fixes (renames, type casts, added placeholder
# columns, dropped columns) so each table matches the target data model.
for table in os.listdir(SOURCE_FOLDER_FILE_API_PATH):
    # os.listdir needs the File-API path; Spark reads the same folder
    # through its ABFS path.
    tableFolderABFSPath = os.path.join(source_folder_abfs_path, table)
    df = spark.read.format("delta").load(tableFolderABFSPath)
    if table == "EmployeeLocation":
        df = df.withColumn("LocationId", col("LocationId").cast(StringType()))
    elif table == "EmployeeRelatedParty":
        # Cast the placeholder columns explicitly: an uncast lit(None) yields
        # a NullType (void) column, which Delta rejects when creating the
        # table. This also matches the other branches below.
        df = df.withColumnRenamed("RelationshipPeriodStartTimestamp","PeriodStartTimestamp") \
            .withColumnRenamed("RelationshipPeriodEndTimestamp","PeriodEndTimestamp") \
            .withColumn("SourceModifiedOn", lit(None).cast(TimestampType())) \
            .withColumn("SourceTable", lit(None).cast(StringType())) \
            .drop("RelationshipEstablishedDate")
    elif table == "GeographicArea":
        df = df.withColumnRenamed("GeographicAreaUomId","GeographicAreaUnitOfMeasureId") \
            .withColumnRenamed("GeographicAreaUnits","GeographicArea")
    elif table == "Incident":
        df = df.withColumn("TotalIncidentCost", col("TotalIncidentCost").cast("double")) \
            .withColumn("CurrencyId", col("CurrencyId").cast(StringType())).withColumnRenamed("CurrencyId","IsoCurrencyCode") \
            .withColumn("SourceModifiedOn", lit(None).cast(TimestampType())) \
            .withColumn("SourceTable", lit(None).cast(StringType()))
    elif table == "IncidentCost":
        df = df.withColumn("CurrencyId", col("CurrencyId").cast(StringType())).withColumnRenamed("CurrencyId","IsoCurrencyCode") \
            .drop("IncidentCostNote")
    elif table == "Location":
        df = df.withColumn("LocationElevation", col("LocationElevation").cast(DecimalType(22,10))) \
            .withColumn("LocationAddressLine3", lit(None).cast(StringType())) \
            .withColumn("LocationLatitude", col("LocationLatitude").cast(DecimalType(10,7))) \
            .withColumn("LocationLongitude", col("LocationLongitude").cast(DecimalType(10,7))) \
            .withColumn("LocationZipCode", col("LocationZipCode").cast(StringType())).withColumnRenamed("LocationZipCode","LocationPostalCode") \
            .drop("LocationStateId") \
            .withColumn("SourceModifiedOn", lit(None).cast(TimestampType())) \
            .withColumn("SourceTable", lit(None).cast(StringType()))
    elif table == "PartyBusinessMetric":
        df = df.withColumn("CurrencyId", col("CurrencyId").cast(StringType())).withColumnRenamed("CurrencyId","IsoCurrencyCode") \
            .withColumnRenamed("PartyBusinessMetricUomId","PartyBusinessMetricUnitOfMeasureId") \
            .withColumn("PartyBusinessMetricValue", col("PartyBusinessMetricValue").cast(DecimalType(22,10))) \
            .withColumn("SourceModifiedOn", lit(None).cast(TimestampType())) \
            .withColumn("SourceTable", lit(None).cast(StringType()))
    elif table == "StorageContainerProductActualStorage":
        df = df.withColumn("ActualStorageUnits", col("ActualStorageUnits").cast(DecimalType(22,10))).withColumnRenamed("ActualStorageUnits","ActualStorage") \
            .withColumnRenamed("ActualStorageUomId","ActualStorageUnitOfMeasureId") \
            .withColumn("SourceModifiedOn", lit(None).cast(TimestampType())) \
            .withColumn("SourceTable", lit(None).cast(StringType()))
    elif table == "UnitOfMeasure":
        df = df.withColumn("NumberOfBaseUnits", col("NumberOfBaseUnits").cast(DecimalType(22,10)))

    try:
        df.write.format("delta").mode('overwrite').saveAsTable(TARGET_LAKEHOUSE_NAME + '.' + table)
    except Exception as e:
        # Debug aid: dump both schemas so a source/target mismatch is easy
        # to spot in the notebook output.
        df.printSchema()
        try:
            # The target table may not exist yet (e.g. first-ever write
            # failed); don't let that secondary error mask the original one.
            spark.table(TARGET_LAKEHOUSE_NAME + '.' + table).printSchema()
        except Exception as schema_error:
            print(f"Could not read target table schema: {schema_error}")
        print(f"TARGET_LAKEHOUSE_NAME: {TARGET_LAKEHOUSE_NAME}")
        print(f"table: {table}")
        print(f"tableFolderABFSPath: {tableFolderABFSPath}")
        print(f"Exception: {e}")
        raise