Merge "host monitor by consul"

This commit is contained in:
Zuul 2022-03-03 11:26:36 +00:00 committed by Gerrit Code Review
commit a89511e088
16 changed files with 995 additions and 4 deletions

129
doc/source/consul-usage.rst Normal file
View File

@ -0,0 +1,129 @@
=============
Consul Usage
=============
Consul overview
================
Consul is a service mesh solution providing a full featured control plane
with service discovery, configuration, and segmentation functionality.
Each of these features can be used individually as needed, or they can be
used together to build a full service mesh.
The Consul agent is the core process of Consul. The Consul agent maintains
membership information, registers services, runs checks, responds to queries,
and more.
Consul clients can provide any number of health checks, either associated
with a given service or with the local node. This information can be used
by an operator to monitor cluster health.
Please refer to `Consul Agent Overview <https://www.consul.io/docs/agent>`_.
Test Environment
================
There are three controller nodes and two compute nodes in the test environment.
Every node has three network interfaces. The first interface is used for
management, with an ip such as '192.168.101.*'. The second interface is used
to connect to storage, with an ip such as '192.168.102.*'. The third interface
is used for tenant, with an ip such as '192.168.103.*'.
Download Consul
================
Download Consul package for CentOS. Other OS please refer to `Download Consul
<https://www.consul.io/downloads>`_.
.. code-block:: console
sudo yum install -y yum-utils
sudo yum-config-manager --add-repo https://rpm.releases.hashicorp.com/RHEL/hashicorp.repo
sudo yum -y install Consul
Configure Consul agent
======================
Consul agent must runs on every node. Consul server agent runs on controller
nodes, while Consul client agent runs on compute nodes, which makes up one
Consul cluster.
The following is an example of a config file for Consul server agent which
binds to management interface of the host.
management.json
.. code-block:: ini
{
"bind_addr": "192.168.101.1",
"datacenter": "management",
"data_dir": "/tmp/consul_m",
"log_level": "INFO",
"server": true,
"bootstrap_expect": 3,
"node_name": "node01",
"addresses": {
"http": "192.168.101.1"
},
"ports": {
"http": 8500,
"serf_lan": 8501
},
"retry_join": ["192.168.101.1:8501", "192.168.101.2:8501", "192.168.101.3:8501"]
}
The following is an example of a config file for Consul client agent which
binds to management interface of the host.
management.json
.. code-block:: ini
{
"bind_addr": "192.168.101.4",
"datacenter": "management",
"data_dir": "/tmp/consul_m",
"log_level": "INFO",
"node_name": "node04",
"addresses": {
"http": "192.168.101.4"
},
"ports": {
"http": 8500,
"serf_lan": 8501
},
"retry_join": ["192.168.101.1:8501", "192.168.101.2:8501", "192.168.101.3:8501"]
}
Use the tenant or storage interface ip and ports when config agent in tenant
or storage datacenter.
Please refer to `Consul Agent Configuration <https://www.consul.io/docs/agent/options#command-line-options>`_.
Start Consul agent
==================
The Consul agent is started by the following command.
.. code-block:: console
# Consul agent config-file management.json
Test Consul installation
========================
After all Consul agents installed and started,
you can see all nodes in the cluster by the following command.
.. code-block:: console
# Consul members -http-addr=192.168.101.1:8500
Node Address Status Type Build Protocol DC
node01 192.168.101.1:8501 alive server 1.10.2 2 management
node02 192.168.101.2:8501 alive server 1.10.2 2 management
node03 192.168.101.3:8501 alive server 1.10.2 2 management
node04 192.168.101.4:8501 alive client 1.10.2 2 management
node05 192.168.101.5:8501 alive client 1.10.2 2 management

View File

