Dask
Dask is another parallel computing framework, a little more lightweight than Apache Spark.
In task scheduling we break our program into many medium-sized tasks or units of computation, often a function call on a non-trivial amount of data. We represent these tasks as nodes in a graph, with edges between nodes if one task depends on data produced by another. A task scheduler is then called to execute this graph. Task scheduling logic often hides within other, larger frameworks.
Dask uses a specification to encode a directed acyclic graph of tasks with data dependencies, using common Python objects: dicts, tuples, and callables. A Dask graph is a dictionary that maps keys to computations:
{'x': 1,
'y': 2,
'z': (add, 'x', 'y'),
'w': (sum, ['x', 'y', 'z']),
'v': [(sum, ['w', 'z']), 2]}
A key is any hashable value that is not a task. A task is a tuple with a callable as its first element; the succeeding elements in the task tuple are arguments for that function.
Dask collections
dask.array
import h5py
import dask.array as da

# Open an HDF5 dataset; no data is read yet
dataset = h5py.File('myfile.hdf5', mode='r')['/x']

# Wrap it in a dask array, reusing the on-disk chunking
x = da.from_array(dataset, chunks=dataset.chunks)

# Lazily build the computation: every 10th row minus the column means
y = x[::10] - x.mean(axis=0)
y.compute()  # execute the graph and return a concrete NumPy result
dask.dataframe
import dask.dataframe as dd

# Read many CSV files as one logical dataframe
df = dd.read_csv('data/2016-*.*.csv', parse_dates=['timestamp'])
# Mean of the value column per hour of day, computed in parallel
df.groupby(df.timestamp.dt.hour).value.mean().compute()
dask.bag
import dask.bag as db
import json

# Load newline-delimited JSON records lazily
records = db.read_text('data/2015-*-*.json').map(json.loads)
# Count id frequencies among records named 'Alice'
records.filter(lambda d: d['name'] == 'Alice').pluck('id').frequencies().compute()
Besides the built-in collections, Dask also provides a wide variety of ways to parallelize custom applications.
Dask delayed
- Call delayed on the function, not the result (see the sketch after this list).
- Call compute() on lots of computation at once. Ideally, you want to make many dask.delayed calls to define your computation and then only call dask.compute at the end.
- Don't change inputs. If you need to use a mutable operation, then make a copy within your function first.
- Avoid global state.
- Break up computations into many pieces. You achieve parallelism by having many dask.delayed calls, not by using only a single one.
- Avoid too many tasks.
- Avoid calling delayed within delayed functions.
- Don't call delayed on other Dask collections. When you place a Dask array or Dask dataframe into a delayed call, that function will receive the NumPy or pandas equivalent. Instead, it's more common to use methods like da.map_blocks or df.map_partitions, or to turn your arrays or dataframes into many delayed objects:
  partitions = df.to_delayed()
  delayed_values = [dask.delayed(train)(part) for part in partitions]
- Avoid repeatedly putting large inputs into delayed calls. Instead, it is better to delay your data as well.
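A minimal sketch of these rules in action (the functions inc, add, and process and the array big are hypothetical stand-ins):
import dask
from dask import delayed
import numpy as np

def inc(x):
    return x + 1

def add(x, y):
    return x + y

def process(data, i):
    return data.sum() + i

# Call delayed on the function, not the result
a = delayed(inc)(1)
b = delayed(inc)(2)
c = delayed(add)(a, b)

# One dask.compute call at the end executes the whole graph
result, = dask.compute(c)  # 5

# Delay large data too, so it enters the graph once
# rather than being hashed and embedded in every task
big = delayed(np.ones(1_000_000))
results = dask.compute(*[delayed(process)(big, i) for i in range(4)])
Note that dask.compute is called once on the full set of delayed values, so Dask can schedule all of the tasks together.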