Add RPC backend service

This uses a simplified RPC model while integrating with the
versioned objects serializer.

bay-create is handled by the backend.
bay-list is handled by the RPC service.

Change-Id: I384994584ad7f928df9366a2e6f677951a450e40
This commit is contained in:
Steven Dake 2014-12-06 13:13:50 -07:00
parent 5405375608
commit 6349b4e384
11 changed files with 278 additions and 384 deletions

View File

@ -26,6 +26,8 @@ from magnum.api.controllers import link
from magnum.api.controllers.v1 import collection
from magnum.api.controllers.v1 import types
from magnum.api.controllers.v1 import utils as api_utils
from magnum.backend import api
from magnum.common import context
from magnum.common import exception
from magnum import objects
@ -88,6 +90,9 @@ class Bay(base.APIBase):
"""A list containing a self link and associated bay links"""
def __init__(self, **kwargs):
super(Bay, self).__init__()
self.backend_api = api.API(context=context.RequestContext())
self.fields = []
fields = list(objects.Bay.fields)
# NOTE(lucasagomes): bay_uuid is not part of objects.Bay.fields
@ -152,6 +157,7 @@ class BayCollection(collection.Collection):
def __init__(self, **kwargs):
self._type = 'bays'
self.backend_api = api.API(context=context.RequestContext())
@staticmethod
def convert_with_links(rpc_bays, limit, url=None, expand=False, **kwargs):
@ -170,6 +176,9 @@ class BayCollection(collection.Collection):
class BaysController(rest.RestController):
"""REST controller for Bays."""
def __init__(self):
super(BaysController, self).__init__()
self.backend_api = api.API(context=context.RequestContext())
from_bays = False
"""A flag to indicate if the requests to this controller are coming
@ -191,9 +200,9 @@ class BaysController(rest.RestController):
marker_obj = objects.Bay.get_by_uuid(pecan.request.context,
marker)
bays = objects.Bay.list(pecan.request.context, limit,
marker_obj, sort_key=sort_key,
sort_dir=sort_dir)
bays = self.backend_api.bay_list(pecan.request.context, limit,
marker_obj, sort_key=sort_key,
sort_dir=sort_dir)
return BayCollection.convert_with_links(bays, limit,
url=resource_url,
@ -259,12 +268,12 @@ class BaysController(rest.RestController):
if self.from_bays:
raise exception.OperationNotPermitted
new_bay = objects.Bay(pecan.request.context,
**bay.as_dict())
new_bay.create()
new_bay = objects.Bay(pecan.request.context, **bay.as_dict())
res_bay = self.backend_api.bay_create(new_bay)
# Set the HTTP Location Header
pecan.response.location = link.build_url('bays', new_bay.uuid)
return Bay.convert_with_links(new_bay)
pecan.response.location = link.build_url('bays', res_bay.uuid)
return Bay.convert_with_links(res_bay)
@wsme.validate(types.uuid, [BayPatchType])
@wsme_pecan.wsexpose(Bay, types.uuid, body=[BayPatchType])

View File

@ -14,7 +14,8 @@
from oslo.config import cfg
from magnum.common.rpc import service
from magnum.common import rpc_service as service
from magnum import objects
# The Backend API class serves as a AMQP client for communicating
@ -30,11 +31,11 @@ class API(service.API):
# Bay Operations
def bay_create(self, id, name, type):
return self._call('bay_create', id=id, name=name, type=type)
def bay_create(self, bay):
return self._call('bay_create', bay=bay)
def bay_list(self):
return self._call('bay_list')
def bay_list(self, context, limit, marker, sort_key, sort_dir):
return objects.Bay.list(context, limit, marker, sort_key, sort_dir)
def bay_delete(self, uuid):
return self._call('bay_delete', uuid=uuid)
@ -44,8 +45,8 @@ class API(service.API):
# Service Operations
def service_create(self, uuid, contents):
return self._call('service_create', uuid=uuid, contents=contents)
def service_create(self, service):
return self._call('service_create', service=service)
def service_list(self):
return self._call('service_list')
@ -58,8 +59,8 @@ class API(service.API):
# Pod Operations
def pod_create(self, uuid, contents):
return self._call('pod_create4', uuid=uuid, contents=contents)
def pod_create(self, uuid, pod):
return self._call('pod_create', uuid=uuid, pod=pod)
def pod_list(self):
return self._call('pod_list')
@ -72,8 +73,8 @@ class API(service.API):
# Container operations
def container_create(self, uuid, contents):
return self._call('container_create', uuid=uuid)
def container_create(self, uuid, container):
return self._call('container_create', container=container)
def container_list(self):
return self._call('container_list')

View File

@ -1,107 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Magnum Bay Heat RPC handler."""
from magnum.openstack.common import log as logging
LOG = logging.getLogger(__name__)
# These are the backend operations. They are executed by the backend
# service. API calls via AMQP (within the ReST API) trigger the handlers to
# be called.
class Handler(object):
def __init__(self):
super(Handler, self).__init__()
# Bay Operations
def bay_create(id, name, type):
LOG.debug('heat bay_create\n')
return None
def bay_list():
LOG.debug('heat bay_list\n')
return None
def bay_delete(uuid):
LOG.debug('heat bay_delete\n')
return None
def bay_show(uuid):
LOG.debug('heat bay_show\n')
return None
# Service Operations
def service_create(uuid, contents):
return None
def service_list():
return None
def service_delete():
return None
def service_show(uuid):
return None
# Pod Operations
def pod_create(uuid, contents):
return None
def pod_list():
return None
def pod_delete(uuid):
return None
def pod_show(uuid):
return None
# Container operations
def container_create(uuid, contents):
return None
def container_list():
return None
def container_delete(uuid):
return None
def container_show(uuid):
return None
def container_reboot(uuid):
return None
def container_stop(uuid):
return None
def container_start(uuid):
return None
def container_pause(uuid):
return None
def container_unpause(uuid):
return None
def container_logs(uuid):
return None
def container_execute(uuid):
return None

