Implement webhook notifier driver

This patch adds a notifier driver for webhook and the basic
task management code with taskflow.

DocImpact
Partially-Implements blueprint: notifications

Change-Id: I2727726cc57f03fb94184653452223f00fcf3d0c
This commit is contained in:
Fei Long Wang 2015-01-30 17:07:00 +13:00
parent f340719b72
commit 99fe10e49c
13 changed files with 194 additions and 11 deletions

View File

@ -23,3 +23,5 @@ oslo.utils>=1.4.0,<1.5.0 # Apache-2.0
SQLAlchemy>=0.9.7,<=0.9.99
enum34
autobahn>=0.10.1
requests>=2.2.0,!=2.4.0
taskflow<0.8.0,>=0.7.1

View File

@ -23,3 +23,5 @@ SQLAlchemy>=0.9.7,<=0.9.99
enum34
trollius>=1.0
autobahn>=0.10.1
requests>=2.2.0,!=2.4.0
taskflow<0.8.0,>=0.7.1

View File

@ -69,6 +69,9 @@ oslo.config.opts =
zaqar.transport.base = zaqar.transport.base:_config_options
zaqar.transport.validation = zaqar.transport.validation:_config_options
zaqar.storage.stages =
zaqar.notification.notifier = zaqar.notification.notifier:NotifierDriver
[nosetests]
where=tests
verbosity=2

View File

@ -18,9 +18,6 @@ python-subunit>=0.0.18
testrepository>=0.0.18
testtools>=0.9.36,!=1.2.0
# Functional Tests
requests>=2.2.0,!=2.4.0
# Documentation
# NOTE(vkmc) sphinx modules are commented out as a workaround for bug #1403510
# sphinx>=1.1.2,!=1.2.0,!=1.3b1,<1.3

View File

@ -20,9 +20,6 @@ python-subunit>=0.0.18
testrepository>=0.0.18
testtools>=0.9.36,!=1.2.0
# Functional Tests
requests>=2.2.0,!=2.4.0
# Documentation
sphinx>=1.1.2,!=1.2.0,!=1.3b1,<1.3
oslosphinx>=2.5.0,<2.6.0 # Apache-2.0

View File

View File

@ -0,0 +1,72 @@
# Copyright (c) 2015 Catalyst IT Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid
from zaqar.notification.task import webhook
from zaqar.openstack.common import log as logging
import six
from taskflow import engines
from taskflow.patterns import unordered_flow as uf
from taskflow import task
from taskflow.types import futures
from taskflow.utils import eventlet_utils
LOG = logging.getLogger(__name__)
class NotifierDriver(object):
"""Notifier which is responsible for sending messages to subscribers.
"""
def __init__(self, *args, **kwargs):
self.subscription_controller = kwargs.get('subscription_controller')
if eventlet_utils.EVENTLET_AVAILABLE:
self.executor = futures.GreenThreadPoolExecutor()
else:
# TODO(flwang): Make the max_workers configurable
self.executor = futures.ThreadPoolExecutor(max_workers=10)
def _generate_task(self, subscriber_uri, message):
task_name = uuid.uuid4()
# TODO(flwang): Need to work out a better way to make tasks
s_type = six.moves.urllib_parse.urlparse(subscriber_uri).scheme
t = task.Task
if s_type in ('http', 'https'):
t = webhook.WebhookTask
return t(task_name, inject={'uri': subscriber_uri, 'message': message})
def post(self, queue_name, messages, client_uuid, project=None):
"""Send messages to the subscribers."""
if self.subscription_controller:
subscribers = self.subscription_controller.list(queue_name,
project)
wh_flow = uf.Flow('webhook_notifier_flow')
for s in list(next(subscribers)):
for m in messages:
wh_flow.add(self._generate_task(s['subscriber'], m))
e = engines.load(wh_flow, executor=self.executor,
engine='parallel')
e.run()
else:
LOG.error('Failed to get subscription controller.')

View File

View File

@ -0,0 +1,34 @@
# Copyright (c) 2015 Catalyst IT Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import requests
from zaqar.openstack.common import log as logging
from taskflow import task
LOG = logging.getLogger(__name__)
class WebhookTask(task.Task):
def __init__(self, name, show_name=True, inject=None):
super(WebhookTask, self).__init__(name, inject=inject)
self._show_name = show_name
def execute(self, uri, message, **kwargs):
try:
requests.post(uri, data=message)
except Exception as e:
LOG.error(e)

