refactoring.,,
This commit is contained in:
parent
d3f4b33bcb
commit
bf0baf26ff
websocket
@ -220,3 +220,68 @@ class ABNF(object):
|
||||
return _d.tobytes()
|
||||
else:
|
||||
return _d.tostring()
|
||||
|
||||
|
||||
class FrameBuffer(object):
|
||||
_HEADER_MASK_INDEX = 5
|
||||
_HEADER_LENGHT_INDEX = 6
|
||||
|
||||
def __init__(self):
|
||||
self.clear()
|
||||
|
||||
def clear(self):
|
||||
self.header = None
|
||||
self.length = None
|
||||
self.mask = None
|
||||
|
||||
def has_received_header(self):
|
||||
return self.header is None
|
||||
|
||||
def recv_header(self, recv_fn):
|
||||
header = recv_fn(2)
|
||||
b1 = header[0]
|
||||
|
||||
if six.PY2:
|
||||
b1 = ord(b1)
|
||||
|
||||
fin = b1 >> 7 & 1
|
||||
rsv1 = b1 >> 6 & 1
|
||||
rsv2 = b1 >> 5 & 1
|
||||
rsv3 = b1 >> 4 & 1
|
||||
opcode = b1 & 0xf
|
||||
b2 = header[1]
|
||||
|
||||
if six.PY2:
|
||||
b2 = ord(b2)
|
||||
|
||||
has_mask = b2 >> 7 & 1
|
||||
length_bits = b2 & 0x7f
|
||||
|
||||
self.header = (fin, rsv1, rsv2, rsv3, opcode, has_mask, length_bits)
|
||||
|
||||
def has_mask(self):
|
||||
if not self.header:
|
||||
return False
|
||||
return self.header[FrameBuffer._HEADER_MASK_INDEX]
|
||||
|
||||
|
||||
def has_received_length(self):
|
||||
return self.length is None
|
||||
|
||||
def recv_length(self, recv_fn):
|
||||
bits = self.header[FrameBuffer._HEADER_LENGHT_INDEX]
|
||||
length_bits = bits & 0x7f
|
||||
if length_bits == 0x7e:
|
||||
v = recv_fn(2)
|
||||
self.length = struct.unpack("!H", v)[0]
|
||||
elif length_bits == 0x7f:
|
||||
v = recv_fn(8)
|
||||
self.length = struct.unpack("!Q", v)[0]
|
||||
else:
|
||||
self.length = length_bits
|
||||
|
||||
def has_received_mask(self):
|
||||
return self.mask is None
|
||||
|
||||
def recv_mask(self, recv_fn):
|
||||
self.mask = recv_fn(4) if self.has_mask() else ""
|
@ -41,7 +41,7 @@ except ImportError:
|
||||
|
||||
HAVE_SSL = False
|
||||
|
||||
from six.moves.urllib.parse import urlparse
|
||||
|
||||
if six.PY3:
|
||||
from base64 import encodebytes as base64encode
|
||||
else:
|
||||
@ -58,7 +58,8 @@ import logging
|
||||
# websocket modules
|
||||
from ._exceptions import *
|
||||
from ._abnf import *
|
||||
from ._utils import NoLock, validate_utf8
|
||||
from ._utils import *
|
||||
from ._url import *
|
||||
|
||||
"""
|
||||
websocket python client.
|
||||
@ -104,12 +105,14 @@ def enableTrace(tracable):
|
||||
logger.addHandler(logging.StreamHandler())
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
|
||||
def _dump(title, message):
|
||||
if traceEnabled:
|
||||
logger.debug("--- " + title + " ---")
|
||||
logger.debug(message)
|
||||
logger.debug("-----------------------")
|
||||
|
||||
|
||||
def setdefaulttimeout(timeout):
|
||||
"""
|
||||
Set the global timeout setting to connect.
|
||||
@ -127,105 +130,6 @@ def getdefaulttimeout():
|
||||
return default_timeout
|
||||
|
||||
|
||||
def _parse_url(url):
|
||||
"""
|
||||
parse url and the result is tuple of
|
||||
(hostname, port, resource path and the flag of secure mode)
|
||||
|
||||
url: url string.
|
||||
"""
|
||||
if ":" not in url:
|
||||
raise ValueError("url is invalid")
|
||||
|
||||
scheme, url = url.split(":", 1)
|
||||
|
||||
parsed = urlparse(url, scheme="ws")
|
||||
if parsed.hostname:
|
||||
hostname = parsed.hostname
|
||||
else:
|
||||
raise ValueError("hostname is invalid")
|
||||
port = 0
|
||||
if parsed.port:
|
||||
port = parsed.port
|
||||
|
||||
is_secure = False
|
||||
if scheme == "ws":
|
||||
if not port:
|
||||
port = 80
|
||||
elif scheme == "wss":
|
||||
is_secure = True
|
||||
if not port:
|
||||
port = 443
|
||||
else:
|
||||
raise ValueError("scheme %s is invalid" % scheme)
|
||||
|
||||
if parsed.path:
|
||||
resource = parsed.path
|
||||
else:
|
||||
resource = "/"
|
||||
|
||||
if parsed.query:
|
||||
resource += "?" + parsed.query
|
||||
|
||||
return (hostname, port, resource, is_secure)
|
||||
|
||||
|
||||
DEFAULT_NO_PROXY_HOST = ["localhost", "127.0.0.1"]
|
||||
|
||||
def _is_no_proxy_host(hostname, no_proxy):
|
||||
if not no_proxy:
|
||||
v = os.environ.get("no_proxy", "").replace(" ", "")
|
||||
no_proxy = v.split(",")
|
||||
if not no_proxy:
|
||||
no_proxy = DEFAULT_NO_PROXY_HOST
|
||||
|
||||
return hostname in no_proxy
|
||||
|
||||
def _get_proxy_info(hostname, is_secure, **options):
|
||||
"""
|
||||
try to retrieve proxy host and port from environment if not provided in options.
|
||||
result is (proxy_host, proxy_port, proxy_auth).
|
||||
proxy_auth is tuple of username and password of proxy authentication information.
|
||||
|
||||
hostname: websocket server name.
|
||||
|
||||
is_secure: is the connection secure? (wss)
|
||||
looks for "https_proxy" in env before falling back to "http_proxy"
|
||||
|
||||
options: "http_proxy_host" - http proxy host name.
|
||||
"http_proxy_port" - http proxy port.
|
||||
"http_no_proxy" - host names, which doesn't use proxy.
|
||||
"http_proxy_auth" - http proxy auth infomation. tuple of username and password.
|
||||
defualt is None
|
||||
"""
|
||||
if _is_no_proxy_host(hostname, options.get("http_no_proxy", None)):
|
||||
return None, 0, None
|
||||
|
||||
http_proxy_host = options.get("http_proxy_host", None)
|
||||
if http_proxy_host:
|
||||
return http_proxy_host, options.get("http_proxy_port", 0), options.get("http_proxy_auth", None)
|
||||
|
||||
env_keys = ["http_proxy"]
|
||||
if is_secure:
|
||||
env_keys.insert(0, "https_proxy")
|
||||
|
||||
for key in env_keys:
|
||||
value = os.environ.get(key, None)
|
||||
if value:
|
||||
proxy = urlparse(value)
|
||||
auth = (proxy.username, proxy.password) if proxy.username else None
|
||||
return proxy.hostname, proxy.port, auth
|
||||
|
||||
return None, 0, None
|
||||
|
||||
def _extract_err_message(exception):
|
||||
message = getattr(exception, 'strerror', '')
|
||||
if not message:
|
||||
message = getattr(exception, 'message', '')
|
||||
|
||||
return message
|
||||
|
||||
|
||||
def create_connection(url, timeout=None, **options):
|
||||
"""
|
||||
connect to url and return websocket object.
|
||||
@ -270,13 +174,6 @@ def create_connection(url, timeout=None, **options):
|
||||
websock.connect(url, **options)
|
||||
return websock
|
||||
|
||||
_MAX_INTEGER = (1 << 32) -1
|
||||
_AVAILABLE_KEY_CHARS = list(range(0x21, 0x2f + 1)) + list(range(0x3a, 0x7e + 1))
|
||||
_MAX_CHAR_BYTE = (1<<8) -1
|
||||
|
||||
# ref. Websocket gets an update, and it breaks stuff.
|
||||
# http://axod.blogspot.com/2010/06/websocket-gets-update-and-it-breaks.html
|
||||
|
||||
|
||||
def _create_sec_websocket_key():
|
||||
uid = uuid.uuid4()
|
||||
@ -289,71 +186,6 @@ _HEADERS_TO_CHECK = {
|
||||
}
|
||||
|
||||
|
||||
class _FrameBuffer(object):
|
||||
_HEADER_MASK_INDEX = 5
|
||||
_HEADER_LENGHT_INDEX = 6
|
||||
|
||||
def __init__(self):
|
||||
self.clear()
|
||||
|
||||
def clear(self):
|
||||
self.header = None
|
||||
self.length = None
|
||||
self.mask = None
|
||||
|
||||
def has_received_header(self):
|
||||
return self.header is None
|
||||
|
||||
def recv_header(self, recv_fn):
|
||||
header = recv_fn(2)
|
||||
b1 = header[0]
|
||||
|
||||
if six.PY2:
|
||||
b1 = ord(b1)
|
||||
|
||||
fin = b1 >> 7 & 1
|
||||
rsv1 = b1 >> 6 & 1
|
||||
rsv2 = b1 >> 5 & 1
|
||||
rsv3 = b1 >> 4 & 1
|
||||
opcode = b1 & 0xf
|
||||
b2 = header[1]
|
||||
|
||||
if six.PY2:
|
||||
b2 = ord(b2)
|
||||
|
||||
has_mask = b2 >> 7 & 1
|
||||
length_bits = b2 & 0x7f
|
||||
|
||||
self.header = (fin, rsv1, rsv2, rsv3, opcode, has_mask, length_bits)
|
||||
|
||||
def has_mask(self):
|
||||
if not self.header:
|
||||
return False
|
||||
return self.header[_FrameBuffer._HEADER_MASK_INDEX]
|
||||
|
||||
|
||||
def has_received_length(self):
|
||||
return self.length is None
|
||||
|
||||
def recv_length(self, recv_fn):
|
||||
bits = self.header[_FrameBuffer._HEADER_LENGHT_INDEX]
|
||||
length_bits = bits & 0x7f
|
||||
if length_bits == 0x7e:
|
||||
v = recv_fn(2)
|
||||
self.length = struct.unpack("!H", v)[0]
|
||||
elif length_bits == 0x7f:
|
||||
v = recv_fn(8)
|
||||
self.length = struct.unpack("!Q", v)[0]
|
||||
else:
|
||||
self.length = length_bits
|
||||
|
||||
def has_received_mask(self):
|
||||
return self.mask is None
|
||||
|
||||
def recv_mask(self, recv_fn):
|
||||
self.mask = recv_fn(4) if self.has_mask() else ""
|
||||
|
||||
|
||||
class WebSocket(object):
|
||||
"""
|
||||
Low level WebSocket interface.
|
||||
@ -403,7 +235,7 @@ class WebSocket(object):
|
||||
# bytes of bytes are received.
|
||||
self._recv_buffer = []
|
||||
# These buffer over the build-up of a single frame.
|
||||
self._frame_buffer = _FrameBuffer()
|
||||
self._frame_buffer = FrameBuffer()
|
||||
self._cont_data = None
|
||||
self._recving_frames = None
|
||||
if enable_multithread:
|
||||
@ -472,8 +304,8 @@ class WebSocket(object):
|
||||
|
||||
"""
|
||||
|
||||
hostname, port, resource, is_secure = _parse_url(url)
|
||||
proxy_host, proxy_port, proxy_auth = _get_proxy_info(hostname, is_secure, **options)
|
||||
hostname, port, resource, is_secure = parse_url(url)
|
||||
proxy_host, proxy_port, proxy_auth = get_proxy_info(hostname, is_secure, **options)
|
||||
if not proxy_host:
|
||||
addrinfo_list = socket.getaddrinfo(hostname, port, 0, 0, socket.SOL_TCP)
|
||||
else:
|
||||
@ -922,10 +754,10 @@ class WebSocket(object):
|
||||
try:
|
||||
return self.sock.send(data)
|
||||
except socket.timeout as e:
|
||||
message = _extract_err_message(e)
|
||||
message = extract_err_message(e)
|
||||
raise WebSocketTimeoutException(message)
|
||||
except Exception as e:
|
||||
message = _extract_err_message(e)
|
||||
message = extract_err_message(e)
|
||||
if message and "timed out" in message:
|
||||
raise WebSocketTimeoutException(message)
|
||||
else:
|
||||
@ -938,10 +770,10 @@ class WebSocket(object):
|
||||
try:
|
||||
bytes = self.sock.recv(bufsize)
|
||||
except socket.timeout as e:
|
||||
message = _extract_err_message(e)
|
||||
message = extract_err_message(e)
|
||||
raise WebSocketTimeoutException(message)
|
||||
except SSLError as e:
|
||||
message = _extract_err_message(e)
|
||||
message = extract_err_message(e)
|
||||
if message == "The read operation timed out":
|
||||
raise WebSocketTimeoutException(message)
|
||||
else:
|
||||
|
118
websocket/_url.py
Normal file
118
websocket/_url.py
Normal file
@ -0,0 +1,118 @@
|
||||
"""
|
||||
websocket - WebSocket client library for Python
|
||||
|
||||
Copyright (C) 2010 Hiroki Ohtani(liris)
|
||||
|
||||
This library is free software; you can redistribute it and/or
|
||||
modify it under the terms of the GNU Lesser General Public
|
||||
License as published by the Free Software Foundation; either
|
||||
version 2.1 of the License, or (at your option) any later version.
|
||||
|
||||
This library is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
Lesser General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public
|
||||
License along with this library; if not, write to the Free Software
|
||||
Foundation, Inc., 51 Franklin Street, Fifth Floor,
|
||||
Boston, MA 02110-1335 USA
|
||||
|
||||
"""
|
||||
|
||||
from six.moves.urllib.parse import urlparse
|
||||
import os
|
||||
|
||||
def parse_url(url):
|
||||
"""
|
||||
parse url and the result is tuple of
|
||||
(hostname, port, resource path and the flag of secure mode)
|
||||
|
||||
url: url string.
|
||||
"""
|
||||
if ":" not in url:
|
||||
raise ValueError("url is invalid")
|
||||
|
||||
scheme, url = url.split(":", 1)
|
||||
|
||||
parsed = urlparse(url, scheme="ws")
|
||||
if parsed.hostname:
|
||||
hostname = parsed.hostname
|
||||
else:
|
||||
raise ValueError("hostname is invalid")
|
||||
port = 0
|
||||
if parsed.port:
|
||||
port = parsed.port
|
||||
|
||||
is_secure = False
|
||||
if scheme == "ws":
|
||||
if not port:
|
||||
port = 80
|
||||
elif scheme == "wss":
|
||||
is_secure = True
|
||||
if not port:
|
||||
port = 443
|
||||
else:
|
||||
raise ValueError("scheme %s is invalid" % scheme)
|
||||
|
||||
if parsed.path:
|
||||
resource = parsed.path
|
||||
else:
|
||||
resource = "/"
|
||||
|
||||
if parsed.query:
|
||||
resource += "?" + parsed.query
|
||||
|
||||
return (hostname, port, resource, is_secure)
|
||||
|
||||
|
||||
DEFAULT_NO_PROXY_HOST = ["localhost", "127.0.0.1"]
|
||||
|
||||
def _is_no_proxy_host(hostname, no_proxy):
|
||||
if not no_proxy:
|
||||
v = os.environ.get("no_proxy", "").replace(" ", "")
|
||||
no_proxy = v.split(",")
|
||||
if not no_proxy:
|
||||
no_proxy = DEFAULT_NO_PROXY_HOST
|
||||
|
||||
return hostname in no_proxy
|
||||
|
||||
def get_proxy_info(hostname, is_secure, **options):
|
||||
"""
|
||||
try to retrieve proxy host and port from environment if not provided in options.
|
||||
result is (proxy_host, proxy_port, proxy_auth).
|
||||
proxy_auth is tuple of username and password of proxy authentication information.
|
||||
|
||||
hostname: websocket server name.
|
||||
|
||||
is_secure: is the connection secure? (wss)
|
||||
looks for "https_proxy" in env before falling back to "http_proxy"
|
||||
|
||||
options: "http_proxy_host" - http proxy host name.
|
||||
"http_proxy_port" - http proxy port.
|
||||
"http_no_proxy" - host names, which doesn't use proxy.
|
||||
"http_proxy_auth" - http proxy auth infomation. tuple of username and password.
|
||||
defualt is None
|
||||
"""
|
||||
if _is_no_proxy_host(hostname, options.get("http_no_proxy", None)):
|
||||
return None, 0, None
|
||||
|
||||
http_proxy_host = options.get("http_proxy_host", None)
|
||||
if http_proxy_host:
|
||||
return http_proxy_host, options.get("http_proxy_port", 0), options.get("http_proxy_auth", None)
|
||||
|
||||
env_keys = ["http_proxy"]
|
||||
if is_secure:
|
||||
env_keys.insert(0, "https_proxy")
|
||||
|
||||
for key in env_keys:
|
||||
value = os.environ.get(key, None)
|
||||
if value:
|
||||
proxy = urlparse(value)
|
||||
auth = (proxy.username, proxy.password) if proxy.username else None
|
||||
return proxy.hostname, proxy.port, auth
|
||||
|
||||
return None, 0, None
|
||||
|
||||
|
||||
|
@ -82,4 +82,9 @@ def validate_utf8(utfbytes):
|
||||
return True
|
||||
|
||||
|
||||
def extract_err_message(exception):
|
||||
message = getattr(exception, 'strerror', '')
|
||||
if not message:
|
||||
message = getattr(exception, 'message', '')
|
||||
|
||||
return message
|
||||
|
@ -25,8 +25,8 @@ import uuid
|
||||
|
||||
# websocket-client
|
||||
import websocket as ws
|
||||
from websocket._core import _parse_url, _create_sec_websocket_key
|
||||
from websocket._core import _get_proxy_info
|
||||
from websocket._core import _create_sec_websocket_key
|
||||
from websocket._url import parse_url, get_proxy_info
|
||||
from websocket._utils import validate_utf8
|
||||
|
||||
|
||||
@ -89,84 +89,84 @@ class WebSocketTest(unittest.TestCase):
|
||||
ws.setdefaulttimeout(None)
|
||||
|
||||
def testParseUrl(self):
|
||||
p = _parse_url("ws://www.example.com/r")
|
||||
p = parse_url("ws://www.example.com/r")
|
||||
self.assertEqual(p[0], "www.example.com")
|
||||
self.assertEqual(p[1], 80)
|
||||
self.assertEqual(p[2], "/r")
|
||||
self.assertEqual(p[3], False)
|
||||
|
||||
p = _parse_url("ws://www.example.com/r/")
|
||||
p = parse_url("ws://www.example.com/r/")
|
||||
self.assertEqual(p[0], "www.example.com")
|
||||
self.assertEqual(p[1], 80)
|
||||
self.assertEqual(p[2], "/r/")
|
||||
self.assertEqual(p[3], False)
|
||||
|
||||
p = _parse_url("ws://www.example.com/")
|
||||
p = parse_url("ws://www.example.com/")
|
||||
self.assertEqual(p[0], "www.example.com")
|
||||
self.assertEqual(p[1], 80)
|
||||
self.assertEqual(p[2], "/")
|
||||
self.assertEqual(p[3], False)
|
||||
|
||||
p = _parse_url("ws://www.example.com")
|
||||
p = parse_url("ws://www.example.com")
|
||||
self.assertEqual(p[0], "www.example.com")
|
||||
self.assertEqual(p[1], 80)
|
||||
self.assertEqual(p[2], "/")
|
||||
self.assertEqual(p[3], False)
|
||||
|
||||
p = _parse_url("ws://www.example.com:8080/r")
|
||||
p = parse_url("ws://www.example.com:8080/r")
|
||||
self.assertEqual(p[0], "www.example.com")
|
||||
self.assertEqual(p[1], 8080)
|
||||
self.assertEqual(p[2], "/r")
|
||||
self.assertEqual(p[3], False)
|
||||
|
||||
p = _parse_url("ws://www.example.com:8080/")
|
||||
p = parse_url("ws://www.example.com:8080/")
|
||||
self.assertEqual(p[0], "www.example.com")
|
||||
self.assertEqual(p[1], 8080)
|
||||
self.assertEqual(p[2], "/")
|
||||
self.assertEqual(p[3], False)
|
||||
|
||||
p = _parse_url("ws://www.example.com:8080")
|
||||
p = parse_url("ws://www.example.com:8080")
|
||||
self.assertEqual(p[0], "www.example.com")
|
||||
self.assertEqual(p[1], 8080)
|
||||
self.assertEqual(p[2], "/")
|
||||
self.assertEqual(p[3], False)
|
||||
|
||||
p = _parse_url("wss://www.example.com:8080/r")
|
||||
p = parse_url("wss://www.example.com:8080/r")
|
||||
self.assertEqual(p[0], "www.example.com")
|
||||
self.assertEqual(p[1], 8080)
|
||||
self.assertEqual(p[2], "/r")
|
||||
self.assertEqual(p[3], True)
|
||||
|
||||
p = _parse_url("wss://www.example.com:8080/r?key=value")
|
||||
p = parse_url("wss://www.example.com:8080/r?key=value")
|
||||
self.assertEqual(p[0], "www.example.com")
|
||||
self.assertEqual(p[1], 8080)
|
||||
self.assertEqual(p[2], "/r?key=value")
|
||||
self.assertEqual(p[3], True)
|
||||
|
||||
self.assertRaises(ValueError, _parse_url, "http://www.example.com/r")
|
||||
self.assertRaises(ValueError, parse_url, "http://www.example.com/r")
|
||||
|
||||
if sys.version_info[0] == 2 and sys.version_info[1] < 7:
|
||||
return
|
||||
|
||||
p = _parse_url("ws://[2a03:4000:123:83::3]/r")
|
||||
p = parse_url("ws://[2a03:4000:123:83::3]/r")
|
||||
self.assertEqual(p[0], "2a03:4000:123:83::3")
|
||||
self.assertEqual(p[1], 80)
|
||||
self.assertEqual(p[2], "/r")
|
||||
self.assertEqual(p[3], False)
|
||||
|
||||
p = _parse_url("ws://[2a03:4000:123:83::3]:8080/r")
|
||||
p = parse_url("ws://[2a03:4000:123:83::3]:8080/r")
|
||||
self.assertEqual(p[0], "2a03:4000:123:83::3")
|
||||
self.assertEqual(p[1], 8080)
|
||||
self.assertEqual(p[2], "/r")
|
||||
self.assertEqual(p[3], False)
|
||||
|
||||
p = _parse_url("wss://[2a03:4000:123:83::3]/r")
|
||||
p = parse_url("wss://[2a03:4000:123:83::3]/r")
|
||||
self.assertEqual(p[0], "2a03:4000:123:83::3")
|
||||
self.assertEqual(p[1], 443)
|
||||
self.assertEqual(p[2], "/r")
|
||||
self.assertEqual(p[3], True)
|
||||
|
||||
p = _parse_url("wss://[2a03:4000:123:83::3]:8080/r")
|
||||
p = parse_url("wss://[2a03:4000:123:83::3]:8080/r")
|
||||
self.assertEqual(p[0], "2a03:4000:123:83::3")
|
||||
self.assertEqual(p[1], 8080)
|
||||
self.assertEqual(p[2], "/r")
|
||||
@ -561,74 +561,74 @@ class ProxyInfoTest(unittest.TestCase):
|
||||
|
||||
|
||||
def testProxyFromArgs(self):
|
||||
self.assertEqual(_get_proxy_info("echo.websocket.org", False, http_proxy_host="localhost"), ("localhost", 0, None))
|
||||
self.assertEqual(_get_proxy_info("echo.websocket.org", False, http_proxy_host="localhost", http_proxy_port=3128), ("localhost", 3128, None))
|
||||
self.assertEqual(_get_proxy_info("echo.websocket.org", True, http_proxy_host="localhost"), ("localhost", 0, None))
|
||||
self.assertEqual(_get_proxy_info("echo.websocket.org", True, http_proxy_host="localhost", http_proxy_port=3128), ("localhost", 3128, None))
|
||||
self.assertEqual(get_proxy_info("echo.websocket.org", False, http_proxy_host="localhost"), ("localhost", 0, None))
|
||||
self.assertEqual(get_proxy_info("echo.websocket.org", False, http_proxy_host="localhost", http_proxy_port=3128), ("localhost", 3128, None))
|
||||
self.assertEqual(get_proxy_info("echo.websocket.org", True, http_proxy_host="localhost"), ("localhost", 0, None))
|
||||
self.assertEqual(get_proxy_info("echo.websocket.org", True, http_proxy_host="localhost", http_proxy_port=3128), ("localhost", 3128, None))
|
||||
|
||||
self.assertEqual(_get_proxy_info("echo.websocket.org", False, http_proxy_host="localhost", http_proxy_auth=("a", "b")),
|
||||
self.assertEqual(get_proxy_info("echo.websocket.org", False, http_proxy_host="localhost", http_proxy_auth=("a", "b")),
|
||||
("localhost", 0, ("a", "b")))
|
||||
self.assertEqual(_get_proxy_info("echo.websocket.org", False, http_proxy_host="localhost", http_proxy_port=3128, http_proxy_auth=("a", "b")),
|
||||
self.assertEqual(get_proxy_info("echo.websocket.org", False, http_proxy_host="localhost", http_proxy_port=3128, http_proxy_auth=("a", "b")),
|
||||
("localhost", 3128, ("a", "b")))
|
||||
self.assertEqual(_get_proxy_info("echo.websocket.org", True, http_proxy_host="localhost", http_proxy_auth=("a", "b")),
|
||||
self.assertEqual(get_proxy_info("echo.websocket.org", True, http_proxy_host="localhost", http_proxy_auth=("a", "b")),
|
||||
("localhost", 0, ("a", "b")))
|
||||
self.assertEqual(_get_proxy_info("echo.websocket.org", True, http_proxy_host="localhost", http_proxy_port=3128, http_proxy_auth=("a", "b")),
|
||||
self.assertEqual(get_proxy_info("echo.websocket.org", True, http_proxy_host="localhost", http_proxy_port=3128, http_proxy_auth=("a", "b")),
|
||||
("localhost", 3128, ("a", "b")))
|
||||
|
||||
self.assertEqual(_get_proxy_info("echo.websocket.org", True, http_proxy_host="localhost", http_proxy_port=3128, http_no_proxy=["example.com"], http_proxy_auth=("a", "b")),
|
||||
self.assertEqual(get_proxy_info("echo.websocket.org", True, http_proxy_host="localhost", http_proxy_port=3128, http_no_proxy=["example.com"], http_proxy_auth=("a", "b")),
|
||||
("localhost", 3128, ("a", "b")))
|
||||
self.assertEqual(_get_proxy_info("echo.websocket.org", True, http_proxy_host="localhost", http_proxy_port=3128, http_no_proxy=["echo.websocket.org"], http_proxy_auth=("a", "b")),
|
||||
self.assertEqual(get_proxy_info("echo.websocket.org", True, http_proxy_host="localhost", http_proxy_port=3128, http_no_proxy=["echo.websocket.org"], http_proxy_auth=("a", "b")),
|
||||
(None, 0, None))
|
||||
|
||||
|
||||
def testProxyFromEnv(self):
|
||||
os.environ["http_proxy"] = "http://localhost/"
|
||||
self.assertEqual(_get_proxy_info("echo.websocket.org", False), ("localhost", None, None))
|
||||
self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", None, None))
|
||||
os.environ["http_proxy"] = "http://localhost:3128/"
|
||||
self.assertEqual(_get_proxy_info("echo.websocket.org", False), ("localhost", 3128, None))
|
||||
self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", 3128, None))
|
||||
|
||||
os.environ["http_proxy"] = "http://localhost/"
|
||||
os.environ["https_proxy"] = "http://localhost2/"
|
||||
self.assertEqual(_get_proxy_info("echo.websocket.org", False), ("localhost", None, None))
|
||||
self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", None, None))
|
||||
os.environ["http_proxy"] = "http://localhost:3128/"
|
||||
os.environ["https_proxy"] = "http://localhost2:3128/"
|
||||
self.assertEqual(_get_proxy_info("echo.websocket.org", False), ("localhost", 3128, None))
|
||||
self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", 3128, None))
|
||||
|
||||
os.environ["http_proxy"] = "http://localhost/"
|
||||
os.environ["https_proxy"] = "http://localhost2/"
|
||||
self.assertEqual(_get_proxy_info("echo.websocket.org", True), ("localhost2", None, None))
|
||||
self.assertEqual(get_proxy_info("echo.websocket.org", True), ("localhost2", None, None))
|
||||
os.environ["http_proxy"] = "http://localhost:3128/"
|
||||
os.environ["https_proxy"] = "http://localhost2:3128/"
|
||||
self.assertEqual(_get_proxy_info("echo.websocket.org", True), ("localhost2", 3128, None))
|
||||
self.assertEqual(get_proxy_info("echo.websocket.org", True), ("localhost2", 3128, None))
|
||||
|
||||
|
||||
os.environ["http_proxy"] = "http://a:b@localhost/"
|
||||
self.assertEqual(_get_proxy_info("echo.websocket.org", False), ("localhost", None, ("a", "b")))
|
||||
self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", None, ("a", "b")))
|
||||
os.environ["http_proxy"] = "http://a:b@localhost:3128/"
|
||||
self.assertEqual(_get_proxy_info("echo.websocket.org", False), ("localhost", 3128, ("a", "b")))
|
||||
self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", 3128, ("a", "b")))
|
||||
|
||||
os.environ["http_proxy"] = "http://a:b@localhost/"
|
||||
os.environ["https_proxy"] = "http://a:b@localhost2/"
|
||||
self.assertEqual(_get_proxy_info("echo.websocket.org", False), ("localhost", None, ("a", "b")))
|
||||
self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", None, ("a", "b")))
|
||||
os.environ["http_proxy"] = "http://a:b@localhost:3128/"
|
||||
os.environ["https_proxy"] = "http://a:b@localhost2:3128/"
|
||||
self.assertEqual(_get_proxy_info("echo.websocket.org", False), ("localhost", 3128, ("a", "b")))
|
||||
self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", 3128, ("a", "b")))
|
||||
|
||||
os.environ["http_proxy"] = "http://a:b@localhost/"
|
||||
os.environ["https_proxy"] = "http://a:b@localhost2/"
|
||||
self.assertEqual(_get_proxy_info("echo.websocket.org", True), ("localhost2", None, ("a", "b")))
|
||||
self.assertEqual(get_proxy_info("echo.websocket.org", True), ("localhost2", None, ("a", "b")))
|
||||
os.environ["http_proxy"] = "http://a:b@localhost:3128/"
|
||||
os.environ["https_proxy"] = "http://a:b@localhost2:3128/"
|
||||
self.assertEqual(_get_proxy_info("echo.websocket.org", True), ("localhost2", 3128, ("a", "b")))
|
||||
self.assertEqual(get_proxy_info("echo.websocket.org", True), ("localhost2", 3128, ("a", "b")))
|
||||
|
||||
os.environ["http_proxy"] = "http://a:b@localhost/"
|
||||
os.environ["https_proxy"] = "http://a:b@localhost2/"
|
||||
os.environ["no_proxy"] = "example1.com,example2.com"
|
||||
self.assertEqual(_get_proxy_info("example.1.com", True), ("localhost2", None, ("a", "b")))
|
||||
self.assertEqual(get_proxy_info("example.1.com", True), ("localhost2", None, ("a", "b")))
|
||||
os.environ["http_proxy"] = "http://a:b@localhost:3128/"
|
||||
os.environ["https_proxy"] = "http://a:b@localhost2:3128/"
|
||||
os.environ["no_proxy"] = "example1.com,example2.com, echo.websocket.org"
|
||||
self.assertEqual(_get_proxy_info("echo.websocket.org", True), (None, 0, None))
|
||||
self.assertEqual(get_proxy_info("echo.websocket.org", True), (None, 0, None))
|
||||
|
||||
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user