Copy ironic/common files to magnum/common for RPC server

These are straight copies of ironic/ironic/common files needed
for the RPC server.

Change-Id: If493eaee919e55b2e5f85f2411c0b0d813f21bde
This commit is contained in:
Steven Dake 2014-12-06 13:07:20 -07:00
parent efaeeb0457
commit 3f85ccce3a
4 changed files with 298 additions and 48 deletions

View File

@ -17,11 +17,14 @@
from oslo.config import cfg
from magnum.common import rpc
from magnum import version
def parse_args(argv, default_config_files=None):
rpc.set_defaults(control_exchange='magnum')
cfg.CONF(argv[1:],
project='magnum',
version=version.version_info.release_string(),
default_config_files=default_config_files)
rpc.init(cfg.CONF)

View File

@ -1,54 +1,59 @@
# Copyright 2014 - Mirantis, Inc.
# -*- encoding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from magnum.openstack.common import context
class RequestContext(context.RequestContext):
def __init__(self, auth_token=None, user=None, tenant=None, domain=None,
user_domain=None, project_domain=None, is_admin=False,
read_only=False, request_id=None, user_name=None, roles=None,
auth_url=None, trust_id=None, auth_token_info=None):
"""Extends security contexts from the OpenStack common library."""
def __init__(self, auth_token=None, domain_id=None, domain_name=None,
user=None, tenant=None, is_admin=False, is_public_api=False,
read_only=False, show_deleted=False, request_id=None):
"""Stores several additional request parameters:
:param domain_id: The ID of the domain.
:param domain_name: The name of the domain.
:param is_public_api: Specifies whether the request should be processed
without authentication.
"""
self.is_public_api = is_public_api
self.domain_id = domain_id
self.domain_name = domain_name
super(RequestContext, self).__init__(auth_token=auth_token,
user=user, tenant=tenant,
domain=domain,
user_domain=user_domain,
project_domain=project_domain,
is_admin=is_admin,
read_only=read_only,
show_deleted=False,
show_deleted=show_deleted,
request_id=request_id)
self.roles = roles
self.user_name = user_name
self.auth_url = auth_url
self.trust_id = trust_id
self.auth_token_info = auth_token_info
def to_dict(self):
data = super(RequestContext, self).to_dict()
data.update(roles=self.roles, user_name=self.user_name,
auth_url=self.auth_url,
auth_token_info=self.auth_token_info,
trust_id=self.trust_id)
return data
return {'auth_token': self.auth_token,
'user': self.user,
'tenant': self.tenant,
'is_admin': self.is_admin,
'read_only': self.read_only,
'show_deleted': self.show_deleted,
'request_id': self.request_id,
'domain_id': self.domain_id,
'domain_name': self.domain_name,
'is_public_api': self.is_public_api}
@classmethod
def from_dict(cls, values):
allowed = [arg for arg in
inspect.getargspec(RequestContext.__init__).args
if arg != 'self']
kwargs = dict((k, v) for (k, v) in values.items() if k in allowed)
return cls(**kwargs)
values.pop('user', None)
values.pop('tenant', None)
return cls(**values)

148
magnum/common/rpc.py Normal file
View File

@ -0,0 +1,148 @@
# Copyright 2014 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
__all__ = [
'init',
'cleanup',
'set_defaults',
'add_extra_exmods',
'clear_extra_exmods',
'get_allowed_exmods',
'RequestContextSerializer',
'get_client',
'get_server',
'get_notifier',
'TRANSPORT_ALIASES',
]
from oslo.config import cfg
from oslo import messaging
from oslo.serialization import jsonutils
from magnum.common import context as magnum_context
from magnum.common import exception
CONF = cfg.CONF
TRANSPORT = None
NOTIFIER = None
ALLOWED_EXMODS = [
exception.__name__,
]
EXTRA_EXMODS = []
# NOTE(lucasagomes): The magnum.openstack.common.rpc entries are for
# backwards compat with IceHouse rpc_backend configuration values.
TRANSPORT_ALIASES = {
'magnum.openstack.common.rpc.impl_kombu': 'rabbit',
'magnum.openstack.common.rpc.impl_qpid': 'qpid',
'magnum.openstack.common.rpc.impl_zmq': 'zmq',
'magnum.rpc.impl_kombu': 'rabbit',
'magnum.rpc.impl_qpid': 'qpid',
'magnum.rpc.impl_zmq': 'zmq',
}
def init(conf):
global TRANSPORT, NOTIFIER
exmods = get_allowed_exmods()
TRANSPORT = messaging.get_transport(conf,
allowed_remote_exmods=exmods,
aliases=TRANSPORT_ALIASES)
serializer = RequestContextSerializer(JsonPayloadSerializer())
NOTIFIER = messaging.Notifier(TRANSPORT, serializer=serializer)
def cleanup():
global TRANSPORT, NOTIFIER
assert TRANSPORT is not None
assert NOTIFIER is not None
TRANSPORT.cleanup()
TRANSPORT = NOTIFIER = None
def set_defaults(control_exchange):
messaging.set_transport_defaults(control_exchange)
def add_extra_exmods(*args):
EXTRA_EXMODS.extend(args)
def clear_extra_exmods():
del EXTRA_EXMODS[:]
def get_allowed_exmods():
return ALLOWED_EXMODS + EXTRA_EXMODS
class JsonPayloadSerializer(messaging.NoOpSerializer):
@staticmethod
def serialize_entity(context, entity):
return jsonutils.to_primitive(entity, convert_instances=True)
class RequestContextSerializer(messaging.Serializer):
def __init__(self, base):
self._base = base
def serialize_entity(self, context, entity):
if not self._base:
return entity
return self._base.serialize_entity(context, entity)
def deserialize_entity(self, context, entity):
if not self._base:
return entity
return self._base.deserialize_entity(context, entity)
def serialize_context(self, context):
return context
def deserialize_context(self, context):
return magnum_context.RequestContext.from_dict(context)
def get_transport_url(url_str=None):
return messaging.TransportURL.parse(CONF, url_str, TRANSPORT_ALIASES)
def get_client(target, version_cap=None, serializer=None):
assert TRANSPORT is not None
serializer = RequestContextSerializer(serializer)
return messaging.RPCClient(TRANSPORT,
target,
version_cap=version_cap,
serializer=serializer)
def get_server(target, endpoints, serializer=None):
assert TRANSPORT is not None
serializer = RequestContextSerializer(serializer)
return messaging.get_rpc_server(TRANSPORT,
target,
endpoints,
executor='eventlet',
serializer=serializer)
def get_notifier(service=None, host=None, publisher_id=None):
assert NOTIFIER is not None
if not publisher_id:
publisher_id = "%s.%s" % (service, host or CONF.host)
return NOTIFIER.prepare(publisher_id=publisher_id)

