Merge pull request #768 from meejah/newapi-improvements--transport-smart-class.rebase

Pre-parse configuration to _Transport class
This commit is contained in:
Tobias Oberstein
2016-12-29 15:40:25 +01:00
committed by GitHub
3 changed files with 346 additions and 66 deletions

View File

@@ -35,7 +35,6 @@ from twisted.internet.interfaces import IStreamClientEndpoint
from twisted.internet.endpoints import UNIXClientEndpoint from twisted.internet.endpoints import UNIXClientEndpoint
from twisted.internet.endpoints import TCP4ClientEndpoint from twisted.internet.endpoints import TCP4ClientEndpoint
from twisted.internet.error import ReactorNotRunning from twisted.internet.error import ReactorNotRunning
from twisted.internet.task import react
try: try:
_TLS = True _TLS = True
@@ -112,18 +111,12 @@ def _create_transport_serializer(serializer_id):
raise RuntimeError('could not create serializer for "{}"'.format(serializer_id)) raise RuntimeError('could not create serializer for "{}"'.format(serializer_id))
def _create_transport_serializers(transport_config): def _create_transport_serializers(transport):
""" """
Create a list of serializers to use with a WAMP protocol factory. Create a list of serializers to use with a WAMP protocol factory.
""" """
if u'serializers' in transport_config:
serializer_ids = _unique_list(transport_config[u'serializers'])
else:
serializer_ids = [u'msgpack', u'json']
serializers = [] serializers = []
for serializer_id in transport.serializers:
for serializer_id in serializer_ids:
if serializer_id == u'msgpack': if serializer_id == u'msgpack':
# try MsgPack WAMP serializer # try MsgPack WAMP serializer
try: try:
@@ -152,18 +145,18 @@ def _create_transport_serializers(transport_config):
return serializers return serializers
def _create_transport_factory(reactor, transport_config, session_factory): def _create_transport_factory(reactor, transport, session_factory):
""" """
Create a WAMP-over-XXX transport factory. Create a WAMP-over-XXX transport factory.
""" """
if transport_config['type'] == 'websocket': if transport.type == 'websocket':
# FIXME: forward WebSocket options # FIXME: forward WebSocket options
serializers = _create_transport_serializers(transport_config) serializers = _create_transport_serializers(transport)
return WampWebSocketClientFactory(session_factory, url=transport_config['url'], serializers=serializers) return WampWebSocketClientFactory(session_factory, url=transport.url, serializers=serializers)
elif transport_config['type'] == 'rawsocket': elif transport.type == 'rawsocket':
# FIXME: forward RawSocket options # FIXME: forward RawSocket options
serializer = _create_transport_serializer(transport_config.get('serializer', u'json')) serializer = _create_transport_serializer(transport.serializer)
return WampRawSocketClientFactory(session_factory, serializer=serializer) return WampRawSocketClientFactory(session_factory, serializer=serializer)
else: else:
@@ -277,12 +270,12 @@ class Component(component.Component):
" provider" " provider"
) )
def _connect_transport(self, reactor, transport_config, session_factory): def _connect_transport(self, reactor, transport, session_factory):
""" """
Create and connect a WAMP-over-XXX transport. Create and connect a WAMP-over-XXX transport.
""" """
transport_factory = _create_transport_factory(reactor, transport_config, session_factory) transport_factory = _create_transport_factory(reactor, transport, session_factory)
transport_endpoint = _create_transport_endpoint(reactor, transport_config['endpoint']) transport_endpoint = _create_transport_endpoint(reactor, transport.endpoint)
return transport_endpoint.connect(transport_factory) return transport_endpoint.connect(transport_factory)
# XXX think: is it okay to use inlineCallbacks (in this # XXX think: is it okay to use inlineCallbacks (in this
@@ -330,14 +323,16 @@ class Component(component.Component):
) )
yield sleep(delay) yield sleep(delay)
try: try:
transport.connect_attempts += 1
yield self._connect_once(reactor, transport) yield self._connect_once(reactor, transport)
self.log.info('Component completed successfully') transport.connect_sucesses += 1
except Exception as e: except Exception as e:
# need concept of "fatal errors", for which a transport.connect_failures += 1
# connection is *never* going to work. Might want f = txaio.create_failure()
# to also add, for example, things like self.log.error(u'component failed: {error}', error=txaio.failure_message(f))
# SyntaxError self.log.debug(u'{tb}', tb=txaio.failure_format_traceback(f))
# If this is a "fatal error" that will never work,
# we bail out now
if isinstance(e, ApplicationError): if isinstance(e, ApplicationError):
if e.error in [u'wamp.error.no_such_realm']: if e.error in [u'wamp.error.no_such_realm']:
reconnect = False reconnect = False
@@ -428,4 +423,6 @@ def _run(reactor, components):
def run(components): def run(components):
# only for Twisted > 12
from twisted.internet.task import react
react(_run, [components]) react(_run, [components])

