"""Starter script for Masakari Introspective Instance Monitor."""

import sys

from oslo_log import log as logging

import masakarimonitors.conf
from masakarimonitors import config
from masakarimonitors import service
from masakarimonitors import utils


CONF = masakarimonitors.conf.CONF


def main():
    """Parse configuration, set up logging, and launch the monitor."""
    config.parse_args(sys.argv)
    logging.setup(CONF, "masakarimonitors")
    utils.monkey_patch()

    monitor = service.Service.create(
        binary='masakarimonitors-introspectiveinstancemonitor')
    service.serve(monitor)
    service.wait()
from oslo_config import cfg


# Note: this string is used for regex parsing later with the re module.
#
# Python raw string notation keeps the backslashes literal so that '\.'
# reaches the regex engine unchanged.
SOCK = r'/var/lib/libvirt/qemu/org\.qemu\.guest_agent\..*\.instance-.*\.sock'

monitor_opts = [
    cfg.IntOpt('guest_monitoring_interval',
               default=10,
               help='''
Guest monitoring interval of VM status (in seconds).
* The value should not be too low as there should not be false negative
* for reporting QEMU_GUEST_AGENT failures
* VM needs time to do powering-off.
* guest_monitoring_interval should be greater than
* the time to SHUTDOWN VM gracefully.
* e.g. | 565da9ba-3c0c-4087-83ca | iim1 | ACTIVE | powering-off | Running
'''),
    cfg.IntOpt('guest_monitoring_timeout',
               default=2,
               help='Guest monitoring timeout (in seconds).'),
    cfg.IntOpt('guest_monitoring_failure_threshold',
               default=3,
               help='Failure threshold before sending notification.'),
    cfg.StrOpt('qemu_guest_agent_sock_path',
               default=SOCK,
               # BUG FIX: this help text contains regex escapes (\.)
               # which in a non-raw string are invalid escape sequences
               # under Python 3 (W605 / DeprecationWarning, a future
               # SyntaxError). A raw triple-quoted string preserves the
               # intended literal text.
               help=r'''
* The file path of qemu guest agent sock.
* Please use Python raw string notation as regular expressions.
e.g. r'/var/lib/libvirt/qemu/org\.qemu\.guest_agent\..*\.instance-.*\.sock'
'''),
    cfg.BoolOpt('callback_paused_event',
                default=True,
                help='''
* True: Callback for VM paused events.
* False: Do not callback for VM paused events.
'''),
]


def register_opts(conf):
    """Register introspectiveinstancemonitor options on *conf*."""
    conf.register_opts(monitor_opts, group='introspectiveinstancemonitor')


def list_opts():
    """Return the option group mapping for the sample-config generator."""
    return {
        'introspectiveinstancemonitor': monitor_opts
    }
+========================================= + +Introspective instance monitor for Masakari +---------------------------------------- +- masakarimonitors-introspectiveinstancemonitor, provides Virtual Machine + High Availability (VMHA) service for OpenStack clouds by automatically + detecting the system-level failure events via QEMU Guest Agent. If it + detects VM heartbeat failure events, it sends notifications to the + masakari-api. + + +- Based on the QEMU Guest Agent, + masakarimonitors-introspectiveinstancemonitor aims to provide access + to a system-level agent via standard qemu-ga protocol + + +How does it work? +---------------------------------------- +- libvirt and QEMU Guest Agent are used as the underlying protocol for + messaging to and from VM. + + - The host-side qemu-agent sockets are used to detemine whether VMs are + configured with QEMU Guest Agent. + + - qemu-guest-ping is used as the monitoring heartbeat. + +- For the future release, we can pass through arbitrary guest agent commands + to check the health of the applications inside a VM. + +QEMU Guest Agent Installation notes +---------------------------------------- + +- Set image property: hw_qemu_guest_agent=yes. + + - This tells NOVA to setup the virtual serial interface thru QEMU to VM + + - e.g. + + $ openstack image create --public --disk-format qcow2 --container-format + bare --file ~ubuntu/xenial-server-cloudimg-amd64-disk1.img --public + --property hw_qemu_guest_agent=yes xenial-server-cloudimg + +* Inside VM:: + + $ sudo apt-get install qemu-guest-agent + $ sudo systemctl start qemu-guest-agent + $ ubuntu@test:~$ ps -ef | fgrep qemu + $ ... /usr/sbin/qemu-ga --daemonize -m virtio-serial -p /dev/virtio-ports/org.qemu.guest_agent.0 + $ ubuntu@test:~$ ls /dev/virtio-ports/ + $ org.qemu.guest_agent.0 + + +Configure masakarimonitors-introspectiveinstancemonitor +---------------------------------------------- +#. 
"""
Integration between the oslo.cache module and masakarimonitors.

Uses 'oslo_cache.dict', i.e. a dogpile.cache backend that uses a
dictionary for storage.

Dogpile consists of two subsystems, one building on top of the other:
dogpile provides the concept of a dogpile lock, a control structure
which allows a single thread of execution to be selected as the creator
of some resource, while allowing other threads of execution to refer
to the previous version of this resource as the creation proceeds;
if there is no previous version, then those threads block until
the object is available.
"""
from oslo_cache import core
from oslo_config import cfg

