# Copyright 2012 OpenStack LLC. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import errno import hashlib import os import sys import uuid import prettytable from glanceclient import exc from glanceclient.openstack.common import importutils # Decorator for cli-args def arg(*args, **kwargs): def _decorator(func): # Because of the sematics of decorator composition if we just append # to the options list positional options will appear to be backwards. func.__dict__.setdefault('arguments', []).insert(0, (args, kwargs)) return func return _decorator def pretty_choice_list(l): return ', '.join("'%s'" % i for i in l) def print_list(objs, fields, formatters={}): pt = prettytable.PrettyTable([f for f in fields], caching=False) pt.align = 'l' for o in objs: row = [] for field in fields: if field in formatters: row.append(formatters[field](o)) else: field_name = field.lower().replace(' ', '_') data = getattr(o, field_name, None) or '' row.append(data) pt.add_row(row) print ensure_str(pt.get_string()) def print_dict(d): pt = prettytable.PrettyTable(['Property', 'Value'], caching=False) pt.align = 'l' [pt.add_row(list(r)) for r in d.iteritems()] print ensure_str(pt.get_string(sortby='Property')) def find_resource(manager, name_or_id): """Helper for the _find_* methods.""" # first try to get entity as integer id try: if isinstance(name_or_id, int) or name_or_id.isdigit(): return manager.get(int(name_or_id)) except exc.NotFound: pass # now try to get entity as uuid try: uuid.UUID(ensure_str(name_or_id)) return manager.get(name_or_id) except (ValueError, exc.NotFound): pass # finally try to find entity by name matches = list(manager.list(filters={'name': name_or_id})) num_matches = len(matches) if num_matches == 0: msg = "No %s with a name or ID of '%s' exists." % \ (manager.resource_class.__name__.lower(), name_or_id) raise exc.CommandError(msg) elif num_matches > 1: msg = ("Multiple %s matches found for '%s', use an ID to be more" " specific." % (manager.resource_class.__name__.lower(), name_or_id)) raise exc.CommandError(msg) else: return matches[0] def skip_authentication(f): """Function decorator used to indicate a caller may be unauthenticated.""" f.require_authentication = False return f def is_authentication_required(f): """Checks to see if the function requires authentication. Use the skip_authentication decorator to indicate a caller may skip the authentication step. """ return getattr(f, 'require_authentication', True) def string_to_bool(arg): return arg.strip().lower() in ('t', 'true', 'yes', '1') def env(*vars, **kwargs): """Search for the first defined of possibly many env vars Returns the first environment variable defined in vars, or returns the default defined in kwargs. """ for v in vars: value = os.environ.get(v, None) if value: return value return kwargs.get('default', '') def import_versioned_module(version, submodule=None): module = 'glanceclient.v%s' % version if submodule: module = '.'.join((module, submodule)) return importutils.import_module(module) def exit(msg=''): if msg: print >> sys.stderr, ensure_str(msg) sys.exit(1) def save_image(data, path): """ Save an image to the specified path. :param data: binary data of the image :param path: path to save the image to """ if path is None: image = sys.stdout else: image = open(path, 'wb') try: for chunk in data: image.write(chunk) finally: if path is not None: image.close() def integrity_iter(iter, checksum): """ Check image data integrity. :raises: IOError """ md5sum = hashlib.md5() for chunk in iter: yield chunk md5sum.update(chunk) md5sum = md5sum.hexdigest() if md5sum != checksum: raise IOError(errno.EPIPE, 'Corrupt image download. Checksum was %s expected %s' % (md5sum, checksum)) def make_size_human_readable(size): suffix = ['B', 'kB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB'] base = 1024.0 index = 0 while size >= base: index = index + 1 size = size / base padded = '%.1f' % size stripped = padded.rstrip('0').rstrip('.') return '%s%s' % (stripped, suffix[index]) def ensure_unicode(text, incoming=None, errors='strict'): """ Decodes incoming objects using `incoming` if they're not already unicode. :param incoming: Text's current encoding :param errors: Errors handling policy. :returns: text or a unicode `incoming` encoded representation of it. """ if isinstance(text, unicode): return text if not incoming: incoming = sys.stdin.encoding or \ sys.getdefaultencoding() # Calling `str` in case text is a non str # object. text = str(text) try: return text.decode(incoming, errors) except UnicodeDecodeError: # Note(flaper87) If we get here, it means that # sys.stdin.encoding / sys.getdefaultencoding # didn't return a suitable encoding to decode # text. This happens mostly when global LANG # var is not set correctly and there's no # default encoding. In this case, most likely # python will use ASCII or ANSI encoders as # default encodings but they won't be capable # of decoding non-ASCII characters. # # Also, UTF-8 is being used since it's an ASCII # extension. return text.decode('utf-8', errors) def ensure_str(text, incoming=None, encoding='utf-8', errors='strict'): """ Encodes incoming objects using `encoding`. If incoming is not specified, text is expected to be encoded with current python's default encoding. (`sys.getdefaultencoding`) :param incoming: Text's current encoding :param encoding: Expected encoding for text (Default UTF-8) :param errors: Errors handling policy. :returns: text or a bytestring `encoding` encoded representation of it. """ if not incoming: incoming = sys.stdin.encoding or \ sys.getdefaultencoding() if not isinstance(text, basestring): # try to convert `text` to string # This allows this method for receiving # objs that can be converted to string text = str(text) if isinstance(text, unicode): return text.encode(encoding, errors) elif text and encoding != incoming: # Decode text before encoding it with `encoding` text = ensure_unicode(text, incoming, errors) return text.encode(encoding, errors) return text def getsockopt(self, *args, **kwargs): """ A function which allows us to monkey patch eventlet's GreenSocket, adding a required 'getsockopt' method. TODO: (mclaren) we can remove this once the eventlet fix (https://bitbucket.org/eventlet/eventlet/commits/609f230) lands in mainstream packages. """ return self.fd.getsockopt(*args, **kwargs)