DataFlowTasks

Tasks which automatically respect data-flow dependencies

Basic usage

This package defines a @dspawn macro which behaves very much like Threads.@spawn, except that it allows the user to specify explicit data dependencies for the spawned Task. This information is then used to automatically infer task dependencies by constructing and analyzing a directed acyclic graph based on how tasks access the underlying data. The premise is that it is sometimes simpler to specify how tasks depend on data than to specify how tasks depend on each other.

When creating a Task using @dspawn, the following annotations can be used to declare how the Task accesses the data:

  • read-only: @R or @READ
  • write-only: @W or @WRITE
  • read-write: @RW or @READWRITE

An @R(A) annotation for example implies that data A will be accessed in read-only mode by the task. Here is a simple example:

using DataFlowTasks

n = 100_000
A = ones(n)

d1 = @dspawn begin
    @RW A

    # in-place work on A
    for i in eachindex(A)
        A[i] = log(A[i]) # A[i] = 0
    end
end

# reduce A
d2 = @dspawn sum(@R A)
# The above is a shortcut for:
#   d2 = @dspawn begin
#       @R A
#       sum(A)
#   end

c = fetch(d2) # 0
0.0

Two (asynchronous) tasks were created, both of which access the array A. Because d1 writes to A, and d2 reads from it, the outcome C is nondeterministic unless we specify an order of precedence. DataFlowTasks reinforces the sequential consistency criterion, which is to say that executing tasks in parallel must preserve, up to rounding errors, the result that would have been obtained if they were executed sequentially, following the order in which they were created. In the example above, this means d2 will wait on d1 because of an inferred data dependency. The outcome is thus always zero.

Note

If you replace DataFlowTasks.@dspawn by Threads.@spawn in the example above (and pick an n large enough) you will see that you no longer get 0 because d2 may access an element of A before it has been replaced by zero!

Tip

In the d2 example above, a shortcut syntax was introduced, which allows putting access mode annotations directly around arguments in a function call. This is especially useful when the task body is a one-liner. See @dspawn for an exhaustive list of supported ways to create tasks and specify data dependencies.

No parallelism was allowed in the previous example due to a data conflict. To see that when parallelism is possible, DataFlowTasks will exploit it, consider this one last example:

using DataFlowTasks

function run(A)
    d1 = @dspawn begin
        @W A # write to A
        sleep(1)
        fill!(A,0)
    end

    # a reduction on A
    d2 = @dspawn begin
        @R A # read from A
        sleep(10)
        sum(A)
    end

    # another reduction on A
    d3 = @dspawn sum(@R(A))

    t = @elapsed c = fetch(d3)

    @show t,c
end

A = ones(10)
run(copy(A)) # run once to precompile
run(A)
(1.00235935, 0.0)

We see that the elapsed time to fetch the result from d3 is on the order of one second. This is expected since d3 needs to wait on d1 but can be executed concurrently with d2. The result is, as expected, 0.

All examples this far have been simple enough that the dependencies between the tasks could (and probably should) have been inserted by hand. There are certain problems, however, where the constant reuse of memory (mostly for performance reasons) makes a data-flow approach to parallelism a rather natural way to implicitly describe task dependencies. This is the case, for instance, of tiled (also called blocked) matrix factorization algorithms, where task dependencies can become rather difficult to describe in an explicit manner. The tiled factorization section showcases some non-trivial problems for which DataFlowTasks may be useful.

Tip

The main goal of DataFlowTasks is to expose parallelism: two tasks ti and tj can be executed concurrently if one does not write to memory that the other reads. This data-dependency check is done dynamically, and therefore is not limited to tasks in the same lexical scope. Of course, there is an overhead associated with these checks, so whether performance gains can be obtained depend largely on how parallel the algorithm is, as well as how long each individual task takes (compared to the overhead).

Custom types

In order to infer dependencies between DataFlowTasks, we must be able to determine whether two objects A and B share a common memory space. That is to say, we must know if mutating A can affect B, or vice-versa. This check is performed by the memory_overlap function:

using DataFlowTasks: memory_overlap

A = rand(10,10)
B = view(A,1:10)
C = view(A,11:20)

memory_overlap(A,B), memory_overlap(A,C), memory_overlap(B,C)
(true, true, false)

