2015-03-08 15:01:09 +00:00
|
|
|
#
|
|
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
|
|
# not use this file except in compliance with the License. You may obtain
|
|
|
|
# a copy of the License at
|
|
|
|
#
|
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
#
|
|
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
|
|
# License for the specific language governing permissions and limitations
|
|
|
|
# under the License.
|
|
|
|
"""Utility functions grab bag."""
|
|
|
|
|
2015-08-10 21:44:34 +01:00
|
|
|
import os
|
|
|
|
|
2015-04-27 14:07:17 +01:00
|
|
|
|
2015-07-14 11:01:36 +01:00
|
|
|
# Expose one name, ConnectionRefused, for the exception raised when a
# connection attempt is refused, regardless of interpreter version.
try:
    # Probe for the builtin; the lookup raises NameError where it is absent.
    ConnectionRefusedError
except NameError:
    # Python 2: no dedicated builtin, fall back to socket.error.
    import socket
    ConnectionRefused = socket.error
else:
    # Python 3: alias the builtin directly.
    ConnectionRefused = ConnectionRefusedError
|
|
|
|
|
|
|
|
|
2016-05-07 14:00:37 +01:00
|
|
|
import colorama
|
|
|
|
from six.moves.urllib import parse as urlparse
|
|
|
|
|
|
|
|
|
2015-10-08 17:54:00 +01:00
|
|
|
def create_url(base_url, host, port=None, prefix='', ssl=False):
    """Build a fully qualified url from a path-based url and its parts.

    The path and query string are taken from ``base_url``; any fragment
    is dropped. ``host`` and ``port`` form the network location and
    ``ssl`` selects between the ``http`` and ``https`` schemes. A truthy
    ``prefix`` is prepended to the path.
    """
    # A host containing ':' is taken to be an IPv6 address (the port is
    # expected to have been removed already), so wrap it in brackets
    # unless the caller has already done so.
    if ':' in host and (not host.startswith('[') or not host.endswith(']')):
        host = '[%s]' % host

    # Only spell out the port when it is not the default for the scheme.
    explicit_port = port and not _port_follows_standard(port, ssl)
    netloc = '%s:%s' % (host, port) if explicit_port else host

    scheme = 'https' if ssl else 'http'

    pieces = urlparse.urlsplit(base_url)
    path = pieces.path
    # prefix may be None or empty, in which case the path is used as is.
    if prefix:
        path = '%s%s' % (prefix, path)

    return urlparse.urlunsplit((scheme, netloc, path, pieces.query, ''))
|
|
|
|
|
|
|
|
|
2015-08-10 20:23:50 +01:00
|
|
|
def decode_response_content(header_dict, content):
    """Return content decoded to a string when its type is textual.

    The content-type and charset are read from ``header_dict``. Content
    whose type is not considered text is returned untouched.
    """
    media_type, encoding = extract_content_type(header_dict)
    if not not_binary(media_type):
        # Binary payload: hand it back exactly as received.
        return content
    return content.decode(encoding)
|
2015-04-27 14:07:17 +01:00
|
|
|
|
2015-03-08 15:01:09 +00:00
|
|
|
|
2015-08-09 18:13:05 +01:00
|
|
|
def extract_content_type(header_dict):
    """Extract the content-type and charset from headers.

    :param header_dict: A mapping of header names to values. Only the
                        ``content-type`` key is read; when it is missing
                        ``application/binary`` is used.
    :returns: A ``(content_type, charset)`` tuple. Both values are
              stripped and lower-cased. ``charset`` is ``utf-8`` unless
              the header carries a non-empty ``charset`` parameter.
    """
    header = header_dict.get('content-type', 'application/binary')
    content_type, _, parameter_string = (
        header.strip().lower().partition(';'))
    content_type = content_type.strip()

    charset = 'utf-8'
    # Each parameter is examined on its own so that one poorly formed
    # atom (a trailing ';', a value containing '=', a bare token) does
    # not cause a well formed charset elsewhere in the header to be lost.
    for atom in parameter_string.split(';'):
        name, separator, value = atom.partition('=')
        if not separator or name.strip() != 'charset':
            continue
        # The value may be sent as a quoted-string; drop the quotes so
        # the result is usable as a codec name.
        value = value.strip().strip('"').strip()
        if value:
            # When charset is repeated the last usable one wins.
            charset = value

    return (content_type, charset)
|
2015-04-27 14:07:17 +01:00
|
|
|
|
|
|
|
|
2015-08-10 20:43:18 +01:00
|
|
|
def get_colorizer(stream):
    """Return a function which colorizes a string.

    The returned callable takes ``(color, message)``. Color is applied
    only when ``stream`` is a tty or the ``GABBI_FORCE_COLOR``
    environment variable is set to a non-empty value; otherwise the
    callable returns the message unchanged.
    """
    wants_color = stream.isatty() or os.environ.get('GABBI_FORCE_COLOR',
                                                    False)
    if not wants_color:
        # Pass the message through, ignoring the requested color.
        return lambda x, y: y

    colorama.init()
    return _colorize
|
|
|
|
|
|
|
|
|
2015-03-08 15:01:09 +00:00
|
|
|
def not_binary(content_type):
    """Report whether a content-type should be handled as a string.

    True for ``text/*``, anything beginning ``application/json``,
    anything ending in ``+xml`` or ``+json``, and exactly
    ``application/javascript``.
    """
    text_prefixes = ('text/', 'application/json')
    text_suffixes = ('+xml', '+json')
    return (content_type == 'application/javascript' or
            content_type.startswith(text_prefixes) or
            content_type.endswith(text_suffixes))
|
2015-08-10 20:43:18 +01:00
|
|
|
|
|
|
|
|
|
|
|
def _colorize(color, message):
    """Wrap message in the colorama foreground code named by color.

    When colorama has no foreground attribute called ``color`` the
    message is returned without any color codes.
    """
    try:
        palette = colorama.Fore
        start, stop = getattr(palette, color), palette.RESET
    except AttributeError:
        # Unknown color name: leave the message plain.
        return message
    return start + message + stop
|
2015-10-08 17:54:00 +01:00
|
|
|
|
|
|
|
|
|
|
|
def _port_follows_standard(port, ssl):
    """Report whether port is the default one for the chosen scheme.

    The result is truthy when ``port`` is 443 and ``ssl`` is set, or when
    ``port`` is 80 and ``ssl`` is not set. ``port`` may be anything
    ``int()`` accepts.
    """
    port_number = int(port)
    if ssl:
        # https: only 443 is standard.
        return port_number == 443 and ssl
    # http: only 80 is standard.
    return port_number == 80
|