channel id; totp; various

This commit is contained in:
Tobias Oberstein 2016-01-22 17:59:59 +01:00
parent 79543da789
commit 7ac2ca9692
5 changed files with 103 additions and 73 deletions

View File

@ -27,13 +27,12 @@
from __future__ import absolute_import
import binascii
import hashlib
from twisted.internet.protocol import Factory
from twisted.protocols.basic import Int32StringReceiver
from twisted.internet.error import ConnectionDone
from autobahn.twisted.util import peer2str
from autobahn.twisted.util import peer2str, transport_channel_id
from autobahn.wamp.exception import ProtocolError, SerializationError, TransportLost
import txaio
@ -247,22 +246,11 @@ class WampRawSocketServerProtocol(WampRawSocketProtocol):
if data:
self.dataReceived(data)
def get_channel_id(self):
def get_channel_id(self, channel_id_type=u'tls-unique'):
"""
Implements :func:`autobahn.wamp.interfaces.ITransport.get_channel_id`
"""
if hasattr(self.transport, '_tlsConnection'):
# Obtain latest TLS Finished message that we expected from peer, or None if handshake is not completed.
# http://www.pyopenssl.org/en/stable/api/ssl.html#OpenSSL.SSL.Connection.get_peer_finished
# for routers (=servers), the channel ID is based on the TLS Finished message we
# expected to receive from the client
tls_finished_msg = self.transport._tlsConnection.get_peer_finished()
m = hashlib.sha256()
m.update(tls_finished_msg)
return m.digest()
else:
return None
return transport_channel_id(self.transport, is_server=True, channel_id_type=channel_id_type)
class WampRawSocketClientProtocol(WampRawSocketProtocol):
@ -331,22 +319,11 @@ class WampRawSocketClientProtocol(WampRawSocketProtocol):
if data:
self.dataReceived(data)
def get_channel_id(self):
def get_channel_id(self, channel_id_type=u'tls-unique'):
"""
Implements :func:`autobahn.wamp.interfaces.ITransport.get_channel_id`
"""
if hasattr(self.transport, '_tlsConnection'):
# Obtain latest TLS Finished message that we sent, or None if handshake is not completed.
# http://www.pyopenssl.org/en/stable/api/ssl.html#OpenSSL.SSL.Connection.get_finished
# for clients, the channel ID is based on the TLS Finished message we sent
# to the router (=server)
tls_finished_msg = self.transport._tlsConnection.get_finished()
m = hashlib.sha256()
m.update(tls_finished_msg)
return m.digest()
else:
return None
return transport_channel_id(self.transport, is_server=False, channel_id_type=channel_id_type)
class WampRawSocketFactory(Factory):

View File

@ -26,6 +26,8 @@
from __future__ import absolute_import
import hashlib
from twisted.internet.defer import Deferred
from twisted.internet.address import IPv4Address, UNIXAddress
try:
@ -42,7 +44,8 @@ except ImportError:
__all = (
'sleep',
'peer2str'
'peer2str',
'transport_channel_id'
)
@ -86,3 +89,42 @@ def peer2str(addr):
res = u"?:{0}".format(addr)
return res
def transport_channel_id(transport, is_server, channel_id_type):
"""
Application-layer user authentication protocols are vulnerable to generic
credential forwarding attacks, where an authentication credential sent by
a client C to a server M may then be used by M to impersonate C at another
server S. To prevent such credential forwarding attacks, modern authentication
protocols rely on channel bindings. For example, WAMP-cryptosign can use
the tls-unique channel identifier provided by the TLS layer to strongly bind
authentication credentials to the underlying channel, so that a credential
received on one TLS channel cannot be forwarded on another.
"""
if channel_id_type is None:
return None
if channel_id_type not in [u'tls-unique']:
raise Exception("invalid channel ID type {}".format(channel_id_type))
if hasattr(transport, '_tlsConnection'):
# Obtain latest TLS Finished message that we expected from peer, or None if handshake is not completed.
# http://www.pyopenssl.org/en/stable/api/ssl.html#OpenSSL.SSL.Connection.get_peer_finished
if is_server:
# for routers (=servers), the channel ID is based on the TLS Finished message we
# expected to receive from the client
tls_finished_msg = transport._tlsConnection.get_peer_finished()
else:
# for clients, the channel ID is based on the TLS Finished message we sent
# to the router (=server)
tls_finished_msg = transport._tlsConnection.get_finished()
m = hashlib.sha256()
m.update(tls_finished_msg)
return m.digest()
else:
return None

