Modernized producer tests

* Dropped test_ from folder name.
* Dropped usage of assert.

Change-Id: I9d76b6d5640084f019d220ddd5e85851cd824fd1
This commit is contained in:
Erik Olof Gunnar Andersson 2019-05-20 23:57:44 -07:00
parent 178328f0a8
commit 6158c83717
3 changed files with 29 additions and 32 deletions

View File

@ -21,15 +21,14 @@ Unit-test Producer service
import mock
from oslotest import base as test
from designate.producer import service
from designate.tests.unit import RoObject
import designate.producer.service as ps
@mock.patch.object(ps.rpcapi.CentralAPI, 'get_instance')
@mock.patch.object(service.rpcapi.CentralAPI, 'get_instance')
class ProducerTest(test.BaseTestCase):
def setUp(self):
ps.CONF = RoObject({
service.CONF = RoObject({
'service:producer': RoObject({
'enabled_tasks': None, # enable all tasks
}),
@ -41,20 +40,20 @@ class ProducerTest(test.BaseTestCase):
'producer_task:zone_purge': '',
})
super(ProducerTest, self).setUp()
self.tm = ps.Service()
self.tm._storage = mock.Mock()
self.tm._rpc_server = mock.Mock()
self.tm._quota = mock.Mock()
self.tm.quota.limit_check = mock.Mock()
self.service = service.Service()
self.service._storage = mock.Mock()
self.service._rpc_server = mock.Mock()
self.service._quota = mock.Mock()
self.service.quota.limit_check = mock.Mock()
def test_service_name(self, _):
self.assertEqual('producer', self.tm.service_name)
self.assertEqual('producer', self.service.service_name)
def test_central_api(self, _):
capi = self.tm.central_api
assert isinstance(capi, mock.MagicMock)
capi = self.service.central_api
self.assertIsInstance(capi, mock.MagicMock)
@mock.patch.object(ps.tasks, 'PeriodicTask')
@mock.patch.object(ps.coordination, 'Partitioner')
@mock.patch.object(service.tasks, 'PeriodicTask')
@mock.patch.object(service.coordination, 'Partitioner')
def test_stark(self, _, mock_partitioner, mock_PeriodicTask):
self.tm.start()
self.service.start()

View File

@ -19,18 +19,17 @@ Unit test Producer tasks
"""
import datetime
from oslo_utils import timeutils
from oslotest import base as test
import fixtures
import mock
import testtools
import oslotest.base
from oslo_utils import timeutils
from designate.utils import generate_uuid
from designate.central import rpcapi as central_api
from designate import context
from designate import rpc
from designate.central import rpcapi as central_api
from designate.producer import tasks
from designate.tests.unit import RoObject
from designate.utils import generate_uuid
class DummyTask(tasks.PeriodicTask):
@ -38,7 +37,7 @@ class DummyTask(tasks.PeriodicTask):
__plugin_name__ = 'dummy'
class TaskTest(test.BaseTestCase):
class TaskTest(oslotest.base.BaseTestCase):
def setup_opts(self, config):
opts = RoObject(**config)
@ -75,8 +74,7 @@ class PeriodicTest(TaskTest):
central.find_zones.return_value = []
with testtools.ExpectedException(StopIteration):
next(iterer)
self.assertRaises(StopIteration, next, iterer)
@mock.patch.object(central_api.CentralAPI, 'get_instance')
def test_iter_zones(self, get_central):
@ -100,14 +98,14 @@ class PeriodicTest(TaskTest):
# Call next on the iterator and see it trying to load a new page.
# Also this will raise a StopIteration since there are no more items.
central.find_zones.return_value = []
with testtools.ExpectedException(StopIteration):
next(iterer)
self.assertRaises(StopIteration, next, iterer)
central.find_zones.assert_called_once_with(
ctxt,
{"shard": "BETWEEN 0,9"},
marker=items[-1].id,
limit=100)
limit=100
)
def test_my_range(self):
self.assertEqual((0, 9), self.task._my_range())
@ -155,8 +153,7 @@ class PeriodicExistsTest(TaskTest):
))
def test_emit_exists(self):
zone = RoObject(
id=generate_uuid())
zone = RoObject(id=generate_uuid())
with mock.patch.object(self.task, '_iter_zones') as iter_:
iter_.return_value = [zone]
@ -181,13 +178,13 @@ class PeriodicExistsTest(TaskTest):
self.assertFalse(self.mock_notifier.info.called)
def test_emit_exists_multiple_zones(self):
zones = [RoObject() for i in range(0, 10)]
zones = [RoObject()] * 10
with mock.patch.object(self.task, '_iter_zones') as iter_:
iter_.return_value = zones
self.task()
for z in zones:
data = dict(z)
for zone in zones:
data = dict(zone)
data.update(self.period_data)
# Ensure both the old (domain) and new (zone) events are fired
@ -253,7 +250,8 @@ class PeriodicSecondaryRefreshTest(TaskTest):
zone = RoObject(
id=generate_uuid(),
transferred_at=datetime.datetime.isoformat(transferred),
refresh=3600)
refresh=3600
)
with mock.patch.object(self.task, '_iter') as _iter:
_iter.return_value = [zone]