glue monitor policy and implement failure policy

Change-Id: I18a2a0cd6c46979b73091650e40326f5e6bc4b3c
This commit is contained in:
Isaku Yamahata 2015-05-05 18:36:44 -07:00
parent 96e082a08a
commit 5301817f26
4 changed files with 163 additions and 28 deletions

View File

@ -27,6 +27,7 @@ from sqlalchemy import orm
from sqlalchemy.orm import exc as orm_exc
from tacker.api.v1 import attributes
from tacker import context as t_context
from tacker.db import api as qdbapi
from tacker.db import db_base
from tacker.db import model_base
@ -38,6 +39,8 @@ from tacker.plugins.common import constants
LOG = logging.getLogger(__name__)
_ACTIVE_UPDATE = (constants.ACTIVE, constants.PENDING_UPDATE)
_ACTIVE_UPDATE_DEAD = (
constants.ACTIVE, constants.PENDING_UPDATE, constants.DEAD)
###########################################################################
@ -574,13 +577,12 @@ class ServiceResourcePluginDb(servicevm.ServiceVMPluginBase,
filter(Device.status == constants.PENDING_CREATE).
update({'status': new_status}))
def _get_device_db(self, context, device_id, new_status):
def _get_device_db(self, context, device_id, current_statuses, new_status):
try:
device_db = (
self._model_query(context, Device).
filter(Device.id == device_id).
filter(Device.status.in_(_ACTIVE_UPDATE)).
filter(Device.status == constants.ACTIVE).
filter(Device.status.in_(current_statuses)).
with_lockmode('update').one())
except orm_exc.NoResultFound:
raise servicevm.DeviceNotFound(device_id=device_id)
@ -591,8 +593,8 @@ class ServiceResourcePluginDb(servicevm.ServiceVMPluginBase,
def _update_device_pre(self, context, device_id):
with context.session.begin(subtransactions=True):
device_db = self._get_device_db(context, device_id,
constants.PENDING_UPDATE)
device_db = self._get_device_db(
context, device_id, _ACTIVE_UPDATE, constants.PENDING_UPDATE)
return self._make_device_dict(device_db)
def _update_device_post(self, context, device_id, new_status):
@ -609,8 +611,9 @@ class ServiceResourcePluginDb(servicevm.ServiceVMPluginBase,
filter_by(device_id=device_id).first())
if binding_db is not None:
raise servicevm.DeviceInUse(device_id=device_id)
device_db = self._get_device_db(context, device_id,
constants.PENDING_DELETE)
device_db = self._get_device_db(
context, device_id, _ACTIVE_UPDATE_DEAD,
constants.PENDING_DELETE)
return self._make_device_dict(device_db)
@ -668,6 +671,29 @@ class ServiceResourcePluginDb(servicevm.ServiceVMPluginBase,
return self._get_collection(context, Device, self._make_device_dict,
filters=filters, fields=fields)
def _mark_device_dead(self, device_id):
context = t_context.get_admin_context()
EXCLUDE_STATUS = [
constants.DOWN,
constants.PENDING_CREATE,
constants.PENDING_UPDATE,
constants.PENDING_DELETE,
constants.INACTIVE,
constants.ERROR]
with context.session.begin(subtransactions=True):
try:
device_db = (
self._model_query(context, Device).
filter(Device.id == device_id).
filter(~Device.status.in_(EXCLUDE_STATUS)).
with_lockmode('update').one())
except orm_exc.NoResultFound:
LOG.warn(_('no device found %s'), device_id)
return False
device_db.update({'status': constants.DEAD})
return True
###########################################################################
# logical service instance

View File

@ -39,6 +39,7 @@ PENDING_CREATE = "PENDING_CREATE"
PENDING_UPDATE = "PENDING_UPDATE"
PENDING_DELETE = "PENDING_DELETE"
INACTIVE = "INACTIVE"
DEAD = "DEAD"
ERROR = "ERROR"
ACTIVE_PENDING_STATUSES = (

76
tacker/vm/monitor.py Normal file
View File

@ -0,0 +1,76 @@
# Copyright 2015 Intel Corporation.
# Copyright 2015 Isaku Yamahata <isaku.yamahata at intel com>
# <isaku.yamahata at gmail com>
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Isaku Yamahata, Intel Corporation.
import abc
import six
from tacker import context as t_context
from tacker.openstack.common import log as logging
LOG = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class FailurePolicy(object):
@abc.abstractmethod
@classmethod
def on_failure(cls, plugin, device_dict):
pass
_POLICIES = {}
@staticmethod
def register(policy):
def _register(cls):
cls._POLICIES[policy] = cls
return cls
return _register
@classmethod
def get_policy(cls, policy):
return cls._POLICIES.get(policy)
@FailurePolicy.register('respawn')
class Respawn(FailurePolicy):
@classmethod
def on_failure(cls, plugin, device_dict):
LOG.error(_('device %(device_id)s dead'), device_dict['id'])
attributes = device_dict['attributes'].copy()
attributes['dead_device_id'] = device_dict['id']
new_device = {
'tenant_id': device_dict['tenant_id'],
'template_id': device_dict['template_id'],
'attributes': attributes,
}
new_device_dict = plugin.create_device(
t_context.get_admin_context(), new_device)
LOG.info(_('respawned new device %s'), new_device_dict['id'])
@FailurePolicy.register('log_and_kill')
class LogAndKill(FailurePolicy):
@classmethod
def on_failure(cls, plugin, device_dict):
device_id = device_dict['id']
LOG.error(_('device %(device_id)s dead'), device_id)
plugin.delete_device(t_context.get_admin_context(), device_id)

View File

@ -38,6 +38,7 @@ from tacker.openstack.common import excutils
from tacker.openstack.common import log as logging
from tacker.plugins.common import constants
from tacker.vm.mgmt_drivers import constants as mgmt_constants
from tacker.vm import monitor
from tacker.vm import proxy_api
LOG = logging.getLogger(__name__)
@ -61,7 +62,7 @@ class DeviceStatus(object):
"""Device status"""
_instance = None
_hosting_devices = []
_hosting_devices = dict() # device_id => dict of parameters
_status_check_intvl = 0
_lock = threading.Lock()
@ -79,25 +80,34 @@ class DeviceStatus(object):
while(1):
time.sleep(self._status_check_intvl)
with self._lock:
for hosting_device in self._hosting_devices:
self.is_hosting_device_reachable(hosting_device)
for hosting_device in self._hosting_devices.values():
if not hosting_device.get('dead', False):
self.is_hosting_device_reachable(hosting_device)
@staticmethod
def to_hosting_device(device_dict, down_cb):
return {
'id': device_dict['id'],
'management_ip_address': device_dict['mgmt_url'],
'down_cb': down_cb,
}
def add_hosting_device(self, new_device):
LOG.debug('Adding host %(host)s, Mgmt IP %(ip)',
{'host': new_device['host'],
LOG.debug('Adding host %(device_id)s, Mgmt IP %(ip)',
{'id': new_device['id'],
'ip': new_device['management_ip_address']})
with self._lock:
self._hosting_devices.append(new_device)
self._hosting_devices[new_device['id']] = new_device
def delete_hosting_device(self, del_device):
LOG.debug('Adding host %(host)s, Mgmt IP %(ip)',
{'host': del_device['host'],
'ip': del_device['management_ip_address']})
def delete_hosting_device(self, device_id):
LOG.debug('deleting device_id %(device_id)s, Mgmt IP %(ip)',
{'device_id': device_id})
with self._lock:
for hosting_device in self._hosting_devices:
if hosting_device == del_device:
self._hosting_devices.remove(del_device)
break
hosting_device = self._hosting_devices.pop(device_id, None)
if hosting_device:
LOG.debug('deleting device_id %(device_id)s, Mgmt IP %(ip)',
{'device_id': device_id,
'ip': hosting_device['management_ip_address']})
def is_hosting_device_reachable(self, hosting_device):
"""Check the hosting device which hosts this resource is reachable.
@ -107,19 +117,21 @@ class DeviceStatus(object):
:param hosting_device : dict of the hosting device
:return True if device is reachable, else None
"""
hd_down_cb = hosting_device['down_cb']
if _is_pingable(hosting_device['management_ip_address']) == 0:
LOG.debug('Host %(host)s:%(ip), is reachable',
{'host': hosting_device['host'],
LOG.debug('Host %(id)s:%(ip), is reachable',
{'id': hosting_device['id'],
'ip': hosting_device['management_ip_address']})
return True
else:
LOG.debug('Host %(host)s:%(ip), is unreachable',
{'host': hosting_device['host'],
'ip': hosting_device['management_ip_address']})
hd_down_cb(hosting_device)
LOG.debug('Host %(id)s:%(ip), is unreachable',
{'id': hosting_device['id'],
'ip': hosting_device['management_ip_address']})
hosting_device['down_cb'](hosting_device)
return False
def mark_dead(self, device_id):
self._hosting_device['id']['dead'] = True
class ServiceVMMgmtMixin(object):
OPTS = [
@ -329,6 +341,25 @@ class ServiceVMPlugin(vm_db.ServiceResourcePluginDb, ServiceVMMgmtMixin):
device_dict['status'] = new_status
self._create_device_status(context, device_id, new_status)
if device_dict['attributes']['monitoring_policy'] == 'ping':
device_status = DeviceStatus()
device_dict_copy = device_dict.copy()
def down_cb():
if self._mark_device_dead(device_id):
device_status.mark_dead(device_id)
failure_cls = monitor.FailurePolicy.get_policy(
device_dict['attributes']['failure_policy'])
if failure_cls:
failure_cls.on_failure(device_dict_copy)
hosting_device = device_status.to_hosting_device(
device_dict, down_cb)
KEY_LIST = ('monitoring_policy', 'failure_policy')
for key in KEY_LIST:
hosting_device[key] = device_dict[key]
device_status.add_hosting_device(hosting_device)
def _create_device(self, context, device):
device_dict = self._create_device_pre(context, device)
device_id = device_dict['id']
@ -427,6 +458,7 @@ class ServiceVMPlugin(vm_db.ServiceResourcePluginDb, ServiceVMMgmtMixin):
def delete_device(self, context, device_id):
device_dict = self._delete_device_pre(context, device_id)
DeviceStatus().delete_hosting_device(device_id)
driver_name = self._infra_driver_name(device_dict)
instance_id = self._instance_id(device_dict)