diff --git a/.zuul.yaml b/.zuul.yaml index ce4fd058..4ed2e3ca 100644 --- a/.zuul.yaml +++ b/.zuul.yaml @@ -57,7 +57,7 @@ description: | Run tests using ``py38-consul`` environment. vars: - tox_envlist: mysql-python + tox_envlist: py38-consul - job: name: tooz-tox-py38-etcd diff --git a/doc/source/user/compatibility.rst b/doc/source/user/compatibility.rst index 6a38c298..63c201f0 100644 --- a/doc/source/user/compatibility.rst +++ b/doc/source/user/compatibility.rst @@ -30,7 +30,7 @@ Driver support * - Driver - Supported * - :py:class:`~tooz.drivers.consul.ConsulDriver` - - No + - Yes * - :py:class:`~tooz.drivers.etcd.EtcdDriver` - No * - :py:class:`~tooz.drivers.file.FileDriver` diff --git a/doc/source/user/drivers.rst b/doc/source/user/drivers.rst index c03d9de9..bea3398f 100644 --- a/doc/source/user/drivers.rst +++ b/doc/source/user/drivers.rst @@ -243,8 +243,8 @@ Consul **Summary:** -The `consul`_ driver is a driver providing only distributed locks (for now) -and is based on the consul server key/value storage and/or +The `consul`_ driver is a driver providing distributed locking and group +membership and is based on the consul server key/value storage and/or primitives. When a lock is acquired it will release either when explicitly released or automatically when the consul session ends (for example if the program using the lock crashes). diff --git a/setup-consul-env.sh b/setup-consul-env.sh index 21ecef8f..a8a8dd88 100755 --- a/setup-consul-env.sh +++ b/setup-consul-env.sh @@ -2,7 +2,13 @@ set -eux if [ -z "$(which consul)" ]; then - CONSUL_VERSION=0.6.3 + # originally we used 0.6.3 (released in Jan 2016). + # updated to 1.7.4 in Change-Id: Iaddf21f14c434129541e7c9ec7134e0661f7be52 + # 1.4.0 (released Nov 2018) has a new ACL system. + # 1.6.1 (released Sep 2019) is the version used by python-consul2 for testing. + # 1.7.0 (released Feb 2020) changes standards enforcement. + # For details see upgrade notes in Change-Id: I98fc96468b21368ce66365e3fc38c495b1f2918a + CONSUL_VERSION=1.7.4 CONSUL_RELEASE_URL=https://releases.hashicorp.com/consul case `uname -s` in Darwin) diff --git a/tools/compat-matrix.py b/tools/compat-matrix.py index 53574b84..1e35aa24 100644 --- a/tools/compat-matrix.py +++ b/tools/compat-matrix.py @@ -73,7 +73,7 @@ print_header("Driver support", delim="-") print("") grouping_table = [ [ - "No", # Consul + "Yes", # Consul "No", # Etcd "Yes", # File "No", # IPC diff --git a/tooz/drivers/consul.py b/tooz/drivers/consul.py index 6f6ef8ae..1ef91b40 100644 --- a/tooz/drivers/consul.py +++ b/tooz/drivers/consul.py @@ -14,8 +14,12 @@ # License for the specific language governing permissions and limitations # under the License. +import contextlib +import functools + import consul from oslo_utils import encodeutils +import requests import tooz from tooz import _retry @@ -24,6 +28,33 @@ from tooz import locking from tooz import utils +@contextlib.contextmanager +def _failure_translator(): + + """Translates common consul exceptions into tooz exceptions.""" + try: + yield + except (consul.Timeout, requests.exceptions.RequestException) as e: + utils.raise_with_cause(coordination.ToozConnectionError, + encodeutils.exception_to_unicode(e), + cause=e) + except (consul.ConsulException, ValueError) as e: + # ValueError = Typically json decoding failed for some reason. + utils.raise_with_cause(tooz.ToozError, + encodeutils.exception_to_unicode(e), + cause=e) + + +def _translate_failures(func): + + @functools.wraps(func) + def wrapper(*args, **kwargs): + with _failure_translator(): + return func(*args, **kwargs) + + return wrapper + + class ConsulLock(locking.Lock): def __init__(self, name, node, address, session_id, client, token=None): super(ConsulLock, self).__init__(name) @@ -40,6 +71,7 @@ class ConsulLock(locking.Lock): raise tooz.NotImplemented @_retry.retry(stop_max_delay=blocking) + @_translate_failures def _acquire(): # Check if we are the owner and if we are simulate # blocking (because consul will not block a second @@ -54,7 +86,7 @@ class ConsulLock(locking.Lock): else: # The value can be anything. gotten = self._client.kv.put(key=self._name, - value=u"I got it!", + value="I got it!", acquire=self._session_id, token=self._acl_token) if gotten: @@ -87,11 +119,12 @@ class ConsulLock(locking.Lock): return False -class ConsulDriver(coordination.CoordinationDriver): +class ConsulDriver(coordination.CoordinationDriverCachedRunWatchers, + coordination.CoordinationDriverWithExecutor): """This driver uses `python-consul`_ client against `consul`_ servers. - The ConsulDriver implements a minimal set of coordination driver API(s) - needed to make Consul being used as an option for Distributed Locking. The + The ConsulDriver implements the coordination driver API(s) so that Consul + can be used as an option for Distributed Locking and Group Membership. The data is stored in Consul's key-value store. The Consul driver connection URI should look like:: @@ -110,14 +143,43 @@ class ConsulDriver(coordination.CoordinationDriver): ================== ======= For details on the available options, refer to - http://python-consul.readthedocs.org/en/latest/. + http://python-consul2.readthedocs.org/en/latest/. - .. _python-consul: http://python-consul.readthedocs.org/ + The following Key/Value paths are utilized in Consul to implement the + coordination APIs: + +-------------------------------------------+--------------+--------------+ + | Key | Value | Description | + +===========================================+==============+==============+ + | /groups/ | None | Group of | + | | | members. | + +-------------------------------------------+--------------+--------------+ + | /groups// | Member | Member in a | + | | capabilities | group. | + | | encoded as | | + | | msgpack | | + +-------------------------------------------+--------------+--------------+ + | /group_locks/ | Consul | Lock for | + | | session ID | group | + | | | membership | + +-------------------------------------------+--------------+--------------+ + | /locks/ | Consul | Each key is | + | | session ID | a distributed| + | | | lock. | + +-------------------------------------------+--------------+--------------+ + + NOTE: A group in tooz is NOT the same as a Consul service, so tooz groups + are implemented using Consul Key/Values and not with native services. + Groups in tooz do not have host:port details or health checks that report + to consul to verify that the service is still alive and listening on that + host:port. If you need to use native Consul services, configure the Consul + agent directly (not via tooz). + + .. _python-consul: http://python-consul2.readthedocs.org/ .. _consul: https://consul.io/ """ #: Default namespace when none is provided - TOOZ_NAMESPACE = u"tooz" + TOOZ_NAMESPACE = "tooz" #: Default TTL DEFAULT_TTL = 15 @@ -128,6 +190,31 @@ class ConsulDriver(coordination.CoordinationDriver): #: Consul ACL Token if not provided ACL_TOKEN = None + CHARACTERISTICS = ( + # client liveness is based on more than just timeouts + coordination.Characteristics.NON_TIMEOUT_BASED, + # multiple service instances (group members) could register + # with different ports per thread/proc + # but the agent is always on locahost: so not DISTRIBUTED_ACROSS_HOSTS + coordination.Characteristics.DISTRIBUTED_ACROSS_THREADS, + coordination.Characteristics.DISTRIBUTED_ACROSS_PROCESSES, + # https://www.consul.io/docs/internals/consensus#consistency-modes + # The consul consistency mode determines the history characteristics. + # Writes *always* go through a single leader process into raft log. + # Reads *may* use all servers depending on the request's mode: + # - 'consistent' = LINEARIZABLE + # - 'default' = SEQUENTIAL + # - 'stale' = CAUSAL + coordination.Characteristics.SEQUENTIAL, # 'default' consistency mode + # https://www.consul.io/docs/internals/consensus + # raft log of servers is snapshotted + compacted + coordination.Characteristics.SERIALIZABLE, + ) + """ + Tuple of :py:class:`~tooz.coordination.Characteristics` introspectable + enum member(s) that can be used to interogate how this driver works. + """ + def __init__(self, member_id, parsed_url, options): super(ConsulDriver, self).__init__(member_id, parsed_url, options) options = utils.collapse(options) @@ -139,6 +226,8 @@ class ConsulDriver(coordination.CoordinationDriver): namespace = options.get('namespace', self.TOOZ_NAMESPACE) self._namespace = encodeutils.safe_decode(namespace) self._acl_token = options.get('acl_token', self.ACL_TOKEN) + # the empty group name adds a trailing / needed for lookups + self._groups_prefix = self._paths_join(self._namespace, "groups", "") self._client = None def _start(self): @@ -151,6 +240,7 @@ class ConsulDriver(coordination.CoordinationDriver): local_agent = self._client.agent.self() self._node = local_agent['Member']['Name'] self._address = local_agent['Member']['Addr'] + # implicitly uses the agent's datacenter (set in consul agent config) # Register a Node self._client.catalog.register(node=self._node, @@ -167,7 +257,8 @@ class ConsulDriver(coordination.CoordinationDriver): def _stop(self): if self._client is not None: if self._session_id is not None: - self._client.session.destroy(self._session_id) + self._client.session.destroy(self._session_id, + token=self._acl_token) self._session_id = None self._client = None @@ -179,32 +270,240 @@ class ConsulDriver(coordination.CoordinationDriver): # session has died or is unreachable. When a session expires all locks # are released and any services that were registered with that session # are marked as no longer active. - self._client.session.renew(self._session_id) + self._client.session.renew(self._session_id, token=self._acl_token) # renew the session every half-TTL or 1 second, whatever is larger sleep_sec = max(self._ttl / 2, 1) return sleep_sec - def get_lock(self, name): - real_name = self._paths_join(self._namespace, u"locks", name) + def _get_lock(self, real_name): return ConsulLock(real_name, self._node, self._address, session_id=self._session_id, client=self._client, token=self._acl_token) + def get_lock(self, name): + real_name = self._path_lock(name) + return self._get_lock(real_name) + + def _get_group_lock(self, group_id): + real_name = self._path_group_lock(group_id) + return self._get_lock(real_name) + @staticmethod def _paths_join(*args): pieces = [] for arg in args: pieces.append(encodeutils.safe_decode(arg)) - return u"/".join(pieces) + return "/".join(pieces) - def watch_join_group(self, group_id, callback): + def _path_lock(self, name): + return self._paths_join(self._namespace, "locks", name) + + def _path_group_lock(self, name): + return self._paths_join(self._namespace, "group_locks", name) + + def _path_group(self, group_id): + # add an extra '/' at the end so that searches with this path + # will go down and find their children + return self._paths_join(self._namespace, "groups", group_id) + "/" + + def _path_member(self, group_id, member_id): + return self._paths_join( + self._namespace, "groups", group_id, member_id + ) + + def _group_path_to_id(self, base_path, group_path): + """Translates a path into a group name. + + The group name is the last part of the path. So, we simply split on + the path separator '/' and return the last element. + + Example: + group_id = self._path_to_group_id("tooz/groups/helloworld") + print(group_id) # "helloworld" + """ + if group_path.startswith(base_path): + group_id = group_path[len(base_path):] + else: + group_id = group_path + # if a group has members (sub-keys) it will contain a trailing / + # we need to strip this to get just the name + # if a group has no members there is no trailing / (for some reason) + group_id = group_id.strip("/") + return utils.to_binary(group_id) + + def _get_group_members(self, group_path): + index, data = self._client.kv.get(group_path, recurse=True) + group = None + members = [] + for kv in (data or []): + if kv["Key"] == group_path: + group = kv + else: + members.append(kv) + return (group, members) + + def get_groups(self): + + @_translate_failures + def _get_groups(): + groups = [] + index, data = self._client.kv.get(self._groups_prefix, keys=True, + separator="/") + for key in (data or []): + if key != self._groups_prefix: + group_id = self._group_path_to_id(self._groups_prefix, key) + groups.append(group_id) + return groups + + return ConsulFutureResult(self._executor.submit(_get_groups)) + + def create_group(self, group_id): + + @_translate_failures + def _create_group(): + group_path = self._path_group(group_id) + # create with Check-And-Set index 0 will only succeed if the key + # doesn't exit + result = self._client.kv.put(group_path, "", cas=0) + if not result: + raise coordination.GroupAlreadyExist(group_id) + return result + + return ConsulFutureResult(self._executor.submit(_create_group)) + + def _destroy_group(self, group_id): + """Should only be used in tests...""" + with self._get_group_lock(group_id) as lock: + group_path = self._path_group(group_id) + self._client.kv.delete(group_path, recurse=True) + self._client.kv.delete(lock._name) + + def delete_group(self, group_id): + + @_translate_failures + def _delete_group(): + # create a lock for the group so that other operations on this + # group do not conflict while the group is being deleted + with self._get_group_lock(group_id) as lock: + group_path = self._path_group(group_id) + group, members = self._get_group_members(group_path) + if not group: + raise coordination.GroupNotCreated(group_id) + + if members: + raise coordination.GroupNotEmpty(group_id) + + # delete the group recursively + result = self._client.kv.delete(group_path, recurse=True) + + # delete the lock for the group + self._client.kv.delete(lock._name) + return result + + return ConsulFutureResult(self._executor.submit(_delete_group)) + + def join_group(self, group_id, capabilities=b""): + + @_translate_failures + def _join_group(): + # lock the group so that it doesn't get deleted while we join + with self._get_group_lock(group_id): + group_path = self._path_group(group_id) + member_path = self._path_member(group_id, self._member_id) + group, members = self._get_group_members(group_path) + if not group: + raise coordination.GroupNotCreated(group_id) + + for m in members: + if m["Key"] == member_path: + raise coordination.MemberAlreadyExist(group_id, + self._member_id) + + # create with Check-And-Set index 0 will only succeed if the + # key doesn't exit + self._client.kv.put(member_path, utils.dumps(capabilities), + cas=0) + self._joined_groups.add(group_id) + + return ConsulFutureResult(self._executor.submit(_join_group)) + + def leave_group(self, group_id): + + @_translate_failures + def _leave_group(): + # NOTE: We do NOT have to lock the group here because deletes in + # Consul are atomic and succeed even if the key doesn't exist + # This means that there is no race condition between checking + # if the member exists and then deleting it. + group_path = self._path_group(group_id) + member_path = self._path_member(group_id, self._member_id) + group, members = self._get_group_members(group_path) + member = None + for m in members: + if m["Key"] == member_path: + member = m + break + else: + raise coordination.MemberNotJoined(group_id, self._member_id) + + # delete the member key with Check-And-Set semantics based on index + # we read above + self._client.kv.delete(member_path, cas=member["ModifyIndex"]) + self._joined_groups.discard(group_id) + + return ConsulFutureResult(self._executor.submit(_leave_group)) + + def get_members(self, group_id): + + @_translate_failures + def _get_members(): + group_path = self._path_group(group_id) + group, members = self._get_group_members(group_path) + if not group: + raise coordination.GroupNotCreated(group_id) + + result = set() + for m in members: + member_id = self._group_path_to_id(group_path, m["Key"]) + result.add(member_id) + return result + + return ConsulFutureResult(self._executor.submit(_get_members)) + + def get_member_capabilities(self, group_id, member_id): + + @_translate_failures + def _get_member_capabilities(): + member_path = self._path_member(group_id, member_id) + index, data = self._client.kv.get(member_path) + if not data: + raise coordination.MemberNotJoined(group_id, member_id) + return utils.loads(data["Value"]) + + return ConsulFutureResult( + self._executor.submit(_get_member_capabilities)) + + def update_capabilities(self, group_id, capabilities): + + @_translate_failures + def _update_capabilities(): + member_path = self._path_member(group_id, self._member_id) + index, data = self._client.kv.get(member_path) + if not data: + raise coordination.MemberNotJoined(group_id, self._member_id) + # no need to Check-And-Set here, latest write wins + self._client.kv.put(member_path, utils.dumps(capabilities)) + + return ConsulFutureResult(self._executor.submit(_update_capabilities)) + + @staticmethod + def watch_elected_as_leader(group_id, callback): raise tooz.NotImplemented - def unwatch_join_group(self, group_id, callback): + @staticmethod + def unwatch_elected_as_leader(group_id, callback): raise tooz.NotImplemented - def watch_leave_group(self, group_id, callback): - raise tooz.NotImplemented - def unwatch_leave_group(self, group_id, callback): - raise tooz.NotImplemented +ConsulFutureResult = functools.partial(coordination.CoordinatorResult, + failure_translator=_failure_translator)