Monitor driver for k8s bay type

Add support for computing memory utilization of k8s bay

Change-Id: I3eecd5e9ceab77bb1877dff2229ba2eb18a5dd76
Closes-Bug: #1500299
This commit is contained in:
Bharath Thiruveedula 2015-10-14 20:13:25 +05:30
parent d90bc0ef7d
commit 61c9017335
8 changed files with 407 additions and 95 deletions

View File

@ -502,3 +502,7 @@ class MagnumServiceNotFound(ResourceNotFound):
class MagnumServiceAlreadyExists(Conflict): class MagnumServiceAlreadyExists(Conflict):
message = _("A magnum service with ID %(id)s already exists.") message = _("A magnum service with ID %(id)s already exists.")
class UnsupportedK8sMemoryFormat(MagnumException):
message = _("Unsupported memory format for k8s bay.")

View File

@ -55,6 +55,23 @@ CONF.register_opts(UTILS_OPTS)
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
MEMORY_UNITS = {
'Ki': 2 ** 10,
'Mi': 2 ** 20,
'Gi': 2 ** 30,
'Ti': 2 ** 40,
'Pi': 2 ** 50,
'Ei': 2 ** 60,
'm': 10 ** -3,
'k': 10 ** 3,
'M': 10 ** 6,
'G': 10 ** 9,
'T': 10 ** 12,
'p': 10 ** 15,
'E': 10 ** 18,
'': 1
}
def _get_root_helper(): def _get_root_helper():
return 'sudo magnum-rootwrap %s' % CONF.rootwrap_config return 'sudo magnum-rootwrap %s' % CONF.rootwrap_config
@ -518,3 +535,31 @@ def raise_exception_invalid_scheme(url):
scheme = url.split(':')[0] scheme = url.split(':')[0]
if scheme not in valid_schemes: if scheme not in valid_schemes:
raise exception.Urllib2InvalidScheme(url=url) raise exception.Urllib2InvalidScheme(url=url)
def get_memory_bytes(memory):
"""Kubernetes memory format must be in the format of:
<signedNumber><suffix>
signedNumber = digits|digits.digits|digits.|.digits
suffix = Ki|Mi|Gi|Ti|Pi|Ei|m|k|M|G|T|P|E|''
or suffix = E|e<signedNumber>
digits = digit | digit<digits>
digit = 0|1|2|3|4|5|6|7|8|9
"""
signed_num_regex = r"(^\d+\.\d+)|(^\d+\.)|(\.\d+)|(^\d+)"
matched_signed_number = re.search(signed_num_regex, memory)
if matched_signed_number is None:
raise exception.UnsupportedK8sMemoryFormat()
else:
signed_number = matched_signed_number.group(0)
suffix = memory.replace(signed_number, '', 1)
if suffix == '':
return float(memory)
if re.search(r"^(Ki|Mi|Gi|Ti|Pi|Ei|m|k|M|G|T|P|E|'')$", suffix):
return float(signed_number) * MEMORY_UNITS[suffix]
elif re.search(r"^[E|e][+|-]?(\d+\.\d+$)|(\d+\.$)|(\.\d+$)|(\d+$)",
suffix):
return float(signed_number) * (10 ** float(suffix[1:]))
else:
raise exception.UnsupportedK8sMemoryFormat()

View File

