Implement Monitoring Framework

* Changes the monitor function to use a loadable driver

 * Changes the monitoring thread to use a re-entrant lock
   (RLock()) to prevent it from blocking itself during
    recovery actions

Change-Id: Icf40ffd3123f3b804de16c88164d84077fbf28e2
Implements: blueprint health-monitoring
Closes-Bug: 1497474
This commit is contained in:
Bob HADDLETON 2015-09-16 21:03:35 -05:00
parent 139d512047
commit 1afd26a13b
12 changed files with 452 additions and 229 deletions

View File

@ -0,0 +1,40 @@
template_name: sample-vnfd
description: demo-example
service_properties:
Id: sample-vnfd
vendor: tacker
version: 1
vdus:
vdu1:
id: vdu1
vm_image: cirros-0.3.4-x86_64-uec
instance_type: m1.tiny
network_interfaces:
management:
network: net_mgmt
management: true
pkt_in:
network: net0
pkt_out:
network: net1
placement_policy:
availability_zone: nova
auto-scaling: noop
monitoring_policy:
ping:
monitoring_params:
monitoring_delay: 45
count: 3
interval: .5
timeout: 2
actions:
failure: respawn
config:
param0: key0
param1: key1

View File

@ -25,8 +25,6 @@ vdus:
availability_zone: nova
auto-scaling: noop
monitoring_policy: ping
failure_policy: respawn
config:
param0: key0

View File

@ -393,10 +393,12 @@ auth_uri = http://127.0.0.1:5000
infra_driver = heat
# Specify drivers for mgmt
# exmpale: mgmt_driver = noop
mgmt_driver = noop
mgmt_driver = openwrt
# Specify drivers for monitoring
monitor_driver = ping
[servicevm_nova]
# parameters for novaclient to talk to nova
region_name = RegionOne

View File

@ -53,6 +53,8 @@ tacker.servicevm.device.drivers =
tacker.servicevm.mgmt.drivers =
noop = tacker.vm.mgmt_drivers.noop:DeviceMgmtNoop
openwrt = tacker.vm.mgmt_drivers.openwrt.openwrt:DeviceMgmtOpenWRT
tacker.servicevm.monitor.drivers =
ping = tacker.vm.monitor_drivers.ping.ping:VNFMonitorPing
[build_sphinx]

View File

@ -29,7 +29,7 @@ class FakeDriverManager(mock.Mock):
return str(uuid.uuid4())
class FakeDeviceStatus(mock.Mock):
class FakeVNFMonitor(mock.Mock):
pass
@ -43,7 +43,7 @@ class TestVNFMPlugin(db_base.SqlTestCase):
self.addCleanup(mock.patch.stopall)
self.context = context.get_admin_context()
self._mock_device_manager()
self._mock_device_status()
self._mock_vnf_monitor()
self._mock_green_pool()
self.vnfm_plugin = plugin.VNFMPlugin()
@ -56,12 +56,12 @@ class TestVNFMPlugin(db_base.SqlTestCase):
self._mock(
'tacker.common.driver_manager.DriverManager', fake_device_manager)
def _mock_device_status(self):
self._device_status = mock.Mock(wraps=FakeDeviceStatus())
fake_device_status = mock.Mock()
fake_device_status.return_value = self._device_status
def _mock_vnf_monitor(self):
self._vnf_monitor = mock.Mock(wraps=FakeVNFMonitor())
fake_vnf_monitor = mock.Mock()
fake_vnf_monitor.return_value = self._vnf_monitor
self._mock(
'tacker.vm.monitor.DeviceStatus', fake_device_status)
'tacker.vm.monitor.VNFMonitor', fake_vnf_monitor)
def _mock_green_pool(self):
self._pool = mock.Mock(wraps=FakeGreenPool())
@ -142,7 +142,7 @@ class TestVNFMPlugin(db_base.SqlTestCase):
plugin=mock.ANY,
context=mock.ANY,
device_id=mock.ANY)
self._device_status.delete_hosting_device.assert_called_with(mock.ANY)
self._vnf_monitor.delete_hosting_vnf.assert_called_with(mock.ANY)
self._pool.spawn_n.assert_called_once_with(mock.ANY, mock.ANY,
mock.ANY)

