zaqar/marconi/common/pipeline.py
Flavio Percoco ef9d5247e9 Switch over oslo.i18n
oslo.i18n has recently been released. This patch switches marconi over
to oslo.i18n. As per oslo.i18n's instructions, a new i18n module has been
added under marconi. This module defines the translation globals and
imports the necessary functions from oslo.i18n.

The patch doesn't change the way Marconi does translation: a `_`
function is still injected into the builtins.

Note that the gettextutils module is still a required module from
oslo-inc because there are oslo-inc modules that depend on the old
gettextutils module.

Closes-bug: #1314300

Change-Id: Ifb8f3296d1a0e2483ebd1c8d868b7359ecc99fb5
2014-07-09 15:39:54 +02:00


# Copyright (c) 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

"""This module implements a common Pipeline object.

The pipeline can be used to enhance the storage layer with filtering, routing,
multiplexing and the like. For example:

    >>> stages = [MessageFilter(), EncryptionFilter(), QueueController()]
    >>> pipeline = Pipeline(stages)

Every stage has to implement the method it wants to hook into. This method
will be called when the pipeline consumption gets to that point - stage
ordering matters - and will continue unless the method call returns a value
that is not None.

At least one of the stages has to implement the calling method. If none of
them do, an AttributeError exception will be raised.
"""

import contextlib

import six

from marconi.common import decorators
from marconi.i18n import _
import marconi.openstack.common.log as logging

LOG = logging.getLogger(__name__)


class Pipeline(object):

    def __init__(self, pipeline=None):
        self._pipeline = pipeline and list(pipeline) or []

    def append(self, stage):
        self._pipeline.append(stage)

    @decorators.memoized_getattr
    def __getattr__(self, name):
        with self.consumer_for(name) as consumer:
            return consumer

    @contextlib.contextmanager
    def consumer_for(self, method):
        """Creates a closure for `method`

        This method creates a closure to consume the pipeline
        for `method`.

        :param method: The method name to call on each stage
        :type method: `six.text_type`

        :returns: A callable to consume the pipeline.
        """

        def consumer(*args, **kwargs):
            """Consumes the pipeline for `method`

            This function walks through the pipeline and calls
            `method` for each of the items in the pipeline. A
            warning will be logged for each stage not implementing
            `method` and an AttributeError will be raised if
            none of the stages do.

            :param args: Positional arguments to pass to the call.
            :param kwargs: Keyword arguments to pass to the call.

            :raises: AttributeError if none of the stages implement `method`
            """
            # NOTE(flaper87): Used as a way to verify
            # the requested method exists in at least
            # one of the stages, otherwise AttributeError
            # will be raised.
            target = None

            for stage in self._pipeline:
                try:
                    target = getattr(stage, method)
                except AttributeError:
                    sstage = six.text_type(stage)
                    msgtmpl = _(u"Stage %(stage)s does not "
                                "implement %(method)s")
                    LOG.warning(msgtmpl, {'stage': sstage, 'method': method})
                    continue

                result = target(*args, **kwargs)

                # NOTE(flaper87): Will keep going forward
                # through the pipeline unless the call returns
                # something.
                if result is not None:
                    return result

            if target is None:
                msg = _(u'Method %s not found in any of '
                        'the registered stages') % method
                LOG.error(msg)
                raise AttributeError(msg)

        yield consumer
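
The short-circuit behaviour described in the module docstring can be tried in isolation. The block below is a simplified, hypothetical stand-in rather than the marconi implementation: `MiniPipeline`, `RejectEmpty`, and `Recorder` are illustrative names that do not appear in the module, the standard `logging` module replaces `marconi.openstack.common.log`, and memoization and translation are left out. As in the original, the AttributeError surfaces when the consumer is called, not when the attribute is looked up.

```python
import logging

LOG = logging.getLogger(__name__)


class MiniPipeline(object):
    """Hypothetical, dependency-free sketch of the Pipeline idea."""

    def __init__(self, stages=None):
        self._stages = list(stages) if stages else []

    def __getattr__(self, name):
        # Only reached for names not found through normal lookup,
        # i.e. the methods that should be dispatched to the stages.
        def consumer(*args, **kwargs):
            target = None
            for stage in self._stages:
                try:
                    target = getattr(stage, name)
                except AttributeError:
                    LOG.warning("Stage %s does not implement %s", stage, name)
                    continue
                result = target(*args, **kwargs)
                # The first non-None result stops the walk.
                if result is not None:
                    return result
            if target is None:
                raise AttributeError(
                    "Method %s not found in any of the registered stages"
                    % name)
        return consumer


class RejectEmpty(object):
    """Illustrative filter stage: answers empty posts itself."""

    def post(self, queue, messages):
        if not messages:
            return []  # non-None, so later stages are skipped
        return None  # fall through to the next stage


class Recorder(object):
    """Illustrative terminal stage: records calls, returns a count."""

    def __init__(self):
        self.calls = []

    def post(self, queue, messages):
        self.calls.append((queue, messages))
        return len(messages)


if __name__ == "__main__":
    recorder = Recorder()
    pipeline = MiniPipeline([RejectEmpty(), recorder])
    print(pipeline.post("q", ["a", "b"]))  # 2, answered by Recorder
    print(pipeline.post("q", []))          # [], answered by RejectEmpty
    print(recorder.calls)                  # [('q', ['a', 'b'])]
```

Placing the filter before the recording stage is what lets it veto a call; reversing the order would let every post reach `Recorder` first.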