Implement mesos cluster smart scale down
We currently allow Magnum to scale down a Mesos cluster by removing nodes from the cluster's ResourceGroup via an update to the Heat stack that created the cluster. The problem with this approach is that Heat decides which nodes to delete, and all containers on those nodes are deleted as well. The smart cluster scale-down feature has already been implemented for k8s bays (for a k8s cluster, we ask Heat to delete nodes that have NO containers on them). This patch proposes a similar implementation for a Mesos cluster. Change-Id: I00cda7f35c9db978bdc604cf86603ef58e339256 Implements: blueprint mesos-smart-bay-scale-down
This commit is contained in:
parent
5dae4c54cb
commit
4a7d265aeb
@ -2057,9 +2057,19 @@ Swarm
|
||||
consideration of what containers are running on the selected node.
|
||||
|
||||
Mesos
|
||||
No node selection heuristic is currently supported. If you decrease
|
||||
the node_count, a node will be chosen by magnum without
|
||||
consideration of what containers are running on the selected node.
|
||||
Magnum scans the running tasks on Marathon server to determine the
|
||||
nodes on which there is *no* task running (empty nodes). If the
|
||||
number of nodes to be removed is equal to or less than the number of
|
||||
these empty nodes, these nodes will be removed from the cluster.
|
||||
If the number of nodes to be removed is larger than the number of
|
||||
empty nodes, a warning message will be sent to the Magnum log and
|
||||
the empty nodes along with additional nodes will be removed from the
|
||||
cluster. The additional nodes are selected randomly and the containers
|
||||
running on them will be deleted without warning. Note that even when
|
||||
only the empty nodes are removed, there is no guarantee that no
|
||||
container will be deleted because there is no locking to ensure that
|
||||
Mesos will not launch new containers on these nodes after Magnum
|
||||
has scanned the tasks.
|
||||
|
||||
|
||||
Currently, scaling containers and scaling cluster nodes are handled
|
||||
|
@ -184,7 +184,7 @@ class Handler(object):
|
||||
if not delta:
|
||||
return cluster
|
||||
|
||||
manager = scale_manager.ScaleManager(context, osc, cluster)
|
||||
manager = scale_manager.get_scale_manager(context, osc, cluster)
|
||||
|
||||
conductor_utils.notify_about_cluster_operation(
|
||||
context, taxonomy.ACTION_UPDATE, taxonomy.OUTCOME_PENDING)
|
||||
|
@ -12,6 +12,8 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import abc
|
||||
from marathon import MarathonClient
|
||||
from oslo_log import log as logging
|
||||
|
||||
from magnum.common import exception
|
||||
@ -25,6 +27,21 @@ from magnum import objects
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_scale_manager(context, osclient, cluster):
    """Return a COE-specific scale manager for *cluster*.

    Dispatches on the cluster's baymodel COE type: kubernetes and mesos
    each get a dedicated ScaleManager subclass. For any other COE a
    warning is logged and None is returned (callers must handle None,
    meaning "no smart scale-down support").

    :param context: request context passed through to the manager
    :param osclient: OpenStack client wrapper passed through
    :param cluster: cluster object; its ``baymodel.coe`` selects the manager
    :returns: a ScaleManager subclass instance, or None
    """
    manager = None
    coe = cluster.baymodel.coe
    if coe == 'kubernetes':
        manager = K8sScaleManager(context, osclient, cluster)
    elif coe == 'mesos':
        manager = MesosScaleManager(context, osclient, cluster)
    else:
        LOG.warning(_LW(
            "Currently only kubernetes and mesos cluster scale manager "
            "are available"))

    return manager
