From a95691cd73c52d812ccc8fcc7ba94573043d472c Mon Sep 17 00:00:00 2001 From: Liam Young Date: Fri, 4 Sep 2015 11:44:31 +0100 Subject: [PATCH] Charmhelper sync --- .../contrib/storage/linux/ceph.py | 147 +++++++++++++----- 1 file changed, 104 insertions(+), 43 deletions(-) diff --git a/hooks/charmhelpers/contrib/storage/linux/ceph.py b/hooks/charmhelpers/contrib/storage/linux/ceph.py index db8459f0..0caaabe3 100644 --- a/hooks/charmhelpers/contrib/storage/linux/ceph.py +++ b/hooks/charmhelpers/contrib/storage/linux/ceph.py @@ -414,10 +414,16 @@ class CephBrokerRq(object): The API is versioned and defaults to version 1. """ - def __init__(self, api_version=1): + def __init__(self, api_version=1, ops=None, request_id=None): self.api_version = api_version - self.request_id = str(uuid.uuid1()) - self.ops = [] + if request_id: + self.request_id = request_id + else: + self.request_id = str(uuid.uuid1()) + if ops: + self.ops = ops + else: + self.ops = [] def add_op_create_pool(self, name, replica_count=3): self.ops.append({'op': 'create-pool', 'name': name, @@ -428,6 +434,28 @@ class CephBrokerRq(object): return json.dumps({'api-version': self.api_version, 'ops': self.ops, 'request-id': self.request_id}) + def _ops_equal(self, other): + if len(self.ops) == len(other.ops): + for req_no in range(0, len(self.ops)): + for key in ['replicas', 'name', 'op']: + if self.ops[req_no][key] != other.ops[req_no][key]: + return False + else: + return False + return True + + def __eq__(self, other): + if not isinstance(other, self.__class__): + return False + if self.api_version == other.api_version and \ + self._ops_equal(other): + return True + else: + return False + + def __ne__(self, other): + return not self.__eq__(other) + class CephBrokerRsp(object): """Ceph broker response. @@ -454,17 +482,42 @@ class CephBrokerRsp(object): return self.rsp.get('stderr') -def request_states(request_needed): - """Return dict showing if a request has been sent and completed per rid""" +def get_previous_request(rid): + """Return the last ceph broker request sent on a given relation + + @param rid: Relation id to query for request + """ + request = None + broker_req = relation_get(attribute='broker_req', rid=rid, + unit=local_unit()) + if broker_req: + request_data = json.loads(broker_req) + request = CephBrokerRq(api_version=request_data['api-version'], + ops=request_data['ops'], + request_id=request_data['request-id']) + return request + + +def get_request_states(request): + """Return a dict of requests per relation id with their corresponding + completion state. + + This allows a charm, which has a request for ceph, to see whether there is + an equivalent request already being processed and if so what state that + request is in. + + @param request: A CephBrokerRq object + """ complete = [] requests = {} for rid in relation_ids('ceph'): complete = False - previous_request = relation_get(attribute='broker_req', rid=rid, unit=local_unit()) - sent = equivalent_broker_requests(previous_request, request_needed.request) - if sent: - complete = broker_request_completed(previous_request, rid) + previous_request = get_previous_request(rid) + if request == previous_request: + sent = True + complete = is_broker_request_complete(previous_request, rid) else: + sent = False complete = False requests[rid] = { 'sent': sent, @@ -473,48 +526,47 @@ def request_states(request_needed): return requests -def request_sent(request_needed): - """Check to see if a matching request has been sent""" - states = request_states(request_needed) +def is_request_sent(request): + """Check to see if a functionally equivalent request has already been sent + + Returns True if a similair request has been sent + + @param request: A CephBrokerRq object + """ + states = get_request_states(request) for rid in states.keys(): if not states[rid]['sent']: return False return True -def request_complete(request_needed): - """Check to see if a matching request has been completed""" - states = request_states(request_needed) +def is_request_complete(request): + """Check to see if a functionally equivalent request has already been + completed + + Returns True if a similair request has been completed + + @param request: A CephBrokerRq object + """ + states = get_request_states(request) for rid in states.keys(): if not states[rid]['complete']: return False return True -def equivalent_broker_requests(encoded_req1, encoded_req2): - """Check to see if two requests are equivalent (ignore request id)""" - if not encoded_req1 or not encoded_req2: - return False - req1 = json.loads(encoded_req1) - req2 = json.loads(encoded_req2) - if len(req1['ops']) != len(req2['ops']): - return False - for req_no in range(0, len(req1['ops'])): - for key in ['replicas', 'name', 'op']: - if req1['ops'][req_no][key] != req2['ops'][req_no][key]: - return False - return True +def is_broker_request_complete(request, rid): + """Check if a given request has been completed on the given relation - -def broker_request_completed(encoded_req, rid): - """Check if a given request has been completed on the given relation""" - req = json.loads(encoded_req) + @param request: A CephBrokerRq object + @param rid: Relation ID + """ broker_key = get_broker_rsp_key() for unit in related_units(rid): rdata = relation_get(rid=rid, unit=unit) if rdata.get(broker_key): rsp = CephBrokerRsp(rdata.get(broker_key)) - if rsp.request_id == req.get('request-id'): + if rsp.request_id == request.request_id: if not rsp.exit_code: return True else: @@ -522,12 +574,13 @@ def broker_request_completed(encoded_req, rid): # remote ceph cluster does not support unit targeted replies or it # has not processed our request yet. if rdata.get('broker_rsp'): - if rdata.get('unit-targeted-reponses'): + request_data = json.loads(rdata['broker_rsp']) + if request_data.get('request-id'): log('Ignoring legacy broker_rsp without unit key as remote ' - 'service supports unit specific replies') + 'service supports unit specific replies', level=DEBUG) else: log('Using legacy broker_rsp as remote service does not ' - 'supports unit specific replies') + 'supports unit specific replies', level=DEBUG) rsp = CephBrokerRsp(rdata['broker_rsp']) if not rsp.exit_code: return True @@ -535,15 +588,23 @@ def broker_request_completed(encoded_req, rid): def get_broker_rsp_key(): - """Return broker request key for this unit""" + """Return broker response key for this unit + + This is the key that ceph is going to use to pass request status + information back to this unit + """ return 'broker-rsp-' + local_unit().replace('/', '-') -def send_request_if_needed(rq): - """Send broker request if one has not already been sent""" - if request_sent(rq): - log('Request already sent but not complete, not sending new request') +def send_request_if_needed(request): + """Send broker request if an equivalent request has not already been sent + + @param request: A CephBrokerRq object + """ + if is_request_sent(request): + log('Request already sent but not complete, not sending new request', + level=DEBUG) else: for rid in relation_ids('ceph'): - log('Sending request {}'.format(rq.request_id)) - relation_set(relation_id=rid, broker_req=rq.request) + log('Sending request {}'.format(request.request_id), level=DEBUG) + relation_set(relation_id=rid, broker_req=request.request)