Merge "Implement mesos cluster smart scale down"
This commit is contained in:
commit
7ad74bb4af
@ -2068,9 +2068,19 @@ Swarm
|
||||
consideration of what containers are running on the selected node.
|
||||
|
||||
Mesos
|
||||
No node selection heuristic is currently supported. If you decrease
|
||||
the node_count, a node will be chosen by magnum without
|
||||
consideration of what containers are running on the selected node.
|
||||
Magnum scans the running tasks on Marathon server to determine the
|
||||
nodes on which there is *no* task running (empty nodes). If the
|
||||
number of nodes to be removed is equal or less than the number of
|
||||
these empty nodes, these nodes will be removed from the cluster.
|
||||
If the number of nodes to be removed is larger than the number of
|
||||
empty nodes, a warning message will be sent to the Magnum log and
|
||||
the empty nodes along with additional nodes will be removed from the
|
||||
cluster. The additional nodes are selected randomly and the containers
|
||||
running on them will be deleted without warning. Note that even when
|
||||
only the empty nodes are removed, there is no guarantee that no
|
||||
container will be deleted because there is no locking to ensure that
|
||||
Mesos will not launch new containers on these nodes after Magnum
|
||||
has scanned the tasks.
|
||||
|
||||
|
||||
Currently, scaling containers and scaling cluster nodes are handled
|
||||
|
@ -184,7 +184,7 @@ class Handler(object):
|
||||
if not delta:
|
||||
return cluster
|
||||
|
||||
manager = scale_manager.ScaleManager(context, osc, cluster)
|
||||
manager = scale_manager.get_scale_manager(context, osc, cluster)
|
||||
|
||||
conductor_utils.notify_about_cluster_operation(
|
||||
context, taxonomy.ACTION_UPDATE, taxonomy.OUTCOME_PENDING)
|
||||
|
@ -12,6 +12,8 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import abc
|
||||
from marathon import MarathonClient
|
||||
from oslo_log import log as logging
|
||||
|
||||
from magnum.common import exception
|
||||
@ -25,6 +27,21 @@ from magnum import objects
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_scale_manager(context, osclient, cluster):
|
||||
manager = None
|
||||
coe = cluster.baymodel.coe
|
||||
if coe == 'kubernetes':
|
||||
manager = K8sScaleManager(context, osclient, cluster)
|
||||
elif coe == 'mesos':
|
||||
manager = MesosScaleManager(context, osclient, cluster)
|
||||
else:
|
||||
LOG.warning(_LW(
|
||||
"Currently only kubernetes and mesos cluster scale manager "
|
||||
"are available"))
|
||||
|
||||
return manager
|
||||
|
||||
|
||||
class ScaleManager(object):
|
||||
|
||||
def __init__(self, context, osclient, cluster):
|
||||
@ -46,13 +63,9 @@ class ScaleManager(object):
|
||||
"%(stack_id)s") % {'output_key': hosts_output.heat_output,
|
||||
'stack_id': stack.id})
|
||||
|
||||
hosts_no_container = list(hosts)
|
||||
k8s_api = k8s.create_k8s_api(self.context, cluster)
|
||||
for pod in k8s_api.list_namespaced_pod(namespace='default').items:
|
||||
host = pod.spec.node_name
|
||||
if host in hosts_no_container:
|
||||
hosts_no_container.remove(host)
|
||||
|
||||
hosts_with_container = self._get_hosts_with_container(self.context,
|
||||
cluster)
|
||||
hosts_no_container = list(set(hosts) - hosts_with_container)
|
||||
LOG.debug('List of hosts that has no container: %s',
|
||||
str(hosts_no_container))
|
||||
|
||||
@ -76,3 +89,46 @@ class ScaleManager(object):
|
||||
|
||||
def _get_num_of_removal(self):
|
||||
return self.old_cluster.node_count - self.new_cluster.node_count
|
||||
|
||||
@abc.abstractmethod
|
||||
def _get_hosts_with_container(self, context, cluster):
|
||||
"""Return the hosts with container running on them."""
|
||||
pass
|
||||
|
||||
|
||||
class K8sScaleManager(ScaleManager):
|
||||
|
||||
def __init__(self, context, osclient, cluster):
|
||||
super(K8sScaleManager, self).__init__(context, osclient, cluster)
|
||||
|
||||
def _get_hosts_with_container(self, context, cluster):
|
||||
k8s_api = k8s.create_k8s_api(self.context, cluster)
|
||||
hosts = set()
|
||||
for pod in k8s_api.list_namespaced_pod(namespace='default').items:
|
||||
hosts.add(pod.spec.node_name)
|
||||
|
||||
return hosts
|
||||
|
||||
|
||||
class MesosScaleManager(ScaleManager):
|
||||
"""When scaling a mesos cluster, MesosScaleManager will inspect the
|
||||
|
||||
nodes and find out those with containers on them. Thus we can
|
||||
ask Heat to delete the nodes without containers. Note that this
|
||||
is a best effort basis -- Magnum doesn't have any synchronization
|
||||
with Marathon, so while Magnum is checking for the containers to
|
||||
choose nodes to remove, new containers can be deployed on the
|
||||
nodes to be removed.
|
||||
"""
|
||||
|
||||
def __init__(self, context, osclient, cluster):
|
||||
super(MesosScaleManager, self).__init__(context, osclient, cluster)
|
||||
|
||||
def _get_hosts_with_container(self, context, cluster):
|
||||
marathon_client = MarathonClient(
|
||||
'http://' + cluster.api_address + ':8080')
|
||||
hosts = set()
|
||||
for task in marathon_client.list_tasks():
|
||||
hosts.add(task.host)
|
||||
|
||||
return hosts
|
||||
|
@ -48,7 +48,7 @@ class TestHandler(db_base.DbTestCase):
|
||||
self.cluster = objects.Cluster(self.context, **cluster_dict)
|
||||
self.cluster.create()
|
||||
|
||||
@patch('magnum.conductor.scale_manager.ScaleManager')
|
||||
@patch('magnum.conductor.scale_manager.get_scale_manager')
|
||||
@patch(
|
||||
'magnum.conductor.handlers.cluster_conductor.Handler._poll_and_check')
|
||||
@patch('magnum.conductor.handlers.cluster_conductor._update_stack')
|
||||
@ -116,7 +116,7 @@ class TestHandler(db_base.DbTestCase):
|
||||
cluster = objects.Cluster.get(self.context, self.cluster.uuid)
|
||||
self.assertEqual(1, cluster.node_count)
|
||||
|
||||
@patch('magnum.conductor.scale_manager.ScaleManager')
|
||||
@patch('magnum.conductor.scale_manager.get_scale_manager')
|
||||
@patch(
|
||||
'magnum.conductor.handlers.cluster_conductor.Handler._poll_and_check')
|
||||
@patch('magnum.conductor.handlers.cluster_conductor._update_stack')
|
||||
|
@ -21,23 +21,39 @@ from magnum.tests import base
|
||||
|
||||
class TestScaleManager(base.TestCase):
|
||||
|
||||
@mock.patch('magnum.objects.Cluster.get_by_uuid')
|
||||
def test_get_scale_manager(self, mock_cluster_get):
|
||||
mock_context = mock.MagicMock()
|
||||
mock_osc = mock.MagicMock()
|
||||
k8s_cluster = mock.MagicMock()
|
||||
k8s_cluster.baymodel.coe = 'kubernetes'
|
||||
mesos_cluster = mock.MagicMock()
|
||||
mesos_cluster.baymodel.coe = 'mesos'
|
||||
invalid_cluster = mock.MagicMock()
|
||||
invalid_cluster.baymodel.coe = 'fake'
|
||||
|
||||
mgr = scale_manager.get_scale_manager(
|
||||
mock_context, mock_osc, k8s_cluster)
|
||||
self.assertIsInstance(mgr, scale_manager.K8sScaleManager)
|
||||
|
||||
mgr = scale_manager.get_scale_manager(
|
||||
mock_context, mock_osc, mesos_cluster)
|
||||
self.assertIsInstance(mgr, scale_manager.MesosScaleManager)
|
||||
|
||||
mgr = scale_manager.get_scale_manager(
|
||||
mock_context, mock_osc, invalid_cluster)
|
||||
self.assertIsNone(mgr)
|
||||
|
||||
def _test_get_removal_nodes(
|
||||
self, mock_create_k8s_api, mock_get_num_of_removal,
|
||||
self, mock_get_hosts, mock_get_num_of_removal,
|
||||
mock_is_scale_down, mock_get_by_uuid, is_scale_down,
|
||||
num_of_removal, all_hosts, pod_hosts, expected_removal_hosts):
|
||||
num_of_removal, all_hosts, container_hosts,
|
||||
expected_removal_hosts):
|
||||
|
||||
mock_is_scale_down.return_value = is_scale_down
|
||||
mock_get_num_of_removal.return_value = num_of_removal
|
||||
|
||||
pods = list()
|
||||
for h in pod_hosts:
|
||||
pod = mock.MagicMock()
|
||||
pod.spec.node_name = h
|
||||
pods.append(pod)
|
||||
|
||||
mock_k8s_api = mock.MagicMock()
|
||||
mock_k8s_api.list_namespaced_pod.return_value.items = pods
|
||||
mock_create_k8s_api.return_value = mock_k8s_api
|
||||
mock_get_hosts.return_value = container_hosts
|
||||
|
||||
mock_heat_output = mock.MagicMock()
|
||||
mock_heat_output.get_output_value.return_value = all_hosts
|
||||
@ -60,119 +76,168 @@ class TestScaleManager(base.TestCase):
|
||||
removal_hosts = scale_mgr.get_removal_nodes(mock_heat_output)
|
||||
self.assertEqual(expected_removal_hosts, removal_hosts)
|
||||
if num_of_removal > 0:
|
||||
mock_create_k8s_api.assert_called_once_with(mock_context,
|
||||
mock_get_hosts.assert_called_once_with(mock_context,
|
||||
mock_cluster)
|
||||
|
||||
@mock.patch('magnum.objects.Cluster.get_by_uuid')
|
||||
@mock.patch('magnum.conductor.scale_manager.ScaleManager._is_scale_down')
|
||||
@mock.patch('magnum.conductor.scale_manager.ScaleManager.'
|
||||
'_get_num_of_removal')
|
||||
@mock.patch('magnum.conductor.k8s_api.create_k8s_api')
|
||||
def test_get_removal_nodes_no_pod(
|
||||
self, mock_create_k8s_api, mock_get_num_of_removal,
|
||||
@mock.patch('magnum.conductor.scale_manager.ScaleManager.'
|
||||
'_get_hosts_with_container')
|
||||
def test_get_removal_nodes_no_container_host(
|
||||
self, mock_get_hosts, mock_get_num_of_removal,
|
||||
mock_is_scale_down, mock_get_by_uuid):
|
||||
|
||||
is_scale_down = True
|
||||
num_of_removal = 1
|
||||
hosts = ['10.0.0.3', '10.0.0.4']
|
||||
pods = []
|
||||
all_hosts = ['10.0.0.3']
|
||||
container_hosts = set()
|
||||
expected_removal_hosts = ['10.0.0.3']
|
||||
self._test_get_removal_nodes(
|
||||
mock_create_k8s_api, mock_get_num_of_removal, mock_is_scale_down,
|
||||
mock_get_by_uuid, is_scale_down, num_of_removal, hosts, pods,
|
||||
expected_removal_hosts)
|
||||
mock_get_hosts, mock_get_num_of_removal, mock_is_scale_down,
|
||||
mock_get_by_uuid, is_scale_down, num_of_removal, all_hosts,
|
||||
container_hosts, expected_removal_hosts)
|
||||
|
||||
@mock.patch('magnum.objects.Cluster.get_by_uuid')
|
||||
@mock.patch('magnum.conductor.scale_manager.ScaleManager._is_scale_down')
|
||||
@mock.patch('magnum.conductor.scale_manager.ScaleManager.'
|
||||
'_get_num_of_removal')
|
||||
@mock.patch('magnum.conductor.k8s_api.create_k8s_api')
|
||||
def test_get_removal_nodes_one_pod(
|
||||
self, mock_create_k8s_api, mock_get_num_of_removal,
|
||||
@mock.patch('magnum.conductor.scale_manager.ScaleManager.'
|
||||
'_get_hosts_with_container')
|
||||
def test_get_removal_nodes_one_container_host(
|
||||
self, mock_get_hosts, mock_get_num_of_removal,
|
||||
mock_is_scale_down, mock_get_by_uuid):
|
||||
|
||||
is_scale_down = True
|
||||
num_of_removal = 1
|
||||
hosts = ['10.0.0.3', '10.0.0.4']
|
||||
pods = ['10.0.0.3']
|
||||
all_hosts = ['10.0.0.3', '10.0.0.4']
|
||||
container_hosts = set(['10.0.0.3'])
|
||||
expected_removal_hosts = ['10.0.0.4']
|
||||
self._test_get_removal_nodes(
|
||||
mock_create_k8s_api, mock_get_num_of_removal, mock_is_scale_down,
|
||||
mock_get_by_uuid, is_scale_down, num_of_removal, hosts, pods,
|
||||
expected_removal_hosts)
|
||||
mock_get_hosts, mock_get_num_of_removal, mock_is_scale_down,
|
||||
mock_get_by_uuid, is_scale_down, num_of_removal, all_hosts,
|
||||
container_hosts, expected_removal_hosts)
|
||||
|
||||
@mock.patch('magnum.objects.Cluster.get_by_uuid')
|
||||
@mock.patch('magnum.conductor.scale_manager.ScaleManager._is_scale_down')
|
||||
@mock.patch('magnum.conductor.scale_manager.ScaleManager.'
|
||||
'_get_num_of_removal')
|
||||
@mock.patch('magnum.conductor.k8s_api.create_k8s_api')
|
||||
def test_get_removal_nodes_two_pods(
|
||||
self, mock_create_k8s_api, mock_get_num_of_removal,
|
||||
@mock.patch('magnum.conductor.scale_manager.ScaleManager.'
|
||||
'_get_hosts_with_container')
|
||||
def test_get_removal_nodes_two_container_hosts(
|
||||
self, mock_get_hosts, mock_get_num_of_removal,
|
||||
mock_is_scale_down, mock_get_by_uuid):
|
||||
|
||||
is_scale_down = True
|
||||
num_of_removal = 1
|
||||
hosts = ['10.0.0.3', '10.0.0.4']
|
||||
pods = ['10.0.0.3', '10.0.0.4']
|
||||
all_hosts = ['10.0.0.3', '10.0.0.4']
|
||||
container_hosts = set(['10.0.0.3', '10.0.0.4'])
|
||||
expected_removal_hosts = []
|
||||
self._test_get_removal_nodes(
|
||||
mock_create_k8s_api, mock_get_num_of_removal, mock_is_scale_down,
|
||||
mock_get_by_uuid, is_scale_down, num_of_removal, hosts, pods,
|
||||
expected_removal_hosts)
|
||||
mock_get_hosts, mock_get_num_of_removal, mock_is_scale_down,
|
||||
mock_get_by_uuid, is_scale_down, num_of_removal, all_hosts,
|
||||
container_hosts, expected_removal_hosts)
|
||||
|
||||
@mock.patch('magnum.objects.Cluster.get_by_uuid')
|
||||
@mock.patch('magnum.conductor.scale_manager.ScaleManager._is_scale_down')
|
||||
@mock.patch('magnum.conductor.scale_manager.ScaleManager.'
|
||||
'_get_num_of_removal')
|
||||
@mock.patch('magnum.conductor.k8s_api.create_k8s_api')
|
||||
def test_get_removal_nodes_three_pods(
|
||||
self, mock_create_k8s_api, mock_get_num_of_removal,
|
||||
@mock.patch('magnum.conductor.scale_manager.ScaleManager.'
|
||||
'_get_hosts_with_container')
|
||||
def test_get_removal_nodes_three_container_hosts(
|
||||
self, mock_get_hosts, mock_get_num_of_removal,
|
||||
mock_is_scale_down, mock_get_by_uuid):
|
||||
|
||||
is_scale_down = True
|
||||
num_of_removal = 1
|
||||
hosts = ['10.0.0.3', '10.0.0.4']
|
||||
pods = ['10.0.0.3', '10.0.0.4', '10.0.0.5']
|
||||
all_hosts = ['10.0.0.3', '10.0.0.4']
|
||||
container_hosts = set(['10.0.0.3', '10.0.0.4', '10.0.0.5'])
|
||||
expected_removal_hosts = []
|
||||
self._test_get_removal_nodes(
|
||||
mock_create_k8s_api, mock_get_num_of_removal, mock_is_scale_down,
|
||||
mock_get_by_uuid, is_scale_down, num_of_removal, hosts, pods,
|
||||
expected_removal_hosts)
|
||||
mock_get_hosts, mock_get_num_of_removal, mock_is_scale_down,
|
||||
mock_get_by_uuid, is_scale_down, num_of_removal, all_hosts,
|
||||
container_hosts, expected_removal_hosts)
|
||||
|
||||
@mock.patch('magnum.objects.Cluster.get_by_uuid')
|
||||
@mock.patch('magnum.conductor.scale_manager.ScaleManager._is_scale_down')
|
||||
@mock.patch('magnum.conductor.scale_manager.ScaleManager.'
|
||||
'_get_num_of_removal')
|
||||
@mock.patch('magnum.conductor.k8s_api.create_k8s_api')
|
||||
@mock.patch('magnum.conductor.scale_manager.ScaleManager.'
|
||||
'_get_hosts_with_container')
|
||||
def test_get_removal_nodes_scale_up(
|
||||
self, mock_create_k8s_api, mock_get_num_of_removal,
|
||||
self, mock_get_hosts, mock_get_num_of_removal,
|
||||
mock_is_scale_down, mock_get_by_uuid):
|
||||
|
||||
is_scale_down = False
|
||||
num_of_removal = -1
|
||||
hosts = ['10.0.0.3', '10.0.0.4']
|
||||
pods = []
|
||||
all_hosts = ['10.0.0.3', '10.0.0.4']
|
||||
container_hosts = set()
|
||||
expected_removal_hosts = []
|
||||
self._test_get_removal_nodes(
|
||||
mock_create_k8s_api, mock_get_num_of_removal, mock_is_scale_down,
|
||||
mock_get_by_uuid, is_scale_down, num_of_removal, hosts, pods,
|
||||
expected_removal_hosts)
|
||||
mock_get_hosts, mock_get_num_of_removal, mock_is_scale_down,
|
||||
mock_get_by_uuid, is_scale_down, num_of_removal, all_hosts,
|
||||
container_hosts, expected_removal_hosts)
|
||||
|
||||
@mock.patch('magnum.objects.Cluster.get_by_uuid')
|
||||
@mock.patch('magnum.conductor.scale_manager.ScaleManager._is_scale_down')
|
||||
@mock.patch('magnum.conductor.scale_manager.ScaleManager.'
|
||||
'_get_num_of_removal')
|
||||
@mock.patch('magnum.conductor.k8s_api.create_k8s_api')
|
||||
@mock.patch('magnum.conductor.scale_manager.ScaleManager.'
|
||||
'_get_hosts_with_container')
|
||||
def test_get_removal_nodes_with_none_hosts(
|
||||
self, mock_create_k8s_api, mock_get_num_of_removal,
|
||||
self, mock_get_hosts, mock_get_num_of_removal,
|
||||
mock_is_scale_down, mock_get_by_uuid):
|
||||
|
||||
is_scale_down = True
|
||||
num_of_removal = 1
|
||||
hosts = None
|
||||
pods = []
|
||||
all_hosts = None
|
||||
container_hosts = set()
|
||||
expected_removal_hosts = None
|
||||
self._test_get_removal_nodes(
|
||||
mock_create_k8s_api, mock_get_num_of_removal, mock_is_scale_down,
|
||||
mock_get_by_uuid, is_scale_down, num_of_removal, hosts, pods,
|
||||
expected_removal_hosts)
|
||||
mock_get_hosts, mock_get_num_of_removal, mock_is_scale_down,
|
||||
mock_get_by_uuid, is_scale_down, num_of_removal, all_hosts,
|
||||
container_hosts, expected_removal_hosts)
|
||||
|
||||
|
||||
class TestK8sScaleManager(base.TestCase):
|
||||
|
||||
@mock.patch('magnum.objects.Cluster.get_by_uuid')
|
||||
@mock.patch('magnum.conductor.k8s_api.create_k8s_api')
|
||||
def test_get_hosts_with_container(self, mock_create_api, mock_get):
|
||||
pods = mock.MagicMock()
|
||||
pod_1 = mock.MagicMock()
|
||||
pod_1.spec.node_name = 'node1'
|
||||
pod_2 = mock.MagicMock()
|
||||
pod_2.spec.node_name = 'node2'
|
||||
pods.items = [pod_1, pod_2]
|
||||
mock_api = mock.MagicMock()
|
||||
mock_api.list_namespaced_pod.return_value = pods
|
||||
mock_create_api.return_value = mock_api
|
||||
|
||||
mgr = scale_manager.K8sScaleManager(
|
||||
mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
|
||||
hosts = mgr._get_hosts_with_container(
|
||||
mock.MagicMock(), mock.MagicMock())
|
||||
self.assertEqual(hosts, {'node1', 'node2'})
|
||||
|
||||
|
||||
class TestMesosScaleManager(base.TestCase):
|
||||
|
||||
@mock.patch('magnum.objects.Cluster.get_by_uuid')
|
||||
@mock.patch('marathon.MarathonClient')
|
||||
@mock.patch('marathon.MarathonClient.list_tasks')
|
||||
def test_get_hosts_with_container(self, mock_list_tasks,
|
||||
mock_client, mock_get):
|
||||
task_1 = mock.MagicMock()
|
||||
task_1.host = 'node1'
|
||||
task_2 = mock.MagicMock()
|
||||
task_2.host = 'node2'
|
||||
tasks = [task_1, task_2]
|
||||
mock_list_tasks.return_value = tasks
|
||||
|
||||
mgr = scale_manager.MesosScaleManager(
|
||||
mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
|
||||
hosts = mgr._get_hosts_with_container(
|
||||
mock.MagicMock(), mock.MagicMock())
|
||||
self.assertEqual(hosts, {'node1', 'node2'})
|
||||
|
@ -22,6 +22,7 @@ iso8601>=0.1.11 # MIT
|
||||
jsonpatch>=1.1 # BSD
|
||||
keystoneauth1>=2.10.0 # Apache-2.0
|
||||
keystonemiddleware!=4.5.0,>=4.2.0 # Apache-2.0
|
||||
marathon>=0.8.6 # MIT
|
||||
netaddr!=0.7.16,>=0.7.13 # BSD
|
||||
oslo.concurrency>=3.8.0 # Apache-2.0
|
||||
oslo.config>=3.14.0 # Apache-2.0
|
||||
|
Loading…
x
Reference in New Issue
Block a user