Worker Model

- More information about the actual worker code can be found
  in `designate/worker/README.md` and in the inline docstrings

- Stand up a `designate-worker` process with an rpcapi, all
  the usual jazz

- Implement a base `Task` class that defines the behavior of
  a task and exposes resources to the task.

- Implement CUD Zone tasks, which includes Tasks that poll for zones,
  send Notifies, and update status. These are all done in parallel
  with threads using a shared threadpool, rather than iteratively.

- Implement a `recover_shard` task that serves the function
  of a periodic recovery, but only for a shard. Call that
  task with various shards from the zone manager.

- Put some shims in central and mdns so that the worker can
  be switched on/off with a few config values.

- Changes Zone Manager -> Producer
    - Removes zm rpcapi
    - Adds startable designate-producer service
    - Makes zone-manager an alias for producer service with a warning log
    - Lots of renaming

- Moves zone export to worker
    - API now uses central_api.export_zone to get zonefiles
    - Central uses worker_api.start_zone_export to init exports
    - Now including unit tests
    - Temporary workarounds for upgrade/migration move the logic
      into central if worker isn't available.

- Deprecates Pool manager polling options and adds warning msg on
  starting designate-pool-manager

- Get some devstack going

- Changes powerdns backend to get new sqlalchemy sessions for each
  action

- Sets the default number of threads in a worker process to 200,
  this is pretty much a shot in the dark, but 1000 seemed like
  too many, and 20 wasn't enough.

- Grenade upgrade testing

- Deprecation warnings for zone/pool mgr

The way to run this is simple, just stop `designate-pool-manager`
and `designate-zone-manager`, toggle the config settings in the
`service:worker` section: enabled = true, notify = true
and start `designate-worker` and `designate-producer` and you
should be good to go.

Change-Id: I259e9825d3a4eea58e082303ba3bdbdb7bf8c363
This commit is contained in:
Tim Simmons 2016-02-12 23:24:05 +00:00
parent addc6b0df8
commit 81ce132e90
47 changed files with 2774 additions and 464 deletions

View File

@ -47,8 +47,7 @@ cfg.CONF.register_opts([
cfg.StrOpt('mdns-topic', default='mdns', help='mDNS Topic'),
cfg.StrOpt('pool-manager-topic', default='pool_manager',
help='Pool Manager Topic'),
cfg.StrOpt('zone-manager-topic', default='zone_manager',
help='Zone Manager Topic'),
cfg.StrOpt('worker-topic', default='worker', help='Worker Topic'),
# Default TTL
cfg.IntOpt('default-ttl', default=3600),

View File

@ -34,7 +34,6 @@ from oslo_log import log as logging
from designate import exceptions
from designate.central import rpcapi as central_rpcapi
from designate.pool_manager import rpcapi as pool_mgr_rpcapi
from designate.zone_manager import rpcapi as zone_manager_rpcapi
from designate.i18n import _
@ -60,10 +59,6 @@ class RestController(pecan.rest.RestController):
def pool_mgr_api(self):
return pool_mgr_rpcapi.PoolManagerAPI.get_instance()
@property
def zone_manager_api(self):
return zone_manager_rpcapi.ZoneManagerAPI.get_instance()
def _apply_filter_params(self, params, accepted_filters, criterion):
invalid=[]
for k in params:

View File

@ -39,8 +39,8 @@ class ZoneExportController(rest.RestController):
export = self.central_api.get_zone_export(context, export_id)
if export.location and export.location.startswith('designate://'):
return self.zone_manager_api.\
render_zone(context, export['zone_id'])
return self.central_api.\
export_zone(context, export['zone_id'])
else:
msg = 'Zone can not be exported synchronously'
raise exceptions.BadRequest(msg)

View File

@ -66,38 +66,29 @@ class PowerDNSBackend(base.Backend):
self.connection = self.options.get('connection', default_connection)
@property
def session(self):
# NOTE: This uses a thread local store, allowing each greenthread to
# have it's own session stored correctly. Without this, each
# greenthread may end up using a single global session, which
# leads to bad things happening.
if not hasattr(self.local_store, 'session'):
self.local_store.session = session.get_session(
self.name, self.connection, self.target.id)
def get_session(self):
return session.get_session(self.name, self.connection, self.target.id)
return self.local_store.session
def _create(self, table, values):
def _create(self, sess, table, values):
query = table.insert()
resultproxy = self.session.execute(query, values)
resultproxy = sess.execute(query, values)
# Refetch the row, for generated columns etc
query = select([table])\
.where(table.c.id == resultproxy.inserted_primary_key[0])
resultproxy = self.session.execute(query)
resultproxy = sess.execute(query)
return _map_col(query.columns.keys(), resultproxy.fetchone())
def _get(self, table, id_, exc_notfound, id_col=None):
def _get(self, sess, table, id_, exc_notfound, id_col=None):
if id_col is None:
id_col = table.c.id
query = select([table])\
.where(id_col == id_)
resultproxy = self.session.execute(query)
resultproxy = sess.execute(query)
results = resultproxy.fetchall()
@ -107,22 +98,25 @@ class PowerDNSBackend(base.Backend):
# Map col keys to values in result
return _map_col(query.columns.keys(), results[0])
def _delete(self, table, id_, exc_notfound, id_col=None):
def _delete(self, sess, table, id_, exc_notfound, id_col=None):
if id_col is None:
id_col = table.c.id
query = table.delete()\
.where(id_col == id_)
resultproxy = self.session.execute(query)
resultproxy = sess.execute(query)
if resultproxy.rowcount != 1:
raise exc_notfound()
# Zone Methods
def create_zone(self, context, zone):
# Get a new session
sess = self.get_session()
try:
self.session.begin()
sess.begin()
def _parse_master(master):
return '%s:%d' % (master.host, master.port)
@ -136,7 +130,7 @@ class PowerDNSBackend(base.Backend):
'account': context.tenant
}
self._create(tables.domains, domain_values)
self._create(sess, tables.domains, domain_values)
except DBDuplicateEntry:
LOG.debug('Successful create of %s in pdns, zone already exists'
% zone['name'])
@ -144,19 +138,27 @@ class PowerDNSBackend(base.Backend):
pass
except Exception:
with excutils.save_and_reraise_exception():
self.session.rollback()
sess.rollback()
else:
self.session.commit()
sess.commit()
self.mdns_api.notify_zone_changed(
context, zone, self.host, self.port, self.timeout,
self.retry_interval, self.max_retries, self.delay)
def delete_zone(self, context, zone):
# TODO(kiall): We should make this match create_zone with regard to
# transactions.
# Get a new session
sess = self.get_session()
try:
self._get(tables.domains, zone['id'], exceptions.ZoneNotFound,
sess.begin()
self._get(sess, tables.domains, zone['id'],
exceptions.ZoneNotFound,
id_col=tables.domains.c.designate_id)
self._delete(sess, tables.domains, zone['id'],
exceptions.ZoneNotFound,
id_col=tables.domains.c.designate_id)
except exceptions.ZoneNotFound:
# If the Zone is already gone, that's ok. We're deleting it
@ -165,7 +167,8 @@ class PowerDNSBackend(base.Backend):
'not present in the backend. ID: %s') %
zone['id'])
return
self._delete(tables.domains, zone['id'],
exceptions.ZoneNotFound,
id_col=tables.domains.c.designate_id)
except Exception:
with excutils.save_and_reraise_exception():
sess.rollback()
else:
sess.commit()

View File

@ -52,7 +52,7 @@ from designate import storage
from designate.mdns import rpcapi as mdns_rpcapi
from designate.pool_manager import rpcapi as pool_manager_rpcapi
from designate.storage import transaction
from designate.zone_manager import rpcapi as zone_manager_rpcapi
from designate.worker import rpcapi as worker_rpcapi
LOG = logging.getLogger(__name__)
@ -253,8 +253,15 @@ class Service(service.RPCService, service.Service):
return pool_manager_rpcapi.PoolManagerAPI.get_instance()
@property
def zone_manager_api(self):
return zone_manager_rpcapi.ZoneManagerAPI.get_instance()
def worker_api(self):
return worker_rpcapi.WorkerAPI.get_instance()
@property
def zone_api(self):
# TODO(timsim): Remove this when pool_manager_api is gone
if cfg.CONF['service:worker'].enabled:
return self.worker_api
return self.pool_manager_api
def _is_valid_zone_name(self, context, zone_name):
# Validate zone name length
@ -898,7 +905,7 @@ class Service(service.RPCService, service.Service):
zone = self._create_zone_in_storage(context, zone)
self.pool_manager_api.create_zone(context, zone)
self.zone_api.create_zone(context, zone)
if zone.type == 'SECONDARY':
self.mdns_api.perform_zone_xfr(context, zone)
@ -1038,7 +1045,7 @@ class Service(service.RPCService, service.Service):
if 'masters' in changes:
self.mdns_api.perform_zone_xfr(context, zone)
self.pool_manager_api.update_zone(context, zone)
self.zone_api.update_zone(context, zone)
return zone
@ -1093,7 +1100,7 @@ class Service(service.RPCService, service.Service):
zone = self.storage.delete_zone(context, zone.id)
else:
zone = self._delete_zone_in_storage(context, zone)
self.pool_manager_api.delete_zone(context, zone)
self.zone_api.delete_zone(context, zone)
return zone
@ -1208,7 +1215,7 @@ class Service(service.RPCService, service.Service):
self._touch_zone_in_storage(context, zone)
self.pool_manager_api.update_zone(context, zone)
self.zone_api.update_zone(context, zone)
return zone
@ -1243,7 +1250,7 @@ class Service(service.RPCService, service.Service):
recordset, zone = self._create_recordset_in_storage(
context, zone, recordset, increment_serial=increment_serial)
self.pool_manager_api.update_zone(context, zone)
self.zone_api.update_zone(context, zone)
recordset.zone_name = zone.name
recordset.obj_reset_changes(['zone_name'])
@ -1405,7 +1412,7 @@ class Service(service.RPCService, service.Service):
recordset, zone = self._update_recordset_in_storage(
context, zone, recordset, increment_serial=increment_serial)
self.pool_manager_api.update_zone(context, zone)
self.zone_api.update_zone(context, zone)
return recordset
@ -1468,7 +1475,7 @@ class Service(service.RPCService, service.Service):
recordset, zone = self._delete_recordset_in_storage(
context, zone, recordset, increment_serial=increment_serial)
self.pool_manager_api.update_zone(context, zone)
self.zone_api.update_zone(context, zone)
recordset.zone_name = zone.name
recordset.obj_reset_changes(['zone_name'])
@ -1536,7 +1543,7 @@ class Service(service.RPCService, service.Service):
context, zone, recordset, record,
increment_serial=increment_serial)
self.pool_manager_api.update_zone(context, zone)
self.zone_api.update_zone(context, zone)
return record
@ -1647,7 +1654,7 @@ class Service(service.RPCService, service.Service):
record, zone = self._update_record_in_storage(
context, zone, record, increment_serial=increment_serial)
self.pool_manager_api.update_zone(context, zone)
self.zone_api.update_zone(context, zone)
return record
@ -1708,7 +1715,7 @@ class Service(service.RPCService, service.Service):
record, zone = self._delete_record_in_storage(
context, zone, record, increment_serial=increment_serial)
self.pool_manager_api.update_zone(context, zone)
self.zone_api.update_zone(context, zone)
return record
@ -1786,7 +1793,7 @@ class Service(service.RPCService, service.Service):
policy.check('diagnostics_sync_record', context, target)
self.pool_manager_api.update_zone(context, zone)
self.zone_api.update_zone(context, zone)
def ping(self, context):
policy.check('diagnostics_ping', context)
@ -2769,9 +2776,40 @@ class Service(service.RPCService, service.Service):
created_zone_export = self.storage.create_zone_export(context,
zone_export)
if not cfg.CONF['service:worker'].enabled:
# So that we can maintain asynch behavior during the time that this
# lives in central, we'll return the 'PENDING' object, and then the
# 'COMPLETE'/'ERROR' status will be available on the first poll.
export = copy.deepcopy(created_zone_export)
self.zone_manager_api.start_zone_export(context, zone,
created_zone_export)
synchronous = cfg.CONF['service:zone_manager'].export_synchronous
criterion = {'zone_id': zone_id}
count = self.storage.count_recordsets(context, criterion)
if synchronous:
try:
self.quota.limit_check(
context, context.tenant, api_export_size=count)
except exceptions.OverQuota:
LOG.debug('Zone Export too large to perform synchronously')
export.status = 'ERROR'
export.message = 'Zone is too large to export'
return export
export.location = \
"designate://v2/zones/tasks/exports/%(eid)s/export" % \
{'eid': export.id}
export.status = 'COMPLETE'
else:
LOG.debug('No method found to export zone')
export.status = 'ERROR'
export.message = 'No suitable method for export'
self.update_zone_export(context, export)
else:
export = copy.deepcopy(created_zone_export)
self.worker_api.start_zone_export(context, zone, export)
return created_zone_export

View File