@ -6,11 +6,11 @@ Monitor Overview
------------------
The masakari-hostmonitor provides compute node High Availability
for OpenStack clouds by automatically detecting compute nodes failure
via pacemaker & corosync.
via monitor driver.
How does it work?
----------------------------------------
How does it work based on pacemaker & corosync?
------------------------------------------------
- Pacemaker or pacemaker-remote is required to install into compute nodes
to form a pacemaker cluster.
@ -19,10 +19,30 @@ How does it work?
in other nodes will detect the failure and send notifications to masakari-api.
How does it work based on consul?
----------------------------------
- If the nodes in the cloud have multiple interfaces to connect to
management network, tenant network or storage network, monitor driver based
on consul is another choice. Consul agents are required to install into all
noedes, which make up multiple consul clusters.
Here is an example to show how to make up one consul cluster.
.. toctree::
:maxdepth: 1
consul-usage
- The compute node's status is depending on assembly of multiple interfaces
connectivity status, which are retrieved from multiple consul clusters. Then
it sends notifition to trigger host failure recovery according to defined
HA strategy - host states and the corresponding actions.
Related configurations
------------------------
This section in masakarimonitors.conf shows an example of how to configure
the monitor.
the hostmonitor if you choice monitor driver based on pacemaker.
.. code-block:: ini
@ -77,3 +97,47 @@ the monitor.
# corosync_multicast_interfaces values and must be in correct order with
# relevant interfaces in corosync_multicast_interfaces.
corosync_multicast_ports = 5405,5406
If you want to use or test monitor driver based on consul, please modify
following configration.
.. code-block:: ini
[host]
# Driver that hostmonitor uses for monitoring hosts.
monitoring_driver = consul
[consul]
# Addr for local consul agent in management datacenter.
# The addr is make up of the agent's bind_addr and http port,
# such as '192.168.101.1:8500'.
agent_manage = $(CONSUL_MANAGEMENT_ADDR)
# Addr for local consul agent in tenant datacenter.
agent_tenant = $(CONSUL_TENANT_ADDR)
# Addr for local consul agent in storage datacenter.
agent_storage = $(CONSUL_STORAGE_ADDR)
# Config file for consul health action matrix.
matrix_config_file = /etc/masakarimonitors/matrix.yaml
The ``matrix_config_file`` shows the HA strategy. Matrix is combined by host
health and actions. The 'health: [x, x, x]', repreasents assembly status of
SEQUENCE. Action, means which actions it will trigger if host health turns
into, while 'recovery' means it will trigger one host failure recovery
workflow. User can define the HA strategy according to the physical
environment. For example, if there is just 1 cluster to monitor management
network connectivity, the user just need to configrate
``$(CONSUL_MANAGEMENT_ADDR)`` in consul section of the hostmontior'
configration file, and change the HA strategy in
``/etc/masakarimonitors/matrix.yaml`` as following:
.. code-block:: yaml
sequence: ['manage']
matrix:
- health: ['up']
action: []
- health: ['down']
action: ['recovery']
Then the hostmonitor by consul works as same as the hostmonitor by pacemaker.

View File

@ -0,0 +1,19 @@
---
sequence: ['manage', 'tenant', 'storage']
matrix:
- health: ['up', 'up', 'up']
action: []
- health: ['up', 'up', 'down']
action: ['recovery']
- health: ['up', 'down', 'up']
action: []
- health: ['up', 'down', 'down']
action: ['recovery']
- health: ['down', 'up', 'up']
action: []
- health: ['down', 'up', 'down']
action: ['recovery']
- health: ['down', 'down', 'up']
action: []
- health: ['down', 'down', 'down']
action: ['recovery']

View File

@ -15,6 +15,7 @@ from oslo_config import cfg
from masakarimonitors.conf import api
from masakarimonitors.conf import base
from masakarimonitors.conf import consul
from masakarimonitors.conf import host
from masakarimonitors.conf import instance
from masakarimonitors.conf import introspectiveinstancemonitor
@ -25,6 +26,7 @@ CONF = cfg.CONF
api.register_opts(CONF)
base.register_opts(CONF)
consul.register_opts(CONF)
host.register_opts(CONF)
instance.register_opts(CONF)
introspectiveinstancemonitor.register_opts(CONF)

View File

@ -0,0 +1,37 @@
# Copyright(c) 2019 Inspur
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
consul_opts = [
cfg.StrOpt('agent_manage',
help='Addr for local consul agent in management datacenter.'),
cfg.StrOpt('agent_tenant',
help='Addr for local consul agent in tenant datacenter.'),
cfg.StrOpt('agent_storage',
help='Addr for local consul agent in storage datacenter.'),
cfg.StrOpt('matrix_config_file',
help='Config file for consul health action matrix.'),
]
def register_opts(conf):
conf.register_opts(consul_opts, group='consul')
def list_opts():
return {
'consul': consul_opts
}