View File

@ -1,23 +1,117 @@
# Copyright 2013 - Red Hat, Inc.
# -*- encoding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Copyright © 2012 eNovance <licensing@enovance.com>
#
# http://www.apache.org/licenses/LICENSE-2.0
# Author: Julien Danjou <julien@danjou.info>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import socket
from oslo.config import cfg
from oslo import messaging
from oslo.utils import importutils
from magnum.openstack.common import log as logging
from magnum.common import config
from magnum.common import rpc
from magnum.objects import base as objects_base
from magnum.openstack.common._i18n import _LE
from magnum.openstack.common._i18n import _LI
from magnum.openstack.common import context
from magnum.openstack.common import log
from magnum.openstack.common import service
service_opts = [
cfg.IntOpt('periodic_interval',
default=60,
help='Seconds between running periodic tasks.'),
cfg.StrOpt('host',
default=socket.getfqdn(),
help='Name of this node. This can be an opaque identifier. '
'It is not necessarily a hostname, FQDN, or IP address. '
'However, the node name must be valid within '
'an AMQP key, and if using ZeroMQ, a valid '
'hostname, FQDN, or IP address.'),
]
cfg.CONF.register_opts(service_opts)
LOG = log.getLogger(__name__)
class RPCService(service.Service):
def __init__(self, host, manager_module, manager_class):
super(RPCService, self).__init__()
self.host = host
manager_module = importutils.try_import(manager_module)
manager_class = getattr(manager_module, manager_class)
self.manager = manager_class(host, manager_module.MANAGER_TOPIC)
self.topic = self.manager.topic
self.rpcserver = None
def start(self):
super(RPCService, self).start()
admin_context = context.RequestContext('admin', 'admin', is_admin=True)
self.tg.add_dynamic_timer(
self.manager.periodic_tasks,
periodic_interval_max=cfg.CONF.periodic_interval,
context=admin_context)
self.manager.init_host()
target = messaging.Target(topic=self.topic, server=self.host)
endpoints = [self.manager]
serializer = objects_base.MagnumObjectSerializer()
self.rpcserver = rpc.get_server(target, endpoints, serializer)
self.rpcserver.start()
LOG.info(_LI('Created RPC server for service %(service)s on host '
'%(host)s.'),
{'service': self.topic, 'host': self.host})
def stop(self):
super(RPCService, self).stop()
try:
self.rpcserver.stop()
self.rpcserver.wait()
except Exception as e:
LOG.exception(_LE('Service error occurred when stopping the '
'RPC server. Error: %s'), e)
try:
self.manager.del_host()
except Exception as e:
LOG.exception(_LE('Service error occurred when cleaning up '
'the RPC manager. Error: %s'), e)
LOG.info(_LI('Stopped RPC server for service %(service)s on host '
'%(host)s.'),
{'service': self.topic, 'host': self.host})
def prepare_service(argv=[]):
cfg.CONF(argv[1:], project='magnum')
logging.setup('magnum')
# may make sense to load DB here
config.parse_args(argv)
cfg.set_defaults(log.log_opts,
default_log_levels=['amqp=WARN',
'amqplib=WARN',
'qpid.messaging=INFO',
'sqlalchemy=WARN',
'keystoneclient=INFO',
'stevedore=INFO',
'eventlet.wsgi.server=WARN',
'iso8601=WARN',
'paramiko=WARN',
'requests=WARN',
'neutronclient=WARN',
'glanceclient=WARN',
'magnum.openstack.common=WARN',
])
log.setup('magnum')