@ -19,6 +19,8 @@ from oslo_config import cfg
from oslo_log import log as logging
from oslo_reports import guru_meditation_report as gmr
from designate.i18n import _LE
from designate.i18n import _LW
from designate import service
from designate import utils
from designate import version
@ -26,6 +28,7 @@ from designate import hookpoints
from designate.pool_manager import service as pool_manager_service
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
CONF.import_opt('workers', 'designate.pool_manager',
group='service:pool_manager')
@ -39,6 +42,19 @@ def main():
logging.setup(CONF, 'designate')
gmr.TextGuruMeditation.setup_autorun(version)
# NOTE(timsim): This is to ensure people don't start the wrong
# services when the worker model is enabled.
if cfg.CONF['service:worker'].enabled:
LOG.error(_LE('You have designate-worker enabled, starting '
'designate-pool-manager is incompatible with '
'designate-worker. You need to start '
'designate-worker instead.'))
sys.exit(1)
LOG.warning(_LW('designate-pool-manager is DEPRECATED in favor of '
'designate-worker and will be removed during the Ocata '
'cycle'))
server = pool_manager_service.Service(
threads=CONF['service:pool_manager'].threads
)

53
designate/cmd/producer.py Normal file
View File

@ -0,0 +1,53 @@
# Copyright 2016 Rackspace Inc.
#
# Author: Tim Simmons <tim.simmons@rackspace.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
from oslo_config import cfg
from oslo_log import log as logging
from oslo_reports import guru_meditation_report as gmr
from designate.i18n import _LE
from designate import hookpoints
from designate import service
from designate import utils
from designate import version
from designate.producer import service as producer_service
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
CONF.import_opt('workers', 'designate.producer', group='service:producer')
CONF.import_opt('threads', 'designate.producer', group='service:producer')
def main():
utils.read_config('designate', sys.argv)
logging.setup(CONF, 'designate')
gmr.TextGuruMeditation.setup_autorun(version)
# NOTE(timsim): This is to ensure people don't start the wrong
# services when the worker model is enabled.
if not cfg.CONF['service:worker'].enabled:
LOG.error(_LE('You do not have designate-worker enabled, starting '
'designate-producer is not allowed. '
'You need to start designate-zone-manager instead.'))
sys.exit(1)
hookpoints.log_hook_setup()
server = producer_service.Service(threads=CONF['service:producer'].threads)
service.serve(server, workers=CONF['service:producer'].workers)
service.wait()

53
designate/cmd/worker.py Normal file
View File

@ -0,0 +1,53 @@
# Copyright 2016 Rackspace Inc.
#
# Author: Tim Simmons <tim.simmons@rackspace.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
from oslo_config import cfg
from oslo_log import log as logging
from oslo_reports import guru_meditation_report as gmr
from designate.i18n import _LE
from designate import hookpoints
from designate import service
from designate import utils
from designate import version
from designate.worker import service as worker_service
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
CONF.import_opt('workers', 'designate.worker', group='service:worker')
CONF.import_opt('threads', 'designate.worker', group='service:worker')
def main():
utils.read_config('designate', sys.argv)
logging.setup(CONF, 'designate')
gmr.TextGuruMeditation.setup_autorun(version)
# NOTE(timsim): This is to ensure people don't start the wrong
# services when the worker model is enabled.
if not cfg.CONF['service:worker'].enabled:
LOG.error(_LE('You do not have designate-worker enabled, starting '
'designate-worker is not allowed. '
'You need to start designate-pool-manager instead.'))
sys.exit(1)
hookpoints.log_hook_setup()
server = worker_service.Service(threads=CONF['service:worker'].threads)
service.serve(server, workers=CONF['service:worker'].workers)
service.wait()

View File

