Flaper Fesp 55cb4f4473 Decode input and encode output
Currently glanceclient doesn't support non-ASCII characters for images
names and properties (names and values as well). This patch introduces 2
functions (utils.py) that will help encoding and decoding strings in a
more "secure" way.

About the ensure_(str|unicode) functions:

    They both try to use first the encoding used in stdin (or python's
    default encoding if that's None) and fallback to utf-8 if those
    encodings fail to decode a given text.

About the changes in glanceclient:

    The major change is that all inputs will be decoded and will kept as
    such inside the client's functions and will then be encoded before
    being printed / sent out the client.

    There are other small changes, all related to encoding to str,
    around in order to avoid fails during some conversions. i.e: quoting
    url encoded parameters.

Fixes bug: 1061150

Change-Id: I5c3ea93a716edfe284d19f6291d4e36028f91eb2
2013-02-13 21:53:11 +01:00

266 lines
7.6 KiB
Python

# Copyright 2012 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import errno
import hashlib
import os
import sys
import uuid
import prettytable
from glanceclient import exc
from glanceclient.openstack.common import importutils
# Decorator for cli-args
def arg(*args, **kwargs):
def _decorator(func):
# Because of the sematics of decorator composition if we just append
# to the options list positional options will appear to be backwards.
func.__dict__.setdefault('arguments', []).insert(0, (args, kwargs))
return func
return _decorator
def pretty_choice_list(l):
return ', '.join("'%s'" % i for i in l)
def print_list(objs, fields, formatters={}):
pt = prettytable.PrettyTable([f for f in fields], caching=False)
pt.align = 'l'
for o in objs:
row = []
for field in fields:
if field in formatters:
row.append(formatters[field](o))
else:
field_name = field.lower().replace(' ', '_')
data = getattr(o, field_name, None) or ''
row.append(data)
pt.add_row(row)
print ensure_str(pt.get_string())
def print_dict(d):
pt = prettytable.PrettyTable(['Property', 'Value'], caching=False)
pt.align = 'l'
[pt.add_row(list(r)) for r in d.iteritems()]
print ensure_str(pt.get_string(sortby='Property'))
def find_resource(manager, name_or_id):
"""Helper for the _find_* methods."""
# first try to get entity as integer id
try:
if isinstance(name_or_id, int) or name_or_id.isdigit():
return manager.get(int(name_or_id))
except exc.NotFound:
pass
# now try to get entity as uuid
try:
uuid.UUID(ensure_str(name_or_id))
return manager.get(name_or_id)
except (ValueError, exc.NotFound):
pass
# finally try to find entity by name
matches = list(manager.list(filters={'name': name_or_id}))
num_matches = len(matches)
if num_matches == 0:
msg = "No %s with a name or ID of '%s' exists." % \
(manager.resource_class.__name__.lower(), name_or_id)
raise exc.CommandError(msg)
elif num_matches > 1:
msg = ("Multiple %s matches found for '%s', use an ID to be more"
" specific." % (manager.resource_class.__name__.lower(),
name_or_id))
raise exc.CommandError(msg)
else:
return matches[0]
def skip_authentication(f):
"""Function decorator used to indicate a caller may be unauthenticated."""
f.require_authentication = False
return f
def is_authentication_required(f):
"""Checks to see if the function requires authentication.
Use the skip_authentication decorator to indicate a caller may
skip the authentication step.
"""
return getattr(f, 'require_authentication', True)
def string_to_bool(arg):
return arg.strip().lower() in ('t', 'true', 'yes', '1')
def env(*vars, **kwargs):
"""Search for the first defined of possibly many env vars
Returns the first environment variable defined in vars, or
returns the default defined in kwargs.
"""
for v in vars:
value = os.environ.get(v, None)
if value:
return value
return kwargs.get('default', '')
def import_versioned_module(version, submodule=None):
module = 'glanceclient.v%s' % version
if submodule:
module = '.'.join((module, submodule))
return importutils.import_module(module)
def exit(msg=''):
if msg:
print >> sys.stderr, ensure_str(msg)
sys.exit(1)
def save_image(data, path):
"""
Save an image to the specified path.
:param data: binary data of the image
:param path: path to save the image to
"""
if path is None:
image = sys.stdout
else:
image = open(path, 'wb')
try:
for chunk in data:
image.write(chunk)
finally:
if path is not None:
image.close()
def integrity_iter(iter, checksum):
"""
Check image data integrity.
:raises: IOError
"""
md5sum = hashlib.md5()
for chunk in iter:
yield chunk
md5sum.update(chunk)
md5sum = md5sum.hexdigest()
if md5sum != checksum:
raise IOError(errno.EPIPE,
'Corrupt image download. Checksum was %s expected %s' %
(md5sum, checksum))
def make_size_human_readable(size):
suffix = ['B', 'kB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB']
base = 1024.0
index = 0
while size >= base:
index = index + 1
size = size / base
padded = '%.1f' % size
stripped = padded.rstrip('0').rstrip('.')
return '%s%s' % (stripped, suffix[index])
def ensure_unicode(text, incoming=None, errors='strict'):
"""
Decodes incoming objects using `incoming` if they're
not already unicode.
:param incoming: Text's current encoding
:param errors: Errors handling policy.
:returns: text or a unicode `incoming` encoded
representation of it.
"""
if isinstance(text, unicode):
return text
if not incoming:
incoming = sys.stdin.encoding or \
sys.getdefaultencoding()
# Calling `str` in case text is a non str
# object.
text = str(text)
try:
return text.decode(incoming, errors)
except UnicodeDecodeError:
# Note(flaper87) If we get here, it means that
# sys.stdin.encoding / sys.getdefaultencoding
# didn't return a suitable encoding to decode
# text. This happens mostly when global LANG
# var is not set correctly and there's no
# default encoding. In this case, most likely
# python will use ASCII or ANSI encoders as
# default encodings but they won't be capable
# of decoding non-ASCII characters.
#
# Also, UTF-8 is being used since it's an ASCII
# extension.
return text.decode('utf-8', errors)
def ensure_str(text, incoming=None,
encoding='utf-8', errors='strict'):
"""
Encodes incoming objects using `encoding`. If
incoming is not specified, text is expected to
be encoded with current python's default encoding.
(`sys.getdefaultencoding`)
:param incoming: Text's current encoding
:param encoding: Expected encoding for text (Default UTF-8)
:param errors: Errors handling policy.
:returns: text or a bytestring `encoding` encoded
representation of it.
"""
if not incoming:
incoming = sys.stdin.encoding or \
sys.getdefaultencoding()
if not isinstance(text, basestring):
# try to convert `text` to string
# This allows this method for receiving
# objs that can be converted to string
text = str(text)
if isinstance(text, unicode):
return text.encode(encoding, errors)
elif text and encoding != incoming:
# Decode text before encoding it with `encoding`
text = ensure_unicode(text, incoming, errors)
return text.encode(encoding, errors)
return text