# xray + dask

*This was modified from a notebook originally written by Stephan Hoyer*

Weather data -- especially the results of numerical weather simulations -- is big. Some of the biggest super computers make weather forecasts, and they save their output on increasingly high resolution grids. Even for data analysis purposes, it's easy to need to process 10s or 100s of GB of data.

There are many excellent tools for working with weather data, which is usually stored in the netCDF file format. Many of these have support for out-of-core data, notably including the command line tools [NCO](http://nco.sourceforge.net/nco.html) and [CDO](https://code.zmaw.de/projects/cdo/wiki/cdo). There are even Python tools, including a netCDF4 library and Iris. However, none of these tools matched the ease of use of pandas. We knew there there was a better way, so we decided to write xray, a library for working with multi-dimensional labeled data.

The latest release of xray includes support for processing datasets that don't fit into memory using dask, a new Python library that extends NumPy to out-of-core datasets by blocking arrays into small chunks and using a simple task scheduling abstraction. Dask allows xray to easily process out of core data and simultaneously make use of all our CPUs resources.

## Loading data

First, we'll import dask and setup a ThreadPool for processing tasks. Dask currently doesn't do this automatically.

In [1]:
import xray
import dask.array as da
import numpy as np

import dask

We'll use the new `xray.open_mfdataset` function to open archived weather data from ECMWF. It opens a glob of netCDF files on my local disk and automatically infers how to combine them into a few logical arrays by reading their metadata:

In [1]:
!ls /home/mrocklin/data/ecmwf/*.nc3

/home/mrocklin/data/ecmwf/2014-01-01.nc3
/home/mrocklin/data/ecmwf/2014-01-02.nc3
/home/mrocklin/data/ecmwf/2014-01-03.nc3
/home/mrocklin/data/ecmwf/2014-01-04.nc3
/home/mrocklin/data/ecmwf/2014-01-05.nc3
/home/mrocklin/data/ecmwf/2014-01-06.nc3
/home/mrocklin/data/ecmwf/2014-01-07.nc3
/home/mrocklin/data/ecmwf/2014-01-08.nc3
/home/mrocklin/data/ecmwf/2014-01-09.nc3
/home/mrocklin/data/ecmwf/2014-01-10.nc3
/home/mrocklin/data/ecmwf/2014-01-11.nc3
/home/mrocklin/data/ecmwf/2014-01-12.nc3
/home/mrocklin/data/ecmwf/2014-01-13.nc3
/home/mrocklin/data/ecmwf/2014-01-14.nc3
/home/mrocklin/data/ecmwf/2014-01-15.nc3
/home/mrocklin/data/ecmwf/2014-01-16.nc3
/home/mrocklin/data/ecmwf/2014-01-17.nc3
/home/mrocklin/data/ecmwf/2014-01-18.nc3
/home/mrocklin/data/ecmwf/2014-01-19.nc3
/home/mrocklin/data/ecmwf/2014-01-20.nc3
/home/mrocklin/data/ecmwf/2014-01-21.nc3
/home/mrocklin/data/ecmwf/2014-01-22.nc3
/home/mrocklin/data/ecmwf/2014-01-23.nc3
/home/mrocklin/data/ecmwf/2014-01-

In [2]:
ds = xray.open_mfdataset('/home/mrocklin/data/ecmwf/*.nc3', engine='scipy')

TypeError: from_array() takes at least 2 arguments (2 given)

In [None]:
ds

## 11 GB of Data

In [4]:
np.prod(ds.dims.values()) * 8 * 2 ** -30

11.324758529663086

## 4GB of Memory

In [5]:
!cat /proc/meminfo | grep MemTotal

MemTotal:        3766528 kB


## Index with meaningful values, not numbers 

In [6]:
# x.mean(2)
ds.mean('longitude')

<xray.Dataset>
Dimensions:   (latitude: 721, time: 1464)
Coordinates:
  * latitude  (latitude) >f4 90.0 89.75 89.5 89.25 89.0 88.75 88.5 88.25 88.0 87.75 87.5 87.25 ...
  * time      (time) datetime64[ns] 2014-01-01 2014-01-01T06:00:00 2014-01-01T12:00:00 ...
Data variables:
    u10       (time, latitude) float64 0.8483 0.9731 1.098 1.037 1.258 1.459 1.39 1.322 1.203 ...
    v10       (time, latitude) float64 -0.282 -0.2981 -0.3142 -0.3064 -0.3349 -0.3596 -0.3326 ...
    t2m       (time, latitude) float64 254.0 253.9 253.8 253.9 253.8 253.6 253.4 253.1 252.8 ...

In [7]:
ds.sel(time="2014-04", latitude=(ds.latitude > 10 & (ds.latitude < 40)))

<xray.Dataset>
Dimensions:    (latitude: 360, longitude: 1440, time: 120)
Coordinates:
  * longitude  (longitude) >f4 0.0 0.25 0.5 0.75 1.0 1.25 1.5 1.75 2.0 2.25 2.5 2.75 3.0 3.25 ...
  * latitude   (latitude) >f4 90.0 89.75 89.5 89.25 89.0 88.75 88.5 88.25 88.0 87.75 87.5 87.25 ...
  * time       (time) datetime64[ns] 2014-04-01 2014-04-01T06:00:00 2014-04-01T12:00:00 ...
Data variables:
    u10        (time, latitude, longitude) float64 2.715 2.683 2.65 2.616 2.583 2.551 2.518 ...
    v10        (time, latitude, longitude) float64 -7.506 -7.511 -7.517 -7.523 -7.529 -7.535 ...
    t2m        (time, latitude, longitude) float64 254.1 254.1 254.1 254.1 254.1 254.1 254.1 ...
Attributes:
    Conventions: CF-1.0
    history: 2015-02-11 16:45:42 GMT by grib_to_netcdf-1.13.0: grib_to_netcdf /data/data01/mars-web230-20150211164538-19989-9003.target -o /data/data01/mars-web230-20150211164541-19989-9004.nc

## Groupby operations and datetime handling

In [8]:
%time ds.groupby('time.month').mean('time').load_data()

CPU times: user 5min 3s, sys: 36.4 s, total: 5min 39s
Wall time: 1min 53s


<xray.Dataset>
Dimensions:    (latitude: 721, longitude: 1440, month: 12)
Coordinates:
  * longitude  (longitude) >f4 0.0 0.25 0.5 0.75 1.0 1.25 1.5 1.75 2.0 2.25 2.5 2.75 3.0 3.25 ...
  * latitude   (latitude) >f4 90.0 89.75 89.5 89.25 89.0 88.75 88.5 88.25 88.0 87.75 87.5 87.25 ...
  * month      (month) int64 1 2 3 4 5 6 7 8 9 10 11 12
Data variables:
    u10        (month, latitude, longitude) float64 -2.758 -2.749 -2.74 -2.731 -2.722 -2.713 ...
    v10        (month, latitude, longitude) float64 1.707 1.718 1.729 1.741 1.752 1.764 1.775 ...
    t2m        (month, latitude, longitude) float64 252.4 252.4 252.4 252.4 252.4 252.4 252.4 ...

## Bandwidth

In [9]:
11e9 / 113 / 1e6 # MB/s

97.34513274336283