References
DataFlowTasks.DataFlowTasks — Module
module DataFlowTasks
Create Tasks which can keep track of how data flows through them.
DataFlowTasks.LOGINFO — Constant
const LOGINFO::Ref{LogInfo}
Global LogInfo being used to record the events. Can be changed using _setloginfo!.
DataFlowTasks.TASKGRAPH — Constant
const TASKGRAPH::Ref{TaskGraph}
The active TaskGraph being used. Nodes will be added to this TaskGraph by default.
Can be changed using set_active_taskgraph!.
DataFlowTasks.AccessMode — Type
@enum AccessMode READ WRITE READWRITE
Describe how a DataFlowTask accesses its data.
DataFlowTasks.DAG — Type
struct DAG{T}
Representation of a directed acyclic graph containing nodes of type T. The list of nodes with edges coming into a node i can be retrieved using inneighbors(dag,i); similarly, the list of nodes with edges leaving from i can be retrieved using outneighbors(dag,i).
DAG is a buffered structure with a buffer of size sz_max: calling addnode! on it will block if the DAG has more than sz_max elements.
DataFlowTasks.DAG — Method
DAG{T}(sz)
Create a buffered DAG holding a maximum of sz nodes of type T.
DataFlowTasks.DataFlowTask — Type
DataFlowTask(func,data,mode)
Create a task-like object similar to Task(func) which accesses data with AccessMode mode.
When a DataFlowTask is created, the elements in its data field will be checked against all other active DataFlowTasks to determine if a dependency is present based on a data-flow analysis. The resulting Task will then wait on those dependencies.
A DataFlowTask behaves much like a Julia Task: you can call wait(t), schedule(t) and fetch(t) on it.
DataFlowTasks.FinishedChannel — Type
struct FinishedChannel{T} <: AbstractChannel{T}
Used to store tasks which have been completed, but not yet removed from the underlying DAG. Taking from an empty FinishedChannel will block.
DataFlowTasks.InsertionLog — Type
struct InsertionLog
Logs the execution trace of a DataFlowTask insertion.
Fields:
- time_start: time the insertion began
- time_finish: time the insertion finished
- taskid: the task it is inserting
- tid: the thread on which the insertion is happening
DataFlowTasks.LogInfo — Type
struct LogInfo
Contains information on the program's progress. For thread-safety, the LogInfo structure uses one vector of TaskLog per thread.
You can visualize and postprocess a LogInfo using GraphViz.Graph and Makie.plot.
DataFlowTasks.Stop — Type
struct Stop
Singleton type used to safely interrupt a task reading from an AbstractChannel.
DataFlowTasks.TaskGraph — Type
struct TaskGraph
A directed acyclic graph used to represent the dependencies between DataFlowTasks.
TaskGraph(sz) creates a task graph that can hold at most sz elements at any given time. In particular, trying to add a new DataFlowTask will block if the TaskGraph is already full.
See also: get_active_taskgraph, set_active_taskgraph!
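A minimal sketch of creating a dedicated TaskGraph and making it the active one (the buffer size of 50 is arbitrary):
using DataFlowTasks
tg = DataFlowTasks.TaskGraph(50)        # holds at most 50 tasks at any given time
DataFlowTasks.set_active_taskgraph!(tg) # subsequently spawned tasks are pushed to tg
A = zeros(10)
t = @dspawn fill!(@W(A), 1)
fetch(t)
wait(tg) # wait for all nodes in tg to finish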
DataFlowTasks.TaskLog — Type
struct TaskLog
Logs the execution trace of a DataFlowTask.
Fields:
- tag: task id in DAG
- time_start: time the task started running
- time_finish: time the task finished running
- tid: thread on which the task ran
- inneighbors: vector of incoming neighbors in DAG
- label: a string used for displaying and/or postprocessing tasks
Base.empty! — Method
empty!(tg::TaskGraph)
Interrupt all tasks in tg and remove them from the underlying DAG.
This function is useful to avoid having to restart the REPL when a task in tg errors.
Base.resize! — Method
resize!(tg::TaskGraph, sz)
Change the buffer size of tg to sz.
Base.wait — Method
wait(tg::TaskGraph)
Wait for all nodes in tg to be finished before continuing.
To wait on the active TaskGraph, use wait(get_active_taskgraph()).
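For example, a minimal sketch using the default active TaskGraph:
using DataFlowTasks
A = zeros(10)
@dspawn fill!(@W(A), 1)
wait(DataFlowTasks.get_active_taskgraph()) # blocks until every spawned task has finished
sum(A) # 10.0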
DataFlowTasks._getloginfo — Method
_getloginfo()
Return the active logger.
DataFlowTasks._setloginfo! — Method
_setloginfo!(l::LogInfo)
Set the active logger to l.
DataFlowTasks.access_mode — Method
DataFlowTasks.addedge! — Method
addedge!(dag,i,j)
Add (directed) edge connecting node i to node j in the dag.
DataFlowTasks.addedge_transitive! — Method
addedge_transitive!(dag,i,j)
Add edge connecting nodes i and j if there is no path connecting them already.
DataFlowTasks.addnode! — Method
addnode!(dag,(k,v)::Pair[, check=false])
addnode!(dag,k[, check=false])
Add a node to the dag. If passed only a key k, the value v is initialized as empty (no edges added). The check flag is used to indicate if a data flow analysis should be performed to update the dependencies of the newly inserted node.
DataFlowTasks.capacity — Method
capacity(dag)
The maximum number of nodes that dag can contain.
DataFlowTasks.data — Method
data(t::DataFlowTask[,i])
Data accessed by t.
DataFlowTasks.data_dependency — Method
data_dependency(t1::DataFlowTask,t2::DataFlowTask)
Determines if there is a data dependency between t1 and t2 based on the data they read from and write to.
DataFlowTasks.describe — Method
describe(loginfo::LogInfo; categories = String[])
describe(io, loginfo::LogInfo; categories = String[])
Analyses the information contained in loginfo and displays a summary on io (stdout by default).
Passing a categories argument allows grouping tasks by category. The categories can be a vector of Strings or a vector of String => Regex pairs, which will be matched against the tasks' labels.
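A sketch of the intended usage (the labels and the grouping strings below are made up for illustration):
using DataFlowTasks
A, B = zeros(100), zeros(100)
loginfo = DataFlowTasks.@log begin
    @dspawn fill!(@W(A), 1) label="fill A"
    @dspawn fill!(@W(B), 1) label="fill B"
    s = @dspawn sum(@R(A)) + sum(@R(B)) label="sum"
    fetch(s)
end
# group tasks whose label contains "fill" into one category, those containing "sum" into another
DataFlowTasks.describe(loginfo; categories = ["fill", "sum"])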
DataFlowTasks.enable_debug — Function
enable_debug(mode = true)
If mode is true (the default), enable debug mode: errors inside tasks will be shown.
DataFlowTasks.enable_log — Function
enable_log(mode = true)
If mode is true (the default), logging is enabled through the @log macro. Calling enable_log(false) will deactivate logging at compile time to avoid any possible overhead.
Note that changing the log mode at runtime may invalidate code, possibly triggering recompilation.
See also: @log, with_logging
DataFlowTasks.force_linear_dag — Function
force_linear_dag(mode=false)
If mode is true, nodes are added to the DAG in a linear fashion, i.e. the DAG connects node i to node i+1. This is useful for debugging purposes.
DataFlowTasks.force_sequential — Function
force_sequential(mode = true)
If mode is true, enable sequential mode: no tasks are created and scheduled, code is simply run as it appears in the sources. In effect, this makes @dspawn a no-op.
By default, sequential mode is disabled when the program starts.
See also: force_linear_dag.
DataFlowTasks.get_active_taskgraph — Method
get_active_taskgraph()
Return the active TaskGraph.
DataFlowTasks.has_edge — Method
has_edge(dag,i,j)
Check if there is an edge connecting i to j.
DataFlowTasks.inneighbors — Method
inneighbors(dag,i)
List of predecessors of i in dag.
DataFlowTasks.isconnected — Method
isconnected(dag,i,j)
Check if there is a path in dag connecting i to j.
DataFlowTasks.loggertodot — Method
loggertodot(logger) --> dagstring
Return a string in the DOT format representing the underlying graph in logger.
If GraphViz is installed, you can use GraphViz.Graph(logger) to produce an image.
DataFlowTasks.memory_overlap — Method
memory_overlap(di,dj)
Determine if data di and dj have overlapping memory in the sense that mutating di can change dj (or vice versa). This function is used to build the dependency graph between DataFlowTasks.
A generic version is implemented returning true (but printing a warning). Users should overload this function for the specific data types used in the arguments to allow for appropriate inference of data dependencies.
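For instance, a sketch of such an overload for a hypothetical user-defined tile type (the CustomTile name and its aliasing rule are made up for illustration):
using DataFlowTasks
struct CustomTile
    data::Matrix{Float64}
end
# for this toy type, assume two tiles alias only when they wrap the very same matrix
DataFlowTasks.memory_overlap(a::CustomTile, b::CustomTile) = a.data === b.data
# when compared against other data, fall back to the wrapped matrix
DataFlowTasks.memory_overlap(a::CustomTile, b) = DataFlowTasks.memory_overlap(a.data, b)
DataFlowTasks.memory_overlap(a, b::CustomTile) = DataFlowTasks.memory_overlap(a, b.data)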
DataFlowTasks.memory_overlap — Method
memory_overlap(di::AbstractArray,dj::AbstractArray)
Try to determine if the arrays di and dj have overlapping memory.
When both di and dj are Arrays of bitstype, simply compare their addresses. Otherwise, compare their parents by default.
When both di and dj are SubArrays we compare the actual indices of the SubArrays when their parents are the same (to avoid too many false positives).
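A small sketch of what this implies, assuming the index comparison behaves as described above:
using DataFlowTasks
A = zeros(10)
DataFlowTasks.memory_overlap(view(A, 1:5), view(A, 6:10)) # false: same parent, disjoint indices
DataFlowTasks.memory_overlap(view(A, 1:6), view(A, 5:10)) # true: the index ranges overlap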
DataFlowTasks.nodes — Method
nodes(dag::DAG)
Return an iterator over the nodes of dag.
DataFlowTasks.num_edges — Method
num_edges(dag::DAG)
Number of edges in the DAG.
DataFlowTasks.num_nodes — Method
num_nodes(dag::DAG)
Number of nodes in the DAG.
DataFlowTasks.outneighbors — Method
outneighbors(dag,i)
List of successors of i in dag.
DataFlowTasks.remove_node! — Method
remove_node!(dag::DAG,i)
Remove node i and all of its edges from dag.
DataFlowTasks.savedag — Function
DataFlowTasks.savedag(filepath, graph)
Save graph as an SVG image at filepath. This requires GraphViz to be available.
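A hedged sketch of the intended workflow, assuming GraphViz is available (for instance through stack_weakdeps_env!):
using DataFlowTasks
using GraphViz
A = zeros(10)
loginfo = DataFlowTasks.@log begin
    t = @dspawn fill!(@W(A), 1) label="fill"
    fetch(t)
end
g = GraphViz.Graph(loginfo)         # build the task DAG from the recorded log
DataFlowTasks.savedag("dag.svg", g) # write it to disk as an SVG image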
DataFlowTasks.set_active_taskgraph! — Method
set_active_taskgraph!(tg)
Set the active TaskGraph to tg.
DataFlowTasks.stack_weakdeps_env! — Method
DataFlowTasks.stack_weakdeps_env!(; verbose = false, update = false)
Push to the load stack an environment providing the weak dependencies of DataFlowTasks. During the development stage, this allows benefiting from the profiling / debugging features of DataFlowTasks without having to install GraphViz or Makie in the project environment.
This can take quite some time if packages have to be installed or precompiled. Run in verbose mode to see what happens.
Additionally, set update=true if you want to update the weakdeps environment.
This feature is experimental and might break in the future.
Examples:
DataFlowTasks.stack_weakdeps_env!()
using GraphViz
DataFlowTasks.start_dag_cleaner — Function
start_dag_cleaner(tg)
Start a task associated with tg which takes nodes from its finished queue and removes them from the dag. The task blocks if finished is empty.
DataFlowTasks.update_edges! — Method
update_edges!(dag::DAG,i)
Perform the data-flow analysis to update the edges of node i. Both incoming and outgoing edges are updated.
DataFlowTasks.with_logging! — Method
with_logging!(f,l::LogInfo)
Similar to with_logging, but append events to l.
DataFlowTasks.with_logging — Method
with_logging(f) --> f(),loginfo
Execute f() and log DataFlowTasks into the loginfo object.
Examples:
using DataFlowTasks
A,B = zeros(2), ones(2);
out,loginfo = DataFlowTasks.with_logging() do
@dspawn fill!(@W(A),1)
@dspawn fill!(@W(B),1)
res = @dspawn sum(@R(A)) + sum(@R(B))
fetch(res)
end
#
out
See also: LogInfo
DataFlowTasks.with_taskgraph — Method
with_taskgraph(f,tg::TaskGraph)
Run f, but push DataFlowTasks to tg.
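For example, a sketch of running a block of work on its own task graph without touching the active one:
using DataFlowTasks
tg = DataFlowTasks.TaskGraph(100)
A = zeros(10)
DataFlowTasks.with_taskgraph(tg) do
    @dspawn fill!(@W(A), 1)
end
wait(tg) # only waits for the tasks pushed to tg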
DataFlowTasks.@dasync — Macro
DataFlowTasks.@dspawn — Macro
@dspawn expr [kwargs...]
Spawn a Julia Task to execute the code given by expr, and schedule it to run on any available thread.
Annotate the code in expr with @R, @W and/or @RW to indicate how it accesses data (see examples below). This information is used to automatically infer task dependencies.
Additionally, the following keyword arguments can be provided:
- label: provide a label to identify the task. This is useful when logging scheduling information.
- priority: inform the scheduler about the relative priority of the task. This information is not (yet) leveraged by the default scheduler.
Examples:
Below are 3 equivalent ways to create the same Task, which expresses a Read-Write dependency on C and Read dependencies on A and B:
using LinearAlgebra
using DataFlowTasks
A = ones(5, 5)
B = ones(5, 5)
C = zeros(5, 5)
α, β = (1, 0)
# Option 1: annotate arguments in a function call
@dspawn mul!(@RW(C), @R(A), @R(B), α, β)
# Option 2: specify data access modes in the code block
@dspawn begin
@RW C
@R A B
mul!(C, A, B, α, β)
end
# Option 3: specify data access modes after the code block
# (i.e. alongside keyword arguments)
res = @dspawn mul!(C, A, B, α, β) @RW(C) @R(A,B)
fetch(res) # a 5×5 matrix of 5.0
Here is a more complete example, demonstrating a full computation involving 2 different tasks.
using DataFlowTasks
A = rand(5)
# create a task with WRITE access mode to A
# and label "writer"
t1 = @dspawn begin
@W A
sleep(1)
fill!(A,0)
println("finished writing")
end label="writer"
# create a task with READ access mode to A
t2 = @dspawn begin
@R A
println("I automatically wait for `t1` to finish")
sum(A)
end priority=1
fetch(t2) # 0
# output
finished writing
I automatically wait for `t1` to finish
0.0
Note that in the example above t2 waited for t1 because it read a data field that t1 accessed in a writable manner.
DataFlowTasks.@dtask — Macro
@dtask expr [kwargs...]
Create a DataFlowTask to execute expr, where data have been tagged to specify how they are accessed. Note that the task is not automatically scheduled for execution.
See @dspawn for information on how to annotate expr to specify data dependencies, and a list of supported keyword arguments.
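A minimal sketch contrasting @dtask with @dspawn (the label is illustrative):
using DataFlowTasks
A = zeros(5)
t = DataFlowTasks.@dtask fill!(@W(A), 1) label="writer"
schedule(t) # unlike @dspawn, @dtask does not schedule the task automatically
fetch(t)
sum(A) # 5.0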
DataFlowTasks.@log — Macro
DataFlowTasks.@log expr --> LogInfo
Execute expr and return a LogInfo instance with the recorded events. The logger waits for the current taskgraph (see get_active_taskgraph) to be empty before starting.
The returned LogInfo instance may be incomplete if expr returns before all DataFlowTasks spawned inside of it are completed. Typically expr should fetch the outcome before returning to properly benchmark the code that it runs (and not merely the tasks that it spawns).
See also: with_logging, with_logging!
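A short sketch of the pattern described above; the result is fetched inside the block so that the log covers the whole computation:
using DataFlowTasks
A = rand(1000)
loginfo = DataFlowTasks.@log begin
    t = @dspawn sort!(@RW(A)) label="sort"
    fetch(t) # fetch before returning so the recorded events are complete
end
loginfo # a LogInfo with the recorded events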
GraphViz.Graph — Method
GraphViz.Graph(log_info::LogInfo)
Produce a GraphViz.Graph representing the DAG of tasks collected in log_info.
See also: DataFlowTasks.@log
MakieCore.plot — Method
plot(log_info; categories)
Plot the tasks recorded in log_info, grouping labeled tasks into categories.
Entries in categories define how to group tasks for plotting. Each entry can be:
- a String: in this case, all tasks having labels in which the string occurs are grouped together. The string is also used as a label for the category itself.
- a String => Regex pair: in this case, all tasks having labels matching the regex are grouped together. The string is used as a label for the category itself.
See the documentation for more information on how to profile and visualize DataFlowTasks.
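As a hedged sketch, assuming a Makie backend is installed and loaded (CairoMakie is used here purely as an example, and the labels are illustrative):
using DataFlowTasks
using CairoMakie
A, B = rand(1000), rand(1000)
loginfo = DataFlowTasks.@log begin
    @dspawn sort!(@RW(A)) label="sort A"
    @dspawn sort!(@RW(B)) label="sort B"
    m = @dspawn vcat(@R(A), @R(B)) label="merge"
    fetch(m)
end
# one category per entry: a plain string match and a String => Regex pair
plot(loginfo; categories = ["sort", "merge" => r"^merge"])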