References

DataFlowTasks.DAG - Type
struct DAG{T}

Representation of a directed acyclic graph containing nodes of type T. The list of nodes with edges coming into a node i can be retrieved using inneighbors(dag,i); similarly, the list of nodes with edges leaving from i can be retrieved using outneighbors(dag,i).

DAG is a buffered structure with a buffer of size sz_max: calling addnode! on it will block if the DAG has more than sz_max elements.

DataFlowTasks.DataFlowTask - Type
DataFlowTask(func,data,mode)

Create a task-like object similar to Task(func) which accesses data with AccessMode mode.

When a DataFlowTask is created, the elements in its data field are checked against all other active DataFlowTasks to determine whether a dependency exists, based on a data-flow analysis. The resulting Task will then wait on those dependencies.

A DataFlowTask behaves much like a Julia Task: you can call wait(t), schedule(t) and fetch(t) on it.

See also: @dtask, @dspawn, @dasync.
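
A minimal sketch of this Task-like interface (assuming DataFlowTasks is loaded; see @dspawn for the annotation syntax):

```julia
using DataFlowTasks

A = zeros(10)

# spawn a task writing to A; the returned object behaves like a Task
t1 = @dspawn fill!(@W(A), 1)

# reading A creates a dependency on t1, inferred automatically
t2 = @dspawn sum(@R(A))

wait(t1)   # wait, as with a regular Task
fetch(t2)  # fetch the result (10.0 here)
```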

DataFlowTasks.FinishedChannel - Type
struct FinishedChannel{T} <: AbstractChannel{T}

Used to store tasks which have been completed, but not yet removed from the underlying DAG. Taking from an empty FinishedChannel will block.

DataFlowTasks.InsertionLog - Type
struct InsertionLog

Logs the execution trace of a DataFlowTask insertion.

Fields:

  • time_start : time at which the insertion began
  • time_finish : time at which the insertion finished
  • taskid : id of the task being inserted
  • tid : the thread on which the insertion happened
DataFlowTasks.LogInfo - Type
struct LogInfo

Contains information on the program's progress. For thread-safety, the LogInfo structure uses one vector of TaskLog per thread.

You can visualize and postprocess a LogInfo using GraphViz.Graph and Makie.plot.

DataFlowTasks.Stop - Type
struct Stop

Singleton type used to safely interrupt a task reading from an AbstractChannel.

DataFlowTasks.TaskLog - Type
struct TaskLog

Logs the execution trace of a DataFlowTask.

Fields:

  • tag : task id in DAG
  • time_start : time the task started running
  • time_finish : time the task finished running
  • tid : thread on which the task ran
  • inneighbors : vector of incoming neighbors in DAG
  • label : a string used for displaying and/or postprocessing tasks
Base.empty! - Method
empty!(tg::TaskGraph)

Interrupt all tasks in tg and remove them from the underlying DAG.

This function is useful to avoid having to restart the REPL when a task in tg errors.

Base.resize! - Method
resize!(tg::TaskGraph, sz)

Change the buffer size of tg to sz.

Base.wait - Method
wait(tg::TaskGraph)

Wait for all nodes in tg to be finished before continuing.

To wait on the active TaskGraph, use wait(get_active_taskgraph()).
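
Assuming a few tasks have been spawned, waiting on the active graph looks like this (sketch):

```julia
using DataFlowTasks

A = zeros(100)
@dspawn fill!(@W(A), 1)
@dspawn fill!(@W(A), 2)

# block until every node of the active task graph has finished
wait(DataFlowTasks.get_active_taskgraph())
```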

DataFlowTasks.addnode! - Method
addnode!(dag,(k,v)::Pair[, check=false])
addnode!(dag,k[, check=false])

Add a node to the dag. If passed only a key k, the value v is initialized as empty (no edges added). The check flag is used to indicate if a data flow analysis should be performed to update the dependencies of the newly inserted node.

DataFlowTasks.data_dependency - Method
data_dependency(t1::DataFlowTask, t2::DataFlowTask)

Determines if there is a data dependency between t1 and t2 based on the data they read from and write to.

DataFlowTasks.describe - Method
describe(loginfo::LogInfo; categories = String[])
describe(io, loginfo::LogInfo; categories = String[])

Analyses the information contained in loginfo and displays a summary on io (stdout by default).

Passing a categories argument allows grouping tasks by category. The categories can be a vector of Strings or a vector of String => Regex pairs, which will be matched against the tasks' labels.
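
For example (a sketch; the labels and categories below are illustrative):

```julia
using DataFlowTasks

A = zeros(10)
loginfo = DataFlowTasks.@log begin
    t1 = @dspawn fill!(@W(A), 1) label="fill A"
    t2 = @dspawn sum(@R(A)) label="sum A"
    fetch(t2)
end

# group tasks by plain substring match, or by String => Regex pairs
DataFlowTasks.describe(loginfo; categories = ["fill", "sum" => r"^sum"])
```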

DataFlowTasks.enable_log - Function
enable_log(mode = true)

If mode is true (the default), logging is enabled through the @log macro. Calling enable_log(false) deactivates logging at compile time to avoid any possible overhead.

Note that changing the log mode at runtime may invalidate code, possibly triggering recompilation.

See also: @log, with_logging
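
A typical toggle, sketched:

```julia
using DataFlowTasks

DataFlowTasks.enable_log(false)  # compile logging out before a hot section
# ... performance-critical code ...
DataFlowTasks.enable_log()       # re-enable logging (mode = true by default)
```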

DataFlowTasks.force_linear_dag - Function
force_linear_dag(mode = true)

If mode is true, nodes are added to the DAG in a linear fashion, i.e. the DAG connects node i to node i+1. This is useful for debugging purposes.

DataFlowTasks.force_sequential - Function
force_sequential(mode = true)

If mode is true, enable sequential mode: no tasks are created and scheduled, code is simply run as it appears in the sources. In effect, this makes @dspawn a no-op.

By default, sequential mode is disabled when the program starts.

See also: force_linear_dag.
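
For instance, to debug a parallel section by running it serially (a sketch):

```julia
using DataFlowTasks

DataFlowTasks.force_sequential()   # @dspawn blocks now run in place

A = zeros(10)
@dspawn fill!(@W(A), 1)            # executed immediately, no task created

DataFlowTasks.force_sequential(false)  # restore the parallel behavior
```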

DataFlowTasks.loggertodot - Method
loggertodot(logger)  --> dagstring

Return a string in the DOT format representing the underlying graph in logger.

If GraphViz is installed, you can use GraphViz.Graph(logger) to produce an image.

DataFlowTasks.memory_overlap - Method
memory_overlap(di,dj)

Determine if data di and dj have overlapping memory in the sense that mutating di can change dj (or vice versa). This function is used to build the dependency graph between DataFlowTasks.

A generic fallback is implemented which returns true (while printing a warning). Users should overload this function for the specific data types used as arguments to allow for appropriate inference of data dependencies.
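
For instance, a custom wrapper type (the MyWrapper struct below is hypothetical, shown for illustration only) can forward the check to its underlying storage:

```julia
using DataFlowTasks

# hypothetical container owning a dense buffer
struct MyWrapper
    data::Matrix{Float64}
end

# two wrappers overlap whenever their underlying buffers do
DataFlowTasks.memory_overlap(a::MyWrapper, b::MyWrapper) =
    DataFlowTasks.memory_overlap(a.data, b.data)
```

In practice one may also need methods mixing MyWrapper with plain arrays, following the same pattern.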

DataFlowTasks.memory_overlap - Method
memory_overlap(di::AbstractArray,dj::AbstractArray)

Try to determine if the arrays di and dj have overlapping memory.

When both di and dj are Arrays of bitstype, simply compare their addresses. Otherwise, compare their parents by default.

When both di and dj are SubArrays we compare the actual indices of the SubArrays when their parents are the same (to avoid too many false positives).

DataFlowTasks.savedag - Function
DataFlowTasks.savedag(filepath, graph)

Save graph as an SVG image at filepath. This requires GraphViz to be available.

DataFlowTasks.stack_weakdeps_env! - Method
DataFlowTasks.stack_weakdeps_env!(; verbose = false, update = false)

Push to the load stack an environment providing the weak dependencies of DataFlowTasks. During the development stage, this allows benefiting from the profiling / debugging features of DataFlowTasks without having to install GraphViz or Makie in the project environment.

This can take quite some time if packages have to be installed or precompiled. Run in verbose mode to see what happens.

Additionally, set update=true if you want to update the weakdeps environment.

Warning

This feature is experimental and might break in the future.

Examples:

DataFlowTasks.stack_weakdeps_env!()
using GraphViz
DataFlowTasks.start_dag_cleaner - Function
start_dag_cleaner(tg)

Start a task associated with tg which takes nodes from its finished queue and removes them from the dag. The task blocks if finished is empty.

DataFlowTasks.update_edges! - Method
update_edges!(dag::DAG,i)

Perform the data-flow analysis to update the edges of node i. Both incoming and outgoing edges are updated.

DataFlowTasks.with_logging - Method
with_logging(f) --> f(),loginfo

Execute f() and log DataFlowTasks events into the returned loginfo object.

Examples:

using DataFlowTasks

A,B = zeros(2), ones(2);

out,loginfo = DataFlowTasks.with_logging() do
    @dspawn fill!(@W(A),1)
    @dspawn fill!(@W(B),1)
    res = @dspawn sum(@R(A)) + sum(@R(B))
    fetch(res)
end

out

# output

4.0

See also: LogInfo

DataFlowTasks.@dspawn - Macro
@dspawn expr [kwargs...]

Spawn a Julia Task to execute the code given by expr, and schedule it to run on any available thread.

Annotate the code in expr with @R, @W and/or @RW to indicate how it accesses data (see examples below). This information is used to automatically infer task dependencies.

Additionally, the following keyword arguments can be provided:

  • label: provide a label to identify the task. This is useful when logging scheduling information;
  • priority: inform the scheduler about the relative priority of the task. This information is not (yet) leveraged by the default scheduler.

Examples:

Below are 3 equivalent ways to create the same Task, which expresses a read-write dependency on C and read dependencies on A and B:

using LinearAlgebra
using DataFlowTasks
A = ones(5, 5)
B = ones(5, 5)
C = zeros(5, 5)
α, β = (1, 0)

# Option 1: annotate arguments in a function call
@dspawn mul!(@RW(C), @R(A), @R(B), α, β)

# Option 2: specify data access modes in the code block
@dspawn begin
   @RW C
   @R  A B
   mul!(C, A, B, α, β)
end

# Option 3: specify data access modes after the code block
# (i.e. alongside keyword arguments)
res = @dspawn mul!(C, A, B, α, β) @RW(C) @R(A,B)

fetch(res) # a 5×5 matrix of 5.0

Here is a more complete example, demonstrating a full computation involving 2 different tasks.

using DataFlowTasks

A = rand(5)

# create a task with WRITE access mode to A
# and label "writer"
t1 = @dspawn begin
    @W A
    sleep(1)
    fill!(A,0)
    println("finished writing")
end  label="writer"

# create a task with READ access mode to A
t2 = @dspawn begin
    @R A
    println("I automatically wait for `t1` to finish")
    sum(A)
end  priority=1

fetch(t2) # 0

# output

finished writing
I automatically wait for `t1` to finish
0.0

Note that in the example above t2 waited for t1 because it read a data field that t1 accessed in a writable manner.

DataFlowTasks.@dtask - Macro
@dtask expr [kwargs...]

Create a DataFlowTask to execute expr, where data have been tagged to specify how they are accessed. Note that the task is not automatically scheduled for execution.

See @dspawn for information on how to annotate expr to specify data dependencies, and a list of supported keyword arguments.

See also: @dspawn, @dasync
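
A sketch of the create-then-schedule workflow:

```julia
using DataFlowTasks

A = zeros(10)

# build the task without running it
t = @dtask fill!(@W(A), 1) label="deferred fill"

# later: schedule it and wait for completion, as with a plain Task
schedule(t)
wait(t)
```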

DataFlowTasks.@log - Macro
DataFlowTasks.@log expr --> LogInfo

Execute expr and return a LogInfo instance with the recorded events. The logger waits for the current taskgraph (see get_active_taskgraph) to be empty before starting.

Warning

The returned LogInfo instance may be incomplete if expr returns before all DataFlowTasks spawned inside of it have completed. Typically, expr should fetch the outcome before returning, so as to properly benchmark the code that it runs (and not merely the tasks that it spawns).

See also: with_logging, with_logging!
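
A minimal usage sketch:

```julia
using DataFlowTasks

A = zeros(10)
loginfo = DataFlowTasks.@log begin
    t = @dspawn fill!(@W(A), 1)
    fetch(t)  # fetch before returning so the log is complete
end

DataFlowTasks.describe(loginfo)
```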

MakieCore.plot - Method
plot(log_info; categories)

Plot the information contained in log_info, grouping tasks according to categories.

Entries in categories define how to group tasks in categories for plotting. Each entry can be:

  • a String: in this case, all tasks having labels in which the string occurs are grouped together. The string is also used as a label for the category itself.
  • a String => Regex pair: in this case, all tasks having labels matching the regex are grouped together. The string is used as a label for the category itself.

See the documentation for more information on how to profile and visualize DataFlowTasks.
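
A sketch, assuming a Makie backend (GLMakie here) is available:

```julia
using DataFlowTasks
using GLMakie  # or CairoMakie; some Makie backend is required

A = zeros(10)
log_info = DataFlowTasks.@log begin
    t = @dspawn fill!(@W(A), 1) label="fill A"
    fetch(t)
end

# group every task whose label contains "fill" into one category
plot(log_info; categories = ["fill"])
```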