View File

@ -27,81 +27,19 @@ class Handler(object):
# Bay Operations
def bay_create(id, name, type):
def bay_create(self, ctxt, bay):
LOG.debug('ironic bay_create\n')
return None
bay.create()
return bay
def bay_list():
def bay_list(self, ctxt):
LOG.debug('ironic bay_list\n')
return None
def bay_delete(uuid):
def bay_delete(self, ctxt, uuid):
LOG.debug('ironic bay_delete\n')
return None
def bay_show(uuid):
def bay_show(self, ctxt, uuid):
LOG.debug('ironic bay_show\n')
return None
# Service Operations
def service_create(uuid, contents):
return None
def service_list():
return None
def service_delete():
return None
def service_show(uuid):
return None
# Pod Operations
def pod_create(uuid, contents):
return None
def pod_list():
return None
def pod_delete(uuid):
return None
def pod_show(uuid):
return None
# Container operations
def container_create(uuid, contents):
return None
def container_list():
return None
def container_delete(uuid):
return None
def container_show(uuid):
return None
def container_reboot(uuid):
return None
def container_stop(uuid):
return None
def container_start(uuid):
return None
def container_pause(uuid):
return None
def container_unpause(uuid):
return None
def container_logs(uuid):
return None
def container_execute(uuid):
return None

View File

@ -25,48 +25,6 @@ class Handler(object):
def __init__(self):
super(Handler, self).__init__()
# Bay Operations
def bay_create(id, name, type):
return None
def bay_list():
return None
def bay_delete(uuid):
return None
def bay_show(uuid):
return None
# Service Operations
def service_create(uuid, contents):
return None
def service_list():
return None
def service_delete():
return None
def service_show(uuid):
return None
# Pod Operations
def pod_create(uuid, contents):
return None
def pod_list():
return None
def pod_delete(uuid):
return None
def pod_show(uuid):
return None
# Container operations
def container_create(uuid, contents):

View File