from masakarimonitors.i18n import _

# Default TTL (in seconds) for cached items: one week.
WEEK = 604800


def _register_cache_group(conf, group_name, expiration_help, caching_help):
    """Register one cache option group with the standard pair of options.

    Every group carries the same 'expiration_time' IntOpt and 'caching'
    BoolOpt; only the group name and help strings differ, so the three
    registrations below share this helper instead of repeating the code.
    """
    group = cfg.OptGroup(group_name)
    opts = [
        cfg.IntOpt('expiration_time', default=WEEK, help=expiration_help),
        cfg.BoolOpt('caching', default=True, help=caching_help),
    ]
    conf.register_group(group)
    conf.register_opts(opts, group=group)


def register_cache_configurations(conf):
    """Register all configurations required for oslo.cache.

    The procedure registers all configurations required for oslo.cache.
    It should be called before configuring of cache region.

    :param conf: instance of configuration
    :returns: updated configuration
    """
    # register global configurations for caching
    core.configure(conf)

    _register_cache_group(
        conf, 'constraint_validation_cache',
        _('TTL, in seconds, for any cached item in the dogpile.cache '
          'region used for caching of validation constraints.'),
        # NOTE: the original help text was missing a space between
        # sentences ("stack.During"); fixed here.
        _('Toggle to enable/disable caching when Orchestration Engine '
          'validates property constraints of stack. During property '
          'validation with constraints Orchestration Engine caches '
          'requests to other OpenStack services. Please note that the '
          'global toggle for oslo.cache(enabled=True in [cache] group) '
          'must be enabled to use this feature.'))

    _register_cache_group(
        conf, 'service_extension_cache',
        _('TTL, in seconds, for any cached item in the dogpile.cache '
          'region used for caching of service extensions.'),
        _('Toggle to enable/disable caching when Orchestration Engine '
          'retrieves extensions from other OpenStack services. Please '
          'note that the global toggle for oslo.cache(enabled=True in '
          '[cache] group) must be enabled to use this feature.'))

    _register_cache_group(
        conf, 'resource_finder_cache',
        _('TTL, in seconds, for any cached item in the dogpile.cache '
          'region used for caching of OpenStack service finder '
          'functions.'),
        _('Toggle to enable/disable caching when Orchestration Engine '
          'looks for other OpenStack service resources using name or '
          'id. Please note that the global toggle for oslo.cache'
          '(enabled=True in [cache] group) must be enabled to use this '
          'feature.'))

    return conf


# Module-level singleton holding the initialized cache region.
_REGION = None


def get_cache_region():
    """Return the process-wide dogpile cache region, creating it lazily."""
    global _REGION
    # Explicit None test: the region object itself is always truthy, but
    # `is None` states the singleton intent unambiguously.
    if _REGION is None:
        _REGION = core.create_region()
        _REGION.configure('oslo_cache.dict',
                          arguments={'expiration_time': WEEK})
        core.configure_cache_region(
            conf=register_cache_configurations(cfg.CONF),
            region=_REGION)
    return _REGION
import socket
import sys
import threading
import time

import eventlet
import libvirt
from oslo_config import cfg
from oslo_log import log as oslo_logging
from oslo_utils import excutils
from oslo_utils import timeutils

from masakarimonitors.introspectiveinstancemonitor import qemu_utils
from masakarimonitors.introspectiveinstancemonitor import scheduler
from masakarimonitors import manager
from masakarimonitors.objects import event_constants as ec

LOG = oslo_logging.getLogger(__name__)
CONF = cfg.CONF


