From 80f379bfa2af9c9a80ce369beb275361ed78963c Mon Sep 17 00:00:00 2001
From: Nick Maludy <nmaludy@gmail.com>
Date: Fri, 29 Mar 2019 21:43:11 -0400
Subject: [PATCH] Implements Group API for the Consul driver

Also fixes tox env for py38-consul tests
Update zuul script to use consul 1.7.4 (2020) vs 0.6.3 (2016)

Adds ACL tokens to all session management calls.

Change-Id: Iaddf21f14c434129541e7c9ec7134e0661f7be52
---
 .zuul.yaml                        |   2 +-
 doc/source/user/compatibility.rst |   2 +-
 doc/source/user/drivers.rst       |   4 +-
 setup-consul-env.sh               |   8 +-
 tools/compat-matrix.py            |   2 +-
 tooz/drivers/consul.py            | 335 ++++++++++++++++++++++++++++--
 6 files changed, 329 insertions(+), 24 deletions(-)

diff --git a/.zuul.yaml b/.zuul.yaml
index ce4fd058..4ed2e3ca 100644
--- a/.zuul.yaml
+++ b/.zuul.yaml
@@ -57,7 +57,7 @@
     description: |
       Run tests using ``py38-consul`` environment.
     vars:
-      tox_envlist: mysql-python
+      tox_envlist: py38-consul
 
 - job:
     name: tooz-tox-py38-etcd
diff --git a/doc/source/user/compatibility.rst b/doc/source/user/compatibility.rst
index 6a38c298..63c201f0 100644
--- a/doc/source/user/compatibility.rst
+++ b/doc/source/user/compatibility.rst
@@ -30,7 +30,7 @@ Driver support
    * - Driver
      - Supported
    * - :py:class:`~tooz.drivers.consul.ConsulDriver`
-     - No
+     - Yes
    * - :py:class:`~tooz.drivers.etcd.EtcdDriver`
      - No
    * - :py:class:`~tooz.drivers.file.FileDriver`
diff --git a/doc/source/user/drivers.rst b/doc/source/user/drivers.rst
index c03d9de9..bea3398f 100644
--- a/doc/source/user/drivers.rst
+++ b/doc/source/user/drivers.rst
@@ -243,8 +243,8 @@ Consul
 
 **Summary:**
 
-The `consul`_ driver is a driver providing only distributed locks (for now)
-and is based on the consul server key/value storage and/or
+The `consul`_ driver is a driver providing distributed locking and group
+membership and is based on the consul server key/value storage and/or
 primitives. When a lock is acquired it will release either when explicitly
 released or automatically when the consul session ends (for example if
 the program using the lock crashes).
diff --git a/setup-consul-env.sh b/setup-consul-env.sh
index 21ecef8f..a8a8dd88 100755
--- a/setup-consul-env.sh
+++ b/setup-consul-env.sh
@@ -2,7 +2,13 @@
 set -eux
 
 if [ -z "$(which consul)" ]; then
-    CONSUL_VERSION=0.6.3
+    # originally we used 0.6.3 (released in Jan 2016).
+    # updated to 1.7.4 in Change-Id: Iaddf21f14c434129541e7c9ec7134e0661f7be52
+    # 1.4.0 (released Nov 2018) has a new ACL system.
+    # 1.6.1 (released Sep 2019) is the version used by python-consul2 for testing.
+    # 1.7.0 (released Feb 2020) changes standards enforcement.
+    # For details see upgrade notes in Change-Id: I98fc96468b21368ce66365e3fc38c495b1f2918a
+    CONSUL_VERSION=1.7.4
     CONSUL_RELEASE_URL=https://releases.hashicorp.com/consul
     case `uname -s` in
         Darwin)