@ -18,7 +18,7 @@ from magnum.openstack.common import utils
LOG = logging.getLogger(__name__)
class KubeHandler(object):
class Handler(object):
"""These are the backend operations. They are executed by the backend
service. API calls via AMQP (within the ReST API) trigger the
handlers to be called.
@ -28,7 +28,7 @@ class KubeHandler(object):
"""
def __init__(self):
super(KubeHandler, self).__init__()
super(Handler, self).__init__()
@staticmethod
def service_create(uuid, contents):
@ -160,4 +160,4 @@ class KubeHandler(object):
return out
except Exception as e:
LOG.error("Couldn't delete pod %s due to error %s" % (uuid, e))
return None
return None

80
magnum/backend/manager.py Normal file
View File

@ -0,0 +1,80 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from eventlet import greenpool
from oslo.config import cfg
from oslo import messaging
from magnum.common import exception
from magnum.openstack.common import periodic_task
MANAGER_TOPIC = 'magnum_backend'
class BackendManager(periodic_task.PeriodicTasks):
"""Magnum Backend manager main class."""
RPC_API_VERSION = '1.0'
target = messaging.Target(version=RPC_API_VERSION)
def __init__(self, host, topic):
super(BackendManager, self).__init__()
if not host:
host = cfg.CONF.host
self.host = host
self.topic = topic
def _conductor_service_record_keepalive(self):
while not self._keepalive_evt.is_set():
self._keepalive_evt.wait(1)
def _spawn_worker(self, func, *args, **kwargs):
"""Create a greenthread to run func(*args, **kwargs).
Spawns a greenthread if there are free slots in pool, otherwise raises
exception. Execution control returns immediately to the caller.
:returns: GreenThread object.
:raises: NoFreeConductorWorker if worker pool is currently full.
"""
if self._worker_pool.free():
return self._worker_pool.spawn(func, *args, **kwargs)
else:
raise exception.NoFreeConductorWorker()
def create_bay(self, context, bay):
bay.create()
return bay
def init_host(self):
self._worker_pool = greenpool.GreenPool(8)
# Spawn a dedicated greenthread for the keepalive
# self._keepalive_evt = threading.Event()
# self._spawn_worker(self._conductor_service_record_keepalive)
def del_host(self):
pass
def periodic_tasks(self, context, raise_on_error=False):
"""Periodic tasks are run at pre-specified interval."""
res = self.run_periodic_tasks(context, raise_on_error=raise_on_error)
return res
@periodic_task.periodic_task
def trigger(self, context):
pass

View File

@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Starter script for the Magnum Magnum service."""
"""Starter script for the Magnum backend service."""
import logging as std_logging
import os
@ -20,11 +20,10 @@ import sys
from oslo.config import cfg
from magnum.backend.handlers import bay_heat as bay_heat
from magnum.backend.handlers import bay_ironic as bay_ironic
from magnum.backend.handlers import docker as docker_backend
from magnum.backend.handlers import k8s as k8s_backend
from magnum.common.rpc import service
from magnum.common import rpc_service as service
from magnum.openstack.common._i18n import _
from magnum.openstack.common import log as logging
@ -44,7 +43,6 @@ def main():
endpoints = [
docker_backend.Handler(),
k8s_backend.Handler(),
bay_heat.Handler(),
bay_ironic.Handler()
]
server = service.Service(cfg.CONF.backend.topic,

View File

@ -0,0 +1,103 @@
# Copyright 2014 - Rackspace Hosting
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Common RPC service and API tools for Magnum."""
import eventlet
from oslo.config import cfg
from oslo import messaging
import magnum.common.context
from magnum.objects import base as objects_base
# NOTE(paulczar):
# Ubuntu 14.04 forces librabbitmq when kombu is used
# Unfortunately it forces a version that has a crash
# bug. Calling eventlet.monkey_patch() tells kombu
# to use libamqp instead.
eventlet.monkey_patch()
# NOTE(asalkeld):
# The magnum.openstack.common.rpc entries are for compatability
# with devstack rpc_backend configuration values.
TRANSPORT_ALIASES = {
'magnum.openstack.common.rpc.impl_kombu': 'rabbit',
'magnum.openstack.common.rpc.impl_qpid': 'qpid',
'magnum.openstack.common.rpc.impl_zmq': 'zmq',
}
class RequestContextSerializer(messaging.Serializer):
def __init__(self, base):
self._base = base
def serialize_entity(self, context, entity):
if not self._base:
return entity
return self._base.serialize_entity(context, entity)
def deserialize_entity(self, context, entity):
if not self._base:
return entity
return self._base.deserialize_entity(context, entity)
def serialize_context(self, context):
return context.to_dict()
def deserialize_context(self, context):
return magnum.common.context.RequestContext.from_dict(context)
class Service(object):
_server = None
def __init__(self, topic, server, handlers):
serializer = RequestContextSerializer(
objects_base.MagnumObjectSerializer())
transport = messaging.get_transport(cfg.CONF,
aliases=TRANSPORT_ALIASES)
# TODO(asalkeld) add support for version='x.y'
target = messaging.Target(topic=topic, server=server)
self._server = messaging.get_rpc_server(transport, target, handlers,
serializer=serializer)
def serve(self):
self._server.start()
self._server.wait()
class API(object):
def __init__(self, transport=None, context=None, topic=None):
serializer = RequestContextSerializer(
objects_base.MagnumObjectSerializer())
if transport is None:
transport = messaging.get_transport(cfg.CONF,
aliases=TRANSPORT_ALIASES)
self._context = context
if topic is None:
topic = ''
target = messaging.Target(topic=topic)
self._client = messaging.RPCClient(transport, target,
serializer=serializer)
def _call(self, method, *args, **kwargs):
return self._client.call(self._context, method, *args, **kwargs)
def _cast(self, method, *args, **kwargs):
self._client.cast(self._context, method, *args, **kwargs)
def echo(self, message):
self._cast('echo', message=message)

