I am running a function on a Dask cluster to compute the normalized difference between two of the data variables in my xarray Dataset. However, I need the data to be in Dask DataFrame form before I can do the computation. (I realize I can do this while keeping the object as an xarray Dataset; I'm testing future functions where I need to convert from an xarray Dataset to a Dask DataFrame.) So I call xr.Dataset.to_dask_dataframe() and then run my code.

I compared the performance of this function on the plain xarray Dataset versus the Dask DataFrame, and the Dask DataFrame case uses significantly more resources because of all the extra tasks Dask sets up to repartition/rechunk my data (see screenshots: the first is the xarray run, the second is the Dask DataFrame run). I understand that repartitioning and rechunking are very expensive operations. xr.Dataset.to_dask_dataframe() does not convert my chunked data into partitions, so I run a repartition step in my code. Also, excuse all of the red task streams and the white space between tasks; this is not optimal by any means.

What are my options for efficiently converting the xarray Dataset to a Dask DataFrame and potentially reducing the number of tasks in my task stream? I feel it has to come down to how I chunk/partition the data, but I'm not sure.
######################################################
### IF YOU WANT TO RECREATE THE DASK DATAFRAME RUN ###
######################################################
import numpy as np
import pandas as pd
import xarray as xr
from dask.distributed import Client, LocalCluster
client = Client(LocalCluster(n_workers=8, threads_per_worker=1, memory_limit='32GiB'))
# Define the dimensions
time = pd.date_range("2020-12-29T18:57:32.281000", periods=3)
X = np.linspace(-421600, 486700, 9084)
Y = np.linspace(-599200, 458500, 10578)
# Create a random data array (reused by both data variables below)
data = np.random.rand(len(time), len(X), len(Y)).astype(np.float32)
# Create a dictionary of data variables
data_vars = {
    'SR_B4': (['time', 'X', 'Y'], data),
    'SR_B5': (['time', 'X', 'Y'], data),
}
chunk_size = {'time': 3, 'X': 512, 'Y': 256}
# Create the dataset
dataset = xr.Dataset(
    data_vars=data_vars,
    coords={'time': time, 'X': X, 'Y': Y},
    attrs={
        'date_range': '[1365638400000, 1654560000000]',
        'description': '<p>This dataset contains atmospherically corrected data.</p>',
        'keywords': ['cfmask', 'cloud', 'fmask', 'global', 'l8sr', 'landsat'],
        'period': 0,
        'visualization_2_max': 30000.0,
        'visualization_2_min': 0.0,
        'visualization_2_name': 'Shortwave Infrared (753)',
        'crs': 'EPSG:3310'
    }
).chunk(chunk_size)
# Convert the chunked Dataset to a Dask DataFrame, then repartition to ~128 MiB partitions
dask_df = dataset.to_dask_dataframe().repartition(partition_size='128 MiB')
def compute_ndvi(partition_df):
    # Each partition arrives as a pandas DataFrame; ensure the required columns are present
    if 'SR_B5' not in partition_df.columns or 'SR_B4' not in partition_df.columns:
        raise ValueError("DataFrame must contain 'SR_B5' and 'SR_B4' columns")
    # Compute NDVI for this partition
    return (partition_df['SR_B5'] - partition_df['SR_B4']) / (partition_df['SR_B5'] + partition_df['SR_B4'])
# Apply compute_ndvi to each partition to build the NDVI column, then materialize it
dask_df['NDVI'] = dask_df.map_partitions(compute_ndvi)
dask_df['NDVI'].compute()
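For comparison, the xarray-only run is essentially the sketch below (assuming it reuses the dataset object defined above and computes the same normalized difference directly on the chunked Dataset; the variable name ndvi is just illustrative):
##############################################
### IF YOU WANT TO RECREATE THE XARRAY RUN ###
##############################################
# Sketch: compute the normalized difference lazily on the chunked Dataset,
# then materialize it (no conversion to a Dask DataFrame, no repartitioning)
ndvi = (dataset['SR_B5'] - dataset['SR_B4']) / (dataset['SR_B5'] + dataset['SR_B4'])
ndvi.compute()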