This example shows how to rename a column, change a column's data type, add a new column with no value, drop a column, and print the DataFrame and target table schemas for debugging.
from pyspark.sql.types import DecimalType, StringType, TimestampType
from pyspark.sql.functions import current_timestamp, lit, col
# Load each entry listed under SOURCE_FOLDER_FILE_API_PATH as a Delta table,
# apply the per-table schema adjustments below (rename, cast, add empty
# column, drop column), and overwrite the table of the same name in the
# target lakehouse.
#
# This snippet relies on names it does not define itself: `os`, `spark`,
# SOURCE_FOLDER_FILE_API_PATH, source_folder_abfs_path and
# TARGET_LAKEHOUSE_NAME.
# NOTE(review): presumably the file-API path and the ABFS path point at the
# same folder -- confirm against the cell that defines them.
for table in os.listdir(SOURCE_FOLDER_FILE_API_PATH):
    tableFolderABFSPath = os.path.join(source_folder_abfs_path, table)
    df = spark.read.format("delta").load(tableFolderABFSPath)

    if table == "EmployeeLocation":
        # Change a column's data type.
        df = df.withColumn("LocationId", df["LocationId"].cast("string"))
    elif table == "EmployeeRelatedParty":
        # New "empty" columns are given an explicit type: a bare lit(None)
        # yields a NullType (void) column, unlike the typed columns that the
        # other branches add under the same names.
        df = (
            df.withColumnRenamed("RelationshipPeriodStartTimestamp", "PeriodStartTimestamp")
            .withColumnRenamed("RelationshipPeriodEndTimestamp", "PeriodEndTimestamp")
            .withColumn("SourceModifiedOn", lit(None).cast(TimestampType()))
            .withColumn("SourceTable", lit(None).cast(StringType()))
            .drop("RelationshipEstablishedDate")
        )
    elif table == "GeographicArea":
        df = (
            df.withColumnRenamed("GeographicAreaUomId", "GeographicAreaUnitOfMeasureId")
            .withColumnRenamed("GeographicAreaUnits", "GeographicArea")
        )
    elif table == "Incident":
        # Cast first, then rename, so the renamed column carries the new type.
        df = (
            df.withColumn("TotalIncidentCost", df["TotalIncidentCost"].cast("double"))
            .withColumn("CurrencyId", df["CurrencyId"].cast("string"))
            .withColumnRenamed("CurrencyId", "IsoCurrencyCode")
            .withColumn("SourceModifiedOn", lit(None).cast(TimestampType()))
            .withColumn("SourceTable", lit(None).cast(StringType()))
        )
    elif table == "IncidentCost":
        df = (
            df.withColumn("CurrencyId", df["CurrencyId"].cast("string"))
            .withColumnRenamed("CurrencyId", "IsoCurrencyCode")
            .drop("IncidentCostNote")
        )
    elif table == "Location":
        df = (
            df.withColumn("LocationElevation", col("LocationElevation").cast(DecimalType(22, 10)))
            .withColumn("LocationAddressLine3", lit(None).cast(StringType()))
            .withColumn("LocationLatitude", col("LocationLatitude").cast(DecimalType(10, 7)))
            .withColumn("LocationLongitude", col("LocationLongitude").cast(DecimalType(10, 7)))
            .withColumn("LocationZipCode", col("LocationZipCode").cast(StringType()))
            .withColumnRenamed("LocationZipCode", "LocationPostalCode")
            .drop("LocationStateId")
            .withColumn("SourceModifiedOn", lit(None).cast(TimestampType()))
            .withColumn("SourceTable", lit(None).cast(StringType()))
        )
    elif table == "PartyBusinessMetric":
        df = (
            df.withColumn("CurrencyId", col("CurrencyId").cast(StringType()))
            .withColumnRenamed("CurrencyId", "IsoCurrencyCode")
            .withColumnRenamed("PartyBusinessMetricUomId", "PartyBusinessMetricUnitOfMeasureId")
            .withColumn("PartyBusinessMetricValue", col("PartyBusinessMetricValue").cast(DecimalType(22, 10)))
            .withColumn("SourceModifiedOn", lit(None).cast(TimestampType()))
            .withColumn("SourceTable", lit(None).cast(StringType()))
        )
    elif table == "StorageContainerProductActualStorage":
        df = (
            df.withColumn("ActualStorageUnits", col("ActualStorageUnits").cast(DecimalType(22, 10)))
            .withColumnRenamed("ActualStorageUnits", "ActualStorage")
            .withColumnRenamed("ActualStorageUomId", "ActualStorageUnitOfMeasureId")
            .withColumn("SourceModifiedOn", lit(None).cast(TimestampType()))
            .withColumn("SourceTable", lit(None).cast(StringType()))
        )
    elif table == "UnitOfMeasure":
        df = df.withColumn("NumberOfBaseUnits", col("NumberOfBaseUnits").cast(DecimalType(22, 10)))
    # Any other table is written unchanged.

    target_table = TARGET_LAKEHOUSE_NAME + '.' + table
    try:
        df.write.format("delta").mode('overwrite').saveAsTable(target_table)
    except Exception as e:
        # Dump both schemas and the loop context to help diagnose the failed
        # write, then re-raise the original error.
        df.printSchema()
        try:
            spark.table(target_table).printSchema()
        except Exception as schema_error:
            # The target lookup can fail too (e.g. the table does not exist);
            # do not let that hide the write error or skip the prints below.
            print(f"Could not read schema of {target_table}: {schema_error}")
        print(f"TARGET_LAKEHOUSE_NAME: {TARGET_LAKEHOUSE_NAME}")
        print(f"table: {table}")
        print(f"tableFolderABFSPath: {tableFolderABFSPath}")
        print(f"Exception: {e}")
        raise