View File

@ -245,6 +245,8 @@ class DeviceHeat(abstract_driver.DeviceAbstractDriver):
if vnfd_key in vnfd_dict:
template_dict[key] = vnfd_dict[vnfd_key]
monitoring_dict = {'vdus': {}}
for vdu_id, vdu_dict in vnfd_dict.get('vdus', {}).items():
template_dict.setdefault('resources', {})[vdu_id] = {
"type": "OS::Nova::Server"
@ -277,15 +279,32 @@ class DeviceHeat(abstract_driver.DeviceAbstractDriver):
for key, value in metadata.items():
metadata[key] = value[:255]
# monitoring_policy = vdu_dict.get('monitoring_policy', None)
# failure_policy = vdu_dict.get('failure_policy', None)
monitoring_policy = vdu_dict.get('monitoring_policy', 'noop')
failure_policy = vdu_dict.get('failure_policy', 'noop')
# Convert the old monitoring specification to the new format
# This should be removed after Mitaka
if monitoring_policy == 'ping' and failure_policy == 'respawn':
vdu_dict['monitoring_policy'] = {'ping': {
'actions': {
'failure': 'respawn'
}}}
vdu_dict.pop('failure_policy')
if monitoring_policy != 'noop':
monitoring_dict['vdus'][vdu_id] = \
vdu_dict['monitoring_policy']
# to pass necessary parameters to plugin upwards.
for key in ('monitoring_policy', 'failure_policy',
'service_type'):
for key in ('service_type'):
if key in vdu_dict:
device.setdefault(
'attributes', {})[key] = vdu_dict[key]
'attributes', {})[vdu_id] = jsonutils.dumps(
{key: vdu_dict[key]})
if monitoring_dict.keys():
device['attributes']['monitoring_policy'] = jsonutils.dumps(
monitoring_dict)
if config_yaml is not None:
config_dict = yaml.load(config_yaml)

View File

