Remove context from service db apis
This also adds service create call to the service start up procedure. Change-Id: Ia7a0296929ab2b17250128fee808846d9222e9d3
This commit is contained in:
parent
62c9f95652
commit
58ed4484dc
|
@ -105,7 +105,7 @@ class ServiceManageCommand(object):
|
|||
svc = self._format_service(service)
|
||||
if svc['status'] == 'down':
|
||||
print(_('Dead service %s is removed.') % svc['service_id'])
|
||||
service_obj.Service.delete(self.ctx, svc['service_id'])
|
||||
service_obj.Service.delete(svc['service_id'])
|
||||
|
||||
@staticmethod
|
||||
def add_service_parsers(subparsers):
|
||||
|
|
|
@ -434,30 +434,29 @@ def receiver_delete(context, receiver_id):
|
|||
return IMPL.receiver_delete(context, receiver_id)
|
||||
|
||||
|
||||
def service_create(context, service_id, host=None, binary=None,
|
||||
topic=None):
|
||||
return IMPL.service_create(context, service_id=service_id, host=host,
|
||||
binary=binary, topic=topic)
|
||||
def service_create(service_id, host=None, binary=None, topic=None):
|
||||
return IMPL.service_create(service_id, host=host, binary=binary,
|
||||
topic=topic)
|
||||
|
||||
|
||||
def service_update(context, service_id, values=None):
|
||||
return IMPL.service_update(context, service_id, values=values)
|
||||
def service_update(service_id, values=None):
|
||||
return IMPL.service_update(service_id, values=values)
|
||||
|
||||
|
||||
def service_delete(context, service_id):
|
||||
return IMPL.service_delete(context, service_id)
|
||||
def service_delete(service_id):
|
||||
return IMPL.service_delete(service_id)
|
||||
|
||||
|
||||
def service_get(context, service_id):
|
||||
return IMPL.service_get(context, service_id)
|
||||
def service_get(service_id):
|
||||
return IMPL.service_get(service_id)
|
||||
|
||||
|
||||
def service_get_all(context):
|
||||
return IMPL.service_get_all(context)
|
||||
def service_get_all():
|
||||
return IMPL.service_get_all()
|
||||
|
||||
|
||||
def gc_by_engine(context, engine_id):
|
||||
return IMPL.gc_by_engine(context, engine_id)
|
||||
def gc_by_engine(engine_id):
|
||||
return IMPL.gc_by_engine(engine_id)
|
||||
|
||||
|
||||
def registry_create(context, cluster_id, check_type, interval, params,
|
||||
|
|
|
@ -1341,8 +1341,7 @@ def receiver_delete(context, receiver_id):
|
|||
session.delete(receiver)
|
||||
|
||||
|
||||
def service_create(context, service_id, host=None, binary=None,
|
||||
topic=None):
|
||||
def service_create(service_id, host=None, binary=None, topic=None):
|
||||
with session_for_write() as session:
|
||||
time_now = timeutils.utcnow(True)
|
||||
svc = models.Service(id=service_id, host=host, binary=binary,
|
||||
|
@ -1352,7 +1351,7 @@ def service_create(context, service_id, host=None, binary=None,
|
|||
return svc
|
||||
|
||||
|
||||
def service_update(context, service_id, values=None):
|
||||
def service_update(service_id, values=None):
|
||||
with session_for_write() as session:
|
||||
service = session.query(models.Service).get(service_id)
|
||||
if not service:
|
||||
|
@ -1367,21 +1366,23 @@ def service_update(context, service_id, values=None):
|
|||
return service
|
||||
|
||||
|
||||
def service_delete(context, service_id):
|
||||
def service_delete(service_id):
|
||||
with session_for_write() as session:
|
||||
session.query(models.Service).filter_by(
|
||||
id=service_id).delete(synchronize_session='fetch')
|
||||
|
||||
|
||||
def service_get(context, service_id):
|
||||
return model_query(context, models.Service).get(service_id)
|
||||
def service_get(service_id):
|
||||
with session_for_read() as session:
|
||||
return session.query(models.Service).get(service_id)
|
||||
|
||||
|
||||
def service_get_all(context):
|
||||
return model_query(context, models.Service).all()
|
||||
def service_get_all():
|
||||
with session_for_read() as session:
|
||||
return session.query(models.Service).all()
|
||||
|
||||
|
||||
def gc_by_engine(context, engine_id):
|
||||
def gc_by_engine(engine_id):
|
||||
# Get all actions locked by an engine
|
||||
with session_for_write() as session:
|
||||
q_actions = session.query(models.Action).filter_by(owner=engine_id)
|
||||
|
|
|
@ -128,6 +128,11 @@ class EngineService(service.Service):
|
|||
target, self, serializer=serializer)
|
||||
self._rpc_server.start()
|
||||
|
||||
# create service record
|
||||
ctx = senlin_context.get_admin_context()
|
||||
service_obj.Service.create(ctx, self.engine_id, self.host,
|
||||
'senlin-engine', self.topic)
|
||||
|
||||
# create a health manager RPC service for this engine.
|
||||
self.health_mgr = health_manager.HealthManager(
|
||||
self, self.health_mgr_topic, consts.RPC_API_VERSION)
|
||||
|
@ -139,8 +144,7 @@ class EngineService(service.Service):
|
|||
self.cleanup_timer = self.TG.add_timer(2 * CONF.periodic_interval,
|
||||
self.service_manage_cleanup)
|
||||
|
||||
self.TG.add_timer(CONF.periodic_interval,
|
||||
self.service_manage_report)
|
||||
self.TG.add_timer(CONF.periodic_interval, self.service_manage_report)
|
||||
super(EngineService, self).start()
|
||||
|
||||
def _stop_rpc_server(self):
|
||||
|
@ -167,23 +171,14 @@ class EngineService(service.Service):
|
|||
|
||||
self.TG.stop()
|
||||
|
||||
ctx = senlin_context.get_admin_context()
|
||||
service_obj.Service.delete(ctx, self.engine_id)
|
||||
service_obj.Service.delete(self.engine_id)
|
||||
LOG.info(_LI('Engine %s is deleted'), self.engine_id)
|
||||
|
||||
super(EngineService, self).stop()
|
||||
|
||||
def service_manage_report(self):
|
||||
ctx = senlin_context.get_admin_context()
|
||||
try:
|
||||
svc = service_obj.Service.update(ctx, self.engine_id)
|
||||
# if svc is None, means it's not created.
|
||||
if svc is None:
|
||||
service_obj.Service.create(ctx, self.engine_id, self.host,
|
||||
'senlin-engine', self.topic)
|
||||
except Exception as ex:
|
||||
LOG.error(_LE('Service %(service_id)s update failed: %(error)s'),
|
||||
{'service_id': self.engine_id, 'error': ex})
|
||||
service_obj.Service.update(ctx, self.engine_id)
|
||||
|
||||
def _service_manage_cleanup(self):
|
||||
ctx = senlin_context.get_admin_context()
|
||||
|
@ -195,9 +190,9 @@ class EngineService(service.Service):
|
|||
if timeutils.is_older_than(svc['updated_at'], time_window):
|
||||
LOG.info(_LI('Service %s was aborted'), svc['id'])
|
||||
LOG.info(_LI('Breaking locks for dead engine %s'), svc['id'])
|
||||
service_obj.Service.gc_by_engine(ctx, svc['id'])
|
||||
service_obj.Service.gc_by_engine(svc['id'])
|
||||
LOG.info(_LI('Done breaking locks for engine %s'), svc['id'])
|
||||
service_obj.Service.delete(ctx, svc['id'])
|
||||
service_obj.Service.delete(svc['id'])
|
||||
|
||||
def service_manage_cleanup(self):
|
||||
self._service_manage_cleanup()
|
||||
|
|
|
@ -34,29 +34,29 @@ class Service(base.SenlinObject, base.VersionedObjectDictCompat):
|
|||
|
||||
@classmethod
|
||||
def create(cls, context, service_id, host=None, binary=None, topic=None):
|
||||
obj = db_api.service_create(context, service_id=service_id, host=host,
|
||||
binary=binary, topic=topic)
|
||||
obj = db_api.service_create(service_id, host=host, binary=binary,
|
||||
topic=topic)
|
||||
return cls._from_db_object(context, cls(context), obj)
|
||||
|
||||
@classmethod
|
||||
def get(cls, context, service_id):
|
||||
obj = db_api.service_get(context, service_id)
|
||||
obj = db_api.service_get(service_id)
|
||||
return cls._from_db_object(context, cls(), obj)
|
||||
|
||||
@classmethod
|
||||
def get_all(cls, context):
|
||||
objs = db_api.service_get_all(context)
|
||||
objs = db_api.service_get_all()
|
||||
return [cls._from_db_object(context, cls(), obj) for obj in objs]
|
||||
|
||||
@classmethod
|
||||
def update(cls, context, obj_id, values=None):
|
||||
obj = db_api.service_update(context, obj_id, values=values)
|
||||
obj = db_api.service_update(obj_id, values=values)
|
||||
return cls._from_db_object(context, cls(), obj)
|
||||
|
||||
@classmethod
|
||||
def delete(cls, context, obj_id):
|
||||
db_api.service_delete(context, obj_id)
|
||||
def delete(cls, obj_id):
|
||||
db_api.service_delete(obj_id)
|
||||
|
||||
@classmethod
|
||||
def gc_by_engine(cls, context, engine_id):
|
||||
db_api.gc_by_engine(context, engine_id)
|
||||
def gc_by_engine(cls, engine_id):
|
||||
db_api.gc_by_engine(engine_id)
|
||||
|
|
|
@ -245,7 +245,7 @@ class GCByEngineTest(base.SenlinTestCase):
|
|||
db_api.cluster_lock_acquire(self.cluster.id, action.id, -1)
|
||||
|
||||
# do it
|
||||
db_api.gc_by_engine(self.ctx, engine_id)
|
||||
db_api.gc_by_engine(engine_id)
|
||||
|
||||
# assertion
|
||||
observed = db_api.cluster_lock_acquire(self.cluster.id, UUID2, -1)
|
||||
|
@ -275,7 +275,7 @@ class GCByEngineTest(base.SenlinTestCase):
|
|||
db_api.node_lock_acquire(self.cluster.id, action.id)
|
||||
|
||||
# do it
|
||||
db_api.gc_by_engine(self.ctx, engine_id)
|
||||
db_api.gc_by_engine(engine_id)
|
||||
|
||||
# assertion
|
||||
# even a read lock is okay now
|
||||
|
@ -311,7 +311,7 @@ class GCByEngineTest(base.SenlinTestCase):
|
|||
db_api.node_lock_acquire(self.node.id, action.id)
|
||||
|
||||
# do it
|
||||
db_api.gc_by_engine(self.ctx, engine_id)
|
||||
db_api.gc_by_engine(engine_id)
|
||||
|
||||
# assertion
|
||||
# a read lock is okay now and cluster lock state not broken
|
||||
|
|
|
@ -23,7 +23,7 @@ class DBAPIRegistryTest(base.SenlinTestCase):
|
|||
super(DBAPIRegistryTest, self).setUp()
|
||||
self.ctx = utils.dummy_context()
|
||||
|
||||
db_api.service_create(self.ctx, 'SERVICE_ID')
|
||||
db_api.service_create('SERVICE_ID')
|
||||
|
||||
def _create_registry(self, cluster_id, check_type, interval, params,
|
||||
engine_id):
|
||||
|
@ -76,7 +76,7 @@ class DBAPIRegistryTest(base.SenlinTestCase):
|
|||
|
||||
@mock.patch.object(db_utils, 'is_service_dead')
|
||||
def test_registry_claim_with_dead_engine(self, mock_check):
|
||||
db_api.service_create(self.ctx, 'SERVICE_ID_DEAD')
|
||||
db_api.service_create('SERVICE_ID_DEAD')
|
||||
self._create_registry(
|
||||
cluster_id='CLUSTER_1', check_type='NODE_STATUS_POLLING',
|
||||
interval=60, params={}, engine_id='SERVICE_ID')
|
||||
|
|
|
@ -22,20 +22,22 @@ class DBAPIServiceTest(base.SenlinTestCase):
|
|||
super(DBAPIServiceTest, self).setUp()
|
||||
self.ctx = utils.dummy_context()
|
||||
|
||||
def _create_service(self, **kwargs):
|
||||
def _create_service(self, service_id=None, **kwargs):
|
||||
service_id = service_id or 'f9aff81e-bc1f-4119-941d-ad1ea7f31d19'
|
||||
values = {
|
||||
'service_id': 'f9aff81e-bc1f-4119-941d-ad1ea7f31d19',
|
||||
'host': 'host1.devstack.org',
|
||||
'binary': 'senlin-engine',
|
||||
'topic': 'engine',
|
||||
}
|
||||
|
||||
values.update(kwargs)
|
||||
return db_api.service_create(self.ctx, **values)
|
||||
return db_api.service_create(service_id, **values)
|
||||
|
||||
def test_service_create_get(self):
|
||||
service = self._create_service()
|
||||
ret_service = db_api.service_get(self.ctx, service.id)
|
||||
|
||||
ret_service = db_api.service_get(service.id)
|
||||
|
||||
self.assertIsNotNone(ret_service)
|
||||
self.assertEqual(service.id, ret_service.id)
|
||||
self.assertEqual(service.binary, ret_service.binary)
|
||||
|
@ -47,32 +49,35 @@ class DBAPIServiceTest(base.SenlinTestCase):
|
|||
self.assertIsNotNone(service.updated_at)
|
||||
|
||||
def test_service_get_all(self):
|
||||
values = []
|
||||
for i in range(4):
|
||||
values.append({'service_id': uuidutils.generate_uuid(),
|
||||
'host': 'host-%s' % i})
|
||||
service_id = uuidutils.generate_uuid()
|
||||
values = {'host': 'host-%s' % i}
|
||||
self._create_service(service_id, **values)
|
||||
|
||||
[self._create_service(**val) for val in values]
|
||||
services = db_api.service_get_all()
|
||||
|
||||
services = db_api.service_get_all(self.ctx)
|
||||
self.assertEqual(4, len(services))
|
||||
|
||||
def test_service_update(self):
|
||||
old_service = self._create_service()
|
||||
old_updated_time = old_service.updated_at
|
||||
values = {'host': 'host-updated'}
|
||||
new_service = db_api.service_update(self.ctx, old_service.id, values)
|
||||
|
||||
new_service = db_api.service_update(old_service.id, values)
|
||||
|
||||
self.assertEqual('host-updated', new_service.host)
|
||||
self.assertGreater(new_service.updated_at, old_updated_time)
|
||||
|
||||
def test_service_update_values_none(self):
|
||||
old_service = self._create_service()
|
||||
old_updated_time = old_service.updated_at
|
||||
new_service = db_api.service_update(self.ctx, old_service.id)
|
||||
new_service = db_api.service_update(old_service.id)
|
||||
self.assertGreater(new_service.updated_at, old_updated_time)
|
||||
|
||||
def test_service_delete(self):
|
||||
service = self._create_service()
|
||||
db_api.service_delete(self.ctx, service.id)
|
||||
res = db_api.service_get(self.ctx, service.id)
|
||||
|
||||
db_api.service_delete(service.id)
|
||||
|
||||
res = db_api.service_get(service.id)
|
||||
self.assertIsNone(res)
|
||||
|
|
|
@ -96,7 +96,7 @@ class EngineBasicTest(base.SenlinTestCase):
|
|||
mock_disp.stop.assert_called_once_with()
|
||||
mock_hm.stop.assert_called_once_with()
|
||||
|
||||
mock_delete.assert_called_once_with(mock.ANY, self.fake_id)
|
||||
mock_delete.assert_called_once_with(self.fake_id)
|
||||
|
||||
def test_engine_stop_with_exception(self, mock_msg_cls, mock_hm_cls,
|
||||
mock_disp_cls):
|
||||
|
@ -128,31 +128,12 @@ class EngineStatusTest(base.SenlinTestCase):
|
|||
self.get_rpc = self.patchobject(rpc_messaging, 'get_rpc_server',
|
||||
return_value=self.fake_rpc_server)
|
||||
|
||||
@mock.patch.object(service_obj.Service, 'create')
|
||||
@mock.patch.object(service_obj.Service, 'update')
|
||||
def test_service_manage_report_create(self, mock_update, mock_create):
|
||||
mock_update.return_value = None
|
||||
|
||||
self.eng.service_manage_report()
|
||||
|
||||
mock_create.assert_called_once_with(
|
||||
mock.ANY, self.eng.engine_id, self.eng.host, 'senlin-engine',
|
||||
self.eng.topic)
|
||||
|
||||
@mock.patch.object(service_obj.Service, 'update')
|
||||
def test_service_manage_report_update(self, mock_update):
|
||||
mock_update.return_value = mock.Mock()
|
||||
self.eng.service_manage_report()
|
||||
mock_update.assert_called_once_with(mock.ANY, self.eng.engine_id)
|
||||
|
||||
@mock.patch.object(service_obj.Service, 'update')
|
||||
def test_service_manage_report_error(self, mock_update):
|
||||
mock_update.side_effect = [Exception]
|
||||
self.eng.service_manage_report()
|
||||
mock_update.assert_called_once_with(mock.ANY, self.eng.engine_id)
|
||||
expect_str = 'Service %s update failed' % self.eng.engine_id
|
||||
self.assertIn(expect_str, self.LOG.output)
|
||||
|
||||
@mock.patch.object(service_obj.Service, 'gc_by_engine')
|
||||
@mock.patch.object(service_obj.Service, 'get_all')
|
||||
@mock.patch.object(service_obj.Service, 'delete')
|
||||
|
@ -161,5 +142,5 @@ class EngineStatusTest(base.SenlinTestCase):
|
|||
ages_a_go = timeutils.utcnow(True) - delta
|
||||
mock_get_all.return_value = [{'id': 'foo', 'updated_at': ages_a_go}]
|
||||
self.eng._service_manage_cleanup()
|
||||
mock_delete.assert_called_once_with(mock.ANY, 'foo')
|
||||
mock_gc.assert_called_once_with(mock.ANY, 'foo')
|
||||
mock_delete.assert_called_once_with('foo')
|
||||
mock_gc.assert_called_once_with('foo')
|
||||
|
|
Loading…
Reference in New Issue