Use lazy properties in services

Seeing as it is common to mock / patch the storage (and any service
interface point), this makes the quota and storage attributes lazy
properties that will only be loaded upon usage. This allows
instantiating the class without a database connection as well as easy
patching via setting an attribute.

For example:

    def test_create_zone(self):
        central = Service()
	central._storage = Mock()
	central.create_zone(self.context, self.zone)
	...

Change-Id: I189559738a5a43001bbc4bdfe94659ad8c58143b
This commit is contained in:
Eric Larson 2016-02-24 13:37:09 -06:00
parent 79714fc08b
commit f62907d66a
7 changed files with 60 additions and 26 deletions

View File

@ -26,6 +26,7 @@ CONF = cfg.CONF
class Service(service.DNSService, service.Service):
def __init__(self, threads=None):
super(Service, self).__init__(threads=threads)

View File

@ -268,15 +268,23 @@ class Service(service.RPCService, service.Service):
def __init__(self, threads=None):
super(Service, self).__init__(threads=threads)
# Get a storage connection
storage_driver = cfg.CONF['service:central'].storage_driver
self.storage = storage.get_storage(storage_driver)
# Get a quota manager instance
self.quota = quota.get_quota()
self.network_api = network_api.get_network_api(cfg.CONF.network_api)
@property
def quota(self):
if not hasattr(self, '_quota'):
# Get a quota manager instance
self._quota = quota.get_quota()
return self._quota
@property
def storage(self):
if not hasattr(self, '_storage'):
# Get a storage connection
storage_driver = cfg.CONF['service:central'].storage_driver
self._storage = storage.get_storage(storage_driver)
return self._storage
@property
def service_name(self):
return 'central'

View File

@ -29,11 +29,15 @@ CONF = cfg.CONF
class Service(service.DNSService, service.RPCService, service.Service):
def __init__(self, threads=None):
super(Service, self).__init__(threads=threads)
# Get a storage connection
self.storage = storage.get_storage(CONF['service:mdns'].storage_driver)
@property
def storage(self):
if not hasattr(self, '_storage'):
# Get a storage connection
self._storage = storage.get_storage(
CONF['service:mdns'].storage_driver
)
return self._storage
@property
def service_name(self):

View File

@ -184,15 +184,22 @@ class Service(service.RPCService, coordination.CoordinationMixin,
@property
def central_api(self):
return central_api.CentralAPI.get_instance()
if not hasattr(self, '_central_api'):
self._central_api = central_api.CentralAPI.get_instance()
return self._central_api
@property
def mdns_api(self):
return mdns_api.MdnsAPI.get_instance()
if not hasattr(self, '_mdns_adpi'):
self._mdns_api = mdns_api.MdnsAPI.get_instance()
return self._mdns_api
@property
def pool_manager_api(self):
return pool_manager_rpcapi.PoolManagerAPI.get_instance()
if not hasattr(self, '_pool_manager_api'):
pool_mgr_api = pool_manager_rpcapi.PoolManagerAPI
self._pool_manager_api = pool_mgr_api.get_instance()
return self._pool_manager_api
def _get_admin_context_all_tenants(self):
return DesignateContext.get_admin_context(all_tenants=True)

View File

@ -313,8 +313,19 @@ class CentralServiceTestCase(CentralBasic):
def test_init(self):
self.assertTrue(self.service.check_for_tlds)
# Ensure these attributes are lazy
self.assertFalse(designate.central.service.storage.get_storage.called)
self.assertFalse(designate.central.service.quota.get_quota.called)
def test_storage_loads_lazily(self):
assert self.service.storage
self.assertTrue(designate.central.service.storage.get_storage.called)
def test_quota_loads_lazily(self):
assert self.service.quota
self.assertTrue(designate.central.service.quota.get_quota.called)
def test__is_valid_ttl(self):
self.CONF.set_override('min_ttl', 10, 'service:central',
enforce_type=True)

View File

@ -1,4 +1,3 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Federico Ceratto <federico.ceratto@hpe.com>
@ -28,8 +27,7 @@ import designate.zone_manager.service as zms
@mock.patch.object(zms.rpcapi.CentralAPI, 'get_instance')
class ZoneManagerTest(test.BaseTestCase):
@mock.patch.object(zms.storage, 'get_storage')
def setUp(self, mock_get_storage):
def setUp(self):
zms.CONF = RoObject({
'service:zone_manager': RoObject({
'enabled_tasks': None, # enable all tasks
@ -39,9 +37,9 @@ class ZoneManagerTest(test.BaseTestCase):
})
super(ZoneManagerTest, self).setUp()
self.tm = zms.Service()
assert mock_get_storage.called
self.tm._storage = mock.Mock()
self.tm._rpc_server = mock.Mock()
self.tm.quota = mock.Mock()
self.tm._quota = mock.Mock()
self.tm.quota.limit_check = mock.Mock()
def test_service_name(self, _):

View File

@ -40,14 +40,19 @@ class Service(service.RPCService, coordination.CoordinationMixin,
target = messaging.Target(version=RPC_API_VERSION)
def __init__(self, threads=None):
super(Service, self).__init__(threads=threads)
@property
def storage(self):
if not hasattr(self, '_storage'):
storage_driver = cfg.CONF['service:zone_manager'].storage_driver
self._storage = storage.get_storage(storage_driver)
return self._storage
storage_driver = cfg.CONF['service:zone_manager'].storage_driver
self.storage = storage.get_storage(storage_driver)
# Get a quota manager instance
self.quota = quota.get_quota()
@property
def quota(self):
if not hasattr(self, '_quota'):
# Get a quota manager instance
self._quota = quota.get_quota()
return self._quota
@property
def service_name(self):