From 75d11a8974e0b49a8b84e2c5d70a6a53ce24fb33 Mon Sep 17 00:00:00 2001 From: Thomas Herve Date: Thu, 30 Jul 2015 17:12:56 +0200 Subject: [PATCH] Use the executor directly in notifier Instead of going through taskflow, use the executor directly in the notifier, so that task execution doesn't block the pipeline and the main loop when running. Change-Id: I8cc555494d1d40ec7521046ce1b4a28b9b85e0ba --- requirements.txt | 1 - zaqar/notification/notifier.py | 19 ++----------------- zaqar/notification/task/webhook.py | 6 +----- 3 files changed, 3 insertions(+), 23 deletions(-) diff --git a/requirements.txt b/requirements.txt index 4b697b1e3..092a92d80 100644 --- a/requirements.txt +++ b/requirements.txt @@ -26,5 +26,4 @@ enum34;python_version=='2.7' or python_version=='2.6' trollius>=1.0 autobahn>=0.10.1 # MIT License requests>=2.5.2 -taskflow>=1.16.0 futurist>=0.1.2 # Apache-2.0 diff --git a/zaqar/notification/notifier.py b/zaqar/notification/notifier.py index 5dd2e27e4..ad0d633ba 100644 --- a/zaqar/notification/notifier.py +++ b/zaqar/notification/notifier.py @@ -14,13 +14,10 @@ # limitations under the License. from stevedore import driver -import uuid import futurist from oslo_log import log as logging from six.moves import urllib_parse -from taskflow import engines -from taskflow.patterns import unordered_flow as uf LOG = logging.getLogger(__name__) @@ -44,24 +41,12 @@ class NotifierDriver(object): subscribers = self.subscription_controller.list(queue_name, project) - wh_flow = uf.Flow('webhook_notifier_flow') - for sub in next(subscribers): s_type = urllib_parse.urlparse(sub['subscriber']).scheme - invoke_args = [uuid.uuid4()] - invoke_kwds = {'inject': {'subscription': sub, - 'messages': messages}} mgr = driver.DriverManager('zaqar.notification.tasks', s_type, - invoke_on_load=True, - invoke_args=invoke_args, - invoke_kwds=invoke_kwds) - wh_flow.add(mgr.driver) - - if wh_flow: - e = engines.load(wh_flow, executor=self.executor, - engine='parallel') - e.run() + invoke_on_load=True) + self.executor.submit(mgr.driver.execute, sub, messages) else: LOG.error('Failed to get subscription controller.') diff --git a/zaqar/notification/task/webhook.py b/zaqar/notification/task/webhook.py index 4b21446ed..b2b2dbb59 100644 --- a/zaqar/notification/task/webhook.py +++ b/zaqar/notification/task/webhook.py @@ -16,15 +16,11 @@ import json from oslo_log import log as logging import requests -from taskflow import task LOG = logging.getLogger(__name__) -class WebhookTask(task.Task): - def __init__(self, name, show_name=True, inject=None): - super(WebhookTask, self).__init__(name, inject=inject) - self._show_name = show_name +class WebhookTask(object): def execute(self, subscription, messages, **kwargs): try: