Reinitialize pipeline manager for service restart

For central and compute agent, they need to reinitialize
pipeline manager for service restart.
It depends on blueprint service-restart of oslo-incubator and
blueprint cfg-reload-config-files of oslo.config.

Implements blueprint pollster-runtime-configuration

Change-Id: Icf2597392222d6d97b3dbfd7d48e5ac3b84b175e
This commit is contained in:
Fengqian.Gao 2013-07-31 11:06:17 +08:00 committed by Fengqian
parent b59a03109c
commit 0ce0097a95
2 changed files with 145 additions and 5 deletions

View File

@ -51,11 +51,6 @@ class PollingTask(object):
class AgentManager(object): class AgentManager(object):
def __init__(self, extension_manager): def __init__(self, extension_manager):
self.pipeline_manager = pipeline.setup_pipeline(
transformer.TransformerExtensionManager(
'ceilometer.transformer',
),
)
self.pollster_manager = extension_manager self.pollster_manager = extension_manager
@ -80,6 +75,12 @@ class AgentManager(object):
return polling_tasks return polling_tasks
def initialize_service_hook(self, service): def initialize_service_hook(self, service):
self.pipeline_manager = pipeline.setup_pipeline(
transformer.TransformerExtensionManager(
'ceilometer.transformer',
),
)
self.service = service self.service = service
for interval, task in self.setup_polling_tasks().iteritems(): for interval, task in self.setup_polling_tasks().iteritems():
self.service.tg.add_timer(interval, self.service.tg.add_timer(interval,

View File

@ -17,6 +17,13 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import yaml
import subprocess
import os
import shutil
import signal
import time
import threading
from ceilometer import service from ceilometer import service
from ceilometer.tests import base from ceilometer.tests import base
@ -24,3 +31,135 @@ from ceilometer.tests import base
class ServiceTestCase(base.TestCase): class ServiceTestCase(base.TestCase):
def test_prepare_service(self): def test_prepare_service(self):
service.prepare_service([]) service.prepare_service([])
#NOTE(Fengqian): I have to set up a thread to parse the ouput of
#subprocess.Popen. Because readline() may block the process in
#some conditions.
class ParseOutput(threading.Thread):
def __init__(self, input_stream, str_flag):
super(ParseOutput, self).__init__()
self.input_stream = input_stream
self.str_flag = str_flag
self.ret_stream = None
self.ret = False
self.thread_stop = False
def run(self):
while not self.thread_stop:
next_line = self.input_stream.readline()
if next_line == '':
break
if self.str_flag in next_line:
self.ret = True
self.ret_stream = next_line[(next_line.find(self.str_flag) +
len(self.str_flag)):]
self.stop()
def stop(self):
self.thread_stop = True
class ServiceRestartTest(base.TestCase):
def setUp(self):
super(ServiceRestartTest, self).setUp()
self.tempfile = self.temp_config_file_path()
self.pipeline_cfg_file = self.temp_config_file_path(name=
'pipeline.yaml')
shutil.copy(self.path_get('etc/ceilometer/pipeline.yaml'),
self.pipeline_cfg_file)
self.pipelinecfg_read_from_file()
policy_file = self.path_get('tests/policy.json')
with open(self.tempfile, 'w') as tmp:
tmp.write("[DEFAULT]\n")
tmp.write(
"rpc_backend=ceilometer.openstack.common.rpc.impl_fake\n")
tmp.write(
"auth_strategy=noauth\n")
tmp.write(
"debug=true\n")
tmp.write(
"pipeline_cfg_file=%s\n" % self.pipeline_cfg_file)
tmp.write(
"policy_file=%s\n" % policy_file)
tmp.write("[database]\n")
tmp.write("connection=log://localhost\n")
def _modify_pipeline_file(self):
with open(self.pipeline_cfg_file, 'w') as pipe_fd:
pipe_fd.truncate()
pipe_fd.write(yaml.safe_dump(self.pipeline_cfg[1]))
def pipelinecfg_read_from_file(self):
with open(self.pipeline_cfg_file) as fd:
data = fd.read()
self.pipeline_cfg = yaml.safe_load(data)
def tearDown(self):
super(ServiceRestartTest, self).tearDown()
self.sub.kill()
self.sub.wait()
@staticmethod
def _check_process_alive(pid):
try:
with open("/proc/%d/status" % pid) as fd_proc:
for line in fd_proc.readlines():
if line.startswith("State:"):
state = line.split(":", 1)[1].strip().split(' ')[0]
return state not in ['Z', 'T', 'Z+']
except IOError:
return False
def check_process_alive(self):
cond = lambda: self._check_process_alive(self.sub.pid)
return self._wait(cond, 60)
def parse_output(self, str_flag, timeout=3):
parse = ParseOutput(self.sub.stderr, str_flag)
parse.start()
parse.join(timeout)
parse.stop()
return parse
@staticmethod
def _wait(cond, timeout):
start = time.time()
while not cond():
if time.time() - start > timeout:
break
time.sleep(.1)
return cond()
def _spawn_service(self, cmd, conf_file=None):
if conf_file is None:
conf_file = self.tempfile
self.sub = subprocess.Popen([cmd, '--config-file=%s' % conf_file],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
#NOTE(Fengqian): Parse the output to see if the service started
self.assertTrue(self.parse_output("Starting").ret)
self.check_process_alive()
def _service_restart(self, cmd):
self._spawn_service(cmd)
self.assertTrue(self.sub.pid)
#NOTE(Fengqian): Modify the pipleline configure file to see
#if the file is reloaded correctly.
self._modify_pipeline_file()
self.pipelinecfg_read_from_file()
os.kill(self.sub.pid, signal.SIGHUP)
self.assertTrue(self.check_process_alive())
self.assertTrue(self.parse_output("Caught SIGHUP").ret)
self.assertEquals(self.pipeline_cfg,
yaml.safe_load(
self.parse_output("Pipeline config: ").ret_stream))
def test_compute_service_restart(self):
self._service_restart('ceilometer-agent-compute')
def test_central_service_restart(self):
self._service_restart('ceilometer-agent-central')