From a680e323b8d061d1fe61c86817bd0ff5478bc580 Mon Sep 17 00:00:00 2001 From: Louie KWAN Date: Wed, 17 Jan 2018 19:36:04 +0000 Subject: [PATCH] Introspective Instance Monitoring through QEMU Guest Agent https://blueprints.launchpad.net/masakari/+spec/introspective-instance-monitoring Currently, Masakari instance monitoring is strictly black-box type monitoring through qemu and libvirt. There are however a number of internal instance/VM faults, that if monitored and detected by Masakari, could be recovered by existing Masakari auto-recovery mechanisms; increasing the overall availability of the instance/VM. This blueprint introduces the capability of performing introspective instance monitoring of VMs, in order to detect, report and optionally recover VMs from internal VM faults. Specifically, VM Heartbeating Monitoring via the QEMU Guest Agent. Change-Id: I9efc6afc8d476003d3aa7fee8c31bcaa65438674 Implements: blueprint introspective-instance-monitoring --- .../cmd/introspectiveinstancemonitor.py | 38 ++ masakarimonitors/conf/__init__.py | 2 + .../conf/introspectiveinstancemonitor.py | 67 +++ masakarimonitors/conf/service.py | 4 + .../introspectiveinstancemonitor/README.rst | 78 ++++ .../introspectiveinstancemonitor/__init__.py | 0 .../introspectiveinstancemonitor/cache.py | 123 +++++ .../introspectiveinstancemonitor/instance.py | 176 +++++++ .../qemu_utils.py | 429 ++++++++++++++++++ .../introspectiveinstancemonitor/scheduler.py | 110 +++++ .../introspectiveinstancemonitor/__init__.py | 0 .../test_monitor_manager.py | 42 ++ .../test_qemu_utils.py | 112 +++++ masakarimonitors/utils.py | 21 + ...ctiveinstancemonitor-f4bc71f029b61d49.yaml | 6 + requirements.txt | 3 + setup.cfg | 2 + 17 files changed, 1213 insertions(+) create mode 100644 masakarimonitors/cmd/introspectiveinstancemonitor.py create mode 100644 masakarimonitors/conf/introspectiveinstancemonitor.py create mode 100644 masakarimonitors/introspectiveinstancemonitor/README.rst create mode 100644 masakarimonitors/introspectiveinstancemonitor/__init__.py create mode 100644 masakarimonitors/introspectiveinstancemonitor/cache.py create mode 100644 masakarimonitors/introspectiveinstancemonitor/instance.py create mode 100755 masakarimonitors/introspectiveinstancemonitor/qemu_utils.py create mode 100644 masakarimonitors/introspectiveinstancemonitor/scheduler.py create mode 100755 masakarimonitors/tests/unit/introspectiveinstancemonitor/__init__.py create mode 100644 masakarimonitors/tests/unit/introspectiveinstancemonitor/test_monitor_manager.py create mode 100755 masakarimonitors/tests/unit/introspectiveinstancemonitor/test_qemu_utils.py create mode 100644 releasenotes/notes/introspectiveinstancemonitor-f4bc71f029b61d49.yaml diff --git a/masakarimonitors/cmd/introspectiveinstancemonitor.py b/masakarimonitors/cmd/introspectiveinstancemonitor.py new file mode 100644 index 0000000..6d15ec9 --- /dev/null +++ b/masakarimonitors/cmd/introspectiveinstancemonitor.py @@ -0,0 +1,38 @@ +# Copyright(c) 2018 WindRiver Systems +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Starter script for Masakari Introspective Instance Monitor.""" + +import sys + +from oslo_log import log as logging + +import masakarimonitors.conf +from masakarimonitors import config +from masakarimonitors import service +from masakarimonitors import utils + + +CONF = masakarimonitors.conf.CONF + + +def main(): + config.parse_args(sys.argv) + logging.setup(CONF, "masakarimonitors") + utils.monkey_patch() + + server = service.Service.create( + binary='masakarimonitors-introspectiveinstancemonitor') + service.serve(server) + service.wait() diff --git a/masakarimonitors/conf/__init__.py b/masakarimonitors/conf/__init__.py index f72ba9f..0b2d9c0 100644 --- a/masakarimonitors/conf/__init__.py +++ b/masakarimonitors/conf/__init__.py @@ -17,6 +17,7 @@ from masakarimonitors.conf import api from masakarimonitors.conf import base from masakarimonitors.conf import host from masakarimonitors.conf import instance +from masakarimonitors.conf import introspectiveinstancemonitor from masakarimonitors.conf import process from masakarimonitors.conf import service @@ -26,5 +27,6 @@ api.register_opts(CONF) base.register_opts(CONF) host.register_opts(CONF) instance.register_opts(CONF) +introspectiveinstancemonitor.register_opts(CONF) process.register_opts(CONF) service.register_opts(CONF) diff --git a/masakarimonitors/conf/introspectiveinstancemonitor.py b/masakarimonitors/conf/introspectiveinstancemonitor.py new file mode 100644 index 0000000..96e3604 --- /dev/null +++ b/masakarimonitors/conf/introspectiveinstancemonitor.py @@ -0,0 +1,67 @@ +# Copyright(c) 2018 WindRiver Systems +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_config import cfg + + +# Note: this string is being used for regex parsing later with re module. +# +# Use Python's raw string notation for regular expressions and +# uses the backslash character ('\') to indicate special +# forms or to allow special characters to be used without invoking +# their special meaning. +SOCK = r'/var/lib/libvirt/qemu/org\.qemu\.guest_agent\..*\.instance-.*\.sock' + +monitor_opts = [ + cfg.IntOpt('guest_monitoring_interval', + default=10, + help=''' +Guest monitoring interval of VM status (in seconds). +* The value should not be too low as there should not be false negative +* for reporting QEMU_GUEST_AGENT failures +* VM needs time to do powering-off. +* guest_monitoring_interval should be greater than +* the time to SHUTDOWN VM gracefully. +* e.g. | 565da9ba-3c0c-4087-83ca | iim1 | ACTIVE | powering-off | Running +'''), + cfg.IntOpt('guest_monitoring_timeout', + default=2, + help='Guest monitoring timeout (in seconds).'), + cfg.IntOpt('guest_monitoring_failure_threshold', + default=3, + help='Failure threshold before sending notification.'), + cfg.StrOpt('qemu_guest_agent_sock_path', + default=SOCK, + help=''' +* The file path of qemu guest agent sock. +* Please use Python raw string notation as regular expressions. +e.g. r'/var/lib/libvirt/qemu/org\.qemu\.guest_agent\..*\.instance-.*\.sock' +'''), + cfg.BoolOpt('callback_paused_event', + default=True, + help=''' +* True: Callback for VM paused events. +* False: Do not callback for VM paused events. +'''), +] + + +def register_opts(conf): + conf.register_opts(monitor_opts, group='introspectiveinstancemonitor') + + +def list_opts(): + return { + 'introspectiveinstancemonitor': monitor_opts + } diff --git a/masakarimonitors/conf/service.py b/masakarimonitors/conf/service.py index e2fab7d..4c3eb19 100644 --- a/masakarimonitors/conf/service.py +++ b/masakarimonitors/conf/service.py @@ -30,6 +30,10 @@ Possible values: default='masakarimonitors.instancemonitor.instance' '.InstancemonitorManager', help='Full class name for the Manager for instancemonitor.'), + cfg.StrOpt('introspectiveinstancemonitor_manager', + default='masakarimonitors.introspectiveinstancemonitor.instance' + '.IntrospectiveInstanceMonitorManager', + help='Full class name for introspectiveinstancemonitor.'), cfg.StrOpt('processmonitor_manager', default='masakarimonitors.processmonitor.process' '.ProcessmonitorManager', diff --git a/masakarimonitors/introspectiveinstancemonitor/README.rst b/masakarimonitors/introspectiveinstancemonitor/README.rst new file mode 100644 index 0000000..6568ba9 --- /dev/null +++ b/masakarimonitors/introspectiveinstancemonitor/README.rst @@ -0,0 +1,78 @@ +========================================= +masakarimonitors-introspectiveinstancemonitor +========================================= + +Introspective instance monitor for Masakari +---------------------------------------- +- masakarimonitors-introspectiveinstancemonitor, provides Virtual Machine + High Availability (VMHA) service for OpenStack clouds by automatically + detecting the system-level failure events via QEMU Guest Agent. If it + detects VM heartbeat failure events, it sends notifications to the + masakari-api. + + +- Based on the QEMU Guest Agent, + masakarimonitors-introspectiveinstancemonitor aims to provide access + to a system-level agent via standard qemu-ga protocol + + +How does it work? +---------------------------------------- +- libvirt and QEMU Guest Agent are used as the underlying protocol for + messaging to and from VM. + + - The host-side qemu-agent sockets are used to detemine whether VMs are + configured with QEMU Guest Agent. + + - qemu-guest-ping is used as the monitoring heartbeat. + +- For the future release, we can pass through arbitrary guest agent commands + to check the health of the applications inside a VM. + +QEMU Guest Agent Installation notes +---------------------------------------- + +- Set image property: hw_qemu_guest_agent=yes. + + - This tells NOVA to setup the virtual serial interface thru QEMU to VM + + - e.g. + + $ openstack image create --public --disk-format qcow2 --container-format + bare --file ~ubuntu/xenial-server-cloudimg-amd64-disk1.img --public + --property hw_qemu_guest_agent=yes xenial-server-cloudimg + +* Inside VM:: + + $ sudo apt-get install qemu-guest-agent + $ sudo systemctl start qemu-guest-agent + $ ubuntu@test:~$ ps -ef | fgrep qemu + $ ... /usr/sbin/qemu-ga --daemonize -m virtio-serial -p /dev/virtio-ports/org.qemu.guest_agent.0 + $ ubuntu@test:~$ ls /dev/virtio-ports/ + $ org.qemu.guest_agent.0 + + +Configure masakarimonitors-introspectiveinstancemonitor +---------------------------------------------- +#. Clone masakari using:: + + $ git clone https://github.com/openstack/masakari-monitors.git + +#. Create masakarimonitors directory in /etc/. + +#. Run setup.py from masakari-monitors:: + + $ sudo python setup.py install + +#. Copy masakarimonitors.conf and process_list.yaml files from + masakari-monitors/etc/ to /etc/masakarimonitors folder and make necessary + changes to the masakarimonitors.conf and process_list.yaml files. + To generate the sample masakarimonitors.conf file, run the following + command from the top level of the masakari-monitors directory:: + + $ tox -egenconfig + +#. To run masakari-introspectiveinstancemonitor simply use following binary:: + + $ masakari-introspectiveinstancemonitor + diff --git a/masakarimonitors/introspectiveinstancemonitor/__init__.py b/masakarimonitors/introspectiveinstancemonitor/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/masakarimonitors/introspectiveinstancemonitor/cache.py b/masakarimonitors/introspectiveinstancemonitor/cache.py new file mode 100644 index 0000000..4941e7b --- /dev/null +++ b/masakarimonitors/introspectiveinstancemonitor/cache.py @@ -0,0 +1,123 @@ +# Copyright (c) 2018 WindRiver Systems +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +The code related to integration between oslo.cache module and masakarimonitors. + +Use 'oslo_cache.dict' + +i.e. dogpile.cache backend that uses dictionary for storage +Dogpile consists of two subsystems, one building on top of the other. +dogpile provides the concept of a dogpile lock, a control structure +which allows a single thread of execution to be selected as the creator +of some resource, while allowing other threads of execution to refer +to the previous version of this resource as the creation proceeds; +if there is no previous version, then those threads block until +the object is available. +""" +from oslo_cache import core +from oslo_config import cfg + +from masakarimonitors.i18n import _ + +WEEK = 604800 + + +def register_cache_configurations(conf): + """Register all configurations required for oslo.cache. + + The procedure registers all configurations required for oslo.cache. + It should be called before configuring of cache region + + :param conf: instance of configuration + :returns: updated configuration + """ + # register global configurations for caching + core.configure(conf) + + # register specific configurations + constraint_cache_group = cfg.OptGroup('constraint_validation_cache') + constraint_cache_opts = [ + cfg.IntOpt('expiration_time', default=WEEK, + help=_( + 'TTL, in seconds, for any cached item in the ' + 'dogpile.cache region used for caching of validation ' + 'constraints.')), + cfg.BoolOpt("caching", default=True, + help=_( + 'Toggle to enable/disable caching when Orchestration ' + 'Engine validates property constraints of stack.' + 'During property validation with constraints ' + 'Orchestration Engine caches requests to other ' + 'OpenStack services. Please note that the global ' + 'toggle for oslo.cache(enabled=True in [cache] group) ' + 'must be enabled to use this feature.')) + ] + conf.register_group(constraint_cache_group) + conf.register_opts(constraint_cache_opts, group=constraint_cache_group) + + extension_cache_group = cfg.OptGroup('service_extension_cache') + extension_cache_opts = [ + cfg.IntOpt('expiration_time', default=WEEK, + help=_( + 'TTL, in seconds, for any cached item in the ' + 'dogpile.cache region used for caching of service ' + 'extensions.')), + cfg.BoolOpt('caching', default=True, + help=_( + 'Toggle to enable/disable caching when Orchestration ' + 'Engine retrieves extensions from other OpenStack ' + 'services. Please note that the global toggle for ' + 'oslo.cache(enabled=True in [cache] group) must be ' + 'enabled to use this feature.')) + ] + conf.register_group(extension_cache_group) + conf.register_opts(extension_cache_opts, group=extension_cache_group) + + find_cache_group = cfg.OptGroup('resource_finder_cache') + find_cache_opts = [ + cfg.IntOpt('expiration_time', default=WEEK, + help=_( + 'TTL, in seconds, for any cached item in the ' + 'dogpile.cache region used for caching of OpenStack ' + 'service finder functions.')), + cfg.BoolOpt('caching', default=True, + help=_( + 'Toggle to enable/disable caching when Orchestration ' + 'Engine looks for other OpenStack service resources ' + 'using name or id. Please note that the global ' + 'toggle for oslo.cache(enabled=True in [cache] group) ' + 'must be enabled to use this feature.')) + ] + conf.register_group(find_cache_group) + conf.register_opts(find_cache_opts, group=find_cache_group) + + return conf + + +# variable that stores an initialized cache region +_REGION = None + + +def get_cache_region(): + global _REGION + if not _REGION: + _REGION = core.create_region() + _REGION.configure('oslo_cache.dict', + arguments={'expiration_time': WEEK}) + core.configure_cache_region( + conf=register_cache_configurations(cfg.CONF), + region=_REGION) + return _REGION diff --git a/masakarimonitors/introspectiveinstancemonitor/instance.py b/masakarimonitors/introspectiveinstancemonitor/instance.py new file mode 100644 index 0000000..3f09f3f --- /dev/null +++ b/masakarimonitors/introspectiveinstancemonitor/instance.py @@ -0,0 +1,176 @@ +# Copyright (c) 2018 WindRiver Systems +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import eventlet +import libvirt +import socket +import sys +import threading +import time + +from oslo_config import cfg +from oslo_log import log as oslo_logging +from oslo_utils import excutils +from oslo_utils import timeutils + +from masakarimonitors.introspectiveinstancemonitor import qemu_utils +from masakarimonitors.introspectiveinstancemonitor import scheduler +from masakarimonitors import manager +from masakarimonitors.objects import event_constants as ec + +LOG = oslo_logging.getLogger(__name__) +CONF = cfg.CONF + + +class IntrospectiveInstanceMonitorManager(manager.Manager): + + def __init__(self, *args, **kwargs): + self.init_tgm() + super(IntrospectiveInstanceMonitorManager, self).__init__( + service_name="introspectiveinstancemonitor", *args, **kwargs) + # This keeps track of what thread is running the event loop, + # (if it is run in a background thread) + self.event_loop_thread = None + + def _reset_journal(self, eventID, eventType, detail, uuID): + """To reset the monitoring to discovery stage + + :param event_id: Event ID + :param event_type: Event type + :param detail: Event code + :param domain_uuid: Uuid + """ + + noticeType = ec.EventConstants.TYPE_VM + hostname = socket.gethostname() + currentTime = timeutils.utcnow() + + def _reset_handler(event_id, event_type, detail, domain_uuid, msg): + """Reset monitoring + + To reset monitoring to discovery stage for the following event: + libvirt.VIR_DOMAIN_EVENT_STARTED + """ + + if (event_id == libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE) and \ + (event_type == libvirt.VIR_DOMAIN_EVENT_STARTED): + LOG.debug(msg) + qemu_utils.resetJournal(domain_uuid) + + # All Event Output if debug mode is on. + msg = "libvirt Event Received.type = %s \ + hostname = %s uuid = %s time = %s eventID = %d eventType = %d \ + detail = %d" % ( + noticeType, + hostname, uuID, currentTime, eventID, + eventType, detail) + + LOG.debug("%s", msg) + + try: + thread = threading.Thread( + _reset_handler(eventID, eventType, detail, uuID, msg) + ) + thread.start() + except KeyError as e: + LOG.error("virEventFilter KeyError: {0}".format(e)) + except IndexError as e: + LOG.error("virEventFilter IndexError: {0}".format(e)) + except TypeError as e: + LOG.error("virEventFilter TypeError: {0}".format(e)) + except Exception: + with excutils.save_and_reraise_exception(): + LOG.error("Unexpected error: %s" % sys.exc_info()[0]) + + def init_tgm(self): + """Manages the masakari-introspectiveinstancemonitor.""" + self.TG = scheduler.ThreadGroupManager() + + def _vir_event_loop_native_run(self): + # Directly run the event loop in the current thread + while True: + libvirt.virEventRunDefaultImpl() + + def _vir_event_loop_native_start(self): + libvirt.virEventRegisterDefaultImpl() + self.event_loop_thread = threading.Thread( + target=self._vir_event_loop_native_run, + name="lib_virt_eventLoop") + self.event_loop_thread.setDaemon(True) + self.event_loop_thread.start() + + def _my_domain_event_callback(self, conn, dom, event, detail, opaque): + self._reset_journal(libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE, + event, detail, dom.UUIDString()) + + def _my_domain_event_reboot_callback(self, conn, dom, opaque): + self._reset_journal(libvirt.VIR_DOMAIN_EVENT_ID_REBOOT, + -1, -1, dom.UUIDString()) + + def _err_handler(self, ctxt, err): + LOG.warning("Error from libvirt : %s", err[2]) + + def _virt_event(self, uri): + # Run a background thread with the event loop + self._vir_event_loop_native_start() + + event_callback_handlers = { + libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE: + self._my_domain_event_callback, + libvirt.VIR_DOMAIN_EVENT_ID_REBOOT: + self._my_domain_event_reboot_callback + } + # Connect to libvirt - If be disconnected, reprocess. + self.running = True + while self.running: + vc = libvirt.openReadOnly(uri) + + # Event callback settings + callback_ids = [] + for event, callback in event_callback_handlers.items(): + cid = vc.domainEventRegisterAny(None, event, callback, None) + callback_ids.append(cid) + + # Connection monitoring. + vc.setKeepAlive(5, 3) + while vc.isAlive() == 1 and self.running: + eventlet.greenthread.sleep(1) + + # If connection between libvirtd was lost, + # clear callback connection. + LOG.warning("Libvirt Connection Closed Unexpectedly.") + for cid in callback_ids: + try: + vc.domainEventDeregisterAny(cid) + except Exception: + pass + vc.close() + del vc + time.sleep(3) + + def stop(self): + self.running = False + + def main(self): + """Main method. + + Set the URI, error handler, and executes event loop processing. + """ + + uri = CONF.libvirt.connection_uri + LOG.debug("Using uri:" + uri) + + # set error handler & do event loop + libvirt.registerErrorHandler(self._err_handler, '_virt_event') + self._virt_event(uri) diff --git a/masakarimonitors/introspectiveinstancemonitor/qemu_utils.py b/masakarimonitors/introspectiveinstancemonitor/qemu_utils.py new file mode 100755 index 0000000..8f1271e --- /dev/null +++ b/masakarimonitors/introspectiveinstancemonitor/qemu_utils.py @@ -0,0 +1,429 @@ +# Copyright (c) 2018 WindRiver Systems +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# Introspective instance monitoring depends on the qemu guest agent to +# monitoring what inside of a VM. +# +# A few items to get in the way of the design: +# +# - After VM is active, it needs time to start qemu-guest-agent. +# Before error/failure is reported, we need a discovery phase to wait +# until VM is guest_pingable. +# +# - Debouncing is needed to enforce that masakari_notifier function +# not calling twice or more for the same failure. +# +# After reported a masakari notification, sent_notification flag will +# need to be reset when there is a QEMU LIFECYCLE event like: +# STARTED_BOOTED +# SUSPENDED_PAUSED + +import eventlet +import libvirt +import libvirtmod_qemu +import logging +import re +import socket +import time +import traceback + +# Machine module contains the state machine logic, state and transition events +from automaton import machines +from dogpile.cache.api import NoValue +from lxml import etree +from oslo_config import cfg +from oslo_utils import timeutils + +from masakarimonitors.ha import masakari +from masakarimonitors.introspectiveinstancemonitor import cache +from masakarimonitors.objects import event_constants as ec +from masakarimonitors import utils + +CONF = cfg.CONF +ICONF = cfg.CONF.introspectiveinstancemonitor + +# The VM QEMU Quest Agent states +# +# discovery = initial state of VM, +# remains in this state until it is determined that +# the VM has a qemu-agent interface enabling intrusive-instance-monitoring +# +# healthy = no failure event is detected +# +# error = An error is recorded on every audit cycle where +# the VM is in the error state +# +# reported = a transient state +# to keep track of reporting of notification +# + +# Transitions +# +# Representation of a transition managed by a ``Machine`` instance. +# source (str): Source state of the transition. +# dest (str): Destination state of the transition. +# trigger (str): The type of triggering event +# that advances to the next state in the sequence. + + +def action_on_enter(new_state, triggered_event): + pass + + +def action_on_exit(old_state, triggered_event): + pass + +STATE_SPACE = [ + { + 'name': 'discovery', + 'next_states': { + 'guest_pingable': 'healthy', + 'guest_not_pingable': 'discovery', + }, + 'on_enter': action_on_enter, + 'on_exit': action_on_exit, + }, + { + 'name': 'healthy', + 'next_states': { + 'guest_pingable': 'healthy', + 'guest_not_pingable': 'error', + }, + 'on_enter': action_on_enter, + 'on_exit': action_on_exit, + }, + { + 'name': 'error', + 'next_states': { + 'report': 'reported', + 'guest_pingable': 'healthy', + 'guest_not_pingable': 'error', + }, + 'on_enter': action_on_enter, + 'on_exit': action_on_exit, + }, + { + 'name': 'reported', + 'next_states': { + 'guest_pingable': 'discovery', + 'guest_not_pingable': 'reported', + }, + 'on_enter': action_on_enter, + 'on_exit': action_on_exit, + }, +] + + +# Journal representation of a managed ``Machine`` instance. +class Journal(machines.FiniteMachine): + + # Factory pattern to create a Journal object + @classmethod + def factory(cls, domain_uuid): + jo = cls.build(STATE_SPACE) + jo.default_start_state = 'discovery' + jo.initialize() + jo.failedCount = 0 + # Conditions to reset sent_notification + jo.sent_notification = False + jo.domain_uuid = domain_uuid + jo.lastUsed = time.time() + LOG.debug(str(domain_uuid) + ':Journal:__init__:') + return jo + + def resetState(self): + self.default_start_state = 'discovery' + self.initialize() + self.failedCount = 0 + self.sent_notification = False + LOG.debug(str(self.domain_uuid) + '__resetState__:') + + def processEvent(self, event): + self.process_event(event) + self.lastUsed = time.time() + + def getFailedCount(self): + return self.failedCount + + def incrementFailedCount(self): + if (self.current_state == 'error'): + self.failedCount += 1 + + def resetFailedCount(self): + self.failedCount = 0 + + def setSentNotification(self, boolean): + self.sent_notification = boolean + + def getSentNotification(self): + return self.sent_notification + + +# libvirt state verbose dictionary +STATES = { + libvirt.VIR_DOMAIN_NOSTATE: 'no state', + libvirt.VIR_DOMAIN_RUNNING: 'running', + libvirt.VIR_DOMAIN_BLOCKED: 'blocked on resource', + libvirt.VIR_DOMAIN_PAUSED: 'paused by user', + libvirt.VIR_DOMAIN_SHUTDOWN: 'being shut down', + libvirt.VIR_DOMAIN_SHUTOFF: 'shut off', + libvirt.VIR_DOMAIN_CRASHED: 'crashed', +} + +LOG = logging.getLogger(__name__) + + +def get_function_name(): + return traceback.extract_stack(None, 2)[0][2] + + +# To reset journal object by domain uuid +def resetJournal(domain_uuid): + # To get the VM Journal object from the dictionary + # :param domain: QEMU domain UUID + dict = cache.get_cache_region() + jo = None + if type(dict.get(domain_uuid)) is NoValue: + jo = Journal.factory(domain_uuid) + dict.set(domain_uuid, jo) + else: + jo = dict.get(domain_uuid) + + jo.resetState() + + +# Qemu guest agent is used to check VM status +# The checking pre-conditions are as follows: +# - VM is running +# - VM has Qemu guest agent installed +# +# then status is determined by +# - VM is guest-agent-pingable or not +# +# Note: checkGuests function is called by the scheduler +class QemuGuestAgent(object): + + def __init__(self): + super(QemuGuestAgent, self).__init__() + self.notifier = masakari.SendNotification() + + # _thresholdsCrossing + # + # We only issue a notification + # to masakari-engine if the VM is 'already' in the error state + # when there are consecutive guest_ping failures. + # Suggested value for guest_monitoring_failure_threshold >= 3 + # + # Note: When operators are trying to gracefully shutdown VM, + # QEMU may take time to powering-off. + # E.g. When you do or + # you may see that QEMU is active but monitoring may fail + # due to VM is still "powering-off" + # Status | Task State | Power State + # ACTIVE | powering-off | Running + def _thresholdsCrossing(self, domain): + if (((self.getVmFsm(domain.UUIDString()).current_state) == 'error') + and + (self._getJournalObject(domain.UUIDString()).getFailedCount() + > ICONF.guest_monitoring_failure_threshold)): + LOG.debug('_thresholdsCrossing:' + domain.UUIDString()) + LOG.debug(self._getJournalObject( + domain.UUIDString()).getFailedCount()) + return True + else: + return False + + def _masakari_notifier(self, domain_uuid): + if self._getJournalObject(domain_uuid).getSentNotification(): + LOG.debug('notifier.send_notification Skipped:' + domain_uuid) + else: + hostname = socket.gethostname() + noticeType = ec.EventConstants.TYPE_VM + current_time = timeutils.utcnow() + event = { + 'notification': { + 'type': noticeType, + 'hostname': hostname, + 'generated_time': current_time, + 'payload': { + 'event': 'QEMU_GUEST_AGENT_ERROR', + 'instance_uuid': domain_uuid, + 'vir_domain_event': 'STOPPED_FAILED' + } + } + } + try: + self.notifier.send_notification(CONF.callback.retry_max, + CONF.callback.retry_interval, + event) + self._getJournalObject(domain_uuid).processEvent('report') + self._getJournalObject(domain_uuid).setSentNotification(True) + except Exception: + LOG.warn('Exception :' + domain_uuid + + ' @ ' + get_function_name()) + pass + + def _qemuAgentGuestPing(self, domain, timeout, flags=0): + def _no_heartbeat(domain_uuid): + # Also advance the FSM + self.getVmFsm(domain_uuid).processEvent('guest_not_pingable') + self._getJournalObject(domain_uuid).incrementFailedCount() + + def _with_heartbeat(domain_uuid): + #The order matters as we want to decrease the counter first + self._getJournalObject(domain_uuid).resetFailedCount() + self.getVmFsm(domain_uuid).processEvent('guest_pingable') + + def _record(result): + if result is None: + LOG.debug(domain.UUIDString() + + '\tqemu-ga_guest-ping is not responding.') + + if self._thresholdsCrossing(domain): + self._masakari_notifier(domain.UUIDString()) + + _no_heartbeat(domain.UUIDString()) + else: + _with_heartbeat(domain.UUIDString()) + + """Send a Guest Agent ping to domain """ + # must pass domain._o to the c library as virDomainPtr + ret = libvirtmod_qemu.virDomainQemuAgentCommand(domain._o, + '{"execute": "guest-ping"}', timeout, flags) + + _record(ret) + + def _getJournalObject(self, domain_uuid): + """Function: To get the dictionary + + :param domain: QEMU domain + :return: the journal object referred by domain_uuid + """ + + dict = cache.get_cache_region() + if type(dict.get(domain_uuid)) is NoValue: + jo = Journal.factory(domain_uuid) + dict.set(domain_uuid, jo) + return jo + else: + return dict.get(domain_uuid) + + def getVmFsm(self, domain_uuid): + """Function: To get the VM Finite State Machine from + the dictionary + + :param domain: QEMU domain + :return: FSM object + """ + dict = cache.get_cache_region() + + if type(dict.get(domain_uuid)) is NoValue: + jo = Journal.factory(domain_uuid) + dict.set(domain_uuid, jo) + return jo + else: + return dict.get(domain_uuid) + + def _hasQemuGuestAgent(self, domain): + """Function: To check whether the VM has an QEMU Guest Agent + by examining the qemu.guest_agent sock + + First check if libvirt is running or not, then sock + + :param domain: QEMU domain + :return: true or false + """ + + def qemuGuestAgentPathMatch(path): + SOCK = ICONF.qemu_guest_agent_sock_path + return re.match('%s' % SOCK, path) + + state, reason = domain.state() + # First check if libvirt is running or not + if state != libvirt.VIR_DOMAIN_RUNNING: + return False + + xmlDesc = domain.XMLDesc() + tree = etree.fromstring(xmlDesc) + ''' Example + + + + +
+ + ''' + try: + source = tree.find("devices/channel/source") + if (source is not None): + mode = source.get('mode') + path = source.get('path') + # There should be a bind for a sock file for qemu guest_agent + if (qemuGuestAgentPathMatch(path) and mode == 'bind'): + return True + except Exception: + pass + + return False + + def checkGuests(self): + """Function: Check QEMU Guests + + Condition: VM under intrusive monitoring must have QEMU agent client + configured, installed and qemu "guest-agent-pingable". + """ + + try: + conn = libvirt.open(None) # LIBVIRT_DEFAULT_URI + ids = conn.listDomainsID() + running = map(conn.lookupByID, ids) + + columns = 3 + + for row in map(None, *[iter(running)] * columns): + for domain in row: + if domain: + try: + if self._hasQemuGuestAgent(domain): + @utils.synchronized(domain.UUIDString()) + def do_qemuAgentGuestPing(domain, timeout): + self._qemuAgentGuestPing(domain, timeout) + do_qemuAgentGuestPing(domain, + ICONF.guest_monitoring_timeout) + except libvirt.libvirtError as le: + LOG.warn(le) + continue + except Exception as e: + LOG.warn(e) + pass + + +def reschedule(action, sleep_time=1): + """Eventlet Sleep for the specified number of seconds. + + :param sleep_time: seconds to sleep; if None, no sleep; + """ + LOG.debug('At reschedule') + if sleep_time is not None: + LOG.debug('Action %s sleep for %s seconds' % ( + action.id, sleep_time)) + eventlet.sleep(sleep_time) + + +def sleep(sleep_time): + """Interface for sleeping.""" + + eventlet.sleep(sleep_time) diff --git a/masakarimonitors/introspectiveinstancemonitor/scheduler.py b/masakarimonitors/introspectiveinstancemonitor/scheduler.py new file mode 100644 index 0000000..e5d08f9 --- /dev/null +++ b/masakarimonitors/introspectiveinstancemonitor/scheduler.py @@ -0,0 +1,110 @@ +# Copyright (c) 2018 WindRiver Systems +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import eventlet +import logging + +from oslo_config import cfg +from oslo_service import threadgroup + +from masakarimonitors.introspectiveinstancemonitor import qemu_utils + + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF + + +class ThreadGroupManager(object): + """Thread group manager.""" + + def init_qemu_ga(self): + self.qemuGA = qemu_utils.QemuGuestAgent() + LOG.debug('Started QemuGuestAgent') + + def __init__(self): + self.init_qemu_ga() + super(ThreadGroupManager, self).__init__() + self.threads = {} + self.group = threadgroup.ThreadGroup() + + # Create dummy service task, because when there is nothing queued + # on self.tg the process exits + self.add_timer( + CONF.introspectiveinstancemonitor.guest_monitoring_interval, + self._service_task) + + def _service_task(self): + # This is used to trigger periodic monitoring tasks + + self.qemuGA.checkGuests() + + def start(self, func, *args, **kwargs): + """Run the given method in a sub-thread.""" + + LOG.debug('add_thread') + return self.group.add_thread(func, *args, **kwargs) + + def add_timer(self, interval, func, *args, **kwargs): + """Define a periodic task to be run in the thread group. + + The task will be executed in a separate green thread. + Interval is from cfg.CONF.periodic_interval + """ + + LOG.debug('group.add_timer') + self.group.add_timer(interval, func, *args, **kwargs) + + def stop_timers(self): + self.group.stop_timers() + + def stop(self, graceful=False): + """Stop any active threads belong to this threadgroup.""" + + # Try to stop all threads gracefully + self.group.stop(graceful) + self.group.wait() + + # Wait for link()ed functions (i.e. lock release) + threads = self.group.threads[:] + links_done = dict((th, False) for th in threads) + + def mark_done(gt, th): + links_done[th] = True + + for th in threads: + th.link(mark_done, th) + + while not all(links_done.values()): + eventlet.sleep() + + +def reschedule(action, sleep_time=1): + """Eventlet Sleep for the specified number of seconds. + + :param sleep_time: seconds to sleep; if None, no sleep; + """ + + LOG.debug('At reschedule') + if sleep_time is not None: + LOG.debug('Action %s sleep for %s seconds' % ( + action.id, sleep_time)) + eventlet.sleep(sleep_time) + + +def sleep(sleep_time): + """Interface for sleeping.""" + + LOG.debug('sleep') + + eventlet.sleep(sleep_time) diff --git a/masakarimonitors/tests/unit/introspectiveinstancemonitor/__init__.py b/masakarimonitors/tests/unit/introspectiveinstancemonitor/__init__.py new file mode 100755 index 0000000..e69de29 diff --git a/masakarimonitors/tests/unit/introspectiveinstancemonitor/test_monitor_manager.py b/masakarimonitors/tests/unit/introspectiveinstancemonitor/test_monitor_manager.py new file mode 100644 index 0000000..efab397 --- /dev/null +++ b/masakarimonitors/tests/unit/introspectiveinstancemonitor/test_monitor_manager.py @@ -0,0 +1,42 @@ +# Copyright(c) 2018 WindRiver Systems +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import eventlet +import libvirt +import mock +import testtools + +from masakarimonitors.introspectiveinstancemonitor import instance + +eventlet.monkey_patch(os=False) + + +class TestMonitorManager(testtools.TestCase): + + def setUp(self): + super(TestMonitorManager, self).setUp() + + @mock.patch.object(libvirt, 'virEventRunDefaultImpl') + def test_vir_event_loop_native_run(self, mock_virEventRunDefaultImpl): + mock_virEventRunDefaultImpl.side_effect = Exception("Test exception.") + + obj = instance.IntrospectiveInstanceMonitorManager() + exception_flag = False + try: + obj._vir_event_loop_native_run() + except Exception: + exception_flag = True + + self.assertTrue(exception_flag) + mock_virEventRunDefaultImpl.assert_called_once() diff --git a/masakarimonitors/tests/unit/introspectiveinstancemonitor/test_qemu_utils.py b/masakarimonitors/tests/unit/introspectiveinstancemonitor/test_qemu_utils.py new file mode 100755 index 0000000..b4ec728 --- /dev/null +++ b/masakarimonitors/tests/unit/introspectiveinstancemonitor/test_qemu_utils.py @@ -0,0 +1,112 @@ +# Copyright(c) 2018 Nippon Telegraph and Telephone Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import libvirt +import mock +import testtools +import uuid + + +from masakarimonitors.introspectiveinstancemonitor import instance as object +from masakarimonitors.introspectiveinstancemonitor import qemu_utils + + +class TestQemuUtils(testtools.TestCase): + + def setup(self): + super(TestQemuUtils, self).setUp() + + @mock.patch.object(qemu_utils.libvirt, 'virDomain') + def test_getVmFsm(self, mock_domain): + """To test the state machines + + Initial stage should be dicovery and will advance to a + healthy stage after enough pingable events. Also, + it should reach an error stage from healthy after + a pingable event failure. + """ + + reference = qemu_utils.QemuGuestAgent() + mock_domain.UUID.return_value = uuid.uuid4() + reference.getVmFsm(mock_domain) + self.assertEqual(reference.getVmFsm(mock_domain).current_state, + 'discovery') + reference.getVmFsm(mock_domain).process_event('guest_not_pingable') + self.assertEqual(reference.getVmFsm(mock_domain).current_state, + 'discovery') + reference.getVmFsm(mock_domain).process_event('guest_pingable') + self.assertEqual(reference.getVmFsm(mock_domain).current_state, + 'healthy') + reference.getVmFsm(mock_domain).process_event('guest_pingable') + self.assertEqual(reference.getVmFsm(mock_domain).current_state, + 'healthy') + reference.getVmFsm(mock_domain).process_event('guest_not_pingable') + self.assertEqual(reference.getVmFsm(mock_domain).current_state, + 'error') + + @mock.patch.object(qemu_utils.libvirt, 'virDomain') + def test_hasQemuGuestAgent(self, mock_domain): + mock_domain.UUID.return_value = 'testuuid' + mock_domain.state.return_value = libvirt.VIR_DOMAIN_RUNNING, 'reason' + mock_domain.XMLDesc.return_value = """ + + + + + + + + +""" + obj = qemu_utils.QemuGuestAgent() + self.assertFalse(obj._hasQemuGuestAgent(mock_domain)) + mock_domain.XMLDesc.return_value = """ + + + + + + + + + + + +
+ + +""" + obj = qemu_utils.QemuGuestAgent() + self.assertTrue(obj._hasQemuGuestAgent(mock_domain)) + + @mock.patch.object(qemu_utils, 'resetJournal') + def test_resetJournal(self, mock_resetJournal): + + mock_resetJournal.return_value = None + + obj = object.IntrospectiveInstanceMonitorManager() + + event_id = 0 + domain_uuid = uuid.uuid4() + event_type = libvirt.VIR_DOMAIN_EVENT_STARTED + detail = libvirt.VIR_DOMAIN_EVENT_STARTED_BOOTED + + obj._reset_journal(event_id, event_type, detail, domain_uuid) + + mock_resetJournal.assert_called_once_with(domain_uuid) diff --git a/masakarimonitors/utils.py b/masakarimonitors/utils.py index c1dd725..3ef7da1 100644 --- a/masakarimonitors/utils.py +++ b/masakarimonitors/utils.py @@ -21,6 +21,7 @@ import shutil import sys import tempfile +from oslo_concurrency import lockutils from oslo_concurrency import processutils from oslo_log import log as logging from oslo_utils import importutils @@ -111,3 +112,23 @@ def execute(*cmd, **kwargs): return privsep_execute(*cmd, **kwargs) else: return processutils.execute(*cmd, **kwargs) + + +def synchronized(name, semaphores=None, blocking=False): + def wrap(f): + @six.wraps(f) + def inner(*args, **kwargs): + lock_str = 'masakarimonitors-%s' % name + int_lock = lockutils.internal_lock(lock_str, + semaphores=semaphores) + msg = "Lock blocking: %s on resource %s " % (lock_str, f.__name__) + """Acquiring lock: %(lock_str)s on resource """ + if not int_lock.acquire(blocking=blocking): + raise Exception(msg) + try: + return f(*args, **kwargs) + finally: + """Releasing lock: %(lock_str)s on resource """ + int_lock.release() + return inner + return wrap diff --git a/releasenotes/notes/introspectiveinstancemonitor-f4bc71f029b61d49.yaml b/releasenotes/notes/introspectiveinstancemonitor-f4bc71f029b61d49.yaml new file mode 100644 index 0000000..0c6f8c9 --- /dev/null +++ b/releasenotes/notes/introspectiveinstancemonitor-f4bc71f029b61d49.yaml @@ -0,0 +1,6 @@ +--- +features: + - | + Added Introspective Instance Monitoring through QEMU Guest Agent, + in order to detect, report and optionally recover VMs from internal + VM faults. diff --git a/requirements.txt b/requirements.txt index dcdda8f..061cc68 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,10 +2,13 @@ # of appearance. Changing the order has an impact on the overall integration # process, which may cause wedges in the gate later. +automaton>=1.9.0 # Apache-2.0 libvirt-python!=4.1.0,>=3.5.0 # LGPLv2+ openstacksdk>=0.13.0 # Apache-2.0 oslo.concurrency>=3.26.0 # Apache-2.0 oslo.config>=5.2.0 # Apache-2.0 +lxml!=3.7.0,>=3.4.1 # BSD +oslo.cache>=1.26.0 # Apache-2.0 oslo.i18n>=3.15.3 # Apache-2.0 oslo.log>=3.36.0 # Apache-2.0 oslo.middleware>=3.31.0 # Apache-2.0 diff --git a/setup.cfg b/setup.cfg index 188d472..f0a19db 100644 --- a/setup.cfg +++ b/setup.cfg @@ -31,11 +31,13 @@ oslo.config.opts = masakarimonitors.conf = masakarimonitors.conf.opts:list_opts oslo.config.opts.defaults = + masakarimonitors.introspectiveinstancemonitor = masakarimonitors.common.config:set_middleware_defaults masakarimonitors.instancemonitor = masakarimonitors.common.config:set_middleware_defaults masakarimonitors.processmonitor = masakarimonitors.common.config:set_middleware_defaults masakarimonitors.hostmonitor = masakarimonitors.common.config:set_middleware_defaults console_scripts = + masakari-introspectiveinstancemonitor = masakarimonitors.cmd.introspectiveinstancemonitor:main masakari-instancemonitor = masakarimonitors.cmd.instancemonitor:main masakari-processmonitor = masakarimonitors.cmd.processmonitor:main masakari-hostmonitor = masakarimonitors.cmd.hostmonitor:main