Merge "Use the executor directly in notifier"

This commit is contained in:
Jenkins 2015-08-19 21:42:59 +00:00 committed by Gerrit Code Review
commit 5e8cd7eb7f
3 changed files with 3 additions and 23 deletions

View File

@ -26,5 +26,4 @@ enum34;python_version=='2.7' or python_version=='2.6'
trollius>=1.0 trollius>=1.0
autobahn>=0.10.1 # MIT License autobahn>=0.10.1 # MIT License
requests>=2.5.2 requests>=2.5.2
taskflow>=1.16.0
futurist>=0.1.2 # Apache-2.0 futurist>=0.1.2 # Apache-2.0

View File

@ -14,13 +14,10 @@
# limitations under the License. # limitations under the License.
from stevedore import driver from stevedore import driver
import uuid
import futurist import futurist
from oslo_log import log as logging from oslo_log import log as logging
from six.moves import urllib_parse from six.moves import urllib_parse
from taskflow import engines
from taskflow.patterns import unordered_flow as uf
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -44,24 +41,12 @@ class NotifierDriver(object):
subscribers = self.subscription_controller.list(queue_name, subscribers = self.subscription_controller.list(queue_name,
project) project)
wh_flow = uf.Flow('webhook_notifier_flow')
for sub in next(subscribers): for sub in next(subscribers):
s_type = urllib_parse.urlparse(sub['subscriber']).scheme s_type = urllib_parse.urlparse(sub['subscriber']).scheme
invoke_args = [uuid.uuid4()]
invoke_kwds = {'inject': {'subscription': sub,
'messages': messages}}
mgr = driver.DriverManager('zaqar.notification.tasks', mgr = driver.DriverManager('zaqar.notification.tasks',
s_type, s_type,
invoke_on_load=True, invoke_on_load=True)
invoke_args=invoke_args, self.executor.submit(mgr.driver.execute, sub, messages)
invoke_kwds=invoke_kwds)
wh_flow.add(mgr.driver)
if wh_flow:
e = engines.load(wh_flow, executor=self.executor,
engine='parallel')
e.run()
else: else:
LOG.error('Failed to get subscription controller.') LOG.error('Failed to get subscription controller.')

View File

@ -16,15 +16,11 @@
import json import json
from oslo_log import log as logging from oslo_log import log as logging
import requests import requests
from taskflow import task
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class WebhookTask(task.Task): class WebhookTask(object):
def __init__(self, name, show_name=True, inject=None):
super(WebhookTask, self).__init__(name, inject=inject)
self._show_name = show_name
def execute(self, subscription, messages, **kwargs): def execute(self, subscription, messages, **kwargs):
try: try: