Add tasks and periodic emits
Change-Id: I645abc6634442a842f2af01667e97951c2cc4809
This commit is contained in:
parent
0efe815a63
commit
8481674b11
0
designate/tests/test_zone_manager/__init__.py
Normal file
0
designate/tests/test_zone_manager/__init__.py
Normal file
27
designate/tests/test_zone_manager/test_service.py
Normal file
27
designate/tests/test_zone_manager/test_service.py
Normal file
@ -0,0 +1,27 @@
|
||||
# Copyright 2015 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Author: Endre Karlson <endre.karlson@hp.com>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
from oslo_log import log as logging
|
||||
|
||||
from designate.tests import TestCase
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ZoneManagerServiceTest(TestCase):
|
||||
def test_stop(self):
|
||||
# Test stopping the service
|
||||
service = self.start_service("zone_manager")
|
||||
service.stop()
|
109
designate/tests/test_zone_manager/test_tasks.py
Normal file
109
designate/tests/test_zone_manager/test_tasks.py
Normal file
@ -0,0 +1,109 @@
|
||||
# Copyright 2015 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Author: Endre Karlson <endre.karlson@hp.com>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import datetime
|
||||
import time
|
||||
|
||||
import mock
|
||||
from oslo_messaging.notify import notifier
|
||||
|
||||
from designate import objects
|
||||
from designate import utils
|
||||
from designate.zone_manager import tasks
|
||||
from designate.tests import TestCase
|
||||
|
||||
|
||||
class TaskTest(TestCase):
|
||||
def setUp(self):
|
||||
super(TaskTest, self).setUp()
|
||||
utils.register_plugin_opts()
|
||||
|
||||
def _enable_tasks(self, tasks):
|
||||
self.config(
|
||||
enabled_tasks=",".join(tasks),
|
||||
group="service:zone_manager")
|
||||
|
||||
|
||||
class PeriodicExistsTest(TaskTest):
|
||||
def setUp(self):
|
||||
super(PeriodicExistsTest, self).setUp()
|
||||
self.config(
|
||||
interval=2,
|
||||
group="zone_manager_task:periodic_exists")
|
||||
self._enable_tasks("periodic_exists")
|
||||
|
||||
def _wait_for_cond(self, condition, interval=0.5, max_attempts=20):
|
||||
attempts = 0
|
||||
while attempts < max_attempts:
|
||||
result = condition()
|
||||
if result:
|
||||
return result
|
||||
time.sleep(interval)
|
||||
attempts += 1
|
||||
raise ValueError
|
||||
|
||||
@mock.patch.object(notifier.Notifier, 'info')
|
||||
def test_emit_exists(self, mock_notifier):
|
||||
domain = self.create_domain()
|
||||
# Clear the create domain notification
|
||||
mock_notifier.reset_mock()
|
||||
|
||||
# Install our own period results
|
||||
start, end = tasks.PeriodicExistsTask._get_period(2)
|
||||
with mock.patch.object(tasks.PeriodicExistsTask, "_get_period",
|
||||
return_value=(start, end,)):
|
||||
|
||||
svc = self.start_service("zone_manager")
|
||||
result = self._wait_for_cond(
|
||||
lambda: mock_notifier.called is True, .5, 3)
|
||||
self.assertEqual(True, result)
|
||||
svc.stop()
|
||||
|
||||
# Make some notification data in the same format that the task does
|
||||
data = dict(domain)
|
||||
del data["attributes"]
|
||||
# For some reason domain.created when doing dict(domain) is a datetime
|
||||
data["created_at"] = datetime.datetime.isoformat(domain.created_at)
|
||||
data["audit_period_start"] = str(start)
|
||||
data["audit_period_end"] = str(end)
|
||||
|
||||
# .info(ctxt, event, payload)
|
||||
mock_notifier.assert_called_with(mock.ANY, "dns.domain.exists", data)
|
||||
|
||||
@mock.patch.object(notifier.Notifier, 'info')
|
||||
def test_emit_exists_no_zones(self, mock_notifier):
|
||||
self.start_service("zone_manager")
|
||||
# Since the interval is 2 seconds we wait for the call to have been
|
||||
# executed for 3 seconds
|
||||
time.sleep(2)
|
||||
self.assertEqual(False, mock_notifier.called)
|
||||
|
||||
@mock.patch.object(notifier.Notifier, 'info')
|
||||
def test_emit_exists_multiple_zones(self, mock_notifier):
|
||||
zones = []
|
||||
for i in range(0, 10):
|
||||
z = self.central_service.create_domain(
|
||||
self.admin_context,
|
||||
objects.Domain(
|
||||
name="example%s.net." % i,
|
||||
email="foo@example.com"))
|
||||
zones.append(z)
|
||||
|
||||
# Clear any notifications from create etc.
|
||||
mock_notifier.reset_mock()
|
||||
|
||||
# Start ZM so that the periodic task fires
|
||||
self.start_service("zone_manager")
|
||||
self._wait_for_cond(lambda: mock_notifier.call_count is 10)
|
@ -106,6 +106,9 @@ def register_plugin_opts():
|
||||
# Avoid circular dependency imports
|
||||
from designate import plugin
|
||||
|
||||
plugin.Plugin.register_cfg_opts('designate.zone_manager_tasks')
|
||||
plugin.Plugin.register_extra_cfg_opts('designate.zone_manager_tasks')
|
||||
|
||||
# Register Backend Plugin Config Options
|
||||
plugin.Plugin.register_cfg_opts('designate.backend')
|
||||
plugin.Plugin.register_extra_cfg_opts('designate.backend')
|
||||
|
@ -26,6 +26,8 @@ OPTS = [
|
||||
help='Number of Zone Manager worker processes to spawn'),
|
||||
cfg.IntOpt('threads', default=1000,
|
||||
help='Number of Zone Manager greenthreads to spawn'),
|
||||
cfg.ListOpt('enabled_tasks', default=None,
|
||||
help='Enabled tasks to run')
|
||||
]
|
||||
|
||||
CONF.register_opts(OPTS, group='service:zone_manager')
|
||||
|
@ -19,12 +19,14 @@ from oslo_log import log as logging
|
||||
from designate.i18n import _LI
|
||||
from designate import coordination
|
||||
from designate import service
|
||||
from designate.central import rpcapi as central_api
|
||||
from designate.zone_manager import tasks
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
|
||||
NS = 'designate.periodic_tasks'
|
||||
|
||||
|
||||
class Service(coordination.CoordinationMixin, service.Service):
|
||||
def __init__(self, threads=None):
|
||||
@ -43,10 +45,18 @@ class Service(coordination.CoordinationMixin, service.Service):
|
||||
self._partitioner.start()
|
||||
self._partitioner.watch_partition_change(self._rebalance)
|
||||
|
||||
for task in tasks.PeriodicTask.get_extensions():
|
||||
LOG.debug("Registering task %s" % task)
|
||||
|
||||
# Instantiate the task
|
||||
task = task()
|
||||
|
||||
# Subscribe for partition size updates.
|
||||
self._partitioner.watch_partition_change(task.on_partition_change)
|
||||
|
||||
interval = CONF[task.get_canonical_name()].interval
|
||||
self.tg.add_timer(interval, task)
|
||||
|
||||
@property
|
||||
def service_name(self):
|
||||
return 'zone_manager'
|
||||
|
||||
@property
|
||||
def central_api(self):
|
||||
return central_api.CentralAPI.get_instance()
|
||||
|
111
designate/zone_manager/tasks.py
Normal file
111
designate/zone_manager/tasks.py
Normal file
@ -0,0 +1,111 @@
|
||||
# Copyright 2015 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Author: Endre Karlson <endre.karlson@hp.com>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import datetime
|
||||
|
||||
from designate import context
|
||||
from designate import plugin
|
||||
from designate import rpc
|
||||
from designate.central import rpcapi
|
||||
from designate.i18n import _LI
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import timeutils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PeriodicTask(plugin.ExtensionPlugin):
|
||||
__plugin_ns__ = 'designate.zone_manager_tasks'
|
||||
__plugin_type__ = 'zone_manager_task'
|
||||
__interval__ = None
|
||||
|
||||
def __init__(self):
|
||||
self.my_partitions = None
|
||||
self.options = cfg.CONF[self.get_canonical_name()]
|
||||
|
||||
@classmethod
|
||||
def get_base_opts(cls):
|
||||
return [cfg.IntOpt('interval', default=cls.__interval__)]
|
||||
|
||||
def on_partition_change(self, my_partitions, members, event):
|
||||
self.my_partitions = my_partitions
|
||||
|
||||
def _my_range(self):
|
||||
return self.my_partitions[0], self.my_partitions[-1]
|
||||
|
||||
def _filter_between(self, col):
|
||||
return {col: "BETWEEN %s,%s" % self._my_range()}
|
||||
|
||||
|
||||
class PeriodicExistsTask(PeriodicTask):
|
||||
__plugin_name__ = 'periodic_exists'
|
||||
__interval__ = 3600
|
||||
|
||||
def __init__(self):
|
||||
super(PeriodicExistsTask, self).__init__()
|
||||
self.notifier = rpc.get_notifier('zone_manager')
|
||||
|
||||
@classmethod
|
||||
def get_cfg_opts(cls):
|
||||
group = cfg.OptGroup(cls.get_canonical_name())
|
||||
options = cls.get_base_opts() + [
|
||||
cfg.IntOpt('per_page', default=100)
|
||||
]
|
||||
return [(group, options)]
|
||||
|
||||
@property
|
||||
def central_api(self):
|
||||
return rpcapi.CentralAPI.get_instance()
|
||||
|
||||
@staticmethod
|
||||
def _get_period(seconds):
|
||||
interval = datetime.timedelta(seconds=seconds)
|
||||
end = timeutils.utcnow()
|
||||
return end - interval, end
|
||||
|
||||
def __call__(self):
|
||||
pstart, pend = self._my_range()
|
||||
msg = _LI("Emitting zone exist events for %(start)s to %(end)s")
|
||||
LOG.info(msg % {"start": pstart, "end": pend})
|
||||
|
||||
ctxt = context.DesignateContext.get_admin_context()
|
||||
ctxt.all_tenants = True
|
||||
criterion = self._filter_between('shard')
|
||||
|
||||
start, end = self._get_period(self.options.interval)
|
||||
|
||||
data = {
|
||||
"audit_period_start": str(start),
|
||||
"audit_period_end": str(end)
|
||||
}
|
||||
|
||||
marker = None
|
||||
while True:
|
||||
zones = self.central_api.find_domains(ctxt, criterion,
|
||||
marker=marker,
|
||||
limit=self.options.per_page)
|
||||
if len(zones) == 0:
|
||||
LOG.info(_LI("Finished emitting events."))
|
||||
break
|
||||
else:
|
||||
marker = zones.objects[-1].id
|
||||
|
||||
for zone in zones:
|
||||
zone_data = dict(zone)
|
||||
zone_data.update(data)
|
||||
|
||||
self.notifier.info(ctxt, 'dns.domain.exists', zone_data)
|
@ -110,6 +110,9 @@ designate.manage =
|
||||
powerdns = designate.manage.powerdns:DatabaseCommands
|
||||
tlds = designate.manage.tlds:TLDCommands
|
||||
|
||||
designate.zone_manager_tasks =
|
||||
periodic_exists = designate.zone_manager.tasks:PeriodicExistsTask
|
||||
|
||||
[build_sphinx]
|
||||
all_files = 1
|
||||
build-dir = doc/build
|
||||
|
Loading…
Reference in New Issue
Block a user