From 99fe10e49c54ac6418893a0c5a7a89e62b636e31 Mon Sep 17 00:00:00 2001 From: Fei Long Wang Date: Fri, 30 Jan 2015 17:07:00 +1300 Subject: [PATCH] Implement webhook notifier driver This patch adds a notifier driver for webhook and the basic task management code with taskflow. DocImpact Partially-Implements blueprint: notifications Change-Id: I2727726cc57f03fb94184653452223f00fcf3d0c --- requirements-py3.txt | 2 + requirements.txt | 2 + setup.cfg | 3 + test-requirements-py3.txt | 3 - test-requirements.txt | 3 - zaqar/notification/__init__.py | 0 zaqar/notification/notifier.py | 72 +++++++++++++++++++ zaqar/notification/task/__init__.py | 0 zaqar/notification/task/webhook.py | 34 +++++++++ zaqar/storage/mongodb/subscriptions.py | 4 +- zaqar/storage/pipeline.py | 13 ++-- zaqar/tests/unit/notification/__init__.py | 0 .../tests/unit/notification/test_notifier.py | 69 ++++++++++++++++++ 13 files changed, 194 insertions(+), 11 deletions(-) create mode 100644 zaqar/notification/__init__.py create mode 100644 zaqar/notification/notifier.py create mode 100644 zaqar/notification/task/__init__.py create mode 100644 zaqar/notification/task/webhook.py create mode 100644 zaqar/tests/unit/notification/__init__.py create mode 100644 zaqar/tests/unit/notification/test_notifier.py diff --git a/requirements-py3.txt b/requirements-py3.txt index ff4add439..ca6fe16e1 100644 --- a/requirements-py3.txt +++ b/requirements-py3.txt @@ -23,3 +23,5 @@ oslo.utils>=1.4.0,<1.5.0 # Apache-2.0 SQLAlchemy>=0.9.7,<=0.9.99 enum34 autobahn>=0.10.1 +requests>=2.2.0,!=2.4.0 +taskflow<0.8.0,>=0.7.1 diff --git a/requirements.txt b/requirements.txt index 3a818b1b6..8b3edc834 100644 --- a/requirements.txt +++ b/requirements.txt @@ -23,3 +23,5 @@ SQLAlchemy>=0.9.7,<=0.9.99 enum34 trollius>=1.0 autobahn>=0.10.1 +requests>=2.2.0,!=2.4.0 +taskflow<0.8.0,>=0.7.1 diff --git a/setup.cfg b/setup.cfg index fea530943..bb9eb71f4 100644 --- a/setup.cfg +++ b/setup.cfg @@ -69,6 +69,9 @@ oslo.config.opts = zaqar.transport.base = zaqar.transport.base:_config_options zaqar.transport.validation = zaqar.transport.validation:_config_options +zaqar.storage.stages = + zaqar.notification.notifier = zaqar.notification.notifier:NotifierDriver + [nosetests] where=tests verbosity=2 diff --git a/test-requirements-py3.txt b/test-requirements-py3.txt index f6d63072c..fcf1a9326 100644 --- a/test-requirements-py3.txt +++ b/test-requirements-py3.txt @@ -18,9 +18,6 @@ python-subunit>=0.0.18 testrepository>=0.0.18 testtools>=0.9.36,!=1.2.0 -# Functional Tests -requests>=2.2.0,!=2.4.0 - # Documentation # NOTE(vkmc) sphinx modules are commented out as a workaround for bug #1403510 # sphinx>=1.1.2,!=1.2.0,!=1.3b1,<1.3 diff --git a/test-requirements.txt b/test-requirements.txt index e3dbdcbd0..b3e2b49a2 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -20,9 +20,6 @@ python-subunit>=0.0.18 testrepository>=0.0.18 testtools>=0.9.36,!=1.2.0 -# Functional Tests -requests>=2.2.0,!=2.4.0 - # Documentation sphinx>=1.1.2,!=1.2.0,!=1.3b1,<1.3 oslosphinx>=2.5.0,<2.6.0 # Apache-2.0 diff --git a/zaqar/notification/__init__.py b/zaqar/notification/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/zaqar/notification/notifier.py b/zaqar/notification/notifier.py new file mode 100644 index 000000000..766bb303b --- /dev/null +++ b/zaqar/notification/notifier.py @@ -0,0 +1,72 @@ +# Copyright (c) 2015 Catalyst IT Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import uuid + +from zaqar.notification.task import webhook +from zaqar.openstack.common import log as logging + +import six +from taskflow import engines +from taskflow.patterns import unordered_flow as uf +from taskflow import task +from taskflow.types import futures +from taskflow.utils import eventlet_utils + +LOG = logging.getLogger(__name__) + + +class NotifierDriver(object): + """Notifier which is responsible for sending messages to subscribers. + + """ + + def __init__(self, *args, **kwargs): + self.subscription_controller = kwargs.get('subscription_controller') + + if eventlet_utils.EVENTLET_AVAILABLE: + self.executor = futures.GreenThreadPoolExecutor() + else: + # TODO(flwang): Make the max_workers configurable + self.executor = futures.ThreadPoolExecutor(max_workers=10) + + def _generate_task(self, subscriber_uri, message): + task_name = uuid.uuid4() + # TODO(flwang): Need to work out a better way to make tasks + s_type = six.moves.urllib_parse.urlparse(subscriber_uri).scheme + + t = task.Task + if s_type in ('http', 'https'): + t = webhook.WebhookTask + + return t(task_name, inject={'uri': subscriber_uri, 'message': message}) + + def post(self, queue_name, messages, client_uuid, project=None): + """Send messages to the subscribers.""" + if self.subscription_controller: + subscribers = self.subscription_controller.list(queue_name, + project) + + wh_flow = uf.Flow('webhook_notifier_flow') + + for s in list(next(subscribers)): + for m in messages: + wh_flow.add(self._generate_task(s['subscriber'], m)) + + e = engines.load(wh_flow, executor=self.executor, + engine='parallel') + e.run() + else: + LOG.error('Failed to get subscription controller.') diff --git a/zaqar/notification/task/__init__.py b/zaqar/notification/task/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/zaqar/notification/task/webhook.py b/zaqar/notification/task/webhook.py new file mode 100644 index 000000000..205b795d9 --- /dev/null +++ b/zaqar/notification/task/webhook.py @@ -0,0 +1,34 @@ +# Copyright (c) 2015 Catalyst IT Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import requests + +from zaqar.openstack.common import log as logging + +from taskflow import task + +LOG = logging.getLogger(__name__) + + +class WebhookTask(task.Task): + def __init__(self, name, show_name=True, inject=None): + super(WebhookTask, self).__init__(name, inject=inject) + self._show_name = show_name + + def execute(self, uri, message, **kwargs): + try: + requests.post(uri, data=message) + except Exception as e: + LOG.error(e) diff --git a/zaqar/storage/mongodb/subscriptions.py b/zaqar/storage/mongodb/subscriptions.py index cb9e8feec..a41a2cab5 100644 --- a/zaqar/storage/mongodb/subscriptions.py +++ b/zaqar/storage/mongodb/subscriptions.py @@ -16,6 +16,7 @@ from oslo_utils import timeutils import pymongo.errors from zaqar.common import utils as common_utils +from zaqar import storage from zaqar.storage import base from zaqar.storage import errors from zaqar.storage.mongodb import utils @@ -50,7 +51,8 @@ class SubscriptionController(base.Subscription): self._collection.ensure_index(SUBSCRIPTIONS_INDEX, unique=True) @utils.raises_conn_error - def list(self, queue, project=None, marker=None, limit=10): + def list(self, queue, project=None, marker=None, + limit=storage.DEFAULT_SUBSCRIPTIONS_PER_PAGE): query = {'s': queue, 'p': project} if marker is not None: query['_id'] = {'$gt': utils.to_oid(marker)} diff --git a/zaqar/storage/pipeline.py b/zaqar/storage/pipeline.py index 542138bfd..b9ccb4b57 100644 --- a/zaqar/storage/pipeline.py +++ b/zaqar/storage/pipeline.py @@ -42,7 +42,7 @@ def _config_options(): return [(_PIPELINE_GROUP, _PIPELINE_CONFIGS)] -def _get_storage_pipeline(resource_name, conf): +def _get_storage_pipeline(resource_name, conf, *args, **kwargs): """Constructs and returns a storage resource pipeline. This is a helper function for any service supporting @@ -72,10 +72,13 @@ def _get_storage_pipeline(resource_name, conf): for ns in storage_conf[resource_name + '_pipeline']: try: mgr = driver.DriverManager('zaqar.storage.stages', - ns, invoke_on_load=True) + ns, + invoke_args=args, + invoke_kwds=kwargs, + invoke_on_load=True) pipeline.append(mgr.driver) except RuntimeError as exc: - LOG.warning(_(u'Stage %(stage)d could not be imported: %(ex)s'), + LOG.warning(_(u'Stage %(stage)s could not be imported: %(ex)s'), {'stage': ns, 'ex': str(exc)}) continue @@ -138,7 +141,9 @@ class DataDriver(base.DataDriverBase): @decorators.lazy_property(write=False) def message_controller(self): stages = _get_builtin_entry_points('message', self._storage) - stages.extend(_get_storage_pipeline('message', self.conf)) + kwargs = {'subscription_controller': + self._storage.subscription_controller} + stages.extend(_get_storage_pipeline('message', self.conf, **kwargs)) stages.append(self._storage.message_controller) return common.Pipeline(stages) diff --git a/zaqar/tests/unit/notification/__init__.py b/zaqar/tests/unit/notification/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/zaqar/tests/unit/notification/test_notifier.py b/zaqar/tests/unit/notification/test_notifier.py new file mode 100644 index 000000000..f705b575a --- /dev/null +++ b/zaqar/tests/unit/notification/test_notifier.py @@ -0,0 +1,69 @@ +# Copyright (c) 2014 Catalyst IT Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import uuid + +import mock + +from zaqar.notification import notifier +from zaqar.notification import task +from zaqar import tests as testing + + +class NotifierTest(testing.TestBase): + + def setUp(self): + super(NotifierTest, self).setUp() + self.subscription = [{'subscriber': 'http://trigger.me'}, + {'subscriber': 'http://call.me'}, + {'subscriber': 'http://ping.me'} + ] + self.cliend_id = uuid.uuid4() + self.project = uuid.uuid4() + self.messages = [{"ttl": 300, + "body": {"event": "BackupStarted", + "backup_id": "c378813c-3f0b-11e2-ad92"} + }, + {"body": {"event": "BackupProgress", + "current_bytes": "0", + "total_bytes": "99614720"} + } + ] + + ctlr = mock.MagicMock() + ctlr.list = mock.Mock(return_value=iter(self.subscription)) + self.driver = notifier.NotifierDriver(subscription_controller=ctlr) + + def test_post(self): + with mock.patch('requests.post') as mock_post: + self.driver.post('fake_queue', self.messages, + self.client_uuid, self.project) + mock_post.assert_called_with(self.subscription[0]['subscriber'], + self.messages[0]) + mock_post.assert_called_with(self.subscription[0]['subscriber'], + self.messages[0]) + mock_post.assert_called_with(self.subscription[1]['subscriber'], + self.messages[0]) + mock_post.assert_called_with(self.subscription[1]['subscriber'], + self.messages[1]) + mock_post.assert_called_with(self.subscription[2]['subscriber'], + self.messages[1]) + mock_post.assert_called_with(self.subscription[2]['subscriber'], + self.messages[1]) + + def test_genrate_task(self): + subscriber = self.subscription_list[0]['subscriber'] + new_task = self.driver._generate_task(subscriber, self.messages) + self.assertIsInstance(new_task, task.webhook.WebhookTask)