Merge "MDNS part of Secondary zones"

This commit is contained in:
Jenkins 2015-03-17 13:33:38 +00:00 committed by Gerrit Code Review
commit 0c982b7d8d
15 changed files with 531 additions and 40 deletions

View File

@ -0,0 +1,28 @@
#!/usr/bin/env python
# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
# Author: Kiall Mac Innes <kiall@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import binascii
import sys
import dns
import dns.message
import dns.rdatatype
unhexed = binascii.unhexlify(sys.argv[1])
response = dns.message.from_wire(unhexed)
print (response.to_text())

View File

@ -43,6 +43,7 @@ from designate import quota
from designate import service
from designate import utils
from designate import storage
from designate.mdns import rpcapi as mdns_rpcapi
from designate.pool_manager import rpcapi as pool_manager_rpcapi
@ -281,6 +282,10 @@ class Service(service.RPCService, service.Service):
def stop(self):
super(Service, self).stop()
@property
def mdns_api(self):
return mdns_rpcapi.MdnsAPI.get_instance()
@property
def pool_manager_api(self):
return pool_manager_rpcapi.PoolManagerAPI.get_instance()
@ -845,6 +850,8 @@ class Service(service.RPCService, service.Service):
self.pool_manager_api.create_domain(context, domain)
self.mdns_api.perform_zone_xfr(context, domain)
# If domain is a superdomain, update subdomains
# with new parent IDs
for subdomain in subdomains:
@ -962,6 +969,10 @@ class Service(service.RPCService, service.Service):
domain = self._update_domain_in_storage(
context, domain, increment_serial=increment_serial)
# Fire off a XFR
if 'masters' in changes:
self.mdns_api.perform_zone_xfr(context, domain)
self.pool_manager_api.update_domain(context, domain)
return domain

View File

@ -13,13 +13,17 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import random
import socket
import base64
import dns
import dns.exception
import dns.zone
import eventlet
from dns import rdatatype
from oslo_log import log as logging
from oslo.config import cfg
from designate import context
from designate import exceptions
@ -30,6 +34,11 @@ from designate.i18n import _LI
LOG = logging.getLogger(__name__)
util_opts = [
cfg.IntOpt('xfr_timeout', help="Timeout in seconds for XFR's.", default=10)
]
class DNSMiddleware(object):
"""Base DNS Middleware class with some utility methods"""
def __init__(self, application):
@ -181,7 +190,10 @@ def from_dnspython_zone(dnspython_zone):
values = {
'name': dnspython_zone.origin.to_text(),
'email': email,
'ttl': soa.ttl
'ttl': soa.ttl,
'serial': soa[0].serial,
'retry': soa[0].retry,
'expire': soa[0].expire
}
zone = objects.Domain(**values)
@ -249,25 +261,66 @@ def bind_udp(host, port):
return sock_udp
def do_axfr(zone_name, masters, source=None):
def expand_servers(servers):
"""
Expands list of server:port into a list of dicts.
Example: [{"host": ..., "port": 53}]
"""
data = []
for srv in servers:
if isinstance(srv, basestring):
parts = srv.split(":")
host = parts[0]
port = int(parts[1]) if len(parts) == 2 else 53
srv = {"ip": host, "port": port}
data.append(srv)
return data
def do_axfr(zone_name, servers, timeout=None, source=None):
"""
Performs an AXFR for a given zone name
"""
# TODO(Tim): Try the first master, try others if they exist
master = masters[0]
random.shuffle(servers)
timeout = timeout or 10
LOG.info(_LI("Doing AXFR for %(name)s from %(host)s") %
{'name': zone_name, 'host': master})
xfr = dns.query.xfr(master['ip'], zone_name, relativize=False,
port=master['port'], source=source)
try:
# TODO(Tim): Add a timeout to this function
raw_zone = dns.zone.from_xfr(xfr, relativize=False)
except Exception:
LOG.exception(_LE("There was a problem with the AXFR"))
raise
xfr = None
for srv in servers:
timeout = eventlet.Timeout(timeout)
log_info = {'name': zone_name, 'host': srv}
try:
LOG.info(_LI("Doing AXFR for %(name)s from %(host)s") % log_info)
xfr = dns.query.xfr(srv['ip'], zone_name, relativize=False,
timeout=1, port=srv['port'], source=source)
raw_zone = dns.zone.from_xfr(xfr, relativize=False)
break
except eventlet.Timeout as t:
if t == timeout:
msg = _LE("AXFR timed out for %(name)s from %(host)s")
LOG.error(msg % log_info)
continue
except dns.exception.FormError:
msg = _LE("Domain %(name)s is not present on %(host)s."
"Trying next server.")
LOG.error(msg % log_info)
except socket.error:
msg = _LE("Connection error when doing AXFR for %(name)s from "
"%(host)s")
LOG.error(msg % log_info)
except Exception:
msg = _LE("Problem doing AXFR %(name)s from %(host)s. "
"Trying next server.")
LOG.exception(msg % log_info)
finally:
timeout.cancel()
continue
else:
msg = _LE("XFR failed for %(name)s. No servers in %(servers)s was "
"reached.")
raise exceptions.XFRFailure(
msg % {"name": zone_name, "servers": servers})
LOG.debug("AXFR Successful for %s" % raw_zone.origin.to_text())