View File

@ -26,7 +26,6 @@
from __future__ import absolute_import
import hashlib
from base64 import b64encode, b64decode
from zope.interface import implementer
@ -44,7 +43,7 @@ from autobahn.wamp import websocket
from autobahn.websocket.types import ConnectionRequest, ConnectionResponse, \
ConnectionDeny
from autobahn.websocket import protocol
from autobahn.twisted.util import peer2str
from autobahn.twisted.util import peer2str, transport_channel_id
from autobahn.websocket.compress import PerMessageDeflateOffer, \
PerMessageDeflateOfferAccept, \
@ -207,22 +206,11 @@ class WebSocketServerProtocol(WebSocketAdapterProtocol, protocol.WebSocketServer
res.addErrback(forwardError)
def get_channel_id(self):
def get_channel_id(self, channel_id_type=u'tls-unique'):
"""
Implements :func:`autobahn.wamp.interfaces.ITransport.get_channel_id`
"""
if hasattr(self.transport, '_tlsConnection'):
# Obtain latest TLS Finished message that we expected from peer, or None if handshake is not completed.
# http://www.pyopenssl.org/en/stable/api/ssl.html#OpenSSL.SSL.Connection.get_peer_finished
# for routers (=servers), the channel ID is based on the TLS Finished message we
# expected to receive from the client
tls_finished_msg = self.transport._tlsConnection.get_peer_finished()
m = hashlib.sha256()
m.update(tls_finished_msg)
return m.digest()
else:
return None
return transport_channel_id(self.transport, is_server=True, channel_id_type=channel_id_type)
class WebSocketClientProtocol(WebSocketAdapterProtocol, protocol.WebSocketClientProtocol):
@ -237,22 +225,11 @@ class WebSocketClientProtocol(WebSocketAdapterProtocol, protocol.WebSocketClient
self.log.debug("Starting TLS upgrade")
self.transport.startTLS(self.factory.contextFactory)
def get_channel_id(self):
def get_channel_id(self, channel_id_type=u'tls-unique'):
"""
Implements :func:`autobahn.wamp.interfaces.ITransport.get_channel_id`
"""
if hasattr(self.transport, '_tlsConnection'):
# Obtain latest TLS Finished message that we sent, or None if handshake is not completed.
# http://www.pyopenssl.org/en/stable/api/ssl.html#OpenSSL.SSL.Connection.get_finished
# for clients, the channel ID is based on the TLS Finished message we sent
# to the router (=server)
tls_finished_msg = self.transport._tlsConnection.get_finished()
m = hashlib.sha256()
m.update(tls_finished_msg)
return m.digest()
else:
return None
return transport_channel_id(self.transport, is_server=False, channel_id_type=channel_id_type)
class WebSocketAdapterFactory(object):

View File

@ -59,10 +59,10 @@ def generate_totp_secret(length=10):
:returns: The generated secret in Base32 (letters ``A-Z`` and digits ``2-7``).
The length of the generated secret is ``length * 8 / 5`` octets.
:rtype: bytes
:rtype: unicode
"""
assert(type(length) in six.integer_types)
return base64.b32encode(os.urandom(length))
return base64.b32encode(os.urandom(length)).decode('ascii')
def compute_totp(secret, offset=0):
@ -70,14 +70,14 @@ def compute_totp(secret, offset=0):
Computes the current TOTP code.
:param secret: Base32 encoded secret.
:type secret: bytes
:type secret: unicode
:param offset: Time offset for which to compute TOTP.
:type offset: int
:returns: TOTP for current time (+/- offset).
:rtype: bytes
:rtype: unicode
"""
assert(type(secret) == bytes)
assert(type(secret) == six.text_type)
assert(type(offset) in six.integer_types)
try:
key = base64.b32decode(secret)
@ -88,15 +88,43 @@ def compute_totp(secret, offset=0):
digest = hmac.new(key, msg, hashlib.sha1).digest()
o = 15 & (digest[19] if six.PY3 else ord(digest[19]))
token = (struct.unpack('>I', digest[o:o + 4])[0] & 0x7fffffff) % 1000000
return '{0:06d}'.format(token).encode('ascii')
return u'{0:06d}'.format(token)
##
def check_totp(secret, ticket):
"""
The Internet can be slow, and clocks might not match exactly, so some leniency is allowed. RFC6238 recommends looking an extra time step in either direction, which essentially opens the window from 30 seconds to 90 seconds.
"""
pass
def qrcode_from_totp(secret, label, issuer):
if type(secret) != six.text_type:
raise Exception('secret must be of type unicode, not {}'.format(type(secret)))
if type(label) != six.text_type:
raise Exception('label must be of type unicode, not {}'.format(type(label)))
try:
import pyqrcode
except ImportError:
raise Exception('pyqrcode not installed')
import io
buffer = io.BytesIO()
data = pyqrcode.create(u'otpauth://totp/{}?secret={}&issuer={}'.format(label, secret, issuer))
data.svg(buffer, omithw=True)
return buffer.getvalue()
#
# The following code is adapted from the pbkdf2_bin() function
# in here https://github.com/mitsuhiko/python-pbkdf2
# Copyright 2011 by Armin Ronacher. Licensed under BSD license.
# https://github.com/mitsuhiko/python-pbkdf2/blob/master/LICENSE
##
#
_pack_int = Struct('>I').pack
if six.PY3:
@ -242,3 +270,8 @@ def compute_wcs(key, challenge):
challenge = challenge.encode('utf8')
sig = hmac.new(key, challenge, hashlib.sha256).digest()
return binascii.b2a_base64(sig).strip()
if __name__ == '__main__':
with open('test.svg', 'w') as f:
f.write(qrcode_from_totp(u'CACKN3GRF3KQZMEK', u'tobias1', u'Tavendo'))

View File

@ -29,6 +29,7 @@ from __future__ import absolute_import
import unittest2 as unittest
import platform
import six
import re
import json
import binascii
@ -78,28 +79,28 @@ class TestWampAuthHelpers(unittest.TestCase):
def test_generate_totp_secret_default(self):
secret = auth.generate_totp_secret()
self.assertEqual(type(secret), bytes)
self.assertEqual(type(secret), six.text_type)
self.assertEqual(len(secret), 10 * 8 / 5)
def test_generate_totp_secret_length(self):
for length in [5, 10, 20, 30, 40, 50]:
secret = auth.generate_totp_secret(length)
self.assertEqual(type(secret), bytes)
self.assertEqual(type(secret), six.text_type)
self.assertEqual(len(secret), length * 8 / 5)
def test_compute_totp(self):
pat = re.compile(b"\d{6}")
secret = b"MFRGGZDFMZTWQ2LK"
pat = re.compile(u"\d{6}")
secret = u"MFRGGZDFMZTWQ2LK"
signature = auth.compute_totp(secret)
self.assertEqual(type(signature), bytes)
self.assertEqual(type(signature), six.text_type)
self.assertTrue(pat.match(signature) is not None)
def test_compute_totp_offset(self):
pat = re.compile(b"\d{6}")
secret = b"MFRGGZDFMZTWQ2LK"
pat = re.compile(u"\d{6}")
secret = u"MFRGGZDFMZTWQ2LK"
for offset in range(-10, 10):
signature = auth.compute_totp(secret, offset)
self.assertEqual(type(signature), bytes)
self.assertEqual(type(signature), six.text_type)
self.assertTrue(pat.match(signature) is not None)
def test_derive_key(self):