diff --git a/src/ceph_client/requires.py b/src/ceph_client/requires.py index b67048e..b8a8c56 100644 --- a/src/ceph_client/requires.py +++ b/src/ceph_client/requires.py @@ -14,7 +14,33 @@ from .lib import base_requires +from charms.reactive import ( + when, +) + class CephClientRequires(base_requires.CephRequires): - pass + @when('endpoint.{endpoint_name}.joined') + def joined(self): + super().joined() + + @when('endpoint.{endpoint_name}.changed') + def changed(self): + super().changed() + + @when('endpoint.{endpoint_name}.departed') + def departed(self): + super().changed() + + @when('endpoint.{endpoint_name}.broken') + def broken(self): + super().broken() + + def initial_ceph_response(self): + data = { + 'key': self.key(), + 'auth': self.auth(), + 'mon_hosts': self.mon_hosts() + } + return data diff --git a/src/lib/base_requires.py b/src/lib/base_requires.py index ebfeefa..c821fe2 100644 --- a/src/lib/base_requires.py +++ b/src/lib/base_requires.py @@ -14,61 +14,63 @@ import json -from charms.reactive import hook -from charms.reactive import RelationBase -from charms.reactive import scopes -from charmhelpers.core import hookenv +import charms.reactive as reactive + from charmhelpers.core.hookenv import log from charmhelpers.contrib.network.ip import format_ipv6_addr from charmhelpers.contrib.storage.linux.ceph import ( CephBrokerRq, is_request_complete, - send_request_if_needed, + is_request_sent, ) -class CephRequires(RelationBase): - scope = scopes.GLOBAL +class CephRequires(reactive.Endpoint): - auto_accessors = ['auth', 'key'] - - @hook('{requires:ceph-client}-relation-{joined}') def joined(self): - self.set_state('{relation_name}.connected') + reactive.set_flag(self.expand_name('{endpoint_name}.connected')) + + def key(self): + return self.all_joined_units.received.get('key') + + def auth(self): + return self.all_joined_units.received.get('auth') + + @property + def relation_name(self): + return self.expand_name('{endpoint_name}') + + def initial_ceph_response(self): + raise NotImplementedError - @hook('{requires:ceph-client}-relation-{changed,departed}') def changed(self): - data = { - 'key': self.key(), - 'auth': self.auth(), - 'mon_hosts': self.mon_hosts() - } + data = self.initial_ceph_response() if all(data.values()): - self.set_state('{relation_name}.available') + reactive.set_flag(self.expand_name('{endpoint_name}.available')) - json_rq = self.get_local(key='broker_req') - if json_rq: - rq = CephBrokerRq() - j = json.loads(json_rq) - rq.ops = j['ops'] + rq = self.get_current_request() + if rq: log("changed broker_req: {}".format(rq.ops)) - if rq and is_request_complete(rq, - relation=self.relation_name): + if rq and is_request_complete(rq, relation=self.relation_name): log("Setting ceph-client.pools.available") - self.set_state('{relation_name}.pools.available') + reactive.set_flag( + self.expand_name('{endpoint_name}.pools.available')) else: log("incomplete request. broker_req not found") - @hook('{requires:ceph-client}-relation-{broken}') def broken(self): - self.remove_state('{relation_name}.available') - self.remove_state('{relation_name}.connected') - self.remove_state('{relation_name}.pools.available') + reactive.clear_flag( + self.expand_name('{endpoint_name}.available')) + reactive.clear_flag( + self.expand_name('{endpoint_name}.connected')) + reactive.clear_flag( + self.expand_name('{endpoint_name}.pools.available')) def create_replicated_pool(self, name, replicas=3, weight=None, - pg_num=None, group=None, namespace=None): + pg_num=None, group=None, namespace=None, + app_name=None): """ Request pool setup @@ -82,17 +84,22 @@ class CephRequires(RelationBase): @param group: Group to add pool to. @param namespace: A group can optionally have a namespace defined that will be used to further restrict pool access. + @param app_name: (Optional) Tag pool with application name. Note that + there is certain protocols emerging upstream with + regard to meaningful application names to use. + Examples are ``rbd`` and ``rgw``. """ - rq = self.get_current_request() + rq = self.get_current_request() or CephBrokerRq() rq.add_op_create_replicated_pool(name=name, replica_count=replicas, pg_num=pg_num, weight=weight, group=group, - namespace=namespace) - self.set_local(key='broker_req', value=rq.request) - send_request_if_needed(rq, relation=self.relation_name) - self.remove_state('{relation_name}.pools.available') + namespace=namespace, + app_name=app_name) + self.send_request_if_needed(rq) + reactive.clear_flag( + self.expand_name('{endpoint_name}.pools.available')) def create_pool(self, name, replicas=3, weight=None, pg_num=None, group=None, namespace=None): @@ -130,7 +137,7 @@ class CephRequires(RelationBase): @param max_objects: Maximum object quota to apply @param allow_ec_overwrites: Allow EC pools to be overwritten """ - rq = self.get_current_request() + rq = self.get_current_request() or CephBrokerRq() rq.add_op_create_erasure_pool(name=name, erasure_profile=erasure_profile, weight=weight, @@ -139,9 +146,9 @@ class CephRequires(RelationBase): max_bytes=max_bytes, max_objects=max_objects, allow_ec_overwrites=allow_ec_overwrites) - self.set_local(key='broker_req', value=rq.request) - send_request_if_needed(rq, relation=self.relation_name) - self.remove_state('{relation_name}.pools.available') + self.send_request_if_needed(rq, relation=self.relation_name) + reactive.clear_flag( + self.expand_name('{endpoint_name}.pools.available')) def create_erasure_profile(self, name, erasure_type='jerasure', @@ -181,7 +188,7 @@ class CephRequires(RelationBase): Type of crush bucket in which set of chunks defined by lrc_locality will be stored. """ - rq = self.get_current_request() + rq = self.get_current_request() or CephBrokerRq() rq.add_op_create_erasure_profile( name=name, erasure_type=erasure_type, @@ -195,12 +202,13 @@ class CephRequires(RelationBase): clay_scalar_mds=clay_scalar_mds, lrc_crush_locality=lrc_crush_locality ) - self.set_local(key='broker_req', value=rq.request) - send_request_if_needed(rq, relation=self.relation_name) - self.remove_state('{relation_name}.pools.available') + self.send_request_if_needed(rq, relation=self.relation_name) + reactive.clear_flag( + self.expand_name('{endpoint_name}.pools.available')) def request_access_to_group(self, name, namespace=None, permission=None, - key_name=None, object_prefix_permissions=None): + key_name=None, + object_prefix_permissions=None): """ Adds the requested permissions to service's Ceph key @@ -218,44 +226,49 @@ class CephRequires(RelationBase): @param key_name: userid to grant permission to @param object_prefix_permissions: Add object_prefix permissions. """ - current_request = self.get_current_request() + current_request = self.get_current_request() or CephBrokerRq() current_request.add_op_request_access_to_group( name, namespace=namespace, permission=permission, key_name=key_name, object_prefix_permissions=object_prefix_permissions) - self.set_local(key='broker_req', value=current_request.request) - send_request_if_needed(current_request, relation=self.relation_name) + self.send_request_if_needed(current_request) + + def send_request_if_needed(self, request): + """Send broker request if an equivalent request has not been sent + + @param request: A CephBrokerRq object + """ + if is_request_sent(request, relation=self.relation_name): + log('Request already sent but not complete, ' + 'not sending new request') + else: + for relation in self.relations: + relation.to_publish['broker_req'] = json.loads( + request.request) def get_current_request(self): - """Return the current broker request for the interface.""" - # json.dumps of the CephBrokerRq() - rq = CephBrokerRq() - - json_rq = self.get_local(key='broker_req') - if json_rq: - try: - j = json.loads(json_rq) - log("Json request: {}".format(json_rq)) - rq.set_ops(j['ops']) - except ValueError as err: - log("Unable to decode broker_req: {}. Error {}".format( - json_rq, err)) - return rq + broker_reqs = [] + for relation in self.relations: + broker_req = relation.to_publish.get('broker_req', {}) + if broker_req: + rq = CephBrokerRq() + rq.set_ops(broker_req['ops']) + broker_reqs.append(rq) + # Check that if there are multiple requests then they are the same. + assert all(x == broker_reqs[0] for x in broker_reqs) + if broker_reqs: + return broker_reqs[0] def get_remote_all(self, key, default=None): """Return a list of all values presented by remote units for key""" - # TODO: might be a nicer way todo this - written a while back! values = [] - for conversation in self.conversations(): - for relation_id in conversation.relation_ids: - for unit in hookenv.related_units(relation_id): - value = hookenv.relation_get(key, - unit, - relation_id) or default - if value: - values.append(value) + for relation in self.relations: + for unit in relation.units: + value = unit.received.get(key, default) + if value: + values.append(value) return list(set(values)) def mon_hosts(self): diff --git a/unit_tests/test_requires.py b/unit_tests/test_requires.py index cd8348a..1f76ec9 100644 --- a/unit_tests/test_requires.py +++ b/unit_tests/test_requires.py @@ -10,10 +10,13 @@ # See the License for the specific language governing permissions and # limitations under the License. - +import json import unittest from unittest import mock +from charmhelpers.contrib.storage.linux.ceph import ( + CephBrokerRq, +) with mock.patch('charmhelpers.core.hookenv.metadata') as _meta: _meta.return_Value = 'ss' @@ -27,7 +30,6 @@ _hook_args = {} TO_PATCH = [] TO_PATCH_BASE_REQUIRES = [ 'is_request_complete', - 'send_request_if_needed', ] @@ -41,6 +43,14 @@ def mock_hook(*args, **kwargs): return inner +class DummyRequest(CephBrokerRq): + + def __init__(self, req_json=None, request_id=12): + super().__init__(request_id=request_id) + if req_json: + self.set_ops(json.loads(req_json)['ops']) + + class TestCephClientRequires(unittest.TestCase): @classmethod @@ -84,6 +94,8 @@ class TestCephClientRequires(unittest.TestCase): setattr(self, method, self.patch(method)) for method in TO_PATCH_BASE_REQUIRES: setattr(self, method, self.patch(method, requires.base_requires)) + self.patch_object(requires.base_requires.reactive, 'set_flag') + self.patch_object(requires.base_requires.reactive, 'clear_flag') def tearDown(self): self.cr = None @@ -101,6 +113,26 @@ class TestCephClientRequires(unittest.TestCase): self._patches_start[attr] = started setattr(self, attr, started) + def patch_object(self, obj, attr, name=None, **kwargs): + """Patch a patchable thing. Uses mock.patch.object() to do the work. + Automatically unpatches at the end of the test. + + The mock gets added to the test object (self) using 'name' or the attr + passed in the arguments. + + :param obj: an object that needs to have an attribute patched. + :param attr: that represents the attribute being patched. + :param name: optional name to call the mock. + :param **kwargs: any other args to pass to mock.patch() + """ + mocked = mock.patch.object(obj, attr, **kwargs) + if name is None: + name = attr + started = mocked.start() + self._patches[name] = mocked + self._patches_start[name] = started + setattr(self, name, started) + def test_registered_hooks(self): # test that the decorators actually registered the relation # expressions that are meaningful for this interface: this is to @@ -109,78 +141,59 @@ class TestCephClientRequires(unittest.TestCase): hook_patterns = { 'data_changed': ('endpoint.{endpoint_name}.changed', ), 'joined': ('endpoint.{endpoint_name}.joined', ), - 'broken': ('endpoint.{endpoint_name}.joined', ), + 'broken': ('endpoint.{endpoint_name}.broken', ), + 'changed': ('endpoint.{endpoint_name}.changed', ), + 'departed': ('endpoint.{endpoint_name}.departed', ), } for k, v in _hook_args.items(): self.assertEqual(hook_patterns[k], v['args']) - def test_date_changed(self): + def test_data_changed(self): self.patch_kr('key', 'key1') self.patch_kr('auth', 'auth1') self.patch_kr('mon_hosts', 'host1') - self.patch_kr('get_local', None) - self.patch_kr('set_state') self.cr.changed() - self.set_state.assert_called_once_with('{relation_name}.available') + self.set_flag.assert_called_once_with('some-relation.available') - def test_date_changed_incomplete(self): + def test_data_changed_incomplete(self): self.patch_kr('key', 'key1') self.patch_kr('auth', None) self.patch_kr('mon_hosts', 'host1') - self.patch_kr('get_local', None) - self.patch_kr('set_state') self.cr.changed() - self.assertFalse(self.set_state.called) + self.assertFalse(self.set_flag.called) - def test_date_changed_existing_broker_rq(self): - broker_req = ( - '{"api-version": 1, ' - '"request-id": "4f7e247d-f953-11e8-a4f3-fa163e55565e",' - '"ops": [{"group": "volumes", "name": "cinder-ceph", ' - '"weight": 40, "replicas": 3, "pg_num": null, ' - '"group-namespace": null, "op": "create-pool"}]}') + def test_data_changed_existing_broker_rq(self): self.patch_kr('key', 'key1') self.patch_kr('auth', 'auth1') self.patch_kr('mon_hosts', 'host1') - self.patch_kr('get_local', broker_req) - self.patch_kr('set_state') + self.patch_kr('get_current_request', DummyRequest()) self.is_request_complete.return_value = True self.cr.changed() - self.set_state.assert_has_calls([ - mock.call('{relation_name}.available'), - mock.call('{relation_name}.pools.available')]) + self.set_flag.assert_has_calls([ + mock.call('some-relation.available'), + mock.call('some-relation.pools.available')]) def test_date_changed_existing_broker_rq_incomplete(self): - broker_req = ( - '{"api-version": 1, ' - '"request-id": "4f7e247d-f953-11e8-a4f3-fa163e55565e",' - '"ops": [{"group": "volumes", "name": "cinder-ceph", ' - '"weight": 40, "replicas": 3, "pg_num": null, ' - '"group-namespace": null, "op": "create-pool"}]}') self.patch_kr('key', 'key1') self.patch_kr('auth', 'auth1') self.patch_kr('mon_hosts', 'host1') - self.patch_kr('get_local', broker_req) - self.patch_kr('set_state') self.is_request_complete.return_value = False self.cr.changed() # Side effect of asserting pools.available was not set. - self.set_state.assert_called_once_with('{relation_name}.available') + self.set_flag.assert_called_once_with('some-relation.available') def test_broken(self): - self.patch_kr('remove_state') self.cr.broken() - self.remove_state.assert_has_calls([ - mock.call('{relation_name}.available'), - mock.call('{relation_name}.connected'), - mock.call('{relation_name}.pools.available')]) + self.clear_flag.assert_has_calls([ + mock.call('some-relation.available'), + mock.call('some-relation.connected'), + mock.call('some-relation.pools.available')]) @mock.patch.object(charmhelpers.contrib.storage.linux.ceph.uuid, 'uuid1') def test_create_pool_new_request(self, _uuid1): - self.patch_kr('remove_state') + self.patch_kr('get_current_request', None) + self.patch_kr('send_request_if_needed') _uuid1.return_value = '9e34123e-fa0c-11e8-ad9c-fa163ed1cc55' - self.patch_kr('get_local', None) - self.patch_kr('set_local') self.cr.create_pool('bob') ceph_broker_rq = self.send_request_if_needed.mock_calls[0][1][0] self.assertEqual( @@ -199,7 +212,7 @@ class TestCephClientRequires(unittest.TestCase): @mock.patch.object(charmhelpers.contrib.storage.linux.ceph.uuid, 'uuid1') def test_create_pool_existing_request(self, _uuid1): - self.patch_kr('remove_state') + self.patch_kr('send_request_if_needed') _uuid1.return_value = '9e34123e-fa0c-11e8-ad9c-fa163ed1cc55' req = ( '{"api-version": 1, ' @@ -208,7 +221,8 @@ class TestCephClientRequires(unittest.TestCase): '"group-namespace": null, "app-name": null, "max-bytes": null, ' '"max-objects": null}], ' '"request-id": "9e34123e-fa0c-11e8-ad9c-fa163ed1cc55"}') - self.patch_kr('get_local', req) + existing_request = DummyRequest(req_json=req) + self.patch_kr('get_current_request', existing_request) self.cr.create_pool('bob') ceph_broker_rq = self.send_request_if_needed.mock_calls[0][1][0] self.assertEqual( @@ -226,8 +240,7 @@ class TestCephClientRequires(unittest.TestCase): 'weight': None}]) def test_request_access_to_group_new_request(self): - self.patch_kr('get_local', '{"ops": []}') - self.patch_kr('set_local') + self.patch_kr('send_request_if_needed') self.cr.request_access_to_group( 'volumes', key_name='cinder', @@ -245,21 +258,22 @@ class TestCephClientRequires(unittest.TestCase): 'op': 'add-permissions-to-key'}]) def test_request_access_to_group_existing_request(self): + self.patch_kr('send_request_if_needed') req = ( '{"api-version": 1, ' '"ops": [{"op": "create-pool", "name": "volumes", "replicas": 3, ' '"pg_num": null, "weight": null, "group": null, ' '"group-namespace": null}], ' '"request-id": "9e34123e-fa0c-11e8-ad9c-fa163ed1cc55"}') - self.patch_kr('get_local', req) + existing_request = DummyRequest(req_json=req) + self.patch_kr('get_current_request', existing_request) self.cr.request_access_to_group( 'volumes', key_name='cinder', object_prefix_permissions={'class-read': ['rbd_children']}, permission='rwx') - ceph_broker_rq = self.send_request_if_needed.mock_calls[0][1][0] self.assertEqual( - ceph_broker_rq.ops, + existing_request.ops, [ { 'op': 'create-pool', @@ -278,9 +292,7 @@ class TestCephClientRequires(unittest.TestCase): 'class-read': ['rbd_children']}, 'op': 'add-permissions-to-key'}]) - @mock.patch.object(requires.base_requires.hookenv, 'related_units') - @mock.patch.object(requires.base_requires.hookenv, 'relation_get') - def test_get_remote_all(self, relation_get, related_units): + def test_get_remote_all(self): unit_data = { 'rid:1': { 'app1/0': { @@ -294,16 +306,21 @@ class TestCephClientRequires(unittest.TestCase): 'key1': 'value1', 'key2': 'value3'}}, 'rid:3': {}} + unit0_r1_mock = mock.MagicMock() + unit0_r1_mock.received = unit_data['rid:1']['app1/0'] + unit1_r1_mock = mock.MagicMock() + unit1_r1_mock.received = unit_data['rid:1']['app1/1'] + unit0_r2_mock = mock.MagicMock() + unit0_r2_mock.received = unit_data['rid:2']['app2/0'] + rel1 = mock.MagicMock() + rel1.units = [unit0_r1_mock, unit1_r1_mock] + rel2 = mock.MagicMock() + rel2.units = [unit0_r2_mock] + rel3 = mock.MagicMock() + rel3.units = [] - def get_unit_data(key, unit, relation_id): - return unit_data[relation_id].get(unit, {}).get(key, {}) - conv1 = mock.MagicMock() - conv1.relation_ids = ['rid:1', 'rid:2'] - conv2 = mock.MagicMock() - conv2.relation_ids = ['rid:3'] - self.patch_kr('conversations', [conv1, conv2]) - related_units.side_effect = lambda x: unit_data[x].keys() - relation_get.side_effect = get_unit_data + self.patch_kr('_relations') + self._relations.__iter__.return_value = [rel1, rel2, rel3] # Check de-duplication: self.assertEqual( self.cr.get_remote_all('key1'),