@ -19,6 +19,7 @@
# @author: Isaku Yamahata, Intel Corporation.
import abc
import inspect
import six
import threading
import time
@ -27,9 +28,8 @@ from keystoneclient.v2_0 import client as ks_client
from oslo_config import cfg
from oslo_utils import timeutils
from tacker.agent.linux import utils as linux_utils
from tacker.common import driver_manager
from tacker import context as t_context
from tacker.i18n import _LW
from tacker.openstack.common import jsonutils
from tacker.openstack.common import log as logging
from tacker.vm.drivers.heat import heat
@ -48,127 +48,141 @@ OPTS = [
CONF.register_opts(OPTS, group='monitor')
def _is_pingable(ip):
"""Checks whether an IP address is reachable by pinging.
Use linux utils to execute the ping (ICMP ECHO) command.
Sends 5 packets with an interval of 0.2 seconds and timeout of 1
seconds. Runtime error implies unreachability else IP is pingable.
:param ip: IP to check
:return: bool - True or False depending on pingability.
"""
ping_cmd = ['ping',
'-c', '5',
'-W', '1',
'-i', '0.2',
ip]
try:
linux_utils.execute(ping_cmd, check_exit_code=True)
return True
except RuntimeError:
LOG.warning(_LW("Cannot ping ip address: %s"), ip)
return False
class DeviceStatus(object):
"""Device status"""
class VNFMonitor(object):
"""VNF Monitor"""
_instance = None
_hosting_devices = dict() # device_id => dict of parameters
_hosting_vnfs = dict() # device_id => dict of parameters
_status_check_intvl = 0
_lock = threading.Lock()
_lock = threading.RLock()
OPTS = [
cfg.MultiStrOpt(
'monitor_driver', default=[],
help=_('Monitor driver to communicate with '
'Hosting VNF/logical service '
'instance servicevm plugin will use')),
]
cfg.CONF.register_opts(OPTS, 'servicevm')
def __new__(cls, check_intvl=None):
if not cls._instance:
cls._instance = super(DeviceStatus, cls).__new__(cls)
cls._instance = super(VNFMonitor, cls).__new__(cls)
return cls._instance
def __init__(self, check_intvl=None):
self._monitor_manager = driver_manager.DriverManager(
'tacker.servicevm.monitor.drivers',
cfg.CONF.servicevm.monitor_driver)
if check_intvl is None:
check_intvl = cfg.CONF.monitor.check_intvl
self._status_check_intvl = check_intvl
LOG.debug('Spawning device status thread')
LOG.debug('Spawning VNF monitor thread')
threading.Thread(target=self.__run__).start()
def __run__(self):
while(1):
time.sleep(self._status_check_intvl)
dead_hosting_devices = []
with self._lock:
for hosting_device in self._hosting_devices.values():
if hosting_device.get('dead', False):
for hosting_vnf in self._hosting_vnfs.values():
if hosting_vnf.get('dead', False):
continue
if not timeutils.is_older_than(
hosting_device['boot_at'],
hosting_device['boot_wait']):
continue
if not self.is_hosting_device_reachable(hosting_device):
dead_hosting_devices.append(hosting_device)
for hosting_device in dead_hosting_devices:
hosting_device['down_cb'](hosting_device)
self.run_monitor(hosting_vnf)
@staticmethod
def to_hosting_device(device_dict, down_cb):
def to_hosting_vnf(device_dict, action_cb):
return {
'id': device_dict['id'],
'management_ip_addresses': jsonutils.loads(
device_dict['mgmt_url']),
'boot_wait': cfg.CONF.monitor.boot_wait,
'down_cb': down_cb,
'action_cb': action_cb,
'device': device_dict,
'monitoring_policy': jsonutils.loads(
device_dict['attributes']['monitoring_policy'])
}
def add_hosting_device(self, new_device):
def add_hosting_vnf(self, new_device):
LOG.debug('Adding host %(id)s, Mgmt IP %(ips)s',
{'id': new_device['id'],
'ips': new_device['management_ip_addresses']})
new_device['boot_at'] = timeutils.utcnow()
with self._lock:
self._hosting_devices[new_device['id']] = new_device
self._hosting_vnfs[new_device['id']] = new_device
def delete_hosting_device(self, device_id):
def delete_hosting_vnf(self, device_id):
LOG.debug('deleting device_id %(device_id)s', {'device_id': device_id})
with self._lock:
hosting_device = self._hosting_devices.pop(device_id, None)
if hosting_device:
hosting_vnf = self._hosting_vnfs.pop(device_id, None)
if hosting_vnf:
LOG.debug('deleting device_id %(device_id)s, Mgmt IP %(ips)s',
{'device_id': device_id,
'ips': hosting_device['management_ip_addresses']})
'ips': hosting_vnf['management_ip_addresses']})
def is_hosting_device_reachable(self, hosting_device):
"""Check the hosting device which hosts this resource is reachable.
def run_monitor(self, hosting_vnf):
mgmt_ips = hosting_vnf['management_ip_addresses']
vdupolicies = hosting_vnf['monitoring_policy']['vdus']
If the resource is not reachable, it is added to the backlog.
vnf_delay = hosting_vnf['monitoring_policy'].get(
'monitoring_delay', cfg.CONF.monitor.boot_wait)
:param hosting_device : dict of the hosting device
:return True if device is reachable, else None
"""
for key, mgmt_ip_address in hosting_device[
'management_ip_addresses'].items():
if not _is_pingable(mgmt_ip_address):
LOG.debug('Host %(id)s:%(key)s:%(ip)s, is unreachable',
{'id': hosting_device['id'],
'key': key,
'ip': mgmt_ip_address})
hosting_device['dead_at'] = timeutils.utcnow()
return False
for vdu in vdupolicies.keys():
if hosting_vnf.get('dead'):
return
LOG.debug('Host %(id)s:%(key)s:%(ip)s, is reachable',
{'id': hosting_device['id'],
'key': key,
'ip': mgmt_ip_address})
policy = vdupolicies[vdu]
for driver in policy.keys():
params = policy[driver].get('monitoring_params', {})
return True
vdu_delay = params.get('monitoring_delay', vnf_delay)
if not timeutils.is_older_than(
hosting_vnf['boot_at'],
vdu_delay):
continue
actions = policy[driver].get('actions', {})
if 'mgmt_ip' not in params:
params['mgmt_ip'] = mgmt_ips[vdu]
driver_return = self.monitor_call(driver,
hosting_vnf['device'],
params)
LOG.debug('driver_return %s', driver_return)
if driver_return in actions:
action = actions[driver_return]
hosting_vnf['action_cb'](hosting_vnf, action)
def mark_dead(self, device_id):
self._hosting_devices[device_id]['dead'] = True
self._hosting_vnfs[device_id]['dead'] = True
def _invoke(self, driver, **kwargs):
method = inspect.stack()[1][3]
return self._monitor_manager.invoke(
driver, method, **kwargs)
def monitor_get_config(self, device_dict):
return self._invoke(
device_dict, monitor=self, device=device_dict)
def monitor_url(self, device_dict):
return self._invoke(
device_dict, monitor=self, device=device_dict)
def monitor_call(self, driver, device_dict, kwargs):
return self._invoke(driver,
device=device_dict, kwargs=kwargs)
@six.add_metaclass(abc.ABCMeta)
class FailurePolicy(object):
class ActionPolicy(object):
@classmethod
@abc.abstractmethod
def on_failure(cls, plugin, device_dict):
def execute_action(cls, plugin, device_dict):
pass
_POLICIES = {}
@ -182,25 +196,32 @@ class FailurePolicy(object):
@classmethod
def get_policy(cls, policy, device):
failure_clses = cls._POLICIES.get(policy)
if not failure_clses:
action_clses = cls._POLICIES.get(policy)
if not action_clses:
return None
infra_driver = device['device_template'].get('infra_driver')
cls = failure_clses.get(infra_driver)
cls = action_clses.get(infra_driver)
if cls:
return cls
return failure_clses.get(None)
return action_clses.get(None)
@classmethod
def get_supported_actions(cls):
return cls._POLICIES.keys()
@abc.abstractmethod
def on_failure(cls, plugin, device_dict):
def execute_action(cls, plugin, device_dict):
pass
@FailurePolicy.register('respawn')
class Respawn(FailurePolicy):
@ActionPolicy.register('respawn')
class ActionRespawn(ActionPolicy):
@classmethod
def on_failure(cls, plugin, device_dict):
def execute_action(cls, plugin, device_dict):
LOG.error(_('device %s dead'), device_dict['id'])
if plugin._mark_device_dead(device_dict['id']):
plugin._vnf_monitor.mark_dead(device_dict['id'])
attributes = device_dict['attributes'].copy()
attributes['dead_device_id'] = device_dict['id']
new_device = {'attributes': attributes}
@ -224,16 +245,19 @@ class Respawn(FailurePolicy):
context.auth_token = token['id']
context.tenant_id = token['tenant_id']
context.user_id = token['user_id']
new_device_dict = plugin.create_device(context, {'device': new_device})
new_device_dict = plugin.create_device(context,
{'device': new_device})
LOG.info(_('respawned new device %s'), new_device_dict['id'])
@FailurePolicy.register('respawn', 'heat')
class RespawnHeat(FailurePolicy):
@ActionPolicy.register('respawn', 'heat')
class ActionRespawnHeat(ActionPolicy):
@classmethod
def on_failure(cls, plugin, device_dict):
def execute_action(cls, plugin, device_dict):
device_id = device_dict['id']
LOG.error(_('device %s dead'), device_id)
if plugin._mark_device_dead(device_dict['id']):
plugin._vnf_monitor.mark_dead(device_dict['id'])
attributes = device_dict['attributes']
config = attributes.get('config')
LOG.debug(_('device config %s dead'), config)
@ -282,22 +306,32 @@ class RespawnHeat(FailurePolicy):
{'dead': dead_device_id,
'new': new_device_id,
'cur': device_id})
with context.session.begin(subtransactions=True):
plugin.rename_device_id(context, device_id, dead_device_id)
plugin.rename_device_id(context, new_device_id, device_id)
LOG.debug('Delete dead device')
plugin.delete_device(context, dead_device_id)
new_device_dict['id'] = device_id
if config:
new_device_dict.setdefault('attributes', {})['config'] = config
plugin.config_device(context, new_device_dict)
plugin.config_device(context, new_device_dict)
plugin.add_device_to_monitor(new_device_dict)
@FailurePolicy.register('log_and_kill')
class LogAndKill(FailurePolicy):
@ActionPolicy.register('log')
class ActionLogOnly(ActionPolicy):
@classmethod
def on_failure(cls, plugin, device_dict):
def execute_action(cls, plugin, device_dict):
device_id = device_dict['id']
LOG.error(_('device %s dead'), device_id)
@ActionPolicy.register('log_and_kill')
class ActionLogAndKill(ActionPolicy):
@classmethod
def execute_action(cls, plugin, device_dict):
device_id = device_dict['id']
if plugin._mark_device_dead(device_dict['id']):
plugin._vnf_monitor.mark_dead(device_dict['id'])
plugin.delete_device(t_context.get_admin_context(), device_id)
LOG.error(_('device %s dead'), device_id)