View File

@ -16,6 +16,7 @@ from oslo_utils import timeutils
import pymongo.errors
from zaqar.common import utils as common_utils
from zaqar import storage
from zaqar.storage import base
from zaqar.storage import errors
from zaqar.storage.mongodb import utils
@ -50,7 +51,8 @@ class SubscriptionController(base.Subscription):
self._collection.ensure_index(SUBSCRIPTIONS_INDEX, unique=True)
@utils.raises_conn_error
def list(self, queue, project=None, marker=None, limit=10):
def list(self, queue, project=None, marker=None,
limit=storage.DEFAULT_SUBSCRIPTIONS_PER_PAGE):
query = {'s': queue, 'p': project}
if marker is not None:
query['_id'] = {'$gt': utils.to_oid(marker)}

View File

@ -42,7 +42,7 @@ def _config_options():
return [(_PIPELINE_GROUP, _PIPELINE_CONFIGS)]
def _get_storage_pipeline(resource_name, conf):
def _get_storage_pipeline(resource_name, conf, *args, **kwargs):
"""Constructs and returns a storage resource pipeline.
This is a helper function for any service supporting
@ -72,10 +72,13 @@ def _get_storage_pipeline(resource_name, conf):
for ns in storage_conf[resource_name + '_pipeline']:
try:
mgr = driver.DriverManager('zaqar.storage.stages',
ns, invoke_on_load=True)
ns,
invoke_args=args,
invoke_kwds=kwargs,
invoke_on_load=True)
pipeline.append(mgr.driver)
except RuntimeError as exc:
LOG.warning(_(u'Stage %(stage)d could not be imported: %(ex)s'),
LOG.warning(_(u'Stage %(stage)s could not be imported: %(ex)s'),
{'stage': ns, 'ex': str(exc)})
continue
@ -138,7 +141,9 @@ class DataDriver(base.DataDriverBase):
@decorators.lazy_property(write=False)
def message_controller(self):
stages = _get_builtin_entry_points('message', self._storage)
stages.extend(_get_storage_pipeline('message', self.conf))
kwargs = {'subscription_controller':
self._storage.subscription_controller}
stages.extend(_get_storage_pipeline('message', self.conf, **kwargs))
stages.append(self._storage.message_controller)
return common.Pipeline(stages)

View File

@ -0,0 +1,69 @@
# Copyright (c) 2014 Catalyst IT Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid
import mock
from zaqar.notification import notifier
from zaqar.notification import task
from zaqar import tests as testing
class NotifierTest(testing.TestBase):
def setUp(self):
super(NotifierTest, self).setUp()
self.subscription = [{'subscriber': 'http://trigger.me'},
{'subscriber': 'http://call.me'},
{'subscriber': 'http://ping.me'}
]
self.cliend_id = uuid.uuid4()
self.project = uuid.uuid4()
self.messages = [{"ttl": 300,
"body": {"event": "BackupStarted",
"backup_id": "c378813c-3f0b-11e2-ad92"}
},
{"body": {"event": "BackupProgress",
"current_bytes": "0",
"total_bytes": "99614720"}
}
]
ctlr = mock.MagicMock()
ctlr.list = mock.Mock(return_value=iter(self.subscription))
self.driver = notifier.NotifierDriver(subscription_controller=ctlr)
def test_post(self):
with mock.patch('requests.post') as mock_post:
self.driver.post('fake_queue', self.messages,
self.client_uuid, self.project)
mock_post.assert_called_with(self.subscription[0]['subscriber'],
self.messages[0])
mock_post.assert_called_with(self.subscription[0]['subscriber'],
self.messages[0])
mock_post.assert_called_with(self.subscription[1]['subscriber'],
self.messages[0])
mock_post.assert_called_with(self.subscription[1]['subscriber'],
self.messages[1])
mock_post.assert_called_with(self.subscription[2]['subscriber'],
self.messages[1])
mock_post.assert_called_with(self.subscription[2]['subscriber'],
self.messages[1])
def test_genrate_task(self):
subscriber = self.subscription_list[0]['subscriber']
new_task = self.driver._generate_task(subscriber, self.messages)
self.assertIsInstance(new_task, task.webhook.WebhookTask)