|
||||
|
||||
|
||||
class ScaleManager(object):
|
||||
|
||||
def __init__(self, context, osclient, cluster):
|
||||
@ -46,13 +63,9 @@ class ScaleManager(object):
|
||||
"%(stack_id)s") % {'output_key': hosts_output.heat_output,
|
||||
'stack_id': stack.id})
|
||||
|
||||
hosts_no_container = list(hosts)
|
||||
k8s_api = k8s.create_k8s_api(self.context, cluster)
|
||||
for pod in k8s_api.list_namespaced_pod(namespace='default').items:
|
||||
host = pod.spec.node_name
|
||||
if host in hosts_no_container:
|
||||
hosts_no_container.remove(host)
|
||||
|
||||
hosts_with_container = self._get_hosts_with_container(self.context,
|
||||
cluster)
|
||||
hosts_no_container = list(set(hosts) - hosts_with_container)
|
||||
LOG.debug('List of hosts that has no container: %s',
|
||||
str(hosts_no_container))
|
||||
|
||||
@ -76,3 +89,46 @@ class ScaleManager(object):
|
||||
|
||||
def _get_num_of_removal(self):
|
||||
return self.old_cluster.node_count - self.new_cluster.node_count
|
||||
|
||||
@abc.abstractmethod
|
||||
def _get_hosts_with_container(self, context, cluster):
|
||||
"""Return the hosts with container running on them."""
|
||||
pass
|
||||
|
||||
|
||||
class K8sScaleManager(ScaleManager):
    """Scale manager for kubernetes clusters.

    Finds the hosts that currently run pods so that scale down can
    prefer removing nodes without containers.
    """

    def __init__(self, context, osclient, cluster):
        super(K8sScaleManager, self).__init__(context, osclient, cluster)

    def _get_hosts_with_container(self, context, cluster):
        """Return the set of node names hosting at least one pod.

        Only pods in the 'default' namespace are considered.
        """
        k8s_api = k8s.create_k8s_api(self.context, cluster)
        hosts = set()
        for pod in k8s_api.list_namespaced_pod(namespace='default').items:
            hosts.add(pod.spec.node_name)

        return hosts
|
||||
|
||||
|
||||
class MesosScaleManager(ScaleManager):
    """Scale manager for mesos clusters.

    When scaling a mesos cluster, MesosScaleManager will inspect the
    nodes and find out those with containers on them. Thus we can
    ask Heat to delete the nodes without containers. Note that this
    is a best effort basis -- Magnum doesn't have any synchronization
    with Marathon, so while Magnum is checking for the containers to
    choose nodes to remove, new containers can be deployed on the
    nodes to be removed.
    """

    def __init__(self, context, osclient, cluster):
        super(MesosScaleManager, self).__init__(context, osclient, cluster)

    def _get_hosts_with_container(self, context, cluster):
        """Return the set of hosts with at least one Marathon task.

        NOTE(review): assumes Marathon listens on port 8080 of the
        cluster API address -- confirm against the mesos templates.
        """
        marathon_client = MarathonClient(
            'http://' + cluster.api_address + ':8080')
        hosts = set()
        for task in marathon_client.list_tasks():
            hosts.add(task.host)

        return hosts
|
||||
|
@ -48,7 +48,7 @@ class TestHandler(db_base.DbTestCase):
|
||||
self.cluster = objects.Cluster(self.context, **cluster_dict)
|
||||
self.cluster.create()
|
||||
|
||||
@patch('magnum.conductor.scale_manager.ScaleManager')
|
||||
@patch('magnum.conductor.scale_manager.get_scale_manager')
|
||||
@patch(
|
||||
'magnum.conductor.handlers.cluster_conductor.Handler._poll_and_check')
|
||||
@patch('magnum.conductor.handlers.cluster_conductor._update_stack')
|
||||
@ -116,7 +116,7 @@ class TestHandler(db_base.DbTestCase):
|
||||
cluster = objects.Cluster.get(self.context, self.cluster.uuid)
|
||||
self.assertEqual(1, cluster.node_count)
|
||||
|
||||
@patch('magnum.conductor.scale_manager.ScaleManager')
|
||||
@patch('magnum.conductor.scale_manager.get_scale_manager')
|
||||
@patch(
|
||||
'magnum.conductor.handlers.cluster_conductor.Handler._poll_and_check')
|
||||
@patch('magnum.conductor.handlers.cluster_conductor._update_stack')
|
||||
|
@ -21,23 +21,39 @@ from magnum.tests import base
|
||||
|
||||
class TestScaleManager(base.TestCase):
|
||||
|
||||
@mock.patch('magnum.objects.Cluster.get_by_uuid')
def test_get_scale_manager(self, mock_cluster_get):
    """get_scale_manager returns the manager matching the COE type."""
    mock_context = mock.MagicMock()
    mock_osc = mock.MagicMock()
    k8s_cluster = mock.MagicMock()
    k8s_cluster.baymodel.coe = 'kubernetes'
    mesos_cluster = mock.MagicMock()
    mesos_cluster.baymodel.coe = 'mesos'
    invalid_cluster = mock.MagicMock()
    invalid_cluster.baymodel.coe = 'fake'

    mgr = scale_manager.get_scale_manager(
        mock_context, mock_osc, k8s_cluster)
    self.assertIsInstance(mgr, scale_manager.K8sScaleManager)

    mgr = scale_manager.get_scale_manager(
        mock_context, mock_osc, mesos_cluster)
    self.assertIsInstance(mgr, scale_manager.MesosScaleManager)

    # Unknown COE types yield no manager at all.
    mgr = scale_manager.get_scale_manager(
        mock_context, mock_osc, invalid_cluster)
    self.assertIsNone(mgr)
