Improve unit testing
Change-Id: I574525565f5c3efb9ce54dd146fbeb5815a2c2e9
This commit is contained in:
@@ -0,0 +1,54 @@
|
||||
# Copyright 2015 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Author: Federico Ceratto <federico.ceratto@hpe.com>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
Unit test utilities
|
||||
"""
|
||||
|
||||
import six
|
||||
|
||||
|
||||
class RoObject(object):
|
||||
"""Read-only object: raise exception on unexpected
|
||||
__setitem__ or __setattr__
|
||||
"""
|
||||
def __init__(self, d=None, **kw):
|
||||
if d:
|
||||
kw.update(d)
|
||||
|
||||
self.__dict__.update(kw)
|
||||
|
||||
def __getitem__(self, k):
|
||||
try:
|
||||
return self.__dict__[k]
|
||||
except KeyError:
|
||||
raise NotImplementedError(
|
||||
"Attempt to perform __getitem__"
|
||||
" %r on RoObject %r" % (k, self.__dict__)
|
||||
)
|
||||
|
||||
def __setitem__(self, k, v):
|
||||
raise NotImplementedError(
|
||||
"Attempt to perform __setitem__ or __setattr__"
|
||||
" %r on RoObject %r" % (k, self.__dict__)
|
||||
)
|
||||
|
||||
def __setattr__(self, k, v):
|
||||
self.__setitem__(k, v)
|
||||
|
||||
def __iter__(self):
|
||||
for k in six.iterkeys(self.__dict__):
|
||||
yield k, self.__dict__[k]
|
||||
|
||||
211
designate/tests/unit/test_backend/test_agent.py
Normal file
211
designate/tests/unit/test_backend/test_agent.py
Normal file
@@ -0,0 +1,211 @@
|
||||
|
||||
# Author: Federico Ceratto <federico.ceratto@hpe.com>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
"""Unit-test backend agent
|
||||
"""
|
||||
|
||||
from mock import MagicMock
|
||||
from mock import Mock
|
||||
from mock import patch
|
||||
from oslotest import base
|
||||
import dns
|
||||
import dns.rdataclass
|
||||
import dns.rdatatype
|
||||
import mock
|
||||
import testtools
|
||||
|
||||
from designate import exceptions
|
||||
from designate.tests.unit import RoObject
|
||||
import designate.backend.agent as agent
|
||||
|
||||
|
||||
class SCAgentPoolBackend(agent.AgentPoolBackend):
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
|
||||
@mock.patch.object(agent.mdns_api.MdnsAPI, 'get_instance')
|
||||
@patch.object(agent.base.Backend, '__init__')
|
||||
class BackendAgentTest(base.BaseTestCase):
|
||||
|
||||
def setUp(self, *mocks):
|
||||
super(BackendAgentTest, self).setUp()
|
||||
agent.CONF = RoObject({
|
||||
'service:mdns': RoObject(all_tcp=False)
|
||||
})
|
||||
self.agent = SCAgentPoolBackend()
|
||||
self.agent.timeout = 1
|
||||
self.agent.host = 2
|
||||
self.agent.port = 3
|
||||
self.agent.retry_interval = 4
|
||||
self.agent.max_retries = 5
|
||||
self.agent.delay = 6
|
||||
|
||||
def test_mdns_api(self, *mock):
|
||||
assert isinstance(self.agent.mdns_api, MagicMock)
|
||||
|
||||
def test_create_zone(self, *mock):
|
||||
self.agent._make_and_send_dns_message = Mock(return_value=(1, 2))
|
||||
|
||||
out = self.agent.create_zone('ctx', RoObject(name='zn'))
|
||||
|
||||
self.agent._make_and_send_dns_message.assert_called_with(
|
||||
'zn', 1, 14, agent.CREATE, agent.SUCCESS, 2, 3)
|
||||
self.assertEqual(None, out)
|
||||
|
||||
def test_create_zone_exception(self, *mock):
|
||||
self.agent._make_and_send_dns_message = Mock(return_value=(None, 2))
|
||||
|
||||
with testtools.ExpectedException(exceptions.Backend):
|
||||
self.agent.create_zone('ctx', RoObject(name='zn'))
|
||||
|
||||
self.agent._make_and_send_dns_message.assert_called_with(
|
||||
'zn', 1, 14, agent.CREATE, agent.SUCCESS, 2, 3)
|
||||
|
||||
def test_update_zone(self, *mock):
|
||||
self.agent.mdns_api.notify_zone_changed = Mock()
|
||||
zone = RoObject(name='zn')
|
||||
|
||||
out = self.agent.update_zone('ctx', zone)
|
||||
|
||||
self.agent.mdns_api.notify_zone_changed.assert_called_with(
|
||||
'ctx', zone, 2, 3, 1, 4, 5, 6)
|
||||
self.assertEqual(None, out)
|
||||
|
||||
def test_delete_zone(self, *mock):
|
||||
self.agent._make_and_send_dns_message = Mock(return_value=(1, 2))
|
||||
|
||||
out = self.agent.delete_zone('ctx', RoObject(name='zn'))
|
||||
|
||||
self.agent._make_and_send_dns_message.assert_called_with(
|
||||
'zn', 1, 14, agent.DELETE, agent.SUCCESS, 2, 3)
|
||||
self.assertEqual(None, out)
|
||||
|
||||
def test_delete_zone_exception(self, *mock):
|
||||
self.agent._make_and_send_dns_message = Mock(return_value=(None, 2))
|
||||
|
||||
with testtools.ExpectedException(exceptions.Backend):
|
||||
self.agent.delete_zone('ctx', RoObject(name='zn'))
|
||||
|
||||
self.agent._make_and_send_dns_message.assert_called_with(
|
||||
'zn', 1, 14, agent.DELETE, agent.SUCCESS, 2, 3)
|
||||
|
||||
def test_make_and_send_dns_message_timeout(self, *mocks):
|
||||
self.agent._make_dns_message = Mock(return_value='')
|
||||
self.agent._send_dns_message = Mock(
|
||||
return_value=dns.exception.Timeout())
|
||||
|
||||
out = self.agent._make_and_send_dns_message('h', 123, 1, 2, 3, 4, 5)
|
||||
|
||||
self.assertEqual((None, 0), out)
|
||||
|
||||
def test_make_and_send_dns_message_bad_response(self, *mocks):
|
||||
self.agent._make_dns_message = Mock(return_value='')
|
||||
self.agent._send_dns_message = Mock(
|
||||
return_value=agent.dns_query.BadResponse())
|
||||
|
||||
out = self.agent._make_and_send_dns_message('h', 123, 1, 2, 3, 4, 5)
|
||||
|
||||
self.assertEqual((None, 0), out)
|
||||
|
||||
def test_make_and_send_dns_message_missing_AA_flags(self, *mocks):
|
||||
self.agent._make_dns_message = Mock(return_value='')
|
||||
response = RoObject(
|
||||
rcode=Mock(return_value=dns.rcode.NOERROR),
|
||||
# rcode is NOERROR but (flags & dns.flags.AA) gives 0
|
||||
flags=0,
|
||||
)
|
||||
self.agent._send_dns_message = Mock(return_value=response)
|
||||
|
||||
out = self.agent._make_and_send_dns_message('h', 123, 1, 2, 3, 4, 5)
|
||||
|
||||
self.assertEqual((None, 0), out)
|
||||
|
||||
def test_make_and_send_dns_message_error_flags(self, *mocks):
|
||||
self.agent._make_dns_message = Mock(return_value='')
|
||||
response = RoObject(
|
||||
rcode=Mock(return_value=dns.rcode.NOERROR),
|
||||
# rcode is NOERROR but flags are not NOERROR
|
||||
flags=123,
|
||||
ednsflags=321
|
||||
)
|
||||
self.agent._send_dns_message = Mock(return_value=response)
|
||||
|
||||
out = self.agent._make_and_send_dns_message('h', 123, 1, 2, 3, 4, 5)
|
||||
|
||||
self.assertEqual((None, 0), out)
|
||||
|
||||
def test_make_and_send_dns_message(self, *mock):
|
||||
self.agent._make_dns_message = Mock(return_value='')
|
||||
response = RoObject(
|
||||
rcode=Mock(return_value=dns.rcode.NOERROR),
|
||||
flags=agent.dns.flags.AA,
|
||||
ednsflags=321
|
||||
)
|
||||
self.agent._send_dns_message = Mock(return_value=response)
|
||||
|
||||
out = self.agent._make_and_send_dns_message('h', 123, 1, 2, 3, 4, 5)
|
||||
|
||||
self.assertEqual((response, 0), out)
|
||||
|
||||
@mock.patch.object(agent.dns_query, 'tcp')
|
||||
@mock.patch.object(agent.dns_query, 'udp')
|
||||
def test_send_dns_message(self, *mocks):
|
||||
mocks[0].return_value = 'mock udp resp'
|
||||
|
||||
out = self.agent._send_dns_message('msg', 'host', 123, 1)
|
||||
|
||||
assert not agent.dns_query.tcp.called
|
||||
agent.dns_query.udp.assert_called_with('msg', 'host', port=123,
|
||||
timeout=1)
|
||||
self.assertEqual('mock udp resp', out)
|
||||
|
||||
@mock.patch.object(agent.dns_query, 'tcp')
|
||||
@mock.patch.object(agent.dns_query, 'udp')
|
||||
def test_send_dns_message_timeout(self, *mocks):
|
||||
mocks[0].side_effect = dns.exception.Timeout
|
||||
|
||||
out = self.agent._send_dns_message('msg', 'host', 123, 1)
|
||||
|
||||
agent.dns_query.udp.assert_called_with('msg', 'host', port=123,
|
||||
timeout=1)
|
||||
assert isinstance(out, dns.exception.Timeout)
|
||||
|
||||
@mock.patch.object(agent.dns_query, 'tcp')
|
||||
@mock.patch.object(agent.dns_query, 'udp')
|
||||
def test_send_dns_message_bad_response(self, *mocks):
|
||||
mocks[0].side_effect = agent.dns_query.BadResponse
|
||||
|
||||
out = self.agent._send_dns_message('msg', 'host', 123, 1)
|
||||
|
||||
agent.dns_query.udp.assert_called_with('msg', 'host', port=123,
|
||||
timeout=1)
|
||||
assert isinstance(out, agent.dns_query.BadResponse)
|
||||
|
||||
@mock.patch.object(agent.dns_query, 'tcp')
|
||||
@mock.patch.object(agent.dns_query, 'udp')
|
||||
def test_send_dns_message_tcp(self, *mocks):
|
||||
agent.CONF = RoObject({
|
||||
'service:mdns': RoObject(all_tcp=True)
|
||||
})
|
||||
mocks[1].return_value = 'mock tcp resp'
|
||||
|
||||
out = self.agent._send_dns_message('msg', 'host', 123, 1)
|
||||
|
||||
assert not agent.dns_query.udp.called
|
||||
agent.dns_query.tcp.assert_called_with('msg', 'host', port=123,
|
||||
timeout=1)
|
||||
self.assertEqual('mock tcp resp', out)
|
||||
0
designate/tests/unit/test_mdns/__init__.py
Normal file
0
designate/tests/unit/test_mdns/__init__.py
Normal file
224
designate/tests/unit/test_mdns/test_notify.py
Normal file
224
designate/tests/unit/test_mdns/test_notify.py
Normal file
@@ -0,0 +1,224 @@
|
||||
# Copyright 2015 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Author: Federico Ceratto <federico.ceratto@hpe.com>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
"""Unit-test MiniDNS service
|
||||
"""
|
||||
|
||||
from mock import Mock
|
||||
from oslotest import base
|
||||
import dns
|
||||
import dns.rdataclass
|
||||
import dns.rdatatype
|
||||
import mock
|
||||
|
||||
from designate.tests.unit import RoObject
|
||||
import designate.mdns.notify as notify
|
||||
import designate.mdns.base as mdnsbase
|
||||
|
||||
|
||||
@mock.patch.object(notify.time, 'sleep')
|
||||
@mock.patch.object(mdnsbase.pool_mngr_api.PoolManagerAPI, 'get_instance')
|
||||
class MdnsNotifyTest(base.BaseTestCase):
|
||||
|
||||
@mock.patch.object(mdnsbase.central_api.CentralAPI, 'get_instance')
|
||||
def setUp(self, *mocks):
|
||||
super(MdnsNotifyTest, self).setUp()
|
||||
notify.CONF = RoObject({
|
||||
'service:mdns': RoObject(all_tcp=False)
|
||||
})
|
||||
self.tg = Mock(name='tg')
|
||||
self.notify = notify.NotifyEndpoint(self.tg)
|
||||
|
||||
def test_notify_zone_changed(self, *mocks):
|
||||
self.notify._make_and_send_dns_message = Mock()
|
||||
|
||||
self.notify.notify_zone_changed(*range(8))
|
||||
|
||||
self.notify._make_and_send_dns_message.assert_called_with(
|
||||
1, 2, 3, 4, 5, 6, notify=True)
|
||||
|
||||
def test_poll_for_serial_number(self, *mocks):
|
||||
self.notify.get_serial_number = Mock(
|
||||
return_value=('status', 99, 9)
|
||||
)
|
||||
ns = RoObject(host='host', port=1234)
|
||||
|
||||
self.notify.poll_for_serial_number('c', 'z', ns, 1, 2, 3, 4)
|
||||
|
||||
self.notify.get_serial_number.assert_called_with(
|
||||
'c', 'z', 'host', 1234, 1, 2, 3, 4)
|
||||
self.notify.pool_manager_api.update_status.assert_called_with(
|
||||
'c', 'z', ns, 'status', 99)
|
||||
|
||||
def test_get_serial_number_nxdomain(self, *mocks):
|
||||
response = RoObject(
|
||||
answer=[RoObject(
|
||||
rdclass=dns.rdataclass.IN,
|
||||
rdtype=dns.rdatatype.SOA
|
||||
)],
|
||||
rcode=Mock(return_value=dns.rcode.NXDOMAIN)
|
||||
)
|
||||
zone = RoObject(name='zn', serial=314)
|
||||
self.notify._make_and_send_dns_message = Mock(
|
||||
return_value=(response, 1)
|
||||
)
|
||||
|
||||
out = self.notify.get_serial_number('c', zone, 'h', 1234, 1, 2, 3, 4)
|
||||
|
||||
self.assertEqual(('NO_ZONE', None, 0), out)
|
||||
|
||||
def test_get_serial_number_ok(self, *mocks):
|
||||
zone = RoObject(name='zn', serial=314)
|
||||
ds = RoObject(items=[zone])
|
||||
response = RoObject(
|
||||
answer=[RoObject(
|
||||
name='zn',
|
||||
rdclass=dns.rdataclass.IN,
|
||||
rdtype=dns.rdatatype.SOA,
|
||||
to_rdataset=Mock(return_value=ds)
|
||||
)],
|
||||
rcode=Mock(return_value=dns.rcode.NOERROR)
|
||||
)
|
||||
self.notify._make_and_send_dns_message = Mock(
|
||||
return_value=(response, 1)
|
||||
)
|
||||
|
||||
out = self.notify.get_serial_number('c', zone, 'h', 1234, 1, 2, 3, 4)
|
||||
|
||||
self.assertEqual(('SUCCESS', 314, 3), out)
|
||||
|
||||
def test_get_serial_number_too_many_retries(self, *mocks):
|
||||
zone = RoObject(name='zn', serial=314)
|
||||
ds = RoObject(items=[RoObject(serial=310)])
|
||||
response = RoObject(
|
||||
answer=[RoObject(
|
||||
name='zn',
|
||||
rdclass=dns.rdataclass.IN,
|
||||
rdtype=dns.rdatatype.SOA,
|
||||
to_rdataset=Mock(return_value=ds)
|
||||
)],
|
||||
rcode=Mock(return_value=dns.rcode.NOERROR)
|
||||
)
|
||||
self.notify._make_and_send_dns_message = Mock(
|
||||
return_value=(response, 1)
|
||||
)
|
||||
|
||||
out = self.notify.get_serial_number('c', zone, 'h', 1234, 1, 2, 3, 4)
|
||||
|
||||
self.assertEqual(('ERROR', 310, 0), out)
|
||||
|
||||
def test_make_and_send_dns_message_timeout(self, *mocks):
|
||||
zone = RoObject(name='zn')
|
||||
self.notify._make_dns_message = Mock(return_value='')
|
||||
self.notify._send_dns_message = Mock(
|
||||
return_value=dns.exception.Timeout())
|
||||
|
||||
out = self.notify._make_and_send_dns_message(zone, 'host',
|
||||
123, 1, 2, 3)
|
||||
|
||||
self.assertEqual((None, 3), out)
|
||||
|
||||
def test_make_and_send_dns_message_bad_response(self, *mocks):
|
||||
zone = RoObject(name='zn')
|
||||
self.notify._make_dns_message = Mock(return_value='')
|
||||
self.notify._send_dns_message = Mock(
|
||||
return_value=notify.dns_query.BadResponse())
|
||||
|
||||
out = self.notify._make_and_send_dns_message(zone, 'host',
|
||||
123, 1, 2, 3)
|
||||
|
||||
self.assertEqual((None, 1), out)
|
||||
|
||||
def test_make_and_send_dns_message_nxdomain(self, *mocks):
|
||||
zone = RoObject(name='zn')
|
||||
self.notify._make_dns_message = Mock(return_value='')
|
||||
response = RoObject(rcode=Mock(return_value=dns.rcode.NXDOMAIN))
|
||||
self.notify._send_dns_message = Mock(return_value=response)
|
||||
|
||||
out = self.notify._make_and_send_dns_message(zone, 'host',
|
||||
123, 1, 2, 3)
|
||||
|
||||
self.assertEqual((response, 1), out)
|
||||
|
||||
def test_make_and_send_dns_message_missing_AA_flags(self, *mocks):
|
||||
zone = RoObject(name='zn')
|
||||
self.notify._make_dns_message = Mock(return_value='')
|
||||
response = RoObject(
|
||||
rcode=Mock(return_value=dns.rcode.NOERROR),
|
||||
# rcode is NOERROR but (flags & dns.flags.AA) gives 0
|
||||
flags=0,
|
||||
)
|
||||
self.notify._send_dns_message = Mock(return_value=response)
|
||||
|
||||
out = self.notify._make_and_send_dns_message(zone, 'host',
|
||||
123, 1, 2, 3)
|
||||
|
||||
self.assertEqual((None, 1), out)
|
||||
|
||||
def test_make_and_send_dns_message_error_flags(self, *mocks):
|
||||
zone = RoObject(name='zn')
|
||||
self.notify._make_dns_message = Mock(return_value='')
|
||||
response = RoObject(
|
||||
rcode=Mock(return_value=dns.rcode.NOERROR),
|
||||
# rcode is NOERROR but flags are not NOERROR
|
||||
flags=123,
|
||||
ednsflags=321
|
||||
)
|
||||
self.notify._send_dns_message = Mock(return_value=response)
|
||||
|
||||
out = self.notify._make_and_send_dns_message(zone, 'host',
|
||||
123, 1, 2, 3)
|
||||
|
||||
self.assertEqual((None, 1), out)
|
||||
|
||||
def test_make_dns_message(self, *mocks):
|
||||
msg = self.notify._make_dns_message('zone_name')
|
||||
txt = msg.to_text().split('\n')[1:]
|
||||
self.assertEqual([
|
||||
'opcode QUERY',
|
||||
'rcode NOERROR',
|
||||
'flags RD',
|
||||
';QUESTION',
|
||||
'zone_name. IN SOA',
|
||||
';ANSWER',
|
||||
';AUTHORITY',
|
||||
';ADDITIONAL'
|
||||
], txt)
|
||||
|
||||
def test_make_dns_message_notify(self, *mocks):
|
||||
msg = self.notify._make_dns_message('zone_name', notify=True)
|
||||
txt = msg.to_text().split('\n')[1:]
|
||||
self.assertEqual([
|
||||
'opcode NOTIFY',
|
||||
'rcode NOERROR',
|
||||
'flags AA',
|
||||
';QUESTION',
|
||||
'zone_name. IN SOA',
|
||||
';ANSWER',
|
||||
';AUTHORITY',
|
||||
';ADDITIONAL',
|
||||
], txt)
|
||||
|
||||
@mock.patch.object(notify.dns_query, 'tcp')
|
||||
@mock.patch.object(notify.dns_query, 'udp')
|
||||
def test_send_dns_message(self, *mocks):
|
||||
out = self.notify._send_dns_message('msg', 'host', 123, 1)
|
||||
|
||||
assert not notify.dns_query.tcp.called
|
||||
notify.dns_query.udp.assert_called_with('msg', 'host', port=123,
|
||||
timeout=1)
|
||||
assert isinstance(out, Mock)
|
||||
75
designate/tests/unit/test_mdns/test_service.py
Normal file
75
designate/tests/unit/test_mdns/test_service.py
Normal file
@@ -0,0 +1,75 @@
|
||||
# Copyright 2015 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Author: Federico Ceratto <federico.ceratto@hpe.com>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
"""Unit-test MiniDNS service
|
||||
"""
|
||||
import unittest
|
||||
|
||||
from oslotest import base
|
||||
import mock
|
||||
|
||||
from designate.tests.unit import RoObject
|
||||
import designate.mdns.service as mdns
|
||||
|
||||
# TODO(Federico): fix skipped tests
|
||||
|
||||
|
||||
@mock.patch.object(mdns.utils, 'cache_result')
|
||||
@mock.patch.object(mdns.notify, 'NotifyEndpoint')
|
||||
@mock.patch.object(mdns.xfr, 'XfrEndpoint')
|
||||
class MdnsServiceTest(base.BaseTestCase):
|
||||
|
||||
@mock.patch.object(mdns.storage, 'get_storage', name='get_storage')
|
||||
@mock.patch.object(mdns.Service, '_rpc_endpoints')
|
||||
def setUp(self, *mocks):
|
||||
super(MdnsServiceTest, self).setUp()
|
||||
mdns.CONF = RoObject({
|
||||
'service:mdns': RoObject(storage_driver=None)
|
||||
})
|
||||
# _rpc_endpoints is a property
|
||||
mock_rpc_endpoints = mocks[0]
|
||||
mock_rpc_endpoints.__get__ = mock.Mock(
|
||||
return_value=[mock.MagicMock(), mock.MagicMock()]
|
||||
)
|
||||
|
||||
self.mdns = mdns.Service()
|
||||
self.mdns.tg = mock.Mock(name='tg')
|
||||
|
||||
def test_service_name(self, mc, mn, mx):
|
||||
self.assertEqual('mdns', self.mdns.service_name)
|
||||
|
||||
@unittest.skip("Fails when run together with designate/tests/test_mdns/")
|
||||
def test_rpc_endpoints(self, _, mock_notify, mock_xfr):
|
||||
out = self.mdns._rpc_endpoints
|
||||
self.assertEqual(2, len(out))
|
||||
assert isinstance(out[0], mock.MagicMock), out
|
||||
assert isinstance(out[1], mock.MagicMock), out
|
||||
|
||||
@unittest.skip("Fails when run together with designate/tests/test_mdns/")
|
||||
@mock.patch.object(mdns.handler, 'RequestHandler', name='reqh')
|
||||
@mock.patch.object(mdns.dnsutils, 'TsigInfoMiddleware', name='tsig')
|
||||
@mock.patch.object(mdns.dnsutils, 'SerializationMiddleware')
|
||||
def test_dns_application(self, *mocks):
|
||||
mock_serialization, mock_tsiginf, mock_req_handler = mocks[:3]
|
||||
mock_req_handler.return_value = mock.Mock(name='app')
|
||||
|
||||
app = self.mdns._dns_application
|
||||
|
||||
assert isinstance(app, mock.MagicMock), repr(app)
|
||||
assert mock_req_handler.called
|
||||
assert mock_tsiginf.called
|
||||
assert mock_serialization.called
|
||||
83
designate/tests/unit/test_zone_manager/test_service.py
Normal file
83
designate/tests/unit/test_zone_manager/test_service.py
Normal file
@@ -0,0 +1,83 @@
|
||||
|
||||
# Copyright 2015 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Author: Federico Ceratto <federico.ceratto@hpe.com>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Unit-test Zone Manager service
|
||||
"""
|
||||
|
||||
import mock
|
||||
from oslotest import base as test
|
||||
|
||||
from designate.tests.unit import RoObject
|
||||
import designate.zone_manager.service as zms
|
||||
|
||||
|
||||
@mock.patch.object(zms.rpcapi.CentralAPI, 'get_instance')
|
||||
class ZoneManagerTest(test.BaseTestCase):
|
||||
|
||||
@mock.patch.object(zms.storage, 'get_storage')
|
||||
def setUp(self, mock_get_storage):
|
||||
zms.CONF = RoObject({
|
||||
'service:zone_manager': RoObject({
|
||||
'enabled_tasks': None, # enable all tasks
|
||||
'export_synchronous': True
|
||||
}),
|
||||
'zone_manager_task:zone_purge': '',
|
||||
})
|
||||
super(ZoneManagerTest, self).setUp()
|
||||
self.tm = zms.Service()
|
||||
assert mock_get_storage.called
|
||||
self.tm._rpc_server = mock.Mock()
|
||||
self.tm.quota = mock.Mock()
|
||||
self.tm.quota.limit_check = mock.Mock()
|
||||
|
||||
def test_service_name(self, _):
|
||||
self.assertEqual('zone_manager', self.tm.service_name)
|
||||
|
||||
def test_central_api(self, _):
|
||||
capi = self.tm.central_api
|
||||
assert isinstance(capi, mock.MagicMock)
|
||||
|
||||
@mock.patch.object(zms.tasks, 'PeriodicTask')
|
||||
@mock.patch.object(zms.coordination, 'Partitioner')
|
||||
def test_stark(self, _, mock_partitioner, mock_PeriodicTask):
|
||||
self.tm.start()
|
||||
|
||||
def test_start_zone_export(self, _):
|
||||
zone = RoObject(id=3)
|
||||
context = mock.Mock()
|
||||
export = {}
|
||||
self.tm.storage.count_recordsets.return_value = 1
|
||||
assert self.tm.storage.count_recordsets() == 1
|
||||
self.tm._determine_export_method = mock.Mock()
|
||||
self.tm.start_zone_export(context, zone, export)
|
||||
assert self.tm._determine_export_method.called
|
||||
assert self.tm.central_api.update_zone_export.called
|
||||
call_args = self.tm._determine_export_method.call_args_list[0][0]
|
||||
self.assertEqual((context, export, 1), call_args)
|
||||
|
||||
def test_determine_export_method(self, _):
|
||||
context = mock.Mock()
|
||||
export = dict(location=None, id=4)
|
||||
size = mock.Mock()
|
||||
out = self.tm._determine_export_method(context, export, size)
|
||||
self.assertDictEqual(
|
||||
{
|
||||
'status': 'COMPLETE', 'id': 4,
|
||||
'location': 'designate://v2/zones/tasks/exports/4/export'
|
||||
},
|
||||
out
|
||||
)
|
||||
Reference in New Issue
Block a user