Introspective Instance Monitoring through QEMU Guest Agent

https://blueprints.launchpad.net/masakari/+spec/introspective-instance-monitoring

Currently, Masakari instance monitoring is strictly black-box
monitoring through qemu and libvirt. There are, however, a number of
internal instance/VM faults that, if monitored and detected by Masakari,
could be recovered by existing Masakari auto-recovery mechanisms, increasing
the overall availability of the instance/VM. This blueprint introduces the
capability of performing introspective instance monitoring of VMs, in order
to detect, report and optionally recover VMs from internal VM faults;
specifically, VM heartbeat monitoring via the QEMU Guest Agent.

Change-Id: I9efc6afc8d476003d3aa7fee8c31bcaa65438674
Implements: blueprint introspective-instance-monitoring
Authored by Louie KWAN on 2018-01-17 19:36:04 +00:00; committed by Louie Kwan
parent 26d558333d
commit a680e323b8
17 changed files with 1213 additions and 0 deletions


@@ -0,0 +1,38 @@
# Copyright(c) 2018 WindRiver Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Starter script for Masakari Introspective Instance Monitor."""
import sys
from oslo_log import log as logging
import masakarimonitors.conf
from masakarimonitors import config
from masakarimonitors import service
from masakarimonitors import utils
CONF = masakarimonitors.conf.CONF


def main():
    config.parse_args(sys.argv)
    logging.setup(CONF, "masakarimonitors")
    utils.monkey_patch()
    server = service.Service.create(
        binary='masakarimonitors-introspectiveinstancemonitor')
    service.serve(server)
    service.wait()
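
# Note: the binary name above is resolved by masakarimonitors.service
# to the introspectiveinstancemonitor_manager option registered in
# masakarimonitors/conf/service.py (see below), which defaults to
# masakarimonitors.introspectiveinstancemonitor.instance
# .IntrospectiveInstanceMonitorManager. This wiring is inferred from
# the manager option added in this change.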


@@ -17,6 +17,7 @@ from masakarimonitors.conf import api
from masakarimonitors.conf import base
from masakarimonitors.conf import host
from masakarimonitors.conf import instance
from masakarimonitors.conf import introspectiveinstancemonitor
from masakarimonitors.conf import process
from masakarimonitors.conf import service
@@ -26,5 +27,6 @@ api.register_opts(CONF)
base.register_opts(CONF)
host.register_opts(CONF)
instance.register_opts(CONF)
introspectiveinstancemonitor.register_opts(CONF)
process.register_opts(CONF)
service.register_opts(CONF)


@@ -0,0 +1,67 @@
# Copyright(c) 2018 WindRiver Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
# Note: this string is used later for regex parsing with the re module.
#
# Use Python's raw string notation for regular expressions;
# the backslash character ('\') indicates special forms or allows
# special characters to be used without invoking their special meaning.
SOCK = r'/var/lib/libvirt/qemu/org\.qemu\.guest_agent\..*\.instance-.*\.sock'
monitor_opts = [
cfg.IntOpt('guest_monitoring_interval',
default=10,
help='''
Guest monitoring interval of the VM status (in seconds).
* The value should not be too low, to avoid false reports of
  QEMU_GUEST_AGENT failures.
* The VM needs time to power off; guest_monitoring_interval should be
  greater than the time needed to SHUTDOWN the VM gracefully,
  e.g. | 565da9ba-3c0c-4087-83ca | iim1 | ACTIVE | powering-off | Running
'''),
cfg.IntOpt('guest_monitoring_timeout',
default=2,
help='Guest monitoring timeout (in seconds).'),
cfg.IntOpt('guest_monitoring_failure_threshold',
default=3,
help='Failure threshold before sending notification.'),
cfg.StrOpt('qemu_guest_agent_sock_path',
default=SOCK,
help='''
* The file path of the qemu guest agent sock.
* Please use Python raw string notation for the regular expression,
e.g. r'/var/lib/libvirt/qemu/org\.qemu\.guest_agent\..*\.instance-.*\.sock'
'''),
cfg.BoolOpt('callback_paused_event',
default=True,
help='''
* True: Callback for VM paused events.
* False: Do not callback for VM paused events.
'''),
]
def register_opts(conf):
conf.register_opts(monitor_opts, group='introspectiveinstancemonitor')
def list_opts():
return {
'introspectiveinstancemonitor': monitor_opts
}
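

# A quick illustration that the default SOCK pattern above matches a
# typical host-side guest-agent socket path (the instance name is
# hypothetical):
#
#   >>> import re
#   >>> bool(re.match(SOCK, '/var/lib/libvirt/qemu/'
#   ...                     'org.qemu.guest_agent.0.instance-00000004.sock'))
#   True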


@@ -30,6 +30,10 @@ Possible values:
default='masakarimonitors.instancemonitor.instance'
'.InstancemonitorManager',
help='Full class name for the Manager for instancemonitor.'),
cfg.StrOpt('introspectiveinstancemonitor_manager',
default='masakarimonitors.introspectiveinstancemonitor.instance'
'.IntrospectiveInstanceMonitorManager',
help='Full class name for the Manager for '
'introspectiveinstancemonitor.'),
cfg.StrOpt('processmonitor_manager',
default='masakarimonitors.processmonitor.process'
'.ProcessmonitorManager',


@@ -0,0 +1,78 @@
==============================================
masakarimonitors-introspectiveinstancemonitor
==============================================

Introspective instance monitor for Masakari
--------------------------------------------
- masakarimonitors-introspectiveinstancemonitor provides a Virtual Machine
  High Availability (VMHA) service for OpenStack clouds by automatically
  detecting system-level failure events via the QEMU Guest Agent. If it
  detects VM heartbeat failure events, it sends notifications to the
  masakari-api.
- Based on the QEMU Guest Agent, masakarimonitors-introspectiveinstancemonitor
  aims to provide access to a system-level agent via the standard qemu-ga
  protocol.

How does it work?
-----------------
- libvirt and the QEMU Guest Agent are used as the underlying protocol
  for messaging to and from the VM.
- The host-side qemu-agent sockets are used to determine whether VMs
  are configured with a QEMU Guest Agent.
- qemu guest-ping is used as the monitoring heartbeat (a manual check
  is shown below).
- In a future release, arbitrary guest agent commands could be passed
  through to check the health of the applications inside a VM.
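
For troubleshooting, the same heartbeat can be issued by hand with virsh
(a sketch; ``instance-00000004`` is a hypothetical libvirt domain name)::

    $ virsh qemu-agent-command instance-00000004 '{"execute": "guest-ping"}'
    {"return":{}}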

QEMU Guest Agent Installation notes
-----------------------------------
- Set the image property hw_qemu_guest_agent=yes.

  - This tells Nova to set up the virtual serial interface through QEMU
    to the VM, e.g.::

      $ openstack image create --public --disk-format qcow2 --container-format
        bare --file ~ubuntu/xenial-server-cloudimg-amd64-disk1.img --public
        --property hw_qemu_guest_agent=yes xenial-server-cloudimg

- Inside the VM::

    $ sudo apt-get install qemu-guest-agent
    $ sudo systemctl start qemu-guest-agent
    $ ps -ef | fgrep qemu
    ... /usr/sbin/qemu-ga --daemonize -m virtio-serial -p /dev/virtio-ports/org.qemu.guest_agent.0
    $ ls /dev/virtio-ports/
    org.qemu.guest_agent.0

Configure masakarimonitors-introspectiveinstancemonitor
--------------------------------------------------------
#. Clone masakari-monitors using::

     $ git clone https://github.com/openstack/masakari-monitors.git

#. Create the masakarimonitors directory in /etc/.

#. Run setup.py from masakari-monitors::

     $ sudo python setup.py install

#. Copy the masakarimonitors.conf and process_list.yaml files from
   masakari-monitors/etc/ to the /etc/masakarimonitors folder and make the
   necessary changes to them; a sample stanza is shown after these steps.
   To generate the sample masakarimonitors.conf file, run the following
   command from the top level of the masakari-monitors directory::

     $ tox -egenconfig

#. To run masakari-introspectiveinstancemonitor, simply use the following
   binary::

     $ masakari-introspectiveinstancemonitor
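
A minimal ``[introspectiveinstancemonitor]`` stanza for
masakarimonitors.conf; the values shown are the option defaults from this
change::

    [introspectiveinstancemonitor]
    guest_monitoring_interval = 10
    guest_monitoring_timeout = 2
    guest_monitoring_failure_threshold = 3
    callback_paused_event = True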


@@ -0,0 +1,123 @@
# Copyright (c) 2018 WindRiver Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
The code related to integration between oslo.cache module and masakarimonitors.
Use 'oslo_cache.dict'
i.e. dogpile.cache backend that uses dictionary for storage
Dogpile consists of two subsystems, one building on top of the other.
dogpile provides the concept of a dogpile lock, a control structure
which allows a single thread of execution to be selected as the creator
of some resource, while allowing other threads of execution to refer
to the previous version of this resource as the creation proceeds;
if there is no previous version, then those threads block until
the object is available.
"""
from oslo_cache import core
from oslo_config import cfg
from masakarimonitors.i18n import _
WEEK = 604800
def register_cache_configurations(conf):
"""Register all configurations required for oslo.cache.
It registers all configurations required by oslo.cache and
should be called before the cache region is configured.
:param conf: instance of configuration
:returns: updated configuration
"""
# register global configurations for caching
core.configure(conf)
# register specific configurations
constraint_cache_group = cfg.OptGroup('constraint_validation_cache')
constraint_cache_opts = [
cfg.IntOpt('expiration_time', default=WEEK,
help=_(
'TTL, in seconds, for any cached item in the '
'dogpile.cache region used for caching of validation '
'constraints.')),
cfg.BoolOpt("caching", default=True,
help=_(
'Toggle to enable/disable caching when Orchestration '
'Engine validates property constraints of stack. '
'During property validation with constraints '
'Orchestration Engine caches requests to other '
'OpenStack services. Please note that the global '
'toggle for oslo.cache(enabled=True in [cache] group) '
'must be enabled to use this feature.'))
]
conf.register_group(constraint_cache_group)
conf.register_opts(constraint_cache_opts, group=constraint_cache_group)
extension_cache_group = cfg.OptGroup('service_extension_cache')
extension_cache_opts = [
cfg.IntOpt('expiration_time', default=WEEK,
help=_(
'TTL, in seconds, for any cached item in the '
'dogpile.cache region used for caching of service '
'extensions.')),
cfg.BoolOpt('caching', default=True,
help=_(
'Toggle to enable/disable caching when Orchestration '
'Engine retrieves extensions from other OpenStack '
'services. Please note that the global toggle for '
'oslo.cache(enabled=True in [cache] group) must be '
'enabled to use this feature.'))
]
conf.register_group(extension_cache_group)
conf.register_opts(extension_cache_opts, group=extension_cache_group)
find_cache_group = cfg.OptGroup('resource_finder_cache')
find_cache_opts = [
cfg.IntOpt('expiration_time', default=WEEK,
help=_(
'TTL, in seconds, for any cached item in the '
'dogpile.cache region used for caching of OpenStack '
'service finder functions.')),
cfg.BoolOpt('caching', default=True,
help=_(
'Toggle to enable/disable caching when Orchestration '
'Engine looks for other OpenStack service resources '
'using name or id. Please note that the global '
'toggle for oslo.cache(enabled=True in [cache] group) '
'must be enabled to use this feature.'))
]
conf.register_group(find_cache_group)
conf.register_opts(find_cache_opts, group=find_cache_group)
return conf
# variable that stores an initialized cache region
_REGION = None
def get_cache_region():
global _REGION
if not _REGION:
_REGION = core.create_region()
_REGION.configure('oslo_cache.dict',
arguments={'expiration_time': WEEK})
core.configure_cache_region(
conf=register_cache_configurations(cfg.CONF),
region=_REGION)
return _REGION
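

# A minimal sketch of how this region is consumed (see qemu_utils in
# this change): journal objects are keyed by domain UUID, and a cache
# miss returns a dogpile NoValue sentinel rather than raising.
#
#   region = get_cache_region()
#   if type(region.get(domain_uuid)) is NoValue:   # cache miss
#       region.set(domain_uuid, Journal.factory(domain_uuid))
#   jo = region.get(domain_uuid)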


@@ -0,0 +1,176 @@
# Copyright (c) 2018 WindRiver Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import eventlet
import libvirt
import socket
import sys
import threading
import time
from oslo_config import cfg
from oslo_log import log as oslo_logging
from oslo_utils import excutils
from oslo_utils import timeutils
from masakarimonitors.introspectiveinstancemonitor import qemu_utils
from masakarimonitors.introspectiveinstancemonitor import scheduler
from masakarimonitors import manager
from masakarimonitors.objects import event_constants as ec
LOG = oslo_logging.getLogger(__name__)
CONF = cfg.CONF
class IntrospectiveInstanceMonitorManager(manager.Manager):
def __init__(self, *args, **kwargs):
self.init_tgm()
super(IntrospectiveInstanceMonitorManager, self).__init__(
service_name="introspectiveinstancemonitor", *args, **kwargs)
# This keeps track of what thread is running the event loop,
# (if it is run in a background thread)
self.event_loop_thread = None
def _reset_journal(self, eventID, eventType, detail, uuID):
"""To reset the monitoring to discovery stage
:param event_id: Event ID
:param event_type: Event type
:param detail: Event code
:param domain_uuid: Uuid
"""
noticeType = ec.EventConstants.TYPE_VM
hostname = socket.gethostname()
currentTime = timeutils.utcnow()
def _reset_handler(event_id, event_type, detail, domain_uuid, msg):
"""Reset monitoring
To reset monitoring to discovery stage for the following event:
libvirt.VIR_DOMAIN_EVENT_STARTED
"""
if (event_id == libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE) and \
(event_type == libvirt.VIR_DOMAIN_EVENT_STARTED):
LOG.debug(msg)
qemu_utils.resetJournal(domain_uuid)
# Log all received events when debug mode is on.
msg = "libvirt Event Received.type = %s \
hostname = %s uuid = %s time = %s eventID = %d eventType = %d \
detail = %d" % (
noticeType,
hostname, uuID, currentTime, eventID,
eventType, detail)
LOG.debug("%s", msg)
try:
thread = threading.Thread(
    target=_reset_handler,
    args=(eventID, eventType, detail, uuID, msg)
)
thread.start()
except KeyError as e:
LOG.error("virEventFilter KeyError: {0}".format(e))
except IndexError as e:
LOG.error("virEventFilter IndexError: {0}".format(e))
except TypeError as e:
LOG.error("virEventFilter TypeError: {0}".format(e))
except Exception:
with excutils.save_and_reraise_exception():
LOG.error("Unexpected error: %s" % sys.exc_info()[0])
def init_tgm(self):
"""Manages the masakari-introspectiveinstancemonitor."""
self.TG = scheduler.ThreadGroupManager()
def _vir_event_loop_native_run(self):
# Directly run the event loop in the current thread
while True:
libvirt.virEventRunDefaultImpl()
def _vir_event_loop_native_start(self):
libvirt.virEventRegisterDefaultImpl()
self.event_loop_thread = threading.Thread(
target=self._vir_event_loop_native_run,
name="lib_virt_eventLoop")
self.event_loop_thread.setDaemon(True)
self.event_loop_thread.start()
def _my_domain_event_callback(self, conn, dom, event, detail, opaque):
self._reset_journal(libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE,
event, detail, dom.UUIDString())
def _my_domain_event_reboot_callback(self, conn, dom, opaque):
self._reset_journal(libvirt.VIR_DOMAIN_EVENT_ID_REBOOT,
-1, -1, dom.UUIDString())
def _err_handler(self, ctxt, err):
LOG.warning("Error from libvirt : %s", err[2])
def _virt_event(self, uri):
# Run a background thread with the event loop
self._vir_event_loop_native_start()
event_callback_handlers = {
libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE:
self._my_domain_event_callback,
libvirt.VIR_DOMAIN_EVENT_ID_REBOOT:
self._my_domain_event_reboot_callback
}
# Connect to libvirt; if disconnected, reconnect and reprocess.
self.running = True
while self.running:
vc = libvirt.openReadOnly(uri)
# Event callback settings
callback_ids = []
for event, callback in event_callback_handlers.items():
cid = vc.domainEventRegisterAny(None, event, callback, None)
callback_ids.append(cid)
# Connection monitoring.
vc.setKeepAlive(5, 3)
while vc.isAlive() == 1 and self.running:
eventlet.greenthread.sleep(1)
# If the connection to libvirtd is lost,
# deregister the callbacks and reconnect.
LOG.warning("Libvirt Connection Closed Unexpectedly.")
for cid in callback_ids:
try:
vc.domainEventDeregisterAny(cid)
except Exception:
pass
vc.close()
del vc
time.sleep(3)
def stop(self):
self.running = False
def main(self):
"""Main method.
Sets the URI and error handler, then executes event loop processing.
"""
uri = CONF.libvirt.connection_uri
LOG.debug("Using uri:" + uri)
# set error handler & do event loop
libvirt.registerErrorHandler(self._err_handler, '_virt_event')
self._virt_event(uri)


@@ -0,0 +1,429 @@
# Copyright (c) 2018 WindRiver Systems
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Introspective instance monitoring depends on the qemu guest agent to
# monitor what is inside a VM.
#
# A few items that complicate the design:
#
# - After a VM becomes active, it needs time to start qemu-guest-agent.
#   Before an error/failure is reported, we need a discovery phase to
#   wait until the VM is guest-pingable.
#
# - Debouncing is needed to ensure that the masakari_notifier function
#   is not called twice or more for the same failure.
#
#   After a masakari notification is reported, the sent_notification
#   flag needs to be reset when there is a QEMU LIFECYCLE event like:
#     STARTED_BOOTED
#     SUSPENDED_PAUSED
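#
# Debounce sketch (names from this module): the first report flips
# sent_notification, so later audit cycles skip notifying:
#
#   if failure and not jo.getSentNotification():
#       notifier.send_notification(...)   # report once
#       jo.processEvent('report')         # error -> reported
#       jo.setSentNotification(True)
#   # a later STARTED/SUSPENDED lifecycle event calls resetJournal(),
#   # returning the journal to 'discovery' and clearing the flag.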
import eventlet
import libvirt
import libvirtmod_qemu
import logging
import re
import socket
import time
import traceback
# Machine module contains the state machine logic, state and transition events
from automaton import machines
from dogpile.cache.api import NoValue
from lxml import etree
from oslo_config import cfg
from oslo_utils import timeutils
from masakarimonitors.ha import masakari
from masakarimonitors.introspectiveinstancemonitor import cache
from masakarimonitors.objects import event_constants as ec
from masakarimonitors import utils
CONF = cfg.CONF
ICONF = cfg.CONF.introspectiveinstancemonitor
# The VM QEMU Guest Agent states
#
# discovery = initial state of a VM;
#     remains in this state until it is determined that
#     the VM has a qemu-agent interface enabling
#     introspective instance monitoring
#
# healthy = no failure event is detected
#
# error = an error is recorded on every audit cycle where
#     the VM is in the error state
#
# reported = a transient state
#     to keep track of the reporting of a notification
#
# Transitions
#
# Representation of a transition managed by a ``Machine`` instance.
# source (str): Source state of the transition.
# dest (str): Destination state of the transition.
# trigger (str): The type of triggering event
# that advances to the next state in the sequence.
def action_on_enter(new_state, triggered_event):
pass
def action_on_exit(old_state, triggered_event):
pass
STATE_SPACE = [
{
'name': 'discovery',
'next_states': {
'guest_pingable': 'healthy',
'guest_not_pingable': 'discovery',
},
'on_enter': action_on_enter,
'on_exit': action_on_exit,
},
{
'name': 'healthy',
'next_states': {
'guest_pingable': 'healthy',
'guest_not_pingable': 'error',
},
'on_enter': action_on_enter,
'on_exit': action_on_exit,
},
{
'name': 'error',
'next_states': {
'report': 'reported',
'guest_pingable': 'healthy',
'guest_not_pingable': 'error',
},
'on_enter': action_on_enter,
'on_exit': action_on_exit,
},
{
'name': 'reported',
'next_states': {
'guest_pingable': 'discovery',
'guest_not_pingable': 'reported',
},
'on_enter': action_on_enter,
'on_exit': action_on_exit,
},
]
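# A minimal sketch of driving the machine defined above with automaton
# (illustrative only; the production flow lives in QemuGuestAgent below):
#
#   m = machines.FiniteMachine.build(STATE_SPACE)
#   m.default_start_state = 'discovery'
#   m.initialize()
#   m.process_event('guest_pingable')       # discovery -> healthy
#   m.process_event('guest_not_pingable')   # healthy   -> error
#   m.process_event('report')               # error     -> reported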
# Journal representation of a managed ``Machine`` instance.
class Journal(machines.FiniteMachine):
# Factory pattern to create a Journal object
@classmethod
def factory(cls, domain_uuid):
jo = cls.build(STATE_SPACE)
jo.default_start_state = 'discovery'
jo.initialize()
jo.failedCount = 0
# Conditions to reset sent_notification
jo.sent_notification = False
jo.domain_uuid = domain_uuid
jo.lastUsed = time.time()
LOG.debug(str(domain_uuid) + ':Journal:__init__:')
return jo
def resetState(self):
self.default_start_state = 'discovery'
self.initialize()
self.failedCount = 0
self.sent_notification = False
LOG.debug(str(self.domain_uuid) + '__resetState__:')
def processEvent(self, event):
self.process_event(event)
self.lastUsed = time.time()
def getFailedCount(self):
return self.failedCount
def incrementFailedCount(self):
if (self.current_state == 'error'):
self.failedCount += 1
def resetFailedCount(self):
self.failedCount = 0
def setSentNotification(self, boolean):
self.sent_notification = boolean
def getSentNotification(self):
return self.sent_notification
# libvirt state verbose dictionary
STATES = {
libvirt.VIR_DOMAIN_NOSTATE: 'no state',
libvirt.VIR_DOMAIN_RUNNING: 'running',
libvirt.VIR_DOMAIN_BLOCKED: 'blocked on resource',
libvirt.VIR_DOMAIN_PAUSED: 'paused by user',
libvirt.VIR_DOMAIN_SHUTDOWN: 'being shut down',
libvirt.VIR_DOMAIN_SHUTOFF: 'shut off',
libvirt.VIR_DOMAIN_CRASHED: 'crashed',
}
LOG = logging.getLogger(__name__)
def get_function_name():
return traceback.extract_stack(None, 2)[0][2]
# To reset journal object by domain uuid
def resetJournal(domain_uuid):
# To get the VM Journal object from the dictionary
# :param domain: QEMU domain UUID
dict = cache.get_cache_region()
jo = None
if type(dict.get(domain_uuid)) is NoValue:
jo = Journal.factory(domain_uuid)
dict.set(domain_uuid, jo)
else:
jo = dict.get(domain_uuid)
jo.resetState()
# Qemu guest agent is used to check VM status
# The checking pre-conditions are as follows:
# - VM is running
# - VM has Qemu guest agent installed
#
# then status is determined by
# - VM is guest-agent-pingable or not
#
# Note: checkGuests function is called by the scheduler
class QemuGuestAgent(object):
def __init__(self):
super(QemuGuestAgent, self).__init__()
self.notifier = masakari.SendNotification()
# _thresholdsCrossing
#
# We only issue a notification
# to masakari-engine if the VM is 'already' in the error state
# when there are consecutive guest_ping failures.
# Suggested value for guest_monitoring_failure_threshold >= 3
#
# Note: when operators try to gracefully shut down a VM,
# QEMU may take time powering off.
# E.g. when you run <nova list> or <openstack server show>,
# you may see that QEMU is active but monitoring may fail
# because the VM is still "powering-off":
#   Status | Task State   | Power State
#   ACTIVE | powering-off | Running
def _thresholdsCrossing(self, domain):
if (((self.getVmFsm(domain.UUIDString()).current_state) == 'error')
and
(self._getJournalObject(domain.UUIDString()).getFailedCount()
> ICONF.guest_monitoring_failure_threshold)):
LOG.debug('_thresholdsCrossing:' + domain.UUIDString())
LOG.debug(self._getJournalObject(
domain.UUIDString()).getFailedCount())
return True
else:
return False
def _masakari_notifier(self, domain_uuid):
if self._getJournalObject(domain_uuid).getSentNotification():
LOG.debug('notifier.send_notification Skipped:' + domain_uuid)
else:
hostname = socket.gethostname()
noticeType = ec.EventConstants.TYPE_VM
current_time = timeutils.utcnow()
event = {
'notification': {
'type': noticeType,
'hostname': hostname,
'generated_time': current_time,
'payload': {
'event': 'QEMU_GUEST_AGENT_ERROR',
'instance_uuid': domain_uuid,
'vir_domain_event': 'STOPPED_FAILED'
}
}
}
try:
self.notifier.send_notification(CONF.callback.retry_max,
CONF.callback.retry_interval,
event)
self._getJournalObject(domain_uuid).processEvent('report')
self._getJournalObject(domain_uuid).setSentNotification(True)
except Exception:
LOG.warn('Exception :' + domain_uuid +
' @ ' + get_function_name())
pass
def _qemuAgentGuestPing(self, domain, timeout, flags=0):
def _no_heartbeat(domain_uuid):
# Also advance the FSM
self.getVmFsm(domain_uuid).processEvent('guest_not_pingable')
self._getJournalObject(domain_uuid).incrementFailedCount()
def _with_heartbeat(domain_uuid):
# The order matters: reset the failed count first, then advance the FSM.
self._getJournalObject(domain_uuid).resetFailedCount()
self.getVmFsm(domain_uuid).processEvent('guest_pingable')
def _record(result):
if result is None:
LOG.debug(domain.UUIDString()
+ '\tqemu-ga_guest-ping is not responding.')
if self._thresholdsCrossing(domain):
self._masakari_notifier(domain.UUIDString())
_no_heartbeat(domain.UUIDString())
else:
_with_heartbeat(domain.UUIDString())
"""Send a Guest Agent ping to domain """
# must pass domain._o to the c library as virDomainPtr
ret = libvirtmod_qemu.virDomainQemuAgentCommand(domain._o,
'{"execute": "guest-ping"}', timeout, flags)
_record(ret)
def _getJournalObject(self, domain_uuid):
"""Function: To get the dictionary
:param domain: QEMU domain
:return: the journal object referred by domain_uuid
"""
dict = cache.get_cache_region()
if type(dict.get(domain_uuid)) is NoValue:
jo = Journal.factory(domain_uuid)
dict.set(domain_uuid, jo)
return jo
else:
return dict.get(domain_uuid)
def getVmFsm(self, domain_uuid):
"""Function: To get the VM Finite State Machine from
the dictionary
:param domain: QEMU domain
:return: FSM object
"""
dict = cache.get_cache_region()
if type(dict.get(domain_uuid)) is NoValue:
jo = Journal.factory(domain_uuid)
dict.set(domain_uuid, jo)
return jo
else:
return dict.get(domain_uuid)
def _hasQemuGuestAgent(self, domain):
"""Function: To check whether the VM has an QEMU Guest Agent
by examining the qemu.guest_agent sock
First check if libvirt is running or not, then sock
:param domain: QEMU domain
:return: true or false
"""
def qemuGuestAgentPathMatch(path):
SOCK = ICONF.qemu_guest_agent_sock_path
return re.match('%s' % SOCK, path)
state, reason = domain.state()
# First check whether the VM is running
if state != libvirt.VIR_DOMAIN_RUNNING:
return False
xmlDesc = domain.XMLDesc()
tree = etree.fromstring(xmlDesc)
''' Example
<channel type='unix'>
<source mode='bind' path=
'/var/lib/libvirt/qemu/org.qemu.guest_agent.0.instance-00000004.sock'/>
<target type=
'virtio' name='org.qemu.guest_agent.0' state='connected'/>
<alias name='channel0'/>
<address type='virtio-serial' controller='0' bus='0' port='1'/>
</channel>
'''
try:
source = tree.find("devices/channel/source")
if (source is not None):
mode = source.get('mode')
path = source.get('path')
# There should be a bind for a sock file for qemu guest_agent
if (qemuGuestAgentPathMatch(path) and mode == 'bind'):
return True
except Exception:
pass
return False
def checkGuests(self):
"""Function: Check QEMU Guests
Condition: VM under intrusive monitoring must have QEMU agent client
configured, installed and qemu "guest-agent-pingable".
"""
try:
conn = libvirt.open(None) # LIBVIRT_DEFAULT_URI
ids = conn.listDomainsID()
running = map(conn.lookupByID, ids)
columns = 3
for row in map(None, *[iter(running)] * columns):
for domain in row:
if domain:
try:
if self._hasQemuGuestAgent(domain):
@utils.synchronized(domain.UUIDString())
def do_qemuAgentGuestPing(domain, timeout):
self._qemuAgentGuestPing(domain, timeout)
do_qemuAgentGuestPing(domain,
ICONF.guest_monitoring_timeout)
except libvirt.libvirtError as le:
LOG.warn(le)
continue
except Exception as e:
LOG.warn(e)
pass
def reschedule(action, sleep_time=1):
"""Eventlet Sleep for the specified number of seconds.
:param sleep_time: seconds to sleep; if None, no sleep;
"""
LOG.debug('At reschedule')
if sleep_time is not None:
LOG.debug('Action %s sleep for %s seconds' % (
action.id, sleep_time))
eventlet.sleep(sleep_time)
def sleep(sleep_time):
"""Interface for sleeping."""
eventlet.sleep(sleep_time)


@@ -0,0 +1,110 @@
# Copyright (c) 2018 WindRiver Systems
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
import logging
from oslo_config import cfg
from oslo_service import threadgroup
from masakarimonitors.introspectiveinstancemonitor import qemu_utils
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class ThreadGroupManager(object):
"""Thread group manager."""
def init_qemu_ga(self):
self.qemuGA = qemu_utils.QemuGuestAgent()
LOG.debug('Started QemuGuestAgent')
def __init__(self):
self.init_qemu_ga()
super(ThreadGroupManager, self).__init__()
self.threads = {}
self.group = threadgroup.ThreadGroup()
# Create a dummy service task, because the process exits when
# there is nothing queued on the thread group.
self.add_timer(
CONF.introspectiveinstancemonitor.guest_monitoring_interval,
self._service_task)
def _service_task(self):
# This is used to trigger periodic monitoring tasks
self.qemuGA.checkGuests()
def start(self, func, *args, **kwargs):
"""Run the given method in a sub-thread."""
LOG.debug('add_thread')
return self.group.add_thread(func, *args, **kwargs)
def add_timer(self, interval, func, *args, **kwargs):
"""Define a periodic task to be run in the thread group.
The task will be executed in a separate green thread
every `interval` seconds.
"""
LOG.debug('group.add_timer')
self.group.add_timer(interval, func, *args, **kwargs)
def stop_timers(self):
self.group.stop_timers()
def stop(self, graceful=False):
"""Stop any active threads belong to this threadgroup."""
# Try to stop all threads gracefully
self.group.stop(graceful)
self.group.wait()
# Wait for link()ed functions (i.e. lock release)
threads = self.group.threads[:]
links_done = dict((th, False) for th in threads)
def mark_done(gt, th):
links_done[th] = True
for th in threads:
th.link(mark_done, th)
while not all(links_done.values()):
eventlet.sleep()
def reschedule(action, sleep_time=1):
"""Eventlet Sleep for the specified number of seconds.
:param sleep_time: seconds to sleep; if None, no sleep;
"""
LOG.debug('At reschedule')
if sleep_time is not None:
LOG.debug('Action %s sleep for %s seconds' % (
action.id, sleep_time))
eventlet.sleep(sleep_time)
def sleep(sleep_time):
"""Interface for sleeping."""
LOG.debug('sleep')
eventlet.sleep(sleep_time)
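

# A minimal usage sketch (assumption: instantiated by the monitor
# manager, as instance.py in this change does via init_tgm()):
#
#   tgm = ThreadGroupManager()   # schedules the periodic guest check
#   ...                          # _service_task fires every interval
#   tgm.stop(graceful=True)      # tear down timers and threads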


@@ -0,0 +1,42 @@
# Copyright(c) 2018 WindRiver Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import eventlet
import libvirt
import mock
import testtools
from masakarimonitors.introspectiveinstancemonitor import instance
eventlet.monkey_patch(os=False)
class TestMonitorManager(testtools.TestCase):
def setUp(self):
super(TestMonitorManager, self).setUp()
@mock.patch.object(libvirt, 'virEventRunDefaultImpl')
def test_vir_event_loop_native_run(self, mock_virEventRunDefaultImpl):
mock_virEventRunDefaultImpl.side_effect = Exception("Test exception.")
obj = instance.IntrospectiveInstanceMonitorManager()
exception_flag = False
try:
obj._vir_event_loop_native_run()
except Exception:
exception_flag = True
self.assertTrue(exception_flag)
mock_virEventRunDefaultImpl.assert_called_once()


@@ -0,0 +1,112 @@
# Copyright(c) 2018 Nippon Telegraph and Telephone Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import libvirt
import mock
import testtools
import uuid
from masakarimonitors.introspectiveinstancemonitor import instance as object
from masakarimonitors.introspectiveinstancemonitor import qemu_utils
class TestQemuUtils(testtools.TestCase):
def setUp(self):
super(TestQemuUtils, self).setUp()
@mock.patch.object(qemu_utils.libvirt, 'virDomain')
def test_getVmFsm(self, mock_domain):
"""To test the state machines
Initial stage should be dicovery and will advance to a
healthy stage after enough pingable events. Also,
it should reach an error stage from healthy after
a pingable event failure.
"""
reference = qemu_utils.QemuGuestAgent()
mock_domain.UUID.return_value = uuid.uuid4()
reference.getVmFsm(mock_domain)
self.assertEqual(reference.getVmFsm(mock_domain).current_state,
'discovery')
reference.getVmFsm(mock_domain).process_event('guest_not_pingable')
self.assertEqual(reference.getVmFsm(mock_domain).current_state,
'discovery')
reference.getVmFsm(mock_domain).process_event('guest_pingable')
self.assertEqual(reference.getVmFsm(mock_domain).current_state,
'healthy')
reference.getVmFsm(mock_domain).process_event('guest_pingable')
self.assertEqual(reference.getVmFsm(mock_domain).current_state,
'healthy')
reference.getVmFsm(mock_domain).process_event('guest_not_pingable')
self.assertEqual(reference.getVmFsm(mock_domain).current_state,
'error')
@mock.patch.object(qemu_utils.libvirt, 'virDomain')
def test_hasQemuGuestAgent(self, mock_domain):
mock_domain.UUID.return_value = 'testuuid'
mock_domain.state.return_value = libvirt.VIR_DOMAIN_RUNNING, 'reason'
mock_domain.XMLDesc.return_value = """<domain>
<devices>
<interface type="network">
<target dev="vnet0"/>
</interface>
<interface type="network">
<target dev="vnet1"/>
</interface>
</devices>
</domain>"""
obj = qemu_utils.QemuGuestAgent()
self.assertFalse(obj._hasQemuGuestAgent(mock_domain))
mock_domain.XMLDesc.return_value = """<domain>
<devices>
<interface type="network">
<target dev="vnet0"/>
</interface>
<interface type="network">
<target dev="vnet1"/>
</interface>
<channel type='unix'>
<source
mode='bind'
path=
'/var/lib/libvirt/qemu/org.qemu.guest_agent.0.instance-00000003.sock'/>
<target
type='virtio'
name='org.qemu.guest_agent.0' state='disconnected'/>
<alias name='channel0'/>
<address type='virtio-serial' controller='0' bus='0' port='1'/>
</channel>
</devices>
</domain>"""
obj = qemu_utils.QemuGuestAgent()
self.assertTrue(obj._hasQemuGuestAgent(mock_domain))
@mock.patch.object(qemu_utils, 'resetJournal')
def test_resetJournal(self, mock_resetJournal):
mock_resetJournal.return_value = None
obj = object.IntrospectiveInstanceMonitorManager()
event_id = 0
domain_uuid = uuid.uuid4()
event_type = libvirt.VIR_DOMAIN_EVENT_STARTED
detail = libvirt.VIR_DOMAIN_EVENT_STARTED_BOOTED
obj._reset_journal(event_id, event_type, detail, domain_uuid)
mock_resetJournal.assert_called_once_with(domain_uuid)


@@ -21,6 +21,7 @@ import shutil
import sys
import tempfile
from oslo_concurrency import lockutils
from oslo_concurrency import processutils
from oslo_log import log as logging
from oslo_utils import importutils
@@ -111,3 +112,23 @@ def execute(*cmd, **kwargs):
return privsep_execute(*cmd, **kwargs)
else:
return processutils.execute(*cmd, **kwargs)
def synchronized(name, semaphores=None, blocking=False):
def wrap(f):
@six.wraps(f)
def inner(*args, **kwargs):
lock_str = 'masakarimonitors-%s' % name
int_lock = lockutils.internal_lock(lock_str,
semaphores=semaphores)
msg = "Lock blocking: %s on resource %s " % (lock_str, f.__name__)
"""Acquiring lock: %(lock_str)s on resource """
if not int_lock.acquire(blocking=blocking):
raise Exception(msg)
try:
return f(*args, **kwargs)
finally:
"""Releasing lock: %(lock_str)s on resource """
int_lock.release()
return inner
return wrap
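

# Example usage, mirroring how qemu_utils in this change wraps the
# per-domain ping with a per-UUID lock:
#
#   @synchronized(domain.UUIDString())
#   def do_qemuAgentGuestPing(domain, timeout):
#       self._qemuAgentGuestPing(domain, timeout)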


@@ -0,0 +1,6 @@
---
features:
- |
Added Introspective Instance Monitoring through QEMU Guest Agent,
in order to detect, report and optionally recover VMs from internal
VM faults.


@@ -2,10 +2,13 @@
# of appearance. Changing the order has an impact on the overall integration
# process, which may cause wedges in the gate later.
automaton>=1.9.0 # Apache-2.0
libvirt-python!=4.1.0,>=3.5.0 # LGPLv2+
openstacksdk>=0.13.0 # Apache-2.0
oslo.concurrency>=3.26.0 # Apache-2.0
oslo.config>=5.2.0 # Apache-2.0
lxml!=3.7.0,>=3.4.1 # BSD
oslo.cache>=1.26.0 # Apache-2.0
oslo.i18n>=3.15.3 # Apache-2.0
oslo.log>=3.36.0 # Apache-2.0
oslo.middleware>=3.31.0 # Apache-2.0


@@ -31,11 +31,13 @@ oslo.config.opts =
masakarimonitors.conf = masakarimonitors.conf.opts:list_opts
oslo.config.opts.defaults =
masakarimonitors.introspectiveinstancemonitor = masakarimonitors.common.config:set_middleware_defaults
masakarimonitors.instancemonitor = masakarimonitors.common.config:set_middleware_defaults
masakarimonitors.processmonitor = masakarimonitors.common.config:set_middleware_defaults
masakarimonitors.hostmonitor = masakarimonitors.common.config:set_middleware_defaults
console_scripts =
masakari-introspectiveinstancemonitor = masakarimonitors.cmd.introspectiveinstancemonitor:main
masakari-instancemonitor = masakarimonitors.cmd.instancemonitor:main
masakari-processmonitor = masakarimonitors.cmd.processmonitor:main
masakari-hostmonitor = masakarimonitors.cmd.hostmonitor:main