|
||||
|
||||
def _test_get_removal_nodes(
|
||||
self, mock_create_k8s_api, mock_get_num_of_removal,
|
||||
self, mock_get_hosts, mock_get_num_of_removal,
|
||||
mock_is_scale_down, mock_get_by_uuid, is_scale_down,
|
||||
num_of_removal, all_hosts, pod_hosts, expected_removal_hosts):
|
||||
num_of_removal, all_hosts, container_hosts,
|
||||
expected_removal_hosts):
|
||||
|
||||
mock_is_scale_down.return_value = is_scale_down
|
||||
mock_get_num_of_removal.return_value = num_of_removal
|
||||
|
||||
pods = list()
|
||||
for h in pod_hosts:
|
||||
pod = mock.MagicMock()
|
||||
pod.spec.node_name = h
|
||||
pods.append(pod)
|
||||
|
||||
mock_k8s_api = mock.MagicMock()
|
||||
mock_k8s_api.list_namespaced_pod.return_value.items = pods
|
||||
mock_create_k8s_api.return_value = mock_k8s_api
|
||||
mock_get_hosts.return_value = container_hosts
|
||||
|
||||
mock_heat_output = mock.MagicMock()
|
||||
mock_heat_output.get_output_value.return_value = all_hosts
|
||||
@ -60,119 +76,168 @@ class TestScaleManager(base.TestCase):
|
||||
removal_hosts = scale_mgr.get_removal_nodes(mock_heat_output)
|
||||
self.assertEqual(expected_removal_hosts, removal_hosts)
|
||||
if num_of_removal > 0:
|
||||
mock_create_k8s_api.assert_called_once_with(mock_context,
|
||||
mock_cluster)
|
||||
mock_get_hosts.assert_called_once_with(mock_context,
|
||||
mock_cluster)
|
||||
|
||||
@mock.patch('magnum.objects.Cluster.get_by_uuid')
@mock.patch('magnum.conductor.scale_manager.ScaleManager._is_scale_down')
@mock.patch('magnum.conductor.scale_manager.ScaleManager.'
            '_get_num_of_removal')
@mock.patch('magnum.conductor.scale_manager.ScaleManager.'
            '_get_hosts_with_container')
def test_get_removal_nodes_no_container_host(
        self, mock_get_hosts, mock_get_num_of_removal,
        mock_is_scale_down, mock_get_by_uuid):
    """One node to remove, no busy hosts: the empty host is chosen."""
    is_scale_down = True
    num_of_removal = 1
    all_hosts = ['10.0.0.3']
    container_hosts = set()
    expected_removal_hosts = ['10.0.0.3']
    self._test_get_removal_nodes(
        mock_get_hosts, mock_get_num_of_removal, mock_is_scale_down,
        mock_get_by_uuid, is_scale_down, num_of_removal, all_hosts,
        container_hosts, expected_removal_hosts)