class IntrospectiveInstanceMonitorManager(manager.Manager):
    """Drives introspective instance monitoring via libvirt events."""

    def __init__(self, *args, **kwargs):
        self.init_tgm()
        super(IntrospectiveInstanceMonitorManager, self).__init__(
            service_name="introspectiveinstancemonitor", *args, **kwargs)
        # This keeps track of what thread is running the event loop,
        # (if it is run in a background thread)
        self.event_loop_thread = None

    def _reset_journal(self, eventID, eventType, detail, uuID):
        """To reset the monitoring to discovery stage

        :param eventID: libvirt event ID
        :param eventType: libvirt event type
        :param detail: event detail code
        :param uuID: domain UUID
        """
        noticeType = ec.EventConstants.TYPE_VM
        hostname = socket.gethostname()
        currentTime = timeutils.utcnow()

        def _reset_handler(event_id, event_type, detail, domain_uuid, msg):
            """Reset monitoring to the discovery stage.

            Only acts on libvirt.VIR_DOMAIN_EVENT_STARTED lifecycle
            events; all other events are ignored.
            """
            if (event_id == libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE) and \
                    (event_type == libvirt.VIR_DOMAIN_EVENT_STARTED):
                LOG.debug(msg)
                qemu_utils.resetJournal(domain_uuid)

        # All Event Output if debug mode is on.
        msg = "libvirt Event Received.type = %s \
            hostname = %s uuid = %s time = %s eventID = %d eventType = %d \
            detail = %d" % (
            noticeType,
            hostname, uuID, currentTime, eventID,
            eventType, detail)
        LOG.debug("%s", msg)

        try:
            # BUG FIX: the original passed the *result* of calling
            # _reset_handler(...) to threading.Thread(), which executed
            # the handler synchronously in this thread and then started
            # a thread with no target (a no-op).  The callable and its
            # arguments must be supplied via target=/args=.
            thread = threading.Thread(
                target=_reset_handler,
                args=(eventID, eventType, detail, uuID, msg))
            thread.start()
        except KeyError as e:
            LOG.error("virEventFilter KeyError: {0}".format(e))
        except IndexError as e:
            LOG.error("virEventFilter IndexError: {0}".format(e))
        except TypeError as e:
            LOG.error("virEventFilter TypeError: {0}".format(e))
        except Exception:
            with excutils.save_and_reraise_exception():
                LOG.error("Unexpected error: %s" % sys.exc_info()[0])

    def init_tgm(self):
        """Manages the masakari-introspectiveinstancemonitor."""
        self.TG = scheduler.ThreadGroupManager()

    def _vir_event_loop_native_run(self):
        # Directly run the event loop in the current thread
        while True:
            libvirt.virEventRunDefaultImpl()

    def _vir_event_loop_native_start(self):
        """Run the libvirt default event loop in a daemon thread."""
        libvirt.virEventRegisterDefaultImpl()
        self.event_loop_thread = threading.Thread(
            target=self._vir_event_loop_native_run,
            name="lib_virt_eventLoop")
        # Assign the attribute rather than calling the deprecated
        # setDaemon() method.
        self.event_loop_thread.daemon = True
        self.event_loop_thread.start()

    def _my_domain_event_callback(self, conn, dom, event, detail, opaque):
        self._reset_journal(libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE,
                            event, detail, dom.UUIDString())

    def _my_domain_event_reboot_callback(self, conn, dom, opaque):
        self._reset_journal(libvirt.VIR_DOMAIN_EVENT_ID_REBOOT,
                            -1, -1, dom.UUIDString())

    def _err_handler(self, ctxt, err):
        LOG.warning("Error from libvirt : %s", err[2])

    def _virt_event(self, uri):
        """Register event callbacks and babysit the libvirt connection."""
        # Run a background thread with the event loop
        self._vir_event_loop_native_start()

        event_callback_handlers = {
            libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE:
                self._my_domain_event_callback,
            libvirt.VIR_DOMAIN_EVENT_ID_REBOOT:
                self._my_domain_event_reboot_callback
        }
        # Connect to libvirt - if disconnected, reprocess.
        self.running = True
        while self.running:
            vc = libvirt.openReadOnly(uri)

            # Event callback settings
            callback_ids = []
            for event, callback in event_callback_handlers.items():
                cid = vc.domainEventRegisterAny(None, event, callback, None)
                callback_ids.append(cid)

            # Connection monitoring.
            vc.setKeepAlive(5, 3)
            while vc.isAlive() == 1 and self.running:
                eventlet.greenthread.sleep(1)

            # If connection between libvirtd was lost,
            # clear callback connection.
            LOG.warning("Libvirt Connection Closed Unexpectedly.")
            for cid in callback_ids:
                try:
                    vc.domainEventDeregisterAny(cid)
                except Exception:
                    # Best effort: the connection may already be gone.
                    pass
            vc.close()
            del vc
            time.sleep(3)

    def stop(self):
        self.running = False

    def main(self):
        """Main method.

        Set the URI, error handler, and executes event loop processing.
        """
        uri = CONF.libvirt.connection_uri
        LOG.debug("Using uri:" + uri)

        # set error handler & do event loop
        libvirt.registerErrorHandler(self._err_handler, '_virt_event')
        self._virt_event(uri)
# Copyright (c) 2018 WindRiver Systems
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

