Merge "Add heart beat report for polling agents"

This commit is contained in:
Zuul 2024-09-30 10:28:26 +00:00 committed by Gerrit Code Review
commit 6797402224
4 changed files with 261 additions and 11 deletions

View File

@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import multiprocessing
import shlex
import cotyledon
@ -79,11 +80,20 @@ def _prepare_config():
return conf
def create_polling_service(worker_id, conf=None):
def create_polling_service(worker_id, conf=None, queue=None):
if conf is None:
conf = _prepare_config()
conf.log_opt_values(LOG, log.DEBUG)
return manager.AgentManager(worker_id, conf, conf.polling_namespaces)
return manager.AgentManager(worker_id, conf,
conf.polling_namespaces, queue)
def create_heartbeat_service(worker_id, conf, queue=None):
if conf is None:
conf = _prepare_config()
conf.log_opt_values(LOG, log.DEBUG)
return manager.AgentHeartBeatManager(worker_id, conf,
conf.polling_namespaces, queue)
def main():
@ -91,5 +101,11 @@ def main():
conf = _prepare_config()
priv_context.init(root_helper=shlex.split(utils._get_root_helper()))
oslo_config_glue.setup(sm, conf)
sm.add(create_polling_service, args=(conf,))
if conf.polling.heartbeat_socket_dir is not None:
queue = multiprocessing.Queue()
sm.add(create_heartbeat_service, args=(conf, queue))
else:
queue = None
sm.add(create_polling_service, args=(conf, queue))
sm.run()

View File