@ -19,16 +19,19 @@ from oslo_config import cfg
from oslo_log import log as logging
from oslo_reports import guru_meditation_report as gmr
from designate.i18n import _LE
from designate.i18n import _LW
from designate import service
from designate import utils
from designate import version
from designate.zone_manager import service as zone_manager_service
from designate.producer import service as producer_service
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
CONF.import_opt('workers', 'designate.zone_manager',
CONF.import_opt('workers', 'designate.producer',
group='service:zone_manager')
CONF.import_opt('threads', 'designate.zone_manager',
CONF.import_opt('threads', 'designate.producer',
group='service:zone_manager')
@ -37,7 +40,20 @@ def main():
logging.setup(CONF, 'designate')
gmr.TextGuruMeditation.setup_autorun(version)
server = zone_manager_service.Service(
# NOTE(timsim): This is to ensure people don't start the wrong
# services when the worker model is enabled.
if cfg.CONF['service:worker'].enabled:
LOG.error(_LE('You have designate-worker enabled, starting '
'designate-zone-manager is incompatible with '
'designate-worker. You need to start '
'designate-producer instead.'))
sys.exit(1)
LOG.warning(_LW('designate-zone-manager is DEPRECATED in favor of '
'designate-producer, starting designate-producer '
'under the zone-manager name'))
server = producer_service.Service(
threads=CONF['service:zone_manager'].threads)
service.serve(server, workers=CONF['service:zone_manager'].workers)
service.wait()

View File

@ -20,6 +20,7 @@ from designate.i18n import _LI
from designate import rpc
from designate.loggingutils import rpc_logging
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
MDNS_API = None
@ -77,6 +78,10 @@ class MdnsAPI(object):
def notify_zone_changed(self, context, zone, host, port, timeout,
retry_interval, max_retries, delay):
if CONF['service:worker'].notify and CONF['service:worker'].enabled:
LOG.debug('Letting worker send NOTIFYs instead')
return True
LOG.info(_LI("notify_zone_changed: Calling mdns for zone '%(zone)s', "
"serial '%(serial)s' to nameserver '%(host)s:%(port)s'"),
{'zone': zone.name, 'serial': zone.serial,

View File

@ -48,6 +48,7 @@ class PoolTarget(base.DictObjectMixin, base.PersistentObjectMixin,
'relation': True,
'relation_cls': 'PoolTargetOptionList'
},
'backend': {}
}
STRING_KEYS = [

View File

@ -31,18 +31,28 @@ OPTS = [
'Pool Manager'),
cfg.IntOpt('threshold-percentage', default=100,
help='The percentage of servers requiring a successful update '
'for a zone change to be considered active'),
'for a zone change to be considered active',
deprecated_for_removal=True,
deprecated_reason='Migrated to designate-worker'),
cfg.IntOpt('poll-timeout', default=30,
help='The time to wait for a response from a server'),
help='The time to wait for a response from a server',
deprecated_for_removal=True,
deprecated_reason='Migrated to designate-worker'),
cfg.IntOpt('poll-retry-interval', default=15,
help='The time between retrying to send a request and '
'waiting for a response from a server'),
'waiting for a response from a server',
deprecated_for_removal=True,
deprecated_reason='Migrated to designate-worker'),
cfg.IntOpt('poll-max-retries', default=10,
help='The maximum number of times to retry sending a request '
'and wait for a response from a server'),
'and wait for a response from a server',
deprecated_for_removal=True,
deprecated_reason='Migrated to designate-worker'),
cfg.IntOpt('poll-delay', default=5,
help='The time to wait before sending the first request '
'to a server'),
'to a server',
deprecated_for_removal=True,
deprecated_reason='Migrated to designate-worker'),
cfg.BoolOpt('enable-recovery-timer', default=True,
help='The flag for the recovery timer'),
cfg.IntOpt('periodic-recovery-interval', default=120,

View File

@ -113,9 +113,8 @@ class Service(service.RPCService, coordination.CoordinationMixin,
CONF['service:pool_manager'].periodic_sync_retry_interval
# Compute a time (seconds) by which things should have propagated
self.max_prop_time = (self.timeout * self.max_retries +
self.max_retries * self.retry_interval +
self.delay)
self.max_prop_time = utils.max_prop_time(self.timeout,
self.max_retries, self.retry_interval, self.delay)
def _setup_target_backends(self):
self.target_backends = {}
@ -147,7 +146,6 @@ class Service(service.RPCService, coordination.CoordinationMixin,
return topic
def start(self):
# Build the Pool (and related) Object from Config
context = DesignateContext.get_admin_context()
pool_id = CONF['service:pool_manager'].pool_id

View File

@ -0,0 +1,69 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Endre Karlson <endre.karlson@hpe.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
CONF = cfg.CONF
CONF.register_group(cfg.OptGroup(
name='service:producer', title="Configuration for Producer Service"
))
OPTS = [
cfg.IntOpt('workers',
help='Number of Producer worker processes to spawn'),
cfg.IntOpt('threads', default=1000,
help='Number of Producer greenthreads to spawn'),
cfg.ListOpt('enabled_tasks',
help='Enabled tasks to run'),
cfg.StrOpt('storage-driver', default='sqlalchemy',
help='The storage driver to use'),
cfg.BoolOpt('export-synchronous', default=True,
help='Whether to allow synchronous zone exports',
deprecated_for_removal=True,
deprecated_reason='Migrated to designate-worker'),
]
CONF.register_opts(OPTS, group='service:producer')
# TODO(timsim): Remove these when zone-manager is removed
CONF.register_group(cfg.OptGroup(
name='service:zone_manager', title="Configuration for Zone Manager Service"
))
ZONEMGROPTS = [
cfg.IntOpt('workers',
help='Number of Zone Manager worker processes to spawn',
deprecated_for_removal=True,
deprecated_reason='Migrated to designate-worker'),
cfg.IntOpt('threads', default=1000,
help='Number of Zone Manager greenthreads to spawn',
deprecated_for_removal=True,
deprecated_reason='Migrated to designate-worker'),
cfg.ListOpt('enabled_tasks',
help='Enabled tasks to run',
deprecated_for_removal=True,
deprecated_reason='Migrated to designate-worker'),
cfg.StrOpt('storage-driver', default='sqlalchemy',
help='The storage driver to use',
deprecated_for_removal=True,
deprecated_reason='Migrated to designate-worker'),
cfg.BoolOpt('export-synchronous', default=True,
help='Whether to allow synchronous zone exports',
deprecated_for_removal=True,
deprecated_reason='Migrated to designate-worker'),
]
CONF.register_opts(ZONEMGROPTS, group='service:zone_manager')

View File

@ -0,0 +1,97 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Endre Karlson <endre.karlson@hpe.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from designate.i18n import _LI
from designate import coordination
from designate import quota
from designate import service
from designate import storage
from designate.central import rpcapi
from designate.producer import tasks
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
NS = 'designate.periodic_tasks'
class Service(service.RPCService, coordination.CoordinationMixin,
service.Service):
RPC_API_VERSION = '1.0'
target = messaging.Target(version=RPC_API_VERSION)
@property
def storage(self):
if not hasattr(self, '_storage'):
# TODO(timsim): Remove this when zone_mgr goes away
storage_driver = cfg.CONF['service:zone_manager'].storage_driver
if cfg.CONF['service:producer'].storage_driver != storage_driver:
storage_driver = cfg.CONF['service:producer'].storage_driver
self._storage = storage.get_storage(storage_driver)
return self._storage
@property
def quota(self):
if not hasattr(self, '_quota'):
# Get a quota manager instance
self._quota = quota.get_quota()
return self._quota
@property
def service_name(self):
return 'producer'
@property
def central_api(self):
return rpcapi.CentralAPI.get_instance()
def start(self):
super(Service, self).start()
self._partitioner = coordination.Partitioner(
self._coordinator, self.service_name, self._coordination_id,
range(0, 4095))
self._partitioner.start()
self._partitioner.watch_partition_change(self._rebalance)
# TODO(timsim): Remove this when zone_mgr goes away
zmgr_enabled_tasks = CONF['service:zone_manager'].enabled_tasks
producer_enabled_tasks = CONF['service:producer'].enabled_tasks
enabled = zmgr_enabled_tasks
if producer_enabled_tasks != []:
enabled = producer_enabled_tasks
for task in tasks.PeriodicTask.get_extensions(enabled):
LOG.debug("Registering task %s", task)
# Instantiate the task
task = task()
# Subscribe for partition size updates.
self._partitioner.watch_partition_change(task.on_partition_change)
interval = CONF[task.get_canonical_name()].interval
self.tg.add_timer(interval, task)
def _rebalance(self, my_partitions, members, event):
LOG.info(_LI("Received rebalance event %s"), event)
self.partition_range = my_partitions

View File

@ -19,8 +19,9 @@ from designate import context
from designate import plugin
from designate import rpc
from designate.central import rpcapi
from designate.worker import rpcapi as worker_rpcapi
from designate.pool_manager import rpcapi as pool_manager_rpcapi
from designate.i18n import _LI
from designate.pool_manager.rpcapi import PoolManagerAPI
from oslo_config import cfg
from oslo_log import log as logging
@ -30,10 +31,10 @@ LOG = logging.getLogger(__name__)
class PeriodicTask(plugin.ExtensionPlugin):
"""Abstract Zone Manager periodic task
"""Abstract Producer periodic task
"""
__plugin_ns__ = 'designate.zone_manager_tasks'
__plugin_type__ = 'zone_manager_task'
__plugin_ns__ = 'designate.producer_tasks'
__plugin_type__ = 'producer_task'
__interval__ = None
def __init__(self):
@ -56,6 +57,21 @@ class PeriodicTask(plugin.ExtensionPlugin):
def central_api(self):
return rpcapi.CentralAPI.get_instance()
@property
def worker_api(self):
return worker_rpcapi.WorkerAPI.get_instance()
@property
def pool_manager_api(self):
return pool_manager_rpcapi.PoolManagerAPI.get_instance()
@property
def zone_api(self):
# TODO(timsim): Remove this when pool_manager_api is gone
if cfg.CONF['service:worker'].enabled:
return self.worker_api
return self.pool_manager_api
def on_partition_change(self, my_partitions, members, event):
"""Refresh partitions attribute
"""
@ -153,7 +169,7 @@ class PeriodicExistsTask(PeriodicTask):
def __init__(self):
super(PeriodicExistsTask, self).__init__()
self.notifier = rpc.get_notifier('zone_manager')
self.notifier = rpc.get_notifier('producer')
@classmethod
def get_cfg_opts(cls):
@ -211,7 +227,7 @@ class PeriodicSecondaryRefreshTask(PeriodicTask):
def __call__(self):
pstart, pend = self._my_range()
msg = _LI("Refreshing zones between for %(start)s to %(end)s")
msg = _LI("Refreshing zones for shards %(start)s to %(end)s")
LOG.info(msg, {"start": pstart, "end": pend})
ctxt = context.DesignateContext.get_admin_context()
@ -269,7 +285,7 @@ class PeriodicGenerateDelayedNotifyTask(PeriodicTask):
def __call__(self):
"""Fetch a list of zones with the delayed_notify flag set up to
"batch_size"
Call Pool Manager to emit NOTIFY transactions,
Call Worker to emit NOTIFY transactions,
Reset the flag.
"""
pstart, pend = self._my_range()
@ -293,8 +309,38 @@ class PeriodicGenerateDelayedNotifyTask(PeriodicTask):
msg = _LI("Performing delayed NOTIFY for %(start)s to %(end)s: %(n)d")
LOG.debug(msg % dict(start=pstart, end=pend, n=len(zones)))
pm_api = PoolManagerAPI.get_instance()
for z in zones:
pm_api.update_zone(ctxt, z)
self.zone_api.update_zone(ctxt, z)
z.delayed_notify = False
self.central_api.update_zone(ctxt, z)
class WorkerPeriodicRecovery(PeriodicTask):
__plugin_name__ = 'worker_periodic_recovery'
__interval__ = 120
@classmethod
def get_cfg_opts(cls):
group = cfg.OptGroup(cls.get_canonical_name())
options = cls.get_base_opts() + [
cfg.IntOpt(
'interval',
default=cls.__interval__,
help='Run interval in seconds'
),
]
return [(group, options)]
def __call__(self):
# TODO(timsim): Remove this when worker is always on
if not cfg.CONF['service:worker'].enabled:
return
pstart, pend = self._my_range()
msg = _LI("Recovering zones for shards %(start)s to %(end)s")
LOG.info(msg, {"start": pstart, "end": pend})
ctxt = context.DesignateContext.get_admin_context()
ctxt.all_tenants = True
self.worker_api.recover_shard(ctxt, pstart, pend)

View File

@ -52,10 +52,12 @@ class PowerDNSBackendTestCase(BackendTestCase):
self.assertEqual(commit, session_mock.commit.call_count)
self.assertEqual(rollback, session_mock.rollback.call_count)
# Tests for Public Methpds
@mock.patch.object(impl_powerdns.PowerDNSBackend, 'session',
new_callable=mock.MagicMock)
def test_create_zone(self, session_mock):
# Tests for Public Methods
@mock.patch.object(impl_powerdns.PowerDNSBackend, 'get_session')
def test_create_zone(self, get_session_mock):
session_mock = mock.MagicMock()
get_session_mock.return_value = session_mock
context = self.get_context()
self.backend.create_zone(context, self.zone)
@ -80,11 +82,14 @@ class PowerDNSBackendTestCase(BackendTestCase):
session_mock.execute.call_args_list[1][0][0],
sqlalchemy.sql.selectable.Select)
@mock.patch.object(impl_powerdns.PowerDNSBackend, 'session',
new_callable=mock.Mock)
@mock.patch.object(impl_powerdns.PowerDNSBackend, 'get_session')
@mock.patch.object(impl_powerdns.PowerDNSBackend, '_create',
side_effect=Exception)
def test_create_zone_failure_on_create(self, create_mock, session_mock):
def test_create_zone_failure_on_create(self, create_mock,
get_session_mock):
session_mock = mock.MagicMock()
get_session_mock.return_value = session_mock
with testtools.ExpectedException(Exception):
self.backend.create_zone(self.get_context(), self.zone)
@ -94,11 +99,14 @@ class PowerDNSBackendTestCase(BackendTestCase):
# Ensure we called out into the _create method exactly once
self.assertEqual(1, create_mock.call_count)
@mock.patch.object(impl_powerdns.PowerDNSBackend, 'session',
new_callable=mock.Mock)
@mock.patch.object(impl_powerdns.PowerDNSBackend, 'get_session')
@mock.patch.object(impl_powerdns.PowerDNSBackend, '_create',
return_value=None)
def test_create_zone_failure_on_commit(self, create_mock, session_mock):
def test_create_zone_failure_on_commit(self, create_mock,
get_session_mock):
session_mock = mock.MagicMock()
get_session_mock.return_value = session_mock
# Configure the Session mocks's commit method to raise an exception
session_mock.commit.side_effect = Exception
@ -111,11 +119,13 @@ class PowerDNSBackendTestCase(BackendTestCase):
# Ensure we called out into the _create method exactly once
self.assertEqual(1, create_mock.call_count)
@mock.patch.object(impl_powerdns.PowerDNSBackend, 'session',
new_callable=mock.Mock)
@mock.patch.object(impl_powerdns.PowerDNSBackend, 'get_session')
@mock.patch.object(impl_powerdns.PowerDNSBackend, '_get',
return_value=None)
def test_delete_zone(self, get_mock, session_mock):
def test_delete_zone(self, get_mock, get_session_mock):
session_mock = mock.MagicMock()
get_session_mock.return_value = session_mock
# Configure the Session mocks's execute method to return a fudged
# resultproxy.
rp_mock = mock.Mock()
@ -128,8 +138,8 @@ class PowerDNSBackendTestCase(BackendTestCase):
# Ensure the _get method was called with the correct arguments
get_mock.assert_called_once_with(
tables.domains, self.zone.id, exceptions.ZoneNotFound,
id_col=tables.domains.c.designate_id)
session_mock, tables.domains, self.zone.id,
exceptions.ZoneNotFound, id_col=tables.domains.c.designate_id)
# Ensure we have one query, a DELETE
self.assertEqual(1, session_mock.execute.call_count)
@ -140,21 +150,23 @@ class PowerDNSBackendTestCase(BackendTestCase):
# TODO(kiall): Validate the ID being deleted
@mock.patch.object(impl_powerdns.PowerDNSBackend, 'session',
new_callable=mock.Mock)
@mock.patch.object(impl_powerdns.PowerDNSBackend, 'get_session')
@mock.patch.object(impl_powerdns.PowerDNSBackend, '_get',
side_effect=exceptions.ZoneNotFound)
@mock.patch.object(impl_powerdns.PowerDNSBackend, '_delete',
return_value=None)
def test_delete_zone_zone_not_found(self, delete_mock, get_mock,
session_mock):
get_session_mock):
session_mock = mock.MagicMock()
get_session_mock.return_value = session_mock
context = self.get_context()
self.backend.delete_zone(context, self.zone)
# Ensure the _get method was called with the correct arguments
get_mock.assert_called_once_with(
tables.domains, self.zone.id, exceptions.ZoneNotFound,
id_col=tables.domains.c.designate_id)
session_mock, tables.domains, self.zone.id,
exceptions.ZoneNotFound, id_col=tables.domains.c.designate_id)
# Ensure the _delete method was not called
self.assertFalse(delete_mock.called)

View File

@ -20,8 +20,8 @@ from designate.tests import TestCase
LOG = logging.getLogger(__name__)
class ZoneManagerServiceTest(TestCase):
class ProducerServiceTest(TestCase):
def test_stop(self):
# Test stopping the service
service = self.start_service("zone_manager")
service = self.start_service("producer")
service.stop()

View File

@ -16,17 +16,13 @@
import datetime
from mock import MagicMock
from oslo_log import log as logging
from oslo_utils import timeutils
from designate.pool_manager.rpcapi import PoolManagerAPI
from designate.storage.impl_sqlalchemy import tables
from designate.tests import TestCase
from designate.tests import fixtures
from designate.zone_manager import tasks
from fixtures import MockPatch
from designate.producer import tasks
LOG = logging.getLogger(__name__)
@ -39,7 +35,7 @@ class TaskTest(TestCase):
def _enable_tasks(self, tasks):
self.config(
enabled_tasks=tasks,
group="service:zone_manager")
group="service:producer")
class DeletedzonePurgeTest(TaskTest):
@ -50,7 +46,7 @@ class DeletedzonePurgeTest(TaskTest):
interval=3600,
time_threshold=604800,
batch_size=100,
group="zone_manager_task:zone_purge"
group="producer_task:zone_purge"
)
self.purge_task_fixture = self.useFixture(
@ -99,7 +95,7 @@ class DeletedzonePurgeTest(TaskTest):
return zones
def test_purge_zones(self):
# Create 18 zones, run zone_manager, check if 7 zones are remaining
# Create 18 zones, run producer, check if 7 zones are remaining
self.config(quota_zones=1000)
self._create_deleted_zones()
@ -110,14 +106,6 @@ class DeletedzonePurgeTest(TaskTest):
self.assertEqual(7, len(zones))
fx_pool_manager = MockPatch(
'designate.zone_manager.tasks.PoolManagerAPI.get_instance',
MagicMock(spec_set=[
'update_zone',
])
)
class PeriodicGenerateDelayedNotifyTaskTest(TaskTest):
def setUp(self):
@ -126,7 +114,7 @@ class PeriodicGenerateDelayedNotifyTaskTest(TaskTest):
self.config(
interval=5,
batch_size=100,
group="zone_manager_task:delayed_notify"
group="producer_task:delayed_notify"
)
self.generate_delayed_notify_task_fixture = self.useFixture(
@ -158,31 +146,21 @@ class PeriodicGenerateDelayedNotifyTaskTest(TaskTest):
self.config(
interval=1,
batch_size=5,
group="zone_manager_task:delayed_notify"
group="producer_task:delayed_notify"
)
self._create_zones()
zones = self._fetch_zones(tables.zones.select().where(
tables.zones.c.delayed_notify == True)) # nopep8
self.assertEqual(10, len(zones))
# Run the task and check if it reset the delayed_notify flag
with fx_pool_manager:
pm_api = PoolManagerAPI.get_instance()
pm_api.update_zone.reset_mock()
self.generate_delayed_notify_task_fixture.task()
self.assertEqual(10, pm_api.update_zone.call_count)
zones = self._fetch_zones(tables.zones.select().where(
tables.zones.c.delayed_notify == True)) # nopep8
self.assertEqual(5, len(zones))
# Run the task and check if it reset the delayed_notify flag
with fx_pool_manager:
self.generate_delayed_notify_task_fixture.task()
pm_api = PoolManagerAPI.get_instance()
self.assertEqual(20, pm_api.update_zone.call_count)
zones = self._fetch_zones(tables.zones.select().where(
tables.zones.c.delayed_notify == True)) # nopep8

View File

@ -0,0 +1,30 @@
# Copyright 2016 Rackspace Inc.
#
# Author: Eric Larson <eric.larson@rackspace.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.mport threading
from unittest import TestCase
from designate.worker.tasks import base
class TestTask(TestCase):
def setUp(self):
self.task = base.Task(None)
def test_constructor(self):
assert self.task
def test_call(self):
self.assertRaises(NotImplementedError, self.task)

View File

@ -0,0 +1,46 @@
# Copyright 2016 Rackspace Inc.
#
# Author: Eric Larson <eric.larson@rackspace.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.mport threading
from unittest import TestCase
from designate.worker import processing
class TestProcessingExecutor(TestCase):
def test_execute_multiple_tasks(self):
def t1():
return 1
def t2():
return 2
tasks = [t1, t2, t1, t2, t1]
exe = processing.Executor()
results = exe.run(tasks)
assert results == [1, 2, 1, 2, 1]
def test_execute_single_task(self):
def t1():
return 1
def t2():
return 2
exe = processing.Executor()
results = exe.run(t1)
assert results == [1]

View File

@ -0,0 +1,99 @@
# Copyright 2016 Rackspace Inc.
#
# Author: Eric Larson <eric.larson@rackspace.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.mport threading
from unittest import TestCase
import mock
from designate.worker import service
class TestService(TestCase):
def setUp(self):
self.context = mock.Mock()
self.zone = mock.Mock()
self.service = service.Service()
def test_create_zone(self):
self.service._do_zone_action = mock.Mock()
self.service.create_zone(self.context, self.zone)
self.service._do_zone_action.assert_called_with(
self.context, self.zone
)
def test_delete_zone(self):
self.service._do_zone_action = mock.Mock()
self.service.delete_zone(self.context, self.zone)
self.service._do_zone_action.assert_called_with(
self.context, self.zone
)
def test_update_zone(self):
self.service._do_zone_action = mock.Mock()
self.service.update_zone(self.context, self.zone)
self.service._do_zone_action.assert_called_with(
self.context, self.zone
)
@mock.patch.object(service.zonetasks, 'ZoneAction')
def test_do_zone_action(self, ZoneAction):
self.service._executor = mock.Mock()
self.service._pool = mock.Mock()
self.service.get_pool = mock.Mock()
pool = mock.Mock()
self.service.get_pool.return_value = pool
self.service._do_zone_action(self.context, self.zone)
ZoneAction.assert_called_with(
self.service.executor,
self.context,
pool,
self.zone,
self.zone.action
)
self.service._executor.run.assert_called_with(ZoneAction())
def test_get_pool(self):
pool = mock.Mock()
self.service.load_pool = mock.Mock()
self.service.load_pool.return_value = pool
self.service._pools_map = {'1': pool}
assert self.service.get_pool('1') == pool
assert self.service.get_pool('2') == pool
@mock.patch.object(service.zonetasks, 'RecoverShard')
def test_recover_shard(self, RecoverShard):
self.service._executor = mock.Mock()
self.service._pool = mock.Mock()
self.service.recover_shard(self.context, 1, 10)
RecoverShard.assert_called_with(
self.service.executor,
self.context,
1, 10
)
self.service.executor.run.assert_called_with(RecoverShard())

View File

@ -0,0 +1,574 @@
# Copyright 2016 Rackspace Inc.
#
# Author: Eric Larson <eric.larson@rackspace.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.mport threading
from unittest import TestCase
import mock
import testtools
from oslo_config import cfg
from functionaltests.common import utils
from designate import exceptions
from designate.worker.tasks import zone
from designate.worker import processing
class TestZoneAction(TestCase):
def setUp(self):
self.context = mock.Mock()
self.pool = 'default_pool'
self.executor = mock.Mock()
self.task = zone.ZoneAction(
self.executor, self.context, self.pool, mock.Mock(), 'CREATE'
)
self.task._wait_for_nameservers = mock.Mock()
def test_constructor(self):
assert self.task
def test_call(self):
self.task._zone_action_on_targets = mock.Mock(return_value=True)
self.task._poll_for_zone = mock.Mock(return_value=True)
result = self.task()
assert result is True
assert self.task._wait_for_nameservers.called
assert self.task._zone_action_on_targets.called
assert self.task._poll_for_zone.called
def test_call_on_delete(self):
myzone = mock.Mock()
task = zone.ZoneAction(
self.executor, self.context, self.pool, myzone, 'DELETE'
)
task._zone_action_on_targets = mock.Mock(return_value=True)
task._poll_for_zone = mock.Mock(return_value=True)
task._wait_for_nameservers = mock.Mock()
assert task()
assert myzone.serial == 0
def test_call_fails_on_zone_targets(self):
self.task._zone_action_on_targets = mock.Mock(return_value=False)
assert not self.task()
def test_call_fails_on_poll_for_zone(self):
self.task._zone_action_on_targets = mock.Mock(return_value=False)
assert not self.task()
@mock.patch.object(zone, 'time')
def test_wait_for_nameservers(self, time):
# It is just a time.sleep :(
task = zone.ZoneAction(
self.executor, self.context, self.pool, mock.Mock(), 'CREATE'
)
task._wait_for_nameservers()
time.sleep.assert_called_with(task.delay)
class TestZoneActor(TestCase):
"""The zone actor runs actions for zones in multiple threads and
ensures the result meets the required thresholds for calling it
done.
"""
def setUp(self):
self.context = mock.Mock()
self.pool = mock.Mock()
self.executor = mock.Mock()
self.actor = zone.ZoneActor(
self.executor,
self.context,
self.pool,
mock.Mock(action='CREATE'),
)
def test_invalid_action(self):
with testtools.ExpectedException(Exception, "Bad Action"):
self.actor._validate_action('BAD')
def test_threshold_from_config(self):
actor = zone.ZoneActor(
self.executor, self.context, self.pool, mock.Mock(action='CREATE')
)
default = cfg.CONF['service:worker'].threshold_percentage
assert actor.threshold == default
def test_execute(self):
self.pool.targets = ['target 1']
self.actor.executor.run.return_value = ['foo']
results = self.actor._execute()
assert results == ['foo']
def test_call(self):
self.actor.pool.targets = ['target 1']
self.actor.executor.run.return_value = [True]
assert self.actor() is True
def test_threshold_met_true(self):
self.actor._threshold = 80
results = [True for i in range(8)] + [False, False]
assert self.actor._threshold_met(results)
def test_threshold_met_false(self):
self.actor._threshold = 90
self.actor._update_status = mock.Mock()
results = [False] + [True for i in range(8)] + [False]
assert not self.actor._threshold_met(results)
assert self.actor._update_status.called
assert self.actor.zone.status == 'ERROR'
QUERY_RESULTS = {
'delete_success_all': {
'case': {
'action': 'DELETE',
'results': [0, 0, 0, 0],
'zone_serial': 1,
'positives': 4,
'no_zones': 4,
'consensus_serial': 0
}
},
'delete_success_half': {
'case': {
'action': 'DELETE',
'results': [1, 0, 1, 0],
'zone_serial': 1,
'positives': 2,
'no_zones': 2,
'consensus_serial': 0
},
},
'update_success_all': {
'case': {
'action': 'UPDATE',
'results': [2, 2, 2, 2],
'zone_serial': 2,
'positives': 4,
'no_zones': 0,
'consensus_serial': 2
},
},
'update_fail_all': {
'case': {
'action': 'UPDATE',
'results': [1, 1, 1, 1],
'zone_serial': 2,
'positives': 0,
'no_zones': 0,
# The consensus serial is never updated b/c the nameserver
# serials are less than the zone serial.
'consensus_serial': 0
},
},
'update_success_with_higher_serial': {
'case': {
'action': 'UPDATE',
'results': [2, 1, 0, 3],
'zone_serial': 2,
'positives': 2,
'no_zones': 1,
'consensus_serial': 2
},
},
'update_success_all_higher_serial': {
'case': {
'action': 'UPDATE',
'results': [3, 3, 3, 3],
'zone_serial': 2,
'positives': 4,
'no_zones': 0,
'consensus_serial': 3,
}
},
}
@utils.parameterized_class
class TestParseQueryResults(TestCase):
@utils.parameterized(QUERY_RESULTS)
def test_result_cases(self, case):
z = mock.Mock(action=case['action'])
if case.get('zone_serial'):
z.serial = case['zone_serial']
result = zone.parse_query_results(
case['results'], z
)
assert result.positives == case['positives']
assert result.no_zones == case['no_zones']
assert result.consensus_serial == case['consensus_serial']
class TestZonePoller(TestCase):
def setUp(self):
self.context = mock.Mock()
self.pool = mock.Mock()
self.zone = mock.Mock(name='example.com.', serial=1)
self.threshold = 80
self.executor = mock.Mock()
self.poller = zone.ZonePoller(
self.executor,
self.context,
self.pool,
self.zone,
)
self.poller._threshold = self.threshold
def test_constructor(self):
assert self.poller
assert self.poller.threshold == self.threshold
def test_call_on_success(self):
ns_results = [2 for i in range(8)] + [0, 0]
result = zone.DNSQueryResult(
positives=8,
no_zones=2,
consensus_serial=2,
results=ns_results,
)
self.poller.zone.action = 'UPDATE'
self.poller.zone.serial = 2
self.poller._do_poll = mock.Mock(return_value=result)
self.poller._on_success = mock.Mock(return_value=True)
self.poller._update_status = mock.Mock()
assert self.poller()
self.poller._on_success.assert_called_with(result, 'SUCCESS')
self.poller._update_status.called
self.poller.zone.serial = 2
self.poller.zone.status = 'SUCCESS'
def test_threshold_met_true(self):
ns_results = [2 for i in range(8)] + [0, 0]
result = zone.DNSQueryResult(
positives=8,
no_zones=2,
consensus_serial=2,
results=ns_results,
)
success, status = self.poller._threshold_met(result)
assert success
assert status == 'SUCCESS'
def test_threshold_met_false_low_positives(self):
# 6 positives, 4 behind the serial (aka 0 no_zones)
ns_results = [2 for i in range(6)] + [1 for i in range(4)]
result = zone.DNSQueryResult(
positives=6,
no_zones=0,
consensus_serial=2,
results=ns_results,
)
success, status = self.poller._threshold_met(result)
assert not success
assert status == 'ERROR'
def test_threshold_met_true_no_zones(self):
# Change is looking for serial 2
# 4 positives, 4 no zones, 2 behind the serial
ns_results = [2 for i in range(4)] + [0 for i in range(4)] + [1, 1]
result = zone.DNSQueryResult(
positives=4,
no_zones=4,
consensus_serial=1,
results=ns_results,
)
# Set the threshold to 30%
self.poller._threshold = 30
self.poller.zone.action = 'UPDATE'
success, status = self.poller._threshold_met(result)
assert success
assert status == 'SUCCESS'
def test_threshold_met_false_no_zones(self):
# Change is looking for serial 2
# 4 positives, 4 no zones
ns_results = [2 for i in range(4)] + [0 for i in range(4)]
result = zone.DNSQueryResult(
positives=4,
no_zones=4,
consensus_serial=2,
results=ns_results,
)
# Set the threshold to 100%
self.poller._threshold = 100
self.poller.zone.action = 'UPDATE'
success, status = self.poller._threshold_met(result)
assert not success
assert status == 'NO_ZONE'
def test_threshold_met_false_no_zones_one_result(self):
# Change is looking for serial 2
# 4 positives, 4 no zones
ns_results = [0]
result = zone.DNSQueryResult(
positives=0,
no_zones=1,
consensus_serial=2,
results=ns_results,
)
# Set the threshold to 100%
self.poller._threshold = 100
self.poller.zone.action = 'UPDATE'
success, status = self.poller._threshold_met(result)
assert not success
assert status == 'NO_ZONE'
def test_on_success(self):
query_result = mock.Mock(consensus_serial=10)
result = self.poller._on_success(query_result, 'FOO')
assert result is True
assert self.zone.serial == 10
assert self.zone.status == 'FOO'
def test_on_error_failure(self):
result = self.poller._on_failure('FOO')
assert result is False
assert self.zone.status == 'FOO'
def test_on_no_zones_failure(self):
result = self.poller._on_failure('NO_ZONE')
assert result is False
assert self.zone.status == 'NO_ZONE'
assert self.zone.action == 'CREATE'
class TestZonePollerPolling(TestCase):
def setUp(self):
self.executor = processing.Executor()
self.context = mock.Mock()
self.zone = mock.Mock(name='example.com.', action='UPDATE', serial=10)
self.pool = mock.Mock(nameservers=['ns1', 'ns2'])
self.threshold = 80
self.poller = zone.ZonePoller(
self.executor,
self.context,
self.pool,
self.zone,
)
self.max_retries = 4
self.retry_interval = 2
self.poller._max_retries = self.max_retries
self.poller._retry_interval = self.retry_interval
@mock.patch.object(zone, 'PollForZone')
def test_do_poll(self, PollForZone):
PollForZone.return_value = mock.Mock(return_value=10)
result = self.poller._do_poll()
assert result
assert result.positives == 2
assert result.no_zones == 0
assert result.results == [10, 10]
@mock.patch.object(zone, 'time', mock.Mock())
def test_do_poll_with_retry(self):
exe = mock.Mock()
exe.run.side_effect = [
[0, 0], [10, 10]
]
self.poller.executor = exe
result = self.poller._do_poll()
assert result
zone.time.sleep.assert_called_with(self.retry_interval)
# retried once
assert len(zone.time.sleep.mock_calls) == 1
@mock.patch.object(zone, 'time', mock.Mock())
def test_do_poll_with_retry_until_fail(self):
exe = mock.Mock()
exe.run.return_value = [0, 0]
self.poller.executor = exe
self.poller._do_poll()
assert len(zone.time.sleep.mock_calls) == self.max_retries
class TestUpdateStatus(TestCase):
def setUp(self):
self.executor = processing.Executor()
self.task = zone.UpdateStatus(self.executor, mock.Mock(), mock.Mock())
self.task._central_api = mock.Mock()
def test_call_on_delete(self):
self.task.zone.action = 'DELETE'
self.task()
assert self.task.zone.action == 'NONE'
assert self.task.zone.status == 'NO_ZONE'
assert self.task.central_api.update_status.called
def test_call_on_success(self):
self.task.zone.status = 'SUCCESS'
self.task()
assert self.task.zone.action == 'NONE'
assert self.task.central_api.update_status.called
def test_call_central_call(self):
self.task.zone.status = 'SUCCESS'
self.task()
self.task.central_api.update_status.assert_called_with(
self.task.context,
self.task.zone.id,
self.task.zone.status,
self.task.zone.serial,
)
def test_call_on_delete_error(self):
self.task.zone.action = 'DELETE'
self.task.zone.status = 'ERROR'
self.task()
assert self.task.zone.action == 'DELETE'
assert self.task.zone.status == 'ERROR'
assert self.task.central_api.update_status.called
def test_call_on_create_error(self):
self.task.zone.action = 'CREATE'
self.task.zone.status = 'ERROR'
self.task()
assert self.task.zone.action == 'CREATE'
assert self.task.zone.status == 'ERROR'
assert self.task.central_api.update_status.called
def test_call_on_update_error(self):
self.task.zone.action = 'UPDATE'
self.task.zone.status = 'ERROR'
self.task()
assert self.task.zone.action == 'UPDATE'
assert self.task.zone.status == 'ERROR'
assert self.task.central_api.update_status.called
class TestPollForZone(TestCase):
def setUp(self):
self.zone = mock.Mock(serial=1)
self.zone.name = 'example.org.'
self.executor = processing.Executor()
self.ns = mock.Mock(host='ns.example.org', port=53)
self.task = zone.PollForZone(self.executor, self.zone, self.ns)
self.task._max_retries = 3
self.task._retry_interval = 2
@mock.patch.object(zone.wutils, 'get_serial', mock.Mock(return_value=10))
def test_get_serial(self):
assert self.task._get_serial() == 10
zone.wutils.get_serial.assert_called_with(
'example.org.',
'ns.example.org',
port=53
)
def test_call(self):
self.task._get_serial = mock.Mock(return_value=10)
result = self.task()
assert result == 10
class TestExportZone(TestCase):
def setUp(self):
self.zone = mock.Mock(name='example.com.', serial=1)
self.export = mock.Mock()
self.export.id = '1'
self.executor = processing.Executor()
self.context = mock.Mock()
self.task = zone.ExportZone(
self.executor, self.context, self.zone, self.export)
self.task._central_api = mock.Mock()
self.task._storage = mock.Mock()
self.task._quota = mock.Mock()
self.task._quota.limit_check = mock.Mock()
self.task._storage.count_recordsets = mock.Mock(return_value=1)
self.task._synchronous_export = mock.Mock(return_value=True)
def test_sync_export_right_size(self):
self.task()
assert self.export.status == 'COMPLETE'
s = "designate://v2/zones/tasks/exports/%s/export" % self.export.id
assert self.export.location == s
def test_sync_export_wrong_size_fails(self):
self.task._quota.limit_check = mock.Mock(
side_effect=exceptions.OverQuota)
self.task()
assert self.export.status == 'ERROR'
def test_async_export_fails(self):
self.task._synchronous_export = mock.Mock(return_value=False)
self.task()
assert self.export.status == 'ERROR'

View File

@ -1912,16 +1912,18 @@ class CentralZoneExportTests(CentralBasic):
)
self.service.storage.create_zone_export = Mock(
return_value=RoObject(
return_value=RwObject(
id='1',
zone_id='123',
task_type='EXPORT',
status='PENDING',
message=None,
tenant_id='t'
tenant_id='t',
location=None,
)
)
self.service.zone_manager_api.start_zone_export = Mock()
self.service.worker_api.start_zone_export = Mock()
out = self.service.create_zone_export(
self.context,

View File

@ -0,0 +1,60 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Federico Ceratto <federico.ceratto@hpe.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit-test Producer service
"""
import mock
from oslotest import base as test
from designate.tests.unit import RoObject
import designate.producer.service as ps
@mock.patch.object(ps.rpcapi.CentralAPI, 'get_instance')
class ProducerTest(test.BaseTestCase):
def setUp(self):
ps.CONF = RoObject({
'service:producer': RoObject({
'enabled_tasks': None, # enable all tasks
}),
# TODO(timsim): Remove this
'service:zone_manager': RoObject({
'enabled_tasks': None, # enable all tasks
'export_synchronous': True
}),
'producer_task:zone_purge': '',
})
super(ProducerTest, self).setUp()
self.tm = ps.Service()
self.tm._storage = mock.Mock()
self.tm._rpc_server = mock.Mock()
self.tm._quota = mock.Mock()
self.tm.quota.limit_check = mock.Mock()
def test_service_name(self, _):
self.assertEqual('producer', self.tm.service_name)
def test_central_api(self, _):
capi = self.tm.central_api
assert isinstance(capi, mock.MagicMock)
@mock.patch.object(ps.tasks, 'PeriodicTask')
@mock.patch.object(ps.coordination, 'Partitioner')
def test_stark(self, _, mock_partitioner, mock_PeriodicTask):
self.tm.start()

View File

@ -15,7 +15,7 @@
# under the License.
"""
Unit test Zone Manager tasks
Unit test Producer tasks
"""
import datetime
import uuid
@ -29,7 +29,7 @@ import testtools
from designate.central import rpcapi as central_api
from designate import context
from designate import rpc
from designate.zone_manager import tasks
from designate.producer import tasks
from designate.tests.unit import RoObject
@ -55,7 +55,7 @@ class PeriodicTest(TaskTest):
super(PeriodicTest, self).setUp()
opts = {
"zone_manager_task:dummy": RoObject({
"producer_task:dummy": RoObject({
"per_page": 100,
})
}
@ -118,7 +118,7 @@ class PeriodicExistsTest(TaskTest):
super(PeriodicExistsTest, self).setUp()
opts = {
"zone_manager_task:periodic_exists": RoObject({
"producer_task:periodic_exists": RoObject({
"per_page": 100,
"interval": 5
})
@ -204,7 +204,7 @@ class PeriodicSecondaryRefreshTest(TaskTest):
super(PeriodicSecondaryRefreshTest, self).setUp()
opts = {
"zone_manager_task:periodic_secondary_refresh": RoObject({
"producer_task:periodic_secondary_refresh": RoObject({
"per_page": 100
})
}

View File

@ -1,101 +0,0 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Federico Ceratto <federico.ceratto@hpe.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Unit-test Zone Manager service
"""
import mock
from oslotest import base as test
from designate import exceptions
from designate.tests.unit import RoObject
import designate.zone_manager.service as zms
@mock.patch.object(zms.rpcapi.CentralAPI, 'get_instance')
class ZoneManagerTest(test.BaseTestCase):
def setUp(self):
zms.CONF = RoObject({
'service:zone_manager': RoObject({
'enabled_tasks': None, # enable all tasks
'export_synchronous': True
}),
'zone_manager_task:zone_purge': '',
})
super(ZoneManagerTest, self).setUp()
self.tm = zms.Service()
self.tm._storage = mock.Mock()
self.tm._rpc_server = mock.Mock()
self.tm._quota = mock.Mock()
self.tm.quota.limit_check = mock.Mock()
def test_service_name(self, _):
self.assertEqual('zone_manager', self.tm.service_name)
def test_central_api(self, _):
capi = self.tm.central_api
assert isinstance(capi, mock.MagicMock)
@mock.patch.object(zms.tasks, 'PeriodicTask')
@mock.patch.object(zms.coordination, 'Partitioner')
def test_stark(self, _, mock_partitioner, mock_PeriodicTask):
self.tm.start()
def test_start_zone_export(self, _):
zone = RoObject(id=3)
context = mock.Mock()
export = {}
self.tm.storage.count_recordsets.return_value = 1
assert self.tm.storage.count_recordsets() == 1
self.tm._determine_export_method = mock.Mock()
self.tm.start_zone_export(context, zone, export)
assert self.tm._determine_export_method.called
assert self.tm.central_api.update_zone_export.called
call_args = self.tm._determine_export_method.call_args_list[0][0]
self.assertEqual((context, export, 1), call_args)
def test_determine_export_method(self, _):
context = mock.Mock()
export = dict(location=None, id=4)
size = mock.Mock()
out = self.tm._determine_export_method(context, export, size)
self.assertDictEqual(
{
'status': 'COMPLETE', 'id': 4,
'location': 'designate://v2/zones/tasks/exports/4/export'
},
out
)
def test_exceed_size_quota(self, _):
context = mock.Mock()
export = dict(location=None, id=4)
size = 9999999999
self.tm.quota.limit_check.side_effect = exceptions.OverQuota()
out = self.tm._determine_export_method(context, export, size)
self.tm.quota.limit_check.side_effect = None
self.assertDictEqual(
{
'status': 'ERROR',
'id': 4,
'location': None,
'message': 'Zone is too large to export'
},
out
)

View File

@ -119,8 +119,9 @@ def register_plugin_opts():
# Avoid circular dependency imports
from designate import plugin
plugin.Plugin.register_cfg_opts('designate.zone_manager_tasks')
plugin.Plugin.register_extra_cfg_opts('designate.zone_manager_tasks')
# Register Producer Tasks
plugin.Plugin.register_cfg_opts('designate.producer_tasks')
plugin.Plugin.register_extra_cfg_opts('designate.producer_tasks')
# Register Backend Plugin Config Options
plugin.Plugin.register_cfg_opts('designate.backend')
@ -536,3 +537,7 @@ def bind_udp(host, port):
LOG.info(_LI('Listening on UDP port %(port)d'), {'port': newport})
return sock_udp
def max_prop_time(timeout, max_retries, retry_interval, delay):
return timeout * max_retries + max_retries * retry_interval + delay

206
designate/worker/README.md Normal file
View File

@ -0,0 +1,206 @@
# Worker Model Code
The general service looks like any other Designate RPC service. Available
RPC calls are defined in `rpcapi.py` and implemented in `service.py`. Where
this differs is that the `service.py` implementations generally spawn threads
with a directive to invoke some sort of "task".
# Tasks
Tasks are discrete units of work that are represented in the form
of *_callable_* python objects. They can optionally return a value to be
used in the caller.
For (abbreviated) example:
```python
class SendNotify(base.Task):
"""
Send a NOTIFY packet for a zone to a target
:return: Success/Failure delivering the notify (bool)
"""
def __init__(self, executor, zone, target):
super(SendNotify, self).__init__(executor)
self.zone = zone
self.target = target
def __call__(self):
host = self.target.options.get('host')
port = int(self.target.options.get('port'))
try:
wutils.notify(self.zone.name, host, port=port)
return True
except Exception:
return False
```
To invoke:
If you're ok executing it on the local thread: `SendNotify(executor, zone, target)()`
If you want to schedule it in it's own thread, allowing it to yield to others:
```python
self.executor.run(zonetasks.SendNotify(
self.executor, zone, target
))
```
Most tasks are executed using the executor at the top-level, for example when
the worker gets a message to `create_zone`, it will say "pop a thread to create
this zone on the pool", which will eventually flow to "I need to create this
zone on N targets", which will result in a:
```python
results = self.executor.run([
ZoneActionOnTarget(self.executor, self.context, self.zone, target)
for target in self.pool.targets
])
```
You can find the tasks in `designate/worker/tasks`, most tasks inherit from a base
that gives basic access like other rpcapis, storage, etc.
So the one thread for doing the entire zone create will use N threads in the
pool to go and do that, and when they're finished, the task will be back down
to using one thread as it evaluates results. Then it will do something similar
when it needs to poll N nameservers.
# Execution in Threads
The core of how this works is using the
[Python ThreadPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor).
This is plugabble, someone could certainly add a different executor,
but it's a simple idea that lets you map callables (tasks) across threads.
Here's an example that shows how you can make multiple calls to a single
ThreadPoolExecutor from concurrent threads (similar to how tasks calling
subtasks would do it).
```python
import concurrent.futures
# Initialize 4 executors
# e is so that we can make two concurrent calls to another executor
e = concurrent.futures.ThreadPoolExecutor(2)
# e_one is the executor that shows that we can make multiple calls from
# different threads to one executor
e_one = concurrent.futures.ThreadPoolExecutor(2)
# e_two and e_three are just separate pools to be used to print numbers
e_two = concurrent.futures.ThreadPoolExecutor(5)
e_three = concurrent.futures.ThreadPoolExecutor(5)
def do(task):
task()
def one():
print '1'
def two():
print '2'
def do_one(tup):
"""
Call the callable len(tup[1]) times concurrently
Since e_one only has two threads in it's pool, it will only be
able to handle two concurrent "jobs"
tup is (callable, list(list))
If one were to pass in (func, [[1]]) the resulting function calls would be:
func([1])
If it was (func, [1, 2]) it would be
func(1)
func(2)
"""
print 'mapping e_one for a list of len %d' % len(tup[1])
e_one.map(tup[0], tup[1])
def do_a(alist):
print 'using e_two to map a list of len %d using do()' % len(alist)
e_two.map(do, alist)
def do_b(alist):
print 'using e_three to map a list of len %d using do()' % len(alist)
e_three.map(do, alist)
# init lists of five callables that will just print a number
ones = [one] * 5
twos = [two] * 5
# a list of tuples, len two that include a function to be mapped eventually, and a list of callables
ones_twos = [(do_a, [ones]), (do_b, [twos])]
# We call do_one twice concurrently on the two tuples
# This makes two concurrent calls to e_one.map, each of which make only
# _one_ call to another function that executes the lists of five callables
# in parallel.
# We do this so that we can see that two concurrent calls to e_one from
# different threads will work concurrently if there is enough room
# in the thread pool.
e.map(do_one, ones_twos)
# Example output:
# $ python threadexectest.py
# mapping e_one for a list of len 1
# mapping e_one for a list of len 1
#
# mapping e_two for a list of len 5
# mapping e_three for a list of len 5
# 1
# 2
# 2
# 1
# 2
# 1
# 2
# 1
# 2
# 1
```
# Metrics
I ran a few tests that did used the old code vs the new code. There are obviously
a ton of different variables here (number of apis/centrals, dns server used, database
setup, rabbit setup), but other tests that I've done in different random configurations
have shown similar results to these two, so I think it's a good representation of what
the differences are.
Pool Manager Test
- 8 Nameservers
- 12 `designate-pool-manager` processes
- 1 hour
- Testing actual DNS propagation
Results:
| Operation | Number | Propagation Stats |
| --------------- | ------ | --------------------------------------------- |
| Creates/Imports | 5700 | Avg propagation 19s >99% propagation in 30min |
| Zone Deletes | 4600 | Avg propagation 16s >99% propagation in 30min |
| Zone Updates | 18057 | Avg propagation 384s ~90 propagation in 30min |
Propagation Graph: ![](http://i.imgur.com/g3kodip.png)
Notice the prop times are increasing as time went on, so a longer test would
almost certainly show even worse times.
Worker Test
- 8 Nameservers
- 12 `designate-worker` processes
- 1 hour
- Testing actual DNS propagation
Results:
| Operation | Number | Propagation Stats |
| --------------- | ------ | ---------------------------------------------- |
| Creates/Imports | 6413 | Avg propagation 8s >99.99% propagation in 5min |
| Zone Deletes | 2077 | Avg propagation 4s 100% propagation in 5min |
| Zone Updates | 23750 | Avg propagation 5s ~99.99% propagation in 5min |
Propagation Graph: ![](http://i.imgur.com/fM9J9l9.png)

View File

@ -0,0 +1,60 @@
# Copyright 2016 Rackspace Inc.
#
# Author: Tim Simmons <tim.simmons@rackspace.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
CONF = cfg.CONF
CONF.register_group(cfg.OptGroup(
name='service:worker', title="Configuration for the Worker Service"
))
OPTS = [
cfg.BoolOpt('enabled', default=False,
help='Whether to send events to worker instead of '
'Pool Manager',
deprecated_for_removal=True,
deprecated_reason='In Newton, this option will disappear'
'because worker will be enabled by default'),
cfg.IntOpt('workers',
help='Number of Worker worker processes to spawn'),
cfg.IntOpt('threads', default=200,
help='Number of Worker threads to spawn per process'),
# cfg.ListOpt('enabled_tasks',
# help='Enabled tasks to run'),
cfg.StrOpt('storage-driver', default='sqlalchemy',
help='The storage driver to use'),
cfg.IntOpt('threshold-percentage', default=100,
help='The percentage of servers requiring a successful update '
'for a domain change to be considered active'),
cfg.IntOpt('poll-timeout', default=30,
help='The time to wait for a response from a server'),
cfg.IntOpt('poll-retry-interval', default=15,
help='The time between retrying to send a request and '
'waiting for a response from a server'),
cfg.IntOpt('poll-max-retries', default=10,
help='The maximum number of times to retry sending a request '
'and wait for a response from a server'),
cfg.IntOpt('poll-delay', default=5,
help='The time to wait before sending the first request '
'to a server'),
cfg.BoolOpt('notify', default=True,
help='Whether to allow worker to send NOTIFYs, this will '
'noop NOTIFYs in mdns if true'),
cfg.BoolOpt('export-synchronous', default=True,
help='Whether to allow synchronous zone exports')
]
CONF.register_opts(OPTS, group='service:worker')

View File

@ -0,0 +1,79 @@
# Copyright 2016 Rackspace Inc.
#
# Author: Eric Larson <eric.larson@rackspace.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from concurrent import futures
from oslo_log import log as logging
from oslo_config import cfg
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
def default_executor():
thread_count = 5
try:
thread_count = CONF['service:worker'].threads
except Exception:
pass
return futures.ThreadPoolExecutor(thread_count)
class Executor(object):
"""
Object to facilitate the running of a task, or a set of tasks on an
executor that can map multiple tasks across a configurable number of
threads
"""
def __init__(self, executor=None):
self._executor = executor or default_executor()
@staticmethod
def do(task):
return task()
def task_name(self, task):
if hasattr(task, 'task_name'):
return str(task.task_name)
if hasattr(task, 'func_name'):
return str(task.func_name)
return 'UnnamedTask'
def run(self, tasks):
"""
Run task or set of tasks
:param tasks: the task or tasks you want to execute in the
executor's pool
:return: The results of the tasks (list)
If a single task is pass
"""
self.start_time = time.time()
if callable(tasks):
tasks = [tasks]
results = [r for r in self._executor.map(self.do, tasks)]
self.end_time = time.time()
self.task_time = self.end_time - self.start_time
task_names = [self.task_name(t) for t in tasks]
LOG.debug("Finished Tasks %(tasks)s in %(time)fs",
{'tasks': task_names, 'time': self.task_time})
return results

View File

@ -1,4 +1,4 @@
# Copyright 2015 Rackspace Inc.
# Copyright 2016 Rackspace Inc.
#
# Author: Tim Simmons <tim.simmons@rackspace.com>
#
@ -20,21 +20,15 @@ import oslo_messaging as messaging
from designate import rpc
from designate.loggingutils import rpc_logging
LOG = logging.getLogger(__name__)
ZONE_MANAGER_API = None
WORKER_API = None
def reset():
global ZONE_MANAGER_API
ZONE_MANAGER_API = None
@rpc_logging(LOG, 'zone_manager')
class ZoneManagerAPI(object):
@rpc_logging(LOG, 'worker')
class WorkerAPI(object):
"""
Client side of the zone manager RPC API.
Client side of the worker RPC API.
API version history:
@ -43,7 +37,7 @@ class ZoneManagerAPI(object):
RPC_API_VERSION = '1.0'
def __init__(self, topic=None):
topic = topic if topic else cfg.CONF.zone_manager_topic
topic = topic if topic else cfg.CONF.worker_topic
target = messaging.Target(topic=topic, version=self.RPC_API_VERSION)
self.client = rpc.get_client(target, version_cap='1.0')
@ -57,16 +51,27 @@ class ZoneManagerAPI(object):
This fixes that by creating the rpcapi when demanded.
"""
global ZONE_MANAGER_API
if not ZONE_MANAGER_API:
ZONE_MANAGER_API = cls()
return ZONE_MANAGER_API
global WORKER_API
if not WORKER_API:
WORKER_API = cls()
return WORKER_API
def create_zone(self, context, zone):
return self.client.cast(
context, 'create_zone', zone=zone)
def update_zone(self, context, zone):
return self.client.cast(
context, 'update_zone', zone=zone)
def delete_zone(self, context, zone):
return self.client.cast(
context, 'delete_zone', zone=zone)
def recover_shard(self, context, begin, end):
return self.client.cast(
context, 'recover_shard', begin=begin, end=end)
# Zone Export
def start_zone_export(self, context, zone, export):
return self.client.cast(context, 'start_zone_export', zone=zone,
export=export)
def render_zone(self, context, zone_id):
return self.client.call(context, 'render_zone',
zone_id=zone_id)
return self.client.cast(
context, 'start_zone_export', zone=zone, export=export)

172
designate/worker/service.py Normal file
View File

@ -0,0 +1,172 @@
# Copyright 2016 Rackspace Inc.
#
# Author: Tim Simmons <tim.simmons@rackspace.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from designate.i18n import _LI
from designate.i18n import _LE
from designate import backend
from designate import exceptions
from designate import service
from designate import storage
from designate.central import rpcapi as central_api
from designate.context import DesignateContext
from designate.worker.tasks import zone as zonetasks
from designate.worker import processing
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class Service(service.RPCService, service.Service):
RPC_API_VERSION = '1.0'
target = messaging.Target(version=RPC_API_VERSION)
@property
def central_api(self):
if not hasattr(self, '_central_api'):
self._central_api = central_api.CentralAPI.get_instance()
return self._central_api
def _setup_target_backends(self, pool):
for target in pool.targets:
# Fetch an instance of the Backend class
target.backend = backend.get_backend(
target.type, target)
LOG.info(_LI('%d targets setup'), len(pool.targets))
if len(pool.targets) == 0:
raise exceptions.NoPoolTargetsConfigured()
return pool
def load_pool(self, pool_id):
# Build the Pool (and related) Object from Config
context = DesignateContext.get_admin_context()
pool = None
has_targets = False
while not has_targets:
try:
pool = self.central_api.get_pool(context, pool_id)
if len(pool.targets) > 0:
has_targets = True
else:
LOG.error(_LE("No targets for %s found."), pool)
time.sleep(5)
# Pool data may not have migrated to the DB yet
except exceptions.PoolNotFound:
LOG.error(_LE("Pool ID %s not found."), pool_id)
time.sleep(5)
# designate-central service may not have started yet
except messaging.exceptions.MessagingTimeout:
time.sleep(0.2)
return self._setup_target_backends(pool)
@property
def service_name(self):
return 'worker'
@property
def storage(self):
if not hasattr(self, '_storage'):
storage_driver = cfg.CONF['service:worker'].storage_driver
self._storage = storage.get_storage(storage_driver)
return self._storage
@property
def executor(self):
if not hasattr(self, '_executor'):
# TODO(elarson): Create this based on config
self._executor = processing.Executor()
return self._executor
@property
def pools_map(self):
if not hasattr(self, '_pools_map'):
self._pools_map = {}
return self._pools_map
def get_pool(self, pool_id):
if pool_id not in self.pools_map:
LOG.info(_LI("Lazily loading pool %s"), pool_id)
self.pools_map[pool_id] = self.load_pool(pool_id)
return self.pools_map[pool_id]
def start(self):
super(Service, self).start()
LOG.info(_LI('Started worker'))
def _do_zone_action(self, context, zone):
pool = self.get_pool(zone.pool_id)
task = zonetasks.ZoneAction(
self.executor, context, pool, zone, zone.action
)
return self.executor.run(task)
def create_zone(self, context, zone):
"""
:param context: Security context information.
:param zone: Zone to be created
:return: None
"""
self._do_zone_action(context, zone)
def update_zone(self, context, zone):
"""
:param context: Security context information.
:param zone: Zone to be updated
:return: None
"""
self._do_zone_action(context, zone)
def delete_zone(self, context, zone):
"""
:param context: Security context information.
:param zone: Zone to be deleted
:return: None
"""
self._do_zone_action(context, zone)
def recover_shard(self, context, begin, end):
"""
:param begin: the beginning of the shards to recover
:param end: the end of the shards to recover
:return: None
"""
return self.executor.run(zonetasks.RecoverShard(
self.executor, context, begin, end
))
def start_zone_export(self, context, zone, export):
"""
:param zone: Zone to be exported
:param export: Zone Export object to update
:return: None
"""
return self.executor.run(zonetasks.ExportZone(
self.executor, context, zone, export
))

View File

View File

@ -0,0 +1,127 @@
# Copyright 2016 Rackspace Inc.
#
# Author: Tim Simmons <tim.simmons@rackspace.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.mport threading
from oslo_config import cfg
from oslo_log import log as logging
from designate.central import rpcapi as central_rpcapi
from designate import quota
from designate import storage
from designate import utils
from designate.worker import rpcapi as worker_rpcapi
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class TaskConfig(object):
"""
Configuration mixin for the various configuration settings that
a task may want to access
"""
@property
def config(self):
if not hasattr(self, '_config'):
self._config = CONF['service:worker']
return self._config
@property
def threshold_percentage(self):
if not hasattr(self, '_threshold_percentage'):
self._threshold_percentage = self.config.threshold_percentage
return self._threshold_percentage
@property
def timeout(self):
if not hasattr(self, '_timeout'):
self._timeout = self.config.poll_timeout
return self._timeout
@property
def retry_interval(self):
if not hasattr(self, '_retry_interval'):
self._retry_interval = self.config.poll_retry_interval
return self._retry_interval
@property
def max_retries(self):
if not hasattr(self, '_max_retries'):
self._max_retries = self.config.poll_max_retries
return self._max_retries
@property
def delay(self):
if not hasattr(self, '_delay'):
self._delay = self.config.poll_delay
return self._delay
@property
def max_prop_time(self):
# Compute a time (seconds) by which things should have propagated
if not hasattr(self, '_max_prop_time'):
self._max_prop_time = utils.max_prop_time(
self.timeout,
self.max_retries,
self.retry_interval,
self.delay
)
return self._max_prop_time
class Task(TaskConfig):
"""
Base task interface that includes some helpful connections to other
services and the basic skeleton for tasks.
Tasks are:
- Callable
- Take an executor as their first parameter
- Can optionally return something
"""
def __init__(self, executor, **kwargs):
self.executor = executor
self.task_name = self.__class__.__name__
self.options = {}
@property
def storage(self):
if not hasattr(self, '_storage'):
# Get a storage connection
storage_driver = cfg.CONF['service:central'].storage_driver
self._storage = storage.get_storage(storage_driver)
return self._storage
@property
def quota(self):
if not hasattr(self, '_quota'):
# Get a quota manager instance
self._quota = quota.get_quota()
return self._quota
@property
def central_api(self):
if not hasattr(self, '_central_api'):
self._central_api = central_rpcapi.CentralAPI.get_instance()
return self._central_api
@property
def worker_api(self):
if not hasattr(self, '_worker_api'):
self._worker_api = worker_rpcapi.WorkerAPI.get_instance()
return self._worker_api
def __call__(self):
raise NotImplementedError

View File

@ -0,0 +1,609 @@
# Copyright 2016 Rackspace Inc.
#
# Author: Tim Simmons <tim.simmons@rackspace.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from collections import namedtuple
import dns
from oslo_config import cfg
from oslo_log import log as logging
from designate.i18n import _LI
from designate.i18n import _LW
from designate.worker import utils as wutils
from designate.worker.tasks import base
from designate import exceptions
from designate import utils
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
def percentage(part, whole):
if whole == 0:
return 0
return 100 * float(part) / float(whole)
class ThresholdMixin(object):
@property
def threshold(self):
if not hasattr(self, '_threshold') or self._threshold is None:
self._threshold = CONF['service:worker'].threshold_percentage
return self._threshold
def _compare_threshold(self, successes, total):
p = percentage(successes, total)
return p >= self.threshold
######################
# CRUD Zone Operations
######################
class ZoneActionOnTarget(base.Task):
"""
Perform a Create/Update/Delete of the zone on a pool target
:return: Success/Failure of the target action (bool)
"""
def __init__(self, executor, context, zone, target):
super(ZoneActionOnTarget, self).__init__(executor)
self.zone = zone
self.action = zone.action
self.target = target
self.context = context
self.task_name = 'ZoneActionOnTarget-%s' % self.action.title()
def __call__(self):
LOG.debug("Attempting %(action)s zone %(zone)s on %(target)s",
{'action': self.action, 'zone': self.zone.name,
'target': self.target})
for retry in range(0, self.max_retries):
try:
if self.action == 'CREATE':
self.target.backend.create_zone(self.context, self.zone)
SendNotify(self.executor, self.zone, self.target)()
elif self.action == 'UPDATE':
self.target.backend.update_zone(self.context, self.zone)
SendNotify(self.executor, self.zone, self.target)()
elif self.action == 'DELETE':
self.target.backend.delete_zone(self.context, self.zone)
LOG.debug("Successful %s zone %s on %s",
self.action, self.zone.name, self.target)
return True
except Exception as e:
LOG.info(_LI('Failed to %(action)s zone %(zone)s on '
'target %(target)s on attempt %(attempt)d, '
'Error: %(error)s.'), {'action': self.action,
'zone': self.zone.name, 'target': self.target.id,
'attempt': retry + 1, 'error': str(e)})
time.sleep(self.retry_interval)
return False
class SendNotify(base.Task):
"""
Send a NOTIFY packet and retry on failure to receive
:raises: Various exceptions from dnspython
:return: Success/Failure delivering the notify (bool)
"""
def __init__(self, executor, zone, target):
super(SendNotify, self).__init__(executor)
self.zone = zone
self.target = target
def __call__(self):
if not CONF['service:worker'].notify:
# TODO(timsim): Remove this someday
return True
host = self.target.options.get('host')
port = int(self.target.options.get('port'))
try:
wutils.notify(self.zone.name, host, port=port)
LOG.debug('Sent NOTIFY to %(host)s:%(port)s for zone '
'%(zone)s', {'host': host,
'port': port, 'zone': self.zone.name})
return True
except dns.exception.Timeout as e:
LOG.info(_LI('Timeout on NOTIFY to %(host)s:%(port)s for zone '
'%(zone)s'), {'host': host,
'port': port, 'zone': self.zone.name})
raise e
return False
class ZoneActor(base.Task, ThresholdMixin):
"""
Orchestrate the Create/Update/Delete action on targets and update status
if it fails. We would only update status here on an error to perform the
necessary backend CRUD action. If there's a failure in propagating all
the way to the nameservers, that will be picked up in a ZonePoller.
:return: Whether the ActionOnTarget got to a satisfactory number
of targets (bool)
"""
def __init__(self, executor, context, pool, zone):
self.executor = executor
self.context = context
self.pool = pool
self.zone = zone
def _validate_action(self, action):
if action not in ['CREATE', 'UPDATE', 'DELETE']:
raise Exception('Bad Action')
def _execute(self):
results = self.executor.run([
ZoneActionOnTarget(self.executor, self.context, self.zone, target)
for target in self.pool.targets
])
return results
def _update_status(self):
task = UpdateStatus(self.executor, self.context, self.zone)
task()
def _threshold_met(self, results):
# If we don't meet threshold for action, update status
met_action_threshold = self._compare_threshold(
results.count(True), len(results))
if not met_action_threshold:
LOG.info(_LI('Could not %(action)s %(zone)s on enough targets. '
'Updating status to ERROR'),
{'action': self.zone.action, 'zone': self.zone.name})
self.zone.status = 'ERROR'
self._update_status()
return False
return True
def __call__(self):
self._validate_action(self.zone.action)
results = self._execute()
return self._threshold_met(results)
class ZoneAction(base.Task):
"""
Orchestrate a complete Create/Update/Delete of the specified zone on the
pool and the polling for the change
:return: Success/Failure of the change propagating to a satisfactory
number of nameservers (bool)
"""
def __init__(self, executor, context, pool, zone, action):
super(ZoneAction, self).__init__(executor)
self.context = context
self.pool = pool
self.zone = zone
self.action = action
self.task_name = 'ZoneAction-%s' % self.action.title()
def _wait_for_nameservers(self):
"""
Pause to give the nameservers a chance to update
"""
time.sleep(self.delay)
def _zone_action_on_targets(self):
actor = ZoneActor(
self.executor, self.context, self.pool, self.zone
)
return actor()
def _poll_for_zone(self):
poller = ZonePoller(self.executor, self.context, self.pool, self.zone)
return poller()
def __call__(self):
LOG.info(_LI('Attempting %(action)s on zone %(name)s'),
{'action': self.action, 'name': self.zone.name})
if not self._zone_action_on_targets():
return False
self._wait_for_nameservers()
if self.action == 'DELETE':
self.zone.serial = 0
if not self._poll_for_zone():
return False
return True
##############
# Zone Polling
##############
DNSQueryResult = namedtuple(
'DNSQueryResult', [
'positives',
'no_zones',
'consensus_serial',
'results'
]
)
def parse_query_results(results, zone):
"""
results is a [serial/None, ...]
"""
delete = zone.action == 'DELETE'
positives = 0
no_zones = 0
low_serial = 0
for serial in results:
if serial is None:
# Intentionally don't handle None
continue
if delete:
if serial == 0:
no_zones += 1
positives += 1
else:
if serial >= zone.serial:
positives += 1
# Update the lowest valid serial aka the consensus
# serial
if low_serial == 0 or serial < low_serial:
low_serial = serial
else:
if serial == 0:
no_zones += 1
result = DNSQueryResult(positives, no_zones, low_serial, results)
LOG.debug('Results for polling %(zone)s-%(serial)d: %(tup)s',
{'zone': zone.name, 'serial': zone.serial, 'tup': result})
return result
class PollForZone(base.Task):
"""
Send SOA queries to a nameserver for the zone. This could be a serial
number, or that the zone does not exist.
:return: A serial number if the zone exists (int), None if the zone
does not exist
"""
def __init__(self, executor, zone, ns):
super(PollForZone, self).__init__(executor)
self.zone = zone
self.ns = ns
def _get_serial(self):
return wutils.get_serial(
self.zone.name,
self.ns.host,
port=self.ns.port
)
def __call__(self):
LOG.debug('Polling for zone %(zone)s serial %(serial)s on %(ns)s',
{'zone': self.zone.name, 'serial': self.zone.serial,
'ns': self.ns})
try:
serial = self._get_serial()
LOG.debug('Found serial %(serial)d on %(host)s for zone '
'%(zone)s', {'serial': serial, 'host': self.ns.host,
'zone': self.zone.name})
return serial
# TODO(timsim): cache if it's higher than cache
except dns.exception.Timeout:
LOG.info(_LI('Timeout polling for serial %(serial)d '
'%(host)s for zone %(zone)s'), {'serial': self.zone.serial,
'host': self.ns.host, 'zone': self.zone.name})
except Exception as e:
LOG.warning(_LW('Unexpected failure polling for serial %(serial)d '
'%(host)s for zone %(zone)s. Error: %(error)s'),
{'serial': self.zone.serial, 'host': self.ns.host,
'zone': self.zone.name, 'error': str(e)})
return None
class ZonePoller(base.Task, ThresholdMixin):
"""
Orchestrate polling for a change across the nameservers in a pool
and compute the proper zone status, and update it.
:return: Whether the change was succesfully polled for on a satisfactory
number of nameservers in the pool
"""
def __init__(self, executor, context, pool, zone):
self.executor = executor
self.context = context
self.pool = pool
self.zone = zone
def _update_status(self):
task = UpdateStatus(self.executor, self.context, self.zone)
task()
def _do_poll(self):
"""
Poll nameservers, compute basic success, return detailed query results
for further computation. Retry on failure to poll (meet threshold for
success).
:return: a DNSQueryResult object with the results of polling
"""
nameservers = self.pool.nameservers
retry_interval = self.retry_interval
query_result = DNSQueryResult(0, 0, 0, 0)
results = []
for retry in range(0, self.max_retries):
results = self.executor.run([
PollForZone(self.executor, self.zone, ns)
for ns in nameservers
])
query_result = parse_query_results(results, self.zone)
if self._compare_threshold(query_result.positives, len(results)):
LOG.debug('Successful poll for %(zone)s',
{'zone': self.zone.name})
break
LOG.debug('Unsuccessful poll for %(zone)s on attempt %(n)d',
{'zone': self.zone.name, 'n': retry + 1})
time.sleep(retry_interval)
return query_result
def _on_failure(self, error_status):
LOG.info(_LI('Could not find %(serial)s for %(zone)s on enough '
'nameservers.'),
{'serial': self.zone.serial, 'zone': self.zone.name})
self.zone.status = error_status
if error_status == 'NO_ZONE':
self.zone.action = 'CREATE'
return False
def _on_success(self, query_result, status):
# TODO(timsim): Change this back to active, so dumb central
self.zone.status = status
LOG.debug('Found success for %(zone)s at serial %(serial)d',
{'zone': self.zone.name, 'serial': self.zone.serial})
self.zone.serial = query_result.consensus_serial
return True
def _threshold_met(self, query_result):
"""
Compute whether the thresholds were met. Provide an answer,
and an error status if there was a failure.
The error status should be either:
- ERROR: the operation failed
- NO_ZONE: the zone doesn't exist on enough name servers
:return: Whether the the polling was succesful, and a status
describing the state (bool, str)
"""
total = len(query_result.results)
is_not_delete = self.zone.action != 'DELETE'
# Ensure if we don't have too many nameservers that
# don't have the zone.
over_no_zone_threshold = self._compare_threshold(
(total - query_result.no_zones), total
)
if not over_no_zone_threshold and is_not_delete:
return False, 'NO_ZONE'
# The action should have been pushed out to a minimum
# number of nameservers.
if not self._compare_threshold(query_result.positives, total):
return False, 'ERROR'
# We have success of the action on the nameservers and enough
# nameservers have the zone to call this a success.
return True, 'SUCCESS'
def __call__(self):
query_result = self._do_poll()
result = None
success, status = self._threshold_met(query_result)
if success:
result = self._on_success(query_result, status)
else:
result = self._on_failure(status)
self._update_status()
return result
###################
# Status Management
###################
class UpdateStatus(base.Task):
"""
Inspect the zone object and call central's update_status method.
Some logic is applied that could be removed when central's logic
for updating status is sane
:return: No return value
"""
def __init__(self, executor, context, zone):
super(UpdateStatus, self).__init__(executor)
self.zone = zone
self.context = context
def __call__(self):
# TODO(timsim): Fix this when central's logic is sane
if self.zone.action == 'DELETE' and self.zone.status != 'ERROR':
self.zone.action = 'NONE'
self.zone.status = 'NO_ZONE'
if self.zone.status == 'SUCCESS':
self.zone.action = 'NONE'
# This log message will always have action as NONE and then we
# don't use the action in the update_status call.
LOG.debug('Updating status for %(zone)s to %(status)s:%(action)s',
{'zone': self.zone.name, 'status': self.zone.status,
'action': self.zone.action})
self.central_api.update_status(
self.context,
self.zone.id,
self.zone.status,
self.zone.serial
)
###################
# Periodic Recovery
###################
class RecoverShard(base.Task):
"""
Given a beginning and ending shard, create the work to recover any
zones in an undesirable state within those shards.
:return: No return value
"""
def __init__(self, executor, context, begin, end):
super(RecoverShard, self).__init__(executor)
self.context = context
self.begin_shard = begin
self.end_shard = end
def _get_zones(self):
criterion = {
'shard': "BETWEEN %s,%s" % (self.begin_shard, self.end_shard),
'status': 'ERROR'
}
error_zones = self.storage.find_zones(self.context, criterion)
# Include things that have been hanging out in PENDING
# status for longer than they should
# Generate the current serial, will provide a UTC Unix TS.
current = utils.increment_serial()
stale_criterion = {
'shard': "BETWEEN %s,%s" % (self.begin_shard, self.end_shard),
'status': 'PENDING',
'serial': "<%s" % (current - self.max_prop_time)
}
stale_zones = self.storage.find_zones(self.context, stale_criterion)
if stale_zones:
LOG.warn(_LW('Found %(len)d zones PENDING for more than %(sec)d '
'seconds'), {'len': len(stale_zones),
'sec': self.max_prop_time})
error_zones.extend(stale_zones)
return error_zones
def __call__(self):
zones = self._get_zones()
for zone in zones:
if zone.action == 'CREATE':
self.worker_api.create_zone(self.context, zone)
elif zone.action == 'UPDATE':
self.worker_api.update_zone(self.context, zone)
elif zone.action == 'DELETE':
self.worker_api.delete_zone(self.context, zone)
##############
# Zone Exports
##############
class ExportZone(base.Task):
"""
Given a zone, determine the proper method, based on size, and
perform the necessary actions to Export the zone, and update the
export row in storage via central.
"""
def __init__(self, executor, context, zone, export):
super(ExportZone, self).__init__(executor)
self.context = context
self.zone = zone
self.export = export
def _synchronous_export(self):
return CONF['service:worker'].export_synchronous
def _determine_export_method(self, context, export, size):
# NOTE(timsim):
# The logic here with swift will work like this:
# cfg.CONF.export_swift_enabled:
# An export will land in their swift container, even if it's
# small, but the link that comes back will be the synchronous
# link (unless export_syncronous is False, in which case it
# will behave like the next option)
# cfg.CONF.export_swift_preffered:
# The link that the user gets back will always be the swift
# container, and status of the export resource will depend
# on the Swift process.
# If the export is too large for synchronous, or synchronous is not
# enabled and swift is not enabled, it will fall through to ERROR
# swift = False
synchronous = self._synchronous_export()
if synchronous:
try:
self.quota.limit_check(
context, context.tenant, api_export_size=size)
except exceptions.OverQuota:
LOG.debug('Zone Export too large to perform synchronously')
export.status = 'ERROR'
export.message = 'Zone is too large to export'
return export
export.location = \
'designate://v2/zones/tasks/exports/%(eid)s/export' % \
{'eid': export.id}
export.status = 'COMPLETE'
else:
LOG.debug('No method found to export zone')
export.status = 'ERROR'
export.message = 'No suitable method for export'
return export
def __call__(self):
criterion = {'zone_id': self.zone.id}
count = self.storage.count_recordsets(self.context, criterion)
export = self._determine_export_method(
self.context, self.export, count)
self.central_api.update_zone_export(self.context, export)

82
designate/worker/utils.py Normal file
View File

@ -0,0 +1,82 @@
# Copyright 2016 Rackspace Inc.
#
# Author: Tim Simmons <tim.simmons@rackspace>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.mport threading
import dns
import dns.exception
import dns.query
from oslo_config import cfg
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
def prepare_msg(zone_name, rdatatype=dns.rdatatype.SOA, notify=False):
"""
Do the needful to set up a dns packet with dnspython
"""
dns_message = dns.message.make_query(zone_name, rdatatype)
if notify:
dns_message.set_opcode(dns.opcode.NOTIFY)
else:
dns_message.set_opcode(dns.opcode.QUERY)
return dns_message
def dig(zone_name, host, rdatatype, port=53):
"""
Set up and send a regular dns query, datatype configurable
"""
query = prepare_msg(zone_name, rdatatype=rdatatype)
return send_dns_msg(query, host, port=port)
def notify(zone_name, host, port=53):
"""
Set up a notify packet and send it
"""
msg = prepare_msg(zone_name, notify=True)
return send_dns_msg(msg, host, port=port)
def send_dns_msg(dns_message, host, port=53):
"""
Send the dns message and return the response
:return: dns.Message of the response to the dns query
"""
# This can raise some exceptions, but we'll catch them elsewhere
if not CONF['service:mdns'].all_tcp:
return dns.query.udp(
dns_message, host, port=port, timeout=10)
else:
return dns.query.tcp(
dns_message, host, port=port, timeout=10)
def get_serial(zone_name, host, port=53):
"""
Possibly raises dns.exception.Timeout or dns.query.BadResponse.
Possibly returns 0 if, e.g., the answer section is empty.
"""
resp = dig(zone_name, host, dns.rdatatype.SOA, port=port)
if not resp.answer:
return 0
rdataset = resp.answer[0].to_rdataset()
if not rdataset:
return 0
return rdataset[0].serial

View File

@ -1,37 +0,0 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Endre Karlson <endre.karlson@hpe.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
CONF = cfg.CONF
CONF.register_group(cfg.OptGroup(
name='service:zone_manager', title="Configuration for Zone Manager Service"
))
OPTS = [
cfg.IntOpt('workers',
help='Number of Zone Manager worker processes to spawn'),
cfg.IntOpt('threads', default=1000,
help='Number of Zone Manager greenthreads to spawn'),
cfg.ListOpt('enabled_tasks',
help='Enabled tasks to run'),
cfg.StrOpt('storage-driver', default='sqlalchemy',
help='The storage driver to use'),
cfg.BoolOpt('export-synchronous', default=True,
help='Whether to allow synchronous zone exports'),
]
CONF.register_opts(OPTS, group='service:zone_manager')

View File

@ -1,154 +0,0 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Endre Karlson <endre.karlson@hpe.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from designate.i18n import _LI
from designate import coordination
from designate import exceptions
from designate import quota
from designate import service
from designate import storage
from designate import utils
from designate.central import rpcapi
from designate.zone_manager import tasks
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
NS = 'designate.periodic_tasks'
class Service(service.RPCService, coordination.CoordinationMixin,
service.Service):
RPC_API_VERSION = '1.0'
target = messaging.Target(version=RPC_API_VERSION)
@property
def storage(self):
if not hasattr(self, '_storage'):
storage_driver = cfg.CONF['service:zone_manager'].storage_driver
self._storage = storage.get_storage(storage_driver)
return self._storage
@property
def quota(self):
if not hasattr(self, '_quota'):
# Get a quota manager instance
self._quota = quota.get_quota()
return self._quota
@property
def service_name(self):
return 'zone_manager'
@property
def central_api(self):
return rpcapi.CentralAPI.get_instance()
def start(self):
super(Service, self).start()
self._partitioner = coordination.Partitioner(
self._coordinator, self.service_name, self._coordination_id,
range(0, 4095))
self._partitioner.start()
self._partitioner.watch_partition_change(self._rebalance)
enabled = CONF['service:zone_manager'].enabled_tasks
for task in tasks.PeriodicTask.get_extensions(enabled):
LOG.debug("Registering task %s", task)
# Instantiate the task
task = task()
# Subscribe for partition size updates.
self._partitioner.watch_partition_change(task.on_partition_change)
interval = CONF[task.get_canonical_name()].interval
self.tg.add_timer(interval, task)
def _rebalance(self, my_partitions, members, event):
LOG.info(_LI("Received rebalance event %s"), event)
self.partition_range = my_partitions
# Begin RPC Implementation
# Zone Export
def start_zone_export(self, context, zone, export):
criterion = {'zone_id': zone.id}
count = self.storage.count_recordsets(context, criterion)
export = self._determine_export_method(context, export, count)
self.central_api.update_zone_export(context, export)
def render_zone(self, context, zone_id):
return self._export_zone(context, zone_id)
def _determine_export_method(self, context, export, size):
synchronous = CONF['service:zone_manager'].export_synchronous
# NOTE(timsim):
# The logic here with swift will work like this:
# cfg.CONF.export_swift_enabled:
# An export will land in their swift container, even if it's
# small, but the link that comes back will be the synchronous
# link (unless export_syncronous is False, in which case it
# will behave like the next option)
# cfg.CONF.export_swift_preffered:
# The link that the user gets back will always be the swift
# container, and status of the export resource will depend
# on the Swift process.
# If the export is too large for synchronous, or synchronous is not
# enabled and swift is not enabled, it will fall through to ERROR
# swift = False
if synchronous:
try:
self.quota.limit_check(
context, context.tenant, api_export_size=size)
except exceptions.OverQuota:
LOG.debug('Zone Export too large to perform synchronously')
export['status'] = 'ERROR'
export['message'] = 'Zone is too large to export'
return export
export['location'] = \
"designate://v2/zones/tasks/exports/%(eid)s/export" % \
{'eid': export['id']}
export['status'] = 'COMPLETE'
else:
LOG.debug('No method found to export zone')
export['status'] = 'ERROR'
export['message'] = 'No suitable method for export'
return export
def _export_zone(self, context, zone_id):
zone = self.central_api.get_zone(context, zone_id)
criterion = {'zone_id': zone_id}
recordsets = self.storage.find_recordsets_export(context, criterion)
return utils.render_template('export-zone.jinja2',
zone=zone,
recordsets=recordsets)

View File

@ -96,6 +96,14 @@ function configure_designate {
# mDNS Configuration
iniset $DESIGNATE_CONF service:mdns listen ${DESIGNATE_SERVICE_HOST}:${DESIGNATE_SERVICE_PORT_MDNS}
# Worker Configuration
if is_service_enabled designate-worker; then
iniset $DESIGNATE_CONF service:worker enabled True
iniset $DESIGNATE_CONF service:worker notify True
iniset $DESIGNATE_CONF service:worker poll_max_retries $DESIGNATE_POLL_RETRIES
iniset $DESIGNATE_CONF service:worker poll_retry_interval $DESIGNATE_POLL_INTERVAL
fi
# Set up Notifications/Ceilometer Integration
iniset $DESIGNATE_CONF DEFAULT notification_driver "$DESIGNATE_NOTIFICATION_DRIVER"
iniset $DESIGNATE_CONF DEFAULT notification_topics "$DESIGNATE_NOTIFICATION_TOPICS"
@ -300,6 +308,9 @@ function start_designate {
run_process designate-mdns "$DESIGNATE_BIN_DIR/designate-mdns --config-file $DESIGNATE_CONF"
run_process designate-agent "$DESIGNATE_BIN_DIR/designate-agent --config-file $DESIGNATE_CONF"
run_process designate-sink "$DESIGNATE_BIN_DIR/designate-sink --config-file $DESIGNATE_CONF"
run_process designate-worker "$DESIGNATE_BIN_DIR/designate-worker --config-file $DESIGNATE_CONF"
run_process designate-producer "$DESIGNATE_BIN_DIR/designate-producer --config-file $DESIGNATE_CONF"
# Start proxies if enabled
if is_service_enabled designate-api && is_service_enabled tls-proxy; then
@ -321,6 +332,8 @@ function stop_designate {
stop_process designate-mdns
stop_process designate-agent
stop_process designate-sink
stop_process designate-worker
stop_process designate-producer
stop_designate_backend
}

View File

@ -9,6 +9,8 @@ DESIGNATE_NOTIFICATION_TOPICS=${DESIGNATE_NOTIFICATION_TOPICS:-notifications}
DESIGNATE_PERIODIC_RECOVERY_INTERVAL=${DESIGNATE_PERIODIC_RECOVERY_INTERVAL:-120}
DESIGNATE_PERIODIC_SYNC_INTERVAL=${DESIGNATE_PERIODIC_SYNC_INTERVAL:-1800}
DESIGNATE_COORDINATION_URL=${DESIGNATE_COORDINATION_URL:-}
DESIGNATE_POLL_INTERVAL=${DESIGNATE_POLL_INTERVAL:-5}
DESIGNATE_POLL_RETRIES=${DESIGNATE_POLL_RETRIES:-6}
# Quota Options
DESIGNATE_QUOTA_ZONES=${DESIGNATE_QUOTA_ZONES:-100}

View File

@ -275,7 +275,7 @@ debug = False
#-----------------------
# Zone Manager Service
#-----------------------
[service:zone_manager]
[service:producer]
# Number of Zone Manager worker processes to spawn
#workers = None
@ -292,7 +292,7 @@ debug = False
#------------------------
# Deleted domains purging
#------------------------
[zone_manager_task:domain_purge]
[producer_task:domain_purge]
# How frequently to purge deleted domains, in seconds
#interval = 3600 # 1h
@ -305,12 +305,16 @@ debug = False
#------------------------
# Delayed zones NOTIFY
#------------------------
[zone_manager_task:delayed_notify]
[producer_task:delayed_notify]
# How frequently to scan for zones pending NOTIFY, in seconds
#interval = 5
# How many zones to receive NOTIFY on each run
#batch_size = 100
#------------------------
# Worker Periodic Recovery
#------------------------
[producer_task:worker_periodic_recovery]
# How frequently to scan for zones pending NOTIFY, in seconds
#interval = 120
#-----------------------
# Pool Manager Service
@ -365,6 +369,41 @@ debug = False
# The cache driver to use
#cache_driver = memcache
#-----------------------
# Worker Service
#-----------------------
[service:worker]
# Whether to send events to worker instead of Pool Manager
# enabled = False
# Number of Worker processes to spawn
#workers = None
# Number of Worker greenthreads to spawn
#threads = 1000
# The percentage of servers requiring a successful update for a zone change
# to be considered active
#threshold_percentage = 100
# The time to wait for a response from a server
#poll_timeout = 30
# The time between retrying to send a request and waiting for a response from a
# server
#poll_retry_interval = 15
# The maximum number of times to retry sending a request and wait for a
# response from a server
#poll_max_retries = 10
# The time to wait before sending the first request to a server
#poll_delay = 5
# Whether to allow worker to send NOTIFYs. NOTIFY requests to mdns will noop
# notify = False
###################################
## Pool Manager Cache Configuration
###################################

View File

@ -47,6 +47,8 @@ console_scripts =
designate-zone-manager = designate.cmd.zone_manager:main
designate-sink = designate.cmd.sink:main
designate-agent = designate.cmd.agent:main
designate-worker = designate.cmd.worker:main
designate-producer = designate.cmd.producer:main
designate.api.v1 =
domains = designate.api.v1.domains:blueprint
@ -122,11 +124,12 @@ designate.manage =
powerdns = designate.manage.powerdns:DatabaseCommands
tlds = designate.manage.tlds:TLDCommands
designate.zone_manager_tasks =
zone_purge = designate.zone_manager.tasks:DeletedZonePurgeTask
periodic_exists = designate.zone_manager.tasks:PeriodicExistsTask
periodic_secondary_refresh = designate.zone_manager.tasks:PeriodicSecondaryRefreshTask
delayed_notify = designate.zone_manager.tasks:PeriodicGenerateDelayedNotifyTask
designate.producer_tasks =
zone_purge = designate.producer.tasks:DeletedZonePurgeTask
periodic_exists = designate.producer.tasks:PeriodicExistsTask
periodic_secondary_refresh = designate.producer.tasks:PeriodicSecondaryRefreshTask
delayed_notify = designate.producer.tasks:PeriodicGenerateDelayedNotifyTask
worker_periodic_recovery = designate.producer.tasks:WorkerPeriodicRecovery
designate.heartbeat_emitter =
noop = designate.service_status:NoopEmitter

View File

@ -18,5 +18,5 @@ designate.tests.unit.test_api.test_api_v2
designate.tests.unit.test_backend.test_designate
designate.tests.unit.test_central.test_basic
designate.tests.unit.test_pool
designate.tests.unit.test_zone_manager.test_tasks
designate.tests.unit.test_producer.test_tasks