diff --git a/tacker/db/vm/vm_db.py b/tacker/db/vm/vm_db.py index 7d79be6f7..bf42b0ee6 100644 --- a/tacker/db/vm/vm_db.py +++ b/tacker/db/vm/vm_db.py @@ -27,6 +27,7 @@ from sqlalchemy import orm from sqlalchemy.orm import exc as orm_exc from tacker.api.v1 import attributes +from tacker import context as t_context from tacker.db import api as qdbapi from tacker.db import db_base from tacker.db import model_base @@ -38,6 +39,8 @@ from tacker.plugins.common import constants LOG = logging.getLogger(__name__) _ACTIVE_UPDATE = (constants.ACTIVE, constants.PENDING_UPDATE) +_ACTIVE_UPDATE_DEAD = ( + constants.ACTIVE, constants.PENDING_UPDATE, constants.DEAD) ########################################################################### @@ -574,13 +577,12 @@ class ServiceResourcePluginDb(servicevm.ServiceVMPluginBase, filter(Device.status == constants.PENDING_CREATE). update({'status': new_status})) - def _get_device_db(self, context, device_id, new_status): + def _get_device_db(self, context, device_id, current_statuses, new_status): try: device_db = ( self._model_query(context, Device). filter(Device.id == device_id). - filter(Device.status.in_(_ACTIVE_UPDATE)). - filter(Device.status == constants.ACTIVE). + filter(Device.status.in_(current_statuses)). with_lockmode('update').one()) except orm_exc.NoResultFound: raise servicevm.DeviceNotFound(device_id=device_id) @@ -591,8 +593,8 @@ class ServiceResourcePluginDb(servicevm.ServiceVMPluginBase, def _update_device_pre(self, context, device_id): with context.session.begin(subtransactions=True): - device_db = self._get_device_db(context, device_id, - constants.PENDING_UPDATE) + device_db = self._get_device_db( + context, device_id, _ACTIVE_UPDATE, constants.PENDING_UPDATE) return self._make_device_dict(device_db) def _update_device_post(self, context, device_id, new_status): @@ -609,8 +611,9 @@ class ServiceResourcePluginDb(servicevm.ServiceVMPluginBase, filter_by(device_id=device_id).first()) if binding_db is not None: raise servicevm.DeviceInUse(device_id=device_id) - device_db = self._get_device_db(context, device_id, - constants.PENDING_DELETE) + device_db = self._get_device_db( + context, device_id, _ACTIVE_UPDATE_DEAD, + constants.PENDING_DELETE) return self._make_device_dict(device_db) @@ -668,6 +671,29 @@ class ServiceResourcePluginDb(servicevm.ServiceVMPluginBase, return self._get_collection(context, Device, self._make_device_dict, filters=filters, fields=fields) + def _mark_device_dead(self, device_id): + context = t_context.get_admin_context() + EXCLUDE_STATUS = [ + constants.DOWN, + constants.PENDING_CREATE, + constants.PENDING_UPDATE, + constants.PENDING_DELETE, + constants.INACTIVE, + constants.ERROR] + with context.session.begin(subtransactions=True): + try: + device_db = ( + self._model_query(context, Device). + filter(Device.id == device_id). + filter(~Device.status.in_(EXCLUDE_STATUS)). + with_lockmode('update').one()) + except orm_exc.NoResultFound: + LOG.warn(_('no device found %s'), device_id) + return False + + device_db.update({'status': constants.DEAD}) + return True + ########################################################################### # logical service instance diff --git a/tacker/plugins/common/constants.py b/tacker/plugins/common/constants.py index 513be13a0..81eaf1038 100644 --- a/tacker/plugins/common/constants.py +++ b/tacker/plugins/common/constants.py @@ -39,6 +39,7 @@ PENDING_CREATE = "PENDING_CREATE" PENDING_UPDATE = "PENDING_UPDATE" PENDING_DELETE = "PENDING_DELETE" INACTIVE = "INACTIVE" +DEAD = "DEAD" ERROR = "ERROR" ACTIVE_PENDING_STATUSES = ( diff --git a/tacker/vm/monitor.py b/tacker/vm/monitor.py new file mode 100644 index 000000000..ff10eac98 --- /dev/null +++ b/tacker/vm/monitor.py @@ -0,0 +1,76 @@ +# Copyright 2015 Intel Corporation. +# Copyright 2015 Isaku Yamahata +# +# All Rights Reserved. +# +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Isaku Yamahata, Intel Corporation. + +import abc + +import six + +from tacker import context as t_context +from tacker.openstack.common import log as logging + + +LOG = logging.getLogger(__name__) + + +@six.add_metaclass(abc.ABCMeta) +class FailurePolicy(object): + @abc.abstractmethod + @classmethod + def on_failure(cls, plugin, device_dict): + pass + + _POLICIES = {} + + @staticmethod + def register(policy): + def _register(cls): + cls._POLICIES[policy] = cls + return cls + return _register + + @classmethod + def get_policy(cls, policy): + return cls._POLICIES.get(policy) + + +@FailurePolicy.register('respawn') +class Respawn(FailurePolicy): + @classmethod + def on_failure(cls, plugin, device_dict): + LOG.error(_('device %(device_id)s dead'), device_dict['id']) + attributes = device_dict['attributes'].copy() + attributes['dead_device_id'] = device_dict['id'] + new_device = { + 'tenant_id': device_dict['tenant_id'], + 'template_id': device_dict['template_id'], + 'attributes': attributes, + } + new_device_dict = plugin.create_device( + t_context.get_admin_context(), new_device) + LOG.info(_('respawned new device %s'), new_device_dict['id']) + + +@FailurePolicy.register('log_and_kill') +class LogAndKill(FailurePolicy): + @classmethod + def on_failure(cls, plugin, device_dict): + device_id = device_dict['id'] + LOG.error(_('device %(device_id)s dead'), device_id) + plugin.delete_device(t_context.get_admin_context(), device_id) diff --git a/tacker/vm/plugin.py b/tacker/vm/plugin.py index 3d8eaf99b..b120306b4 100644 --- a/tacker/vm/plugin.py +++ b/tacker/vm/plugin.py @@ -38,6 +38,7 @@ from tacker.openstack.common import excutils from tacker.openstack.common import log as logging from tacker.plugins.common import constants from tacker.vm.mgmt_drivers import constants as mgmt_constants +from tacker.vm import monitor from tacker.vm import proxy_api LOG = logging.getLogger(__name__) @@ -61,7 +62,7 @@ class DeviceStatus(object): """Device status""" _instance = None - _hosting_devices = [] + _hosting_devices = dict() # device_id => dict of parameters _status_check_intvl = 0 _lock = threading.Lock() @@ -79,25 +80,34 @@ class DeviceStatus(object): while(1): time.sleep(self._status_check_intvl) with self._lock: - for hosting_device in self._hosting_devices: - self.is_hosting_device_reachable(hosting_device) + for hosting_device in self._hosting_devices.values(): + if not hosting_device.get('dead', False): + self.is_hosting_device_reachable(hosting_device) + + @staticmethod + def to_hosting_device(device_dict, down_cb): + return { + 'id': device_dict['id'], + 'management_ip_address': device_dict['mgmt_url'], + 'down_cb': down_cb, + } def add_hosting_device(self, new_device): - LOG.debug('Adding host %(host)s, Mgmt IP %(ip)', - {'host': new_device['host'], + LOG.debug('Adding host %(device_id)s, Mgmt IP %(ip)', + {'id': new_device['id'], 'ip': new_device['management_ip_address']}) with self._lock: - self._hosting_devices.append(new_device) + self._hosting_devices[new_device['id']] = new_device - def delete_hosting_device(self, del_device): - LOG.debug('Adding host %(host)s, Mgmt IP %(ip)', - {'host': del_device['host'], - 'ip': del_device['management_ip_address']}) + def delete_hosting_device(self, device_id): + LOG.debug('deleting device_id %(device_id)s, Mgmt IP %(ip)', + {'device_id': device_id}) with self._lock: - for hosting_device in self._hosting_devices: - if hosting_device == del_device: - self._hosting_devices.remove(del_device) - break + hosting_device = self._hosting_devices.pop(device_id, None) + if hosting_device: + LOG.debug('deleting device_id %(device_id)s, Mgmt IP %(ip)', + {'device_id': device_id, + 'ip': hosting_device['management_ip_address']}) def is_hosting_device_reachable(self, hosting_device): """Check the hosting device which hosts this resource is reachable. @@ -107,19 +117,21 @@ class DeviceStatus(object): :param hosting_device : dict of the hosting device :return True if device is reachable, else None """ - hd_down_cb = hosting_device['down_cb'] if _is_pingable(hosting_device['management_ip_address']) == 0: - LOG.debug('Host %(host)s:%(ip), is reachable', - {'host': hosting_device['host'], + LOG.debug('Host %(id)s:%(ip), is reachable', + {'id': hosting_device['id'], 'ip': hosting_device['management_ip_address']}) return True else: - LOG.debug('Host %(host)s:%(ip), is unreachable', - {'host': hosting_device['host'], - 'ip': hosting_device['management_ip_address']}) - hd_down_cb(hosting_device) + LOG.debug('Host %(id)s:%(ip), is unreachable', + {'id': hosting_device['id'], + 'ip': hosting_device['management_ip_address']}) + hosting_device['down_cb'](hosting_device) return False + def mark_dead(self, device_id): + self._hosting_device['id']['dead'] = True + class ServiceVMMgmtMixin(object): OPTS = [ @@ -329,6 +341,25 @@ class ServiceVMPlugin(vm_db.ServiceResourcePluginDb, ServiceVMMgmtMixin): device_dict['status'] = new_status self._create_device_status(context, device_id, new_status) + if device_dict['attributes']['monitoring_policy'] == 'ping': + device_status = DeviceStatus() + device_dict_copy = device_dict.copy() + + def down_cb(): + if self._mark_device_dead(device_id): + device_status.mark_dead(device_id) + failure_cls = monitor.FailurePolicy.get_policy( + device_dict['attributes']['failure_policy']) + if failure_cls: + failure_cls.on_failure(device_dict_copy) + + hosting_device = device_status.to_hosting_device( + device_dict, down_cb) + KEY_LIST = ('monitoring_policy', 'failure_policy') + for key in KEY_LIST: + hosting_device[key] = device_dict[key] + device_status.add_hosting_device(hosting_device) + def _create_device(self, context, device): device_dict = self._create_device_pre(context, device) device_id = device_dict['id'] @@ -427,6 +458,7 @@ class ServiceVMPlugin(vm_db.ServiceResourcePluginDb, ServiceVMMgmtMixin): def delete_device(self, context, device_id): device_dict = self._delete_device_pre(context, device_id) + DeviceStatus().delete_hosting_device(device_id) driver_name = self._infra_driver_name(device_dict) instance_id = self._instance_id(device_dict)