View File

@ -52,6 +52,10 @@ class NotImplemented(Base, NotImplementedError):
pass
class XFRFailure(Base):
pass
class ConfigurationError(Base):
error_type = 'configuration_error'

View File

@ -15,6 +15,9 @@
# under the License.
from oslo.config import cfg
from designate import dnsutils
cfg.CONF.register_group(cfg.OptGroup(
name='service:mdns', title="Configuration for mDNS Service"
))
@ -40,3 +43,4 @@ OPTS = [
]
cfg.CONF.register_opts(OPTS, group='service:mdns')
cfg.CONF.register_opts(dnsutils.util_opts, group='service:mdns')

View File

@ -24,6 +24,9 @@ from oslo.config import cfg
from oslo_log import log as logging
from designate import exceptions
from designate.mdns import xfr
from designate.central import rpcapi as central_api
from designate.i18n import _LI
from designate.i18n import _LW
@ -34,12 +37,16 @@ CONF.import_opt('default_pool_id', 'designate.central',
group='service:central')
class RequestHandler(object):
"""MiniDNS Request Handler"""
# TODO(kiall): This class is getting a little unwieldy, we should rework
# with a little more structure.
def __init__(self, storage):
class RequestHandler(xfr.XFRMixin):
def __init__(self, storage, tg):
# Get a storage connection
self.storage = storage
self.tg = tg
@property
def central_api(self):
return central_api.CentralAPI.get_instance()
def __call__(self, request):
"""
@ -62,9 +69,74 @@ class RequestHandler(object):
response = self._handle_axfr(request)
else:
response = self._handle_record_query(request)
elif request.opcode() == dns.opcode.NOTIFY:
response = self._handle_notify(request)
else:
# Unhandled OpCode's include STATUS, IQUERY, NOTIFY, UPDATE
response = self._handle_query_error(request, dns.rcode.REFUSED)
return response
def _handle_notify(self, request):
"""
Constructs the response to a NOTIFY and acts accordingly on it.
* Checks if the master sending the NOTIFY is in the Zone's masters,
if not it is ignored.
* Checks if SOA query response serial != local serial.
"""
context = request.environ['context']
response = dns.message.make_response(request)
if len(request.question) != 1:
response.set_rcode(dns.rcode.FORMERR)
return response
else:
question = request.question[0]
criterion = {
'name': question.name.to_text(),
'type': 'SECONDARY',
'deleted': False
}
try:
domain = self.storage.find_domain(context, criterion)
except exceptions.DomainNotFound:
response.set_rcode(dns.rcode.NOTAUTH)
return response
notify_addr = request.environ['addr'][0]
# We check if the src_master which is the assumed master for the zone
# that is sending this NOTIFY OP is actually the master. If it's not
# We'll reply but don't do anything with the NOTIFY.
master_addr = domain.get_master_by_ip(notify_addr)
if not master_addr:
msg = _LW("NOTIFY for %(name)s from non-master server "
"%(addr)s, ignoring.")
LOG.warn(msg % {"name": domain.name, "addr": notify_addr})
response.set_rcode(dns.rcode.REFUSED)
return response
resolver = dns.resolver.Resolver()
# According to RFC we should query the server that sent the NOTIFY
resolver.nameservers = [notify_addr]
soa_answer = resolver.query(domain.name, 'SOA')
soa_serial = soa_answer[0].serial
if soa_serial == domain.serial:
msg = _LI("Serial %(serial)s is the same for master and us for "
"%(domain_id)s")
LOG.info(msg % {"serial": soa_serial, "domain_id": domain.id})
else:
msg = _LI("Scheduling AXFR for %(domain_id)s from %(master_addr)s")
info = {"domain_id": domain.id, "master_addr": master_addr}
LOG.info(msg % info)
self.tg.add_thread(self.domain_sync, context, domain,
[master_addr])
response.flags |= dns.flags.AA
return response

View File

@ -28,6 +28,8 @@ from oslo.config import cfg
from oslo_log import log as logging
from designate.pool_manager import rpcapi as pool_mngr_api
from designate.central import rpcapi as central_api
from designate.mdns import xfr
from designate.i18n import _LI
from designate.i18n import _LW
@ -35,14 +37,19 @@ LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class NotifyEndpoint(object):
class NotifyEndpoint(xfr.XFRMixin):
RPC_NOTIFY_API_VERSION = '1.1'
target = messaging.Target(
namespace='notify', version=RPC_NOTIFY_API_VERSION)
def __init__(self, *args, **kwargs):
def __init__(self, tg, *args, **kwargs):
LOG.info(_LI("started mdns notify endpoint"))
self.tg = tg
@property
def central_api(self):
return central_api.CentralAPI.get_instance()
@property
def pool_manager_api(self):

View File

@ -26,6 +26,7 @@ MDNS_API = None
class MdnsAPI(object):
"""
Client side of the mdns RPC API.
@ -33,8 +34,12 @@ class MdnsAPI(object):
1.0 - Added notify_zone_changed and poll_for_serial_number.
1.1 - Added get_serial_number.
XFR API version history:
1.0 - Added perform_zone_xfr.
"""
RPC_NOTIFY_API_VERSION = '1.1'
RPC_XFR_API_VERSION = '1.0'
def __init__(self, topic=None):
topic = topic if topic else cfg.CONF.mdns_topic
@ -44,6 +49,11 @@ class MdnsAPI(object):
version=self.RPC_NOTIFY_API_VERSION)
self.notify_client = rpc.get_client(notify_target, version_cap='1.1')
xfr_target = messaging.Target(topic=topic,
namespace='xfr',
version=self.RPC_XFR_API_VERSION)
self.xfr_client = rpc.get_client(xfr_target, version_cap='1.0')
@classmethod
def get_instance(cls):
"""
@ -96,3 +106,8 @@ class MdnsAPI(object):
context, 'get_serial_number', domain=domain,
server=server, timeout=timeout, retry_interval=retry_interval,
max_retries=max_retries, delay=delay)
def perform_zone_xfr(self, context, domain):
LOG.info(_LI("perform_zone_xfr: Calling mdns for zone %(zone)s") %
{"zone": domain.name})
return self.xfr_client.cast(context, 'perform_zone_xfr', domain=domain)

