Fix watcher failing with huge payloads
When the watcher receives a huge payload from the Etcd server, the payload may be split into 2 chunks: the first chunk contains the json data, and the second contains only a newline (\n). Skip these empty lines because json.loads raises an uncaught exception and the thread fails silently. Closes-Bug: #2072492 Change-Id: Iced7f057cbe928033b7a8bb8fe7086a8a1d3c4c5
This commit is contained in:
parent
3337348e77
commit
b5ff7f1625
@ -288,6 +288,51 @@ class TestEtcd3Gateway(base.TestCase):
|
|||||||
|
|
||||||
t.join()
|
t.join()
|
||||||
|
|
||||||
|
@unittest.skipUnless(
|
||||||
|
_is_etcd3_running(), "etcd3 is not available")
|
||||||
|
def test_watch_huge_payload(self):
|
||||||
|
key = '/%s-watch_key/watch/huge_payload' % str(uuid.uuid4())
|
||||||
|
|
||||||
|
def update_etcd(v):
|
||||||
|
print(f"put({key}, {v}")
|
||||||
|
self.client.put(key, v)
|
||||||
|
out = self.client.get(key)
|
||||||
|
self.assertEqual([v.encode("latin-1")], out)
|
||||||
|
|
||||||
|
def update_key():
|
||||||
|
# sleep to make watch can get the event
|
||||||
|
time.sleep(3)
|
||||||
|
update_etcd('0' * 10000)
|
||||||
|
time.sleep(1)
|
||||||
|
update_etcd('1' * 10000)
|
||||||
|
time.sleep(1)
|
||||||
|
update_etcd('2' * 10000)
|
||||||
|
time.sleep(1)
|
||||||
|
update_etcd('3' * 10000)
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
t = threading.Thread(name="update_key_huge", target=update_key)
|
||||||
|
t.start()
|
||||||
|
|
||||||
|
change_count = 0
|
||||||
|
events_iterator, cancel = self.client.watch(key)
|
||||||
|
for event in events_iterator:
|
||||||
|
self.assertEqual(event['kv']['key'], key.encode("latin-1"))
|
||||||
|
self.assertEqual(
|
||||||
|
event['kv']['value'],
|
||||||
|
(str(change_count) * 10000).encode("latin-1"),
|
||||||
|
)
|
||||||
|
|
||||||
|
# if cancel worked, we should not receive event 3
|
||||||
|
assert event['kv']['value'][0] != b'3'
|
||||||
|
|
||||||
|
change_count += 1
|
||||||
|
if change_count > 2:
|
||||||
|
# if cancel not work, we will block in this for-loop forever
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
t.join()
|
||||||
|
|
||||||
@unittest.skipUnless(
|
@unittest.skipUnless(
|
||||||
_is_etcd3_running(), "etcd3 is not available")
|
_is_etcd3_running(), "etcd3 is not available")
|
||||||
def test_watch_prefix(self):
|
def test_watch_prefix(self):
|
||||||
|
@ -21,6 +21,10 @@ from etcd3gw.utils import _get_threadpool_executor
|
|||||||
def _watch(resp, callback):
|
def _watch(resp, callback):
|
||||||
for line in resp.iter_content(chunk_size=None, decode_unicode=False):
|
for line in resp.iter_content(chunk_size=None, decode_unicode=False):
|
||||||
decoded_line = line.decode('utf-8')
|
decoded_line = line.decode('utf-8')
|
||||||
|
# Skip a possible empty line (only "\n")
|
||||||
|
# https://bugs.launchpad.net/python-etcd3gw/+bug/2072492
|
||||||
|
if not decoded_line.strip():
|
||||||
|
continue
|
||||||
payload = json.loads(decoded_line)
|
payload = json.loads(decoded_line)
|
||||||
if 'created' in payload['result']:
|
if 'created' in payload['result']:
|
||||||
if payload['result']['created']:
|
if payload['result']['created']:
|
||||||
|
Loading…
Reference in New Issue
Block a user