Refactor notification framework

1. Using stevedore to load the task instance
2. Pass in messages list to task instead of single message

Change-Id: Ie8a475b8c2530bfd0fb2d39929c1611288c601fe
This commit is contained in:
Fei Long Wang 2015-07-08 15:26:46 +12:00
parent 55f1e3dbd1
commit a128dbc3ac
5 changed files with 23 additions and 28 deletions

View File

@ -73,6 +73,9 @@ zaqar.storage.mongodb.driver.queue.stages =
zaqar.storage.redis.driver.queue.stages = zaqar.storage.redis.driver.queue.stages =
message_queue_handler = zaqar.storage.redis.messages:MessageQueueHandler message_queue_handler = zaqar.storage.redis.messages:MessageQueueHandler
zaqar.notification.tasks =
http = zaqar.notification.task.webhook:WebhookTask
https = zaqar.notification.task.webhook:WebhookTask
[nosetests] [nosetests]
where=zaqar/tests where=zaqar/tests

View File

@ -13,16 +13,14 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from stevedore import driver
import uuid import uuid
import futurist import futurist
from oslo_log import log as logging from oslo_log import log as logging
import six from six.moves import urllib_parse
from taskflow import engines from taskflow import engines
from taskflow.patterns import unordered_flow as uf from taskflow.patterns import unordered_flow as uf
from taskflow import task
from zaqar.notification.task import webhook
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -40,17 +38,6 @@ class NotifierDriver(object):
# TODO(flwang): Make the max_workers configurable # TODO(flwang): Make the max_workers configurable
self.executor = futurist.ThreadPoolExecutor(max_workers=10) self.executor = futurist.ThreadPoolExecutor(max_workers=10)
def _generate_task(self, subscriber_uri, message):
task_name = uuid.uuid4()
# TODO(flwang): Need to work out a better way to make tasks
s_type = six.moves.urllib_parse.urlparse(subscriber_uri).scheme
t = task.Task
if s_type in ('http', 'https'):
t = webhook.WebhookTask
return t(task_name, inject={'uri': subscriber_uri, 'message': message})
def post(self, queue_name, messages, client_uuid, project=None): def post(self, queue_name, messages, client_uuid, project=None):
"""Send messages to the subscribers.""" """Send messages to the subscribers."""
if self.subscription_controller: if self.subscription_controller:
@ -59,9 +46,18 @@ class NotifierDriver(object):
wh_flow = uf.Flow('webhook_notifier_flow') wh_flow = uf.Flow('webhook_notifier_flow')
for s in list(next(subscribers)): for sub in next(subscribers):
for m in messages: s_type = urllib_parse.urlparse(sub['subscriber']).scheme
wh_flow.add(self._generate_task(s['subscriber'], m)) invoke_args = [uuid.uuid4()]
invoke_kwds = {'inject': {'subscription': sub,
'messages': messages}}
mgr = driver.DriverManager('zaqar.notification.tasks',
s_type,
invoke_on_load=True,
invoke_args=invoke_args,
invoke_kwds=invoke_kwds)
wh_flow.add(mgr.driver)
if wh_flow: if wh_flow:
e = engines.load(wh_flow, executor=self.executor, e = engines.load(wh_flow, executor=self.executor,

View File

@ -26,9 +26,11 @@ class WebhookTask(task.Task):
super(WebhookTask, self).__init__(name, inject=inject) super(WebhookTask, self).__init__(name, inject=inject)
self._show_name = show_name self._show_name = show_name
def execute(self, uri, message, **kwargs): def execute(self, subscription, messages, **kwargs):
try: try:
requests.post(uri, data=json.dumps(message), for msg in messages:
requests.post(subscription['subscriber'],
data=json.dumps(msg),
headers={'Content-Type': 'application/json'}) headers={'Content-Type': 'application/json'})
except Exception as e: except Exception as e:
LOG.error(e) LOG.error(e)

View File

@ -19,7 +19,6 @@ import uuid
import mock import mock
from zaqar.notification import notifier from zaqar.notification import notifier
from zaqar.notification import task
from zaqar import tests as testing from zaqar import tests as testing
@ -74,11 +73,6 @@ class NotifierTest(testing.TestBase):
], any_order=True) ], any_order=True)
self.assertEqual(6, len(mock_post.mock_calls)) self.assertEqual(6, len(mock_post.mock_calls))
def test_generate_task(self):
subscriber = self.subscription[0]['subscriber']
new_task = self.driver._generate_task(subscriber, self.messages)
self.assertIsInstance(new_task, task.webhook.WebhookTask)
def test_post_no_subscriber(self): def test_post_no_subscriber(self):
ctlr = mock.MagicMock() ctlr = mock.MagicMock()
ctlr.list = mock.Mock(return_value=iter([[]])) ctlr.list = mock.Mock(return_value=iter([[]]))

View File

@ -72,7 +72,7 @@ _TRANSPORT_LIMITS_OPTIONS = (
deprecated_group='limits:transport', deprecated_group='limits:transport',
help='Defines the maximum message grace period in seconds.'), help='Defines the maximum message grace period in seconds.'),
cfg.ListOpt('subscriber_types', default=['http'], cfg.ListOpt('subscriber_types', default=['http', 'https'],
help='Defines supported subscriber types.'), help='Defines supported subscriber types.'),
) )