View File

@@ -0,0 +1,172 @@
###############################################################################
#
# The MIT License (MIT)
#
# Copyright (c) Tavendo GmbH
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
###############################################################################
from __future__ import absolute_import
import os
import unittest
if os.environ.get('USE_TWISTED', False):
from autobahn.twisted.component import Component
class InvalidTransportConfigs(unittest.TestCase):
def test_invalid_key(self):
with self.assertRaises(ValueError) as ctx:
Component(
main=lambda r, s: None,
transports=dict(
foo='bar', # totally invalid key
),
)
self.assertIn("'foo' is not", str(ctx.exception))
def test_invalid_key_transport_list(self):
with self.assertRaises(ValueError) as ctx:
Component(
main=lambda r, s: None,
transports=[
dict(type='websocket', url='ws://127.0.0.1/ws'),
dict(foo='bar'), # totally invalid key
]
)
self.assertIn("'foo' is not a valid configuration item", str(ctx.exception))
def test_invalid_serializer_key(self):
with self.assertRaises(ValueError) as ctx:
Component(
main=lambda r, s: None,
transports=[
{
"url": "ws://127.0.0.1/ws",
"serializer": ["quux"],
}
]
)
self.assertIn("only for rawsocket", str(ctx.exception))
def test_invalid_serializer(self):
with self.assertRaises(ValueError) as ctx:
Component(
main=lambda r, s: None,
transports=[
{
"url": "ws://127.0.0.1/ws",
"serializers": ["quux"],
}
]
)
self.assertIn("Invalid serializer", str(ctx.exception))
def test_invalid_serializer_type_0(self):
with self.assertRaises(ValueError) as ctx:
Component(
main=lambda r, s: None,
transports=[
{
"url": "ws://127.0.0.1/ws",
"serializers": [1, 2],
}
]
)
self.assertIn("must be a list", str(ctx.exception))
def test_invalid_serializer_type_1(self):
with self.assertRaises(ValueError) as ctx:
Component(
main=lambda r, s: None,
transports=[
{
"url": "ws://127.0.0.1/ws",
"serializers": 1,
}
]
)
self.assertIn("must be a list", str(ctx.exception))
def test_invalid_type_key(self):
with self.assertRaises(ValueError) as ctx:
Component(
main=lambda r, s: None,
transports=[
{
"type": "bad",
}
]
)
self.assertIn("Invalid transport type", str(ctx.exception))
def test_invalid_type(self):
with self.assertRaises(ValueError) as ctx:
Component(
main=lambda r, s: None,
transports=[
"foo"
]
)
self.assertIn("must be a dict", str(ctx.exception))
def test_no_url(self):
with self.assertRaises(ValueError) as ctx:
Component(
main=lambda r, s: None,
transports=[
{
"type": "websocket",
}
]
)
self.assertIn("Transport requires 'url'", str(ctx.exception))
def test_endpoint_bogus_object(self):
with self.assertRaises(ValueError) as ctx:
Component(
main=lambda r, s: None,
transports=[
{
"type": "websocket",
"url": "ws://example.com/ws",
"endpoint": ("not", "a", "dict"),
}
]
)
self.assertIn("'endpoint' configuration must be", str(ctx.exception))
def test_endpoint_valid(self):
Component(
main=lambda r, s: None,
transports=[
{
"type": "websocket",
"url": "ws://example.com/ws",
"endpoint": {
"type": "tcp",
"host": "1.2.3.4",
"port": "4321",
}
}
]
)

