Support for watch()

This commit is contained in:
Davanum Srinivas 2017-03-27 06:36:44 -04:00
parent a600eac6d2
commit 222c5b3bdc
4 changed files with 153 additions and 0 deletions

View File

@ -11,9 +11,11 @@
# under the License.
import json
import threading
import uuid
import requests
from six.moves import queue
from etcd3gw.lease import Lease
from etcd3gw.lock import Lock
@ -21,6 +23,7 @@ from etcd3gw.utils import _decode
from etcd3gw.utils import _encode
from etcd3gw.utils import _increment_last_byte
from etcd3gw.utils import DEFAULT_TIMEOUT
from etcd3gw import watch
class Client(object):
@ -263,3 +266,68 @@ class Client(object):
"""
return self.post(self.get_url("/kv/txn"),
data=json.dumps(txn))
def watch(self, key, **kwargs):
"""Watch a key.
:param key: key to watch
:returns: tuple of ``events_iterator`` and ``cancel``.
Use ``events_iterator`` to get the events of key changes
and ``cancel`` to cancel the watch request
"""
event_queue = queue.Queue()
def callback(event):
event_queue.put(event)
w = watch.Watcher(self, key, callback, **kwargs)
canceled = threading.Event()
def cancel():
canceled.set()
event_queue.put(None)
w.cancel()
def iterator():
while not canceled.is_set():
event = event_queue.get()
if event is None:
canceled.set()
if not canceled.is_set():
yield event
return iterator(), cancel
def watch_prefix(self, key_prefix, **kwargs):
"""The same as ``watch``, but watches a range of keys with a prefix."""
kwargs['range_end'] = \
_increment_last_byte(_encode(key_prefix))
return self.watch(key_prefix, **kwargs)
def watch_once(self, key, timeout=None, **kwargs):
"""Watch a key and stops after the first event.
:param key: key to watch
:param timeout: (optional) timeout in seconds.
:returns: event
"""
event_queue = queue.Queue()
def callback(event):
event_queue.put(event)
w = watch.Watcher(self, key, callback, **kwargs)
try:
return event_queue.get(timeout=timeout)
except queue.Empty:
raise watch.WatchTimedOut()
finally:
w.cancel(self)
def watch_prefix_once(self, key_prefix, timeout=None, **kwargs):
"""Watches a range of keys with a prefix, similar to watch_once"""
kwargs['range_end'] = \
_increment_last_byte(_encode(key_prefix))
return self.watch_once(key_prefix, timeout=timeout, **kwargs)

View File

@ -0,0 +1,24 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from etcd3gw.client import Client
def main():
client = Client()
events, cancel = client.watch('foo')
for event in events:
print(">>>> event : %r", event)
if __name__ == "__main__":
main()

60
etcd3gw/watch.py Normal file
View File

@ -0,0 +1,60 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import futurist
from etcd3gw.utils import _encode
class WatchTimedOut(Exception):
pass
def _watch(client, key, callback, **kwargs):
create_watch = {
'key': _encode(key)
}
if 'range_end' in kwargs:
create_watch['range_end'] = _encode(kwargs['range_end'])
if 'start_revision' in kwargs:
create_watch['start_revision'] = kwargs['start_revision']
if 'progress_notify' in kwargs:
create_watch['progress_notify'] = kwargs['progress_notify']
if 'filters' in kwargs:
create_watch['filters'] = kwargs['filters']
if 'prev_kv' in kwargs:
create_watch['prev_kv'] = kwargs['prev_kv']
create_request = {
"create_request": create_watch
}
resp = client.session.post(client.get_url('/watch'),
json=create_request,
stream=True)
for line in resp.iter_content(chunk_size=None, decode_unicode=True):
if line:
decoded_line = line.decode('utf-8')
callback(json.loads(decoded_line))
class Watcher(object):
def __init__(self, client, key, callback, **kwargs):
self._executor = futurist.ThreadPoolExecutor(max_workers=1)
self._executor.submit(_watch, client, key, callback, **kwargs)
def alive(self):
return self._executor.alive
def shutdown(self):
self._executor.shutdown()

View File

@ -6,3 +6,4 @@ pbr>=2.0 # Apache-2.0
urllib3>=1.8.3 # MIT
requests>=2.10.0,!=2.12.2,!=2.13.0 # Apache-2.0
six>=1.9.0 # MIT
futurist!=0.15.0,>=0.11.0 # Apache-2.0