|
||||
|
||||
@mock.patch('magnum.objects.Cluster.get_by_uuid')
@mock.patch('magnum.conductor.scale_manager.ScaleManager._is_scale_down')
@mock.patch('magnum.conductor.scale_manager.ScaleManager.'
            '_get_num_of_removal')
@mock.patch('magnum.conductor.scale_manager.ScaleManager.'
            '_get_hosts_with_container')
def test_get_removal_nodes_one_container_host(
        self, mock_get_hosts, mock_get_num_of_removal,
        mock_is_scale_down, mock_get_by_uuid):
    """One busy host out of two: the other (empty) host is chosen."""
    is_scale_down = True
    num_of_removal = 1
    all_hosts = ['10.0.0.3', '10.0.0.4']
    container_hosts = set(['10.0.0.3'])
    expected_removal_hosts = ['10.0.0.4']
    self._test_get_removal_nodes(
        mock_get_hosts, mock_get_num_of_removal, mock_is_scale_down,
        mock_get_by_uuid, is_scale_down, num_of_removal, all_hosts,
        container_hosts, expected_removal_hosts)
|
||||
|
||||
@mock.patch('magnum.objects.Cluster.get_by_uuid')
@mock.patch('magnum.conductor.scale_manager.ScaleManager._is_scale_down')
@mock.patch('magnum.conductor.scale_manager.ScaleManager.'
            '_get_num_of_removal')
@mock.patch('magnum.conductor.scale_manager.ScaleManager.'
            '_get_hosts_with_container')
def test_get_removal_nodes_two_container_hosts(
        self, mock_get_hosts, mock_get_num_of_removal,
        mock_is_scale_down, mock_get_by_uuid):
    """All hosts busy: no empty host can be proposed for removal."""
    is_scale_down = True
    num_of_removal = 1
    all_hosts = ['10.0.0.3', '10.0.0.4']
    container_hosts = set(['10.0.0.3', '10.0.0.4'])
    expected_removal_hosts = []
    self._test_get_removal_nodes(
        mock_get_hosts, mock_get_num_of_removal, mock_is_scale_down,
        mock_get_by_uuid, is_scale_down, num_of_removal, all_hosts,
        container_hosts, expected_removal_hosts)
|
||||
|
||||
@mock.patch('magnum.objects.Cluster.get_by_uuid')
@mock.patch('magnum.conductor.scale_manager.ScaleManager._is_scale_down')
@mock.patch('magnum.conductor.scale_manager.ScaleManager.'
            '_get_num_of_removal')
@mock.patch('magnum.conductor.scale_manager.ScaleManager.'
            '_get_hosts_with_container')
def test_get_removal_nodes_three_container_hosts(
        self, mock_get_hosts, mock_get_num_of_removal,
        mock_is_scale_down, mock_get_by_uuid):
    """Busy hosts outside the stack are ignored; all stack hosts busy."""
    is_scale_down = True
    num_of_removal = 1
    all_hosts = ['10.0.0.3', '10.0.0.4']
    container_hosts = set(['10.0.0.3', '10.0.0.4', '10.0.0.5'])
    expected_removal_hosts = []
    self._test_get_removal_nodes(
        mock_get_hosts, mock_get_num_of_removal, mock_is_scale_down,
        mock_get_by_uuid, is_scale_down, num_of_removal, all_hosts,
        container_hosts, expected_removal_hosts)
|
||||
|
||||
@mock.patch('magnum.objects.Cluster.get_by_uuid')
@mock.patch('magnum.conductor.scale_manager.ScaleManager._is_scale_down')
@mock.patch('magnum.conductor.scale_manager.ScaleManager.'
            '_get_num_of_removal')
@mock.patch('magnum.conductor.scale_manager.ScaleManager.'
            '_get_hosts_with_container')
def test_get_removal_nodes_scale_up(
        self, mock_get_hosts, mock_get_num_of_removal,
        mock_is_scale_down, mock_get_by_uuid):
    """Scale up: no nodes are selected for removal."""
    is_scale_down = False
    num_of_removal = -1
    all_hosts = ['10.0.0.3', '10.0.0.4']
    container_hosts = set()
    expected_removal_hosts = []
    self._test_get_removal_nodes(
        mock_get_hosts, mock_get_num_of_removal, mock_is_scale_down,
        mock_get_by_uuid, is_scale_down, num_of_removal, all_hosts,
        container_hosts, expected_removal_hosts)
