Optimal way to convert an xr.Dataset to a Dask DataFrame?

I am running a function on a Dask cluster to compute the normalized difference between two of the data variables in my xarray Dataset. However, I need the data to be in Dask DataFrame form before I can do the computation. (I realize I could do this while keeping the object as an xarray Dataset; I'm testing future functions where I need to convert from an xarray Dataset to a Dask DataFrame.) So I call xr.Dataset.to_dask_dataframe() and then run my code.

I compared the performance of running this function on the plain xarray Dataset versus the Dask DataFrame, and the Dask DataFrame case uses significantly more resources because of all the extra tasks Dask sets up to repartition/rechunk my data (see screenshots: the first is the xarray run, the second is the Dask DataFrame run). I understand that repartitioning and rechunking are very expensive operations. Using xr.Dataset.to_dask_dataframe() does not convert my chunked data into partitions, so I run a repartition step in my code. Also, excuse all of the red task streams and the white space between tasks; this is not optimal by any means.

What are my options for efficiently converting the xarray Dataset to a Dask DataFrame and potentially reducing the number of tasks in my task stream? I feel it has to come down to how I chunk/partition the data, but I'm not sure. (A rough sketch of the xarray-only comparison run follows the reproduction code below.)

######################################################
### IF YOU WANT TO RECREATE THE DASK DATAFRAME RUN ###
######################################################
    import numpy as np
    import pandas as pd
    import xarray as xr
    from dask.distributed import Client, LocalCluster
    
    client = Client(LocalCluster(n_workers=8, threads_per_worker=1, memory_limit='32GiB'))
    
    # Define the dimensions
    time = pd.date_range("2020-12-29T18:57:32.281000", periods=3)
    X = np.linspace(-421600, 486700, 9084)
    Y = np.linspace(-599200, 458500, 10578)
    
    # Create one random float32 array, reused for both data variables
    data = np.random.rand(len(time), len(X), len(Y)).astype(np.float32)
    
    # Create a dictionary of data variables
    data_vars = {
        'SR_B4': (['time', 'X', 'Y'], data),
        'SR_B5': (['time', 'X', 'Y'], data),
    }
    
    chunk_size = {'time': 3, 'X': 512, 'Y': 256}
    
    # Create the dataset
    dataset = xr.Dataset(
        data_vars=data_vars,
        coords={'time': time, 'X': X, 'Y': Y},
        attrs={
            'date_range': '[1365638400000, 1654560000000]',
            'description': '<p>This dataset contains atmospherically corrected data.</p>',
            'keywords': ['cfmask', 'cloud', 'fmask', 'global', 'l8sr', 'landsat'],
            'period': 0,
            'visualization_2_max': 30000.0,
            'visualization_2_min': 0.0,
            'visualization_2_name': 'Shortwave Infrared (753)',
            'crs': 'EPSG:3310'
        }
    ).chunk(chunk_size)
    
    dask_df = dataset.to_dask_dataframe().repartition(partition_size='128 MiB')
    
    def compute_ndvi(dask_df):
        # Ensure that the required columns are in the DataFrame
        if 'SR_B5' not in dask_df.columns or 'SR_B4' not in dask_df.columns:
            raise ValueError("DataFrame must contain 'SR_B5' and 'SR_B4' columns")
    
        # Compute NDVI
        return (dask_df['SR_B5'] - dask_df['SR_B4']) / (dask_df['SR_B5'] + dask_df['SR_B4'])
    
    dask_df['NDVI'] = dask_df.map_partitions(compute_ndvi)
    dask_df['NDVI'].compute()
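
For reference, the xarray-only run I compared against looks roughly like the sketch below. This is a minimal sketch that reuses the `dataset` defined above: the data variables stay as chunked dask-backed arrays inside the Dataset, and the normalized difference is computed on them directly, with no DataFrame conversion or repartition step.

    # Rough sketch of the xarray-only comparison run (reuses `dataset` from above).
    # SR_B5 and SR_B4 are dask-backed DataArrays, so this builds a lazy task graph
    # without any DataFrame conversion or repartitioning.
    ndvi = (dataset['SR_B5'] - dataset['SR_B4']) / (dataset['SR_B5'] + dataset['SR_B4'])
    ndvi = ndvi.rename('NDVI')
    ndvi.compute()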
