I am using pyspark in MS Fabric. I have a logging table, and I am trying to delete older entries from it.
That’s easy enough, given:
- a table whose name is stored in TABLE_LOG
- a column in that table by the name LOG_TIME, with dtype TimestampType
The following example would keep only the last two days of logs (very roughly, because of the integer cast):
from pyspark.sql.functions import current_timestamp
from pyspark.sql.types import IntegerType

RUNTIME = current_timestamp()
PERSISTENCE = 2  # days of logs to keep
SECONDS_IN_DAY = 24 * 60 * 60

# Compute each entry's age in (fractional) days, keep the recent rows, and overwrite the table.
log = spark.read.format('delta').load(f'Tables/{TABLE_LOG}')
log = log.withColumn('age', (RUNTIME.cast(IntegerType()) - log[LOG_TIME].cast(IntegerType())) / SECONDS_IN_DAY)
log = log.where(log.age < PERSISTENCE)
log.write.mode('overwrite').format('delta').save(f'Tables/{TABLE_LOG}')
In SQL, you would partition the table and delete the data in place (sketched below), which parallelizes neatly and should therefore be more efficient than rewriting the whole table.
But I want to keep using pyspark, because the whole environment (including the constants and variables) is in python.
Is it possible to do that using pyspark?
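For reference, the in-place SQL delete I have in mind could also be driven from pyspark via spark.sql. This is only a sketch, assuming the table is registered in the lakehouse catalog under the name stored in TABLE_LOG, and reusing the LOG_TIME and PERSISTENCE variables from above:

# Sketch: in-place delete through Spark SQL, assuming TABLE_LOG names a catalog table.
spark.sql(f"""
    DELETE FROM {TABLE_LOG}
    WHERE {LOG_TIME} < current_timestamp() - INTERVAL {PERSISTENCE} DAYS
""")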
Okay, answering myself here. It works, at least for delta tables:
import delta
from pyspark.sql.functions import col, date_add

dt = delta.DeltaTable.forPath(spark, f'Tables/{TABLE_LOG}')
# Delete in place: drop all rows older than PERSISTENCE days
# (TABLE_LOG, LOG_TIME, RUNTIME and PERSISTENCE as defined in the question).
dt.delete(col(LOG_TIME) < date_add(RUNTIME, -PERSISTENCE))
…and that’s it. Thank you for listening 🙂
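One refinement, since date_add truncates the timestamp to a date and so rounds the cutoff to a day boundary: DeltaTable.delete also accepts a SQL predicate string, so a second-precise cutoff could look like the following sketch (same LOG_TIME and PERSISTENCE variables):

# Sketch: the same in-place delete with a cutoff exact to the second.
dt.delete(f"{LOG_TIME} < current_timestamp() - INTERVAL {PERSISTENCE} DAYS")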