From 3f85ccce3a55f37c39e10d382f4dfc8cd5a5fd91 Mon Sep 17 00:00:00 2001 From: Steven Dake Date: Sat, 6 Dec 2014 13:07:20 -0700 Subject: [PATCH] Copy ironic/common files to magnum/common for RPC server These are straight copies of ironic/ironic/common files needed for the RPC server. Change-Id: If493eaee919e55b2e5f85f2411c0b0d813f21bde --- magnum/common/config.py | 3 + magnum/common/context.py | 75 +++++++++++--------- magnum/common/rpc.py | 148 +++++++++++++++++++++++++++++++++++++++ magnum/common/service.py | 120 +++++++++++++++++++++++++++---- 4 files changed, 298 insertions(+), 48 deletions(-) create mode 100644 magnum/common/rpc.py diff --git a/magnum/common/config.py b/magnum/common/config.py index 7e21b8ae68..86c2ab6a06 100644 --- a/magnum/common/config.py +++ b/magnum/common/config.py @@ -17,11 +17,14 @@ from oslo.config import cfg +from magnum.common import rpc from magnum import version def parse_args(argv, default_config_files=None): + rpc.set_defaults(control_exchange='magnum') cfg.CONF(argv[1:], project='magnum', version=version.version_info.release_string(), default_config_files=default_config_files) + rpc.init(cfg.CONF) diff --git a/magnum/common/context.py b/magnum/common/context.py index 53d73eb1f7..08b4f4d48b 100644 --- a/magnum/common/context.py +++ b/magnum/common/context.py @@ -1,54 +1,59 @@ -# Copyright 2014 - Mirantis, Inc. +# -*- encoding: utf-8 -*- # -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import inspect +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. from magnum.openstack.common import context class RequestContext(context.RequestContext): - def __init__(self, auth_token=None, user=None, tenant=None, domain=None, - user_domain=None, project_domain=None, is_admin=False, - read_only=False, request_id=None, user_name=None, roles=None, - auth_url=None, trust_id=None, auth_token_info=None): + """Extends security contexts from the OpenStack common library.""" + + def __init__(self, auth_token=None, domain_id=None, domain_name=None, + user=None, tenant=None, is_admin=False, is_public_api=False, + read_only=False, show_deleted=False, request_id=None): + """Stores several additional request parameters: + + :param domain_id: The ID of the domain. + :param domain_name: The name of the domain. + :param is_public_api: Specifies whether the request should be processed + without authentication. + + """ + self.is_public_api = is_public_api + self.domain_id = domain_id + self.domain_name = domain_name + super(RequestContext, self).__init__(auth_token=auth_token, user=user, tenant=tenant, - domain=domain, - user_domain=user_domain, - project_domain=project_domain, is_admin=is_admin, read_only=read_only, - show_deleted=False, + show_deleted=show_deleted, request_id=request_id) - self.roles = roles - self.user_name = user_name - self.auth_url = auth_url - self.trust_id = trust_id - self.auth_token_info = auth_token_info def to_dict(self): - data = super(RequestContext, self).to_dict() - data.update(roles=self.roles, user_name=self.user_name, - auth_url=self.auth_url, - auth_token_info=self.auth_token_info, - trust_id=self.trust_id) - return data + return {'auth_token': self.auth_token, + 'user': self.user, + 'tenant': self.tenant, + 'is_admin': self.is_admin, + 'read_only': self.read_only, + 'show_deleted': self.show_deleted, + 'request_id': self.request_id, + 'domain_id': self.domain_id, + 'domain_name': self.domain_name, + 'is_public_api': self.is_public_api} @classmethod def from_dict(cls, values): - allowed = [arg for arg in - inspect.getargspec(RequestContext.__init__).args - if arg != 'self'] - kwargs = dict((k, v) for (k, v) in values.items() if k in allowed) - return cls(**kwargs) + values.pop('user', None) + values.pop('tenant', None) + return cls(**values) diff --git a/magnum/common/rpc.py b/magnum/common/rpc.py new file mode 100644 index 0000000000..387a4bbe7c --- /dev/null +++ b/magnum/common/rpc.py @@ -0,0 +1,148 @@ +# Copyright 2014 Red Hat, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +__all__ = [ + 'init', + 'cleanup', + 'set_defaults', + 'add_extra_exmods', + 'clear_extra_exmods', + 'get_allowed_exmods', + 'RequestContextSerializer', + 'get_client', + 'get_server', + 'get_notifier', + 'TRANSPORT_ALIASES', +] + +from oslo.config import cfg +from oslo import messaging +from oslo.serialization import jsonutils + +from magnum.common import context as magnum_context +from magnum.common import exception + + +CONF = cfg.CONF +TRANSPORT = None +NOTIFIER = None + +ALLOWED_EXMODS = [ + exception.__name__, +] +EXTRA_EXMODS = [] + +# NOTE(lucasagomes): The magnum.openstack.common.rpc entries are for +# backwards compat with IceHouse rpc_backend configuration values. +TRANSPORT_ALIASES = { + 'magnum.openstack.common.rpc.impl_kombu': 'rabbit', + 'magnum.openstack.common.rpc.impl_qpid': 'qpid', + 'magnum.openstack.common.rpc.impl_zmq': 'zmq', + 'magnum.rpc.impl_kombu': 'rabbit', + 'magnum.rpc.impl_qpid': 'qpid', + 'magnum.rpc.impl_zmq': 'zmq', +} + + +def init(conf): + global TRANSPORT, NOTIFIER + exmods = get_allowed_exmods() + TRANSPORT = messaging.get_transport(conf, + allowed_remote_exmods=exmods, + aliases=TRANSPORT_ALIASES) + serializer = RequestContextSerializer(JsonPayloadSerializer()) + NOTIFIER = messaging.Notifier(TRANSPORT, serializer=serializer) + + +def cleanup(): + global TRANSPORT, NOTIFIER + assert TRANSPORT is not None + assert NOTIFIER is not None + TRANSPORT.cleanup() + TRANSPORT = NOTIFIER = None + + +def set_defaults(control_exchange): + messaging.set_transport_defaults(control_exchange) + + +def add_extra_exmods(*args): + EXTRA_EXMODS.extend(args) + + +def clear_extra_exmods(): + del EXTRA_EXMODS[:] + + +def get_allowed_exmods(): + return ALLOWED_EXMODS + EXTRA_EXMODS + + +class JsonPayloadSerializer(messaging.NoOpSerializer): + @staticmethod + def serialize_entity(context, entity): + return jsonutils.to_primitive(entity, convert_instances=True) + + +class RequestContextSerializer(messaging.Serializer): + + def __init__(self, base): + self._base = base + + def serialize_entity(self, context, entity): + if not self._base: + return entity + return self._base.serialize_entity(context, entity) + + def deserialize_entity(self, context, entity): + if not self._base: + return entity + return self._base.deserialize_entity(context, entity) + + def serialize_context(self, context): + return context + + def deserialize_context(self, context): + return magnum_context.RequestContext.from_dict(context) + + +def get_transport_url(url_str=None): + return messaging.TransportURL.parse(CONF, url_str, TRANSPORT_ALIASES) + + +def get_client(target, version_cap=None, serializer=None): + assert TRANSPORT is not None + serializer = RequestContextSerializer(serializer) + return messaging.RPCClient(TRANSPORT, + target, + version_cap=version_cap, + serializer=serializer) + + +def get_server(target, endpoints, serializer=None): + assert TRANSPORT is not None + serializer = RequestContextSerializer(serializer) + return messaging.get_rpc_server(TRANSPORT, + target, + endpoints, + executor='eventlet', + serializer=serializer) + + +def get_notifier(service=None, host=None, publisher_id=None): + assert NOTIFIER is not None + if not publisher_id: + publisher_id = "%s.%s" % (service, host or CONF.host) + return NOTIFIER.prepare(publisher_id=publisher_id) diff --git a/magnum/common/service.py b/magnum/common/service.py index 0b102a2049..804418aacb 100644 --- a/magnum/common/service.py +++ b/magnum/common/service.py @@ -1,23 +1,117 @@ -# Copyright 2013 - Red Hat, Inc. +# -*- encoding: utf-8 -*- # -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at +# Copyright © 2012 eNovance # -# http://www.apache.org/licenses/LICENSE-2.0 +# Author: Julien Danjou +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import socket from oslo.config import cfg +from oslo import messaging +from oslo.utils import importutils -from magnum.openstack.common import log as logging +from magnum.common import config +from magnum.common import rpc +from magnum.objects import base as objects_base +from magnum.openstack.common._i18n import _LE +from magnum.openstack.common._i18n import _LI +from magnum.openstack.common import context +from magnum.openstack.common import log +from magnum.openstack.common import service + + +service_opts = [ + cfg.IntOpt('periodic_interval', + default=60, + help='Seconds between running periodic tasks.'), + cfg.StrOpt('host', + default=socket.getfqdn(), + help='Name of this node. This can be an opaque identifier. ' + 'It is not necessarily a hostname, FQDN, or IP address. ' + 'However, the node name must be valid within ' + 'an AMQP key, and if using ZeroMQ, a valid ' + 'hostname, FQDN, or IP address.'), +] + +cfg.CONF.register_opts(service_opts) + +LOG = log.getLogger(__name__) + + +class RPCService(service.Service): + + def __init__(self, host, manager_module, manager_class): + super(RPCService, self).__init__() + self.host = host + manager_module = importutils.try_import(manager_module) + manager_class = getattr(manager_module, manager_class) + self.manager = manager_class(host, manager_module.MANAGER_TOPIC) + self.topic = self.manager.topic + self.rpcserver = None + + def start(self): + super(RPCService, self).start() + admin_context = context.RequestContext('admin', 'admin', is_admin=True) + self.tg.add_dynamic_timer( + self.manager.periodic_tasks, + periodic_interval_max=cfg.CONF.periodic_interval, + context=admin_context) + + self.manager.init_host() + target = messaging.Target(topic=self.topic, server=self.host) + endpoints = [self.manager] + serializer = objects_base.MagnumObjectSerializer() + self.rpcserver = rpc.get_server(target, endpoints, serializer) + self.rpcserver.start() + LOG.info(_LI('Created RPC server for service %(service)s on host ' + '%(host)s.'), + {'service': self.topic, 'host': self.host}) + + def stop(self): + super(RPCService, self).stop() + try: + self.rpcserver.stop() + self.rpcserver.wait() + except Exception as e: + LOG.exception(_LE('Service error occurred when stopping the ' + 'RPC server. Error: %s'), e) + try: + self.manager.del_host() + except Exception as e: + LOG.exception(_LE('Service error occurred when cleaning up ' + 'the RPC manager. Error: %s'), e) + LOG.info(_LI('Stopped RPC server for service %(service)s on host ' + '%(host)s.'), + {'service': self.topic, 'host': self.host}) def prepare_service(argv=[]): - cfg.CONF(argv[1:], project='magnum') - logging.setup('magnum') - # may make sense to load DB here + config.parse_args(argv) + cfg.set_defaults(log.log_opts, + default_log_levels=['amqp=WARN', + 'amqplib=WARN', + 'qpid.messaging=INFO', + 'sqlalchemy=WARN', + 'keystoneclient=INFO', + 'stevedore=INFO', + 'eventlet.wsgi.server=WARN', + 'iso8601=WARN', + 'paramiko=WARN', + 'requests=WARN', + 'neutronclient=WARN', + 'glanceclient=WARN', + 'magnum.openstack.common=WARN', + ]) + log.setup('magnum')