applied post-review changes
This commit is contained in:
@@ -19,7 +19,7 @@ from swift_utils import (
|
||||
setup_ipv6,
|
||||
update_rings,
|
||||
balance_rings,
|
||||
builders_synced,
|
||||
fully_synced,
|
||||
sync_proxy_rings,
|
||||
update_min_part_hours,
|
||||
broadcast_rings_available,
|
||||
@@ -331,8 +331,8 @@ def cluster_non_leader_actions():
|
||||
rq_key = SwiftProxyClusterRPC.KEY_STOP_PROXY_SVC
|
||||
token = settings.get(rq_key, None)
|
||||
if token:
|
||||
log("Peer request to stop proxy service received (%s)" % (token),
|
||||
level=INFO)
|
||||
log("Peer request to stop proxy service received (%s) - sending ack" %
|
||||
(token), level=INFO)
|
||||
service_stop('swift-proxy')
|
||||
peers_only = settings.get('peers-only', None)
|
||||
rq = SwiftProxyClusterRPC().stop_proxy_ack(echo_token=token,
|
||||
@@ -347,22 +347,24 @@ def cluster_non_leader_actions():
|
||||
log("No update available", level=DEBUG)
|
||||
return
|
||||
|
||||
builders_only = int(settings.get('sync-only-builders', 0))
|
||||
path = os.path.basename(get_www_dir())
|
||||
try:
|
||||
sync_proxy_rings('http://%s/%s' % (broker, path))
|
||||
sync_proxy_rings('http://%s/%s' % (broker, path),
|
||||
rings=not builders_only)
|
||||
except subprocess.CalledProcessError:
|
||||
log("Ring builder sync failed, builders not yet available - "
|
||||
"leader not ready?", level=WARNING)
|
||||
return None
|
||||
|
||||
# Re-enable the proxy once all builders are synced
|
||||
if builders_synced():
|
||||
# Re-enable the proxy once all builders and rings are synced
|
||||
if fully_synced():
|
||||
log("Ring builders synced - starting proxy", level=INFO)
|
||||
CONFIGS.write_all()
|
||||
service_start('swift-proxy')
|
||||
else:
|
||||
log("Not all builders synced yet - waiting for peer sync before "
|
||||
"starting proxy", level=INFO)
|
||||
log("Not all builders and rings synced yet - waiting for peer sync "
|
||||
"before starting proxy", level=INFO)
|
||||
|
||||
|
||||
@hooks.hook('cluster-relation-changed',
|
||||
|
||||
@@ -174,10 +174,11 @@ class SwiftProxyClusterRPC(object):
|
||||
'builder-broker': None,
|
||||
self.KEY_STOP_PROXY_SVC: None,
|
||||
self.KEY_STOP_PROXY_SVC_ACK: None,
|
||||
'peers-only': None}}
|
||||
'peers-only': None,
|
||||
'sync-only-builders': None}}
|
||||
return copy.deepcopy(templates[self._version])
|
||||
|
||||
def stop_proxy_request(self, peers_only=None):
|
||||
def stop_proxy_request(self, peers_only=False):
|
||||
"""Request to stop peer proxy service.
|
||||
|
||||
NOTE: leader action
|
||||
@@ -185,7 +186,9 @@ class SwiftProxyClusterRPC(object):
|
||||
rq = self.template()
|
||||
rq['trigger'] = str(uuid.uuid4())
|
||||
rq[self.KEY_STOP_PROXY_SVC] = rq['trigger']
|
||||
rq['peers-only'] = peers_only
|
||||
if peers_only:
|
||||
rq['peers-only'] = 1
|
||||
|
||||
return rq
|
||||
|
||||
def stop_proxy_ack(self, echo_token, echo_peers_only):
|
||||
@@ -200,7 +203,8 @@ class SwiftProxyClusterRPC(object):
|
||||
rq['peers-only'] = echo_peers_only
|
||||
return rq
|
||||
|
||||
def sync_rings_request(self, broker_host, use_trigger=True):
|
||||
def sync_rings_request(self, broker_host, use_trigger=True,
|
||||
builders_only=False):
|
||||
"""Request for peer to sync rings.
|
||||
|
||||
NOTE: leader action
|
||||
@@ -212,6 +216,9 @@ class SwiftProxyClusterRPC(object):
|
||||
if use_trigger:
|
||||
rq['trigger'] = str(uuid.uuid4())
|
||||
|
||||
if builders_only:
|
||||
rq['sync-only-builders'] = 1
|
||||
|
||||
rq['builder-broker'] = broker_host
|
||||
return rq
|
||||
|
||||
@@ -564,7 +571,7 @@ def setup_ipv6():
|
||||
apt_install('haproxy/trusty-backports', fatal=True)
|
||||
|
||||
|
||||
def sync_proxy_rings(broker_url):
|
||||
def sync_proxy_rings(broker_url, builders=True, rings=True):
|
||||
"""The leader proxy is responsible for intialising, updating and
|
||||
rebalancing the ring. Once the leader is ready the rings must then be
|
||||
synced into each other proxy unit.
|
||||
@@ -578,21 +585,23 @@ def sync_proxy_rings(broker_url):
|
||||
synced = []
|
||||
tmpdir = tempfile.mkdtemp(prefix='swiftrings')
|
||||
for server in ['account', 'object', 'container']:
|
||||
url = '%s/%s.builder' % (broker_url, server)
|
||||
log('Fetching %s.' % url, level=DEBUG)
|
||||
builder = "%s.builder" % (server)
|
||||
cmd = ['wget', url, '--retry-connrefused', '-t', '10', '-O',
|
||||
os.path.join(tmpdir, builder)]
|
||||
subprocess.check_call(cmd)
|
||||
synced.append(builder)
|
||||
if builders:
|
||||
url = '%s/%s.builder' % (broker_url, server)
|
||||
log('Fetching %s.' % url, level=DEBUG)
|
||||
builder = "%s.builder" % (server)
|
||||
cmd = ['wget', url, '--retry-connrefused', '-t', '10', '-O',
|
||||
os.path.join(tmpdir, builder)]
|
||||
subprocess.check_call(cmd)
|
||||
synced.append(builder)
|
||||
|
||||
url = '%s/%s.%s' % (broker_url, server, SWIFT_RING_EXT)
|
||||
log('Fetching %s.' % url, level=DEBUG)
|
||||
ring = '%s.%s' % (server, SWIFT_RING_EXT)
|
||||
cmd = ['wget', url, '--retry-connrefused', '-t', '10', '-O',
|
||||
os.path.join(tmpdir, ring)]
|
||||
subprocess.check_call(cmd)
|
||||
synced.append(ring)
|
||||
if rings:
|
||||
url = '%s/%s.%s' % (broker_url, server, SWIFT_RING_EXT)
|
||||
log('Fetching %s.' % url, level=DEBUG)
|
||||
ring = '%s.%s' % (server, SWIFT_RING_EXT)
|
||||
cmd = ['wget', url, '--retry-connrefused', '-t', '10', '-O',
|
||||
os.path.join(tmpdir, ring)]
|
||||
subprocess.check_call(cmd)
|
||||
synced.append(ring)
|
||||
|
||||
# Once all have been successfully downloaded, move them to actual location.
|
||||
for f in synced:
|
||||
@@ -609,22 +618,27 @@ def ensure_www_dir_permissions(www_dir):
|
||||
os.chown(www_dir, uid, gid)
|
||||
|
||||
|
||||
def update_www_rings():
|
||||
def update_www_rings(rings=True, builders=True):
|
||||
"""Copy rings to apache www dir.
|
||||
|
||||
Try to do this as atomically as possible to avoid races with storage nodes
|
||||
syncing rings.
|
||||
"""
|
||||
if not (rings or builders):
|
||||
return
|
||||
|
||||
tmp_dir = tempfile.mkdtemp(prefix='swift-rings-www-tmp')
|
||||
for ring, builder_path in SWIFT_RINGS.iteritems():
|
||||
ringfile = '%s.%s' % (ring, SWIFT_RING_EXT)
|
||||
src = os.path.join(SWIFT_CONF_DIR, ringfile)
|
||||
dst = os.path.join(tmp_dir, ringfile)
|
||||
shutil.copyfile(src, dst)
|
||||
if rings:
|
||||
ringfile = '%s.%s' % (ring, SWIFT_RING_EXT)
|
||||
src = os.path.join(SWIFT_CONF_DIR, ringfile)
|
||||
dst = os.path.join(tmp_dir, ringfile)
|
||||
shutil.copyfile(src, dst)
|
||||
|
||||
src = builder_path
|
||||
dst = os.path.join(tmp_dir, os.path.basename(builder_path))
|
||||
shutil.copyfile(src, dst)
|
||||
if builders:
|
||||
src = builder_path
|
||||
dst = os.path.join(tmp_dir, os.path.basename(builder_path))
|
||||
shutil.copyfile(src, dst)
|
||||
|
||||
www_dir = get_www_dir()
|
||||
deleted = "%s.deleted" % (www_dir)
|
||||
@@ -681,13 +695,17 @@ def sync_builders_and_rings_if_changed(f):
|
||||
rings_changed = rings_after != rings_before
|
||||
builders_changed = builders_after != builders_before
|
||||
if rings_changed or builders_changed:
|
||||
path = os.path.join(SWIFT_CONF_DIR, '*.%s' % (SWIFT_RING_EXT))
|
||||
if len(glob.glob(path)) == len(SWIFT_RINGS):
|
||||
# Copy to www dir
|
||||
rings_path = os.path.join(SWIFT_CONF_DIR, '*.%s' %
|
||||
(SWIFT_RING_EXT))
|
||||
if len(glob.glob(rings_path)) == len(SWIFT_RINGS):
|
||||
# Copy all to www dir
|
||||
update_www_rings()
|
||||
# Trigger sync
|
||||
cluster_sync_rings(peers_only=not rings_changed)
|
||||
else:
|
||||
# Copy just builders to www dir
|
||||
update_www_rings(rings=False)
|
||||
cluster_sync_rings(peers_only=True)
|
||||
log("Rings not ready for sync - skipping", level=DEBUG)
|
||||
else:
|
||||
log("Rings/builders unchanged so skipping sync", level=DEBUG)
|
||||
@@ -749,7 +767,7 @@ def mark_www_rings_deleted():
|
||||
os.rename(path, "%s.deleted" % (path))
|
||||
|
||||
|
||||
def notify_peers_builders_available(use_trigger=True):
|
||||
def notify_peers_builders_available(use_trigger=True, builders_only=True):
|
||||
"""Notify peer swift-proxy units that they should synchronise ring and
|
||||
builder files.
|
||||
|
||||
@@ -769,7 +787,8 @@ def notify_peers_builders_available(use_trigger=True):
|
||||
# Notify peers that builders are available
|
||||
log("Notifying peer(s) that rings are ready for sync.", level=INFO)
|
||||
rq = SwiftProxyClusterRPC().sync_rings_request(hostname,
|
||||
use_trigger=use_trigger)
|
||||
use_trigger=use_trigger,
|
||||
builders_only=builders_only)
|
||||
for rid in relation_ids('cluster'):
|
||||
log("Notifying rid=%s" % (rid), level=DEBUG)
|
||||
relation_set(relation_id=rid, relation_settings=rq)
|
||||
@@ -821,11 +840,6 @@ def cluster_sync_rings(peers_only=False):
|
||||
rel_ids = relation_ids('cluster')
|
||||
trigger = str(uuid.uuid4())
|
||||
|
||||
if peers_only:
|
||||
peers_only = 1
|
||||
else:
|
||||
peers_only = 0
|
||||
|
||||
log("Sending request to stop proxy service to all peers (%s)" % (trigger),
|
||||
level=INFO)
|
||||
rq = SwiftProxyClusterRPC().stop_proxy_request(peers_only)
|
||||
@@ -860,14 +874,19 @@ def notify_storage_rings_available():
|
||||
rings_url=rings_url, trigger=trigger)
|
||||
|
||||
|
||||
def builders_synced():
|
||||
"""Check that we have all the ring builders synced from the leader.
|
||||
def fully_synced():
|
||||
"""Check that we have all the rings and builders synced from the leader.
|
||||
|
||||
Returns True if we have all ring builders.
|
||||
"""
|
||||
for ring in SWIFT_RINGS.itervalues():
|
||||
if not os.path.exists(ring):
|
||||
log("Builder not yet synced - %s" % (ring), level=DEBUG)
|
||||
for ring, builder in SWIFT_RINGS.iteritems():
|
||||
ringfile = '%s.%s' % (ring, SWIFT_RING_EXT)
|
||||
if not os.path.exists(builder):
|
||||
log("Builder not yet synced - %s" % (builder), level=DEBUG)
|
||||
return False
|
||||
|
||||
if not os.path.exists(ringfile):
|
||||
log("Ring not yet synced - %s" % (ringfile), level=DEBUG)
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
@@ -123,14 +123,16 @@ class SwiftUtilsTestCase(unittest.TestCase):
|
||||
'builder-broker': None,
|
||||
'peers-only': True,
|
||||
'stop-proxy-service': 'test-uuid',
|
||||
'stop-proxy-service-ack': None}, rq)
|
||||
'stop-proxy-service-ack': None,
|
||||
'sync-only-builders': None}, rq)
|
||||
|
||||
rq = rpc.stop_proxy_request()
|
||||
self.assertEqual({'trigger': 'test-uuid',
|
||||
'builder-broker': None,
|
||||
'peers-only': None,
|
||||
'stop-proxy-service': 'test-uuid',
|
||||
'stop-proxy-service-ack': None}, rq)
|
||||
'stop-proxy-service-ack': None,
|
||||
'sync-only-builders': None}, rq)
|
||||
|
||||
@mock.patch('swift_utils.uuid')
|
||||
def test_cluster_rpc_stop_proxy_ack(self, mock_uuid):
|
||||
@@ -141,7 +143,8 @@ class SwiftUtilsTestCase(unittest.TestCase):
|
||||
'builder-broker': None,
|
||||
'peers-only': '1',
|
||||
'stop-proxy-service': None,
|
||||
'stop-proxy-service-ack': 'token1'}, rq)
|
||||
'stop-proxy-service-ack': 'token1',
|
||||
'sync-only-builders': None}, rq)
|
||||
|
||||
@mock.patch('swift_utils.uuid')
|
||||
def test_cluster_rpc_sync_request(self, mock_uuid):
|
||||
@@ -152,7 +155,8 @@ class SwiftUtilsTestCase(unittest.TestCase):
|
||||
'builder-broker': 'HostA',
|
||||
'peers-only': None,
|
||||
'stop-proxy-service': None,
|
||||
'stop-proxy-service-ack': None}, rq)
|
||||
'stop-proxy-service-ack': None,
|
||||
'sync-only-builders': None}, rq)
|
||||
|
||||
def test_all_responses_equal(self):
|
||||
responses = [{'a': 1, 'c': 3}]
|
||||
|
||||
Reference in New Issue
Block a user