Files
deb-python-trollius/examples/cacheclt.py
Victor Stinner 684f3be000 Python issue #23208: Add BaseEventLoop._current_handle
In debug mode, BaseEventLoop._run_once() now sets the
BaseEventLoop._current_handle attribute to the handle currently executed.
In release mode or when no handle is executed, the attribute is None.

BaseEventLoop.default_exception_handler() displays the traceback of the current
handle if available.
2015-01-26 10:52:45 +01:00

214 lines
6.9 KiB
Python

"""Client for cache server.
See cachesvr.py for protocol description.
"""
import argparse
import asyncio
from asyncio import test_utils
import json
import logging
ARGS = argparse.ArgumentParser(description='Cache client example.')
ARGS.add_argument(
'--tls', action='store_true', dest='tls',
default=False, help='Use TLS')
ARGS.add_argument(
'--iocp', action='store_true', dest='iocp',
default=False, help='Use IOCP event loop (Windows only)')
ARGS.add_argument(
'--host', action='store', dest='host',
default='localhost', help='Host name')
ARGS.add_argument(
'--port', action='store', dest='port',
default=54321, type=int, help='Port number')
ARGS.add_argument(
'--timeout', action='store', dest='timeout',
default=5, type=float, help='Timeout')
ARGS.add_argument(
'--max_backoff', action='store', dest='max_backoff',
default=5, type=float, help='Max backoff on reconnect')
ARGS.add_argument(
'--ntasks', action='store', dest='ntasks',
default=10, type=int, help='Number of tester tasks')
ARGS.add_argument(
'--ntries', action='store', dest='ntries',
default=5, type=int, help='Number of request tries before giving up')
args = ARGS.parse_args()
class CacheClient:
"""Multiplexing cache client.
This wraps a single connection to the cache client. The
connection is automatically re-opened when an error occurs.
Multiple tasks may share this object; the requests will be
serialized.
The public API is get(), set(), delete() (all are coroutines).
"""
def __init__(self, host, port, sslctx=None, loop=None):
self.host = host
self.port = port
self.sslctx = sslctx
self.loop = loop
self.todo = set()
self.initialized = False
self.task = asyncio.Task(self.activity(), loop=self.loop)
@asyncio.coroutine
def get(self, key):
resp = yield from self.request('get', key)
if resp is None:
return None
return resp.get('value')
@asyncio.coroutine
def set(self, key, value):
resp = yield from self.request('set', key, value)
if resp is None:
return False
return resp.get('status') == 'ok'
@asyncio.coroutine
def delete(self, key):
resp = yield from self.request('delete', key)
if resp is None:
return False
return resp.get('status') == 'ok'
@asyncio.coroutine
def request(self, type, key, value=None):
assert not self.task.done()
data = {'type': type, 'key': key}
if value is not None:
data['value'] = value
payload = json.dumps(data).encode('utf8')
waiter = asyncio.Future(loop=self.loop)
if self.initialized:
try:
yield from self.send(payload, waiter)
except IOError:
self.todo.add((payload, waiter))
else:
self.todo.add((payload, waiter))
return (yield from waiter)
@asyncio.coroutine
def activity(self):
backoff = 0
while True:
try:
self.reader, self.writer = yield from asyncio.open_connection(
self.host, self.port, ssl=self.sslctx, loop=self.loop)
except Exception as exc:
backoff = min(args.max_backoff, backoff + (backoff//2) + 1)
logging.info('Error connecting: %r; sleep %s', exc, backoff)
yield from asyncio.sleep(backoff, loop=self.loop)
continue
backoff = 0
self.next_id = 0
self.pending = {}
self. initialized = True
try:
while self.todo:
payload, waiter = self.todo.pop()
if not waiter.done():
yield from self.send(payload, waiter)
while True:
resp_id, resp = yield from self.process()
if resp_id in self.pending:
payload, waiter = self.pending.pop(resp_id)
if not waiter.done():
waiter.set_result(resp)
except Exception as exc:
self.initialized = False
self.writer.close()
while self.pending:
req_id, pair = self.pending.popitem()
payload, waiter = pair
if not waiter.done():
self.todo.add(pair)
logging.info('Error processing: %r', exc)
@asyncio.coroutine
def send(self, payload, waiter):
self.next_id += 1
req_id = self.next_id
frame = 'request %d %d\n' % (req_id, len(payload))
self.writer.write(frame.encode('ascii'))
self.writer.write(payload)
self.pending[req_id] = payload, waiter
yield from self.writer.drain()
@asyncio.coroutine
def process(self):
frame = yield from self.reader.readline()
if not frame:
raise EOFError()
head, tail = frame.split(None, 1)
if head == b'error':
raise IOError('OOB error: %r' % tail)
if head != b'response':
raise IOError('Bad frame: %r' % frame)
resp_id, resp_size = map(int, tail.split())
data = yield from self.reader.readexactly(resp_size)
if len(data) != resp_size:
raise EOFError()
resp = json.loads(data.decode('utf8'))
return resp_id, resp
def main():
asyncio.set_event_loop(None)
if args.iocp:
from asyncio.windows_events import ProactorEventLoop
loop = ProactorEventLoop()
else:
loop = asyncio.new_event_loop()
sslctx = None
if args.tls:
sslctx = test_utils.dummy_ssl_context()
cache = CacheClient(args.host, args.port, sslctx=sslctx, loop=loop)
try:
loop.run_until_complete(
asyncio.gather(
*[testing(i, cache, loop) for i in range(args.ntasks)],
loop=loop))
finally:
loop.close()
@asyncio.coroutine
def testing(label, cache, loop):
def w(g):
return asyncio.wait_for(g, args.timeout, loop=loop)
key = 'foo-%s' % label
while True:
logging.info('%s %s', label, '-'*20)
try:
ret = yield from w(cache.set(key, 'hello-%s-world' % label))
logging.info('%s set %s', label, ret)
ret = yield from w(cache.get(key))
logging.info('%s get %s', label, ret)
ret = yield from w(cache.delete(key))
logging.info('%s del %s', label, ret)
ret = yield from w(cache.get(key))
logging.info('%s get2 %s', label, ret)
except asyncio.TimeoutError:
logging.warn('%s Timeout', label)
except Exception as exc:
logging.exception('%s Client exception: %r', label, exc)
break
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
main()