References
DataFlowTasks.DataFlowTasks — Module

    module DataFlowTasks

Create `DataFlowTask`s which can keep track of how data flows through them.
DataFlowTasks.LOGINFO — Constant

    const LOGINFO::Ref{LogInfo}

Global `LogInfo` being used to record the events. Can be changed using `_setloginfo!`.
DataFlowTasks.TASKGRAPH — Constant

    const TASKGRAPH::Ref{TaskGraph}

The active `TaskGraph` being used. Nodes will be added to this `TaskGraph` by default. Can be changed using `set_active_taskgraph!`.
DataFlowTasks.AccessMode — Type

    @enum AccessMode READ WRITE READWRITE

Describe how a `DataFlowTask` accesses its `data`.
DataFlowTasks.DAG — Type

    struct DAG{T}

Representation of a directed acyclic graph containing nodes of type `T`. The list of nodes with edges coming into a node `i` can be retrieved using `inneighbors(dag,i)`; similarly, the list of nodes with edges leaving from `i` can be retrieved using `outneighbors(dag,i)`.

`DAG` is a buffered structure with a buffer of size `sz_max`: calling `addnode!` on it will block if the `DAG` has more than `sz_max` elements.
DataFlowTasks.DAG — Method

    DAG{T}(sz)

Create a buffered `DAG` holding a maximum of `sz` nodes of type `T`.
DataFlowTasks.DataFlowTask — Type

    DataFlowTask(func,data,mode)

Create a task-like object similar to `Task(func)` which accesses `data` with `AccessMode` `mode`.

When a `DataFlowTask` is created, the elements in its `data` field will be checked against all other active `DataFlowTask`s to determine if a dependency is present based on a data-flow analysis. The resulting `Task` will then wait on those dependencies.

A `DataFlowTask` behaves much like a Julia `Task`: you can call `wait(t)`, `schedule(t)` and `fetch(t)` on it.
DataFlowTasks.FinishedChannel — Type

    struct FinishedChannel{T} <: AbstractChannel{T}

Used to store tasks which have been completed, but not yet removed from the underlying `DAG`. Taking from an empty `FinishedChannel` will block.
DataFlowTasks.InsertionLog — Type

    struct InsertionLog

Logs the execution trace of a `DataFlowTask` insertion.

Fields:
- `time_start`: time the insertion began
- `time_finish`: time the insertion finished
- `taskid`: the task it is inserting
- `tid`: the thread on which the insertion is happening
DataFlowTasks.LogInfo — Type

    struct LogInfo

Contains information on the program's progress. For thread-safety, the `LogInfo` structure uses one vector of `TaskLog` per thread.

You can visualize and postprocess a `LogInfo` using `GraphViz.Graph` and `Makie.plot`.
DataFlowTasks.Stop — Type

    struct Stop

Singleton type used to safely interrupt a task reading from an `AbstractChannel`.
DataFlowTasks.TaskGraph — Type

    struct TaskGraph

A directed acyclic graph used to represent the dependencies between `DataFlowTask`s.

`TaskGraph(sz)` creates a task graph that can hold at most `sz` elements at any given time. In particular, trying to add a new `DataFlowTask` will block if the `TaskGraph` is already full.

See also: `get_active_taskgraph`, `set_active_taskgraph!`
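As a usage sketch (assuming `DataFlowTasks` is installed; the capacity value below is arbitrary), a bounded `TaskGraph` can be created and temporarily made the active one:

    using DataFlowTasks

    # Create a task graph holding at most 100 in-flight tasks;
    # spawning more will block until earlier tasks complete.
    tg = DataFlowTasks.TaskGraph(100)

    # Make it the graph to which @dspawn adds nodes by default
    old = DataFlowTasks.get_active_taskgraph()
    DataFlowTasks.set_active_taskgraph!(tg)

    # ... spawn tasks with @dspawn ...

    # Restore the previous task graph
    DataFlowTasks.set_active_taskgraph!(old)

Alternatively, `with_taskgraph(f, tg)` runs `f` against `tg` without touching the global state.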
DataFlowTasks.TaskLog — Type

    struct TaskLog

Logs the execution trace of a `DataFlowTask`.

Fields:
- `tag`: task id in DAG
- `time_start`: time the task started running
- `time_finish`: time the task finished running
- `tid`: thread on which the task ran
- `inneighbors`: vector of incoming neighbors in DAG
- `label`: a string used for displaying and/or postprocessing tasks
Base.empty! — Method

    empty!(tg::TaskGraph)

Interrupt all tasks in `tg` and remove them from the underlying `DAG`.

This function is useful to avoid having to restart the REPL when a task in `tg` errors.
Base.resize! — Method

    resize!(tg::TaskGraph, sz)

Change the buffer size of `tg` to `sz`.
Base.wait — Method

    wait(tg::TaskGraph)

Wait for all nodes in `tg` to be finished before continuing.

To wait on the active `TaskGraph`, use `wait(get_active_taskgraph())`.
DataFlowTasks._getloginfo — Method

    _getloginfo()

Return the active logger.
DataFlowTasks._setloginfo! — Method

    _setloginfo!(l::LogInfo)

Set the active logger to `l`.
DataFlowTasks.access_mode — Method

DataFlowTasks.addedge! — Method

    addedge!(dag,i,j)

Add a (directed) edge connecting node `i` to node `j` in the `dag`.
DataFlowTasks.addedge_transitive! — Method

    addedge_transitive!(dag,i,j)

Add an edge connecting nodes `i` and `j` if there is no path connecting them already.
DataFlowTasks.addnode! — Method

    addnode!(dag,(k,v)::Pair[, check=false])
    addnode!(dag,k[, check=false])

Add a node to the `dag`. If passed only a key `k`, the value `v` is initialized as empty (no edges added). The `check` flag is used to indicate if a data flow analysis should be performed to update the dependencies of the newly inserted node.
DataFlowTasks.capacity — Method

    capacity(dag)

The maximum number of nodes that `dag` can contain.
DataFlowTasks.data — Method

    data(t::DataFlowTask[,i])

Data accessed by `t`.
DataFlowTasks.data_dependency — Method

    data_dependency(t1::DataFlowTask,t2::DataFlowTask)

Determines if there is a data dependency between `t1` and `t2` based on the data they read from and write to.
DataFlowTasks.describe — Method

    describe(loginfo::LogInfo; categories = String[])
    describe(io, loginfo::LogInfo; categories = String[])

Analyses the information contained in `loginfo` and displays a summary on `io` (`stdout` by default).

Passing a `categories` argument allows grouping tasks by category. The `categories` can be a vector of `String`s or a vector of `String => Regex` pairs, which will be matched against the tasks' labels.
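A sketch of combining `describe` with `with_logging` (assuming `DataFlowTasks` is installed; the labels `"fill"` and `"reduce"` are made up for illustration):

    using DataFlowTasks

    A = zeros(10)
    _, loginfo = DataFlowTasks.with_logging() do
        @dspawn fill!(@W(A), 1) label="fill"
        s = @dspawn sum(@R(A)) label="reduce"
        fetch(s)
    end

    # Print a summary to stdout, grouping tasks whose labels
    # contain "fill" or "reduce" into two categories
    DataFlowTasks.describe(loginfo; categories = ["fill", "reduce"])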
DataFlowTasks.enable_debug — Function

    enable_debug(mode = true)

If `mode` is `true` (the default), enable debug mode: errors inside tasks will be shown.
DataFlowTasks.enable_log — Function

    enable_log(mode = true)

If `mode` is `true` (the default), logging is enabled through the `@log` macro. Calling `enable_log(false)` will deactivate logging at compile time to avoid any possible overhead.

Note that changing the log mode at runtime may invalidate code, possibly triggering recompilation.

See also: `@log`, `with_logging`
DataFlowTasks.force_linear_dag — Function

    force_linear_dag(mode=false)

If `mode` is `true`, nodes are added to the DAG in a linear fashion, i.e. the DAG connects node `i` to node `i+1`. This is useful for debugging purposes.
DataFlowTasks.force_sequential — Function

    force_sequential(mode = true)

If `mode` is `true`, enable sequential mode: no tasks are created and scheduled; the code is simply run as it appears in the sources. In effect, this makes `@dspawn` a no-op.

By default, sequential mode is disabled when the program starts.

See also: `force_linear_dag`.
DataFlowTasks.get_active_taskgraph — Method

    get_active_taskgraph()

Return the active `TaskGraph`.
DataFlowTasks.has_edge — Method

    has_edge(dag,i,j)

Check if there is an edge connecting `i` to `j`.
DataFlowTasks.inneighbors — Method

    inneighbors(dag,i)

List of predecessors of `i` in `dag`.
DataFlowTasks.isconnected — Method

    isconnected(dag,i,j)

Check if there is a path in `dag` connecting `i` to `j`.
DataFlowTasks.loggertodot — Method

    loggertodot(logger) --> dagstring

Return a string in the DOT format representing the underlying graph in `logger`.

If `GraphViz` is installed, you can use `GraphViz.Graph(logger)` to produce an image.
DataFlowTasks.memory_overlap — Method

    memory_overlap(di,dj)

Determine if data `di` and `dj` have overlapping memory in the sense that mutating `di` can change `dj` (or vice versa). This function is used to build the dependency graph between `DataFlowTask`s.

A generic version is implemented returning `true` (but printing a warning). Users should overload this function for the specific data types used in the arguments to allow for appropriate inference of data dependencies.
DataFlowTasks.memory_overlap — Method

    memory_overlap(di::AbstractArray,dj::AbstractArray)

Try to determine if the arrays `di` and `dj` have overlapping memory.

When both `di` and `dj` are `Array`s of bitstype, simply compare their addresses. Otherwise, compare their `parent`s by default.

When both `di` and `dj` are `SubArray`s we compare the actual indices of the `SubArray`s when their parents are the same (to avoid too many false positives).
DataFlowTasks.nodes — Method

    nodes(dag::DAG)

Return an iterator over the nodes of `dag`.
DataFlowTasks.num_edges — Method

    num_edges(dag::DAG)

Number of edges in the `DAG`.
DataFlowTasks.num_nodes — Method

    num_nodes(dag::DAG)

Number of nodes in the `DAG`.
DataFlowTasks.outneighbors — Method

    outneighbors(dag,i)

List of successors of `i` in `dag`.
DataFlowTasks.remove_node! — Method

    remove_node!(dag::DAG,i)

Remove node `i` and all of its edges from `dag`.
DataFlowTasks.savedag — Function

    DataFlowTasks.savedag(filepath, graph)

Save `graph` as an SVG image at `filepath`. This requires `GraphViz` to be available.
DataFlowTasks.set_active_taskgraph! — Method

    set_active_taskgraph!(tg)

Set the active `TaskGraph` to `tg`.
DataFlowTasks.stack_weakdeps_env! — Method

    DataFlowTasks.stack_weakdeps_env!(; verbose = false, update = false)

Push to the load stack an environment providing the weak dependencies of DataFlowTasks. During the development stage, this allows benefiting from the profiling/debugging features of DataFlowTasks without having to install `GraphViz` or `Makie` in the project environment.

This can take quite some time if packages have to be installed or precompiled. Run in `verbose` mode to see what happens.

Additionally, set `update=true` if you want to update the `weakdeps` environment.

This feature is experimental and might break in the future.

Examples:

    DataFlowTasks.stack_weakdeps_env!()
    using GraphViz
DataFlowTasks.start_dag_cleaner — Function

    start_dag_cleaner(tg)

Start a task associated with `tg` which takes nodes from its `finished` queue and removes them from the `dag`. The task blocks if `finished` is empty.
DataFlowTasks.update_edges! — Method

    update_edges!(dag::DAG,i)

Perform the data-flow analysis to update the edges of node `i`. Both incoming and outgoing edges are updated.
DataFlowTasks.with_logging! — Method

    with_logging!(f,l::LogInfo)

Similar to `with_logging`, but append events to `l`.
DataFlowTasks.with_logging — Method

    with_logging(f) --> f(), loginfo

Execute `f()` and log `DataFlowTask`s into the `loginfo` object.

Examples:

    using DataFlowTasks
    A,B = zeros(2), ones(2);
    out,loginfo = DataFlowTasks.with_logging() do
        @dspawn fill!(@W(A),1)
        @dspawn fill!(@W(B),1)
        res = @dspawn sum(@R(A)) + sum(@R(B))
        fetch(res)
    end
    out

See also: `LogInfo`
DataFlowTasks.with_taskgraph — Method

    with_taskgraph(f,tg::TaskGraph)

Run `f`, but push `DataFlowTask`s to `tg`.
DataFlowTasks.@dasync — Macro

DataFlowTasks.@dspawn — Macro

    @dspawn expr [kwargs...]

Spawn a Julia `Task` to execute the code given by `expr`, and schedule it to run on any available thread.

Annotate the code in `expr` with `@R`, `@W` and/or `@RW` to indicate how it accesses data (see the examples below). This information is used to automatically infer task dependencies.

Additionally, the following keyword arguments can be provided:
- `label`: provide a label to identify the task. This is useful when logging scheduling information;
- `priority`: inform the scheduler about the relative priority of the task. This information is not (yet) leveraged by the default scheduler.

Examples:

Below are 3 equivalent ways to create the same `Task`, which expresses a Read-Write dependency on `C` and Read dependencies on `A` and `B`:

    using LinearAlgebra
    using DataFlowTasks

    A = ones(5, 5)
    B = ones(5, 5)
    C = zeros(5, 5)
    α, β = (1, 0)

    # Option 1: annotate arguments in a function call
    @dspawn mul!(@RW(C), @R(A), @R(B), α, β)

    # Option 2: specify data access modes in the code block
    @dspawn begin
        @RW C
        @R A B
        mul!(C, A, B, α, β)
    end

    # Option 3: specify data access modes after the code block
    # (i.e. alongside keyword arguments)
    res = @dspawn mul!(C, A, B, α, β) @RW(C) @R(A,B)

    fetch(res) # a 5×5 matrix of 5.0

Here is a more complete example, demonstrating a full computation involving 2 different tasks:

    using DataFlowTasks

    A = rand(5)

    # create a task with WRITE access mode to A
    # and label "writer"
    t1 = @dspawn begin
        @W A
        sleep(1)
        fill!(A,0)
        println("finished writing")
    end label="writer"

    # create a task with READ access mode to A
    t2 = @dspawn begin
        @R A
        println("I automatically wait for `t1` to finish")
        sum(A)
    end priority=1

    fetch(t2) # 0

    # output

    finished writing
    I automatically wait for `t1` to finish
    0.0

Note that in the example above `t2` waited for `t1` because it read a data field that `t1` accessed in a writable manner.
DataFlowTasks.@dtask — Macro

    @dtask expr [kwargs...]

Create a `DataFlowTask` to execute `expr`, where data have been tagged to specify how they are accessed. Note that the task is not automatically scheduled for execution.

See `@dspawn` for information on how to annotate `expr` to specify data dependencies, and a list of supported keyword arguments.
DataFlowTasks.@log — Macro

    DataFlowTasks.@log expr --> LogInfo

Execute `expr` and return a `LogInfo` instance with the recorded events. The `Logger` waits for the current taskgraph (see `get_active_taskgraph`) to be empty before starting.

The returned `LogInfo` instance may be incomplete if `expr` returns before all `DataFlowTask`s spawned inside of it are completed. Typically `expr` should `fetch` the outcome before returning to properly benchmark the code that it runs (and not merely the tasks that it spawns).

See also: `with_logging`, `with_logging!`
GraphViz.Graph — Method

    GraphViz.Graph(log_info::LogInfo)

Produce a `GraphViz.Graph` representing the DAG of tasks collected in `log_info`.

See also: `DataFlowTasks.@log`
MakieCore.plot — Method

    plot(log_info; categories)

Plot the information collected in `log_info`, grouping labeled tasks by `categories`.

Entries in `categories` define how to group tasks in categories for plotting. Each entry can be:
- a `String`: in this case, all tasks having labels in which the string occurs are grouped together. The string is also used as a label for the category itself.
- a `String => Regex` pair: in this case, all tasks having labels matching the regex are grouped together. The string is used as a label for the category itself.

See the documentation for more information on how to profile and visualize `DataFlowTasks`.
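A sketch of how this might be called (assuming a Makie backend such as `CairoMakie` is available; the labels and the regex below are made up for illustration):

    using DataFlowTasks
    using CairoMakie   # any Makie backend; a weak dependency

    A = zeros(10)
    _, loginfo = DataFlowTasks.with_logging() do
        @dspawn fill!(@W(A), 1) label="init"
        fetch(@dspawn sum(@R(A)) label="sum total")
    end

    # Group tasks into two categories: one matched by substring,
    # one by a "name => regex" pair
    plot(loginfo; categories = ["init", "sum" => r"^sum"])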