more
This commit is contained in:
@@ -301,6 +301,7 @@ def cluster_leader_actions():
|
||||
SwiftProxyClusterRPC().notify_leader_changed()
|
||||
return
|
||||
elif ack_key in settings:
|
||||
token = settings[ack_key]
|
||||
# Find out if all peer units have been stopped.
|
||||
responses = []
|
||||
for rid in relation_ids('cluster'):
|
||||
@@ -319,16 +320,10 @@ def cluster_leader_actions():
|
||||
default=0))
|
||||
log("Syncing rings and builders (peers-only=%s)" % (peers_only),
|
||||
level=DEBUG)
|
||||
broadcast_rings_available(storage=not peers_only)
|
||||
broadcast_rings_available(token, storage=not peers_only)
|
||||
else:
|
||||
log("Not all peer apis stopped - skipping sync until all peers "
|
||||
"ready (got %s)" % (responses), level=INFO)
|
||||
else:
|
||||
# Otherwise it might be a new swift-proxy unit so tell it to sync
|
||||
# rings. Note that broker info may already be present in the cluster
|
||||
# relation so don't use a trigger otherwise the hook will re-fire on
|
||||
# all peers.
|
||||
broadcast_rings_available(storage=False, use_trigger=False)
|
||||
|
||||
CONFIGS.write_all()
|
||||
|
||||
|
||||
@@ -41,6 +41,8 @@ from charmhelpers.core.hookenv import (
|
||||
unit_get,
|
||||
relation_set,
|
||||
relation_ids,
|
||||
remote_unit,
|
||||
local_unit,
|
||||
)
|
||||
from charmhelpers.fetch import (
|
||||
apt_update,
|
||||
@@ -207,18 +209,14 @@ class SwiftProxyClusterRPC(object):
|
||||
rq['peers-only'] = echo_peers_only
|
||||
return rq
|
||||
|
||||
def sync_rings_request(self, broker_host, use_trigger=True,
|
||||
def sync_rings_request(self, broker_host, broker_token,
|
||||
builders_only=False):
|
||||
"""Request for peer to sync rings.
|
||||
|
||||
NOTE: leader action
|
||||
"""
|
||||
rq = self.template()
|
||||
# There may be cases where we don't want to use the trigger e.g. when
|
||||
# we are re-issuing a request that may already have been received but
|
||||
# we don't want the receiver hook to re-fire.
|
||||
if use_trigger:
|
||||
rq['trigger'] = str(uuid.uuid4())
|
||||
rq['trigger'] = broker_token
|
||||
|
||||
if builders_only:
|
||||
rq['sync-only-builders'] = 1
|
||||
@@ -694,6 +692,49 @@ def get_builders_checksum():
|
||||
return sha.hexdigest()
|
||||
|
||||
|
||||
def rings_synced():
|
||||
r_unit = remote_unit()
|
||||
if not r_unit:
|
||||
return False
|
||||
|
||||
token_rid = None
|
||||
token = None
|
||||
ack_token = None
|
||||
for rid in relation_ids('cluster'):
|
||||
broker = relation_get(attribute='builder-broker', rid=rid,
|
||||
unit=r_unit)
|
||||
if broker:
|
||||
token_rid = rid
|
||||
token = relation_get(attribute='token', rid=rid, unit=r_unit)
|
||||
else:
|
||||
token = None
|
||||
|
||||
if token:
|
||||
key = SwiftProxyClusterRPC.KEY_STOP_PROXY_SVC_ACK
|
||||
ack_token = relation_get(attribute=key, rid=token_rid,
|
||||
unit=local_unit())
|
||||
return token == ack_token
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
def get_broker_token():
|
||||
r_unit = remote_unit()
|
||||
if not r_unit:
|
||||
log("No remote unit", level=DEBUG)
|
||||
return None
|
||||
|
||||
for rid in relation_ids('cluster'):
|
||||
responses = relation_get(unit=r_unit, rid=rid)
|
||||
|
||||
key = SwiftProxyClusterRPC.KEY_STOP_PROXY_SVC_ACK
|
||||
if not all_responses_equal(responses, key):
|
||||
log("Not all acks equal", level=DEBUG)
|
||||
return None
|
||||
|
||||
return responses[0].get(key, None)
|
||||
|
||||
|
||||
def sync_builders_and_rings_if_changed(f):
|
||||
"""Only trigger a ring or builder sync if they have changed as a result of
|
||||
the decorated operation.
|
||||
@@ -715,20 +756,23 @@ def sync_builders_and_rings_if_changed(f):
|
||||
rings_ready = len(glob.glob(rings_path)) == len(SWIFT_RINGS)
|
||||
rings_changed = rings_after != rings_before
|
||||
builders_changed = builders_after != builders_before
|
||||
|
||||
# Copy builders and rings (if available) to the server dir.
|
||||
# Note that we may be a recently-elected leader so we need to ensure
|
||||
# rings are available.
|
||||
update_www_rings(rings=rings_ready)
|
||||
if rings_changed or builders_changed:
|
||||
broker_token = get_broker_token()
|
||||
if broker_token and (rings_changed or builders_changed):
|
||||
# Copy builders and rings (if available) to the server dir.
|
||||
update_www_rings(rings=rings_ready)
|
||||
if rings_ready:
|
||||
# Trigger sync
|
||||
cluster_sync_rings(peers_only=not rings_changed)
|
||||
cluster_sync_rings(broker_token, peers_only=not rings_changed)
|
||||
else:
|
||||
cluster_sync_rings(peers_only=True, builders_only=True)
|
||||
cluster_sync_rings(broker_token, peers_only=True,
|
||||
builders_only=True)
|
||||
log("Rings not ready for sync - skipping", level=DEBUG)
|
||||
else:
|
||||
log("Rings/builders unchanged so skipping sync", level=DEBUG)
|
||||
if rings_synced():
|
||||
# Note that we may be a recently-elected leader so we need to
|
||||
# ensure rings are available.
|
||||
update_www_rings(rings=rings_ready)
|
||||
|
||||
return ret
|
||||
|
||||
@@ -815,7 +859,7 @@ def mark_www_rings_deleted():
|
||||
os.rename(path, "%s.deleted" % (path))
|
||||
|
||||
|
||||
def notify_peers_builders_available(use_trigger=True, builders_only=False):
|
||||
def notify_peers_builders_available(broker_token, builders_only=False):
|
||||
"""Notify peer swift-proxy units that they should synchronise ring and
|
||||
builder files.
|
||||
|
||||
@@ -835,14 +879,14 @@ def notify_peers_builders_available(use_trigger=True, builders_only=False):
|
||||
# Notify peers that builders are available
|
||||
log("Notifying peer(s) that rings are ready for sync.", level=INFO)
|
||||
rq = SwiftProxyClusterRPC().sync_rings_request(hostname,
|
||||
use_trigger=use_trigger,
|
||||
broker_token,
|
||||
builders_only=builders_only)
|
||||
for rid in relation_ids('cluster'):
|
||||
log("Notifying rid=%s (%s)" % (rid, rq), level=DEBUG)
|
||||
relation_set(relation_id=rid, relation_settings=rq)
|
||||
|
||||
|
||||
def broadcast_rings_available(peers=True, storage=True, use_trigger=True,
|
||||
def broadcast_rings_available(broker_token, peers=True, storage=True,
|
||||
builders_only=False):
|
||||
"""Notify storage relations and cluster (peer) relations that rings and
|
||||
builders are availble for sync.
|
||||
@@ -857,13 +901,13 @@ def broadcast_rings_available(peers=True, storage=True, use_trigger=True,
|
||||
log("Skipping notify storage relations", level=DEBUG)
|
||||
|
||||
if peers:
|
||||
notify_peers_builders_available(use_trigger=use_trigger,
|
||||
notify_peers_builders_available(broker_token,
|
||||
builders_only=builders_only)
|
||||
else:
|
||||
log("Skipping notify peer relations", level=DEBUG)
|
||||
|
||||
|
||||
def cluster_sync_rings(peers_only=False, builders_only=False):
|
||||
def cluster_sync_rings(broker_token, peers_only=False, builders_only=False):
|
||||
"""Notify peer relations that they should stop their proxy services.
|
||||
|
||||
Peer units will then be expected to do a relation_set with
|
||||
@@ -884,7 +928,8 @@ def cluster_sync_rings(peers_only=False, builders_only=False):
|
||||
# relations. If we have been instructed to only broadcast to peers, do
|
||||
# nothing.
|
||||
if not peer_units():
|
||||
broadcast_rings_available(peers=False, storage=not peers_only,
|
||||
broadcast_rings_available(broker_token, peers=False,
|
||||
storage=not peers_only,
|
||||
builders_only=builders_only)
|
||||
return
|
||||
|
||||
|
||||
@@ -21,6 +21,8 @@ def init_ring_paths(tmpdir):
|
||||
|
||||
class SwiftUtilsTestCase(unittest.TestCase):
|
||||
|
||||
@mock.patch('swift_utils.rings_synced')
|
||||
@mock.patch('swift_utils.get_broker_token')
|
||||
@mock.patch('swift_utils.update_www_rings')
|
||||
@mock.patch('swift_utils.get_builders_checksum')
|
||||
@mock.patch('swift_utils.get_rings_checksum')
|
||||
@@ -35,7 +37,10 @@ class SwiftUtilsTestCase(unittest.TestCase):
|
||||
mock_is_elected_leader, mock_path_exists,
|
||||
mock_log, mock_balance_rings,
|
||||
mock_get_rings_checksum,
|
||||
mock_get_builders_checksum, mock_update_www_rings):
|
||||
mock_get_builders_checksum, mock_update_www_rings,
|
||||
mock_get_broker_token, mock_rings_synced):
|
||||
mock_get_broker_token.return_value = "token1"
|
||||
mock_rings_synced.return_value = True
|
||||
|
||||
# Make sure same is returned for both so that we don't try to sync
|
||||
mock_get_rings_checksum.return_value = None
|
||||
@@ -73,6 +78,7 @@ class SwiftUtilsTestCase(unittest.TestCase):
|
||||
self.assertTrue(mock_set_min_hours.called)
|
||||
self.assertTrue(mock_balance_rings.called)
|
||||
|
||||
@mock.patch('swift_utils.get_broker_token')
|
||||
@mock.patch('swift_utils.balance_rings')
|
||||
@mock.patch('swift_utils.log')
|
||||
@mock.patch('swift_utils.is_elected_leader')
|
||||
@@ -84,7 +90,9 @@ class SwiftUtilsTestCase(unittest.TestCase):
|
||||
mock_config,
|
||||
mock_is_elected_leader,
|
||||
mock_log,
|
||||
mock_balance_rings):
|
||||
mock_balance_rings,
|
||||
mock_get_broker_token):
|
||||
mock_get_broker_token.return_value = "token1"
|
||||
|
||||
@swift_utils.sync_builders_and_rings_if_changed
|
||||
def mock_balance():
|
||||
@@ -152,11 +160,9 @@ class SwiftUtilsTestCase(unittest.TestCase):
|
||||
'stop-proxy-service-ack': 'token1',
|
||||
'sync-only-builders': None}, rq)
|
||||
|
||||
@mock.patch('swift_utils.uuid')
|
||||
def test_cluster_rpc_sync_request(self, mock_uuid):
|
||||
mock_uuid.uuid4.return_value = 'token1'
|
||||
def test_cluster_rpc_sync_request(self):
|
||||
rpc = swift_utils.SwiftProxyClusterRPC()
|
||||
rq = rpc.sync_rings_request('HostA')
|
||||
rq = rpc.sync_rings_request('HostA', 'token1')
|
||||
self.assertEqual({'trigger': 'token1',
|
||||
'builder-broker': 'HostA',
|
||||
'peers-only': None,
|
||||
|
||||
Reference in New Issue
Block a user