From 58ed4484dc840f6f91a16136f1c03faabb612ee4 Mon Sep 17 00:00:00 2001 From: tengqm Date: Sun, 19 Feb 2017 22:00:32 -0500 Subject: [PATCH] Remove context from service db apis This also adds service create call to the service start up procedure. Change-Id: Ia7a0296929ab2b17250128fee808846d9222e9d3 --- senlin/cmd/manage.py | 2 +- senlin/db/api.py | 27 ++++++++-------- senlin/db/sqlalchemy/api.py | 19 ++++++------ senlin/engine/service.py | 25 ++++++--------- senlin/objects/service.py | 18 +++++------ senlin/tests/unit/db/test_lock_api.py | 6 ++-- senlin/tests/unit/db/test_registry_api.py | 4 +-- senlin/tests/unit/db/test_service_api.py | 31 +++++++++++-------- .../unit/engine/test_engine_startstop.py | 25 ++------------- 9 files changed, 69 insertions(+), 88 deletions(-) diff --git a/senlin/cmd/manage.py b/senlin/cmd/manage.py index 05964e110..e9504cfa7 100644 --- a/senlin/cmd/manage.py +++ b/senlin/cmd/manage.py @@ -105,7 +105,7 @@ class ServiceManageCommand(object): svc = self._format_service(service) if svc['status'] == 'down': print(_('Dead service %s is removed.') % svc['service_id']) - service_obj.Service.delete(self.ctx, svc['service_id']) + service_obj.Service.delete(svc['service_id']) @staticmethod def add_service_parsers(subparsers): diff --git a/senlin/db/api.py b/senlin/db/api.py index 08e54d52a..fe829bd9f 100644 --- a/senlin/db/api.py +++ b/senlin/db/api.py @@ -434,30 +434,29 @@ def receiver_delete(context, receiver_id): return IMPL.receiver_delete(context, receiver_id) -def service_create(context, service_id, host=None, binary=None, - topic=None): - return IMPL.service_create(context, service_id=service_id, host=host, - binary=binary, topic=topic) +def service_create(service_id, host=None, binary=None, topic=None): + return IMPL.service_create(service_id, host=host, binary=binary, + topic=topic) -def service_update(context, service_id, values=None): - return IMPL.service_update(context, service_id, values=values) +def service_update(service_id, values=None): + return IMPL.service_update(service_id, values=values) -def service_delete(context, service_id): - return IMPL.service_delete(context, service_id) +def service_delete(service_id): + return IMPL.service_delete(service_id) -def service_get(context, service_id): - return IMPL.service_get(context, service_id) +def service_get(service_id): + return IMPL.service_get(service_id) -def service_get_all(context): - return IMPL.service_get_all(context) +def service_get_all(): + return IMPL.service_get_all() -def gc_by_engine(context, engine_id): - return IMPL.gc_by_engine(context, engine_id) +def gc_by_engine(engine_id): + return IMPL.gc_by_engine(engine_id) def registry_create(context, cluster_id, check_type, interval, params, diff --git a/senlin/db/sqlalchemy/api.py b/senlin/db/sqlalchemy/api.py index 53dea9c15..d29ca7b94 100644 --- a/senlin/db/sqlalchemy/api.py +++ b/senlin/db/sqlalchemy/api.py @@ -1341,8 +1341,7 @@ def receiver_delete(context, receiver_id): session.delete(receiver) -def service_create(context, service_id, host=None, binary=None, - topic=None): +def service_create(service_id, host=None, binary=None, topic=None): with session_for_write() as session: time_now = timeutils.utcnow(True) svc = models.Service(id=service_id, host=host, binary=binary, @@ -1352,7 +1351,7 @@ def service_create(context, service_id, host=None, binary=None, return svc -def service_update(context, service_id, values=None): +def service_update(service_id, values=None): with session_for_write() as session: service = session.query(models.Service).get(service_id) if not service: @@ -1367,21 +1366,23 @@ def service_update(context, service_id, values=None): return service -def service_delete(context, service_id): +def service_delete(service_id): with session_for_write() as session: session.query(models.Service).filter_by( id=service_id).delete(synchronize_session='fetch') -def service_get(context, service_id): - return model_query(context, models.Service).get(service_id) +def service_get(service_id): + with session_for_read() as session: + return session.query(models.Service).get(service_id) -def service_get_all(context): - return model_query(context, models.Service).all() +def service_get_all(): + with session_for_read() as session: + return session.query(models.Service).all() -def gc_by_engine(context, engine_id): +def gc_by_engine(engine_id): # Get all actions locked by an engine with session_for_write() as session: q_actions = session.query(models.Action).filter_by(owner=engine_id) diff --git a/senlin/engine/service.py b/senlin/engine/service.py index 112d7b460..ef0cc4b9a 100644 --- a/senlin/engine/service.py +++ b/senlin/engine/service.py @@ -128,6 +128,11 @@ class EngineService(service.Service): target, self, serializer=serializer) self._rpc_server.start() + # create service record + ctx = senlin_context.get_admin_context() + service_obj.Service.create(ctx, self.engine_id, self.host, + 'senlin-engine', self.topic) + # create a health manager RPC service for this engine. self.health_mgr = health_manager.HealthManager( self, self.health_mgr_topic, consts.RPC_API_VERSION) @@ -139,8 +144,7 @@ class EngineService(service.Service): self.cleanup_timer = self.TG.add_timer(2 * CONF.periodic_interval, self.service_manage_cleanup) - self.TG.add_timer(CONF.periodic_interval, - self.service_manage_report) + self.TG.add_timer(CONF.periodic_interval, self.service_manage_report) super(EngineService, self).start() def _stop_rpc_server(self): @@ -167,23 +171,14 @@ class EngineService(service.Service): self.TG.stop() - ctx = senlin_context.get_admin_context() - service_obj.Service.delete(ctx, self.engine_id) + service_obj.Service.delete(self.engine_id) LOG.info(_LI('Engine %s is deleted'), self.engine_id) super(EngineService, self).stop() def service_manage_report(self): ctx = senlin_context.get_admin_context() - try: - svc = service_obj.Service.update(ctx, self.engine_id) - # if svc is None, means it's not created. - if svc is None: - service_obj.Service.create(ctx, self.engine_id, self.host, - 'senlin-engine', self.topic) - except Exception as ex: - LOG.error(_LE('Service %(service_id)s update failed: %(error)s'), - {'service_id': self.engine_id, 'error': ex}) + service_obj.Service.update(ctx, self.engine_id) def _service_manage_cleanup(self): ctx = senlin_context.get_admin_context() @@ -195,9 +190,9 @@ class EngineService(service.Service): if timeutils.is_older_than(svc['updated_at'], time_window): LOG.info(_LI('Service %s was aborted'), svc['id']) LOG.info(_LI('Breaking locks for dead engine %s'), svc['id']) - service_obj.Service.gc_by_engine(ctx, svc['id']) + service_obj.Service.gc_by_engine(svc['id']) LOG.info(_LI('Done breaking locks for engine %s'), svc['id']) - service_obj.Service.delete(ctx, svc['id']) + service_obj.Service.delete(svc['id']) def service_manage_cleanup(self): self._service_manage_cleanup() diff --git a/senlin/objects/service.py b/senlin/objects/service.py index ca06594ae..229f2e41d 100644 --- a/senlin/objects/service.py +++ b/senlin/objects/service.py @@ -34,29 +34,29 @@ class Service(base.SenlinObject, base.VersionedObjectDictCompat): @classmethod def create(cls, context, service_id, host=None, binary=None, topic=None): - obj = db_api.service_create(context, service_id=service_id, host=host, - binary=binary, topic=topic) + obj = db_api.service_create(service_id, host=host, binary=binary, + topic=topic) return cls._from_db_object(context, cls(context), obj) @classmethod def get(cls, context, service_id): - obj = db_api.service_get(context, service_id) + obj = db_api.service_get(service_id) return cls._from_db_object(context, cls(), obj) @classmethod def get_all(cls, context): - objs = db_api.service_get_all(context) + objs = db_api.service_get_all() return [cls._from_db_object(context, cls(), obj) for obj in objs] @classmethod def update(cls, context, obj_id, values=None): - obj = db_api.service_update(context, obj_id, values=values) + obj = db_api.service_update(obj_id, values=values) return cls._from_db_object(context, cls(), obj) @classmethod - def delete(cls, context, obj_id): - db_api.service_delete(context, obj_id) + def delete(cls, obj_id): + db_api.service_delete(obj_id) @classmethod - def gc_by_engine(cls, context, engine_id): - db_api.gc_by_engine(context, engine_id) + def gc_by_engine(cls, engine_id): + db_api.gc_by_engine(engine_id) diff --git a/senlin/tests/unit/db/test_lock_api.py b/senlin/tests/unit/db/test_lock_api.py index dab7de341..75cf89b00 100644 --- a/senlin/tests/unit/db/test_lock_api.py +++ b/senlin/tests/unit/db/test_lock_api.py @@ -245,7 +245,7 @@ class GCByEngineTest(base.SenlinTestCase): db_api.cluster_lock_acquire(self.cluster.id, action.id, -1) # do it - db_api.gc_by_engine(self.ctx, engine_id) + db_api.gc_by_engine(engine_id) # assertion observed = db_api.cluster_lock_acquire(self.cluster.id, UUID2, -1) @@ -275,7 +275,7 @@ class GCByEngineTest(base.SenlinTestCase): db_api.node_lock_acquire(self.cluster.id, action.id) # do it - db_api.gc_by_engine(self.ctx, engine_id) + db_api.gc_by_engine(engine_id) # assertion # even a read lock is okay now @@ -311,7 +311,7 @@ class GCByEngineTest(base.SenlinTestCase): db_api.node_lock_acquire(self.node.id, action.id) # do it - db_api.gc_by_engine(self.ctx, engine_id) + db_api.gc_by_engine(engine_id) # assertion # a read lock is okay now and cluster lock state not broken diff --git a/senlin/tests/unit/db/test_registry_api.py b/senlin/tests/unit/db/test_registry_api.py index 44eaf03c2..083984c6c 100644 --- a/senlin/tests/unit/db/test_registry_api.py +++ b/senlin/tests/unit/db/test_registry_api.py @@ -23,7 +23,7 @@ class DBAPIRegistryTest(base.SenlinTestCase): super(DBAPIRegistryTest, self).setUp() self.ctx = utils.dummy_context() - db_api.service_create(self.ctx, 'SERVICE_ID') + db_api.service_create('SERVICE_ID') def _create_registry(self, cluster_id, check_type, interval, params, engine_id): @@ -76,7 +76,7 @@ class DBAPIRegistryTest(base.SenlinTestCase): @mock.patch.object(db_utils, 'is_service_dead') def test_registry_claim_with_dead_engine(self, mock_check): - db_api.service_create(self.ctx, 'SERVICE_ID_DEAD') + db_api.service_create('SERVICE_ID_DEAD') self._create_registry( cluster_id='CLUSTER_1', check_type='NODE_STATUS_POLLING', interval=60, params={}, engine_id='SERVICE_ID') diff --git a/senlin/tests/unit/db/test_service_api.py b/senlin/tests/unit/db/test_service_api.py index c6f9deeab..9eed794fa 100644 --- a/senlin/tests/unit/db/test_service_api.py +++ b/senlin/tests/unit/db/test_service_api.py @@ -22,20 +22,22 @@ class DBAPIServiceTest(base.SenlinTestCase): super(DBAPIServiceTest, self).setUp() self.ctx = utils.dummy_context() - def _create_service(self, **kwargs): + def _create_service(self, service_id=None, **kwargs): + service_id = service_id or 'f9aff81e-bc1f-4119-941d-ad1ea7f31d19' values = { - 'service_id': 'f9aff81e-bc1f-4119-941d-ad1ea7f31d19', 'host': 'host1.devstack.org', 'binary': 'senlin-engine', 'topic': 'engine', } values.update(kwargs) - return db_api.service_create(self.ctx, **values) + return db_api.service_create(service_id, **values) def test_service_create_get(self): service = self._create_service() - ret_service = db_api.service_get(self.ctx, service.id) + + ret_service = db_api.service_get(service.id) + self.assertIsNotNone(ret_service) self.assertEqual(service.id, ret_service.id) self.assertEqual(service.binary, ret_service.binary) @@ -47,32 +49,35 @@ class DBAPIServiceTest(base.SenlinTestCase): self.assertIsNotNone(service.updated_at) def test_service_get_all(self): - values = [] for i in range(4): - values.append({'service_id': uuidutils.generate_uuid(), - 'host': 'host-%s' % i}) + service_id = uuidutils.generate_uuid() + values = {'host': 'host-%s' % i} + self._create_service(service_id, **values) - [self._create_service(**val) for val in values] + services = db_api.service_get_all() - services = db_api.service_get_all(self.ctx) self.assertEqual(4, len(services)) def test_service_update(self): old_service = self._create_service() old_updated_time = old_service.updated_at values = {'host': 'host-updated'} - new_service = db_api.service_update(self.ctx, old_service.id, values) + + new_service = db_api.service_update(old_service.id, values) + self.assertEqual('host-updated', new_service.host) self.assertGreater(new_service.updated_at, old_updated_time) def test_service_update_values_none(self): old_service = self._create_service() old_updated_time = old_service.updated_at - new_service = db_api.service_update(self.ctx, old_service.id) + new_service = db_api.service_update(old_service.id) self.assertGreater(new_service.updated_at, old_updated_time) def test_service_delete(self): service = self._create_service() - db_api.service_delete(self.ctx, service.id) - res = db_api.service_get(self.ctx, service.id) + + db_api.service_delete(service.id) + + res = db_api.service_get(service.id) self.assertIsNone(res) diff --git a/senlin/tests/unit/engine/test_engine_startstop.py b/senlin/tests/unit/engine/test_engine_startstop.py index a255b960d..13989bd5b 100644 --- a/senlin/tests/unit/engine/test_engine_startstop.py +++ b/senlin/tests/unit/engine/test_engine_startstop.py @@ -96,7 +96,7 @@ class EngineBasicTest(base.SenlinTestCase): mock_disp.stop.assert_called_once_with() mock_hm.stop.assert_called_once_with() - mock_delete.assert_called_once_with(mock.ANY, self.fake_id) + mock_delete.assert_called_once_with(self.fake_id) def test_engine_stop_with_exception(self, mock_msg_cls, mock_hm_cls, mock_disp_cls): @@ -128,31 +128,12 @@ class EngineStatusTest(base.SenlinTestCase): self.get_rpc = self.patchobject(rpc_messaging, 'get_rpc_server', return_value=self.fake_rpc_server) - @mock.patch.object(service_obj.Service, 'create') - @mock.patch.object(service_obj.Service, 'update') - def test_service_manage_report_create(self, mock_update, mock_create): - mock_update.return_value = None - - self.eng.service_manage_report() - - mock_create.assert_called_once_with( - mock.ANY, self.eng.engine_id, self.eng.host, 'senlin-engine', - self.eng.topic) - @mock.patch.object(service_obj.Service, 'update') def test_service_manage_report_update(self, mock_update): mock_update.return_value = mock.Mock() self.eng.service_manage_report() mock_update.assert_called_once_with(mock.ANY, self.eng.engine_id) - @mock.patch.object(service_obj.Service, 'update') - def test_service_manage_report_error(self, mock_update): - mock_update.side_effect = [Exception] - self.eng.service_manage_report() - mock_update.assert_called_once_with(mock.ANY, self.eng.engine_id) - expect_str = 'Service %s update failed' % self.eng.engine_id - self.assertIn(expect_str, self.LOG.output) - @mock.patch.object(service_obj.Service, 'gc_by_engine') @mock.patch.object(service_obj.Service, 'get_all') @mock.patch.object(service_obj.Service, 'delete') @@ -161,5 +142,5 @@ class EngineStatusTest(base.SenlinTestCase): ages_a_go = timeutils.utcnow(True) - delta mock_get_all.return_value = [{'id': 'foo', 'updated_at': ages_a_go}] self.eng._service_manage_cleanup() - mock_delete.assert_called_once_with(mock.ANY, 'foo') - mock_gc.assert_called_once_with(mock.ANY, 'foo') + mock_delete.assert_called_once_with('foo') + mock_gc.assert_called_once_with('foo')