Files
deb-python-trollius/examples/source1.py
2014-12-12 18:08:50 +01:00

99 lines
2.6 KiB
Python

"""Like source.py, but uses streams."""
import argparse
import sys
from asyncio import *
from asyncio import test_utils
ARGS = argparse.ArgumentParser(description="TCP data sink example.")
ARGS.add_argument(
'--tls', action='store_true', dest='tls',
default=False, help='Use TLS')
ARGS.add_argument(
'--iocp', action='store_true', dest='iocp',
default=False, help='Use IOCP event loop (Windows only)')
ARGS.add_argument(
'--stop', action='store_true', dest='stop',
default=False, help='Stop the server by sending it b"stop" as data')
ARGS.add_argument(
'--host', action='store', dest='host',
default='127.0.0.1', help='Host name')
ARGS.add_argument(
'--port', action='store', dest='port',
default=1111, type=int, help='Port number')
ARGS.add_argument(
'--size', action='store', dest='size',
default=16*1024, type=int, help='Data size')
class Debug:
"""A clever little class that suppresses repetitive messages."""
overwriting = False
label = 'stream1:'
def print(self, *args):
if self.overwriting:
print(file=sys.stderr)
self.overwriting = 0
print(self.label, *args, file=sys.stderr)
def oprint(self, *args):
self.overwriting += 1
end = '\n'
if self.overwriting >= 3:
if self.overwriting == 3:
print(self.label, '[...]', file=sys.stderr)
end = '\r'
print(self.label, *args, file=sys.stderr, end=end, flush=True)
@coroutine
def start(loop, args):
d = Debug()
total = 0
sslctx = None
if args.tls:
d.print('using dummy SSLContext')
sslctx = test_utils.dummy_ssl_context()
r, w = yield from open_connection(args.host, args.port, ssl=sslctx)
d.print('r =', r)
d.print('w =', w)
if args.stop:
w.write(b'stop')
w.close()
else:
size = args.size
data = b'x'*size
try:
while True:
total += size
d.oprint('writing', size, 'bytes; total', total)
w.write(data)
f = w.drain()
if f:
d.print('pausing')
yield from f
except (ConnectionResetError, BrokenPipeError) as exc:
d.print('caught', repr(exc))
def main():
global args
args = ARGS.parse_args()
if args.iocp:
from asyncio.windows_events import ProactorEventLoop
loop = ProactorEventLoop()
set_event_loop(loop)
else:
loop = get_event_loop()
try:
loop.run_until_complete(start(loop, args))
finally:
loop.close()
if __name__ == '__main__':
main()