View File

@@ -42,7 +42,7 @@ from autobahn.wamp.exception import ApplicationError
__all__ = ('Connection') __all__ = ('Connection')
def _normalize_endpoint(endpoint, check_native_endpoint=None): def _validate_endpoint(endpoint, check_native_endpoint=None):
""" """
Check a WAMP connecting endpoint configuration. Check a WAMP connecting endpoint configuration.
""" """
@@ -99,60 +99,130 @@ def _normalize_endpoint(endpoint, check_native_endpoint=None):
assert False, 'should not arrive here' assert False, 'should not arrive here'
def _normalize_transport(transport, check_native_endpoint=None): def _create_transport(index, transport, check_native_endpoint=None):
""" """
Check a WAMP connecting transport configuration, and add any Internal helper to insert defaults and create _Transport instances.
defaults that we can. These are:
- type: websocket :param transport: a (possibly valid) transport configuration
- endpoint: if not specified, fill in from URL :type transport: dict
:returns: a _Transport instance
:raises: ValueError on invalid configuration
""" """
if type(transport) != dict: if type(transport) != dict:
raise RuntimeError('invalid type {} for transport configuration - must be a dict'.format(type(transport))) raise ValueError('invalid type {} for transport configuration - must be a dict'.format(type(transport)))
if 'type' not in transport: valid_transport_keys = ['type', 'url', 'endpoint', 'serializer', 'serializers']
transport['type'] = 'websocket' for k in transport.keys():
if k not in valid_transport_keys:
raise ValueError(
"'{}' is not a valid configuration item".format(k)
)
if transport['type'] not in ['websocket', 'rawsocket']: kind = 'websocket'
raise RuntimeError('invalid transport type {}'.format(transport['type'])) if 'type' in transport:
if transport['type'] not in ['websocket', 'rawsocket']:
raise ValueError('Invalid transport type {}'.format(transport['type']))
kind = transport['type']
if transport['type'] == 'websocket': if kind == 'websocket':
if 'url' not in transport: for key in ['url']:
raise ValueError("Missing 'url' in transport") if key not in transport:
raise ValueError("Transport requires '{}' key".format(key))
# endpoint not required; we will deduce from URL if it's not provided
# XXX not in the branch I rebased; can this go away? (is it redundant??)
if 'endpoint' not in transport: if 'endpoint' not in transport:
is_secure, host, port, resource, path, params = parse_url(transport['url']) is_secure, host, port, resource, path, params = parse_url(transport['url'])
transport['endpoint'] = { endpoint_config = {
'type': 'tcp', 'type': 'tcp',
'host': host, 'host': host,
'port': port, 'port': port,
'tls': False if not is_secure else dict(hostname=host), 'tls': False if not is_secure else dict(hostname=host),
} }
_normalize_endpoint(transport['endpoint'], check_native_endpoint) else:
# XXX can/should we check e.g. serializers here? # note: we're avoiding mutating the incoming "configuration"
# dict, so this should avoid that too...
endpoint_config = transport['endpoint']
_validate_endpoint(endpoint_config, check_native_endpoint)
elif transport['type'] == 'rawsocket': if 'serializer' in transport:
raise ValueError("'serializer' is only for rawsocket; use 'serializers'")
if 'serializers' in transport:
if not isinstance(transport['serializers'], (list, tuple)):
raise ValueError("'serializers' must be a list of strings")
if not all([
isinstance(s, (six.text_type, str))
for s in transport['serializers']]):
raise ValueError("'serializers' must be a list of strings")
valid_serializers = ('msgpack', 'json')
for serial in transport['serializers']:
if serial not in valid_serializers:
raise ValueError(
"Invalid serializer '{}' (expected one of: {})".format(
serial,
', '.join([repr(s) for s in valid_serializers]),
)
)
serializer_config = transport.get('serializers', [u'msgpack', u'json'])
elif kind == 'rawsocket':
if 'endpoint' not in transport: if 'endpoint' not in transport:
raise ValueError("Missing 'endpoint' in transport") raise ValueError("Missing 'endpoint' in transport")
endpoint_config = transport['endpoint']
if 'serializers' in transport:
raise ValueError("'serializers' is only for websocket; use 'serializer'")
# always a list; len == 1 for rawsocket
if 'serializer' in transport:
if not isinstance(transport['serializer'], (six.text_type, str)):
raise ValueError("'serializer' must be a string")
serializer_config = [transport['serializer']]
else:
serializer_config = [u'msgpack']
else: else:
assert False, 'should not arrive here' assert False, 'should not arrive here'
kw = {}
for key in ['max_retries', 'max_retry_delay', 'initial_retry_delay',
'retry_delay_growth', 'retry_delay_jitter']:
if key in transport:
kw[key] = transport[key]
class Transport(object): return _Transport(
index,
kind=kind,
url=transport['url'],
endpoint=endpoint_config,
serializers=serializer_config,
**kw
)
class _Transport(object):
""" """
Thin-wrapper for WAMP transports used by a Connection. Thin-wrapper for WAMP transports used by a Connection.
""" """
def __init__(self, idx, config, max_retries=15, max_retry_delay=300, def __init__(self, idx, kind, url, endpoint, serializers,
initial_retry_delay=1.5, retry_delay_growth=1.5, max_retries=15,
max_retry_delay=300,
initial_retry_delay=1.5,
retry_delay_growth=1.5,
retry_delay_jitter=0.1): retry_delay_jitter=0.1):
""" """
:param config: The transport configuration. Should already be
validated + normalized
:type config: dict
""" """
self.idx = idx self.idx = idx
self.config = config
self.type = kind
self.url = url
self.endpoint = endpoint
self.serializers = serializers
if self.type == 'rawsocket' and len(serializers) != 1:
raise ValueError(
"'rawsocket' transport requires exactly one serializer"
)
self.max_retries = max_retries self.max_retries = max_retries
self.max_retry_delay = max_retry_delay self.max_retry_delay = max_retry_delay
@@ -160,11 +230,16 @@ class Transport(object):
self.retry_delay_growth = retry_delay_growth self.retry_delay_growth = retry_delay_growth
self.retry_delay_jitter = retry_delay_jitter self.retry_delay_jitter = retry_delay_jitter
# used via can_reconnect() and failed() to record this
# transport is never going to work
self._permanent_failure = False self._permanent_failure = False
self.reset() self.reset()
def reset(self): def reset(self):
"""
set connection failure rates and retry-delay to initial values
"""
self.connect_attempts = 0 self.connect_attempts = 0
self.connect_sucesses = 0 self.connect_sucesses = 0
self.connect_failures = 0 self.connect_failures = 0
@@ -173,8 +248,8 @@ class Transport(object):
def failed(self): def failed(self):
""" """
Mark this transport as failed, meaning we won't try to connect to Mark this transport as failed, meaning we won't try to connect to
it any longer (can_reconnect() will always return False afer it any longer (that is: can_reconnect() will always return
calling this). False afer calling this).
""" """
self._permanent_failure = True self._permanent_failure = True
@@ -196,6 +271,18 @@ class Transport(object):
self.retry_delay = self.max_retry_delay self.retry_delay = self.max_retry_delay
return self.retry_delay return self.retry_delay
def describe_endpoint(self):
"""
returns a human-readable description of the endpoint
"""
if isinstance(self.endpoint, dict):
return self.endpoint['type']
return repr(self.endpoint)
# this could probably implement twisted.application.service.IService
# if we wanted; or via an adapter...which just adds a startService()
# and stopService() [latter can be async]
class Component(ObservableMixin): class Component(ObservableMixin):
""" """
@@ -225,15 +312,31 @@ class Component(ObservableMixin):
which signals that the component is now "ready". The component will continue to run until which signals that the component is now "ready". The component will continue to run until
it explicitly closes the session or the underlying transport closes. it explicitly closes the session or the underlying transport closes.
:type setup: callable :type setup: callable
:param transports: Transport configurations for creating transports.
:type transports: None or unicode or list
:param config: Session configuration. :param transports: Transport configurations for creating
transports. Each transport can be a WAMP URL, or a dict
containing the following configuration keys:
- ``type`` (optional): ``websocket`` (default) or ``rawsocket``
- ``url``: the WAMP URL
- ``endpoint`` (optional, derived from URL if not provided):
- ``type``: "tcp" or "unix"
- ``host``, ``port``: only for TCP
- ``path``: only for unix
- ``timeout``: in seconds
- ``tls``: ``True`` or (under Twisted) an
``twisted.internet.ssl.IOpenSSLClientComponentCreator``
instance (such as returned from
``twisted.internet.ssl.optionsForClientTLS``) or
``CertificateOptions`` instance.
:type transports: None or unicode or list of dicts
:param config: Session configuration (currently unused?)
:type config: None or dict :type config: None or dict
:param realm: the realm to join :param realm: the realm to join
:type realm: unicode :type realm: unicode
""" """
self.set_valid_events( self.set_valid_events(
[ [
@@ -275,13 +378,20 @@ class Component(ObservableMixin):
} }
transports = [transport] transports = [transport]
# allows a single transport instead of a list (convenience)
elif isinstance(transports, dict):
transports = [transports]
# XXX do we want to be able to provide an infinite iterable of
# transports here? e.g. a generator that makes new transport
# to try?
# now check and save list of transports # now check and save list of transports
self._transports = [] self._transports = []
idx = 0 for idx, transport in enumerate(transports):
for transport in transports: self._transports.append(
_normalize_transport(transport, self._check_native_endpoint) _create_transport(idx, transport, self._check_native_endpoint)
self._transports.append(Transport(idx, transport)) )
idx += 1
self._realm = realm self._realm = realm
self._extra = None # XXX FIXME self._extra = None # XXX FIXME
@@ -298,9 +408,10 @@ class Component(ObservableMixin):
def _connect_once(self, reactor, transport): def _connect_once(self, reactor, transport):
self.log.info( self.log.info(
'connecting to URL "{url}" with "{transport_type}" transport', 'connecting once using transport type "{transport_type}" '
transport_type=transport.config['type'], 'over endpoint "{endpoint_desc}"',
url=transport.config['url'], transport_type=transport.type,
endpoint_desc=transport.describe_endpoint(),
) )
done = txaio.create_future() done = txaio.create_future()
@@ -336,7 +447,7 @@ class Component(ObservableMixin):
self.log.info( self.log.info(
'Successfully connected to transport #{transport_idx}: url={url}', 'Successfully connected to transport #{transport_idx}: url={url}',
transport_idx=transport.idx, transport_idx=transport.idx,
url=transport.config['url'], url=transport.url,
) )
d = txaio.as_future(self._entry, reactor, session) d = txaio.as_future(self._entry, reactor, session)
@@ -360,7 +471,7 @@ class Component(ObservableMixin):
self.log.info( self.log.info(
'Successfully connected to transport #{transport_idx}: url={url}', 'Successfully connected to transport #{transport_idx}: url={url}',
transport_idx=transport.idx, transport_idx=transport.idx,
url=transport.config['url'], url=transport.url,
) )
d = txaio.as_future(self._entry, reactor, session) d = txaio.as_future(self._entry, reactor, session)
@@ -407,7 +518,7 @@ class Component(ObservableMixin):
return session return session
transport.connect_attempts += 1 transport.connect_attempts += 1
d = self._connect_transport(reactor, transport.config, create_session) d = self._connect_transport(reactor, transport, create_session)
def on_connect_sucess(proto): def on_connect_sucess(proto):
# if e.g. an SSL handshake fails, we will have # if e.g. an SSL handshake fails, we will have