View File

@ -0,0 +1,122 @@
# Copyright(c) 2021 Inspur
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Main abstraction layer for retrieving node status from consul
"""
import consul
from masakarimonitors.i18n import _
class ConsulException(Exception):
"""Base Consul Exception"""
msg_fmt = _("An unknown exception occurred.")
def __init__(self, message=None, **kwargs):
if not message:
message = self.msg_fmt % kwargs
super(ConsulException, self).__init__(message)
class ConsulAgentNotExist(ConsulException):
msg_fmt = _("Consul agent of %(cluster)s not exist.")
class ConsulGetMembersException(ConsulException):
msg_fmt = _("Failed to get members of %(cluster)s: %(err)s.")
class ConsulManager(object):
"""Consul manager class
This class helps to pull health data from all consul clusters,
and return health data in sequence.
"""
def __init__(self, CONF):
self.agents = {}
self.init_agents(CONF)
def init_agents(self, CONF):
if CONF.consul.agent_manage:
addr, port = CONF.consul.agent_manage.split(':')
self.agents['manage'] = ConsulAgent('manage', addr, port)
if CONF.consul.agent_tenant:
addr, port = CONF.consul.agent_tenant.split(':')
self.agents['tenant'] = ConsulAgent('tenant', addr, port)
if CONF.consul.agent_storage:
addr, port = CONF.consul.agent_storage.split(':')
self.agents['storage'] = ConsulAgent('storage', addr, port)
def valid_agents(self, sequence):
for name in sequence:
if self.agents.get(name) is None:
raise ConsulAgentNotExist(cluster=name)
def get_health(self, sequence):
hosts_health = {}
all_agents = []
for name in sequence:
consul_agent = self.agents.get(name)
agent_health = consul_agent.get_health()
hosts_health[name] = agent_health
if not all_agents:
all_agents = agent_health.keys()
sequence_hosts_health = {}
for host in all_agents:
sequence_hosts_health[host] = []
for name in sequence:
state = hosts_health[name].get(host)
if state:
sequence_hosts_health[host].append(state)
else:
continue
return sequence_hosts_health
class ConsulAgent(object):
"""Agent to consul cluster"""
def __init__(self, name, addr=None, port=None):
self.name = name
self.addr = addr
self.port = port
# connection to consul cluster
self.cluster = consul.Consul(host=addr, port=self.port)
def get_agents(self):
try:
members = self.cluster.agent.members()
except Exception as e:
raise ConsulGetMembersException(cluster=self.name, err=str(e))
return members
def get_health(self):
agents_health = {}
agents = self.get_agents()
for agent in agents:
host = agent.get('Name')
status = agent.get('Status')
if status == 1:
agents_health[host] = 'up'
else:
agents_health[host] = 'down'
return agents_health

View File

@ -0,0 +1,193 @@
# Copyright(c) 2019 Inspur
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import eventlet
import socket
from collections import deque
from oslo_log import log
from oslo_utils import timeutils
from masakarimonitors import conf
from masakarimonitors.ha import masakari
from masakarimonitors.hostmonitor.consul_check import consul_helper
from masakarimonitors.hostmonitor.consul_check import matrix_helper
from masakarimonitors.hostmonitor import driver
from masakarimonitors.objects import event_constants as ec
LOG = log.getLogger(__name__)
CONF = conf.CONF
class ConsulCheck(driver.DriverBase):
"""Check host status by consul"""
def __init__(self):
super(ConsulCheck, self).__init__()
self.hostname = socket.gethostname()
self.monitoring_interval = CONF.host.monitoring_interval
self.monitoring_samples = CONF.host.monitoring_samples
self.matrix_manager = matrix_helper.MatrixManager(CONF)
self.consul_manager = consul_helper.ConsulManager(CONF)
self.notifier = masakari.SendNotification()
self._matrix = None
self._sequence = None
self.monitoring_data = {}
self.last_host_health = {}
@property
def matrix(self):
if not self._matrix:
self._matrix = self.matrix_manager.get_matrix()
return self._matrix
@property
def sequence(self):
if not self._sequence:
self._sequence = self.matrix_manager.get_sequence()
return self._sequence
def _formate_health(self, host_health):
formate_health = {}
for i in range(len(host_health)):
layer = "%s-interface" % self.sequence[i]
state = host_health[i]
formate_health[layer] = state
return formate_health
def _event(self, host, host_health):
host_status = ec.EventConstants.HOST_STATUS_NORMAL
if 'down' not in host_health:
event_type = ec.EventConstants.EVENT_STARTED
cluster_status = ec.EventConstants.CLUSTER_STATUS_ONLINE
else:
actions = self.get_action_from_matrix(host_health)
if 'recovery' in actions:
LOG.info("Host %s needs recovery, health status: %s." %
(host, str(self._formate_health(host_health))))
event_type = ec.EventConstants.EVENT_STOPPED
cluster_status = ec.EventConstants.CLUSTER_STATUS_OFFLINE
else:
return None
# TODO(suzhengwei): Add host status detail in the payload to show
# host status in all Consul clusters.
event = {
'notification': {
'type': ec.EventConstants.TYPE_COMPUTE_HOST,
'hostname': host,
'generated_time': timeutils.utcnow(),
'payload': {
'event': event_type,
'cluster_status': cluster_status,
'host_status': host_status
}
}
}
return event
def update_monitoring_data(self):
'''update monitoring data from consul clusters'''
LOG.debug("update monitoring data from consul.")
# Get current host health in sequence [x, y, z]. The example of
# the return value is {'node01':[x, y, z], 'node02':[x, y, z]...}
cluster_health = self.consul_manager.get_health(self.sequence)
LOG.debug("Current cluster state: %s.", cluster_health)
# Reassemble host health history with the latest host health.
# Example of 'host_health_history' is [[x, y, x], [x, y, z]...]
for host, health in cluster_health.items():
host_health_history = self.monitoring_data.get(
host, deque([], maxlen=self.monitoring_samples))
host_health_history.append(health)
self.monitoring_data[host] = host_health_history
def get_host_health(self, host):
health_history = self.monitoring_data.get(host, [])
if len(health_history) < self.monitoring_samples:
LOG.debug("Not enough monitoring data for host %s", host)
return None
# Caculate host health from host health history.
# Only continous 'down' represents the interface 'down',
# while continous 'up' represents the interface 'up'.
host_sequence_health = []
host_health_history = list(zip(*health_history))
for i in range(0, len(host_health_history)):
if ('up' in host_health_history[i] and
'down' in host_health_history[i]):
host_sequence_health.append(None)
else:
host_sequence_health.append(host_health_history[i][0])
return host_sequence_health
def _host_health_changed(self, host, health):
last_health = self.last_host_health.get(host)
if last_health is None:
self.last_host_health[host] = health
return False
if health != last_health:
self.last_host_health[host] = health
return True
else:
return False
def get_action_from_matrix(self, host_health):
for health_action in self.matrix:
matrix_health = health_action["health"]
matrix_action = health_action["action"]
if host_health == matrix_health:
return matrix_action
return []
def poll_hosts(self):
'''poll and check hosts health'''
for host in self.monitoring_data.keys():
if host == self.hostname:
continue
host_health = self.get_host_health(host)
if host_health is None:
continue
if not self._host_health_changed(host, host_health):
continue
# it will send notifition to trigger host failure recovery
# according to defined HA strategy
event = self._event(host, host_health)
if event:
self.notifier.send_notification(
CONF.host.api_retry_max,
CONF.host.api_retry_interval,
event)
def stop(self):
self.running = False
def monitor_hosts(self):
self.running = True
while self.running:
try:
self.update_monitoring_data()
self.poll_hosts()
except Exception as e:
LOG.exception("Exception when host-monitor by consul: %s", e)
eventlet.greenthread.sleep(self.monitoring_interval)

View File

@ -0,0 +1,77 @@
# Copyright(c) 2021 Inspur
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import yaml
DEFAULT_SEQUENCE = ['manage', 'tenant', 'storage']
# matrix is combined by health and actions.
# health: [x, x, x], repreasents status of DEFAULT_SEQUENCE.
# action, means which actions it will trigge if host health turns into.
# action choice: 'recovery'.
# 'recovery' means it will trigge one host recovery event.
DEFAULT_MATRIX = [
{"health": ["up", "up", "up"],
"action": []},
{"health": ["up", "up", "down"],
"action": ["recovery"]},
{"health": ["up", "down", "up"],
"action": []},
{"health": ["up", "down", "down"],
"action": ["recovery"]},
{"health": ["down", "up", "up"],
"action": []},
{"health": ["down", "up", "down"],
"action": ["recovery"]},
{"health": ["down", "down", "up"],
"action": []},
{"health": ["down", "down", "down"],
"action": ["recovery"]},
]
class MatrixManager(object):
"""Matrix Manager"""
def __init__(self, CONF):
cfg_file = CONF.consul.matrix_config_file
matrix_conf = self.load_config(cfg_file)
if not matrix_conf:
self.sequence = DEFAULT_SEQUENCE
self.matrix = DEFAULT_MATRIX
else:
self.sequence = matrix_conf.get("sequence")
self.matrix = matrix_conf.get("matrix")
self.valid_matrix(self.matrix, self.sequence)
def load_config(self, cfg_file):
if not cfg_file or not os.path.exists(cfg_file):
return None
with open(cfg_file) as f:
data = f.read()
matrix_conf = yaml.safe_load(data)
return matrix_conf
def get_sequence(self):
return self.sequence
def get_matrix(self):
return self.matrix
def valid_matrix(self, matrix, sequence):
pass

View File

@ -0,0 +1,128 @@
# Copyright(c) 2021 Inspur
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import testtools
from unittest import mock
from oslo_config import fixture as fixture_config
from masakarimonitors.hostmonitor.consul_check import consul_helper
class FakerAgentMembers(object):
def __init__(self):
self.agent_members = []
def create_agent(self, name, status=1):
agent = {
'Name': name,
'Status': status,
'Port': 'agent_lan_port',
'Addr': 'agent_ip',
'Tags': {
'dc': 'storage',
'role': 'consul',
'port': 'agent_server_port',
'wan_join_port': 'agent_wan_port',
'expect': '3',
'id': 'agent_id',
'vsn_max': '3',
'vsn_min': '2',
'vsn': '2',
'raft_vsn': '2',
},
'ProtocolMax': 5,
'ProtocolMin': 1,
'ProtocolCur': 2,
'DelegateMax': 5,
'DelegateMin': 2,
'DelegateCur': 4,
}
self.agent_members.append(agent)
def generate_agent_members(self):
return self.agent_members
class TestConsulManager(testtools.TestCase):
def setUp(self):
super(TestConsulManager, self).setUp()
self.CONF = self.useFixture(fixture_config.Config()).conf
self.consul_manager = consul_helper.ConsulManager(self.CONF)
self.consul_manager.agents = {
'manage': consul_helper.ConsulAgent('manage'),
'tenant': consul_helper.ConsulAgent('tenant'),
'storage': consul_helper.ConsulAgent('storage'),
}
def test_get_health(self):
fake_manage_agents = FakerAgentMembers()
fake_manage_agents.create_agent('node01', status=1)
fake_manage_agents.create_agent('node02', status=1)
fake_manage_agents.create_agent('node03', status=1)
agent_manage_members = fake_manage_agents.generate_agent_members()
fake_tenant_agents = FakerAgentMembers()
fake_tenant_agents.create_agent('node01', status=1)
fake_tenant_agents.create_agent('node02', status=1)
fake_tenant_agents.create_agent('node03', status=1)
agent_tenant_members = fake_tenant_agents.generate_agent_members()
fake_storage_agents = FakerAgentMembers()
fake_storage_agents.create_agent('node01', status=1)
fake_storage_agents.create_agent('node02', status=1)
fake_storage_agents.create_agent('node03', status=3)
agent_storage_members = fake_storage_agents.generate_agent_members()
with mock.patch.object(self.consul_manager.agents['manage'],
'get_agents', return_value=agent_manage_members):
with mock.patch.object(self.consul_manager.agents['tenant'],
'get_agents', return_value=agent_tenant_members):
with mock.patch.object(self.consul_manager.agents['storage'],
'get_agents', return_value=agent_storage_members):
excepted_health = {
"node01": ['up', 'up', 'up'],
"node02": ['up', 'up', 'up'],
"node03": ['up', 'up', 'down'],
}
sequence = ['manage', 'tenant', 'storage']
agents_health = self.consul_manager.get_health(sequence)
self.assertEqual(excepted_health, agents_health)
class TestConsulAgent(testtools.TestCase):
def setUp(self):
super(TestConsulAgent, self).setUp()
self.consul_agent = consul_helper.ConsulAgent('test')
def test_get_health(self):
fake_agents = FakerAgentMembers()
fake_agents.create_agent('node01', status=1)
fake_agents.create_agent('node02', status=1)
fake_agents.create_agent('node03', status=3)
agent_members = fake_agents.generate_agent_members()
with mock.patch.object(self.consul_agent, 'get_agents',
return_value=agent_members):
excepted_health = {
"node01": 'up',
"node02": 'up',
"node03": 'down',
}
agents_health = self.consul_agent.get_health()
self.assertEqual(excepted_health, agents_health)

View File

@ -0,0 +1,157 @@
# Copyright(c) 2021 Inspur
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import testtools
from unittest import mock
from collections import deque
import eventlet
from oslo_config import fixture as fixture_config
import masakarimonitors.conf
from masakarimonitors.ha import masakari
from masakarimonitors.hostmonitor.consul_check import consul_helper
from masakarimonitors.hostmonitor.consul_check import manager
from masakarimonitors.hostmonitor.consul_check import matrix_helper
eventlet.monkey_patch(os=False)
CONF = masakarimonitors.conf.CONF
class TestConsulCheck(testtools.TestCase):
def setUp(self):
super(TestConsulCheck, self).setUp()
self.CONF = self.useFixture(fixture_config.Config()).conf
self.host_monitor = manager.ConsulCheck()
self.host_monitor.matrix_manager = \
matrix_helper.MatrixManager(self.CONF)
self.host_monitor._matrix = matrix_helper.DEFAULT_MATRIX
self.host_monitor.consul_manager = \
consul_helper.ConsulManager(self.CONF)
self.host_monitor._sequence = ['manage', 'tenant', 'storage']
self.host_monitor.monitoring_data = {
"node01": deque([['up', 'up', 'up'],
['up', 'up', 'up'],
['up', 'up', 'up']],
maxlen=3),
"node02": deque([['up', 'up', 'up'],
['up', 'up', 'up'],
['up', 'up', 'down']],
maxlen=3),
"node03": deque([['up', 'up', 'up'],
['down', 'up', 'up'],
['down', 'up', 'up']],
maxlen=3),
}
def test_update_monitoring_data(self):
mock_health = {
'node01': ['up', 'up', 'up'],
'node02': ['up', 'up', 'up'],
'node03': ['up', 'up', 'up']
}
with mock.patch.object(self.host_monitor.consul_manager, 'get_health',
return_value=mock_health):
self.host_monitor.update_monitoring_data()
excepted_monitoring_data = {
"node01": deque([['up', 'up', 'up'],
['up', 'up', 'up'],
['up', 'up', 'up']],
maxlen=3),
"node02": deque([['up', 'up', 'up'],
['up', 'up', 'down'],
['up', 'up', 'up']],
maxlen=3),
"node03": deque([['down', 'up', 'up'],
['down', 'up', 'up'],
['up', 'up', 'up']],
maxlen=3),
}
self.assertEqual(excepted_monitoring_data,
self.host_monitor.monitoring_data)
def test_get_host_statistical_health(self):
self.assertEqual(['up', 'up', 'up'],
self.host_monitor.get_host_health('node01'))
self.assertEqual(['up', 'up', None],
self.host_monitor.get_host_health('node02'))
self.assertEqual([None, 'up', 'up'],
self.host_monitor.get_host_health('node03'))
def test_host_statistical_health_changed(self):
self.host_monitor.last_host_health = {
'node02': ['up', 'up', None],
'node03': ['up', 'up', 'down']
}
self.assertFalse(self.host_monitor._host_health_changed(
'node01', ['up', 'up', 'up']))
self.assertTrue(self.host_monitor._host_health_changed(
'node02', ['up', 'up', 'up']))
self.assertTrue(self.host_monitor._host_health_changed(
'node03', ['up', 'up', 'up']))
last_host_health = {
'node01': ['up', 'up', 'up'],
'node02': ['up', 'up', 'up'],
'node03': ['up', 'up', 'up']
}
self.assertEqual(self.host_monitor.last_host_health,
last_host_health)
def test_get_action_from_matrix_by_host_health(self):
self.assertEqual(
[],
self.host_monitor.get_action_from_matrix(['up', 'up', 'up']))
self.assertEqual(
["recovery"],
self.host_monitor.get_action_from_matrix(['up', 'up', 'down']))
self.assertEqual(
[],
self.host_monitor.get_action_from_matrix(['down', 'up', 'up']))
self.assertEqual(
["recovery"],
self.host_monitor.get_action_from_matrix(['down', 'down', 'down']))
@mock.patch.object(masakari.SendNotification, 'send_notification')
@mock.patch.object(manager.ConsulCheck, '_event')
def test_poll_hosts(self, mock_event, mock_send_notification):
self.host_monitor.monitoring_data = {
"node01": deque([['up', 'up', 'up'],
['up', 'up', 'up'],
['up', 'up', 'up']],
maxlen=3),
"node02": deque([['up', 'up', 'down'],
['up', 'up', 'down'],
['up', 'up', 'down']],
maxlen=3),
"node03": deque([['up', 'up', 'up'],
['up', 'up', 'up'],
['up', 'up', 'up']],
maxlen=3),
}
self.host_monitor.last_host_health = {
'node02': ['up', 'up', None],
'node03': ['up', 'up', 'up']
}
test_event = {'notification': 'test'}
mock_event.return_value = test_event
self.host_monitor.poll_hosts()
mock_send_notification.assert_called_once_with(
CONF.host.api_retry_max, CONF.host.api_retry_interval, test_event)

View File

@ -0,0 +1,55 @@
# Copyright(c) 2021 Inspur
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import tempfile
from oslo_config import fixture as fixture_config
import testtools
import yaml
from masakarimonitors.hostmonitor.consul_check import matrix_helper
class TestMatrixManager(testtools.TestCase):
def setUp(self):
super(TestMatrixManager, self).setUp()
self.CONF = self.useFixture(fixture_config.Config()).conf
def test_get_matrix_and_sequence_from_file(self):
matrix_cfg = {
'sequence': ['manage', 'tenant', 'storage'],
'matrix': [{"health": ["up", "up", "up"],
"action": ['test']}]
}
tmp_cfg = tempfile.NamedTemporaryFile(mode='w', delete=False)
tmp_cfg.write(yaml.safe_dump(matrix_cfg))
tmp_cfg.close()
self.CONF.set_override('matrix_config_file',
tmp_cfg.name, group='consul')
matrix_manager = matrix_helper.MatrixManager(self.CONF)
self.assertEqual(matrix_cfg.get('sequence'),
matrix_manager.get_sequence())
self.assertEqual(matrix_cfg.get('matrix'),
matrix_manager.get_matrix())
def test_get_default_matrix_and_sequence(self):
self.CONF.set_override('matrix_config_file', None, group='consul')
matrix_manager = matrix_helper.MatrixManager(self.CONF)
self.assertEqual(matrix_helper.DEFAULT_SEQUENCE,
matrix_manager.get_sequence())
self.assertEqual(matrix_helper.DEFAULT_MATRIX,
matrix_manager.get_matrix())

View File

@ -0,0 +1,6 @@
---
features:
- |
Added hostmonitor driver based on consul. It can detects interfaces
connectivity status via multiple consul clusters, and sends notifition
to trigger host failure recovery according to defined HA strategy.

View File

@ -16,6 +16,7 @@ oslo.privsep>=1.23.0 # Apache-2.0
oslo.service!=1.28.1,>=1.24.0 # Apache-2.0
oslo.utils>=3.33.0 # Apache-2.0
pbr!=2.1.0,>=2.0.0 # Apache-2.0
python-consul >=1.1.0 # MIT
# Due to the nature of libvirt-python package, in DevStack we use the one
# provided in the distro alongside libvirtd - to ensure the two are compatible,

View File

@ -45,3 +45,4 @@ console_scripts =
hostmonitor.driver =
simple = masakarimonitors.hostmonitor.host_handler.handle_host:HandleHost
default = masakarimonitors.hostmonitor.host_handler.handle_host:HandleHost
consul = masakarimonitors.hostmonitor.consul_check.manager:ConsulCheck