The example above works because memory_overlap has been defined for some basic AbstractArrays types inside DataFlowTasks. If a specialized method for memory_overlap is not found, DataFlowTasks errs on the safe side and falls back to a generic implementation that always returns true:

julia> using DataFlowTasks: memory_overlap
julia> struct CirculantMatrix # a custom type data::Vector{Float64} end
julia> v = rand(10);
julia> M = CirculantMatrix(v);
julia> memory_overlap(M,copy(v))┌ Warning: using fallback `memory_overlap(::Any,::Any) = true`. Consider implementing │ `DataFlowTasks.memory_overlap(::Main.var"Main".CirculantMatrix,::Vector{Float64})`. └ @ DataFlowTasks ~/runner-dataflow-tasks/_work/DataFlowTasks.jl/DataFlowTasks.jl/src/dataflowtask.jl:144 true

The warning message printed above hints at what should be done:

julia> import DataFlowTasks: memory_overlap
julia> memory_overlap(M::CirculantMatrix,v) = memory_overlap(M.data,v) # overloadmemory_overlap (generic function with 9 methods)
julia> memory_overlap(v,M::CirculantMatrix) = memory_overlap(M,v)memory_overlap (generic function with 10 methods)
julia> memory_overlap(M,v), memory_overlap(M,copy(v))(true, false)

You can now spawn tasks with your custom type CirculantMatrix as a data dependency, and things should work as expected:

using DataFlowTasks

v  = ones(5);
M1 = CirculantMatrix(v);
M2 = CirculantMatrix(copy(v));

Base.sum(M::CirculantMatrix) = length(M.data)*sum(M.data)

d1 = @dspawn begin
    @W v
    sleep(0.5)
    fill!(v,0)
end;
d2 = @dspawn sum(@R M1)
d3 = @dspawn sum(@R M2)

fetch(d3) # 25
fetch(d2) # 0

Task graph

Each time a Task is spawned using @dspawn, it is added to an internal TaskGraph (see get_active_taskgraph) so that its data-dependencies can be tracked and analyzed. There are two important things to know about TaskGraph objects. First, they are buffered to handle at most sz_max tasks at a time: trying to add a task to the TaskGraph when it is full will block. This is done to keep the cost of analyzing the data dependencies under control. You can modify the buffer size as follows:

taskgraph = DataFlowTasks.get_active_taskgraph()
resize!(taskgraph,200)
DataFlowTasks.TaskGraph with 2 active nodes, 0 finished nodes, and 0 edges (capacity of 200 nodes)

Second, when the computation of a task in the TaskGraph is completed, it gets pushed into a finished channel, to be eventually processed and poped from the graph by the dag_cleaner. If you want to stop nodes from being removed from the DAG, you may stop the dag_cleaner using:

DataFlowTasks.stop_dag_cleaner(taskgraph)
Task (done) @0x00007f40d2308330

Finished nodes will now remain in the DAG:

A = ones(5)
@dspawn begin
    @RW A
    A .= 2 .* A
end
@dspawn sum(@R A)
taskgraph
DataFlowTasks.TaskGraph with 4 active nodes, 2 finished nodes, and 1 edge (capacity of 200 nodes)

Note that stopping the dag_cleaner means finished nodes are no longer removed; since the task graph is a buffered structure, this may cause the execution to halt if it is at full capacity. You can then either resize! it, or simply start the worker (which will result in the processing of the finished channel):

DataFlowTasks.start_dag_cleaner(taskgraph)
taskgraph
DataFlowTasks.TaskGraph with 2 active nodes, 0 finished nodes, and 0 edges (capacity of 200 nodes)
Tip

There are situations where you may want to use a different TaskGraph temporarily to execute a block of code, and restore the default after. This can be done using the with_taskgraph method.

Limitations

Some current limitations are listed below:

  • At present, there is no way to specify priorities for a task. We plan to support such a feature in the near future. Check this issue to track the development.
  • The main thread executes tasks, and is responsible for adding/removing nodes from the DAG. This may hinder parallelism if the main thread is given a long task since the processing of the dag will halt until the main thread becomes free again. This could be a problem if the task execution times are very inhomogeneous; a solution, not currently implemented, would be to allow for blocking the main thread from executing tasks.
  • Nesting DataFlowTasks, although possible, can be tricky. See Nested Tasks for a more in-depth discussion.