DaskVine Dag Reconstruction, Visualization + other fixes #4145
self.set_result(p, self.get_result(node.key))
) # case e.g, "x": "y", and we just set the value of "y"
x = self.set_result(p, self.get_result(node.target))
# rs.update(x) # case e.g, "x": "y", and we just set the value of "y"
Is this correct?
@@ -162,10 +234,14 @@ def set_result(self, key, value):
else:
    self._depth_of[r] = 0

for c in self._dependencies_of[key]:
    self._pending_needed_by[c].discard(key)
if not DaskVineDag.taskref(self._working_graph[key]):
Why is this if needed?
return rs.values()
if DaskVineDag.taskref(self._working_graph[key]):
    return rs
Is this mixing old and new graph styles? If so, this file should only have the new style, and the one in compat the old style.
@@ -211,6 +290,33 @@ def set_targets(self, keys):
def get_targets(self):
    return self._targets

def visualize(self, name='DaskVine_DAG'):
Would this be different from dask.visualize?
dag = DaskVineDag(dsk)
dag = DaskVineDag(dsk, reconstruct=self.reconstruct, merge_size=self.merge_size)

if self.visualize:
I think we want to leave the visualization to dask proper.
@@ -56,7 +57,7 @@ def containerp(s):
def symbolp(s):
    return isinstance(s, dts.DataNode)

def __init__(self, dsk):
def __init__(self, dsk, reconstruct=False, merge_size=2):
Please add detailed comments of what these options do.
Proposed Changes
Adds task graph reconstruction and visualization to DaskVine, along with a couple of fixes.
Task Graph Reconstruction
A user annotates a function with the @daskvine_merge decorator. When the DaskVineDag object is created and reconstruct=True, expand_merge will reconstruct the graph to perform a hierarchical merge/reduction based on merge_size, which is 2 by default. We replace the original node in the graph with the root node of the hierarchical tree.
This turns a DAG like this
original.pdf
To one like this
reconstructed.pdf
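As a rough illustration of the hierarchical merge described above, here is a minimal sketch. It is not the PR's expand_merge: the names expand_merge_sketch, merge_sum, and evaluate are hypothetical, and the graph is a plain dict of (func, [input keys]) tuples rather than DaskVine's task-spec nodes. It assumes the merge function is associative and takes a list of partial results.

```python
import operator
from functools import reduce


def merge_sum(values):
    """Example associative reduction (assumption): sum a list of partial results."""
    return reduce(operator.add, values)


def expand_merge_sketch(dsk, key, merge_size=2):
    """Rewrite dsk[key] = (func, [inputs...]) into a tree of calls to func,
    each taking at most merge_size inputs. Returns a new graph dict in which
    `key` names the root of the tree."""
    if merge_size < 2:
        raise ValueError("merge_size must be at least 2")
    func, inputs = dsk[key]
    new_dsk = {k: v for k, v in dsk.items() if k != key}
    level = list(inputs)
    depth = 0
    # Group the current level into chunks until one final merge suffices.
    while len(level) > merge_size:
        next_level = []
        for i in range(0, len(level), merge_size):
            chunk = level[i:i + merge_size]
            if len(chunk) == 1:
                # A leftover single input is carried up unchanged.
                next_level.append(chunk[0])
                continue
            name = f"{key}-merge-{depth}-{i // merge_size}"
            new_dsk[name] = (func, chunk)
            next_level.append(name)
        level = next_level
        depth += 1
    # The original key is kept and now names the root of the tree.
    new_dsk[key] = (func, level)
    return new_dsk


def evaluate(dsk, key):
    """Tiny recursive evaluator for this sketch's graph format."""
    value = dsk[key]
    if isinstance(value, tuple) and callable(value[0]):
        func, inputs = value
        return func([evaluate(dsk, k) for k in inputs])
    return value


dsk = {"a": 1, "b": 2, "c": 3, "d": 4, "e": 5,
       "total": (merge_sum, ["a", "b", "c", "d", "e"])}
tree = expand_merge_sketch(dsk, "total", merge_size=2)
```

Keeping the original key as the root means anything downstream that depends on "total" does not need to be rewritten, which matches the description's note that the original node is replaced by the root of the tree.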
Visualization
The visualize function in dask_dag.py allows users to visualize the graph. Setting visualize=True will create a dot graph. This is done after reconstruction.
Other fixes
In set_result, taskrefs (more specifically, dts.Alias) are not handled properly.
Merge Checklist
The following items must be completed before PRs can be merged.
Check these off to verify you have completed all steps.
make test: Run local tests prior to pushing.
make format: Format source code to comply with lint policies. Note that some lint errors can only be resolved manually (e.g., Python).
make lint: Run lint on source code prior to pushing.