diff --git a/tools/compat-matrix.py b/tools/compat-matrix.py
index 53574b84..1e35aa24 100644
--- a/tools/compat-matrix.py
+++ b/tools/compat-matrix.py
@@ -73,7 +73,7 @@ print_header("Driver support", delim="-")
 print("")
 grouping_table = [
     [
-        "No",  # Consul
+        "Yes",  # Consul
         "No",  # Etcd
         "Yes",  # File
         "No",  # IPC
diff --git a/tooz/drivers/consul.py b/tooz/drivers/consul.py
index 6f6ef8ae..1ef91b40 100644
--- a/tooz/drivers/consul.py
+++ b/tooz/drivers/consul.py
@@ -14,8 +14,12 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import contextlib
+import functools
+
 import consul
 from oslo_utils import encodeutils
+import requests
 
 import tooz
 from tooz import _retry
@@ -24,6 +28,33 @@ from tooz import locking
 from tooz import utils
 
 
+@contextlib.contextmanager
+def _failure_translator():
+
+    """Translates common consul exceptions into tooz exceptions."""
+    try:
+        yield
+    except (consul.Timeout, requests.exceptions.RequestException) as e:
+        utils.raise_with_cause(coordination.ToozConnectionError,
+                               encodeutils.exception_to_unicode(e),
+                               cause=e)
+    except (consul.ConsulException, ValueError) as e:
+        # ValueError = Typically json decoding failed for some reason.
+        utils.raise_with_cause(tooz.ToozError,
+                               encodeutils.exception_to_unicode(e),
+                               cause=e)
+
+
+def _translate_failures(func):
+
+    @functools.wraps(func)
+    def wrapper(*args, **kwargs):
+        with _failure_translator():
+            return func(*args, **kwargs)
+
+    return wrapper
+
+
 class ConsulLock(locking.Lock):
     def __init__(self, name, node, address, session_id, client, token=None):
         super(ConsulLock, self).__init__(name)
@@ -40,6 +71,7 @@ class ConsulLock(locking.Lock):
             raise tooz.NotImplemented
 
         @_retry.retry(stop_max_delay=blocking)
+        @_translate_failures
         def _acquire():
             # Check if we are the owner and if we are simulate
             # blocking (because consul will not block a second
@@ -54,7 +86,7 @@ class ConsulLock(locking.Lock):
             else:
                 # The value can be anything.
                 gotten = self._client.kv.put(key=self._name,
-                                             value=u"I got it!",
+                                             value="I got it!",
                                              acquire=self._session_id,
                                              token=self._acl_token)
                 if gotten:
@@ -87,11 +119,12 @@ class ConsulLock(locking.Lock):
         return False
 
 
-class ConsulDriver(coordination.CoordinationDriver):
+class ConsulDriver(coordination.CoordinationDriverCachedRunWatchers,
+                   coordination.CoordinationDriverWithExecutor):
     """This driver uses `python-consul`_ client against `consul`_ servers.
 
-    The ConsulDriver implements a minimal set of coordination driver API(s)
-    needed to make Consul being used as an option for Distributed Locking. The
+    The ConsulDriver implements the coordination driver API(s) so that Consul
+    can be used as an option for Distributed Locking and Group Membership. The
     data is stored in Consul's key-value store.
 
     The Consul driver connection URI should look like::
@@ -110,14 +143,43 @@ class ConsulDriver(coordination.CoordinationDriver):
     ==================  =======
 
     For details on the available options, refer to
-    http://python-consul.readthedocs.org/en/latest/.
+    http://python-consul2.readthedocs.org/en/latest/.
 
-    .. _python-consul: http://python-consul.readthedocs.org/
+    The following Key/Value paths are utilized in Consul to implement the
+    coordination APIs:
+    +-------------------------------------------+--------------+--------------+
+    | Key                                       | Value        | Description  |
+    +===========================================+==============+==============+
+    | <namespace>/groups/<group_id>             | None         | Group of     |
+    |                                           |              | members.     |
+    +-------------------------------------------+--------------+--------------+
+    | <namespace>/groups/<group_id>/<member_id> | Member       | Member in a  |
+    |                                           | capabilities | group.       |
+    |                                           | encoded as   |              |
+    |                                           | msgpack      |              |
+    +-------------------------------------------+--------------+--------------+
+    | <namespace>/group_locks/<group_id>        | Consul       | Lock for     |
+    |                                           | session ID   | group        |
+    |                                           |              | membership   |
+    +-------------------------------------------+--------------+--------------+
+    | <namespace>/locks/<name>                  | Consul       | Each key is  |
+    |                                           | session ID   | a distributed|
+    |                                           |              | lock.        |
+    +-------------------------------------------+--------------+--------------+
+
+    NOTE: A group in tooz is NOT the same as a Consul service, so tooz groups
+    are implemented using Consul Key/Values and not with native services.
+    Groups in tooz do not have host:port details or health checks that report
+    to consul to verify that the service is still alive and listening on that
+    host:port. If you need to use native Consul services, configure the Consul
+    agent directly (not via tooz).
+
+    .. _python-consul: http://python-consul2.readthedocs.org/
     .. _consul: https://consul.io/
     """
 
     #: Default namespace when none is provided
-    TOOZ_NAMESPACE = u"tooz"
+    TOOZ_NAMESPACE = "tooz"
 
     #: Default TTL
     DEFAULT_TTL = 15
@@ -128,6 +190,31 @@ class ConsulDriver(coordination.CoordinationDriver):
     #: Consul ACL Token if not provided
     ACL_TOKEN = None
 
+    CHARACTERISTICS = (
+        # client liveness is based on more than just timeouts
+        coordination.Characteristics.NON_TIMEOUT_BASED,
+        # multiple service instances (group members) could register
+        # with different ports per thread/proc
+        # but the agent is always on locahost: so not DISTRIBUTED_ACROSS_HOSTS
+        coordination.Characteristics.DISTRIBUTED_ACROSS_THREADS,
+        coordination.Characteristics.DISTRIBUTED_ACROSS_PROCESSES,
+        # https://www.consul.io/docs/internals/consensus#consistency-modes
+        # The consul consistency mode determines the history characteristics.
+        # Writes *always* go through a single leader process into raft log.
+        # Reads *may* use all servers depending on the request's mode:
+        #   - 'consistent' = LINEARIZABLE
+        #   - 'default' = SEQUENTIAL
+        #   - 'stale' = CAUSAL
+        coordination.Characteristics.SEQUENTIAL,  # 'default' consistency mode
+        # https://www.consul.io/docs/internals/consensus
+        # raft log of servers is snapshotted + compacted
+        coordination.Characteristics.SERIALIZABLE,
+    )
+    """
+    Tuple of :py:class:`~tooz.coordination.Characteristics` introspectable
+    enum member(s) that can be used to interogate how this driver works.
+    """
+
     def __init__(self, member_id, parsed_url, options):
         super(ConsulDriver, self).__init__(member_id, parsed_url, options)
         options = utils.collapse(options)
@@ -139,6 +226,8 @@ class ConsulDriver(coordination.CoordinationDriver):
         namespace = options.get('namespace', self.TOOZ_NAMESPACE)
         self._namespace = encodeutils.safe_decode(namespace)
         self._acl_token = options.get('acl_token', self.ACL_TOKEN)
+        # the empty group name adds a trailing / needed for lookups
+        self._groups_prefix = self._paths_join(self._namespace, "groups", "")
         self._client = None
 
     def _start(self):
@@ -151,6 +240,7 @@ class ConsulDriver(coordination.CoordinationDriver):
         local_agent = self._client.agent.self()
         self._node = local_agent['Member']['Name']
         self._address = local_agent['Member']['Addr']
+        # implicitly uses the agent's datacenter (set in consul agent config)
 
         # Register a Node
         self._client.catalog.register(node=self._node,
@@ -167,7 +257,8 @@ class ConsulDriver(coordination.CoordinationDriver):
     def _stop(self):
         if self._client is not None:
             if self._session_id is not None:
-                self._client.session.destroy(self._session_id)
+                self._client.session.destroy(self._session_id,
+                                             token=self._acl_token)
                 self._session_id = None
             self._client = None
 
@@ -179,32 +270,240 @@ class ConsulDriver(coordination.CoordinationDriver):
         # session has died or is unreachable. When a session expires all locks
         # are released and any services that were registered with that session
         # are marked as no longer active.
-        self._client.session.renew(self._session_id)
+        self._client.session.renew(self._session_id, token=self._acl_token)
         # renew the session every half-TTL or 1 second, whatever is larger
         sleep_sec = max(self._ttl / 2, 1)
         return sleep_sec
 
-    def get_lock(self, name):
-        real_name = self._paths_join(self._namespace, u"locks", name)
+    def _get_lock(self, real_name):
         return ConsulLock(real_name, self._node, self._address,
                           session_id=self._session_id,
                           client=self._client, token=self._acl_token)
 
+    def get_lock(self, name):
+        real_name = self._path_lock(name)
+        return self._get_lock(real_name)
+
+    def _get_group_lock(self, group_id):
+        real_name = self._path_group_lock(group_id)
+        return self._get_lock(real_name)
+
     @staticmethod
     def _paths_join(*args):
         pieces = []
         for arg in args:
             pieces.append(encodeutils.safe_decode(arg))
-        return u"/".join(pieces)
+        return "/".join(pieces)
 
-    def watch_join_group(self, group_id, callback):
+    def _path_lock(self, name):
+        return self._paths_join(self._namespace, "locks", name)
+
+    def _path_group_lock(self, name):
+        return self._paths_join(self._namespace, "group_locks", name)
+
+    def _path_group(self, group_id):
+        # add an extra '/' at the end so that searches with this path
+        # will go down and find their children
+        return self._paths_join(self._namespace, "groups", group_id) + "/"
+
+    def _path_member(self, group_id, member_id):
+        return self._paths_join(
+            self._namespace, "groups", group_id, member_id
+        )
+
+    def _group_path_to_id(self, base_path, group_path):
+        """Translates a path into a group name.
+
+        The group name is the last part of the path. So, we simply split on
+        the path separator '/' and return the last element.
+
+        Example:
+           group_id = self._path_to_group_id("tooz/groups/helloworld")
+           print(group_id) # "helloworld"
+        """
+        if group_path.startswith(base_path):
+            group_id = group_path[len(base_path):]
+        else:
+            group_id = group_path
+        # if a group has members (sub-keys) it will contain a trailing /
+        # we need to strip this to get just the name
+        # if a group has no members there is no trailing / (for some reason)
+        group_id = group_id.strip("/")
+        return utils.to_binary(group_id)
+
+    def _get_group_members(self, group_path):
+        index, data = self._client.kv.get(group_path, recurse=True)
+        group = None
+        members = []
+        for kv in (data or []):
+            if kv["Key"] == group_path:
+                group = kv
+            else:
+                members.append(kv)
+        return (group, members)
+
+    def get_groups(self):
+
+        @_translate_failures
+        def _get_groups():
+            groups = []
+            index, data = self._client.kv.get(self._groups_prefix, keys=True,
+                                              separator="/")
+            for key in (data or []):
+                if key != self._groups_prefix:
+                    group_id = self._group_path_to_id(self._groups_prefix, key)
+                    groups.append(group_id)
+            return groups
+
+        return ConsulFutureResult(self._executor.submit(_get_groups))
+
+    def create_group(self, group_id):
+
+        @_translate_failures
+        def _create_group():
+            group_path = self._path_group(group_id)
+            # create with Check-And-Set index 0 will only succeed if the key
+            # doesn't exit
+            result = self._client.kv.put(group_path, "", cas=0)
+            if not result:
+                raise coordination.GroupAlreadyExist(group_id)
+            return result
+
+        return ConsulFutureResult(self._executor.submit(_create_group))
+
+    def _destroy_group(self, group_id):
+        """Should only be used in tests..."""
+        with self._get_group_lock(group_id) as lock:
+            group_path = self._path_group(group_id)
+            self._client.kv.delete(group_path, recurse=True)
+            self._client.kv.delete(lock._name)
+
+    def delete_group(self, group_id):
+
+        @_translate_failures
+        def _delete_group():
+            # create a lock for the group so that other operations on this
+            # group do not conflict while the group is being deleted
+            with self._get_group_lock(group_id) as lock:
+                group_path = self._path_group(group_id)
+                group, members = self._get_group_members(group_path)
+                if not group:
+                    raise coordination.GroupNotCreated(group_id)
+
+                if members:
+                    raise coordination.GroupNotEmpty(group_id)
+
+                # delete the group recursively
+                result = self._client.kv.delete(group_path, recurse=True)
+
+                # delete the lock for the group
+                self._client.kv.delete(lock._name)
+                return result
+
+        return ConsulFutureResult(self._executor.submit(_delete_group))
+
+    def join_group(self, group_id, capabilities=b""):
+
+        @_translate_failures
+        def _join_group():
+            # lock the group so that it doesn't get deleted while we join
+            with self._get_group_lock(group_id):
+                group_path = self._path_group(group_id)
+                member_path = self._path_member(group_id, self._member_id)
+                group, members = self._get_group_members(group_path)
+                if not group:
+                    raise coordination.GroupNotCreated(group_id)
+
+                for m in members:
+                    if m["Key"] == member_path:
+                        raise coordination.MemberAlreadyExist(group_id,
+                                                              self._member_id)
+
+                # create with Check-And-Set index 0 will only succeed if the
+                # key doesn't exit
+                self._client.kv.put(member_path, utils.dumps(capabilities),
+                                    cas=0)
+                self._joined_groups.add(group_id)
+
+        return ConsulFutureResult(self._executor.submit(_join_group))
+
+    def leave_group(self, group_id):
+
+        @_translate_failures
+        def _leave_group():
+            # NOTE: We do NOT have to lock the group here because deletes in
+            #       Consul are atomic and succeed even if the key doesn't exist
+            #       This means that there is no race condition between checking
+            #       if the member exists and then deleting it.
+            group_path = self._path_group(group_id)
+            member_path = self._path_member(group_id, self._member_id)
+            group, members = self._get_group_members(group_path)
+            member = None
+            for m in members:
+                if m["Key"] == member_path:
+                    member = m
+                    break
+            else:
+                raise coordination.MemberNotJoined(group_id, self._member_id)
+
+            # delete the member key with Check-And-Set semantics based on index
+            # we read above
+            self._client.kv.delete(member_path, cas=member["ModifyIndex"])
+            self._joined_groups.discard(group_id)
+
+        return ConsulFutureResult(self._executor.submit(_leave_group))
+
+    def get_members(self, group_id):
+
+        @_translate_failures
+        def _get_members():
+            group_path = self._path_group(group_id)
+            group, members = self._get_group_members(group_path)
+            if not group:
+                raise coordination.GroupNotCreated(group_id)
+
+            result = set()
+            for m in members:
+                member_id = self._group_path_to_id(group_path, m["Key"])
+                result.add(member_id)
+            return result
+
+        return ConsulFutureResult(self._executor.submit(_get_members))
+
+    def get_member_capabilities(self, group_id, member_id):
+
+        @_translate_failures
+        def _get_member_capabilities():
+            member_path = self._path_member(group_id, member_id)
+            index, data = self._client.kv.get(member_path)
+            if not data:
+                raise coordination.MemberNotJoined(group_id, member_id)
+            return utils.loads(data["Value"])
+
+        return ConsulFutureResult(
+            self._executor.submit(_get_member_capabilities))
+
+    def update_capabilities(self, group_id, capabilities):
+
+        @_translate_failures
+        def _update_capabilities():
+            member_path = self._path_member(group_id, self._member_id)
+            index, data = self._client.kv.get(member_path)
+            if not data:
+                raise coordination.MemberNotJoined(group_id, self._member_id)
+            # no need to Check-And-Set here, latest write wins
+            self._client.kv.put(member_path, utils.dumps(capabilities))
+
+        return ConsulFutureResult(self._executor.submit(_update_capabilities))
+
+    @staticmethod
+    def watch_elected_as_leader(group_id, callback):
         raise tooz.NotImplemented
 
-    def unwatch_join_group(self, group_id, callback):
+    @staticmethod
+    def unwatch_elected_as_leader(group_id, callback):
         raise tooz.NotImplemented
 
-    def watch_leave_group(self, group_id, callback):
-        raise tooz.NotImplemented
 
-    def unwatch_leave_group(self, group_id, callback):
-        raise tooz.NotImplemented
+ConsulFutureResult = functools.partial(coordination.CoordinatorResult,
+                                       failure_translator=_failure_translator)