View File

View File

@ -0,0 +1,60 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import abc
import six
from tacker.api import extensions
@six.add_metaclass(abc.ABCMeta)
class VNFMonitorAbstractDriver(extensions.PluginInterface):
@abc.abstractmethod
def get_type(self):
"""Return one of predefined type of the hosting device drivers."""
pass
@abc.abstractmethod
def get_name(self):
"""Return a symbolic name for the VNF Monitor plugin."""
pass
@abc.abstractmethod
def get_description(self):
pass
def monitor_get_config(self, plugin, context, device):
"""
returns dict of monitor configuration data
"""
return {}
@abc.abstractmethod
def monitor_url(self, plugin, context, device):
pass
@abc.abstractmethod
def monitor_call(self, device, kwargs):
pass
def monitor_service_driver(self, plugin, context, device,
service_instance):
# use same monitor driver to communicate with service
return self.get_name()

View File

@ -0,0 +1,78 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from oslo_config import cfg
from tacker.agent.linux import utils as linux_utils
from tacker.common import log
from tacker.i18n import _LW
from tacker.openstack.common import log as logging
from tacker.vm.monitor_drivers import abstract_driver
LOG = logging.getLogger(__name__)
OPTS = [
cfg.StrOpt('count', default='1',
help=_('number of ICMP packets to send')),
cfg.StrOpt('timeout', default='1',
help=_('number of seconds to wait for a response')),
cfg.StrOpt('interval', default='1',
help=_('number of seconds to wait between packets'))
]
cfg.CONF.register_opts(OPTS, 'monitor_ping')
class VNFMonitorPing(abstract_driver.VNFMonitorAbstractDriver):
def get_type(self):
return 'ping'
def get_name(self):
return 'ping'
def get_description(self):
return 'Tacker VNFMonitor Ping Driver'
def monitor_url(self, plugin, context, device):
LOG.debug(_('monitor_url %s'), device)
return device.get('monitor_url', '')
def _is_pingable(self, mgmt_ip="", count=5, timeout=1, interval='0.2',
**kwargs):
"""Checks whether an IP address is reachable by pinging.
Use linux utils to execute the ping (ICMP ECHO) command.
Sends 5 packets with an interval of 0.2 seconds and timeout of 1
seconds. Runtime error implies unreachability else IP is pingable.
:param ip: IP to check
:return: bool - True or string 'failure' depending on pingability.
"""
ping_cmd = ['ping',
'-c', count,
'-W', timeout,
'-i', interval,
mgmt_ip]
try:
linux_utils.execute(ping_cmd, check_exit_code=True)
return True
except RuntimeError:
LOG.warning(_LW("Cannot ping ip address: %s"), mgmt_ip)
return 'failure'
@log.log
def monitor_call(self, device, kwargs):
if not kwargs['mgmt_ip']:
return
return self._is_pingable(**kwargs)