# Introspective instance monitoring depends on the qemu guest agent to
# monitor what is inside of a VM.
#
# A few items that shaped the design:
#
# - After a VM is active, it needs time to start qemu-guest-agent.
#   Before error/failure is reported, we need a discovery phase to wait
#   until the VM is guest_pingable.
#
# - Debouncing is needed to enforce that the masakari_notifier function
#   is not called twice or more for the same failure.
#
# After a masakari notification is reported, the sent_notification flag
# is reset on QEMU LIFECYCLE events like:
#     STARTED_BOOTED
#     SUSPENDED_PAUSED

import logging
import re
import socket
import time
import traceback

import eventlet
import libvirt
import libvirtmod_qemu

# machines contains the state machine logic, states and transition events
from automaton import machines
from dogpile.cache.api import NoValue
from lxml import etree
from oslo_config import cfg
from oslo_utils import timeutils

from masakarimonitors.ha import masakari
from masakarimonitors.introspectiveinstancemonitor import cache
from masakarimonitors.objects import event_constants as ec
from masakarimonitors import utils

CONF = cfg.CONF
ICONF = cfg.CONF.introspectiveinstancemonitor

LOG = logging.getLogger(__name__)

# The VM QEMU guest agent states:
#
# discovery = initial state of a VM; remains here until it is determined
#             that the VM has a qemu-agent interface enabling
#             introspective instance monitoring
# healthy   = no failure event is detected
# error     = an error is recorded on every audit cycle in which the VM
#             is in the error state
# reported  = a transient state used to track that a notification was
#             sent


def action_on_enter(new_state, triggered_event):
    # Transition hook: no action required on state entry.
    pass


def action_on_exit(old_state, triggered_event):
    # Transition hook: no action required on state exit.
    pass


# Transition table for the per-VM finite state machine.  Each entry is
# a state with the trigger events it accepts and the destination states.
STATE_SPACE = [
    {
        'name': 'discovery',
        'next_states': {
            'guest_pingable': 'healthy',
            'guest_not_pingable': 'discovery',
        },
        'on_enter': action_on_enter,
        'on_exit': action_on_exit,
    },
    {
        'name': 'healthy',
        'next_states': {
            'guest_pingable': 'healthy',
            'guest_not_pingable': 'error',
        },
        'on_enter': action_on_enter,
        'on_exit': action_on_exit,
    },
    {
        'name': 'error',
        'next_states': {
            'report': 'reported',
            'guest_pingable': 'healthy',
            'guest_not_pingable': 'error',
        },
        'on_enter': action_on_enter,
        'on_exit': action_on_exit,
    },
    {
        'name': 'reported',
        'next_states': {
            'guest_pingable': 'discovery',
            'guest_not_pingable': 'reported',
        },
        'on_enter': action_on_enter,
        'on_exit': action_on_exit,
    },
]


class Journal(machines.FiniteMachine):
    """Per-domain finite state machine plus failure bookkeeping."""

    @classmethod
    def factory(cls, domain_uuid):
        """Build and initialize a Journal for *domain_uuid*."""
        jo = cls.build(STATE_SPACE)
        jo.default_start_state = 'discovery'
        jo.initialize()
        jo.failedCount = 0
        # Conditions to reset sent_notification
        jo.sent_notification = False
        jo.domain_uuid = domain_uuid
        jo.lastUsed = time.time()
        LOG.debug(str(domain_uuid) + ':Journal:__init__:')
        return jo

    def resetState(self):
        """Return to the discovery state and clear all failure tracking."""
        self.default_start_state = 'discovery'
        self.initialize()
        self.failedCount = 0
        self.sent_notification = False
        LOG.debug(str(self.domain_uuid) + '__resetState__:')

    def processEvent(self, event):
        """Advance the FSM and stamp the last-used time."""
        self.process_event(event)
        self.lastUsed = time.time()

    def getFailedCount(self):
        return self.failedCount

    def incrementFailedCount(self):
        # Failures only accumulate while already in the error state.
        if (self.current_state == 'error'):
            self.failedCount += 1

    def resetFailedCount(self):
        self.failedCount = 0

    def setSentNotification(self, boolean):
        self.sent_notification = boolean

    def getSentNotification(self):
        return self.sent_notification


# libvirt state verbose dictionary
STATES = {
    libvirt.VIR_DOMAIN_NOSTATE: 'no state',
    libvirt.VIR_DOMAIN_RUNNING: 'running',
    libvirt.VIR_DOMAIN_BLOCKED: 'blocked on resource',
    libvirt.VIR_DOMAIN_PAUSED: 'paused by user',
    libvirt.VIR_DOMAIN_SHUTDOWN: 'being shut down',
    libvirt.VIR_DOMAIN_SHUTOFF: 'shut off',
    libvirt.VIR_DOMAIN_CRASHED: 'crashed',
}