@ -19,7 +19,10 @@ import glob
import itertools
import logging
import os
import queue
import random
import socket
import threading
import uuid
from concurrent import futures
@ -51,6 +54,10 @@ POLLING_OPTS = [
default="polling.yaml",
help="Configuration file for polling definition."
),
cfg.StrOpt('heartbeat_socket_dir',
default=None,
help="Path to directory where socket file for polling "
"heartbeat will be created."),
cfg.StrOpt('partitioning_group_prefix',
deprecated_group='central',
help='Work-load partitioning group prefix. Use only if you '
@ -89,6 +96,11 @@ class PollingException(agent.ConfigException):
super(PollingException, self).__init__('Polling', message, cfg)
class HeartBeatException(agent.ConfigException):
def __init__(self, message, cfg):
super(HeartBeatException, self).__init__('Polling', message, cfg)
class Resources(object):
def __init__(self, agent_manager):
self.agent_manager = agent_manager
@ -207,6 +219,8 @@ class PollingTask(object):
)
sample_batch = []
self.manager.heartbeat(pollster.name, polling_timestamp)
for sample in samples:
# Note(yuywz): Unify the timestamp of polled samples
sample.set_timestamp(polling_timestamp)
@ -289,15 +303,100 @@ class PollingTask(object):
)
class AgentHeartBeatManager(cotyledon.Service):
def __init__(self, worker_id, conf, namespaces=None, queue=None):
super(AgentHeartBeatManager, self).__init__(worker_id)
self.conf = conf
if conf.polling.heartbeat_socket_dir is None:
raise HeartBeatException("path to a directory containing "
"heart beat sockets is required", conf)
if type(namespaces) is not list:
if namespaces is None:
namespaces = ""
namespaces = [namespaces]
self._lock = threading.Lock()
self._queue = queue
self._status = dict()
self._sock_pth = os.path.join(
conf.polling.heartbeat_socket_dir,
f"ceilometer-{'-'.join(sorted(namespaces))}.socket"
)
self._delete_socket()
self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
self._sock.bind(self._sock_pth)
self._sock.listen(1)
except socket.error as err:
raise HeartBeatException("Failed to open socket file "
f"({self._sock_pth}): {err}", conf)
LOG.info("Starting heartbeat child service. Listening"
f" on {self._sock_pth}")
def _delete_socket(self):
try:
os.remove(self._sock_pth)
except OSError:
pass
def terminate(self):
self._tpe.shutdown(wait=False, cancel_futures=True)
self._sock.close()
self._delete_socket()
def _update_status(self):
hb = self._queue.get()
with self._lock:
self._status[hb['pollster']] = hb['timestamp']
LOG.debug(f"Updated heartbeat for {hb['pollster']} "
f"({hb['timestamp']})")
def _send_heartbeat(self):
s, addr = self._sock.accept()
LOG.debug("Heartbeat status report requested "
f"at {self._sock_pth}")
with self._lock:
out = '\n'.join([f"{k} {v}"
for k, v in self._status.items()])
s.sendall(out.encode('utf-8'))
s.close()
LOG.debug(f"Reported heartbeat status:\n{out}")
def run(self):
super(AgentHeartBeatManager, self).run()
LOG.debug("Started heartbeat child process.")
def _read_queue():
LOG.debug("Started heartbeat update thread")
while True:
self._update_status()
def _report_status():
LOG.debug("Started heartbeat reporting thread")
while True:
self._send_heartbeat()
with futures.ThreadPoolExecutor(max_workers=2) as executor:
self._tpe = executor
executor.submit(_read_queue)
executor.submit(_report_status)
class AgentManager(cotyledon.Service):
def __init__(self, worker_id, conf, namespaces=None):
def __init__(self, worker_id, conf, namespaces=None, queue=None):
namespaces = namespaces or ['compute', 'central']
group_prefix = conf.polling.partitioning_group_prefix
super(AgentManager, self).__init__(worker_id)
self.conf = conf
self._queue = queue
if type(namespaces) is not list:
namespaces = [namespaces]
@ -350,6 +449,19 @@ class AgentManager(cotyledon.Service):
self._keystone = None
self._keystone_last_exception = None
def heartbeat(self, name, timestamp):
"""Send heartbeat data if the agent is configured to do so."""
if self._queue is not None:
try:
hb = {
'timestamp': timestamp,
'pollster': name
}
self._queue.put_nowait(hb)
LOG.debug(f"Polster heartbeat update: {name}")
except queue.Full:
LOG.warning(f"Heartbeat queue full. Update failed: {hb}")
def create_dynamic_pollsters(self, namespaces):
"""Creates dynamic pollsters

View File

@ -0,0 +1,113 @@
#
# Copyright 2024 Red Hat, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for ceilometer polling heartbeat process"""
import multiprocessing
import shutil
import tempfile
from oslo_utils import timeutils
from unittest import mock
from ceilometer.polling import manager
from ceilometer import service
from ceilometer.tests import base
class TestHeartBeatManagert(base.BaseTestCase):
def setUp(self):
super(TestHeartBeatManagert, self).setUp()
self.conf = service.prepare_service([], [])
self.tmpdir = tempfile.mkdtemp()
self.queue = multiprocessing.Queue()
self.mgr = manager.AgentManager(0, self.conf, namespaces='central',
queue=self.queue)
def tearDown(self):
super(TestHeartBeatManagert, self).tearDown()
shutil.rmtree(self.tmpdir)
def setup_polling(self, poll_cfg=None):
name = self.cfg2file(poll_cfg or self.polling_cfg)
self.conf.set_override('cfg_file', name, group='polling')
self.mgr.polling_manager = manager.PollingManager(self.conf)
def test_hb_not_configured(self):
self.assertRaises(manager.HeartBeatException,
manager.AgentHeartBeatManager,
0, self.conf,
namespaces='ipmi',
queue=self.queue)
@mock.patch('ceilometer.polling.manager.LOG')
def test_hb_startup(self, LOG):
# activate heartbeat agent
self.conf.set_override('heartbeat_socket_dir', self.tmpdir,
group='polling')
manager.AgentHeartBeatManager(0, self.conf, namespaces='compute',
queue=self.queue)
calls = [mock.call("Starting heartbeat child service. Listening"
f" on {self.tmpdir}/ceilometer-compute.socket")]
LOG.info.assert_has_calls(calls)
@mock.patch('ceilometer.polling.manager.LOG')
def test_hb_update(self, LOG):
self.conf.set_override('heartbeat_socket_dir', self.tmpdir,
group='polling')
hb = manager.AgentHeartBeatManager(0, self.conf, namespaces='central',
queue=self.queue)
timestamp = timeutils.utcnow().isoformat()
self.queue.put_nowait({'timestamp': timestamp, 'pollster': 'test'})
hb._update_status()
calls = [mock.call(f"Updated heartbeat for test ({timestamp})")]
LOG.debug.assert_has_calls(calls)
@mock.patch('ceilometer.polling.manager.LOG')
def test_hb_send(self, LOG):
with mock.patch('socket.socket') as FakeSocket:
sub_skt = mock.Mock()
sub_skt.sendall.return_value = None
sub_skt.sendall.return_value = None
skt = FakeSocket.return_value
skt.bind.return_value = mock.Mock()
skt.listen.return_value = mock.Mock()
skt.accept.return_value = (sub_skt, "")
self.conf.set_override('heartbeat_socket_dir', self.tmpdir,
group='polling')
hb = manager.AgentHeartBeatManager(0, self.conf,
namespaces='central',
queue=self.queue)
timestamp = timeutils.utcnow().isoformat()
self.queue.put_nowait({'timestamp': timestamp,
'pollster': 'test1'})
hb._update_status()
self.queue.put_nowait({'timestamp': timestamp,
'pollster': 'test2'})
hb._update_status()
# test status report
hb._send_heartbeat()
calls = [mock.call("Heartbeat status report requested "
f"at {self.tmpdir}/ceilometer-central.socket"),
mock.call("Reported heartbeat status:\n"
f"test1 {timestamp}\n"
f"test2 {timestamp}")]
LOG.debug.assert_has_calls(calls)