View File

@ -1,117 +1,22 @@
# -*- encoding: utf-8 -*-
# Copyright 2013 - Red Hat, Inc.
#
# Copyright © 2012 eNovance <licensing@enovance.com>
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# Author: Julien Danjou <julien@danjou.info>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import socket
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo.config import cfg
from oslo import messaging
from oslo.utils import importutils
from magnum.common import config
from magnum.common import rpc
from magnum.objects import base as objects_base
from magnum.openstack.common._i18n import _LE
from magnum.openstack.common._i18n import _LI
from magnum.openstack.common import context
from magnum.openstack.common import log
from magnum.openstack.common import service
service_opts = [
cfg.IntOpt('periodic_interval',
default=60,
help='Seconds between running periodic tasks.'),
cfg.StrOpt('host',
default=socket.getfqdn(),
help='Name of this node. This can be an opaque identifier. '
'It is not necessarily a hostname, FQDN, or IP address. '
'However, the node name must be valid within '
'an AMQP key, and if using ZeroMQ, a valid '
'hostname, FQDN, or IP address.'),
]
cfg.CONF.register_opts(service_opts)
LOG = log.getLogger(__name__)
class RPCService(service.Service):
def __init__(self, host, manager_module, manager_class):
super(RPCService, self).__init__()
self.host = host
manager_module = importutils.try_import(manager_module)
manager_class = getattr(manager_module, manager_class)
self.manager = manager_class(host, manager_module.MANAGER_TOPIC)
self.topic = self.manager.topic
self.rpcserver = None
def start(self):
super(RPCService, self).start()
admin_context = context.RequestContext('admin', 'admin', is_admin=True)
self.tg.add_dynamic_timer(
self.manager.periodic_tasks,
periodic_interval_max=cfg.CONF.periodic_interval,
context=admin_context)
self.manager.init_host()
target = messaging.Target(topic=self.topic, server=self.host)
endpoints = [self.manager]
serializer = objects_base.MagnumObjectSerializer()
self.rpcserver = rpc.get_server(target, endpoints, serializer)
self.rpcserver.start()
LOG.info(_LI('Created RPC server for service %(service)s on host '
'%(host)s.'),
{'service': self.topic, 'host': self.host})
def stop(self):
super(RPCService, self).stop()
try:
self.rpcserver.stop()
self.rpcserver.wait()
except Exception as e:
LOG.exception(_LE('Service error occurred when stopping the '
'RPC server. Error: %s'), e)
try:
self.manager.del_host()
except Exception as e:
LOG.exception(_LE('Service error occurred when cleaning up '
'the RPC manager. Error: %s'), e)
LOG.info(_LI('Stopped RPC server for service %(service)s on host '
'%(host)s.'),
{'service': self.topic, 'host': self.host})
from magnum.openstack.common import log as logging
def prepare_service(argv=[]):
config.parse_args(argv)
cfg.set_defaults(log.log_opts,
default_log_levels=['amqp=WARN',
'amqplib=WARN',
'qpid.messaging=INFO',
'sqlalchemy=WARN',
'keystoneclient=INFO',
'stevedore=INFO',
'eventlet.wsgi.server=WARN',
'iso8601=WARN',
'paramiko=WARN',
'requests=WARN',
'neutronclient=WARN',
'glanceclient=WARN',
'magnum.openstack.common=WARN',
])
log.setup('magnum')
cfg.CONF(argv[1:], project='magnum')
logging.setup('magnum')