def get_function_name():
    """Return the name of the calling function (for log messages)."""
    return traceback.extract_stack(None, 2)[0][2]


def _get_or_create_journal(domain_uuid):
    """Fetch the Journal for *domain_uuid* from the cache region.

    Creates and caches a fresh Journal on a cache miss.  This helper
    replaces three identical copies of the get-or-create logic that were
    previously duplicated in resetJournal, _getJournalObject and
    getVmFsm.

    :param domain_uuid: QEMU domain UUID
    :return: the Journal object for that domain
    """
    region = cache.get_cache_region()
    journal = region.get(domain_uuid)
    # oslo.cache returns a NoValue sentinel on a miss; prefer
    # isinstance over a `type(...) is` identity test.
    if isinstance(journal, NoValue):
        journal = Journal.factory(domain_uuid)
        region.set(domain_uuid, journal)
    return journal


def resetJournal(domain_uuid):
    """Reset the journal of *domain_uuid* back to the discovery stage."""
    _get_or_create_journal(domain_uuid).resetState()


class QemuGuestAgent(object):
    """Audit running VMs through the QEMU guest agent.

    Checking pre-conditions:
      - the VM is running
      - the VM has the QEMU guest agent installed (host-side sock)
    Status is then determined by whether the VM answers guest-ping.

    Note: checkGuests is called periodically by the scheduler.
    """

    def __init__(self):
        super(QemuGuestAgent, self).__init__()
        self.notifier = masakari.SendNotification()

    def _thresholdsCrossing(self, domain):
        """Return True when consecutive ping failures exceed the threshold.

        We only issue a notification to masakari-engine if the VM is
        *already* in the error state when there are consecutive
        guest_ping failures.  Suggested value for
        guest_monitoring_failure_threshold >= 3.

        Note: when operators gracefully shut a VM down, QEMU may take
        time "powering-off"; QEMU stays active but monitoring may fail
        transiently, e.g.:
            Status | Task State   | Power State
            ACTIVE | powering-off | Running
        """
        if (((self.getVmFsm(domain.UUIDString()).current_state) == 'error')
                and
                (self._getJournalObject(domain.UUIDString()).getFailedCount()
                 > ICONF.guest_monitoring_failure_threshold)):
            LOG.debug('_thresholdsCrossing:' + domain.UUIDString())
            LOG.debug(self._getJournalObject(
                domain.UUIDString()).getFailedCount())
            return True
        else:
            return False

    def _masakari_notifier(self, domain_uuid):
        """Send a QEMU_GUEST_AGENT_ERROR notification (debounced)."""
        if self._getJournalObject(domain_uuid).getSentNotification():
            LOG.debug('notifier.send_notification Skipped:' + domain_uuid)
        else:
            hostname = socket.gethostname()
            noticeType = ec.EventConstants.TYPE_VM
            current_time = timeutils.utcnow()
            event = {
                'notification': {
                    'type': noticeType,
                    'hostname': hostname,
                    'generated_time': current_time,
                    'payload': {
                        'event': 'QEMU_GUEST_AGENT_ERROR',
                        'instance_uuid': domain_uuid,
                        'vir_domain_event': 'STOPPED_FAILED'
                    }
                }
            }
            try:
                self.notifier.send_notification(CONF.callback.retry_max,
                                                CONF.callback.retry_interval,
                                                event)
                self._getJournalObject(domain_uuid).processEvent('report')
                self._getJournalObject(domain_uuid).setSentNotification(True)
            except Exception:
                # LOG.warn is deprecated; use LOG.warning.
                LOG.warning('Exception :' + domain_uuid +
                            ' @ ' + get_function_name())

    def _qemuAgentGuestPing(self, domain, timeout, flags=0):
        """Send a Guest Agent ping to *domain* and record the outcome."""
        def _no_heartbeat(domain_uuid):
            # Also advance the FSM
            self.getVmFsm(domain_uuid).processEvent('guest_not_pingable')
            self._getJournalObject(domain_uuid).incrementFailedCount()

        def _with_heartbeat(domain_uuid):
            # The order matters as we want to decrease the counter first
            self._getJournalObject(domain_uuid).resetFailedCount()
            self.getVmFsm(domain_uuid).processEvent('guest_pingable')

        def _record(result):
            if result is None:
                LOG.debug(domain.UUIDString() +
                          '\tqemu-ga_guest-ping is not responding.')
                if self._thresholdsCrossing(domain):
                    self._masakari_notifier(domain.UUIDString())
                _no_heartbeat(domain.UUIDString())
            else:
                _with_heartbeat(domain.UUIDString())

        # must pass domain._o to the c library as virDomainPtr
        ret = libvirtmod_qemu.virDomainQemuAgentCommand(
            domain._o, '{"execute": "guest-ping"}', timeout, flags)

        _record(ret)

    def _getJournalObject(self, domain_uuid):
        """Return the journal object referred to by *domain_uuid*."""
        return _get_or_create_journal(domain_uuid)

    def getVmFsm(self, domain_uuid):
        """Return the VM finite state machine for *domain_uuid*.

        The FSM and the journal are the same cached object; this alias
        is kept for readability at the call sites.
        """
        return _get_or_create_journal(domain_uuid)

    def _hasQemuGuestAgent(self, domain):
        """Check whether the VM has a QEMU guest agent sock.

        First checks that the domain is running, then looks for a
        'bind'-mode channel source whose path matches the configured
        qemu_guest_agent_sock_path pattern, e.g.::

            <devices>
              <channel type='unix'>
                <source mode='bind'
                 path='/var/lib/libvirt/qemu/org.qemu.guest_agent....'/>
              </channel>
            </devices>

        :param domain: QEMU domain
        :return: True or False
        """
        def qemuGuestAgentPathMatch(path):
            pattern = ICONF.qemu_guest_agent_sock_path
            return re.match('%s' % pattern, path)

        state, reason = domain.state()
        # First check if libvirt reports the domain as running.
        if state != libvirt.VIR_DOMAIN_RUNNING:
            return False

        tree = etree.fromstring(domain.XMLDesc())
        try:
            source = tree.find("devices/channel/source")
            if source is not None:
                mode = source.get('mode')
                path = source.get('path')
                # There should be a bind for a sock file for qemu
                # guest_agent.
                if path and qemuGuestAgentPathMatch(path) and mode == 'bind':
                    return True
        except Exception:
            # Malformed/unexpected XML: treat as "no guest agent".
            pass

        return False

    def checkGuests(self):
        """Check all running QEMU guests.

        Condition: a VM under introspective monitoring must have the
        QEMU agent client configured, installed and be
        "guest-agent-pingable".

        BUG FIX: the original grouped the domain list with
        ``map(None, *[iter(running)] * 3)``, which is Python 2 only
        (``map(None, ...)`` raises TypeError on Python 3) and served no
        purpose; the domains are now iterated directly and the libvirt
        connection is closed when the audit finishes.
        """
        conn = None
        try:
            conn = libvirt.open(None)   # LIBVIRT_DEFAULT_URI
            for dom_id in conn.listDomainsID():
                try:
                    domain = conn.lookupByID(dom_id)
                    if self._hasQemuGuestAgent(domain):
                        # Serialize pings per domain UUID.
                        @utils.synchronized(domain.UUIDString())
                        def do_qemuAgentGuestPing(domain, timeout):
                            self._qemuAgentGuestPing(domain, timeout)
                        do_qemuAgentGuestPing(
                            domain, ICONF.guest_monitoring_timeout)
                except libvirt.libvirtError as le:
                    LOG.warning(le)
                    continue
        except Exception as e:
            LOG.warning(e)
        finally:
            if conn is not None:
                conn.close()


