Don't assume that there is one service per host

When multiple workers run on the same host, the service info is incorrect.
This patch changes the logic so that each engine creates its own service
record, and later deletes entries that are no longer being updated.

Closes-bug: 1433865
Change-Id: If297049f62e614b955faaefeaaf67e2649934a0c
This commit is contained in:
Angus Salkeld 2015-03-20 10:57:01 +10:00
parent f19c54dd9a
commit 35b96dcf67
2 changed files with 46 additions and 59 deletions

View File

@ -12,6 +12,7 @@
# under the License.
import collections
import datetime
import os
import socket
import warnings
@ -347,6 +348,7 @@ class EngineService(service.Service):
self._client = rpc_messaging.get_rpc_client(
version=self.RPC_API_VERSION)
self.service_manage_cleanup()
self.manage_thread_grp = threadgroup.ThreadGroup()
self.manage_thread_grp.add_timer(cfg.CONF.periodic_interval,
self.service_manage_report)
@ -1518,39 +1520,35 @@ class EngineService(service.Service):
service_objects.Service.update_by_id(
cnxt,
self.service_id,
dict())
dict(deleted_at=None))
LOG.info(_LI('Service %s is updated'), self.service_id)
else:
service_refs = service_objects.Service.get_all_by_args(
service_ref = service_objects.Service.create(
cnxt,
self.host,
self.binary,
self.hostname)
if len(service_refs) == 1:
# Service was aborted or stopped
service_ref = service_refs[0]
dict(host=self.host,
hostname=self.hostname,
binary=self.binary,
engine_id=self.engine_id,
topic=self.topic,
report_interval=cfg.CONF.periodic_interval)
)
self.service_id = service_ref['id']
LOG.info(_LI('Service %s is started'), self.service_id)
if service_ref['deleted_at'] is None:
LOG.info(_LI('Service %s was aborted'), self.service_id)
def service_manage_cleanup(self):
cnxt = context.get_admin_context()
last_updated_window = (3 * cfg.CONF.periodic_interval)
time_line = datetime.datetime.utcnow() - datetime.timedelta(
seconds=last_updated_window)
service_ref = service_objects.Service.update_by_id(
cnxt,
service_ref['id'],
dict(engine_id=self.engine_id,
deleted_at=None,
report_interval=cfg.CONF.periodic_interval))
self.service_id = service_ref['id']
LOG.info(_LI('Service %s is restarted'), self.service_id)
elif len(service_refs) == 0:
# Service is started now
service_ref = service_objects.Service.create(
cnxt,
dict(host=self.host,
hostname=self.hostname,
binary=self.binary,
engine_id=self.engine_id,
topic=self.topic,
report_interval=cfg.CONF.periodic_interval)
)
self.service_id = service_ref['id']
LOG.info(_LI('Service %s is started'), self.service_id)
service_refs = service_objects.Service.get_all_by_args(
cnxt, self.host, self.binary, self.hostname)
for service_ref in service_refs:
if (service_ref['id'] == self.service_id or
service_ref['deleted_at'] is not None or
service_ref['updated_at'] is None):
continue
if service_ref['updated_at'] < time_line:
# hasn't been updated, assuming it's died.
LOG.info(_LI('Service %s was aborted'), service_ref['id'])
service_objects.Service.delete(cnxt, service_ref['id'])

View File

@ -3152,24 +3152,17 @@ class StackServiceTest(common.HeatTestCase):
self.assertTrue(mock_get_all.called)
mock_format_service.assert_called_once_with(mock.ANY)
@mock.patch.object(service_objects.Service, 'get_all_by_args')
@mock.patch.object(service_objects.Service, 'create')
@mock.patch.object(context, 'get_admin_context')
def test_service_manage_report_start(self,
mock_admin_context,
mock_service_create,
mock_get_all):
mock_service_create):
self.eng.service_id = None
mock_admin_context.return_value = self.ctx
mock_get_all.return_value = []
srv = dict(id='mock_id')
mock_service_create.return_value = srv
self.eng.service_manage_report()
mock_admin_context.assert_called_once_with()
mock_get_all.assert_called_once_with(self.ctx,
self.eng.host,
self.eng.binary,
self.eng.hostname)
mock_service_create.assert_called_once_with(
self.ctx,
dict(host=self.eng.host,
@ -3182,32 +3175,26 @@ class StackServiceTest(common.HeatTestCase):
self.assertEqual(self.eng.service_id, srv['id'])
@mock.patch.object(service_objects.Service, 'get_all_by_args')
@mock.patch.object(service_objects.Service, 'update_by_id')
@mock.patch.object(service_objects.Service, 'delete')
@mock.patch.object(context, 'get_admin_context')
def test_service_manage_report_restart(
self,
mock_admin_context,
mock_service_update,
mock_get_all):
self.eng.service_id = None
srv = dict(id='mock_id', deleted_at=None)
mock_get_all.return_value = [srv]
def test_service_manage_report_cleanup(self,
mock_admin_context,
mock_service_delete,
mock_get_all):
mock_admin_context.return_value = self.ctx
mock_service_update.return_value = srv
self.eng.service_manage_report()
ages_a_go = datetime.datetime.utcnow() - datetime.timedelta(
seconds=4000)
mock_get_all.return_value = [{'id': 'foo',
'deleted_at': None,
'updated_at': ages_a_go}]
self.eng.service_manage_cleanup()
mock_admin_context.assert_called_once_with()
mock_get_all.assert_called_once_with(self.ctx,
self.eng.host,
self.eng.binary,
self.eng.hostname)
mock_service_update.assert_called_once_with(
self.ctx,
srv['id'],
dict(engine_id=self.eng.engine_id,
deleted_at=None,
report_interval=cfg.CONF.periodic_interval))
self.assertEqual(self.eng.service_id, srv['id'])
mock_service_delete.assert_called_once_with(
self.ctx, 'foo')
@mock.patch.object(service_objects.Service, 'update_by_id')
@mock.patch.object(context, 'get_admin_context')
@ -3222,7 +3209,7 @@ class StackServiceTest(common.HeatTestCase):
mock_service_update.assert_called_once_with(
self.ctx,
'mock_id',
dict())
dict(deleted_at=None))
def test_stop_rpc_server(self):
with mock.patch.object(self.eng,
@ -3241,6 +3228,7 @@ class StackServiceTest(common.HeatTestCase):
rpc_client_class,
target_class,
rpc_server_method):
self.patchobject(self.eng, 'service_manage_cleanup')
self.eng.start()
# engine id
@ -3388,6 +3376,7 @@ class StackServiceTest(common.HeatTestCase):
service_delete_method,
admin_context_method):
cfg.CONF.set_default('periodic_interval', 60)
self.patchobject(self.eng, 'service_manage_cleanup')
self.eng.start()
# Add dummy thread group to test thread_group_mgr.stop() is executed?