# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Helper functions for commonly used utilities."""

import base64
import functools
import inspect
import json
import logging
import os
import warnings

import six
from six.moves import urllib


__author__ = (
    'rafek@google.com (Rafe Kaplan)',
    'guido@google.com (Guido van Rossum)',
)

logger = logging.getLogger(__name__)

POSITIONAL_WARNING = 'WARNING'
POSITIONAL_EXCEPTION = 'EXCEPTION'
POSITIONAL_IGNORE = 'IGNORE'
POSITIONAL_SET = frozenset([POSITIONAL_WARNING, POSITIONAL_EXCEPTION,
                            POSITIONAL_IGNORE])

# Module-wide switch read by ``positional`` wrappers at call time.
positional_parameters_enforcement = POSITIONAL_WARNING

_SYM_LINK_MESSAGE = 'File: {0}: Is a symbolic link.'
_IS_DIR_MESSAGE = '{0}: Is a directory'
_MISSING_FILE_MESSAGE = 'Cannot access {0}: No such file or directory'


def positional(max_positional_args):
    """A decorator to declare that only the first N arguments may be positional.

    This decorator makes it easy to support Python 3 style keyword-only
    parameters. For example, in Python 3 it is possible to write::

        def fn(pos1, *, kwonly1=None, kwonly2=None):
            ...

    All named parameters after ``*`` must be a keyword::

        fn(10, 'kw1', 'kw2')  # Raises exception.
        fn(10, kwonly1='kw1')  # Ok.

    Example
    ^^^^^^^

    To define a function like above, do::

        @positional(1)
        def fn(pos1, kwonly1=None, kwonly2=None):
            ...

    If no default value is provided to a keyword argument, it becomes a
    required keyword argument::

        @positional(0)
        def fn(required_kw):
            ...

    This must be called with the keyword parameter::

        fn()  # Raises exception.
        fn(10)  # Raises exception.
        fn(required_kw=10)  # Ok.

    When defining instance or class methods always remember to account for
    ``self`` and ``cls``::

        class MyClass(object):

            @positional(2)
            def my_method(self, pos1, kwonly1=None):
                ...

            @classmethod
            @positional(2)
            def my_method(cls, pos1, kwonly1=None):
                ...

    The decorator may also be applied bare (``@positional``), in which case
    every parameter without a default value is treated as positional and
    every parameter with a default value is treated as keyword-only.

    The positional decorator behavior is controlled by
    ``_helpers.positional_parameters_enforcement``, which may be set to
    ``POSITIONAL_EXCEPTION``, ``POSITIONAL_WARNING`` or
    ``POSITIONAL_IGNORE`` to raise an exception, log a warning, or do
    nothing, respectively, if a declaration is violated.

    Args:
        max_positional_args: Maximum number of positional arguments. All
                             parameters after this index must be keyword
                             only. When the decorator is applied bare, this
                             is the function being decorated.

    Returns:
        A decorator that prevents using arguments after max_positional_args
        from being used as positional parameters (or, for bare usage, the
        already-wrapped function).

    Raises:
        TypeError: if a keyword-only argument is provided as a positional
                   parameter, but only if
                   _helpers.positional_parameters_enforcement is set to
                   POSITIONAL_EXCEPTION.
    """

    def positional_decorator(wrapped):
        @functools.wraps(wrapped)
        def positional_wrapper(*args, **kwargs):
            if len(args) > max_positional_args:
                plural_s = ''
                if max_positional_args != 1:
                    plural_s = 's'
                message = ('{function}() takes at most {args_max} positional '
                           'argument{plural} ({args_given} given)'.format(
                               function=wrapped.__name__,
                               args_max=max_positional_args,
                               args_given=len(args),
                               plural=plural_s))
                # The enforcement mode is looked up on every call so that it
                # can be changed after functions have been decorated.
                if positional_parameters_enforcement == POSITIONAL_EXCEPTION:
                    raise TypeError(message)
                elif positional_parameters_enforcement == POSITIONAL_WARNING:
                    logger.warning(message)
            return wrapped(*args, **kwargs)
        return positional_wrapper

    if isinstance(max_positional_args, six.integer_types):
        return positional_decorator
    else:
        # Bare ``@positional`` usage: the argument is the function itself.
        # ``inspect.getargspec`` was removed in Python 3.11, so prefer
        # ``getfullargspec`` and fall back only where it does not exist
        # (Python 2).
        getspec = getattr(inspect, 'getfullargspec', None)
        if getspec is None:
            getspec = inspect.getargspec
        spec = getspec(max_positional_args)
        args = spec[0]
        # ``defaults`` is None (not an empty tuple) when the function
        # declares no default values.
        defaults = spec[3] or ()
        return positional(len(args) - len(defaults))(max_positional_args)


def scopes_to_string(scopes):
    """Converts scope value to a string.

    If scopes is a string then it is simply passed through. If scopes is an
    iterable then a string is returned that is all the individual scopes
    concatenated with spaces.

    Args:
        scopes: string or iterable of strings, the scopes.

    Returns:
        The scopes formatted as a single string.
    """
    if isinstance(scopes, six.string_types):
        return scopes
    else:
        return ' '.join(scopes)


