Update order calls and connect call.
This commit is contained in:
@@ -16,8 +16,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from collections import defaultdict
|
||||
|
||||
import collections
|
||||
import logging
|
||||
|
||||
from networkx import exception as g_exc
|
||||
@@ -25,7 +24,9 @@ from networkx.algorithms import dag
|
||||
from networkx.classes import digraph
|
||||
|
||||
from taskflow import exceptions as exc
|
||||
from taskflow.openstack.common import excutils
|
||||
from taskflow.patterns import ordered_flow
|
||||
from taskflow import states
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@@ -50,20 +51,19 @@ class Flow(ordered_flow.Flow):
|
||||
self._connected = False
|
||||
|
||||
def _fetch_task_inputs(self, task):
|
||||
task_needs = task.requires()
|
||||
if not task_needs:
|
||||
return None
|
||||
inputs = {}
|
||||
for (_who, there_result) in self.results:
|
||||
for n in task_needs:
|
||||
if there_result and n in there_result:
|
||||
for n in task.requires():
|
||||
for (them, there_result) in self.results:
|
||||
if (not self._graph.has_edge(them, task) or
|
||||
not n in them.provides() or not there_result):
|
||||
continue
|
||||
if n in there_result:
|
||||
# NOTE(harlowja): later results overwrite
|
||||
# prior results for the same keys, which is
|
||||
# typically desired.
|
||||
inputs[n] = there_result[n]
|
||||
return inputs
|
||||
|
||||
def run(self, context, *args, **kwargs):
|
||||
self.connect()
|
||||
return super(Flow, self).run(context, *args, **kwargs)
|
||||
|
||||
def order(self):
|
||||
self.connect()
|
||||
try:
|
||||
@@ -79,17 +79,19 @@ class Flow(ordered_flow.Flow):
|
||||
if self._connected or len(self._graph) == 0:
|
||||
return
|
||||
|
||||
provides_what = defaultdict(list)
|
||||
requires_what = defaultdict(list)
|
||||
# Figure out the provider of items and the requirers of items.
|
||||
provides_what = collections.defaultdict(list)
|
||||
requires_what = collections.defaultdict(list)
|
||||
for t in self._graph.nodes_iter():
|
||||
for r in t.requires():
|
||||
requires_what[r].append(t)
|
||||
for p in t.provides():
|
||||
provides_what[p].append(t)
|
||||
|
||||
for (want_what, who_wants) in requires_what.items():
|
||||
# Link providers to consumers of items.
|
||||
for (want_what, who_wants) in requires_what.iteritems():
|
||||
who_provided = 0
|
||||
for p in provides_what.get(want_what, []):
|
||||
for p in provides_what[want_what]:
|
||||
# P produces for N so thats why we link P->N and not N->P
|
||||
for n in who_wants:
|
||||
if p is n:
|
||||
@@ -101,7 +103,8 @@ class Flow(ordered_flow.Flow):
|
||||
self._graph.add_edge(p, n, why)
|
||||
who_provided += 1
|
||||
if not who_provided:
|
||||
raise exc.InvalidStateException("Task/s %s requires input %s "
|
||||
who_wants = ", ".join([str(a) for a in who_wants])
|
||||
raise exc.InvalidStateException("%s requires input %s "
|
||||
"but no other task produces "
|
||||
"said output." % (who_wants,
|
||||
want_what))
|
||||
|
||||
Reference in New Issue
Block a user