View File

@ -10,9 +10,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from magnum.backend import api
from magnum import tests
from magnum.tests.db import base as db_base
from mock import patch
class TestRootController(tests.FunctionalTest):
def test_version(self):
@ -69,46 +72,52 @@ class TestRootController(tests.FunctionalTest):
class TestBayController(db_base.DbTestCase):
def simulate_rpc_bay_create(self, bay):
bay.create()
return bay
def test_bay_api(self):
# Create a bay
params = '{"name": "bay_example_A", "type": "virt", \
"image_id": "Fedora", "node_count": "3"}'
response = self.app.post('/v1/bays',
params=params,
content_type='application/json')
self.assertEqual(response.status_int, 201)
with patch.object(api.API, 'bay_create') as mock_method:
# Create a bay
mock_method.side_effect = self.simulate_rpc_bay_create
params = '{"name": "bay_example_A", "type": "virt", \
"image_id": "Fedora", "node_count": "3"}'
response = self.app.post('/v1/bays',
params=params,
content_type='application/json')
self.assertEqual(response.status_int, 201)
# Get all bays
response = self.app.get('/v1/bays')
self.assertEqual(response.status_int, 200)
self.assertEqual(1, len(response.json))
c = response.json['bays'][0]
self.assertIsNotNone(c.get('uuid'))
self.assertEqual('bay_example_A', c.get('name'))
self.assertEqual('virt', c.get('type'))
self.assertEqual('Fedora', c.get('image_id'))
self.assertEqual(3, c.get('node_count'))
# Get all bays
response = self.app.get('/v1/bays')
self.assertEqual(response.status_int, 200)
self.assertEqual(1, len(response.json))
c = response.json['bays'][0]
self.assertIsNotNone(c.get('uuid'))
self.assertEqual('bay_example_A', c.get('name'))
self.assertEqual('virt', c.get('type'))
self.assertEqual('Fedora', c.get('image_id'))
self.assertEqual(3, c.get('node_count'))
# Get just the one we created
response = self.app.get('/v1/bays/%s' % c.get('uuid'))
self.assertEqual(response.status_int, 200)
# Get just the one we created
response = self.app.get('/v1/bays/%s' % c.get('uuid'))
self.assertEqual(response.status_int, 200)
# Update the description
params = [{'path': '/name',
'value': 'bay_example_B',
'op': 'replace'}]
response = self.app.patch_json('/v1/bays/%s' % c.get('uuid'),
params=params)
self.assertEqual(response.status_int, 200)
# Update the description
params = [{'path': '/name',
'value': 'bay_example_B',
'op': 'replace'}]
response = self.app.patch_json('/v1/bays/%s' % c.get('uuid'),
params=params)
self.assertEqual(response.status_int, 200)
# Delete the bay we created
response = self.app.delete('/v1/bays/%s' % c.get('uuid'))
self.assertEqual(response.status_int, 204)
# Delete the bay we created
response = self.app.delete('/v1/bays/%s' % c.get('uuid'))
self.assertEqual(response.status_int, 204)
response = self.app.get('/v1/bays')
self.assertEqual(response.status_int, 200)
c = response.json['bays']
self.assertEqual(0, len(c))
response = self.app.get('/v1/bays')
self.assertEqual(response.status_int, 200)
c = response.json['bays']
self.assertEqual(0, len(c))
class TestNodeController(db_base.DbTestCase):