View File

@ -29,7 +29,6 @@ from sqlalchemy.orm import exc as orm_exc
from tacker.api.v1 import attributes
from tacker.common import driver_manager
from tacker import context as t_context
from tacker.db.vm import proxy_db # noqa
from tacker.db.vm import vm_db
from tacker.extensions import vnfm
@ -170,7 +169,7 @@ class VNFMPlugin(vm_db.VNFMPluginDb, VNFMMgmtMixin):
self._device_manager = driver_manager.DriverManager(
'tacker.servicevm.device.drivers',
cfg.CONF.servicevm.infra_driver)
self._device_status = monitor.DeviceStatus()
self._vnf_monitor = monitor.VNFMonitor()
def spawn_n(self, function, *args, **kwargs):
self._pool.spawn_n(function, *args, **kwargs)
@ -212,29 +211,20 @@ class VNFMPlugin(vm_db.VNFMPluginDb, VNFMMgmtMixin):
###########################################################################
# hosting device
def add_device_to_monitor(self, device_dict):
device_id = device_dict['id']
dev_attrs = device_dict['attributes']
if dev_attrs.get('monitoring_policy') == 'ping':
def down_cb(hosting_device_):
if self._mark_device_dead(device_id):
self._device_status.mark_dead(device_id)
device_dict_ = self.get_device(
t_context.get_admin_context(), device_id)
failure_cls = monitor.FailurePolicy.get_policy(
device_dict_['attributes'].get('failure_policy'),
device_dict_)
if failure_cls:
failure_cls.on_failure(self, device_dict_)
hosting_device = self._device_status.to_hosting_device(
device_dict, down_cb)
KEY_LIST = ('monitoring_policy', 'failure_policy')
for key in KEY_LIST:
if key in dev_attrs:
hosting_device[key] = dev_attrs[key]
self._device_status.add_hosting_device(hosting_device)
if 'monitoring_policy' in dev_attrs:
def action_cb(hosting_vnf_, action):
action_cls = monitor.ActionPolicy.get_policy(action,
device_dict)
if action_cls:
action_cls.execute_action(self, hosting_vnf['device'])
hosting_vnf = self._vnf_monitor.to_hosting_vnf(
device_dict, action_cb)
LOG.debug('hosting_vnf: %s', hosting_vnf)
self._vnf_monitor.add_hosting_vnf(hosting_vnf)
def config_device(self, context, device_dict):
config = device_dict['attributes'].get('config')
@ -391,7 +381,7 @@ class VNFMPlugin(vm_db.VNFMPluginDb, VNFMMgmtMixin):
def delete_device(self, context, device_id):
device_dict = self._delete_device_pre(context, device_id)
self._device_status.delete_hosting_device(device_id)
self._vnf_monitor.delete_hosting_vnf(device_id)
driver_name = self._infra_driver_name(device_dict)
instance_id = self._instance_id(device_dict)