Spark – How to read large zstandard file efficiently

I’m trying to read a large zstandard file (~30GB compressed) in Databricks with Spark. It’s a collection of chess games in the following PGN format (.pgn.zst):

[Event "Rated Bullet tournament https://lichess.org/tournament/yc1WW2Ox"]
[Site "https://lichess.org/PpwPOZMq"]
[Date "2017.04.01"]
[Round "-"]
[White "Abbot"]
[Black "Costello"]
[Result "0-1"]
[UTCDate "2017.04.01"]
[UTCTime "11:32:01"]
[WhiteElo "2100"]
[BlackElo "2000"]
[WhiteRatingDiff "-4"]
[BlackRatingDiff "+1"]
[WhiteTitle "FM"]
[ECO "B30"]
[Opening "Sicilian Defense: Old Sicilian"]
[TimeControl "300+0"]
[Termination "Time forfeit"]

1. e4 { [%eval 0.17] [%clk 0:00:30] } 1... c5 { [%eval 0.19] [%clk 0:00:30] }
2. Nf3 { [%eval 0.25] [%clk 0:00:29] } 2... Nc6 { [%eval 0.33] [%clk 0:00:30] }
3. Bc4 { [%eval -0.13] [%clk 0:00:28] } 3... e6 { [%eval -0.04] [%clk 0:00:30] }
4. c3 { [%eval -0.4] [%clk 0:00:27] } 4... b5? { [%eval 1.18] [%clk 0:00:30] }
5. Bb3?! { [%eval 0.21] [%clk 0:00:26] } 5... c4 { [%eval 0.32] [%clk 0:00:29] }
6. Bc2 { [%eval 0.2] [%clk 0:00:25] } 6... a5 { [%eval 0.6] [%clk 0:00:29] }
7. d4 { [%eval 0.29] [%clk 0:00:23] } 7... cxd3 { [%eval 0.6] [%clk 0:00:27] }
8. Qxd3 { [%eval 0.12] [%clk 0:00:22] } 8... Nf6 { [%eval 0.52] [%clk 0:00:26] }
9. e5 { [%eval 0.39] [%clk 0:00:21] } 9... Nd5 { [%eval 0.45] [%clk 0:00:25] }
10. Bg5?! { [%eval -0.44] [%clk 0:00:18] } 10... Qc7 { [%eval -0.12] [%clk 0:00:23] }
11. Nbd2?? { [%eval -3.15] [%clk 0:00:14] } 11... h6 { [%eval -2.99] [%clk 0:00:23] }
12. Bh4 { [%eval -3.0] [%clk 0:00:11] } 12... Ba6? { [%eval -0.12] [%clk 0:00:23] }
13. b3?? { [%eval -4.14] [%clk 0:00:02] } 13... Nf4? { [%eval -2.73] [%clk 0:00:21] } 0-1

To load the file, I am using spark.read.text. From my understanding, zst files are not splittable, so this ends up reading the whole file into a single partition, which causes massive spills. I also want to apply a pivot transformation to extract each game into a single record. Because I have to go line by line, grouping rows into games, this step also runs inefficiently on one partition.
I’m wondering if there is a better way to do this in Spark, and if not, what would be a more appropriate tool for a job like this?

Below is my code, with sample dataframes shown for clarification:

df = spark.read.text(file_name).filter(
    (col('value') != '') &
    (~col('value').like('%UTCTime%')) &
    (~col('value').like('%Result%'))
)
value
[Event "Rated Blitz game"]
[White "btr18am"]
[Black "Kozionov_sergey"]
[Opening "Queen's Gambit Declined: Exchange Variation, Positional Variation"]
1. d4 { [%clk 0:03:00] } d5 { [%clk 0:03:00] } 2. c4 { [%clk 0:03:01] } e6 { [%clk 0:03:01] } …  27. Nxf7 { [%clk 0:01:28] } 1-0
[Event "Rated UltraBullet tournament https://lichess.org/tournament/5Emn5TK6"]
[White "rickyrich"]
[Black "seanysean"]
[Opening "Alekhine Defense"]
1. e4 { [%clk 0:00:15] } Nf6 { [%clk 0:00:15] } 2. e5 { [%clk 0:00:14] } g6 { [%clk 0:00:15] } … 22. Rxe1 { [%clk 0:00:00] } Bc3+ { [%clk 0:00:11] } 0-1
# Rename the "value" column to "Line"
df = df.withColumnRenamed("value", "Line")

# Extract the "Key" and "Value" columns based on the structure of the "Line" column
df = (df.withColumn('Key', when(col('Line').startswith('1.'), 'Moves')
                    .otherwise(regexp_extract(col('Line'), r'\[(.*?)\s', 1)))
        .withColumn('Value', when(col('Line').startswith('1.'), col('Line'))
                    .otherwise(regexp_extract(col('Line'), r'"(.*)"', 1))))

# Add a column to identify the start of a game
df = df.withColumn("StartOfGame", when(col("Line").startswith("[Event"), 1).otherwise(0))

# Define a window specification for calculating the cumulative sum
windowSpec = Window.orderBy(monotonically_increasing_id())

# Calculate the cumulative sum of "StartOfGame" to create "GameID"
df = df.withColumn("GameID", sum("StartOfGame").over(windowSpec))
Line Key Value StartOfGame GameID
[Event "Rated Blitz game"] Event Rated Blitz game 1 1
[White "btr18am"] White btr18am 0 1
[Black "Kozionov_sergey"] Black Kozionov_sergey 0 1
0 1
[Opening "Queen's Gambit Declined: Exchange Variation, Positional Variation"] Opening Queen's Gambit Declined: Exchange Variation, Positional Variation 0 1
1. d4 { [%clk 0:03:00] } d5 { [%clk 0:03:00] } 2. c4 { [%clk 0:03:01] } e6 { [%clk 0:03:01] } …  27. Nxf7 { [%clk 0:01:28] } 1-0 Moves 1. d4 { [%clk 0:03:00] } d5 { [%clk 0:03:00] } 2. c4 { [%clk 0:03:01] } e6 { [%clk 0:03:01] } …  27. Nxf7 { [%clk 0:01:28] } 1-0 0 1
[Event "Rated UltraBullet tournament https://lichess.org/tournament/5Emn5TK6"] Event Rated UltraBullet tournament https://lichess.org/tournament/5Emn5TK6 1 2
[White "rickyrich"] White rickyrich 0 2
[Black "seanysean"] Black seanysean 0 2
0 2
[Opening "Alekhine Defense"] Opening Alekhine Defense 0 2
1. e4 { [%clk 0:00:15] } Nf6 { [%clk 0:00:15] } 2. e5 { [%clk 0:00:14] } g6 { [%clk 0:00:15] } … 22. Rxe1 { [%clk 0:00:00] } Bc3+ { [%clk 0:00:11] } 0-1 Moves 1. e4 { [%clk 0:00:15] } Nf6 { [%clk 0:00:15] } 2. e5 { [%clk 0:00:14] } g6 { [%clk 0:00:15] } … 22. Rxe1 { [%clk 0:00:00] } Bc3+ { [%clk 0:00:11] } 0-1 0 2
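Note that the cumulative-sum window above has no partitionBy, which forces all rows through one partition. If the input is split into files that each contain only whole games, the same grouping can be done per partition with a plain generator (e.g. inside RDD.mapPartitions) instead of a global window; a sketch, with an illustrative function name:

```python
def games_from_lines(lines):
    """Yield one string per game, cutting at every [Event ...] header line
    and dropping the blank separator lines."""
    current = []
    for line in lines:
        if line.startswith("[Event ") and current:
            yield "\n".join(current)          # emit the finished game
            current = []
        stripped = line.rstrip("\n")
        if stripped:                          # skip blank separator lines
            current.append(stripped)
    if current:
        yield "\n".join(current)              # last game in the partition
```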
# Define the list of columns for pivoting
col_list = [ 
    'UTCDate', 'Event', 'TimeControl', 'Opening', 'ECO' , 'Site',
     'Termination', 'Moves', 
    'White', 'WhiteTitle', 'WhiteElo', 'WhiteRatingDiff', 
    'Black', 'BlackTitle', 'BlackElo', 'BlackRatingDiff', 
    ]


# Pivot the DataFrame based on "GameID" and the specified columns
df = df.groupBy("GameID").pivot("Key", col_list).agg(first("Value"))

df = df.filter(col('Moves').contains('%eval'))

df.write.partitionBy('UTCDate').mode("overwrite").parquet(silver_file_path)
GameID Event White Black Opening Moves
1 Rated Blitz game btr18am Kozionov_sergey Queen's Gambit Declined: Exchange Variation, Positional Variation 1. d4 { [%clk 0:03:00] } d5 { [%clk 0:03:00] } 2. c4 { [%clk 0:03:01] } e6 { [%clk 0:03:01] } …  27. Nxf7 { [%clk 0:01:28] } 1-0
2 Rated UltraBullet tournament https://lichess.org/tournament/5Emn5TK6 rickyrich seanysean Alekhine Defense 1. e4 { [%clk 0:00:15] } Nf6 { [%clk 0:00:15] } 2. e5 { [%clk 0:00:14] } g6 { [%clk 0:00:15] } … 22. Rxe1 { [%clk 0:00:00] } Bc3+ { [%clk 0:00:11] } 0-1
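For comparison, the pivot logic itself is simple enough to express as a small pure-Python parser over a single game block (header lines of the form [Key "Value"] plus the movetext), which could back a UDF or a mapPartitions step once games are grouped; a sketch, with illustrative names:

```python
import re

# Matches PGN headers such as [White "Abbot"] -> ("White", "Abbot")
HEADER_RE = re.compile(r'\[(\w+) "(.*)"\]')

def parse_game(block):
    """Parse one PGN game block into a flat dict; the 'Moves' key is
    illustrative and collects all non-header lines."""
    record = {}
    moves = []
    for line in block.splitlines():
        line = line.strip()
        if not line:
            continue
        m = HEADER_RE.match(line)
        if m:
            record[m.group(1)] = m.group(2)   # e.g. Event, White, WhiteElo
        else:
            moves.append(line)                # movetext lines start with "1."
    record["Moves"] = " ".join(moves)
    return record
```

This keeps the per-game work embarrassingly parallel and avoids the wide groupBy/pivot shuffle entirely.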

I have tried using a bigger cluster with more memory. While it does reduce spills, I’m not sure this is a more ‘efficient’ way of doing it, since it also increases the number of cores in the cluster, which ultimately go unused.
