Problem Solving in the Era of Pre- and Post-LLM Coding Agents - A Case Study
Introduction
Approximately a year ago, at work, I had to implement a data pipeline job that, among other things, needed to read datetime-partitioned folders from one location and write them to another, preserving the partitioning information at the sink. When I researched how to do this in Spark, I found very few tutorials giving an end-to-end solution that worked, especially when the partitions are deeply nested and we don't know beforehand the values the folder names will take (e.g. year=*/month=*/day=*/hour=*/*.csv). Below, I have provided one such implementation using Spark. Note that this was before LLM-assisted coding agents became widespread and efficient (I have also provided another version generated by Claude 3.5 Sonnet).
Setup
The assumption is that the partition folders are created at the source with the following pattern:
year/month/day/hour
Crucially, as mentioned above, the full names of the folders are not known in advance; we only know that they share a constant prefix pattern.
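For concreteness, the source layout looks roughly like this (the specific dates and file names here are made up for illustration):
data/partitioned_files_source/user/
    year=2023/month=05/day=14/hour=09/part-0000.csv
    year=2023/month=05/day=14/hour=10/part-0000.csv
    year=2023/month=05/day=15/hour=00/part-0000.csv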
False Starts
- Use the recursiveFileLookup and pathGlobFilter options while reading and writing. But these are read-side options only; there is no equivalent on the write side. As a new user of Spark this confused me: the symmetry between read and write behavior doesn't always exist, as I had naively expected. (See the sketch right after this list.)
- Parameterize the reading and writing over every possible year/month/day/hour combination, and silently skip any combination that is missing at the source. But this is grossly inefficient.
- Other arcane approaches using udf + foreachBatch that didn't work as I had expected.
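For concreteness, here is a minimal sketch of that first false start (paths as in the rest of the post). The glob options behave as documented on the read side, but DataFrameWriter accepts no such options, so the partition information encoded in the folder names is simply lost on write:
// Read side: recursiveFileLookup and pathGlobFilter work as documented.
val flatDf = spark.read
  .format("csv")
  .option("header", "true")
  .option("pathGlobFilter", "*.csv")
  .option("recursiveFileLookup", "true")
  .load("data/partitioned_files_source/user")

// Write side: there is no pathGlobFilter / recursiveFileLookup option here,
// and nothing carries the year=/month=/day=/hour= folder names over,
// so everything lands flattened under the destination folder.
flatDf.write
  .format("csv")
  .option("header", "true")
  .mode("overwrite")
  .save("data/partitioned_files_destination/user")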
Silver Lining
After some trial and error and searching through Stack Overflow and the Spark documentation, I hit upon the idea of using a combination of the input_file_name(), regexp_extract() and partitionBy() APIs on the write side to achieve the end goal.
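The key fact that makes this work is that input_file_name() returns, for every row, the full path of the file that row was read from, so the partition values embedded in the folder names can be pulled back out with a regular expression. A tiny illustration (the path in the comment is hypothetical; sourceDf is the DataFrame read in the full program below):
// For a row read from, say,
//   file:/.../user/year=2023/month=05/day=14/hour=09/part-0000.csv
// input_file_name() returns that whole path, so per row:
val withYear = sourceDf.withColumn(
  "year", regexp_extract(input_file_name(), "year=(\\d{4})", 1)) // "2023"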
In software engineering, one usually struggles through many hazy wrong turns before arriving at a clear view of a solution; looking back, it feels like we earned the success through hard work, spending many hours attacking a problem and finally subduing it. With the advent of LLM-based coding agents and their immediate conjuring of near-perfect solutions, I wonder if we can ever feel that way again.
Anyway, below is the promised solution, with all the sharp edges and battle scars removed.
Spark Solution (Scala version)
package main.scala.blog

/**
 * Spark example code to read from a datetime-partitioned folder and write to a
 * partitioned folder without the partition values being known in advance.
 */
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.functions.{udf, input_file_name, col, lit, regexp_extract}

object PartitionedReaderWriter {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("PartitionedReaderWriterApp")
      .getOrCreate()

    // Read every CSV file under the source, however deeply nested the
    // year=/month=/day=/hour= folders are.
    val sourceBasePath = "data/partitioned_files_source/user"
    val sourceDf = spark.read
      .format("csv")
      .schema("State STRING, Color STRING, Count INT")
      .option("header", "true")
      .option("pathGlobFilter", "*.csv")
      .option("recursiveFileLookup", "true")
      .load(sourceBasePath)

    // Recover the partition values from each row's source file path and
    // re-attach them as ordinary columns.
    val destinationBasePath = "data/partitioned_files_destination/user"
    val writeDf = sourceDf
      .withColumn("year", regexp_extract(input_file_name(), "year=(\\d{4})", 1))
      .withColumn("month", regexp_extract(input_file_name(), "month=(\\d{2})", 1))
      .withColumn("day", regexp_extract(input_file_name(), "day=(\\d{2})", 1))
      .withColumn("hour", regexp_extract(input_file_name(), "hour=(\\d{2})", 1))

    // partitionBy re-creates the year=/month=/day=/hour= folder structure at the sink.
    writeDf.write
      .format("csv")
      .option("header", "true")
      .mode("overwrite")
      .partitionBy("year", "month", "day", "hour")
      .save(destinationBasePath)

    spark.stop()
  }
}
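As a quick sanity check, the destination can be read back without any recursive options; Spark's partition discovery then turns the year=/month=/day=/hour= folders back into columns (a small sketch reusing the names from the code above):
// Partition discovery kicks in when loading the base path of a partitioned layout.
val checkDf = spark.read
  .format("csv")
  .option("header", "true")
  .load(destinationBasePath)

checkDf.printSchema() // State, Color, Count plus year, month, day, hour
checkDf.show(5)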
Link to a repo
sample-spark-read-write-partitioned-folders
LLM Version
Below is the version (keeping only the relevant bits) generated by Claude 3.5 Sonnet. Granted, this is not the code I would have come up with by default, but it works and it is not terribly hard to understand.
The scary (or interesting) part is that it produced in a few seconds a solution I had arrived at only after a few days of toil and head-scratching. On the positive side, the toiling and hard work gave me an increased sense of confidence in my technical ability and a deeper knowledge of Spark.
I can only hope one can take the best of both worlds in this brave new world!
// Build a map of partition name -> value from a file path, e.g. a path ending in
// ".../user/year=2023/month=05/day=14/hour=09/part-0000.csv" yields
// Map("year" -> "2023", "month" -> "05", "day" -> "14", "hour" -> "09").
def extractPartitions(basePath: String, path: String): Map[String, String] = {
  val segments = path.split(basePath + "/").last
    .split("/")
    .dropRight(1) // Remove filename
  segments.map(segment => {
    val parts = segment.split("=")
    (parts(0), parts(1))
  }).toMap
}

def main(args: Array[String]): Unit = {
  // ...existing code...
  val extractPartitionsUDF = udf((basePath: String, path: String) => extractPartitions(basePath, path))

  // Add partition columns
  val writeDf = sourceDf_with_paths
    .withColumn("source_path", input_file_name())
    .withColumn("partitions", extractPartitionsUDF(lit(sourceBasePath), col("source_path")))
    .withColumn("year", col("partitions.year"))
    .withColumn("month", col("partitions.month"))
    .withColumn("day", col("partitions.day"))
    .withColumn("hour", col("partitions.hour"))
    .drop("partitions", "source_path") // Clean up temporary columns

  writeDf.write
    .format("csv")
    .partitionBy("year", "month", "day", "hour") // Specify partition columns in order
    .save(destinationBasePath)
  // ...existing code...
}
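As a quick check of the helper above, this is what extractPartitions returns for a hypothetical source path (the path and values are invented for illustration):
val base = "data/partitioned_files_source/user"
val path = "file:/tmp/data/partitioned_files_source/user/year=2023/month=05/day=14/hour=09/part-0000.csv"

// Prints something like: Map(year -> 2023, month -> 05, day -> 14, hour -> 09)
println(extractPartitions(base, path))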