@ -0,0 +1,150 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ast
from magnum.common import utils
from magnum.conductor import k8s_api as k8s
from magnum.conductor.monitors import MonitorBase
class K8sMonitor(MonitorBase):
def __init__(self, context, bay):
super(K8sMonitor, self).__init__(context, bay)
self.data = {}
self.data['nodes'] = []
self.data['pods'] = []
@property
def metrics_spec(self):
return {
'memory_util': {
'unit': '%',
'func': 'compute_memory_util',
},
}
def pull_data(self):
k8s_api = k8s.create_k8s_api(self.context, self.bay)
nodes = k8s_api.list_namespaced_node()
self.data['nodes'] = self._parse_node_info(nodes)
pods = k8s_api.list_namespaced_pod('default')
self.data['pods'] = self._parse_pod_info(pods)
def compute_memory_util(self):
mem_total = 0
for node in self.data['nodes']:
mem_total += node['Memory']
mem_reserved = 0
for pod in self.data['pods']:
mem_reserved += pod['Memory']
if mem_total == 0:
return 0
else:
return mem_reserved * 100 / mem_total
def _parse_pod_info(self, pods):
"""Parse pods and retrieve memory details about each pod
:param pods: Thr output k8s_api.list_namespaced_pods()
For example:
{
'items': [{
'status': {
'container_statuses': None,
'pod_ip': None,
'phase': 'Pending',
'message': None,
'conditions': None,
},
'spec': {
'containers': [{
'image': 'nginx',
'resources': {'requests': None,
'limits': "{u'memory': u'1280e3'}"},
}],
},
'api_version': None,
}],
'kind': 'PodList',
}
The above output is the dict form of:
magnum.common.pythonk8sclient.swagger_client.models.v1_pod_list.
V1PodList object
:return: Memory size of each pod. Example:
[{'Memory': 1280000.0},
{'Memory': 1280000.0}]
"""
pods = pods.items
parsed_containers = []
for pod in pods:
containers = pod.spec.containers
for container in containers:
memory = 0
resources = container.resources
limits = resources.limits
if limits is not None:
# Output of resources.limits is string
# for example:
# limits = "{'memory': '1000Ki'}"
limits = ast.literal_eval(limits)
if limits.get('memory', ''):
memory = utils.get_memory_bytes(limits['memory'])
container_dict = {
'Memory': memory
}
parsed_containers.append(container_dict)
return parsed_containers
def _parse_node_info(self, nodes):
"""Parse nodes to retrieve memory of each node
:param nodes: The output of k8s_api.list_namespaced_node()
For example:
{
'items': [{
'status': {
'phase': None,
'capacity': "{u'memory': u'2049852Ki'}",
},
},
'api_version': None,
}],
'kind': 'NodeList',
'api_version': 'v1',
}
The above output is the dict form of:
magnum.common.pythonk8sclient.swagger_client.models.v1_node_list.
V1NodeList object
:return: Memory size of each node. Excample:
[{'Memory': 1024.0},
{'Memory': 1024.0}]
"""
nodes = nodes.items
parsed_nodes = []
for node in nodes:
# Output of node.status.capacity is strong
# for example:
# capacity = "{'memory': '1000Ki'}"
capacity = ast.literal_eval(node.status.capacity)
memory = utils.get_memory_bytes(capacity['memory'])
parsed_nodes.append({'Memory': memory})
return parsed_nodes

View File

