Charmhelper sync
This commit is contained in:
parent
c0904504b1
commit
a95691cd73
@ -414,10 +414,16 @@ class CephBrokerRq(object):
|
||||
|
||||
The API is versioned and defaults to version 1.
|
||||
"""
|
||||
def __init__(self, api_version=1):
|
||||
def __init__(self, api_version=1, ops=None, request_id=None):
|
||||
self.api_version = api_version
|
||||
self.request_id = str(uuid.uuid1())
|
||||
self.ops = []
|
||||
if request_id:
|
||||
self.request_id = request_id
|
||||
else:
|
||||
self.request_id = str(uuid.uuid1())
|
||||
if ops:
|
||||
self.ops = ops
|
||||
else:
|
||||
self.ops = []
|
||||
|
||||
def add_op_create_pool(self, name, replica_count=3):
|
||||
self.ops.append({'op': 'create-pool', 'name': name,
|
||||
@ -428,6 +434,28 @@ class CephBrokerRq(object):
|
||||
return json.dumps({'api-version': self.api_version, 'ops': self.ops,
|
||||
'request-id': self.request_id})
|
||||
|
||||
def _ops_equal(self, other):
|
||||
if len(self.ops) == len(other.ops):
|
||||
for req_no in range(0, len(self.ops)):
|
||||
for key in ['replicas', 'name', 'op']:
|
||||
if self.ops[req_no][key] != other.ops[req_no][key]:
|
||||
return False
|
||||
else:
|
||||
return False
|
||||
return True
|
||||
|
||||
def __eq__(self, other):
|
||||
if not isinstance(other, self.__class__):
|
||||
return False
|
||||
if self.api_version == other.api_version and \
|
||||
self._ops_equal(other):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def __ne__(self, other):
|
||||
return not self.__eq__(other)
|
||||
|
||||
|
||||
class CephBrokerRsp(object):
|
||||
"""Ceph broker response.
|
||||
@ -454,17 +482,42 @@ class CephBrokerRsp(object):
|
||||
return self.rsp.get('stderr')
|
||||
|
||||
|
||||
def request_states(request_needed):
|
||||
"""Return dict showing if a request has been sent and completed per rid"""
|
||||
def get_previous_request(rid):
|
||||
"""Return the last ceph broker request sent on a given relation
|
||||
|
||||
@param rid: Relation id to query for request
|
||||
"""
|
||||
request = None
|
||||
broker_req = relation_get(attribute='broker_req', rid=rid,
|
||||
unit=local_unit())
|
||||
if broker_req:
|
||||
request_data = json.loads(broker_req)
|
||||
request = CephBrokerRq(api_version=request_data['api-version'],
|
||||
ops=request_data['ops'],
|
||||
request_id=request_data['request-id'])
|
||||
return request
|
||||
|
||||
|
||||
def get_request_states(request):
|
||||
"""Return a dict of requests per relation id with their corresponding
|
||||
completion state.
|
||||
|
||||
This allows a charm, which has a request for ceph, to see whether there is
|
||||
an equivalent request already being processed and if so what state that
|
||||
request is in.
|
||||
|
||||
@param request: A CephBrokerRq object
|
||||
"""
|
||||
complete = []
|
||||
requests = {}
|
||||
for rid in relation_ids('ceph'):
|
||||
complete = False
|
||||
previous_request = relation_get(attribute='broker_req', rid=rid, unit=local_unit())
|
||||
sent = equivalent_broker_requests(previous_request, request_needed.request)
|
||||
if sent:
|
||||
complete = broker_request_completed(previous_request, rid)
|
||||
previous_request = get_previous_request(rid)
|
||||
if request == previous_request:
|
||||
sent = True
|
||||
complete = is_broker_request_complete(previous_request, rid)
|
||||
else:
|
||||
sent = False
|
||||
complete = False
|
||||
requests[rid] = {
|
||||
'sent': sent,
|
||||
@ -473,48 +526,47 @@ def request_states(request_needed):
|
||||
return requests
|
||||
|
||||
|
||||
def request_sent(request_needed):
|
||||
"""Check to see if a matching request has been sent"""
|
||||
states = request_states(request_needed)
|
||||
def is_request_sent(request):
|
||||
"""Check to see if a functionally equivalent request has already been sent
|
||||
|
||||
Returns True if a similair request has been sent
|
||||
|
||||
@param request: A CephBrokerRq object
|
||||
"""
|
||||
states = get_request_states(request)
|
||||
for rid in states.keys():
|
||||
if not states[rid]['sent']:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def request_complete(request_needed):
|
||||
"""Check to see if a matching request has been completed"""
|
||||
states = request_states(request_needed)
|
||||
def is_request_complete(request):
|
||||
"""Check to see if a functionally equivalent request has already been
|
||||
completed
|
||||
|
||||
Returns True if a similair request has been completed
|
||||
|
||||
@param request: A CephBrokerRq object
|
||||
"""
|
||||
states = get_request_states(request)
|
||||
for rid in states.keys():
|
||||
if not states[rid]['complete']:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def equivalent_broker_requests(encoded_req1, encoded_req2):
|
||||
"""Check to see if two requests are equivalent (ignore request id)"""
|
||||
if not encoded_req1 or not encoded_req2:
|
||||
return False
|
||||
req1 = json.loads(encoded_req1)
|
||||
req2 = json.loads(encoded_req2)
|
||||
if len(req1['ops']) != len(req2['ops']):
|
||||
return False
|
||||
for req_no in range(0, len(req1['ops'])):
|
||||
for key in ['replicas', 'name', 'op']:
|
||||
if req1['ops'][req_no][key] != req2['ops'][req_no][key]:
|
||||
return False
|
||||
return True
|
||||
def is_broker_request_complete(request, rid):
|
||||
"""Check if a given request has been completed on the given relation
|
||||
|
||||
|
||||
def broker_request_completed(encoded_req, rid):
|
||||
"""Check if a given request has been completed on the given relation"""
|
||||
req = json.loads(encoded_req)
|
||||
@param request: A CephBrokerRq object
|
||||
@param rid: Relation ID
|
||||
"""
|
||||
broker_key = get_broker_rsp_key()
|
||||
for unit in related_units(rid):
|
||||
rdata = relation_get(rid=rid, unit=unit)
|
||||
if rdata.get(broker_key):
|
||||
rsp = CephBrokerRsp(rdata.get(broker_key))
|
||||
if rsp.request_id == req.get('request-id'):
|
||||
if rsp.request_id == request.request_id:
|
||||
if not rsp.exit_code:
|
||||
return True
|
||||
else:
|
||||
@ -522,12 +574,13 @@ def broker_request_completed(encoded_req, rid):
|
||||
# remote ceph cluster does not support unit targeted replies or it
|
||||
# has not processed our request yet.
|
||||
if rdata.get('broker_rsp'):
|
||||
if rdata.get('unit-targeted-reponses'):
|
||||
request_data = json.loads(rdata['broker_rsp'])
|
||||
if request_data.get('request-id'):
|
||||
log('Ignoring legacy broker_rsp without unit key as remote '
|
||||
'service supports unit specific replies')
|
||||
'service supports unit specific replies', level=DEBUG)
|
||||
else:
|
||||
log('Using legacy broker_rsp as remote service does not '
|
||||
'supports unit specific replies')
|
||||
'supports unit specific replies', level=DEBUG)
|
||||
rsp = CephBrokerRsp(rdata['broker_rsp'])
|
||||
if not rsp.exit_code:
|
||||
return True
|
||||
@ -535,15 +588,23 @@ def broker_request_completed(encoded_req, rid):
|
||||
|
||||
|
||||
def get_broker_rsp_key():
|
||||
"""Return broker request key for this unit"""
|
||||
"""Return broker response key for this unit
|
||||
|
||||
This is the key that ceph is going to use to pass request status
|
||||
information back to this unit
|
||||
"""
|
||||
return 'broker-rsp-' + local_unit().replace('/', '-')
|
||||
|
||||
|
||||
def send_request_if_needed(rq):
|
||||
"""Send broker request if one has not already been sent"""
|
||||
if request_sent(rq):
|
||||
log('Request already sent but not complete, not sending new request')
|
||||
def send_request_if_needed(request):
|
||||
"""Send broker request if an equivalent request has not already been sent
|
||||
|
||||
@param request: A CephBrokerRq object
|
||||
"""
|
||||
if is_request_sent(request):
|
||||
log('Request already sent but not complete, not sending new request',
|
||||
level=DEBUG)
|
||||
else:
|
||||
for rid in relation_ids('ceph'):
|
||||
log('Sending request {}'.format(rq.request_id))
|
||||
relation_set(relation_id=rid, broker_req=rq.request)
|
||||
log('Sending request {}'.format(request.request_id), level=DEBUG)
|
||||
relation_set(relation_id=rid, broker_req=request.request)
|
||||
|
Loading…
Reference in New Issue
Block a user