View File

@ -18,6 +18,7 @@
"""Tests for ceilometer agent manager"""
import copy
import datetime
import multiprocessing
import shutil
import tempfile
from unittest import mock
@ -92,7 +93,8 @@ class TestManager(base.BaseTestCase):
self.assertNotEqual(manager.hash_of_set(y), manager.hash_of_set(z))
def test_load_plugins(self):
mgr = manager.AgentManager(0, self.conf)
mgr = manager.AgentManager(0, self.conf,
queue=multiprocessing.Queue())
self.assertIsNotNone(list(mgr.extensions))
# Test plugin load behavior based on Node Manager pollsters.
@ -101,8 +103,8 @@ class TestManager(base.BaseTestCase):
@mock.patch('ceilometer.ipmi.pollsters.sensor.SensorPollster.__init__',
mock.Mock(return_value=None))
def test_load_normal_plugins(self):
mgr = manager.AgentManager(0, self.conf,
namespaces=['ipmi'])
mgr = manager.AgentManager(0, self.conf, namespaces=['ipmi'],
queue=multiprocessing.Queue())
# 8 pollsters for Node Manager
self.assertEqual(13, len(mgr.extensions))
@ -114,7 +116,8 @@ class TestManager(base.BaseTestCase):
def test_load_failed_plugins(self, LOG):
# Here we additionally check that namespaces will be converted to the
# list if param was not set as a list.
manager.AgentManager(0, self.conf, namespaces='ipmi')
manager.AgentManager(0, self.conf, namespaces='ipmi',
queue=multiprocessing.Queue())
err_msg = 'Skip loading extension for %s: %s'
pollster_names = [
'power', 'temperature', 'outlet_temperature',
@ -132,7 +135,8 @@ class TestManager(base.BaseTestCase):
@mock.patch('ceilometer.polling.manager.LOG')
def test_import_error_in_plugin(self, LOG):
namespaces = ['ipmi']
manager.AgentManager(0, self.conf, namespaces=namespaces)
manager.AgentManager(0, self.conf, namespaces=namespaces,
queue=multiprocessing.Queue())
LOG.warning.assert_called_with(
'No valid pollsters can be loaded from %s namespaces', namespaces)
@ -282,7 +286,8 @@ class BaseAgent(base.BaseTestCase):
self.mgr.polling_manager = manager.PollingManager(self.CONF)
def create_manager(self):
return manager.AgentManager(0, self.CONF)
queue = multiprocessing.Queue()
return manager.AgentManager(0, self.CONF, queue=queue)
def fake_notifier_sample(self, ctxt, event_type, payload):
for m in payload['samples']:
@ -301,7 +306,8 @@ class BaseAgent(base.BaseTestCase):
self.CONF = service.prepare_service([], [])
self.CONF.set_override(
'cfg_file',
self.path_get('etc/ceilometer/polling_all.yaml'), group='polling'
self.path_get('etc/ceilometer/polling_all.yaml'),
group='polling'
)
self.polling_cfg = {
'sources': [{
@ -703,6 +709,9 @@ class TestPollingAgent(BaseAgent):
mock.call('Finished polling pollster %(poll)s in the context '
'of %(src)s', {'poll': 'test', 'src': 'test_polling'})
])
LOG.debug.assert_has_calls([
mock.call('Polster heartbeat update: test')
])
@mock.patch('ceilometer.polling.manager.LOG')
def test_skip_polling_and_notify_with_no_resources(self, LOG):