@ -19,8 +19,6 @@ from oslo_config import cfg
from oslo_log import log from oslo_log import log
import six import six
from magnum.common import docker_utils
from magnum.i18n import _LW
from magnum import objects from magnum import objects
from magnum.objects.fields import BayType as bay_type from magnum.objects.fields import BayType as bay_type
@ -63,98 +61,14 @@ class MonitorBase(object):
return func() return func()
class SwarmMonitor(MonitorBase):
def __init__(self, context, bay):
super(SwarmMonitor, self).__init__(context, bay)
self.data = {}
self.data['nodes'] = []
self.data['containers'] = []
@property
def metrics_spec(self):
return {
'memory_util': {
'unit': '%',
'func': 'compute_memory_util',
},
}
def pull_data(self):
with docker_utils.docker_for_bay(self.context,
self.bay) as docker:
system_info = docker.info()
self.data['nodes'] = self._parse_node_info(system_info)
# pull data from each container
containers = []
for container in docker.containers(all=True):
try:
container = docker.inspect_container(container['Id'])
except Exception as e:
LOG.warn(_LW("Ignore error [%(e)s] when inspecting "
"container %(container_id)s."),
{'e': e, 'container_id': container['Id']},
exc_info=True)
containers.append(container)
self.data['containers'] = containers
def compute_memory_util(self):
mem_total = 0
for node in self.data['nodes']:
mem_total += node['MemTotal']
mem_reserved = 0
for container in self.data['containers']:
mem_reserved += container['HostConfig']['Memory']
if mem_total == 0:
return 0
else:
return mem_reserved * 100 / mem_total
def _parse_node_info(self, system_info):
"""Parse system_info to retrieve memory size of each node.
:param system_info: The output returned by docker.info(). Example:
{
u'Debug': False,
u'NEventsListener': 0,
u'DriverStatus': [
[u'\x08Strategy', u'spread'],
[u'\x08Filters', u'...'],
[u'\x08Nodes', u'2'],
[u'node1', u'10.0.0.4:2375'],
[u' \u2514 Containers', u'1'],
[u' \u2514 Reserved CPUs', u'0 / 1'],
[u' \u2514 Reserved Memory', u'0 B / 2.052 GiB'],
[u'node2', u'10.0.0.3:2375'],
[u' \u2514 Containers', u'2'],
[u' \u2514 Reserved CPUs', u'0 / 1'],
[u' \u2514 Reserved Memory', u'0 B / 2.052 GiB']
],
u'Containers': 3
}
:return: Memory size of each node. Excample:
[{'MemTotal': 2203318222.848},
{'MemTotal': 2203318222.848}]
"""
nodes = []
for info in system_info['DriverStatus']:
key = info[0]
value = info[1]
if key == u' \u2514 Reserved Memory':
memory = value # Example: '0 B / 2.052 GiB'
memory = memory.split('/')[1].strip() # Example: '2.052 GiB'
memory = memory.split(' ')[0] # Example: '2.052'
memory = float(memory) * 1024 * 1024 * 1024
nodes.append({'MemTotal': memory})
return nodes
def create_monitor(context, bay): def create_monitor(context, bay):
baymodel = objects.BayModel.get_by_uuid(context, bay.baymodel_id) baymodel = objects.BayModel.get_by_uuid(context, bay.baymodel_id)
if baymodel.coe == bay_type.SWARM: if baymodel.coe == bay_type.SWARM:
from magnum.conductor.swarm_monitor import SwarmMonitor
return SwarmMonitor(context, bay) return SwarmMonitor(context, bay)
elif baymodel.coe == bay_type.KUBERNETES:
from magnum.conductor.k8s_monitor import K8sMonitor
return K8sMonitor(context, bay)
# TODO(hongbin): add support for other bay types # TODO(hongbin): add support for other bay types
LOG.debug("Cannot create monitor with bay type '%s'" % baymodel.coe) LOG.debug("Cannot create monitor with bay type '%s'" % baymodel.coe)

View File

@ -0,0 +1,110 @@
# Copyright 2015 Huawei Technologies Co.,LTD.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log
from magnum.common import docker_utils
from magnum.conductor.monitors import MonitorBase
from magnum.i18n import _LW
LOG = log.getLogger(__name__)
class SwarmMonitor(MonitorBase):
def __init__(self, context, bay):
super(SwarmMonitor, self).__init__(context, bay)
self.data = {}
self.data['nodes'] = []
self.data['containers'] = []
@property
def metrics_spec(self):
return {
'memory_util': {
'unit': '%',
'func': 'compute_memory_util',
},
}
def pull_data(self):
with docker_utils.docker_for_bay(self.context,
self.bay) as docker:
system_info = docker.info()
self.data['nodes'] = self._parse_node_info(system_info)
# pull data from each container
containers = []
for container in docker.containers(all=True):
try:
container = docker.inspect_container(container['Id'])
except Exception as e:
LOG.warn(_LW("Ignore error [%(e)s] when inspecting "
"container %(container_id)s."),
{'e': e, 'container_id': container['Id']},
exc_info=True)
containers.append(container)
self.data['containers'] = containers
def compute_memory_util(self):
mem_total = 0
for node in self.data['nodes']:
mem_total += node['MemTotal']
mem_reserved = 0
for container in self.data['containers']:
mem_reserved += container['HostConfig']['Memory']
if mem_total == 0:
return 0
else:
return mem_reserved * 100 / mem_total
def _parse_node_info(self, system_info):
"""Parse system_info to retrieve memory size of each node.
:param system_info: The output returned by docker.info(). Example:
{
u'Debug': False,
u'NEventsListener': 0,
u'DriverStatus': [
[u'\x08Strategy', u'spread'],
[u'\x08Filters', u'...'],
[u'\x08Nodes', u'2'],
[u'node1', u'10.0.0.4:2375'],
[u' \u2514 Containers', u'1'],
[u' \u2514 Reserved CPUs', u'0 / 1'],
[u' \u2514 Reserved Memory', u'0 B / 2.052 GiB'],
[u'node2', u'10.0.0.3:2375'],
[u' \u2514 Containers', u'2'],
[u' \u2514 Reserved CPUs', u'0 / 1'],
[u' \u2514 Reserved Memory', u'0 B / 2.052 GiB']
],
u'Containers': 3
}
:return: Memory size of each node. Excample:
[{'MemTotal': 2203318222.848},
{'MemTotal': 2203318222.848}]
"""
nodes = []
for info in system_info['DriverStatus']:
key = info[0]
value = info[1]
if key == u' \u2514 Reserved Memory':
memory = value # Example: '0 B / 2.052 GiB'
memory = memory.split('/')[1].strip() # Example: '2.052 GiB'
memory = memory.split(' ')[0] # Example: '2.052'
memory = float(memory) * 1024 * 1024 * 1024
nodes.append({'MemTotal': memory})
return nodes

