Knitting Pod and Service object flow for Kubernetes backend

Get the work flow in place for pods and services to use rpc
backend and integrate kube handlers.

Partially-Implements: blueprint magnum-backend-kubernetes

Change-Id: Iabd7f5f1217402c18820170521354ac04e436551
This commit is contained in:
Pradeep Kilambi 2014-12-12 13:30:51 -05:00
parent de215c5fa6
commit 209a639ba1
6 changed files with 272 additions and 110 deletions

View File

@ -24,7 +24,9 @@ from magnum.api.controllers import link
from magnum.api.controllers.v1 import collection
from magnum.api.controllers.v1 import types
from magnum.api.controllers.v1 import utils as api_utils
from magnum.common import context
from magnum.common import exception
from magnum.conductor import api
from magnum import objects
@ -83,6 +85,9 @@ class Pod(base.APIBase):
"""A list containing a self link and associated pod links"""
def __init__(self, **kwargs):
super(Pod, self).__init__()
self.backend_api = api.API(context=context.RequestContext())
self.fields = []
fields = list(objects.Pod.fields)
# NOTE(lucasagomes): pod_uuid is not part of objects.Pod.fields
@ -111,10 +116,10 @@ class Pod(base.APIBase):
pod.pod_id = wtypes.Unset
pod.links = [link.Link.make_link('self', url,
'pods', pod.uuid),
link.Link.make_link('bookmark', url,
'pods', pod.uuid,
bookmark=True)
'pods', pod.uuid),
link.Link.make_link('bookmark', url,
'pods', pod.uuid,
bookmark=True)
]
return pod
@ -145,12 +150,13 @@ class PodCollection(collection.Collection):
def __init__(self, **kwargs):
self._type = 'pods'
self.backend_api = api.API(context=context.RequestContext())
@staticmethod
def convert_with_links(rpc_pods, limit, url=None, expand=False, **kwargs):
collection = PodCollection()
collection.pods = [Pod.convert_with_links(p, expand)
for p in rpc_pods]
for p in rpc_pods]
collection.next = collection.get_next(limit, url=url, **kwargs)
return collection
@ -164,6 +170,10 @@ class PodCollection(collection.Collection):
class PodsController(rest.RestController):
"""REST controller for Pods."""
def __init__(self):
super(PodsController, self).__init__()
self.backend_api = api.API(context=context.RequestContext())
from_pods = False
"""A flag to indicate if the requests to this controller are coming
from the top-level resource Pods."""
@ -173,8 +183,8 @@ class PodsController(rest.RestController):
}
def _get_pods_collection(self, marker, limit,
sort_key, sort_dir, expand=False,
resource_url=None):
sort_key, sort_dir, expand=False,
resource_url=None):
limit = api_utils.validate_limit(limit)
sort_dir = api_utils.validate_sort_dir(sort_dir)
@ -182,11 +192,11 @@ class PodsController(rest.RestController):
marker_obj = None
if marker:
marker_obj = objects.Pod.get_by_uuid(pecan.request.context,
marker)
marker)
pods = objects.Pod.list(pecan.request.context, limit,
marker_obj, sort_key=sort_key,
sort_dir=sort_dir)
pods = self.backend_api.pod_list(pecan.request.context, limit,
marker_obj, sort_key=sort_key,
sort_dir=sort_dir)
return PodCollection.convert_with_links(pods, limit,
url=resource_url,
@ -211,7 +221,7 @@ class PodsController(rest.RestController):
@wsme_pecan.wsexpose(PodCollection, types.uuid,
types.uuid, int, wtypes.text, wtypes.text)
def detail(self, pod_uuid=None, marker=None, limit=None,
sort_key='id', sort_dir='asc'):
sort_key='id', sort_dir='asc'):
"""Retrieve a list of pods with detail.
:param pod_uuid: UUID of a pod, to get only pods for that pod.
@ -252,9 +262,9 @@ class PodsController(rest.RestController):
if self.from_pods:
raise exception.OperationNotPermitted
new_pod = objects.Pod(pecan.request.context,
**pod.as_dict())
new_pod.create()
pod_obj = objects.Pod(pecan.request.context,
**pod.as_dict())
new_pod = self.backend_api.pod_create(pod_obj)
# Set the HTTP Location Header
pecan.response.location = link.build_url('pods', new_pod.uuid)
return Pod.convert_with_links(new_pod)
@ -296,7 +306,7 @@ class PodsController(rest.RestController):
if hasattr(pecan.request, 'rpcapi'):
rpc_pod = objects.Pod.get_by_id(pecan.request.context,
rpc_pod.pod_id)
rpc_pod.pod_id)
topic = pecan.request.rpcapi.get_topic_for(rpc_pod)
new_pod = pecan.request.rpcapi.update_pod(
@ -317,5 +327,5 @@ class PodsController(rest.RestController):
raise exception.OperationNotPermitted
rpc_pod = objects.Pod.get_by_uuid(pecan.request.context,
pod_uuid)
rpc_pod.destroy()
pod_uuid)
self.backend_api.pod_delete(rpc_pod)

View File

@ -23,7 +23,9 @@ from magnum.api.controllers import link
from magnum.api.controllers.v1 import collection
from magnum.api.controllers.v1 import types
from magnum.api.controllers.v1 import utils as api_utils
from magnum.common import context
from magnum.common import exception
from magnum.conductor import api
from magnum import objects
@ -71,6 +73,9 @@ class Service(base.APIBase):
"""A list containing a self link and associated service links"""
def __init__(self, **kwargs):
super(Service, self).__init__()
self.backend_api = api.API(context=context.RequestContext())
self.fields = []
fields = list(objects.Service.fields)
fields.append('service_uuid')
@ -123,6 +128,7 @@ class ServiceCollection(collection.Collection):
def __init__(self, **kwargs):
self._type = 'services'
self.backend_api = api.API(context=context.RequestContext())
@staticmethod
def convert_with_links(rpc_services, limit, url=None,
@ -143,6 +149,10 @@ class ServiceCollection(collection.Collection):
class ServicesController(rest.RestController):
"""REST controller for Services."""
def __init__(self):
super(ServicesController, self).__init__()
self.backend_api = api.API(context=context.RequestContext())
from_services = False
"""A flag to indicate if the requests to this controller are coming
from the top-level resource Services."""
@ -163,9 +173,11 @@ class ServicesController(rest.RestController):
marker_obj = objects.Service.get_by_uuid(pecan.request.context,
marker)
services = objects.Service.list(pecan.request.context, limit,
marker_obj, sort_key=sort_key,
sort_dir=sort_dir)
services = self.backend_api.service_list(pecan.request.context,
limit,
marker_obj,
sort_key=sort_key,
sort_dir=sort_dir)
return ServiceCollection.convert_with_links(services, limit,
url=resource_url,
@ -232,9 +244,9 @@ class ServicesController(rest.RestController):
"""
if self.from_services:
raise exception.OperationNotPermitted
new_service = objects.Service(pecan.request.context,
service_obj = objects.Service(pecan.request.context,
**service.as_dict())
new_service.create()
new_service = self.backend_api.service_create(service_obj)
# Set the HTTP Location Header
pecan.response.location = link.build_url('services', new_service.uuid)
return Service.convert_with_links(new_service)
@ -299,4 +311,4 @@ class ServicesController(rest.RestController):
rpc_service = objects.Service.get_by_uuid(pecan.request.context,
service_uuid)
rpc_service.destroy()
self.backend_api.service_delete(rpc_service)

View File

@ -11,10 +11,9 @@
# limitations under the License.
"""API for interfacing with Magnum Backend."""
from oslo.config import cfg
from magnum.common import rpc_service as service
from magnum.common import rpc_service
from magnum import objects
@ -22,7 +21,7 @@ from magnum import objects
# on a topic exchange specific to the conductors. This allows the ReST
# API to trigger operations on the conductors
class API(service.API):
class API(rpc_service.API):
def __init__(self, transport=None, context=None):
cfg.CONF.import_opt('topic', 'magnum.conductor.config',
group='conductor')
@ -48,25 +47,27 @@ class API(service.API):
def service_create(self, service):
return self._call('service_create', service=service)
def service_list(self):
return self._call('service_list')
def service_list(self, context, limit, marker, sort_key, sort_dir):
# TODO(pkilambi): return kubectl results once we parse appropriately
# or figure out a clean way to interact with k8s.
return objects.Service.list(context, limit, marker, sort_key, sort_dir)
def service_delete(self, uuid):
return self._call('service_delete', uuid=uuid)
def service_delete(self, service):
return self._call('service_delete', service)
def service_show(self, uuid):
return self._call('service_show', uuid=uuid)
# Pod Operations
def pod_create(self, uuid, pod):
return self._call('pod_create', uuid=uuid, pod=pod)
def pod_create(self, pod):
return self._call('pod_create', pod=pod)
def pod_list(self):
return self._call('pod_list')
def pod_list(self, context, limit, marker, sort_key, sort_dir):
return objects.Pod.list(context, limit, marker, sort_key, sort_dir)
def pod_delete(self, uuid):
return self._call('pod_delete', uuid=uuid)
def pod_delete(self, pod):
return self._call('pod_delete', pod)
def pod_show(self, uuid):
return self._call('pod_show', uuid=uuid)

View File

@ -0,0 +1,115 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Magnum Kubernetes RPC handler."""
from magnum.conductor import kubecli
from magnum.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class Handler(object):
"""These are the backend operations. They are executed by the backend
service. API calls via AMQP (within the ReST API) trigger the
handlers to be called.
This handler acts as an interface to executes kubectl command line
services.
"""
def __init__(self):
super(Handler, self).__init__()
self.kube_cli = kubecli.KubeClient()
def service_create(self, context, service):
LOG.debug("service_create")
# trigger a kubectl command
status = self.kube_cli.service_create(service)
if not status:
return None
# call the service object to persist in db
service.create(context)
return service
def service_update(self, context, service):
LOG.debug("service_create")
# trigger a kubectl command
status = self.kube_cli.service_update(service)
if not status:
return None
# call the service object to persist in db
service.refresh(context)
return service
def service_list(self, context):
LOG.debug("service_list")
return self.kube_cli.service_list()
def service_delete(self, context, service):
LOG.debug("service_delete")
# trigger a kubectl command
status = self.kube_cli.service_delete(service.uuid)
if not status:
return None
# call the service object to persist in db
service.destroy(context)
def service_get(self, context, uuid):
LOG.debug("service_get")
return self.kube_cli.service_get(uuid)
def service_show(self, uuid):
LOG.debug("service_show")
return self.kube_cli.service_show(uuid)
# Pod Operations
def pod_create(self, context, pod):
LOG.debug("pod_create")
# trigger a kubectl command
status = self.kube_cli.pod_create(pod)
if not status:
return None
# call the pod object to persist in db
pod.create(context)
return pod
def pod_update(self, context, pod):
LOG.debug("pod_update")
# trigger a kubectl command
status = self.kube_cli.pod_create(pod)
if not status:
return None
# call the pod object to persist in db
pod.create(context)
return pod
def pod_list(self, context):
LOG.debug("pod_list")
return self.kube_cli.pod_list()
def pod_delete(self, context, pod):
LOG.debug("pod_delete ")
# trigger a kubectl command
status = self.kube_cli.service_delete(pod.uuid)
if not status:
return None
# call the pod object to persist in db
pod.destroy(context)
def pod_get(self, context, uuid):
LOG.debug("pod_get")
return self.kube_cli.pod_get(uuid)
def pod_show(self, context, uuid):
LOG.debug("pod_show")
return self.kube_cli.pod_show(uuid)

View File

@ -18,7 +18,7 @@ from magnum.openstack.common import utils
LOG = logging.getLogger(__name__)
class Handler(object):
class KubeClient(object):
"""These are the backend operations. They are executed by the backend
service. API calls via AMQP (within the ReST API) trigger the
handlers to be called.
@ -28,18 +28,19 @@ class Handler(object):
"""
def __init__(self):
super(Handler, self).__init__()
super(KubeClient, self).__init__()
# TODO(pkilambi): Add server endpoint config
@staticmethod
def service_create(uuid, contents):
LOG.debug("service_create %s contents %s" % (uuid, contents))
def service_create(service):
LOG.debug("service_create with contents %s" % service)
try:
out, err = utils.trycmd('kubectl', 'create', '-f', contents)
out, err = utils.trycmd('kubectl', 'create', '-f', service)
if err:
return False
except Exception as e:
LOG.error("Couldn't create service with contents %s \
due to error %s" % (contents, e))
due to error %s" % (service, e))
return False
return True

View File

@ -153,46 +153,57 @@ class TestNodeController(db_base.DbTestCase):
class TestPodController(db_base.DbTestCase):
def mock_pod_create(self, pod):
pod.create()
return pod
def mock_pod_destroy(self, pod):
pod.destroy()
def test_pod_api(self):
# Create a pod
params = '{"name": "pod_example_A", "desc": "My Pod",' \
'"bay_uuid": "7ae81bb3-dec3-4289-8d6c-da80bd8001ae"}'
response = self.app.post('/v1/pods',
params=params,
content_type='application/json')
self.assertEqual(response.status_int, 201)
with patch.object(api.API, 'pod_create') as mock_method:
mock_method.side_effect = self.mock_pod_create
# Create a pod
params = '{"name": "pod_example_A", "desc": "My Pod",' \
'"bay_uuid": "7ae81bb3-dec3-4289-8d6c-da80bd8001ae"}'
response = self.app.post('/v1/pods',
params=params,
content_type='application/json')
self.assertEqual(response.status_int, 201)
# Get all pods
response = self.app.get('/v1/pods')
self.assertEqual(response.status_int, 200)
self.assertEqual(1, len(response.json))
c = response.json['pods'][0]
self.assertIsNotNone(c.get('uuid'))
self.assertEqual('pod_example_A', c.get('name'))
self.assertEqual('My Pod', c.get('desc'))
self.assertEqual('7ae81bb3-dec3-4289-8d6c-da80bd8001ae',
c.get('bay_uuid'))
# Get all pods
response = self.app.get('/v1/pods')
self.assertEqual(response.status_int, 200)
self.assertEqual(1, len(response.json))
c = response.json['pods'][0]
self.assertIsNotNone(c.get('uuid'))
self.assertEqual('pod_example_A', c.get('name'))
self.assertEqual('My Pod', c.get('desc'))
self.assertEqual('7ae81bb3-dec3-4289-8d6c-da80bd8001ae',
c.get('bay_uuid'))
# Get just the one we created
response = self.app.get('/v1/pods/%s' % c.get('uuid'))
self.assertEqual(response.status_int, 200)
# Get just the one we created
response = self.app.get('/v1/pods/%s' % c.get('uuid'))
self.assertEqual(response.status_int, 200)
# Update the description
params = [{'path': '/name',
'value': 'pod_example_B',
'op': 'replace'}]
response = self.app.patch_json('/v1/pods/%s' % c.get('uuid'),
params=params)
self.assertEqual(response.status_int, 200)
# Update the description
params = [{'path': '/name',
'value': 'pod_example_B',
'op': 'replace'}]
response = self.app.patch_json('/v1/pods/%s' % c.get('uuid'),
params=params)
self.assertEqual(response.status_int, 200)
# Delete the pod we created
response = self.app.delete('/v1/pods/%s' % c.get('uuid'))
self.assertEqual(response.status_int, 204)
with patch.object(api.API, 'pod_delete') as mock_method:
mock_method.side_effect = self.mock_pod_destroy
# Delete the pod we created
response = self.app.delete('/v1/pods/%s' % c.get('uuid'))
self.assertEqual(response.status_int, 204)
response = self.app.get('/v1/pods')
self.assertEqual(response.status_int, 200)
c = response.json['pods']
self.assertEqual(0, len(c))
response = self.app.get('/v1/pods')
self.assertEqual(response.status_int, 200)
c = response.json['pods']
self.assertEqual(0, len(c))
class TestContainerController(db_base.DbTestCase):
@ -243,41 +254,53 @@ class TestContainerController(db_base.DbTestCase):
class TestServiceController(db_base.DbTestCase):
def mock_service_create(self, service):
service.create()
return service
def mock_service_destroy(self, service):
service.destroy()
def test_service_api(self):
# Create a service
params = '{"name": "service_foo",'\
'"bay_uuid": "7ae81bb3-dec3-4289-8d6c-da80bd8001ae"}'
response = self.app.post('/v1/services',
params=params,
content_type='application/json')
self.assertEqual(response.status_int, 201)
# Get all services
response = self.app.get('/v1/services')
self.assertEqual(response.status_int, 200)
self.assertEqual(1, len(response.json))
c = response.json['services'][0]
self.assertIsNotNone(c.get('uuid'))
self.assertEqual('service_foo', c.get('name'))
self.assertEqual('7ae81bb3-dec3-4289-8d6c-da80bd8001ae',
c.get('bay_uuid'))
with patch.object(api.API, 'service_create') as mock_method:
mock_method.side_effect = self.mock_service_create
# Create a service
params = '{"name": "service_foo",'\
'"bay_uuid": "7ae81bb3-dec3-4289-8d6c-da80bd8001ae"}'
response = self.app.post('/v1/services',
params=params,
content_type='application/json')
self.assertEqual(response.status_int, 201)
# Get all services
response = self.app.get('/v1/services')
self.assertEqual(response.status_int, 200)
self.assertEqual(1, len(response.json))
c = response.json['services'][0]
self.assertIsNotNone(c.get('uuid'))
self.assertEqual('service_foo', c.get('name'))
self.assertEqual('7ae81bb3-dec3-4289-8d6c-da80bd8001ae',
c.get('bay_uuid'))
# Get just the one we created
response = self.app.get('/v1/services/%s' % c.get('uuid'))
self.assertEqual(response.status_int, 200)
# Get just the one we created
response = self.app.get('/v1/services/%s' % c.get('uuid'))
self.assertEqual(response.status_int, 200)
# Update the description
params = [{'path': '/name',
'value': 'service_bar',
'op': 'replace'}]
response = self.app.patch_json('/v1/services/%s' % c.get('uuid'),
params=params)
self.assertEqual(response.status_int, 200)
# Update the description
params = [{'path': '/name',
'value': 'service_bar',
'op': 'replace'}]
response = self.app.patch_json('/v1/services/%s' % c.get('uuid'),
params=params)
self.assertEqual(response.status_int, 200)
# Delete the service we created
response = self.app.delete('/v1/services/%s' % c.get('uuid'))
self.assertEqual(response.status_int, 204)
with patch.object(api.API, 'service_delete') as mock_method:
mock_method.side_effect = self.mock_service_destroy
# Delete the service we created
response = self.app.delete('/v1/services/%s' % c.get('uuid'))
self.assertEqual(response.status_int, 204)
response = self.app.get('/v1/services')
self.assertEqual(response.status_int, 200)
c = response.json['services']
self.assertEqual(0, len(c))
response = self.app.get('/v1/services')
self.assertEqual(response.status_int, 200)
c = response.json['services']
self.assertEqual(0, len(c))