Delays the creation of the looping calls that that check the queue until startService is called.
This commit is contained in:
commit
f94bdc1c7b
|
@ -49,15 +49,17 @@ flags.DEFINE_integer('periodic_interval', 60,
|
|||
class Service(object, service.Service):
|
||||
"""Base class for workers that run on hosts."""
|
||||
|
||||
def __init__(self, host, binary, topic, manager, *args, **kwargs):
|
||||
def __init__(self, host, binary, topic, manager, report_interval=None,
|
||||
periodic_interval=None, *args, **kwargs):
|
||||
self.host = host
|
||||
self.binary = binary
|
||||
self.topic = topic
|
||||
self.manager_class_name = manager
|
||||
self.report_interval = report_interval
|
||||
self.periodic_interval = periodic_interval
|
||||
super(Service, self).__init__(*args, **kwargs)
|
||||
self.saved_args, self.saved_kwargs = args, kwargs
|
||||
|
||||
|
||||
def startService(self): # pylint: disable-msg C0103
|
||||
manager_class = utils.import_class(self.manager_class_name)
|
||||
self.manager = manager_class(host=self.host, *self.saved_args,
|
||||
|
@ -73,6 +75,26 @@ class Service(object, service.Service):
|
|||
except exception.NotFound:
|
||||
self._create_service_ref(ctxt)
|
||||
|
||||
conn = rpc.Connection.instance()
|
||||
if self.report_interval:
|
||||
consumer_all = rpc.AdapterConsumer(
|
||||
connection=conn,
|
||||
topic=self.topic,
|
||||
proxy=self)
|
||||
consumer_node = rpc.AdapterConsumer(
|
||||
connection=conn,
|
||||
topic='%s.%s' % (self.topic, self.host),
|
||||
proxy=self)
|
||||
|
||||
consumer_all.attach_to_twisted()
|
||||
consumer_node.attach_to_twisted()
|
||||
|
||||
pulse = task.LoopingCall(self.report_state)
|
||||
pulse.start(interval=self.report_interval, now=False)
|
||||
|
||||
if self.periodic_interval:
|
||||
pulse = task.LoopingCall(self.periodic_tasks)
|
||||
pulse.start(interval=self.periodic_interval, now=False)
|
||||
|
||||
def _create_service_ref(self, context):
|
||||
service_ref = db.service_create(context,
|
||||
|
@ -83,10 +105,8 @@ class Service(object, service.Service):
|
|||
self.service_id = service_ref['id']
|
||||
|
||||
def __getattr__(self, key):
|
||||
try:
|
||||
return super(Service, self).__getattr__(key)
|
||||
except AttributeError:
|
||||
return getattr(self.manager, key)
|
||||
manager = self.__dict__.get('manager', None)
|
||||
return getattr(manager, key)
|
||||
|
||||
@classmethod
|
||||
def create(cls,
|
||||
|
@ -119,25 +139,8 @@ class Service(object, service.Service):
|
|||
if not periodic_interval:
|
||||
periodic_interval = FLAGS.periodic_interval
|
||||
logging.warn("Starting %s node", topic)
|
||||
service_obj = cls(host, binary, topic, manager)
|
||||
conn = rpc.Connection.instance()
|
||||
consumer_all = rpc.AdapterConsumer(
|
||||
connection=conn,
|
||||
topic=topic,
|
||||
proxy=service_obj)
|
||||
consumer_node = rpc.AdapterConsumer(
|
||||
connection=conn,
|
||||
topic='%s.%s' % (topic, host),
|
||||
proxy=service_obj)
|
||||
|
||||
consumer_all.attach_to_twisted()
|
||||
consumer_node.attach_to_twisted()
|
||||
|
||||
pulse = task.LoopingCall(service_obj.report_state)
|
||||
pulse.start(interval=report_interval, now=False)
|
||||
|
||||
pulse = task.LoopingCall(service_obj.periodic_tasks)
|
||||
pulse.start(interval=periodic_interval, now=False)
|
||||
service_obj = cls(host, binary, topic, manager,
|
||||
report_interval, periodic_interval)
|
||||
|
||||
# This is the parent service that twistd will be looking for when it
|
||||
# parses this file, return it so that we can get it into globals.
|
||||
|
|
|
@ -39,11 +39,44 @@ flags.DEFINE_string("fake_manager", "nova.tests.service_unittest.FakeManager",
|
|||
|
||||
class FakeManager(manager.Manager):
|
||||
"""Fake manager for tests"""
|
||||
pass
|
||||
def test_method(self):
|
||||
return 'manager'
|
||||
|
||||
|
||||
class ExtendedService(service.Service):
|
||||
def test_method(self):
|
||||
return 'service'
|
||||
|
||||
|
||||
class ServiceManagerTestCase(test.BaseTestCase):
|
||||
"""Test cases for Services"""
|
||||
|
||||
def test_attribute_error_for_no_manager(self):
|
||||
serv = service.Service('test',
|
||||
'test',
|
||||
'test',
|
||||
'nova.tests.service_unittest.FakeManager')
|
||||
self.assertRaises(AttributeError, getattr, serv, 'test_method')
|
||||
|
||||
def test_message_gets_to_manager(self):
|
||||
serv = service.Service('test',
|
||||
'test',
|
||||
'test',
|
||||
'nova.tests.service_unittest.FakeManager')
|
||||
serv.startService()
|
||||
self.assertEqual(serv.test_method(), 'manager')
|
||||
|
||||
def test_override_manager_method(self):
|
||||
serv = ExtendedService('test',
|
||||
'test',
|
||||
'test',
|
||||
'nova.tests.service_unittest.FakeManager')
|
||||
serv.startService()
|
||||
self.assertEqual(serv.test_method(), 'service')
|
||||
|
||||
|
||||
class ServiceTestCase(test.BaseTestCase):
|
||||
"""Test cases for rpc"""
|
||||
"""Test cases for Services"""
|
||||
|
||||
def setUp(self): # pylint: disable=C0103
|
||||
super(ServiceTestCase, self).setUp()
|
||||
|
@ -54,6 +87,11 @@ class ServiceTestCase(test.BaseTestCase):
|
|||
host = 'foo'
|
||||
binary = 'nova-fake'
|
||||
topic = 'fake'
|
||||
|
||||
# NOTE(vish): Create was moved out of mox replay to make sure that
|
||||
# the looping calls are created in StartService.
|
||||
app = service.Service.create(host=host, binary=binary)
|
||||
|
||||
self.mox.StubOutWithMock(rpc,
|
||||
'AdapterConsumer',
|
||||
use_mock_anything=True)
|
||||
|
@ -99,7 +137,6 @@ class ServiceTestCase(test.BaseTestCase):
|
|||
service_create).AndReturn(service_ref)
|
||||
self.mox.ReplayAll()
|
||||
|
||||
app = service.Service.create(host=host, binary=binary)
|
||||
startApplication(app, False)
|
||||
self.assert_(app)
|
||||
|
||||
|
@ -190,3 +227,4 @@ class ServiceTestCase(test.BaseTestCase):
|
|||
rv = yield s.report_state(host, binary)
|
||||
|
||||
self.assert_(not s.model_disconnected)
|
||||
|
||||
|
|
Loading…
Reference in New Issue