You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
110 lines
3.8 KiB
110 lines
3.8 KiB
# Copyright 2014 - Rackspace Hosting |
|
# |
|
# Licensed under the Apache License, Version 2.0 (the "License"); |
|
# you may not use this file except in compliance with the License. |
|
# You may obtain a copy of the License at |
|
# |
|
# http://www.apache.org/licenses/LICENSE-2.0 |
|
# |
|
# Unless required by applicable law or agreed to in writing, software |
|
# distributed under the License is distributed on an "AS IS" BASIS, |
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
# See the License for the specific language governing permissions and |
|
# limitations under the License. |
|
|
|
"""Common RPC service and API tools for Magnum.""" |
|
|
|
import eventlet |
|
from oslo_config import cfg |
|
import oslo_messaging as messaging |
|
from oslo_service import service |
|
|
|
from magnum.common import rpc |
|
from magnum.objects import base as objects_base |
|
from magnum.service import periodic |
|
|
|
|
|
# NOTE(paulczar): |
|
# Ubuntu 14.04 forces librabbitmq when kombu is used |
|
# Unfortunately it forces a version that has a crash |
|
# bug. Calling eventlet.monkey_patch() tells kombu |
|
# to use libamqp instead. |
|
eventlet.monkey_patch() |
|
|
|
# NOTE(asalkeld): |
|
# The magnum.openstack.common.rpc entries are for compatibility |
|
# with devstack rpc_backend configuration values. |
|
TRANSPORT_ALIASES = { |
|
'magnum.openstack.common.rpc.impl_kombu': 'rabbit', |
|
'magnum.openstack.common.rpc.impl_qpid': 'qpid', |
|
'magnum.openstack.common.rpc.impl_zmq': 'zmq', |
|
} |
|
|
|
periodic_opts = [ |
|
cfg.BoolOpt('periodic_enable', |
|
default=True, |
|
help='Enable periodic tasks.'), |
|
cfg.IntOpt('periodic_interval_max', |
|
default=60, |
|
help='Max interval size between periodic tasks execution in ' |
|
'seconds.'), |
|
] |
|
|
|
CONF = cfg.CONF |
|
CONF.register_opts(periodic_opts) |
|
|
|
|
|
class Service(service.Service): |
|
_server = None |
|
|
|
def __init__(self, topic, server, handlers): |
|
super(Service, self).__init__() |
|
serializer = rpc.RequestContextSerializer( |
|
objects_base.MagnumObjectSerializer()) |
|
transport = messaging.get_transport(cfg.CONF, |
|
aliases=TRANSPORT_ALIASES) |
|
# TODO(asalkeld) add support for version='x.y' |
|
target = messaging.Target(topic=topic, server=server) |
|
self._server = messaging.get_rpc_server(transport, target, handlers, |
|
serializer=serializer) |
|
|
|
def start(self): |
|
if CONF.periodic_enable: |
|
self.tg = periodic.setup(CONF) |
|
self._server.start() |
|
|
|
def wait(self): |
|
self._server.wait() |
|
|
|
@classmethod |
|
def create(cls, topic, server, handlers): |
|
service_obj = cls(topic, server, handlers) |
|
return service_obj |
|
|
|
|
|
class API(object): |
|
def __init__(self, transport=None, context=None, topic=None, server=None, |
|
timeout=None): |
|
serializer = rpc.RequestContextSerializer( |
|
objects_base.MagnumObjectSerializer()) |
|
if transport is None: |
|
exmods = rpc.get_allowed_exmods() |
|
transport = messaging.get_transport(cfg.CONF, |
|
allowed_remote_exmods=exmods, |
|
aliases=TRANSPORT_ALIASES) |
|
self._context = context |
|
if topic is None: |
|
topic = '' |
|
target = messaging.Target(topic=topic, server=server) |
|
self._client = messaging.RPCClient(transport, target, |
|
serializer=serializer, |
|
timeout=timeout) |
|
|
|
def _call(self, method, *args, **kwargs): |
|
return self._client.call(self._context, method, *args, **kwargs) |
|
|
|
def _cast(self, method, *args, **kwargs): |
|
self._client.cast(self._context, method, *args, **kwargs) |
|
|
|
def echo(self, message): |
|
self._cast('echo', message=message)
|
|
|