View File

@ -126,6 +126,15 @@ class UtilsTestCase(base.TestCase):
utils.convert_to_list_dict(['first', 'second'], utils.convert_to_list_dict(['first', 'second'],
'fred')) 'fred'))
def test_get_memory_bytes(self):
self.assertEqual(1024000.0, utils.get_memory_bytes('1000Ki'))
self.assertEqual(0.001, utils.get_memory_bytes('1E-3'))
self.assertEqual(0.5, utils.get_memory_bytes('0.0005k'))
self.assertEqual(1300000.0, utils.get_memory_bytes('1.3E+6'))
self.assertEqual(1300000.0, utils.get_memory_bytes('1.3E6'))
self.assertRaises(exception.UnsupportedK8sMemoryFormat,
utils.get_memory_bytes, '1E1E')
class ExecuteTestCase(base.TestCase): class ExecuteTestCase(base.TestCase):

View File

@ -101,7 +101,8 @@ class TestK8sAPI(base.TestCase):
side_effect=self._mock_cert_mgr_get_cert): side_effect=self._mock_cert_mgr_get_cert):
k8s_api.create_k8s_api(context, obj) k8s_api.create_k8s_api(context, obj)
mock_bay_retrieval.assert_called_once_with(context, obj) if cls is not 'Bay':
mock_bay_retrieval.assert_called_once_with(context, obj)
mock_api_client.assert_called_once_with( mock_api_client.assert_called_once_with(
bay_obj.api_address, bay_obj.api_address,

View File

@ -15,7 +15,9 @@
import mock import mock
from magnum.conductor import k8s_monitor
from magnum.conductor import monitors from magnum.conductor import monitors
from magnum.conductor import swarm_monitor
from magnum import objects from magnum import objects
from magnum.tests import base from magnum.tests import base
from magnum.tests.unit.db import utils from magnum.tests.unit.db import utils
@ -40,9 +42,10 @@ class MonitorsTestCase(base.TestCase):
bay = utils.get_test_bay(node_addresses=['1.2.3.4'], bay = utils.get_test_bay(node_addresses=['1.2.3.4'],
api_address='5.6.7.8') api_address='5.6.7.8')
self.bay = objects.Bay(self.context, **bay) self.bay = objects.Bay(self.context, **bay)
self.monitor = monitors.SwarmMonitor(self.context, self.bay) self.monitor = swarm_monitor.SwarmMonitor(self.context, self.bay)
p = mock.patch('magnum.conductor.monitors.SwarmMonitor.metrics_spec', self.k8s_monitor = k8s_monitor.K8sMonitor(self.context, self.bay)
new_callable=mock.PropertyMock) p = mock.patch('magnum.conductor.swarm_monitor.SwarmMonitor.'
'metrics_spec', new_callable=mock.PropertyMock)
self.mock_metrics_spec = p.start() self.mock_metrics_spec = p.start()
self.mock_metrics_spec.return_value = self.test_metrics_spec self.mock_metrics_spec.return_value = self.test_metrics_spec
self.addCleanup(p.stop) self.addCleanup(p.stop)
@ -53,7 +56,15 @@ class MonitorsTestCase(base.TestCase):
baymodel.coe = 'swarm' baymodel.coe = 'swarm'
mock_baymodel_get_by_uuid.return_value = baymodel mock_baymodel_get_by_uuid.return_value = baymodel
monitor = monitors.create_monitor(self.context, self.bay) monitor = monitors.create_monitor(self.context, self.bay)
self.assertIsInstance(monitor, monitors.SwarmMonitor) self.assertIsInstance(monitor, swarm_monitor.SwarmMonitor)
@mock.patch('magnum.objects.BayModel.get_by_uuid')
def test_create_monitor_k8s_bay(self, mock_baymodel_get_by_uuid):
baymodel = mock.MagicMock()
baymodel.coe = 'kubernetes'
mock_baymodel_get_by_uuid.return_value = baymodel
monitor = monitors.create_monitor(self.context, self.bay)
self.assertIsInstance(monitor, k8s_monitor.K8sMonitor)
@mock.patch('magnum.objects.BayModel.get_by_uuid') @mock.patch('magnum.objects.BayModel.get_by_uuid')
def test_create_monitor_unsupported_coe(self, mock_baymodel_get_by_uuid): def test_create_monitor_unsupported_coe(self, mock_baymodel_get_by_uuid):
@ -137,3 +148,71 @@ class MonitorsTestCase(base.TestCase):
self.monitor.data = test_data self.monitor.data = test_data
mem_util = self.monitor.compute_memory_util() mem_util = self.monitor.compute_memory_util()
self.assertEqual(0, mem_util) self.assertEqual(0, mem_util)
@mock.patch('magnum.conductor.k8s_api.create_k8s_api')
def test_k8s_monitor_pull_data_success(self, mock_k8s_api):
mock_nodes = mock.MagicMock()
mock_node = mock.MagicMock()
mock_node.status = mock.MagicMock()
mock_node.status.capacity = "{'memory': '2000Ki'}"
mock_nodes.items = [mock_node]
mock_k8s_api.return_value.list_namespaced_node.return_value = (
mock_nodes)
mock_pods = mock.MagicMock()
mock_pod = mock.MagicMock()
mock_pod.spec = mock.MagicMock()
mock_container = mock.MagicMock()
mock_container.resources = mock.MagicMock()
mock_container.resources.limits = "{'memory':'100Mi'}"
mock_pod.spec.containers = [mock_container]
mock_pods.items = [mock_pod]
mock_k8s_api.return_value.list_namespaced_pod.return_value = mock_pods
self.k8s_monitor.pull_data()
self.assertEqual(self.k8s_monitor.data['nodes'],
[{'Memory': 2048000.0}])
self.assertEqual(self.k8s_monitor.data['pods'],
[{'Memory': 104857600.0}])
def test_k8s_monitor_get_metric_names(self):
k8s_metric_spec = 'magnum.conductor.k8s_monitor.K8sMonitor.'\
'metrics_spec'
with mock.patch(k8s_metric_spec,
new_callable=mock.PropertyMock) as mock_k8s_metric:
mock_k8s_metric.return_value = self.test_metrics_spec
names = self.k8s_monitor.get_metric_names()
self.assertEqual(sorted(['metric1', 'metric2']), sorted(names))
def test_k8s_monitor_get_metric_unit(self):
k8s_metric_spec = 'magnum.conductor.k8s_monitor.K8sMonitor.' \
'metrics_spec'
with mock.patch(k8s_metric_spec,
new_callable=mock.PropertyMock) as mock_k8s_metric:
mock_k8s_metric.return_value = self.test_metrics_spec
unit = self.k8s_monitor.get_metric_unit('metric1')
self.assertEqual('metric1_unit', unit)
def test_k8s_monitor_compute_memory_util(self):
test_data = {
'nodes': [
{
'Memory': 20,
},
],
'pods': [
{
'Memory': 10,
},
],
}
self.k8s_monitor.data = test_data
mem_util = self.k8s_monitor.compute_memory_util()
self.assertEqual(50, mem_util)
test_data = {
'nodes': [],
'pods': [],
}
self.k8s_monitor.data = test_data
mem_util = self.k8s_monitor.compute_memory_util()
self.assertEqual(0, mem_util)