From 222c5b3bdc7dd782a417dec9692e62bf289bc86a Mon Sep 17 00:00:00 2001 From: Davanum Srinivas Date: Mon, 27 Mar 2017 06:36:44 -0400 Subject: [PATCH] Support for watch() --- etcd3gw/client.py | 68 +++++++++++++++++++++++++++++++++++++ etcd3gw/examples/watcher.py | 24 +++++++++++++ etcd3gw/watch.py | 60 ++++++++++++++++++++++++++++++++ requirements.txt | 1 + 4 files changed, 153 insertions(+) create mode 100644 etcd3gw/examples/watcher.py create mode 100644 etcd3gw/watch.py diff --git a/etcd3gw/client.py b/etcd3gw/client.py index c3b61a4..32baf6e 100644 --- a/etcd3gw/client.py +++ b/etcd3gw/client.py @@ -11,9 +11,11 @@ # under the License. import json +import threading import uuid import requests +from six.moves import queue from etcd3gw.lease import Lease from etcd3gw.lock import Lock @@ -21,6 +23,7 @@ from etcd3gw.utils import _decode from etcd3gw.utils import _encode from etcd3gw.utils import _increment_last_byte from etcd3gw.utils import DEFAULT_TIMEOUT +from etcd3gw import watch class Client(object): @@ -263,3 +266,68 @@ class Client(object): """ return self.post(self.get_url("/kv/txn"), data=json.dumps(txn)) + + def watch(self, key, **kwargs): + """Watch a key. + + :param key: key to watch + + :returns: tuple of ``events_iterator`` and ``cancel``. + Use ``events_iterator`` to get the events of key changes + and ``cancel`` to cancel the watch request + """ + event_queue = queue.Queue() + + def callback(event): + event_queue.put(event) + + w = watch.Watcher(self, key, callback, **kwargs) + canceled = threading.Event() + + def cancel(): + canceled.set() + event_queue.put(None) + w.cancel() + + def iterator(): + while not canceled.is_set(): + event = event_queue.get() + if event is None: + canceled.set() + if not canceled.is_set(): + yield event + + return iterator(), cancel + + def watch_prefix(self, key_prefix, **kwargs): + """The same as ``watch``, but watches a range of keys with a prefix.""" + kwargs['range_end'] = \ + _increment_last_byte(_encode(key_prefix)) + return self.watch(key_prefix, **kwargs) + + def watch_once(self, key, timeout=None, **kwargs): + """Watch a key and stops after the first event. + + :param key: key to watch + :param timeout: (optional) timeout in seconds. + :returns: event + """ + event_queue = queue.Queue() + + def callback(event): + event_queue.put(event) + + w = watch.Watcher(self, key, callback, **kwargs) + + try: + return event_queue.get(timeout=timeout) + except queue.Empty: + raise watch.WatchTimedOut() + finally: + w.cancel(self) + + def watch_prefix_once(self, key_prefix, timeout=None, **kwargs): + """Watches a range of keys with a prefix, similar to watch_once""" + kwargs['range_end'] = \ + _increment_last_byte(_encode(key_prefix)) + return self.watch_once(key_prefix, timeout=timeout, **kwargs) diff --git a/etcd3gw/examples/watcher.py b/etcd3gw/examples/watcher.py new file mode 100644 index 0000000..50cfc46 --- /dev/null +++ b/etcd3gw/examples/watcher.py @@ -0,0 +1,24 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from etcd3gw.client import Client + + +def main(): + client = Client() + events, cancel = client.watch('foo') + for event in events: + print(">>>> event : %r", event) + + +if __name__ == "__main__": + main() diff --git a/etcd3gw/watch.py b/etcd3gw/watch.py new file mode 100644 index 0000000..967e819 --- /dev/null +++ b/etcd3gw/watch.py @@ -0,0 +1,60 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import json + +import futurist + +from etcd3gw.utils import _encode + + +class WatchTimedOut(Exception): + pass + + +def _watch(client, key, callback, **kwargs): + create_watch = { + 'key': _encode(key) + } + if 'range_end' in kwargs: + create_watch['range_end'] = _encode(kwargs['range_end']) + if 'start_revision' in kwargs: + create_watch['start_revision'] = kwargs['start_revision'] + if 'progress_notify' in kwargs: + create_watch['progress_notify'] = kwargs['progress_notify'] + if 'filters' in kwargs: + create_watch['filters'] = kwargs['filters'] + if 'prev_kv' in kwargs: + create_watch['prev_kv'] = kwargs['prev_kv'] + + create_request = { + "create_request": create_watch + } + resp = client.session.post(client.get_url('/watch'), + json=create_request, + stream=True) + for line in resp.iter_content(chunk_size=None, decode_unicode=True): + if line: + decoded_line = line.decode('utf-8') + callback(json.loads(decoded_line)) + + +class Watcher(object): + def __init__(self, client, key, callback, **kwargs): + self._executor = futurist.ThreadPoolExecutor(max_workers=1) + self._executor.submit(_watch, client, key, callback, **kwargs) + + def alive(self): + return self._executor.alive + + def shutdown(self): + self._executor.shutdown() diff --git a/requirements.txt b/requirements.txt index 7e1e583..a8a4d21 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,3 +6,4 @@ pbr>=2.0 # Apache-2.0 urllib3>=1.8.3 # MIT requests>=2.10.0,!=2.12.2,!=2.13.0 # Apache-2.0 six>=1.9.0 # MIT +futurist!=0.15.0,>=0.11.0 # Apache-2.0 \ No newline at end of file