From 572a6296e0ebcfe88263617b846be5718010f989 Mon Sep 17 00:00:00 2001 From: Flavio Percoco Date: Mon, 14 Oct 2013 01:04:45 +0200 Subject: [PATCH] Return a consumer function instead of consuming Former implementation was called consume_for and used to consume the whole pipeline. This implementation returns a callable function instead which makes it possible for wrapped methods to have a `method` keyword and to control / reuse the consumer. Implements blueprint storage-pipeline Change-Id: I8d7ad43028b5615b24f06ca1e116e35e5ab6a145 --- marconi/common/pipeline.py | 87 +++++++++++++++++------------- marconi/queues/storage/pipeline.py | 23 ++++---- tests/unit/common/test_pipeline.py | 5 +- 3 files changed, 63 insertions(+), 52 deletions(-) diff --git a/marconi/common/pipeline.py b/marconi/common/pipeline.py index 9d7cfaba0..86bba19d2 100644 --- a/marconi/common/pipeline.py +++ b/marconi/common/pipeline.py @@ -30,8 +30,6 @@ At least one of the stages has to implement the calling method. If none of them do, an AttributeError exception will be raised. """ -import functools - import six from marconi.common import decorators @@ -50,49 +48,62 @@ class Pipeline(object): @decorators.cached_getattr def __getattr__(self, name): - return functools.partial(self.consume_for, name) + return self.consumer_for(name) - def consume_for(self, method, *args, **kwargs): - """Consumes the pipeline for `method`. + def consumer_for(self, method): + """Creates a closure for `method` - This method walks through the pipeline and calls - `method` for each of the items in the pipeline. A - warning will be logged for each pipe not implementing - `method` and an Attribute error will be raised if - none of the stages do. + This method creates a closure to consume the pipeline + for `method`. - :params method: The method name to call on each pipe + :params method: The method name to call on each stage :type method: `six.text_type` - :param args: Positional arguments to pass to the call. - :param kwargs: Keyword arguments to pass to the call. - :returns: Anything returned by the called methods. - :raises: AttributeError if none of the stages implement `method` + :returns: A callable to consume the pipeline. """ - # NOTE(flaper87): Used as a way to verify - # the requested method exists in at least - # one of the stages, otherwise AttributeError - # will be raised. - target = None - for stage in self._pipeline: - try: - target = getattr(stage, method) - except AttributeError: - msg = _(u'Stage {0} does not implement {1}') - LOG.warning(msg.format(six.text_type(stage), method)) - continue + def consumer(*args, **kwargs): + """Consumes the pipeline for `method` - result = target(*args, **kwargs) + This function walks through the pipeline and calls + `method` for each of the items in the pipeline. A + warning will be logged for each stage not implementing + `method` and an Attribute error will be raised if + none of the stages do. - # NOTE(flaper87): Will keep going forward - # through the pipeline unless the call returns - # something. - if result is not None: - return result + :param args: Positional arguments to pass to the call. + :param kwargs: Keyword arguments to pass to the call. - if target is None: - msg = _(u'Method {0} not found in any of ' - 'the registered stages').format(method) - LOG.error(msg) - raise AttributeError(msg) + :raises: AttributeError if none of the stages implement `method` + """ + # NOTE(flaper87): Used as a way to verify + # the requested method exists in at least + # one of the stages, otherwise AttributeError + # will be raised. + target = None + + for stage in self._pipeline: + try: + target = getattr(stage, method) + except AttributeError: + sstage = six.text_type(stage) + msg = _(u"Stage {0} does not implement {1}").format(sstage, + method) + LOG.warning(msg) + continue + + result = target(*args, **kwargs) + + # NOTE(flaper87): Will keep going forward + # through the stageline unless the call returns + # something. + if result is not None: + return result + + if target is None: + msg = _(u'Method {0} not found in any of ' + 'the registered stages').format(method) + LOG.error(msg) + raise AttributeError(msg) + + return consumer diff --git a/marconi/queues/storage/pipeline.py b/marconi/queues/storage/pipeline.py index 9a2c9dcac..553f1060a 100644 --- a/marconi/queues/storage/pipeline.py +++ b/marconi/queues/storage/pipeline.py @@ -68,11 +68,12 @@ def _get_storage_pipeline(resource_name, conf): pipeline = [] for ns in storage_conf[resource_name + '_pipeline']: try: - mgr = driver.DriverManager('marconi.queues.storage.pipes', + mgr = driver.DriverManager('marconi.queues.storage.stages', ns, invoke_on_load=True) pipeline.append(mgr.driver) except RuntimeError as exc: - msg = _('Pipe {0} could not be imported: {1}').format(ns, str(exc)) + msg = _('Stage {0} could not be imported: {1}').format(ns, + str(exc)) LOG.warning(msg) continue @@ -95,18 +96,18 @@ class Driver(base.DriverBase): @decorators.lazy_property(write=False) def queue_controller(self): - pipes = _get_storage_pipeline('queue', self.conf) - pipes.append(self._storage.queue_controller) - return pipes + stages = _get_storage_pipeline('queue', self.conf) + stages.append(self._storage.queue_controller) + return stages @decorators.lazy_property(write=False) def message_controller(self): - pipes = _get_storage_pipeline('message', self.conf) - pipes.append(self._storage.message_controller) - return pipes + stages = _get_storage_pipeline('message', self.conf) + stages.append(self._storage.message_controller) + return stages @decorators.lazy_property(write=False) def claim_controller(self): - pipes = _get_storage_pipeline('claim', self.conf) - pipes.append(self._storage.claim_controller) - return pipes + stages = _get_storage_pipeline('claim', self.conf) + stages.append(self._storage.claim_controller) + return stages diff --git a/tests/unit/common/test_pipeline.py b/tests/unit/common/test_pipeline.py index 55f66ba85..9abff7003 100644 --- a/tests/unit/common/test_pipeline.py +++ b/tests/unit/common/test_pipeline.py @@ -63,9 +63,8 @@ class TestPipeLine(base.TestBase): SecondClass()]) def test_attribute_error(self): - self.assertRaises(AttributeError, - self.pipeline.consume_for, - 'does_not_exist') + consumer = self.pipeline.consumer_for('does_not_exist') + self.assertRaises(AttributeError, consumer) def test_with_args(self): name = 'James'