
It was a bit hard to read and tested as having too much cyclomatic complexity. Also, so that it is more clear what 'type' prefix is, always default it to '' instead of None where possible.
115 lines
3.5 KiB
Python
115 lines
3.5 KiB
Python
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
"""Utility functions grab bag."""
|
|
|
|
import os
|
|
|
|
import colorama
|
|
from six.moves.urllib import parse as urlparse
|
|
|
|
|
|
try: # Python 3
|
|
ConnectionRefused = ConnectionRefusedError
|
|
except NameError: # Python 2
|
|
import socket
|
|
ConnectionRefused = socket.error
|
|
|
|
|
|
def create_url(base_url, host, port=None, prefix='', ssl=False):
|
|
"""Given pieces of a path-based url, return a fully qualified url."""
|
|
scheme = 'http'
|
|
netloc = host
|
|
|
|
if (port and not _port_follows_standard(port, ssl)):
|
|
netloc = '%s:%s' % (host, port)
|
|
|
|
if ssl:
|
|
scheme = 'https'
|
|
|
|
parsed_url = urlparse.urlsplit(base_url)
|
|
query_string = parsed_url.query
|
|
path = parsed_url.path
|
|
|
|
# Guard against a prefix of None
|
|
if prefix:
|
|
path = '%s%s' % (prefix, path)
|
|
|
|
return urlparse.urlunsplit((scheme, netloc, path, query_string, ''))
|
|
|
|
|
|
def decode_response_content(header_dict, content):
|
|
"""Decode content to a proper string."""
|
|
content_type, charset = extract_content_type(header_dict)
|
|
|
|
if not_binary(content_type):
|
|
return content.decode(charset)
|
|
else:
|
|
return content
|
|
|
|
|
|
def extract_content_type(header_dict):
|
|
"""Extract content-type from headers."""
|
|
content_type = header_dict.get('content-type',
|
|
'application/binary').strip().lower()
|
|
charset = 'utf-8'
|
|
if ';' in content_type:
|
|
content_type, parameter_strings = (attr.strip() for attr
|
|
in content_type.split(';', 1))
|
|
try:
|
|
parameter_pairs = [atom.strip().split('=')
|
|
for atom in parameter_strings.split(';')]
|
|
parameters = {name: value for name, value in parameter_pairs}
|
|
charset = parameters['charset']
|
|
except (ValueError, KeyError):
|
|
# KeyError when no charset found.
|
|
# ValueError when the parameter_strings are poorly
|
|
# formed (for example trailing ;)
|
|
pass
|
|
|
|
return (content_type, charset)
|
|
|
|
|
|
def get_colorizer(stream):
|
|
"""Return a function to colorize a string.
|
|
|
|
Only if stream is a tty .
|
|
"""
|
|
if stream.isatty() or os.environ.get('GABBI_FORCE_COLOR', False):
|
|
colorama.init()
|
|
return _colorize
|
|
else:
|
|
return lambda x, y: y
|
|
|
|
|
|
def not_binary(content_type):
|
|
"""Decide if something is content we'd like to treat as a string."""
|
|
return (content_type.startswith('text/') or
|
|
content_type.endswith('+xml') or
|
|
content_type.endswith('+json') or
|
|
content_type == 'application/javascript' or
|
|
content_type.startswith('application/json'))
|
|
|
|
|
|
def _colorize(color, message):
|
|
"""Add a color to the message."""
|
|
try:
|
|
return getattr(colorama.Fore, color) + message + colorama.Fore.RESET
|
|
except AttributeError:
|
|
return message
|
|
|
|
|
|
def _port_follows_standard(port, ssl):
|
|
"""Return True if a standard port is using a non-standard ssl setting."""
|
|
port = int(port)
|
|
return (port == 443 and ssl) or (port == 80 and not ssl)
|