Merge "Support <ClusterID>/actions/resize API"

Zuul 2019-03-19 22:16:15 +00:00 committed by Gerrit Code Review
commit 0cd35dbcca
14 changed files with 348 additions and 4 deletions

View File

@@ -25,6 +25,7 @@ from wsme import types as wtypes
from magnum.api import attr_validator
from magnum.api.controllers import base
from magnum.api.controllers import link
from magnum.api.controllers.v1 import cluster_actions
from magnum.api.controllers.v1 import collection
from magnum.api.controllers.v1 import types
from magnum.api import expose

@@ -281,6 +282,8 @@ class ClustersController(base.Controller):
        'detail': ['GET'],
    }

    actions = cluster_actions.ActionsController()

    def _generate_name_for_cluster(self, context):
        """Generate a random name like: zeta-22-cluster."""
        name_gen = name_generator.NameGenerator()

View File

@@ -0,0 +1,90 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pecan
import wsme
from wsme import types as wtypes

from magnum.api.controllers import base
from magnum.api.controllers.v1 import types
from magnum.api import expose
from magnum.api import utils as api_utils
from magnum.common import policy


class ClusterID(wtypes.Base):
    """API representation of a cluster ID

    This class enforces type checking and value constraints, and converts
    between the internal object model and the API representation of a cluster
    ID.
    """

    uuid = types.uuid
    """Unique UUID for this cluster"""

    def __init__(self, uuid):
        self.uuid = uuid


class ClusterResizeRequest(base.APIBase):
    """API object for handling resize requests.

    This class enforces type checking and value constraints.
    """

    node_count = wtypes.IntegerType(minimum=1)
    """The expected node count after resize."""

    nodes_to_remove = wsme.wsattr([wsme.types.text], mandatory=False,
                                  default=[])
    """Instance ID list for nodes to be removed."""

    nodegroup = wtypes.StringType(min_length=1, max_length=255)
    """Group of nodes to be resized (master or node)."""


class ActionsController(base.Controller):
    """REST controller for cluster actions."""

    def __init__(self):
        super(ActionsController, self).__init__()

    _custom_actions = {
        'resize': ['POST'],
    }

    @base.Controller.api_version("1.7")
    @expose.expose(None, types.uuid_or_name,
                   body=ClusterResizeRequest, status_code=202)
    def resize(self, cluster_ident, cluster_resize_req):
        """Resize a cluster.

        :param cluster_ident: UUID of a cluster or logical name of the cluster.
        """
        context = pecan.request.context
        cluster = api_utils.get_resource('Cluster', cluster_ident)
        policy.enforce(context, 'cluster:resize', cluster,
                       action='cluster:resize')

        if (cluster_resize_req.nodegroup == wtypes.Unset or
                not cluster_resize_req.nodegroup):
            # TODO(flwang): The default node group of the current cluster
            # could be extracted by objects.NodeGroups.get_by_uuid or
            # something similar once node group support is available.
            cluster_resize_req.nodegroup = None

        pecan.request.rpcapi.cluster_resize_async(
            cluster,
            cluster_resize_req.node_count,
            cluster_resize_req.nodes_to_remove,
            cluster_resize_req.nodegroup)
        return ClusterID(cluster.uuid)
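
For illustration, a minimal sketch of the request body this controller accepts; the field names come from ClusterResizeRequest above, while the values and the server UUID are placeholders.

# Example request body for POST /v1/clusters/{cluster_ident}/actions/resize,
# matching the ClusterResizeRequest attributes defined above (values invented).
resize_request = {
    "node_count": 4,                        # integer >= 1
    "nodes_to_remove": ["<server-uuid>"],   # optional, defaults to []
    "nodegroup": "node",                    # optional, 1-255 characters
}
# A successful call returns HTTP 202 (status_code=202 in the expose decorator);
# the resize itself runs asynchronously through cluster_resize_async.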

View File

@@ -39,10 +39,11 @@ REST_API_VERSION_HISTORY = """REST API Version History:
    * 1.4 - Add stats API
    * 1.5 - Add cluster CA certificate rotation support
    * 1.6 - Add quotas API
    * 1.7 - Add resize API
"""

BASE_VER = '1.1'
-CURRENT_MAX_VER = '1.6'
+CURRENT_MAX_VER = '1.7'


class Version(object):

View File

@@ -128,6 +128,17 @@ rules = [
                'method': 'PATCH'
            }
        ]
    ),
    policy.DocumentedRuleDefault(
        name=CLUSTER % 'resize',
        check_str=base.RULE_DENY_CLUSTER_USER,
        description='Resize an existing cluster.',
        operations=[
            {
                'path': '/v1/clusters/{cluster_ident}/actions/resize',
                'method': 'POST'
            }
        ]
    )
]

View File

@@ -51,6 +51,23 @@ class API(rpc_service.API):
    def cluster_update_async(self, cluster, rollback=False):
        self._cast('cluster_update', cluster=cluster, rollback=rollback)

    def cluster_resize(self, cluster, node_count, nodes_to_remove,
                       nodegroup=None, rollback=False):
        return self._call('cluster_resize',
                          cluster=cluster,
                          node_count=node_count,
                          nodes_to_remove=nodes_to_remove,
                          nodegroup=nodegroup)

    def cluster_resize_async(self, cluster, node_count, nodes_to_remove,
                             nodegroup=None, rollback=False):
        return self._cast('cluster_resize',
                          cluster=cluster,
                          node_count=node_count,
                          nodes_to_remove=nodes_to_remove,
                          nodegroup=nodegroup)

    # Federation Operations

    def federation_create(self, federation, create_timeout):

View File

@@ -177,3 +177,64 @@ class Handler(object):
        cluster.save()

        return None

    def cluster_resize(self, context, cluster,
                       node_count, nodes_to_remove, nodegroup=None):
        LOG.debug('cluster_conductor cluster_resize')

        osc = clients.OpenStackClients(context)
        # NOTE(flwang): One of the important use cases of the /resize API is
        # supporting the auto scaling action triggered by the Kubernetes
        # Cluster Autoscaler, so two cases may arise:
        # 1. The API could be triggered very often
        # 2. Scale up or down may fail, and we would like to offer the
        #    ability to recover the cluster so that it can be resized again
        #    after a failed update.
        allow_update_status = (
            fields.ClusterStatus.CREATE_COMPLETE,
            fields.ClusterStatus.UPDATE_COMPLETE,
            fields.ClusterStatus.RESUME_COMPLETE,
            fields.ClusterStatus.RESTORE_COMPLETE,
            fields.ClusterStatus.ROLLBACK_COMPLETE,
            fields.ClusterStatus.SNAPSHOT_COMPLETE,
            fields.ClusterStatus.CHECK_COMPLETE,
            fields.ClusterStatus.ADOPT_COMPLETE,
            fields.ClusterStatus.UPDATE_FAILED,
            fields.ClusterStatus.UPDATE_IN_PROGRESS,
        )
        if cluster.status not in allow_update_status:
            conductor_utils.notify_about_cluster_operation(
                context, taxonomy.ACTION_UPDATE, taxonomy.OUTCOME_FAILURE)
            operation = _('Resizing a cluster when status is '
                          '"%s"') % cluster.status
            raise exception.NotSupported(operation=operation)

        resize_manager = scale_manager.get_scale_manager(context, osc, cluster)

        # Get driver
        ct = conductor_utils.retrieve_cluster_template(context, cluster)
        cluster_driver = driver.Driver.get_driver(ct.server_type,
                                                  ct.cluster_distro,
                                                  ct.coe)
        # Resize cluster
        try:
            conductor_utils.notify_about_cluster_operation(
                context, taxonomy.ACTION_UPDATE, taxonomy.OUTCOME_PENDING)
            cluster_driver.resize_cluster(context, cluster, resize_manager,
                                          node_count, nodes_to_remove,
                                          nodegroup)
            cluster.status = fields.ClusterStatus.UPDATE_IN_PROGRESS
            cluster.status_reason = None
        except Exception as e:
            cluster.status = fields.ClusterStatus.UPDATE_FAILED
            cluster.status_reason = six.text_type(e)
            cluster.save()
            conductor_utils.notify_about_cluster_operation(
                context, taxonomy.ACTION_UPDATE, taxonomy.OUTCOME_FAILURE)
            if isinstance(e, exc.HTTPBadRequest):
                e = exception.InvalidParameterValue(message=six.text_type(e))
                raise e
            raise

        cluster.save()
        return cluster

View File

@@ -181,6 +181,12 @@ class Driver(object):
        raise NotImplementedError("Subclasses must implement "
                                  "'delete_cluster'.")

    @abc.abstractmethod
    def resize_cluster(self, context, cluster, resize_manager,
                       node_count, nodes_to_remove, nodegroup=None):
        raise NotImplementedError("Subclasses must implement "
                                  "'resize_cluster'.")

    @abc.abstractmethod
    def create_federation(self, context, federation):
        raise NotImplementedError("Subclasses must implement "

View File

@@ -111,6 +111,13 @@ class HeatDriver(driver.Driver):
        LOG.info("Starting to delete cluster %s", cluster.uuid)
        self._delete_stack(context, clients.OpenStackClients(context), cluster)

    def resize_cluster(self, context, cluster, resize_manager,
                       node_count, nodes_to_remove, nodegroup=None,
                       rollback=False):
        self._resize_stack(context, cluster, resize_manager,
                           node_count, nodes_to_remove, nodegroup=nodegroup,
                           rollback=rollback)

    def _create_stack(self, context, osc, cluster, cluster_create_timeout):
        template_path, heat_params, env_files = (
            self._extract_template_definition(context, cluster))

@@ -176,6 +183,28 @@
        osc = clients.OpenStackClients(context)
        osc.heat().stacks.update(cluster.stack_id, **fields)

    def _resize_stack(self, context, cluster, resize_manager,
                      node_count, nodes_to_remove, nodegroup=None,
                      rollback=False):
        definition = self.get_template_definition()

        heat_params = {}
        stack_nc_param = definition.get_heat_param(cluster_attr='node_count')
        heat_params[stack_nc_param] = node_count or cluster.node_count

        scale_params = definition.get_scale_params(context,
                                                   cluster,
                                                   resize_manager,
                                                   nodes_to_remove)
        heat_params.update(scale_params)

        fields = {
            'parameters': heat_params,
            'existing': True,
            'disable_rollback': not rollback
        }

        osc = clients.OpenStackClients(context)
        osc.heat().stacks.update(cluster.stack_id, **fields)

    def _delete_stack(self, context, osc, cluster):
        osc.heat().stacks.delete(cluster.stack_id)
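
For illustration, a rough sketch of the Heat call that _resize_stack ends up issuing for a Kubernetes cluster resized to 3 nodes with one named victim. The parameter name 'number_of_minions' is an assumption made for the example; get_heat_param() resolves the real name from the template definition, and 'minions_to_remove' comes from get_scale_params() (see the template definition change below).

# Sketch only: approximate stack update produced by _resize_stack above.
# 'number_of_minions' is an assumed heat parameter name; the instance ID is
# a placeholder.
osc = clients.OpenStackClients(context)
osc.heat().stacks.update(
    cluster.stack_id,
    parameters={
        'number_of_minions': 3,
        'minions_to_remove': ['<server-uuid>'],
    },
    existing=True,              # update the existing stack in place
    disable_rollback=True,      # rollback defaults to False
)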

View File

@@ -163,8 +163,11 @@ class K8sTemplateDefinition(template_def.BaseTemplateDefinition):
                                      extra_params=extra_params,
                                      **kwargs)

-    def get_scale_params(self, context, cluster, scale_manager=None):
+    def get_scale_params(self, context, cluster, scale_manager=None,
+                         nodes_to_remove=None):
        scale_params = dict()
        if nodes_to_remove:
            scale_params['minions_to_remove'] = nodes_to_remove
        if scale_manager:
            hosts = self.get_output('kube_minions_private')
            scale_params['minions_to_remove'] = (

View File

@@ -80,8 +80,11 @@ class UbuntuMesosTemplateDefinition(template_def.BaseTemplateDefinition):
                                      extra_params=extra_params,
                                      **kwargs)

-    def get_scale_params(self, context, cluster, scale_manager=None):
+    def get_scale_params(self, context, cluster, scale_manager=None,
+                         nodes_to_remove=None):
        scale_params = dict()
        if nodes_to_remove:
            scale_params['slaves_to_remove'] = nodes_to_remove
        if scale_manager:
            hosts = self.get_output('mesos_slaves_private')
            scale_params['slaves_to_remove'] = (

View File

@@ -40,7 +40,7 @@ class TestRootController(api_base.FunctionalTest):
                             [{u'href': u'http://localhost/v1/',
                               u'rel': u'self'}],
                             u'status': u'CURRENT',
-                            u'max_version': u'1.6',
+                            u'max_version': u'1.7',
                             u'min_version': u'1.1'}]}

        self.v1_expected = {

View File

@@ -0,0 +1,53 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock

from magnum.conductor import api as rpcapi
import magnum.conf
from magnum.tests.unit.api import base as api_base
from magnum.tests.unit.objects import utils as obj_utils

CONF = magnum.conf.CONF


class TestClusterActions(api_base.FunctionalTest):

    def setUp(self):
        super(TestClusterActions, self).setUp()
        self.cluster_obj = obj_utils.create_test_cluster(
            self.context, name='cluster_example_A', node_count=3)
        p = mock.patch.object(rpcapi.API, 'cluster_resize_async')
        self.mock_cluster_resize = p.start()
        self.mock_cluster_resize.side_effect = self._sim_rpc_cluster_resize
        self.addCleanup(p.stop)

    def _sim_rpc_cluster_resize(self, cluster, node_count, nodes_to_remove,
                                nodegroup=None, rollback=False):
        cluster.node_count = node_count
        cluster.save()
        return cluster

    def test_resize(self):
        new_node_count = 6
        response = self.post_json('/clusters/%s/actions/resize' %
                                  self.cluster_obj.uuid,
                                  {"node_count": new_node_count},
                                  headers={"Openstack-Api-Version":
                                           "container-infra 1.7"})
        self.assertEqual(202, response.status_code)

        response = self.get_json('/clusters/%s' % self.cluster_obj.uuid)
        self.assertEqual(new_node_count, response['node_count'])
        self.assertEqual(self.cluster_obj.uuid, response['uuid'])
        self.assertEqual(self.cluster_obj.cluster_template_id,
                         response['cluster_template_id'])

View File

@@ -533,3 +533,59 @@ class TestHandler(db_base.DbTestCase):
        notifications = fake_notifier.NOTIFICATIONS
        self.assertEqual(1, len(notifications))
        self.assertEqual(1, mock_delete_lb.call_count)

    @patch('magnum.conductor.scale_manager.get_scale_manager')
    @patch('magnum.drivers.common.driver.Driver.get_driver')
    @patch('magnum.common.clients.OpenStackClients')
    def test_cluster_resize_success(
            self, mock_openstack_client_class,
            mock_driver,
            mock_scale_manager):
        mock_heat_stack = mock.MagicMock()
        mock_heat_stack.stack_status = cluster_status.CREATE_COMPLETE
        mock_heat_client = mock.MagicMock()
        mock_heat_client.stacks.get.return_value = mock_heat_stack
        mock_openstack_client = mock_openstack_client_class.return_value
        mock_openstack_client.heat.return_value = mock_heat_client
        mock_dr = mock.MagicMock()
        mock_driver.return_value = mock_dr

        self.cluster.status = cluster_status.CREATE_COMPLETE
        self.handler.cluster_resize(self.context, self.cluster, 3, ["ID1"])

        notifications = fake_notifier.NOTIFICATIONS
        self.assertEqual(1, len(notifications))
        self.assertEqual(
            'magnum.cluster.update', notifications[0].event_type)
        self.assertEqual(
            taxonomy.OUTCOME_PENDING, notifications[0].payload['outcome'])

        mock_dr.resize_cluster.assert_called_once_with(
            self.context, self.cluster, mock_scale_manager.return_value, 3,
            ["ID1"], None)

    @patch('magnum.common.clients.OpenStackClients')
    def test_cluster_resize_failure(
            self, mock_openstack_client_class):
        mock_heat_stack = mock.MagicMock()
        mock_heat_stack.stack_status = cluster_status.CREATE_FAILED
        mock_heat_client = mock.MagicMock()
        mock_heat_client.stacks.get.return_value = mock_heat_stack
        mock_openstack_client = mock_openstack_client_class.return_value
        mock_openstack_client.heat.return_value = mock_heat_client

        self.cluster.status = cluster_status.CREATE_FAILED
        self.assertRaises(exception.NotSupported, self.handler.cluster_resize,
                          self.context, self.cluster, 2, [])

        notifications = fake_notifier.NOTIFICATIONS
        self.assertEqual(1, len(notifications))
        self.assertEqual(
            'magnum.cluster.update', notifications[0].event_type)
        self.assertEqual(
            taxonomy.OUTCOME_FAILURE, notifications[0].payload['outcome'])

        cluster = objects.Cluster.get(self.context, self.cluster.uuid)
        self.assertEqual(1, cluster.node_count)

View File

@@ -0,0 +1,11 @@
---
features:
  - |
    An OpenStack driver for the Kubernetes Cluster Autoscaler is being
    proposed to support autoscaling when running a Kubernetes cluster on
    top of OpenStack. However, Magnum currently has no way to let an
    external consumer control which nodes will be removed. The alternative
    is to call the Heat API directly, which is not a good solution and is
    confusing for the Kubernetes community. Therefore, this new API is
    added to Magnum: POST <ClusterID>/actions/resize
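
For illustration, a minimal client-side sketch of calling the new endpoint: the URL path matches the policy entry above, the body fields come from ClusterResizeRequest, and the microversion header mirrors the functional test; the endpoint URL, token and identifiers are placeholders.

import requests

MAGNUM_ENDPOINT = "http://controller:9511/v1"   # placeholder magnum-api URL
TOKEN = "<keystone-token>"                      # placeholder auth token
CLUSTER = "<cluster-uuid-or-name>"              # placeholder cluster ident

resp = requests.post(
    "%s/clusters/%s/actions/resize" % (MAGNUM_ENDPOINT, CLUSTER),
    headers={
        "X-Auth-Token": TOKEN,
        "Content-Type": "application/json",
        # The resize action requires API microversion 1.7 or later.
        "OpenStack-API-Version": "container-infra 1.7",
    },
    json={
        "node_count": 3,
        # Optional: instance IDs the caller would like removed first.
        "nodes_to_remove": ["<server-uuid>"],
    },
)
print(resp.status_code)  # 202 when the resize request is accepted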