|
||||
|
||||
@mock.patch('magnum.objects.Cluster.get_by_uuid')
@mock.patch('magnum.conductor.scale_manager.ScaleManager._is_scale_down')
@mock.patch('magnum.conductor.scale_manager.ScaleManager.'
            '_get_num_of_removal')
@mock.patch('magnum.conductor.scale_manager.ScaleManager.'
            '_get_hosts_with_container')
def test_get_removal_nodes_with_none_hosts(
        self, mock_get_hosts, mock_get_num_of_removal,
        mock_is_scale_down, mock_get_by_uuid):
    """Heat reports no hosts output: removal list is None."""
    is_scale_down = True
    num_of_removal = 1
    all_hosts = None
    container_hosts = set()
    expected_removal_hosts = None
    self._test_get_removal_nodes(
        mock_get_hosts, mock_get_num_of_removal, mock_is_scale_down,
        mock_get_by_uuid, is_scale_down, num_of_removal, all_hosts,
        container_hosts, expected_removal_hosts)
|
||||
|
||||
|
||||
class TestK8sScaleManager(base.TestCase):
    """Unit tests for K8sScaleManager."""

    @mock.patch('magnum.objects.Cluster.get_by_uuid')
    @mock.patch('magnum.conductor.k8s_api.create_k8s_api')
    def test_get_hosts_with_container(self, mock_create_api, mock_get):
        """Hosts are collected from the node names of the listed pods."""
        pods = mock.MagicMock()
        pod_1 = mock.MagicMock()
        pod_1.spec.node_name = 'node1'
        pod_2 = mock.MagicMock()
        pod_2.spec.node_name = 'node2'
        pods.items = [pod_1, pod_2]
        mock_api = mock.MagicMock()
        mock_api.list_namespaced_pod.return_value = pods
        mock_create_api.return_value = mock_api

        mgr = scale_manager.K8sScaleManager(
            mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
        hosts = mgr._get_hosts_with_container(
            mock.MagicMock(), mock.MagicMock())
        self.assertEqual(hosts, {'node1', 'node2'})
|
||||
|
||||
|
||||
class TestMesosScaleManager(base.TestCase):
    """Unit tests for MesosScaleManager."""

    @mock.patch('magnum.objects.Cluster.get_by_uuid')
    @mock.patch('marathon.MarathonClient')
    @mock.patch('marathon.MarathonClient.list_tasks')
    def test_get_hosts_with_container(self, mock_list_tasks,
                                      mock_client, mock_get):
        """Hosts are collected from the hosts of the listed tasks.

        NOTE(review): the 'marathon.MarathonClient' patch does not
        replace the name imported into scale_manager via
        ``from marathon import MarathonClient``; the test works because
        patching ``list_tasks`` mutates the shared class object.
        """
        task_1 = mock.MagicMock()
        task_1.host = 'node1'
        task_2 = mock.MagicMock()
        task_2.host = 'node2'
        tasks = [task_1, task_2]
        mock_list_tasks.return_value = tasks

        mgr = scale_manager.MesosScaleManager(
            mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
        hosts = mgr._get_hosts_with_container(
            mock.MagicMock(), mock.MagicMock())
        self.assertEqual(hosts, {'node1', 'node2'})
|
||||
|
@ -22,6 +22,7 @@ iso8601>=0.1.11 # MIT
|
||||
jsonpatch>=1.1 # BSD
|
||||
keystoneauth1>=2.10.0 # Apache-2.0
|
||||
keystonemiddleware!=4.5.0,>=4.2.0 # Apache-2.0
|
||||
marathon>=0.8.6 # MIT
|
||||
netaddr!=0.7.16,>=0.7.13 # BSD
|
||||
oslo.concurrency>=3.8.0 # Apache-2.0
|
||||
oslo.config>=3.14.0 # Apache-2.0
|
||||
|
Loading…
x
Reference in New Issue
Block a user