Merge "Add an on-demand single-target sync method"
This commit is contained in:
commit
a05ccf3525
54
designate/api/admin/controllers/extensions/target_sync.py
Normal file
54
designate/api/admin/controllers/extensions/target_sync.py
Normal file
@ -0,0 +1,54 @@
|
||||
# COPYRIGHT 2015 Rackspace Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import pecan
|
||||
from oslo_log import log as logging
|
||||
from oslo_config import cfg
|
||||
|
||||
from designate.api.v2.controllers import rest
|
||||
from designate import exceptions
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
class TargetSyncController(rest.RestController):
|
||||
|
||||
@staticmethod
|
||||
def get_path():
|
||||
return '.target_sync'
|
||||
|
||||
@pecan.expose(template='json:', content_type='application/json')
|
||||
def post_all(self):
|
||||
"""Initialize a Target Syncing"""
|
||||
request = pecan.request
|
||||
context = request.environ['context']
|
||||
|
||||
body = request.body_dict
|
||||
|
||||
fields = ['target_id', 'timestamp']
|
||||
for f in fields:
|
||||
if f not in body:
|
||||
raise exceptions.BadRequest('Failed to supply correct fields')
|
||||
|
||||
if (not isinstance(body['timestamp'], int) or body['timestamp'] < 0):
|
||||
raise exceptions.BadRequest(
|
||||
'Timestamp should be a positive integer')
|
||||
|
||||
pool_id = CONF['service:pool_manager'].pool_id
|
||||
|
||||
return {
|
||||
'message': self.pool_mgr_api.target_sync(context, pool_id,
|
||||
body['target_id'], body['timestamp'])
|
||||
}
|
@ -16,6 +16,7 @@ from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from stevedore import named
|
||||
|
||||
from designate.i18n import _LI
|
||||
from designate.api.v2.controllers import errors
|
||||
|
||||
|
||||
@ -38,6 +39,7 @@ class RootController(object):
|
||||
for ext in self._mgr:
|
||||
controller = self
|
||||
path = ext.obj.get_path()
|
||||
LOG.info(_LI("Registering an API extension at path %s"), path)
|
||||
for p in path.split('.')[:-1]:
|
||||
if p != '':
|
||||
controller = getattr(controller, p)
|
||||
|
@ -33,6 +33,7 @@ from oslo_log import log as logging
|
||||
|
||||
from designate import exceptions
|
||||
from designate.central import rpcapi as central_rpcapi
|
||||
from designate.pool_manager import rpcapi as pool_mgr_rpcapi
|
||||
from designate.zone_manager import rpcapi as zone_manager_rpcapi
|
||||
from designate.i18n import _
|
||||
|
||||
@ -55,6 +56,10 @@ class RestController(pecan.rest.RestController):
|
||||
def central_api(self):
|
||||
return central_rpcapi.CentralAPI.get_instance()
|
||||
|
||||
@property
|
||||
def pool_mgr_api(self):
|
||||
return pool_mgr_rpcapi.PoolManagerAPI.get_instance()
|
||||
|
||||
@property
|
||||
def zone_manager_api(self):
|
||||
return zone_manager_rpcapi.ZoneManagerAPI.get_instance()
|
||||
|
@ -36,15 +36,16 @@ class PoolManagerAPI(object):
|
||||
|
||||
1.0 - Initial version
|
||||
2.0 - Rename domains to zones
|
||||
2.1 - Add target_sync
|
||||
"""
|
||||
RPC_API_VERSION = '2.0'
|
||||
RPC_API_VERSION = '2.1'
|
||||
|
||||
def __init__(self, topic=None):
|
||||
self.topic = topic if topic else cfg.CONF.pool_manager_topic
|
||||
|
||||
target = messaging.Target(topic=self.topic,
|
||||
version=self.RPC_API_VERSION)
|
||||
self.client = rpc.get_client(target, version_cap='2.0')
|
||||
self.client = rpc.get_client(target, version_cap='2.1')
|
||||
|
||||
@classmethod
|
||||
def get_instance(cls):
|
||||
@ -60,6 +61,18 @@ class PoolManagerAPI(object):
|
||||
MNGR_API = cls()
|
||||
return MNGR_API
|
||||
|
||||
def target_sync(self, context, pool_id, target_id, timestamp):
|
||||
LOG.info(_LI("target_sync: Syncing target %(target) since "
|
||||
"%(timestamp)d."),
|
||||
{'target': target_id, 'timestamp': timestamp})
|
||||
|
||||
# Modifying the topic so it is pool manager instance specific.
|
||||
topic = '%s.%s' % (self.topic, pool_id)
|
||||
cctxt = self.client.prepare(topic=topic)
|
||||
return cctxt.call(
|
||||
context, 'target_sync', pool_id=pool_id, target_id=target_id,
|
||||
timestamp=timestamp)
|
||||
|
||||
def create_zone(self, context, zone):
|
||||
LOG.info(_LI("create_zone: Calling pool manager for %(zone)s, "
|
||||
"serial:%(serial)s"),
|
||||
|
@ -16,6 +16,7 @@
|
||||
from contextlib import contextmanager
|
||||
from decimal import Decimal
|
||||
import time
|
||||
from datetime import datetime
|
||||
|
||||
from oslo_config import cfg
|
||||
import oslo_messaging as messaging
|
||||
@ -86,8 +87,10 @@ class Service(service.RPCService, coordination.CoordinationMixin,
|
||||
API version history:
|
||||
|
||||
1.0 - Initial version
|
||||
2.0 - The Big Rename
|
||||
2.1 - Add target_sync
|
||||
"""
|
||||
RPC_API_VERSION = '2.0'
|
||||
RPC_API_VERSION = '2.1'
|
||||
|
||||
target = messaging.Target(version=RPC_API_VERSION)
|
||||
|
||||
@ -276,6 +279,79 @@ class Service(service.RPCService, coordination.CoordinationMixin,
|
||||
self.central_api.update_status(context, zone.id, ERROR_STATUS,
|
||||
zone.serial)
|
||||
|
||||
def target_sync(self, context, pool_id, target_id, timestamp):
|
||||
"""
|
||||
Replay all the events that we can since a certain timestamp
|
||||
"""
|
||||
context = self._get_admin_context_all_tenants()
|
||||
context.show_deleted = True
|
||||
|
||||
target = None
|
||||
for tar in self.pool.targets:
|
||||
if tar.id == target_id:
|
||||
target = tar
|
||||
if target is None:
|
||||
raise exceptions.BadRequest('Please supply a valid target id.')
|
||||
|
||||
LOG.info(_LI('Starting Target Sync'))
|
||||
|
||||
criterion = {
|
||||
'pool_id': pool_id,
|
||||
'updated_at': '>%s' % datetime.fromtimestamp(timestamp).
|
||||
isoformat(),
|
||||
}
|
||||
|
||||
zones = self.central_api.find_zones(context, criterion=criterion,
|
||||
sort_key='updated_at', sort_dir='asc')
|
||||
|
||||
self.tg.add_thread(self._target_sync,
|
||||
context, zones, target, timestamp)
|
||||
|
||||
return 'Syncing %(len)s zones on %(target)s' % {'len': len(zones),
|
||||
'target': target_id}
|
||||
|
||||
def _target_sync(self, context, zones, target, timestamp):
|
||||
zone_ops = []
|
||||
timestamp_dt = datetime.fromtimestamp(timestamp)
|
||||
|
||||
for zone in zones:
|
||||
if zone.status == 'DELETED':
|
||||
# Remove any other ops for this zone
|
||||
for zone_op in zone_ops:
|
||||
if zone.name == zone_op[0].name:
|
||||
zone_ops.remove(zone_op)
|
||||
# If the zone was created before the timestamp delete it,
|
||||
# otherwise, it will just never be created
|
||||
if (datetime.strptime(zone.created_at, "%Y-%m-%dT%H:%M:%S.%f")
|
||||
<= timestamp_dt):
|
||||
zone_ops.append((zone, 'DELETE'))
|
||||
elif (datetime.strptime(zone.created_at, "%Y-%m-%dT%H:%M:%S.%f") >
|
||||
timestamp_dt):
|
||||
# If the zone was created after the timestamp
|
||||
for zone_op in zone_ops:
|
||||
if (
|
||||
zone.name == zone_op[0].name and
|
||||
zone_op[1] == 'DELETE'
|
||||
):
|
||||
zone_ops.remove(zone_op)
|
||||
|
||||
zone_ops.append((zone, 'CREATE'))
|
||||
else:
|
||||
zone_ops.append((zone, 'UPDATE'))
|
||||
|
||||
for zone, action in zone_ops:
|
||||
if action == 'CREATE':
|
||||
self._create_zone_on_target(context, target, zone)
|
||||
elif action == 'UPDATE':
|
||||
self._update_zone_on_target(context, target, zone)
|
||||
elif action == 'DELETE':
|
||||
self._delete_zone_on_target(context, target, zone)
|
||||
zone.serial = 0
|
||||
for nameserver in self.pool.nameservers:
|
||||
self.mdns_api.poll_for_serial_number(
|
||||
context, zone, nameserver, self.timeout,
|
||||
self.retry_interval, self.max_retries, self.delay)
|
||||
|
||||
# Standard Create/Update/Delete Methods
|
||||
|
||||
def create_zone(self, context, zone):
|
||||
|
@ -134,7 +134,7 @@ debug = False
|
||||
#enable_api_admin = False
|
||||
|
||||
# Enabled Admin API extensions
|
||||
# Can be one or more of : reports, quotas, counts, tenants, zones
|
||||
# Can be one or more of : reports, quotas, counts, tenants, target_sync
|
||||
# zone export is in zones extension
|
||||
#enabled_extensions_admin =
|
||||
|
||||
|
@ -64,6 +64,7 @@ designate.api.admin.extensions =
|
||||
reports = designate.api.admin.controllers.extensions.reports:ReportsController
|
||||
quotas = designate.api.admin.controllers.extensions.quotas:QuotasController
|
||||
zones = designate.api.admin.controllers.extensions.zones:ZonesController
|
||||
target_sync = designate.api.admin.controllers.extensions.target_sync:TargetSyncController
|
||||
|
||||
designate.storage =
|
||||
sqlalchemy = designate.storage.impl_sqlalchemy:SQLAlchemyStorage
|
||||
|
Loading…
Reference in New Issue
Block a user