View File

@ -22,6 +22,7 @@ from designate import storage
from designate import dnsutils
from designate.mdns import handler
from designate.mdns import notify
from designate.mdns import xfr
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
@ -41,14 +42,14 @@ class Service(service.DNSService, service.RPCService, service.Service):
@property
@utils.cache_result
def _rpc_endpoints(self):
return [notify.NotifyEndpoint()]
return [notify.NotifyEndpoint(self.tg), xfr.XfrEndpoint(self.tg)]
@property
@utils.cache_result
def _dns_application(self):
# Create an instance of the RequestHandler class and wrap with
# necessary middleware.
application = handler.RequestHandler(self.storage)
application = handler.RequestHandler(self.storage, self.tg)
application = dnsutils.TsigInfoMiddleware(application, self.storage)
application = dnsutils.SerializationMiddleware(
application, dnsutils.TsigKeyring(self.storage))

69
designate/mdns/xfr.py Normal file
View File

@ -0,0 +1,69 @@
# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
# Author: Endre Karlson <endre.karlson@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo import messaging
from oslo.config import cfg
from oslo_utils import timeutils
from oslo_log import log as logging
from designate.i18n import _LI
from designate.central import rpcapi as central_api
from designate import dnsutils
from designate import exceptions
LOG = logging.getLogger(__name__)
class XFRMixin(object):
"""
Utility mixin that holds common methods for XFR functionality.
"""
RPC_XFR_API_VERSION = '1.0'
target = messaging.Target(
namespace='xfr', version=RPC_XFR_API_VERSION)
def domain_sync(self, context, domain, servers=None):
servers = servers or domain.masters
servers = dnsutils.expand_servers(servers)
timeout = cfg.CONF["service:mdns"].xfr_timeout
try:
dnspython_zone = dnsutils.do_axfr(domain.name, servers,
timeout=timeout)
except exceptions.XFRFailure as e:
LOG.warning(e.message)
return
zone = dnsutils.from_dnspython_zone(dnspython_zone)
domain.update(zone)
domain.transferred_at = timeutils.utcnow()
self.central_api.update_domain(context, domain, increment_serial=False)
class XfrEndpoint(XFRMixin):
def __init__(self, tg, *args, **kwargs):
LOG.info(_LI("started mdns xfr endpoint"))
self.tg = tg
@property
def central_api(self):
return central_api.CentralAPI.get_instance()
def perform_zone_xfr(self, context, domain):
self.tg.add_thread(self.domain_sync, context, domain)