FEATURED TAGS
agent
ai
api
automation
availability
availability-sets
availability-zones
aws-vm
azure
azure-automation-runbook
azure-blob
azure-cosmos-db
azure-data-lake
azure-deployment
azure-function-app
azure-functions
azure-openai
azure-sign-in
azure-site-recovery
azure-sql-database
azure-subscription
azure-vm
base64
certificate
change-data-capture
change-tracking
chrome
claude-code-desktop
clr
container
cte
data-api-builder
data-conversion
data-gateway
database-mail
database-role
database-size
date-table
dax
db-config
deepseek
derived-table
diagram
direct-query
disk-management
disk-space
docker
downtime
dtc
dynamic-m-parameter
embedding
encrypted-connection
excel
excel-online
excel-online-for-business
execution-plan
extended-events
external-data
fabric
fabric-capacity
failover-cluster
fk
geometry
hierarchy
httpwebrequest
hugo
hyper-v
incognito-mode
index
infrastructure
inline-tvf
json
kql
lakehouse
linked-server
live-query-statistics
llm-model
locking
m
machine-learning
machine-learning-model
machine-learning-services
master-key
mcp
mdx
memory
memory-grant
mermaid
mirrored-sql-server
network
network-card
network-category
office-script
onedrive
onnx-runtime
openrowset
p2v
parquet
performance
polybase
power-automate
power-bi
power-bi-report-tricks
power-platform
power-query
powershell
printer
public-ip-address
pyspark
python
qgis
qt-designer
query-performance
query-plan
query-troubleshooting
r
regex
replication
route
s3
schema-design
scripting
self-signed-certificate
server-role
sharepoint
snowflake
software-development
sofware-development
spark
sql
sql-2025
sql-agent
sql-availability-group
sql-error
sql-failover-cluster-instance
sql-index
sql-openjson
sql-permission
sql-recovery
sql-script
sql-security
sql-server
sql-server-admin
sql-server-config
sql-statistics
ssis
ssisdb
ssl
ssl/tls-error
ssms
table-expression
tempdb
terraform
tips
troubleshooting
unicode
view
visual-studio
visual-studio-code
vmware
wait-statistics
wi-fi-connection-issue
windows-settings