Add RPC support modules needed for conductor

Change-Id: I5cfad5c20cbe96268fee25c94e078a688373a700
This commit is contained in:
vinod pandarinathan 2015-06-06 08:33:22 -07:00
parent a5f303b6f2
commit a3db64326f
3 changed files with 286 additions and 0 deletions

View File

@ -0,0 +1,30 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
# Copyright 2012 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from cloudpulse.common import rpc
from cloudpulse import version
def parse_args(argv, default_config_files=None):
rpc.set_defaults(control_exchange='cloudpulse')
cfg.CONF(argv[1:],
project='cloudpulse',
version=version.version_info.release_string(),
default_config_files=default_config_files)
rpc.init(cfg.CONF)

148
cloudpulse/common/rpc.py Normal file
View File

@ -0,0 +1,148 @@
# Copyright 2014 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
__all__ = [
'init',
'cleanup',
'set_defaults',
'add_extra_exmods',
'clear_extra_exmods',
'get_allowed_exmods',
'RequestContextSerializer',
'get_client',
'get_server',
'get_notifier',
'TRANSPORT_ALIASES',
]
from oslo_config import cfg
import oslo_messaging as messaging
from oslo_serialization import jsonutils
from cloudpulse.common import context as cloudpulse_context
from cloudpulse.common import exception
CONF = cfg.CONF
TRANSPORT = None
NOTIFIER = None
ALLOWED_EXMODS = [
exception.__name__,
]
EXTRA_EXMODS = []
# NOTE(lucasagomes): The cloudpulse.openstack.common.rpc entries are for
# backwards compat with IceHouse rpc_backend configuration values.
TRANSPORT_ALIASES = {
'cloudpulse.openstack.common.rpc.impl_kombu': 'rabbit',
'cloudpulse.openstack.common.rpc.impl_qpid': 'qpid',
'cloudpulse.openstack.common.rpc.impl_zmq': 'zmq',
'cloudpulse.rpc.impl_kombu': 'rabbit',
'cloudpulse.rpc.impl_qpid': 'qpid',
'cloudpulse.rpc.impl_zmq': 'zmq',
}
def init(conf):
global TRANSPORT, NOTIFIER
exmods = get_allowed_exmods()
TRANSPORT = messaging.get_transport(conf,
allowed_remote_exmods=exmods,
aliases=TRANSPORT_ALIASES)
serializer = RequestContextSerializer(JsonPayloadSerializer())
NOTIFIER = messaging.Notifier(TRANSPORT, serializer=serializer)
def cleanup():
global TRANSPORT, NOTIFIER
assert TRANSPORT is not None
assert NOTIFIER is not None
TRANSPORT.cleanup()
TRANSPORT = NOTIFIER = None
def set_defaults(control_exchange):
messaging.set_transport_defaults(control_exchange)
def add_extra_exmods(*args):
EXTRA_EXMODS.extend(args)
def clear_extra_exmods():
del EXTRA_EXMODS[:]
def get_allowed_exmods():
return ALLOWED_EXMODS + EXTRA_EXMODS
class JsonPayloadSerializer(messaging.NoOpSerializer):
@staticmethod
def serialize_entity(context, entity):
return jsonutils.to_primitive(entity, convert_instances=True)
class RequestContextSerializer(messaging.Serializer):
def __init__(self, base):
self._base = base
def serialize_entity(self, context, entity):
if not self._base:
return entity
return self._base.serialize_entity(context, entity)
def deserialize_entity(self, context, entity):
if not self._base:
return entity
return self._base.deserialize_entity(context, entity)
def serialize_context(self, context):
return context
def deserialize_context(self, context):
return cloudpulse_context.RequestContext.from_dict(context)
def get_transport_url(url_str=None):
return messaging.TransportURL.parse(CONF, url_str, TRANSPORT_ALIASES)
def get_client(target, version_cap=None, serializer=None):
assert TRANSPORT is not None
serializer = RequestContextSerializer(serializer)
return messaging.RPCClient(TRANSPORT,
target,
version_cap=version_cap,
serializer=serializer)
def get_server(target, endpoints, serializer=None):
assert TRANSPORT is not None
serializer = RequestContextSerializer(serializer)
return messaging.get_rpc_server(TRANSPORT,
target,
endpoints,
executor='eventlet',
serializer=serializer)
def get_notifier(service=None, host=None, publisher_id=None):
assert NOTIFIER is not None
if not publisher_id:
publisher_id = "%s.%s" % (service, host or CONF.host)
return NOTIFIER.prepare(publisher_id=publisher_id)

View File

@ -0,0 +1,108 @@
# Copyright 2014 - Rackspace Hosting
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Common RPC service and API tools for Cloudpulse."""
import eventlet
from oslo_config import cfg
import oslo_messaging as messaging
from cloudpulse.common import context as cloudpulse_context
from cloudpulse.common import rpc
from cloudpulse.objects import base as objects_base
# NOTE(paulczar):
# Ubuntu 14.04 forces librabbitmq when kombu is used
# Unfortunately it forces a version that has a crash
# bug. Calling eventlet.monkey_patch() tells kombu
# to use libamqp instead.
eventlet.monkey_patch()
# NOTE(asalkeld):
# The cloudpulse.openstack.common.rpc entries are for compatibility
# with devstack rpc_backend configuration values.
TRANSPORT_ALIASES = {
'cloudpulse.openstack.common.rpc.impl_kombu': 'rabbit',
'cloudpulse.openstack.common.rpc.impl_qpid': 'qpid',
'cloudpulse.openstack.common.rpc.impl_zmq': 'zmq',
}
class RequestContextSerializer(messaging.Serializer):
def __init__(self, base):
self._base = base
def serialize_entity(self, context, entity):
if not self._base:
return entity
return self._base.serialize_entity(context, entity)
def deserialize_entity(self, context, entity):
if not self._base:
return entity
return self._base.deserialize_entity(context, entity)
def serialize_context(self, context):
return context.to_dict()
def deserialize_context(self, context):
return cloudpulse_context.RequestContext.from_dict(context)
class Service(object):
_server = None
def __init__(self, topic, server, handlers):
serializer = RequestContextSerializer(
objects_base.CloudpulseObjectSerializer())
transport = messaging.get_transport(cfg.CONF,
aliases=TRANSPORT_ALIASES)
# TODO(asalkeld) add support for version='x.y'
target = messaging.Target(topic=topic, server=server)
self._server = messaging.get_rpc_server(transport, target, handlers,
serializer=serializer)
def serve(self):
self._server.start()
self._server.wait()
class API(object):
def __init__(self, transport=None, context=None, topic=None, server=None,
timeout=None):
serializer = RequestContextSerializer(
objects_base.CloudpulseObjectSerializer())
if transport is None:
exmods = rpc.get_allowed_exmods()
transport = messaging.get_transport(cfg.CONF,
allowed_remote_exmods=exmods,
aliases=TRANSPORT_ALIASES)
self._context = context
if topic is None:
topic = ''
target = messaging.Target(topic=topic, server=server)
self._client = messaging.RPCClient(transport, target,
serializer=serializer,
timeout=timeout)
def _call(self, method, *args, **kwargs):
return self._client.call(self._context, method, *args, **kwargs)
def _cast(self, method, *args, **kwargs):
self._client.cast(self._context, method, *args, **kwargs)
def echo(self, message):
self._cast('echo', message=message)