View File

@ -62,6 +62,15 @@ class Domain(base.DictObjectMixin, base.SoftDeleteObjectMixin,
attributes.append(obj)
self.attributes = attributes
def get_master_by_ip(self, host):
"""
Utility to get the master by it's ip for this domain.
"""
for srv in self.masters:
if host == srv.split(":")[0]:
return srv
return False
class DomainList(base.ListObjectMixin, base.DesignateObject,
base.PagedListObjectMixin):

View File

@ -330,6 +330,31 @@ class SQLAlchemyStorage(sqlalchemy_base.SQLAlchemy, storage_base.Storage):
attr.domain_id = domain.id
self.create_domain_attribute(context, domain.id, attr)
if domain.obj_attr_is_set('recordsets'):
existing = self.find_recordsets(context, {'domain_id': domain.id})
data = {}
for rrset in existing:
data[rrset.name, rrset.type] = rrset
keep = set()
for rrset in domain.recordsets:
current = data.get((rrset.name, rrset.type))
if current:
current.update(rrset)
current.records = rrset.records
self.update_recordset(context, current)
keep.add(current.id)
else:
self.create_recordset(context, domain.id, rrset)
keep.add(rrset.id)
if domain.type == 'SECONDARY':
# Purge anything that shouldn't be there :P
for i in set([i.id for i in data.values()]) - keep:
self.delete_recordset(context, i)
if tenant_id_changed:
recordsets_query = tables.recordsets.update().\
where(tables.recordsets.c.domain_id == domain.id)\

View File

@ -16,20 +16,42 @@
import binascii
import dns
import dns.rdataclass
import dns.rdatatype
import dns.resolver
import dns.rrset
import mock
from oslo.config import cfg
from designate import context
from designate import objects
from designate.tests.test_mdns import MdnsTestCase
from designate.mdns import handler
CONF = cfg.CONF
default_pool_id = CONF['service:central'].default_pool_id
ANSWER = [
"id 1234",
"opcode QUERY",
"rcode NOERROR",
"flags QR AA RD",
";QUESTION",
"example.com. IN SOA",
";ANSWER",
"example.com. 3600 IN SOA ns1.example.com. root.master.com. "
"%(serial)s 3600 1800 604800 3600",
";AUTHORITY",
"example.com. 3600 IN NS ns1.master.com.",
";ADDITIONAL"
]
class MdnsRequestHandlerTest(MdnsTestCase):
def setUp(self):
super(MdnsRequestHandlerTest, self).setUp()
self.handler = handler.RequestHandler(self.storage)
self.mock_tg = mock.Mock()
self.handler = handler.RequestHandler(self.storage, self.mock_tg)
self.addr = ["0.0.0.0", 5556]
self.context = context.DesignateContext.get_admin_context(
@ -98,27 +120,194 @@ class MdnsRequestHandlerTest(MdnsTestCase):
self.assertEqual(expected_response, binascii.b2a_hex(response))
def test_dispatch_opcode_notify(self):
# DNS packet with NOTIFY opcode
payload = "271321000001000000000000076578616d706c6503636f6d0000010001"
def _get_secondary_domain(self, values=None, attributes=None):
attributes = attributes or []
fixture = self.get_domain_fixture("SECONDARY", values=values)
fixture['email'] = cfg.CONF['service:central'].managed_resource_email
# expected response is an error code REFUSED. The other fields are
# id 10003
domain = objects.Domain(**fixture)
domain.attributes = objects.DomainAttributeList()
return domain
def _get_soa_answer(self, serial):
text = "\n".join(ANSWER) % {"serial": str(serial)}
msg = dns.message.from_text(text)
name = dns.name.from_text('example.com.')
answer = dns.resolver.Answer(name, dns.rdatatype.SOA,
dns.rdataclass.IN, msg)
return answer
@mock.patch.object(dns.resolver.Resolver, 'query')
def test_dispatch_opcode_notify_different_serial(self, func):
# DNS packet with NOTIFY opcode
payload = "c38021000001000000000000076578616d706c6503636f6d0000060001"
master = "10.0.0.1"
domain = self._get_secondary_domain({"serial": 123})
domain.attributes.append(objects.DomainAttribute(
**{"key": "master", "value": master}))
# expected response is an error code NOERROR. The other fields are
# id 50048
# opcode NOTIFY
# rcode REFUSED
# flags QR RD
# rcode NOERROR
# flags QR AA RD
# ;QUESTION
# example.com. IN A
# ;ANSWER
# ;AUTHORITY
# ;ADDITIONAL
expected_response = ("2713a1050001000000000000076578616d706c6503636f6d"
"0000010001")
expected_response = ("c380a5000001000000000000076578616d706c6503636f6d"
"0000060001")
# The SOA serial should be different from the one in thedomain and
# will trigger a AXFR
func.return_value = self._get_soa_answer(123123)
request = dns.message.from_wire(binascii.a2b_hex(payload))
request.environ = {'addr': self.addr, 'context': self.context}
request.environ = {
'addr': (master, 53),
'context': self.context
}
with mock.patch.object(self.handler.storage, 'find_domain',
return_value=domain):
response = self.handler(request).to_wire()
self.mock_tg.add_thread.assert_called_with(
self.handler.domain_sync, self.context, domain, [master])
self.assertEqual(expected_response, binascii.b2a_hex(response))
@mock.patch.object(dns.resolver.Resolver, 'query')
def test_dispatch_opcode_notify_same_serial(self, func):
# DNS packet with NOTIFY opcode
payload = "c38021000001000000000000076578616d706c6503636f6d0000060001"
master = "10.0.0.1"
domain = self._get_secondary_domain({"serial": 123})
domain.attributes.append(objects.DomainAttribute(
**{"key": "master", "value": master}))
# expected response is an error code NOERROR. The other fields are
# id 50048
# opcode NOTIFY
# rcode NOERROR
# flags QR AA RD
# ;QUESTION
# example.com. IN SOA
# ;ANSWER
# ;AUTHORITY
# ;ADDITIONAL
expected_response = ("c380a5000001000000000000076578616d706c6503636f6d"
"0000060001")
# The SOA serial should be different from the one in thedomain and
# will trigger a AXFR
func.return_value = self._get_soa_answer(domain.serial)
request = dns.message.from_wire(binascii.a2b_hex(payload))
request.environ = {
'addr': (master, 53),
'context': self.context
}
with mock.patch.object(self.handler.storage, 'find_domain',
return_value=domain):
response = self.handler(request).to_wire()
assert not self.mock_tg.add_thread.called
self.assertEqual(expected_response, binascii.b2a_hex(response))
def test_dispatch_opcode_notify_invalid_master(self):
# DNS packet with NOTIFY opcode
payload = "c38021000001000000000000076578616d706c6503636f6d0000060001"
# Have a domain with different master then the one where the notify
# comes from causing it to be "ignored" as in not transferred and
# logged
master = "10.0.0.1"
domain = self._get_secondary_domain({"serial": 123})
domain.attributes.append(objects.DomainAttribute(
**{"key": "master", "value": master}))
# expected response is an error code REFUSED. The other fields are
# id 50048
# opcode NOTIFY
# rcode REFUSED
# flags QR AA RD
# ;QUESTION
# example.com. IN SOA
# ;ANSWER
# ;AUTHORITY
# ;ADDITIONAL
expected_response = ("c380a1050001000000000000076578616d706c6503636f6d"
"0000060001")
request = dns.message.from_wire(binascii.a2b_hex(payload))
request.environ = {
'addr': ("10.0.0.2", 53),
'context': self.context
}
with mock.patch.object(self.handler.storage, 'find_domain',
return_value=domain):
response = self.handler(request).to_wire()
assert not self.mock_tg.add_thread.called
self.assertEqual(expected_response, binascii.b2a_hex(response))
def test_dispatch_opcode_notify_no_question_formerr(self):
# DNS packet with NOTIFY opcode and no question
payload = "f16320000000000000000000"
# expected response is an error code FORMERR. The other fields are
# id 61795
# opcode NOTIFY
# rcode FORMERR
# flags QR RD
# ;QUESTION
# ;ANSWER
# ;AUTHORITY
# ;ADDITIONAL
expected_response = ("f163a0010000000000000000")
request = dns.message.from_wire(binascii.a2b_hex(payload))
request.environ = {
'addr': ("10.0.0.2", 53),
'context': self.context
}
response = self.handler(request).to_wire()
assert not self.mock_tg.add_thread.called
self.assertEqual(expected_response, binascii.b2a_hex(response))
def test_dispatch_opcode_notify_invalid_domain(self):
# DNS packet with NOTIFY opcode
payload = "c38021000001000000000000076578616d706c6503636f6d0000060001"
# expected response is an error code NOTAUTH. The other fields are
# id 50048
# opcode NOTIFY
# rcode NOTAUTH
# flags QR RD
# ;QUESTION
# example.com. IN SOA
# ;ANSWER
# ;AUTHORITY
# ;ADDITIONAL
expected_response = ("c380a1090001000000000000076578616d706c6503636f6"
"d0000060001")
request = dns.message.from_wire(binascii.a2b_hex(payload))
request.environ = {
'addr': ("10.0.0.2", 53),
'context': self.context
}
response = self.handler(request).to_wire()
assert not self.mock_tg.add_thread.called
self.assertEqual(expected_response, binascii.b2a_hex(response))
def test_dispatch_opcode_update(self):

View File

@ -18,6 +18,7 @@ import dns
import dns.message
import dns.query
import dns.exception
import mock
from mock import patch
from designate.tests.test_mdns import MdnsTestCase
@ -35,7 +36,6 @@ class MdnsNotifyTest(MdnsTestCase):
def setUp(self):
super(MdnsNotifyTest, self).setUp()
self.notify = notify.NotifyEndpoint()
server_values = {
'id': 'f278782a-07dc-4502-9177-b5d85c5f7c7e',
'host': '127.0.0.1',
@ -43,6 +43,8 @@ class MdnsNotifyTest(MdnsTestCase):
'backend': 'fake'
}
self.server = objects.PoolServer.from_dict(server_values)
self.mock_tg = mock.Mock()
self.notify = notify.NotifyEndpoint(self.mock_tg)
def test_send_notify_message(self):
# id 10001

View File

@ -9,6 +9,8 @@
"admin_or_owner_or_target":"rule:owner_or_target or rule:admin",
"admin_or_target":"rule:admin or rule:target",
"domain_primary_or_admin": "('PRIMARY':%(domain_type)s and rule:admin_or_owner) OR ('SECONDARY':%(domain_type)s AND is_admin:True)",
"default": "rule:admin_or_owner",
"all_tenants": "rule:admin",
@ -48,13 +50,13 @@
"count_domains": "rule:admin_or_owner",
"touch_domain": "rule:admin_or_owner",
"create_recordset": "('PRIMARY':%(domain_type)s and rule:admin_or_owner) OR ('SECONDARY':%(domain_type)s AND is_admin:True)",
"create_recordset": "rule:domain_primary_or_admin",
"get_recordsets": "rule:admin_or_owner",
"get_recordset": "rule:admin_or_owner",
"find_recordsets": "rule:admin_or_owner",
"find_recordset": "rule:admin_or_owner",
"update_recordset": "('PRIMARY':%(domain_type)s and rule:admin_or_owner) OR ('SECONDARY':%(domain_type)s AND is_admin:True)",
"delete_recordset": "('PRIMARY':%(domain_type)s and rule:admin_or_owner) OR ('SECONDARY':%(domain_type)s AND is_admin:True)",
"update_recordset": "rule:domain_primary_or_admin",
"delete_recordset": "rule:domain_primary_or_admin",
"count_recordset": "rule:admin_or_owner",
"create_record": "rule:admin_or_owner",