3. Using dask and zarr for multithreaded input/output¶
import numpy as np
import zarr
import time
import datetime
import pytz
from zarr.util import human_readable_size
import dask
import dask.array as da
from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler
from dask.diagnostics.profile_visualize import visualize
Many Python programmers use HDF5, through either the h5py or pytables modules, to store large dense arrays. One drawback of the HDF5 implementation is that it is essentially single-threaded: only one core can read or write to a dataset at any one time. Multithreading makes data compression much more attractive, because chunks of data can be compressed or decompressed simultaneously while the data is being written or read.
3.1. zarr¶
zarr keeps the h5py interface (which is similar to numpy’s), but allows different choices of compressor and is fully multithreaded. See Alistair Miles’ original blog entry for a discussion of the motivation behind zarr.
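For example, a zarr array can be created with an explicit compressor; this is a minimal sketch (the Blosc/zstd settings below are illustrative choices, not something the rest of this notebook depends on):
from numcodecs import Blosc
compressor = Blosc(cname='zstd', clevel=3, shuffle=Blosc.BITSHUFFLE)
# zeros() takes the same shape/dtype/chunks arguments as the h5py/numpy-style API
z = zarr.zeros((10000, 10000), chunks=(1000, 1000), dtype='f4', compressor=compressor)
z[:2000, :] = np.random.normal(size=(2000, 10000)).astype('f4')
z.info  # reports the chunk shape, compressor, and storage ratio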
3.2. dask¶
dask is a Python library that implements lazy data structures (array, dataframe, bag) and a clever thread/process scheduler. It integrates with zarr to allow calculations on datasets that don’t fit into core memory, either on a single machine or across a cluster.
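As a quick illustration of the lazy model (this toy example is not part of the original notebook), nothing is read or computed until .compute() is called:
x = da.ones((4000, 4000), chunks=(1000, 1000))  # lazy: no 4000 x 4000 array in memory yet
y = (x + x.T).sum()                             # builds a task graph, still lazy
y.compute()                                     # the thread scheduler runs the graph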
3.2.1. Example, write and read zarr arrays using multiple threads¶
3.2.2. Create 230 Mbytes of fake data¶
wvel_data = np.random.normal(2000, 1000, size=[8000,7500]).astype(np.float32)
human_readable_size(wvel_data.nbytes)
3.2.3. Copy to a zarr file on disk, using multiple threads¶
item='disk1_data'
store = zarr.DirectoryStore(item)
# a ThreadSynchronizer lets multiple threads write to the same array safely
group=zarr.hierarchy.group(store=store,overwrite=True,synchronizer=zarr.ThreadSynchronizer())
the_var='wvel'
# chunk along the first axis: 4 chunks of 2000 x 7500, so writes can be compressed in parallel
out_zarr1=group.zeros(the_var,shape=wvel_data.shape,dtype=wvel_data.dtype,chunks=[2000,7500])
out_zarr1[...]=wvel_data[...]
3.2.4. Add some attributes¶
now=datetime.datetime.now(pytz.UTC)
# now.timestamp() is the portable way to get seconds since the epoch
timestamp=int(now.timestamp())
out_zarr1.attrs['history']='written for practice'
out_zarr1.attrs['creation_date']=str(now)
out_zarr1.attrs['gmt_timestamp']=timestamp
out_zarr1
3.2.5. Create an array of zeros – note that compression shrinks it from 230 Mbytes to 321 bytes¶
a2 = np.zeros([8000,7500],dtype=np.float32)
item='disk2_data'
store = zarr.DirectoryStore(item)
group=zarr.hierarchy.group(store=store,overwrite=True,synchronizer=zarr.ThreadSynchronizer())
the_var='wvel'
out_zarr2=group.zeros(the_var,shape=a2.shape,dtype=a2.dtype,chunks=[2000,7500])
out_zarr2
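The compression ratio can be confirmed directly from the array’s byte counts; a small sketch, assuming a zarr version that exposes nbytes_stored:
human_readable_size(out_zarr2.nbytes)         # logical size of the uncompressed data
human_readable_size(out_zarr2.nbytes_stored)  # bytes actually written to the store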
3.2.6. Copy input to output using chunks¶
item='disk2_data'
store = zarr.DirectoryStore(item)
group=zarr.hierarchy.group(store=store,overwrite=True,synchronizer=zarr.ThreadSynchronizer())
the_var='wvel'
# group.empty skips zero-filling, since every chunk is about to be overwritten
out_zarr2=group.empty(the_var,shape=wvel_data.shape,dtype=wvel_data.dtype,chunks=[2000,7500])
out_zarr2[...]=out_zarr1[...]
out_zarr2
3.2.7. Create a dask array from a zarr disk file¶
da_input = da.from_array(out_zarr2, chunks=out_zarr1.chunks)
da_input
3.2.8. The following calculation uses numpy, so it releases the GIL¶
result=(da_input**2. + da_input**3.).mean(axis=0)
result
3.2.9. Note that result hasn’t been computed yet¶
Here is a graph of how the calculation will be split among 4 threads
from dask.dot import dot_graph
dot_graph(result.dask)
3.2.10. Now do the calculation¶
with Profiler() as prof, ResourceProfiler(dt=0.1) as rprof,\
CacheProfiler() as cprof:
answer = result.compute()
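By default the threaded scheduler uses one worker per core. To pin the calculation to 4 threads, here is a hedged sketch, assuming a dask version that accepts the scheduler and num_workers keywords to compute:
answer4 = result.compute(scheduler='threads', num_workers=4)  # force a 4-thread pool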
Visualize the CPU, memory and cache use for the 4 threads
help(visualize)
the_plot=visualize([prof, rprof,cprof], min_border_top=15, min_border_bottom=15)
Here is the dask profile: profile_out
3.2.11. You can evaluate your own functions on dask arrays¶
If your functions release the GIL, you can get multithreaded computation using dask.delayed.
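Here is a minimal sketch of that pattern; block_mean and the list of blocks are hypothetical stand-ins, just to show the shape of the dask.delayed API:
@dask.delayed
def block_mean(block):
    # pure-numpy work releases the GIL, so threads can overlap
    return np.mean(block**2)

blocks = [np.random.rand(1000, 1000) for _ in range(4)]  # stand-ins for chunks of a larger array
lazy_means = [block_mean(b) for b in blocks]             # nothing computed yet
total = dask.delayed(np.mean)(lazy_means)                # combine lazily
total.compute()                                          # the four block_mean calls run in the thread pool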