def reschedule(action, sleep_time=1):
    """Eventlet Sleep for the specified number of seconds.

    :param sleep_time: seconds to sleep; if None, no sleep;
    """
    LOG.debug('At reschedule')
    if sleep_time is not None:
        LOG.debug('Action %s sleep for %s seconds' % (
            action.id, sleep_time))
        eventlet.sleep(sleep_time)


def sleep(sleep_time):
    """Interface for sleeping."""
    eventlet.sleep(sleep_time)
import eventlet
import logging

from oslo_config import cfg
from oslo_service import threadgroup

from masakarimonitors.introspectiveinstancemonitor import qemu_utils


LOG = logging.getLogger(__name__)
CONF = cfg.CONF


class ThreadGroupManager(object):
    """Thread group manager."""

    def init_qemu_ga(self):
        """Instantiate the QEMU guest-agent helper used by the audits."""
        self.qemuGA = qemu_utils.QemuGuestAgent()
        LOG.debug('Started QemuGuestAgent')

    def __init__(self):
        self.init_qemu_ga()
        super(ThreadGroupManager, self).__init__()
        self.threads = {}
        self.group = threadgroup.ThreadGroup()

        # Queue a dummy service task: with nothing queued on the thread
        # group the process would simply exit.
        self.add_timer(
            CONF.introspectiveinstancemonitor.guest_monitoring_interval,
            self._service_task)

    def _service_task(self):
        """Periodic task used to trigger the guest monitoring audits."""
        self.qemuGA.checkGuests()

    def start(self, func, *args, **kwargs):
        """Run the given callable in a sub-thread of the group."""
        LOG.debug('add_thread')
        return self.group.add_thread(func, *args, **kwargs)

    def add_timer(self, interval, func, *args, **kwargs):
        """Define a periodic task to be run in the thread group.

        The task executes in its own green thread; the interval comes
        from configuration.
        """
        LOG.debug('group.add_timer')
        self.group.add_timer(interval, func, *args, **kwargs)

    def stop_timers(self):
        self.group.stop_timers()

    def stop(self, graceful=False):
        """Stop any active threads belonging to this thread group."""
        # First ask the group to stop all threads gracefully and wait.
        self.group.stop(graceful)
        self.group.wait()

        # Then wait for link()ed callbacks (i.e. lock release).
        pending = self.group.threads[:]
        finished = {th: False for th in pending}

        def _mark_finished(gt, th):
            finished[th] = True

        for th in pending:
            th.link(_mark_finished, th)

        while not all(finished.values()):
            eventlet.sleep()


