From 0ce0097a9552e22a4897e3d4c6d202aacd3b846f Mon Sep 17 00:00:00 2001 From: "Fengqian.Gao" Date: Wed, 31 Jul 2013 11:06:17 +0800 Subject: [PATCH] Reinitialize pipeline manager for service restart For central and compute agent, they need to reinitialize pipeline manager for service restart. It depends on blueprint service-restart of oslo-incubator and blueprint cfg-reload-config-files of oslo.config. Implements blueprint pollster-runtime-configuration Change-Id: Icf2597392222d6d97b3dbfd7d48e5ac3b84b175e --- ceilometer/agent.py | 11 ++-- tests/test_service.py | 139 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 145 insertions(+), 5 deletions(-) diff --git a/ceilometer/agent.py b/ceilometer/agent.py index 7638b730..2bb0fb0b 100644 --- a/ceilometer/agent.py +++ b/ceilometer/agent.py @@ -51,11 +51,6 @@ class PollingTask(object): class AgentManager(object): def __init__(self, extension_manager): - self.pipeline_manager = pipeline.setup_pipeline( - transformer.TransformerExtensionManager( - 'ceilometer.transformer', - ), - ) self.pollster_manager = extension_manager @@ -80,6 +75,12 @@ class AgentManager(object): return polling_tasks def initialize_service_hook(self, service): + self.pipeline_manager = pipeline.setup_pipeline( + transformer.TransformerExtensionManager( + 'ceilometer.transformer', + ), + ) + self.service = service for interval, task in self.setup_polling_tasks().iteritems(): self.service.tg.add_timer(interval, diff --git a/tests/test_service.py b/tests/test_service.py index 75f34923..65580b66 100644 --- a/tests/test_service.py +++ b/tests/test_service.py @@ -17,6 +17,13 @@ # License for the specific language governing permissions and limitations # under the License. +import yaml +import subprocess +import os +import shutil +import signal +import time +import threading from ceilometer import service from ceilometer.tests import base @@ -24,3 +31,135 @@ from ceilometer.tests import base class ServiceTestCase(base.TestCase): def test_prepare_service(self): service.prepare_service([]) + + +#NOTE(Fengqian): I have to set up a thread to parse the ouput of +#subprocess.Popen. Because readline() may block the process in +#some conditions. +class ParseOutput(threading.Thread): + def __init__(self, input_stream, str_flag): + super(ParseOutput, self).__init__() + self.input_stream = input_stream + self.str_flag = str_flag + self.ret_stream = None + self.ret = False + self.thread_stop = False + + def run(self): + while not self.thread_stop: + next_line = self.input_stream.readline() + if next_line == '': + break + if self.str_flag in next_line: + self.ret = True + self.ret_stream = next_line[(next_line.find(self.str_flag) + + len(self.str_flag)):] + self.stop() + + def stop(self): + self.thread_stop = True + + +class ServiceRestartTest(base.TestCase): + + def setUp(self): + super(ServiceRestartTest, self).setUp() + self.tempfile = self.temp_config_file_path() + self.pipeline_cfg_file = self.temp_config_file_path(name= + 'pipeline.yaml') + shutil.copy(self.path_get('etc/ceilometer/pipeline.yaml'), + self.pipeline_cfg_file) + self.pipelinecfg_read_from_file() + policy_file = self.path_get('tests/policy.json') + with open(self.tempfile, 'w') as tmp: + tmp.write("[DEFAULT]\n") + tmp.write( + "rpc_backend=ceilometer.openstack.common.rpc.impl_fake\n") + tmp.write( + "auth_strategy=noauth\n") + tmp.write( + "debug=true\n") + tmp.write( + "pipeline_cfg_file=%s\n" % self.pipeline_cfg_file) + tmp.write( + "policy_file=%s\n" % policy_file) + tmp.write("[database]\n") + tmp.write("connection=log://localhost\n") + + def _modify_pipeline_file(self): + with open(self.pipeline_cfg_file, 'w') as pipe_fd: + pipe_fd.truncate() + pipe_fd.write(yaml.safe_dump(self.pipeline_cfg[1])) + + def pipelinecfg_read_from_file(self): + with open(self.pipeline_cfg_file) as fd: + data = fd.read() + self.pipeline_cfg = yaml.safe_load(data) + + def tearDown(self): + super(ServiceRestartTest, self).tearDown() + self.sub.kill() + self.sub.wait() + + @staticmethod + def _check_process_alive(pid): + try: + with open("/proc/%d/status" % pid) as fd_proc: + for line in fd_proc.readlines(): + if line.startswith("State:"): + state = line.split(":", 1)[1].strip().split(' ')[0] + return state not in ['Z', 'T', 'Z+'] + except IOError: + return False + + def check_process_alive(self): + cond = lambda: self._check_process_alive(self.sub.pid) + return self._wait(cond, 60) + + def parse_output(self, str_flag, timeout=3): + parse = ParseOutput(self.sub.stderr, str_flag) + parse.start() + parse.join(timeout) + parse.stop() + return parse + + @staticmethod + def _wait(cond, timeout): + start = time.time() + while not cond(): + if time.time() - start > timeout: + break + time.sleep(.1) + return cond() + + def _spawn_service(self, cmd, conf_file=None): + if conf_file is None: + conf_file = self.tempfile + self.sub = subprocess.Popen([cmd, '--config-file=%s' % conf_file], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + #NOTE(Fengqian): Parse the output to see if the service started + self.assertTrue(self.parse_output("Starting").ret) + self.check_process_alive() + + def _service_restart(self, cmd): + self._spawn_service(cmd) + + self.assertTrue(self.sub.pid) + #NOTE(Fengqian): Modify the pipleline configure file to see + #if the file is reloaded correctly. + self._modify_pipeline_file() + self.pipelinecfg_read_from_file() + os.kill(self.sub.pid, signal.SIGHUP) + + self.assertTrue(self.check_process_alive()) + self.assertTrue(self.parse_output("Caught SIGHUP").ret) + self.assertEquals(self.pipeline_cfg, + yaml.safe_load( + self.parse_output("Pipeline config: ").ret_stream)) + + def test_compute_service_restart(self): + self._service_restart('ceilometer-agent-compute') + + def test_central_service_restart(self): + self._service_restart('ceilometer-agent-central')