def string_to_scopes(scopes):
    """Converts stringified scope value to a list.

    If scopes is a list then it is simply passed through. If scopes is a
    string then a list of each individual scope is returned. Any falsy
    value (``None``, ``''``, an empty list) yields an empty list.

    Args:
        scopes: a string or iterable of strings, the scopes.

    Returns:
        The scopes in a list.
    """
    if not scopes:
        return []
    elif isinstance(scopes, six.string_types):
        return scopes.split(' ')
    else:
        return scopes


def _add_query_parameter(url, name, value):
    """Adds a query parameter to a url.

    Replaces the current value if it already exists in the URL. The existing
    query string is round-tripped through a dict, so a parameter that is
    repeated in the URL keeps only its last value.

    Args:
        url: string, url to add the query parameter to.
        name: string, query parameter name.
        value: string, query parameter value.

    Returns:
        The updated URL. The URL is returned unchanged if value is None.
    """
    if value is None:
        return url
    else:
        parsed = list(urllib.parse.urlparse(url))
        # Index 4 of the parsed tuple is the query component.
        query = dict(urllib.parse.parse_qsl(parsed[4]))
        query[name] = value
        parsed[4] = urllib.parse.urlencode(query)
        return urllib.parse.urlunparse(parsed)


def validate_file(filename):
    """Checks that a path is usable as a regular file.

    Args:
        filename: string, the path to check.

    Raises:
        IOError: if the path is a symbolic link or a directory.

    A path that is not an existing regular file only triggers a warning
    (via ``warnings.warn``); no exception is raised in that case.
    """
    if os.path.islink(filename):
        raise IOError(_SYM_LINK_MESSAGE.format(filename))
    elif os.path.isdir(filename):
        raise IOError(_IS_DIR_MESSAGE.format(filename))
    elif not os.path.isfile(filename):
        warnings.warn(_MISSING_FILE_MESSAGE.format(filename))


def _parse_pem_key(raw_key_input):
    """Identify and extract PEM keys.

    Determines whether the given key is in the format of PEM key, and extracts
    the relevant part of the key if it is.

    Args:
        raw_key_input: The contents of a private key file (either PEM or
                       PKCS12), as bytes.

    Returns:
        bytes, the input from the first ``-----BEGIN `` marker onwards if
        the marker is present, or else None.
    """
    offset = raw_key_input.find(b'-----BEGIN ')
    if offset != -1:
        return raw_key_input[offset:]
    return None


def _json_encode(data):
    """Serializes data to JSON using the most compact separators."""
    return json.dumps(data, separators=(',', ':'))


def _to_bytes(value, encoding='ascii'):
    """Converts a string value to bytes, if necessary.

    Unfortunately, ``six.b`` is insufficient for this task since in
    Python2 it does not modify ``unicode`` objects.

    Args:
        value: The string/bytes value to be converted.
        encoding: The encoding to use to convert unicode to bytes. Defaults
                  to "ascii", which will not allow any characters from
                  ordinals larger than 127. Other useful values are
                  "latin-1", which will only allow byte ordinals (up to
                  255) and "utf-8", which will encode any unicode that
                  needs to be.

    Returns:
        The original value converted to bytes (if unicode) or as passed in
        if it started out as bytes.

    Raises:
        ValueError if the value could not be converted to bytes.
    """
    result = (value.encode(encoding)
              if isinstance(value, six.text_type) else value)
    if isinstance(result, six.binary_type):
        return result
    else:
        raise ValueError('{0!r} could not be converted to bytes'.format(value))


def _from_bytes(value):
    """Converts bytes to a string value, if necessary.

    Bytes are decoded as UTF-8.

    Args:
        value: The string/bytes value to be converted.

    Returns:
        The original value converted to unicode (if bytes) or as passed in
        if it started out as unicode.

    Raises:
        ValueError if the value could not be converted to unicode.
    """
    result = (value.decode('utf-8')
              if isinstance(value, six.binary_type) else value)
    if isinstance(result, six.text_type):
        return result
    else:
        raise ValueError(
            '{0!r} could not be converted to unicode'.format(value))


def _urlsafe_b64encode(raw_bytes):
    """URL-safe base64 encodes a value and strips the ``=`` padding.

    Args:
        raw_bytes: bytes or text to encode; text is first encoded as UTF-8.

    Returns:
        bytes, the URL-safe base64 encoding without trailing ``=``.
    """
    raw_bytes = _to_bytes(raw_bytes, encoding='utf-8')
    return base64.urlsafe_b64encode(raw_bytes).rstrip(b'=')


def _urlsafe_b64decode(b64string):
    """Decodes a URL-safe base64 value whose padding may have been stripped.

    Args:
        b64string: bytes or ASCII text holding the encoded value.

    Returns:
        bytes, the decoded payload.
    """
    # Guard against unicode strings, which base64 can't handle.
    b64string = _to_bytes(b64string)
    # Restore exactly the padding needed to reach a multiple of four
    # characters (zero when the input is already aligned).
    padded = b64string + b'=' * (-len(b64string) % 4)
    return base64.urlsafe_b64decode(padded)