def reschedule(action, sleep_time=1):
    """Eventlet Sleep for the specified number of seconds.

    :param sleep_time: seconds to sleep; if None, no sleep;
    """
    LOG.debug('At reschedule')
    if sleep_time is not None:
        LOG.debug('Action %s sleep for %s seconds' % (
            action.id, sleep_time))
        eventlet.sleep(sleep_time)


def sleep(sleep_time):
    """Interface for sleeping."""
    LOG.debug('sleep')
    eventlet.sleep(sleep_time)
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import eventlet +import libvirt +import mock +import testtools + +from masakarimonitors.introspectiveinstancemonitor import instance + +eventlet.monkey_patch(os=False) + + +class TestMonitorManager(testtools.TestCase): + + def setUp(self): + super(TestMonitorManager, self).setUp() + + @mock.patch.object(libvirt, 'virEventRunDefaultImpl') + def test_vir_event_loop_native_run(self, mock_virEventRunDefaultImpl): + mock_virEventRunDefaultImpl.side_effect = Exception("Test exception.") + + obj = instance.IntrospectiveInstanceMonitorManager() + exception_flag = False + try: + obj._vir_event_loop_native_run() + except Exception: + exception_flag = True + + self.assertTrue(exception_flag) + mock_virEventRunDefaultImpl.assert_called_once() diff --git a/masakarimonitors/tests/unit/introspectiveinstancemonitor/test_qemu_utils.py b/masakarimonitors/tests/unit/introspectiveinstancemonitor/test_qemu_utils.py new file mode 100755 index 0000000..b4ec728 --- /dev/null +++ b/masakarimonitors/tests/unit/introspectiveinstancemonitor/test_qemu_utils.py @@ -0,0 +1,112 @@ +# Copyright(c) 2018 Nippon Telegraph and Telephone Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +import libvirt +import mock +import testtools +import uuid + + +from masakarimonitors.introspectiveinstancemonitor import instance as object +from masakarimonitors.introspectiveinstancemonitor import qemu_utils + + +class TestQemuUtils(testtools.TestCase): + + def setup(self): + super(TestQemuUtils, self).setUp() + + @mock.patch.object(qemu_utils.libvirt, 'virDomain') + def test_getVmFsm(self, mock_domain): + """To test the state machines + + Initial stage should be dicovery and will advance to a + healthy stage after enough pingable events. Also, + it should reach an error stage from healthy after + a pingable event failure. + """ + + reference = qemu_utils.QemuGuestAgent() + mock_domain.UUID.return_value = uuid.uuid4() + reference.getVmFsm(mock_domain) + self.assertEqual(reference.getVmFsm(mock_domain).current_state, + 'discovery') + reference.getVmFsm(mock_domain).process_event('guest_not_pingable') + self.assertEqual(reference.getVmFsm(mock_domain).current_state, + 'discovery') + reference.getVmFsm(mock_domain).process_event('guest_pingable') + self.assertEqual(reference.getVmFsm(mock_domain).current_state, + 'healthy') + reference.getVmFsm(mock_domain).process_event('guest_pingable') + self.assertEqual(reference.getVmFsm(mock_domain).current_state, + 'healthy') + reference.getVmFsm(mock_domain).process_event('guest_not_pingable') + self.assertEqual(reference.getVmFsm(mock_domain).current_state, + 'error') + + @mock.patch.object(qemu_utils.libvirt, 'virDomain') + def test_hasQemuGuestAgent(self, mock_domain): + mock_domain.UUID.return_value = 'testuuid' + mock_domain.state.return_value = libvirt.VIR_DOMAIN_RUNNING, 'reason' + mock_domain.XMLDesc.return_value = """ + + + + + + + + +""" + obj = qemu_utils.QemuGuestAgent() + self.assertFalse(obj._hasQemuGuestAgent(mock_domain)) + mock_domain.XMLDesc.return_value = """ + + + + + + + + + + + +
+ + +""" + obj = qemu_utils.QemuGuestAgent() + self.assertTrue(obj._hasQemuGuestAgent(mock_domain)) + + @mock.patch.object(qemu_utils, 'resetJournal') + def test_resetJournal(self, mock_resetJournal): + + mock_resetJournal.return_value = None + + obj = object.IntrospectiveInstanceMonitorManager() + + event_id = 0 + domain_uuid = uuid.uuid4() + event_type = libvirt.VIR_DOMAIN_EVENT_STARTED + detail = libvirt.VIR_DOMAIN_EVENT_STARTED_BOOTED + + obj._reset_journal(event_id, event_type, detail, domain_uuid) + + mock_resetJournal.assert_called_once_with(domain_uuid) diff --git a/masakarimonitors/utils.py b/masakarimonitors/utils.py index c1dd725..3ef7da1 100644 --- a/masakarimonitors/utils.py +++ b/masakarimonitors/utils.py @@ -21,6 +21,7 @@ import shutil import sys import tempfile +from oslo_concurrency import lockutils from oslo_concurrency import processutils from oslo_log import log as logging from oslo_utils import importutils @@ -111,3 +112,23 @@ def execute(*cmd, **kwargs): return privsep_execute(*cmd, **kwargs) else: return processutils.execute(*cmd, **kwargs) + + +def synchronized(name, semaphores=None, blocking=False): + def wrap(f): + @six.wraps(f) + def inner(*args, **kwargs): + lock_str = 'masakarimonitors-%s' % name + int_lock = lockutils.internal_lock(lock_str, + semaphores=semaphores) + msg = "Lock blocking: %s on resource %s " % (lock_str, f.__name__) + """Acquiring lock: %(lock_str)s on resource """ + if not int_lock.acquire(blocking=blocking): + raise Exception(msg) + try: + return f(*args, **kwargs) + finally: + """Releasing lock: %(lock_str)s on resource """ + int_lock.release() + return inner + return wrap diff --git a/releasenotes/notes/introspectiveinstancemonitor-f4bc71f029b61d49.yaml b/releasenotes/notes/introspectiveinstancemonitor-f4bc71f029b61d49.yaml new file mode 100644 index 0000000..0c6f8c9 --- /dev/null +++ b/releasenotes/notes/introspectiveinstancemonitor-f4bc71f029b61d49.yaml @@ -0,0 +1,6 @@ +--- +features: + - | + Added Introspective 
Instance Monitoring through QEMU Guest Agent, + in order to detect, report and optionally recover VMs from internal + VM faults. diff --git a/requirements.txt b/requirements.txt index dcdda8f..061cc68 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,10 +2,13 @@ # of appearance. Changing the order has an impact on the overall integration # process, which may cause wedges in the gate later. +automaton>=1.9.0 # Apache-2.0 libvirt-python!=4.1.0,>=3.5.0 # LGPLv2+ openstacksdk>=0.13.0 # Apache-2.0 oslo.concurrency>=3.26.0 # Apache-2.0 oslo.config>=5.2.0 # Apache-2.0 +lxml!=3.7.0,>=3.4.1 # BSD +oslo.cache>=1.26.0 # Apache-2.0 oslo.i18n>=3.15.3 # Apache-2.0 oslo.log>=3.36.0 # Apache-2.0 oslo.middleware>=3.31.0 # Apache-2.0 diff --git a/setup.cfg b/setup.cfg index 188d472..f0a19db 100644 --- a/setup.cfg +++ b/setup.cfg @@ -31,11 +31,13 @@ oslo.config.opts = masakarimonitors.conf = masakarimonitors.conf.opts:list_opts oslo.config.opts.defaults = + masakarimonitors.introspectiveinstancemonitor = masakarimonitors.common.config:set_middleware_defaults masakarimonitors.instancemonitor = masakarimonitors.common.config:set_middleware_defaults masakarimonitors.processmonitor = masakarimonitors.common.config:set_middleware_defaults masakarimonitors.hostmonitor = masakarimonitors.common.config:set_middleware_defaults console_scripts = + masakari-introspectiveinstancemonitor = masakarimonitors.cmd.introspectiveinstancemonitor:main masakari-instancemonitor = masakarimonitors.cmd.instancemonitor:main masakari-processmonitor = masakarimonitors.cmd.processmonitor:main masakari-hostmonitor = masakarimonitors.cmd.hostmonitor:main