diff --git a/bin/heat-api b/bin/heat-api index c43e8f6ef0..ca82d89773 100755 --- a/bin/heat-api +++ b/bin/heat-api @@ -21,6 +21,7 @@ if os.path.exists(os.path.join(possible_topdir, 'heat', '__init__.py')): gettext.install('heat', unicode=1) +from heat import rpc from heat.common import config from heat.common import wsgi from paste import httpserver @@ -30,6 +31,8 @@ if __name__ == '__main__': try: conf = config.HeatConfigOpts() conf() + config.FLAGS = conf + rpc.configure(conf) app = config.load_paste_app(conf) diff --git a/bin/heat-engine b/bin/heat-engine index ae99e518a8..b6e4cbf156 100755 --- a/bin/heat-engine +++ b/bin/heat-engine @@ -38,6 +38,7 @@ if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'heat', '__init__.py')): gettext.install('heat', unicode=1) +from heat import rpc from heat import service from heat.common import config from heat.common import utils @@ -47,13 +48,18 @@ logger = logging.getLogger('heat.engine') if __name__ == '__main__': - config.FLAGS(sys.argv) - config.setup_logging(config.FLAGS) - db_api.configure(config.FLAGS) + conf = config.HeatEngineConfigOpts() + conf() + config.FLAGS = conf + + config.setup_logging(conf) + rpc.configure(conf) + db_api.configure(conf) #utils.monkey_patch() server = service.Service.create(binary='heat-engine', topic='engine', - manager='heat.engine.manager.EngineManager') + manager='heat.engine.manager.EngineManager', + config=conf) service.serve(server) service.wait() diff --git a/heat/common/config.py b/heat/common/config.py index 2633a9c64a..0bdb418b12 100644 --- a/heat/common/config.py +++ b/heat/common/config.py @@ -38,35 +38,63 @@ paste_deploy_opts = [ cfg.StrOpt('config_file'), ] +FLAGS = None -class HeatConfigOpts(cfg.CommonConfigOpts): - - def __init__(self, default_config_files=None, **kwargs): - super(HeatConfigOpts, self).__init__( - project='heat', - version='%%prog %s' % version.version_string(), - default_config_files=default_config_files, - **kwargs) - -class HeatEngineConfigOpts(cfg.CommonConfigOpts): - db_opts = [ - cfg.StrOpt('db_backend', default='heat.db.anydbm.api', help='The backend to use for db'), - cfg.StrOpt('sql_connection', - default='mysql://heat:heat@localhost/heat', - help='The SQLAlchemy connection string used to connect to the ' - 'database'), - cfg.IntOpt('sql_idle_timeout', - default=3600, - help='timeout before idle sql connections are reaped'), - ] - engine_opts = [ - cfg.StrOpt('host', - default=socket.gethostname(), - help='Name of this node. This can be an opaque identifier. ' - 'It is not necessarily a hostname, FQDN, or IP address.'), - cfg.StrOpt('instance_driver', - default='heat.engine.nova', - help='Driver to use for controlling instances'), +rpc_opts = [ + cfg.StrOpt('rpc_backend', + default='heat.rpc.impl_qpid', + help="The messaging module to use, defaults to kombu."), + cfg.IntOpt('rpc_thread_pool_size', + default=1024, + help='Size of RPC thread pool'), + cfg.IntOpt('rpc_conn_pool_size', + default=30, + help='Size of RPC connection pool'), + cfg.IntOpt('rpc_response_timeout', + default=60, + help='Seconds to wait for a response from call or multicall'), + cfg.StrOpt('qpid_hostname', + default='localhost', + help='Qpid broker hostname'), + cfg.StrOpt('qpid_port', + default='5672', + help='Qpid broker port'), + cfg.StrOpt('qpid_username', + default='', + help='Username for qpid connection'), + cfg.StrOpt('qpid_password', + default='', + help='Password for qpid connection'), + cfg.StrOpt('qpid_sasl_mechanisms', + default='', + help='Space separated list of SASL mechanisms to use for auth'), + cfg.BoolOpt('qpid_reconnect', + default=True, + help='Automatically reconnect'), + cfg.IntOpt('qpid_reconnect_timeout', + default=0, + help='Reconnection timeout in seconds'), + cfg.IntOpt('qpid_reconnect_limit', + default=0, + help='Max reconnections before giving up'), + cfg.IntOpt('qpid_reconnect_interval_min', + default=0, + help='Minimum seconds between reconnection attempts'), + cfg.IntOpt('qpid_reconnect_interval_max', + default=0, + help='Maximum seconds between reconnection attempts'), + cfg.IntOpt('qpid_reconnect_interval', + default=0, + help='Equivalent to setting max and min to the same value'), + cfg.IntOpt('qpid_heartbeat', + default=5, + help='Seconds between connection keepalive heartbeats'), + cfg.StrOpt('qpid_protocol', + default='tcp', + help="Transport to use, either 'tcp' or 'ssl'"), + cfg.BoolOpt('qpid_tcp_nodelay', + default=True, + help='Disable Nagle algorithm'), cfg.StrOpt('rabbit_host', default='localhost', help='the RabbitMQ host'), @@ -102,6 +130,73 @@ class HeatEngineConfigOpts(cfg.CommonConfigOpts): ] + +class HeatConfigOpts(cfg.CommonConfigOpts): + def __init__(self, default_config_files=None, **kwargs): + super(HeatConfigOpts, self).__init__( + project='heat', + version='%%prog %s' % version.version_string(), + default_config_files=default_config_files, + **kwargs) + self.register_cli_opts(rpc_opts) + +class HeatEngineConfigOpts(cfg.CommonConfigOpts): + + service_opts = [ + cfg.IntOpt('report_interval', + default=10, + help='seconds between nodes reporting state to datastore'), + cfg.IntOpt('periodic_interval', + default=60, + help='seconds between running periodic tasks'), + cfg.StrOpt('ec2_listen', + default="0.0.0.0", + help='IP address for EC2 API to listen'), + cfg.IntOpt('ec2_listen_port', + default=8773, + help='port for ec2 api to listen'), + cfg.StrOpt('osapi_compute_listen', + default="0.0.0.0", + help='IP address for OpenStack API to listen'), + cfg.IntOpt('osapi_compute_listen_port', + default=8774, + help='list port for osapi compute'), + cfg.StrOpt('metadata_manager', + default='nova.api.manager.MetadataManager', + help='OpenStack metadata service manager'), + cfg.StrOpt('metadata_listen', + default="0.0.0.0", + help='IP address for metadata api to listen'), + cfg.IntOpt('metadata_listen_port', + default=8775, + help='port for metadata api to listen'), + cfg.StrOpt('osapi_volume_listen', + default="0.0.0.0", + help='IP address for OpenStack Volume API to listen'), + cfg.IntOpt('osapi_volume_listen_port', + default=8776, + help='port for os volume api to listen'), + ] + db_opts = [ + cfg.StrOpt('db_backend', default='heat.db.anydbm.api', help='The backend to use for db'), + cfg.StrOpt('sql_connection', + default='mysql://heat:heat@localhost/heat', + help='The SQLAlchemy connection string used to connect to the ' + 'database'), + cfg.IntOpt('sql_idle_timeout', + default=3600, + help='timeout before idle sql connections are reaped'), + ] + engine_opts = [ + cfg.StrOpt('host', + default=socket.gethostname(), + help='Name of this node. This can be an opaque identifier. ' + 'It is not necessarily a hostname, FQDN, or IP address.'), + cfg.StrOpt('instance_driver', + default='heat.engine.nova', + help='Driver to use for controlling instances'), + ] + def __init__(self, default_config_files=None, **kwargs): super(HeatEngineConfigOpts, self).__init__( project='heat', @@ -111,9 +206,8 @@ class HeatEngineConfigOpts(cfg.CommonConfigOpts): prog='heat-engine') self.register_cli_opts(self.engine_opts) self.register_cli_opts(self.db_opts) - -FLAGS = HeatEngineConfigOpts() - + self.register_cli_opts(self.service_opts) + self.register_cli_opts(rpc_opts) def setup_logging(conf): """ diff --git a/heat/common/context.py b/heat/common/context.py index db689bde1e..99c82aced0 100644 --- a/heat/common/context.py +++ b/heat/common/context.py @@ -14,9 +14,9 @@ # under the License. from heat.common import exception -from heat.common import utils from heat.common import wsgi from heat.openstack.common import cfg +from heat.openstack.common import utils class RequestContext(object): diff --git a/heat/common/exception.py b/heat/common/exception.py index af80956588..d175ec669a 100644 --- a/heat/common/exception.py +++ b/heat/common/exception.py @@ -19,41 +19,12 @@ import functools import urlparse - +from heat.openstack.common.exception import OpenstackException class RedirectException(Exception): def __init__(self, url): self.url = urlparse.urlparse(url) - -class HeatException(Exception): - """ - Base Heat Exception - - To correctly use this class, inherit from it and define - a 'message' property. That message will get printf'd - with the keyword arguments provided to the constructor. - """ - message = _("An unknown exception occurred") - - def __init__(self, *args, **kwargs): - try: - self._error_string = self.message % kwargs - except Exception: - # at least get the core message out if something happened - self._error_string = self.message - if len(args) > 0: - # If there is a non-kwarg parameter, assume it's the error - # message or reason description and tack it on to the end - # of the exception message - # Convert all arguments into their string representations... - args = ["%s" % arg for arg in args] - self._error_string = (self._error_string + - "\nDetails: %s" % '\n'.join(args)) - - def __str__(self): - return self._error_string - def wrap_exception(notifier=None, publisher_id=None, event_type=None, level=None): """This decorator wraps a method to catch any exceptions that may @@ -100,94 +71,30 @@ def wrap_exception(notifier=None, publisher_id=None, event_type=None, return functools.wraps(f)(wrapped) return inner - -class NovaException(Exception): - """Base Nova Exception - - To correctly use this class, inherit from it and define - a 'message' property. That message will get printf'd - with the keyword arguments provided to the constructor. - - """ - message = _("An unknown exception occurred.") - - def __init__(self, message=None, **kwargs): - self.kwargs = kwargs - - if 'code' not in self.kwargs: - try: - self.kwargs['code'] = self.code - except AttributeError: - pass - - if not message: - try: - message = self.message % kwargs - - except Exception as e: - # at least get the core message out if something happened - message = self.message - - super(NovaException, self).__init__(message) - -class MissingArgumentError(HeatException): - message = _("Missing required argument.") - - -class MissingCredentialError(HeatException): +class MissingCredentialError(OpenstackException): message = _("Missing required credential: %(required)s") -class BadAuthStrategy(HeatException): +class BadAuthStrategy(OpenstackException): message = _("Incorrect auth strategy, expected \"%(expected)s\" but " "received \"%(received)s\"") - -class NotFound(HeatException): - message = _("An object with the specified identifier was not found.") - - -class UnknownScheme(HeatException): - message = _("Unknown scheme '%(scheme)s' found in URI") - - -class BadStoreUri(HeatException): - message = _("The Store URI %(uri)s was malformed. Reason: %(reason)s") - - -class Duplicate(HeatException): - message = _("An object with the same identifier already exists.") - - -class StorageFull(HeatException): - message = _("There is not enough disk space on the image storage media.") - - -class StorageWriteDenied(HeatException): - message = _("Permission to write image storage media denied.") - - -class ImportFailure(HeatException): - message = _("Failed to import requested object/class: '%(import_str)s'. " - "Reason: %(reason)s") - - -class AuthBadRequest(HeatException): +class AuthBadRequest(OpenstackException): message = _("Connect error/bad request to Auth service at URL %(url)s.") -class AuthUrlNotFound(HeatException): +class AuthUrlNotFound(OpenstackException): message = _("Auth service at URL %(url)s not found.") -class AuthorizationFailure(HeatException): +class AuthorizationFailure(OpenstackException): message = _("Authorization failed.") -class NotAuthenticated(HeatException): +class NotAuthenticated(OpenstackException): message = _("You are not authenticated.") -class Forbidden(HeatException): +class Forbidden(OpenstackException): message = _("You are not authorized to complete this action.") #NOTE(bcwaldon): here for backwards-compatability, need to deprecate. @@ -195,32 +102,24 @@ class NotAuthorized(Forbidden): message = _("You are not authorized to complete this action.") -class Invalid(HeatException): +class Invalid(OpenstackException): message = _("Data supplied was not valid.") -class AuthorizationRedirect(HeatException): +class AuthorizationRedirect(OpenstackException): message = _("Redirecting to %(uri)s for authorization.") -class DatabaseMigrationError(HeatException): - message = _("There was an error migrating the database.") - - -class ClientConnectionError(HeatException): - message = _("There was an error connecting to a server") - - -class ClientConfigurationError(HeatException): +class ClientConfigurationError(OpenstackException): message = _("There was an error configuring the client.") -class MultipleChoices(HeatException): +class MultipleChoices(OpenstackException): message = _("The request returned a 302 Multiple Choices. This generally " "means that you have not included a version indicator in a " "request URI.\n\nThe body of response returned:\n%(body)s") -class LimitExceeded(HeatException): +class LimitExceeded(OpenstackException): message = _("The request returned a 413 Request Entity Too Large. This " "generally means that rate limiting or a quota threshold was " "breached.\n\nThe response body:\n%(body)s") @@ -231,7 +130,7 @@ class LimitExceeded(HeatException): super(LimitExceeded, self).__init__(*args, **kwargs) -class ServiceUnavailable(HeatException): +class ServiceUnavailable(OpenstackException): message = _("The request returned a 503 ServiceUnavilable. This " "generally occurs on service overload or other transient " "outage.") @@ -241,65 +140,27 @@ class ServiceUnavailable(HeatException): else None) super(ServiceUnavailable, self).__init__(*args, **kwargs) -class RequestUriTooLong(HeatException): +class RequestUriTooLong(OpenstackException): message = _("The URI was too long.") -class ServerError(HeatException): +class ServerError(OpenstackException): message = _("The request returned 500 Internal Server Error" "\n\nThe response body:\n%(body)s") - -class UnexpectedStatus(HeatException): - message = _("The request returned an unexpected status: %(status)s." - "\n\nThe response body:\n%(body)s") - - -class InvalidContentType(HeatException): - message = _("Invalid content type %(content_type)s") - - -class BadRegistryConnectionConfiguration(HeatException): - message = _("Registry was not configured correctly on API server. " - "Reason: %(reason)s") - - -class BadStoreConfiguration(HeatException): - message = _("Store %(store_name)s could not be configured correctly. " - "Reason: %(reason)s") - - -class BadDriverConfiguration(HeatException): - message = _("Driver %(driver_name)s could not be configured correctly. " - "Reason: %(reason)s") - - -class StoreDeleteNotSupported(HeatException): - message = _("Deleting images from this store is not supported.") - - -class StoreAddDisabled(HeatException): - message = _("Configuration for store failed. Adding images to this " - "store is disabled.") - - -class InvalidNotifierStrategy(HeatException): - message = _("'%(strategy)s' is not an available notifier strategy.") - - -class MaxRedirectsExceeded(HeatException): +class MaxRedirectsExceeded(OpenstackException): message = _("Maximum redirects (%(redirects)s) was exceeded.") -class InvalidRedirect(HeatException): +class InvalidRedirect(OpenstackException): message = _("Received invalid HTTP redirect.") -class NoServiceEndpoint(HeatException): +class NoServiceEndpoint(OpenstackException): message = _("Response from Keystone does not contain a Heat endpoint.") -class RegionAmbiguity(HeatException): +class RegionAmbiguity(OpenstackException): message = _("Multiple 'image' service matches for region %(region)s. This " "generally means that a region is required and you have not " "supplied one.") diff --git a/heat/common/utils.py b/heat/common/utils.py index 6b39e41c90..d6e7eaa884 100644 --- a/heat/common/utils.py +++ b/heat/common/utils.py @@ -29,60 +29,10 @@ from eventlet import greenthread from eventlet import semaphore from eventlet.green import subprocess -from heat.common import exception +from heat.openstack.common import exception PERFECT_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f" -def import_class(import_str): - """Returns a class from a string including module and class.""" - mod_str, _sep, class_str = import_str.rpartition('.') - try: - __import__(mod_str) - return getattr(sys.modules[mod_str], class_str) - except (ImportError, ValueError, AttributeError), exc: - #LOG.debug(_('Inner Exception: %s'), exc) - raise exception.ClassNotFound(class_name=class_str, exception=exc) - - -def import_object(import_str): - """Returns an object including a module or module and class.""" - try: - __import__(import_str) - return sys.modules[import_str] - except ImportError: - cls = import_class(import_str) - return cls() - -class LazyPluggable(object): - """A pluggable backend loaded lazily based on some value.""" - - def __init__(self, pivot, **backends): - self.__backends = backends - self.__pivot = pivot - self.__backend = None - - def __get_backend(self): - if not self.__backend: - backend_name = FLAGS[self.__pivot] - if backend_name not in self.__backends: - raise exception.Error(_('Invalid backend: %s') % backend_name) - - backend = self.__backends[backend_name] - if isinstance(backend, tuple): - name = backend[0] - fromlist = backend[1] - else: - name = backend - fromlist = backend - - self.__backend = __import__(name, None, None, fromlist) - #LOG.debug(_('backend %s'), self.__backend) - return self.__backend - - def __getattr__(self, key): - backend = self.__get_backend() - return getattr(backend, key) - def chunkreadable(iter, chunk_size=65536): """ Wrap a readable iterator with a reader yielding chunks of @@ -109,27 +59,6 @@ def chunkiter(fp, chunk_size=65536): break -def import_class(import_str): - """Returns a class from a string including module and class""" - mod_str, _sep, class_str = import_str.rpartition('.') - try: - __import__(mod_str) - return getattr(sys.modules[mod_str], class_str) - except (ImportError, ValueError, AttributeError), e: - raise exception.ImportFailure(import_str=import_str, - reason=e) - - -def import_object(import_str): - """Returns an object including a module or module and class""" - try: - __import__(import_str) - return sys.modules[import_str] - except ImportError: - cls = import_class(import_str) - return cls() - - def generate_uuid(): return str(uuid.uuid4()) @@ -157,29 +86,6 @@ def isotime(at=None): str += ('Z' if tz == 'UTC' else tz) return str - -def parse_isotime(timestr): - """Turn an iso formatted time back into a datetime.""" - try: - return iso8601.parse_date(timestr) - except (iso8601.ParseError, TypeError) as e: - raise ValueError(e.message) - - -def normalize_time(timestamp): - """Normalize time in arbitrary timezone to UTC""" - offset = timestamp.utcoffset() - return timestamp.replace(tzinfo=None) - offset if offset else timestamp - -def utcnow(): - """Overridable version of utils.utcnow.""" - if utcnow.override_time: - return utcnow.override_time - return datetime.datetime.utcnow() - -utcnow.override_time = None - - class LoopingCallDone(Exception): """Exception to break out and stop a LoopingCall. diff --git a/heat/common/wsgi.py b/heat/common/wsgi.py index 3d7497748f..a2f5b7f51d 100644 --- a/heat/common/wsgi.py +++ b/heat/common/wsgi.py @@ -40,8 +40,8 @@ import webob.dec import webob.exc from heat.common import exception -from heat.common import utils from heat.openstack.common import cfg +from heat.openstack.common import utils bind_opts = [ diff --git a/heat/context.py b/heat/context.py index 914b494076..50c3fa7227 100644 --- a/heat/context.py +++ b/heat/context.py @@ -23,14 +23,15 @@ import copy import logging from heat.openstack.common import local -from heat.common import utils +from heat.openstack.common import utils +from heat.common import utils as heat_utils LOG = logging.getLogger(__name__) def generate_request_id(): - return 'req-' + str(utils.gen_uuid()) + return 'req-' + str(heat_utils.gen_uuid()) class RequestContext(object): @@ -74,7 +75,7 @@ class RequestContext(object): if not timestamp: timestamp = utils.utcnow() if isinstance(timestamp, basestring): - timestamp = utils.parse_strtime(timestamp) + timestamp = heat_utils.parse_strtime(timestamp) self.timestamp = timestamp if not request_id: request_id = generate_request_id() @@ -93,7 +94,7 @@ class RequestContext(object): 'read_deleted': self.read_deleted, 'roles': self.roles, 'remote_address': self.remote_address, - 'timestamp': utils.strtime(self.timestamp), + 'timestamp': heat_utils.strtime(self.timestamp), 'request_id': self.request_id, 'auth_token': self.auth_token} diff --git a/heat/db/api.py b/heat/db/api.py index c449703024..c3aa5006b3 100644 --- a/heat/db/api.py +++ b/heat/db/api.py @@ -22,11 +22,11 @@ Usage: >>> db.event_get(context, event_id) # Event object received -The underlying driver is loaded as a :class:`LazyPluggable`. SQLAlchemy is -currently the only supported backend. +The underlying driver is loaded . SQLAlchemy is currently the only +supported backend. ''' -from heat.common import utils +from heat.openstack.common import utils def configure(conf): global IMPL diff --git a/heat/engine/resources.py b/heat/engine/resources.py index a4649d8620..770778aff7 100644 --- a/heat/engine/resources.py +++ b/heat/engine/resources.py @@ -21,9 +21,6 @@ import string from novaclient.v1_1 import client from heat.db import api as db_api -from heat.common.config import HeatEngineConfigOpts -import pdb -db_api.configure(HeatEngineConfigOpts()) logger = logging.getLogger('heat.engine.resources') diff --git a/heat/openstack/common/exception.py b/heat/openstack/common/exception.py new file mode 100644 index 0000000000..ba32da550b --- /dev/null +++ b/heat/openstack/common/exception.py @@ -0,0 +1,147 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Exceptions common to OpenStack projects +""" + +import logging + + +class ProcessExecutionError(IOError): + def __init__(self, stdout=None, stderr=None, exit_code=None, cmd=None, + description=None): + if description is None: + description = "Unexpected error while running command." + if exit_code is None: + exit_code = '-' + message = "%s\nCommand: %s\nExit code: %s\nStdout: %r\nStderr: %r" % ( + description, cmd, exit_code, stdout, stderr) + IOError.__init__(self, message) + + +class Error(Exception): + def __init__(self, message=None): + super(Error, self).__init__(message) + + +class ApiError(Error): + def __init__(self, message='Unknown', code='Unknown'): + self.message = message + self.code = code + super(ApiError, self).__init__('%s: %s' % (code, message)) + + +class NotFound(Error): + pass + + +class UnknownScheme(Error): + + msg = "Unknown scheme '%s' found in URI" + + def __init__(self, scheme): + msg = self.__class__.msg % scheme + super(UnknownScheme, self).__init__(msg) + + +class BadStoreUri(Error): + + msg = "The Store URI %s was malformed. Reason: %s" + + def __init__(self, uri, reason): + msg = self.__class__.msg % (uri, reason) + super(BadStoreUri, self).__init__(msg) + + +class Duplicate(Error): + pass + + +class NotAuthorized(Error): + pass + + +class NotEmpty(Error): + pass + + +class Invalid(Error): + pass + + +class BadInputError(Exception): + """Error resulting from a client sending bad input to a server""" + pass + + +class MissingArgumentError(Error): + pass + + +class DatabaseMigrationError(Error): + pass + + +class ClientConnectionError(Exception): + """Error resulting from a client connecting to a server""" + pass + + +def wrap_exception(f): + def _wrap(*args, **kw): + try: + return f(*args, **kw) + except Exception, e: + if not isinstance(e, Error): + #exc_type, exc_value, exc_traceback = sys.exc_info() + logging.exception('Uncaught exception') + #logging.error(traceback.extract_stack(exc_traceback)) + raise Error(str(e)) + raise + _wrap.func_name = f.func_name + return _wrap + + +class OpenstackException(Exception): + """ + Base Exception + + To correctly use this class, inherit from it and define + a 'message' property. That message will get printf'd + with the keyword arguments provided to the constructor. + """ + message = "An unknown exception occurred" + + def __init__(self, **kwargs): + try: + self._error_string = self.message % kwargs + + except Exception: + # at least get the core message out if something happened + self._error_string = self.message + + def __str__(self): + return self._error_string + + +class MalformedRequestBody(OpenstackException): + message = "Malformed message body: %(reason)s" + + +class InvalidContentType(OpenstackException): + message = "Invalid content type %(content_type)s" diff --git a/heat/openstack/common/utils.py b/heat/openstack/common/utils.py new file mode 100644 index 0000000000..ed238ebcf3 --- /dev/null +++ b/heat/openstack/common/utils.py @@ -0,0 +1,233 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +System-level utilities and helper functions. +""" + +import datetime +import logging +import os +import random +import shlex +import sys + +from eventlet import greenthread +from eventlet.green import subprocess +import iso8601 + +from heat.openstack.common import exception + + +TIME_FORMAT = "%Y-%m-%dT%H:%M:%S" +LOG = logging.getLogger(__name__) + + +def int_from_bool_as_string(subject): + """ + Interpret a string as a boolean and return either 1 or 0. + + Any string value in: + ('True', 'true', 'On', 'on', '1') + is interpreted as a boolean True. + + Useful for JSON-decoded stuff and config file parsing + """ + return bool_from_string(subject) and 1 or 0 + + +def bool_from_string(subject): + """ + Interpret a string as a boolean. + + Any string value in: + ('True', 'true', 'On', 'on', 'Yes', 'yes', '1') + is interpreted as a boolean True. + + Useful for JSON-decoded stuff and config file parsing + """ + if isinstance(subject, bool): + return subject + if isinstance(subject, basestring): + if subject.strip().lower() in ('true', 'on', 'yes', '1'): + return True + return False + + +def execute(*cmd, **kwargs): + """ + Helper method to execute command with optional retry. + + :cmd Passed to subprocess.Popen. + :process_input Send to opened process. + :check_exit_code Defaults to 0. Raise exception.ProcessExecutionError + unless program exits with this code. + :delay_on_retry True | False. Defaults to True. If set to True, wait a + short amount of time before retrying. + :attempts How many times to retry cmd. + :run_as_root True | False. Defaults to False. If set to True, + the command is prefixed by the command specified + in the root_helper kwarg. + :root_helper command to prefix all cmd's with + + :raises exception.Error on receiving unknown arguments + :raises exception.ProcessExecutionError + """ + + process_input = kwargs.pop('process_input', None) + check_exit_code = kwargs.pop('check_exit_code', 0) + delay_on_retry = kwargs.pop('delay_on_retry', True) + attempts = kwargs.pop('attempts', 1) + run_as_root = kwargs.pop('run_as_root', False) + root_helper = kwargs.pop('root_helper', '') + if len(kwargs): + raise exception.Error(_('Got unknown keyword args ' + 'to utils.execute: %r') % kwargs) + if run_as_root: + cmd = shlex.split(root_helper) + list(cmd) + cmd = map(str, cmd) + + while attempts > 0: + attempts -= 1 + try: + LOG.debug(_('Running cmd (subprocess): %s'), ' '.join(cmd)) + _PIPE = subprocess.PIPE # pylint: disable=E1101 + obj = subprocess.Popen(cmd, + stdin=_PIPE, + stdout=_PIPE, + stderr=_PIPE, + close_fds=True) + result = None + if process_input is not None: + result = obj.communicate(process_input) + else: + result = obj.communicate() + obj.stdin.close() # pylint: disable=E1101 + _returncode = obj.returncode # pylint: disable=E1101 + if _returncode: + LOG.debug(_('Result was %s') % _returncode) + if (isinstance(check_exit_code, int) and + not isinstance(check_exit_code, bool) and + _returncode != check_exit_code): + (stdout, stderr) = result + raise exception.ProcessExecutionError( + exit_code=_returncode, + stdout=stdout, + stderr=stderr, + cmd=' '.join(cmd)) + return result + except exception.ProcessExecutionError: + if not attempts: + raise + else: + LOG.debug(_('%r failed. Retrying.'), cmd) + if delay_on_retry: + greenthread.sleep(random.randint(20, 200) / 100.0) + finally: + # NOTE(termie): this appears to be necessary to let the subprocess + # call clean something up in between calls, without + # it two execute calls in a row hangs the second one + greenthread.sleep(0) + + +def import_class(import_str): + """Returns a class from a string including module and class""" + mod_str, _sep, class_str = import_str.rpartition('.') + try: + __import__(mod_str) + return getattr(sys.modules[mod_str], class_str) + except (ImportError, ValueError, AttributeError): + raise exception.NotFound('Class %s cannot be found' % class_str) + + +def import_object(import_str): + """Returns an object including a module or module and class""" + try: + __import__(import_str) + return sys.modules[import_str] + except ImportError: + return import_class(import_str) + + +def isotime(at=None): + """Stringify time in ISO 8601 format""" + if not at: + at = datetime.datetime.utcnow() + str = at.strftime(TIME_FORMAT) + tz = at.tzinfo.tzname(None) if at.tzinfo else 'UTC' + str += ('Z' if tz == 'UTC' else tz) + return str + + +def parse_isotime(timestr): + """Parse time from ISO 8601 format""" + try: + return iso8601.parse_date(timestr) + except iso8601.ParseError as e: + raise ValueError(e.message) + except TypeError as e: + raise ValueError(e.message) + + +def normalize_time(timestamp): + """Normalize time in arbitrary timezone to UTC""" + offset = timestamp.utcoffset() + return timestamp.replace(tzinfo=None) - offset if offset else timestamp + + +def utcnow(): + """Overridable version of utils.utcnow.""" + if utcnow.override_time: + return utcnow.override_time + return datetime.datetime.utcnow() + + +utcnow.override_time = None + + +def set_time_override(override_time=datetime.datetime.utcnow()): + """Override utils.utcnow to return a constant time.""" + utcnow.override_time = override_time + + +def clear_time_override(): + """Remove the overridden time.""" + utcnow.override_time = None + + +def auth_str_equal(provided, known): + """Constant-time string comparison. + + :params provided: the first string + :params known: the second string + + :return: True if the strings are equal. + + This function takes two strings and compares them. It is intended to be + used when doing a comparison for authentication purposes to help guard + against timing attacks. When using the function for this purpose, always + provide the user-provided password as the first argument. The time this + function will take is always a factor of the length of this string. + """ + result = 0 + p_len = len(provided) + k_len = len(known) + for i in xrange(p_len): + a = ord(provided[i]) if i < p_len else 0 + b = ord(known[i]) if i < k_len else 0 + result |= a ^ b + return (p_len == k_len) & (result == 0) diff --git a/heat/rpc/__init__.py b/heat/rpc/__init__.py index 6ad0992c22..a352b34f30 100644 --- a/heat/rpc/__init__.py +++ b/heat/rpc/__init__.py @@ -18,16 +18,7 @@ # under the License. from heat.openstack.common import cfg -from heat.common import utils -from heat.common import config - - -rpc_backend_opt = cfg.StrOpt('rpc_backend', - default='heat.rpc.impl_qpid', - help="The messaging module to use, defaults to kombu.") - -FLAGS = config.FLAGS -FLAGS.register_opt(rpc_backend_opt) +from heat.openstack.common import utils def create_connection(new=True): @@ -193,10 +184,17 @@ def fanout_cast_to_server(context, server_params, topic, msg): _RPCIMPL = None +def configure(conf): + """Delay import of rpc_backend until FLAGS are loaded.""" + print 'configuring rpc %s' % conf.rpc_backend + global _RPCIMPL + _RPCIMPL = utils.import_object(conf.rpc_backend) def _get_impl(): """Delay import of rpc_backend until FLAGS are loaded.""" global _RPCIMPL if _RPCIMPL is None: - _RPCIMPL = utils.import_object(FLAGS.rpc_backend) + print 'rpc not configured' + return _RPCIMPL + diff --git a/heat/rpc/amqp.py b/heat/rpc/amqp.py index c05f0be764..3faec435d6 100644 --- a/heat/rpc/amqp.py +++ b/heat/rpc/amqp.py @@ -42,13 +42,14 @@ from heat.openstack.common import local import heat.rpc.common as rpc_common LOG = logging.getLogger(__name__) +FLAGS = config.FLAGS class Pool(pools.Pool): """Class that implements a Pool of Connections.""" def __init__(self, *args, **kwargs): self.connection_cls = kwargs.pop("connection_cls", None) - kwargs.setdefault("max_size", config.FLAGS.rpc_conn_pool_size) + kwargs.setdefault("max_size", FLAGS.rpc_conn_pool_size) kwargs.setdefault("order_as_stack", True) super(Pool, self).__init__(*args, **kwargs) @@ -206,7 +207,7 @@ class ProxyCallback(object): def __init__(self, proxy, connection_pool): self.proxy = proxy - self.pool = greenpool.GreenPool(config.FLAGS.rpc_thread_pool_size) + self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size) self.connection_pool = connection_pool def __call__(self, message_data): @@ -267,7 +268,7 @@ class MulticallWaiter(object): def __init__(self, connection, timeout): self._connection = connection self._iterator = connection.iterconsume( - timeout=timeout or config.FLAGS.rpc_response_timeout) + timeout=timeout or FLAGS.rpc_response_timeout) self._result = None self._done = False self._got_ending = False diff --git a/heat/rpc/common.py b/heat/rpc/common.py index fb621cc3f9..28d182475b 100644 --- a/heat/rpc/common.py +++ b/heat/rpc/common.py @@ -20,29 +20,15 @@ import copy import logging -from heat.common import exception from heat.openstack.common import cfg +from heat.openstack.common import exception from heat.common import config LOG = logging.getLogger(__name__) -rpc_opts = [ - cfg.IntOpt('rpc_thread_pool_size', - default=1024, - help='Size of RPC thread pool'), - cfg.IntOpt('rpc_conn_pool_size', - default=30, - help='Size of RPC connection pool'), - cfg.IntOpt('rpc_response_timeout', - default=60, - help='Seconds to wait for a response from call or multicall'), - ] -config.FLAGS.register_opts(rpc_opts) - - -class RemoteError(exception.NovaException): +class RemoteError(exception.OpenstackException): """Signifies that a remote class has raised an exception. Contains a string representation of the type of the original exception, @@ -62,7 +48,7 @@ class RemoteError(exception.NovaException): traceback=traceback) -class Timeout(exception.NovaException): +class Timeout(exception.OpenstackException): """Signifies that a timeout has occurred. This exception is raised if the rpc_response_timeout is reached while diff --git a/heat/rpc/impl_qpid.py b/heat/rpc/impl_qpid.py index 95b9ee52da..9d83c95005 100644 --- a/heat/rpc/impl_qpid.py +++ b/heat/rpc/impl_qpid.py @@ -33,55 +33,6 @@ from heat.rpc import common as rpc_common LOG = logging.getLogger(__name__) -qpid_opts = [ - cfg.StrOpt('qpid_hostname', - default='localhost', - help='Qpid broker hostname'), - cfg.StrOpt('qpid_port', - default='5672', - help='Qpid broker port'), - cfg.StrOpt('qpid_username', - default='', - help='Username for qpid connection'), - cfg.StrOpt('qpid_password', - default='', - help='Password for qpid connection'), - cfg.StrOpt('qpid_sasl_mechanisms', - default='', - help='Space separated list of SASL mechanisms to use for auth'), - cfg.BoolOpt('qpid_reconnect', - default=True, - help='Automatically reconnect'), - cfg.IntOpt('qpid_reconnect_timeout', - default=0, - help='Reconnection timeout in seconds'), - cfg.IntOpt('qpid_reconnect_limit', - default=0, - help='Max reconnections before giving up'), - cfg.IntOpt('qpid_reconnect_interval_min', - default=0, - help='Minimum seconds between reconnection attempts'), - cfg.IntOpt('qpid_reconnect_interval_max', - default=0, - help='Maximum seconds between reconnection attempts'), - cfg.IntOpt('qpid_reconnect_interval', - default=0, - help='Equivalent to setting max and min to the same value'), - cfg.IntOpt('qpid_heartbeat', - default=5, - help='Seconds between connection keepalive heartbeats'), - cfg.StrOpt('qpid_protocol', - default='tcp', - help="Transport to use, either 'tcp' or 'ssl'"), - cfg.BoolOpt('qpid_tcp_nodelay', - default=True, - help='Disable Nagle algorithm'), - ] - -FLAGS = config.FLAGS -FLAGS.register_opts(qpid_opts) - - class ConsumerBase(object): """Consumer base class.""" @@ -174,7 +125,7 @@ class TopicConsumer(ConsumerBase): """ super(TopicConsumer, self).__init__(session, callback, - "%s/%s" % (FLAGS.control_exchange, topic), {}, + "%s/%s" % (config.FLAGS.control_exchange, topic), {}, topic, {}) @@ -248,7 +199,7 @@ class TopicPublisher(Publisher): """init a 'topic' publisher. """ super(TopicPublisher, self).__init__(session, - "%s/%s" % (FLAGS.control_exchange, topic)) + "%s/%s" % (config.FLAGS.control_exchange, topic)) class FanoutPublisher(Publisher): @@ -266,7 +217,7 @@ class NotifyPublisher(Publisher): """init a 'topic' publisher. """ super(NotifyPublisher, self).__init__(session, - "%s/%s" % (FLAGS.control_exchange, topic), + "%s/%s" % (config.FLAGS.control_exchange, topic), {"durable": True}) @@ -281,10 +232,10 @@ class Connection(object): if server_params is None: server_params = {} - default_params = dict(hostname=FLAGS.qpid_hostname, - port=FLAGS.qpid_port, - username=FLAGS.qpid_username, - password=FLAGS.qpid_password) + default_params = dict(hostname=config.FLAGS.qpid_hostname, + port=config.FLAGS.qpid_port, + username=config.FLAGS.qpid_username, + password=config.FLAGS.qpid_password) params = server_params for key in default_params.keys(): @@ -298,23 +249,23 @@ class Connection(object): # before we call open self.connection.username = params['username'] self.connection.password = params['password'] - self.connection.sasl_mechanisms = FLAGS.qpid_sasl_mechanisms - self.connection.reconnect = FLAGS.qpid_reconnect - if FLAGS.qpid_reconnect_timeout: - self.connection.reconnect_timeout = FLAGS.qpid_reconnect_timeout - if FLAGS.qpid_reconnect_limit: - self.connection.reconnect_limit = FLAGS.qpid_reconnect_limit - if FLAGS.qpid_reconnect_interval_max: + self.connection.sasl_mechanisms = config.FLAGS.qpid_sasl_mechanisms + self.connection.reconnect = config.FLAGS.qpid_reconnect + if config.FLAGS.qpid_reconnect_timeout: + self.connection.reconnect_timeout = config.FLAGS.qpid_reconnect_timeout + if config.FLAGS.qpid_reconnect_limit: + self.connection.reconnect_limit = config.FLAGS.qpid_reconnect_limit + if config.FLAGS.qpid_reconnect_interval_max: self.connection.reconnect_interval_max = ( - FLAGS.qpid_reconnect_interval_max) - if FLAGS.qpid_reconnect_interval_min: + config.FLAGS.qpid_reconnect_interval_max) + if config.FLAGS.qpid_reconnect_interval_min: self.connection.reconnect_interval_min = ( - FLAGS.qpid_reconnect_interval_min) - if FLAGS.qpid_reconnect_interval: - self.connection.reconnect_interval = FLAGS.qpid_reconnect_interval - self.connection.hearbeat = FLAGS.qpid_heartbeat - self.connection.protocol = FLAGS.qpid_protocol - self.connection.tcp_nodelay = FLAGS.qpid_tcp_nodelay + config.FLAGS.qpid_reconnect_interval_min) + if config.FLAGS.qpid_reconnect_interval: + self.connection.reconnect_interval = config.FLAGS.qpid_reconnect_interval + self.connection.hearbeat = config.FLAGS.qpid_heartbeat + self.connection.protocol = config.FLAGS.qpid_protocol + self.connection.tcp_nodelay = config.FLAGS.qpid_tcp_nodelay # Open is part of reconnect - # NOTE(WGH) not sure we need this with the reconnect flags @@ -339,7 +290,7 @@ class Connection(object): self.connection.open() except qpid.messaging.exceptions.ConnectionError, e: LOG.error(_('Unable to connect to AMQP server: %s ') % e) - time.sleep(FLAGS.qpid_reconnect_interval or 1) + time.sleep(config.FLAGS.qpid_reconnect_interval or 1) else: break diff --git a/heat/service.py b/heat/service.py index bf4340f554..126045ab39 100644 --- a/heat/service.py +++ b/heat/service.py @@ -27,9 +27,9 @@ import logging import greenlet from heat.openstack.common import cfg +from heat.openstack.common import utils -from heat.common import utils -from heat.common import config +from heat.common import utils as heat_utils from heat.common import exception from heat import context @@ -38,46 +38,6 @@ from heat import version LOG = logging.getLogger(__name__) -service_opts = [ - cfg.IntOpt('report_interval', - default=10, - help='seconds between nodes reporting state to datastore'), - cfg.IntOpt('periodic_interval', - default=60, - help='seconds between running periodic tasks'), - cfg.StrOpt('ec2_listen', - default="0.0.0.0", - help='IP address for EC2 API to listen'), - cfg.IntOpt('ec2_listen_port', - default=8773, - help='port for ec2 api to listen'), - cfg.StrOpt('osapi_compute_listen', - default="0.0.0.0", - help='IP address for OpenStack API to listen'), - cfg.IntOpt('osapi_compute_listen_port', - default=8774, - help='list port for osapi compute'), - cfg.StrOpt('metadata_manager', - default='nova.api.manager.MetadataManager', - help='OpenStack metadata service manager'), - cfg.StrOpt('metadata_listen', - default="0.0.0.0", - help='IP address for metadata api to listen'), - cfg.IntOpt('metadata_listen_port', - default=8775, - help='port for metadata api to listen'), - cfg.StrOpt('osapi_volume_listen', - default="0.0.0.0", - help='IP address for OpenStack Volume API to listen'), - cfg.IntOpt('osapi_volume_listen_port', - default=8776, - help='port for os volume api to listen'), - ] - -FLAGS = config.FLAGS -FLAGS.register_opts(service_opts) - - class Launcher(object): """Launch one or more services and wait for them to complete.""" @@ -178,7 +138,7 @@ class Service(object): self.conn.consume_in_thread() if self.periodic_interval: - periodic = utils.LoopingCall(self.periodic_tasks) + periodic = heat_utils.LoopingCall(self.periodic_tasks) periodic.start(interval=self.periodic_interval, now=False) self.timers.append(periodic) @@ -188,7 +148,7 @@ class Service(object): @classmethod def create(cls, host=None, binary=None, topic=None, manager=None, - periodic_interval=None): + periodic_interval=None, config=None): """Instantiates class and passes back application object. :param host: defaults to FLAGS.host @@ -198,6 +158,8 @@ class Service(object): :param periodic_interval: defaults to FLAGS.periodic_interval """ + global FLAGS + FLAGS = config if not host: host = FLAGS.host if not binary: diff --git a/openstack-common.conf b/openstack-common.conf index 6d10b5a971..94d5c1237d 100644 --- a/openstack-common.conf +++ b/openstack-common.conf @@ -1,7 +1,7 @@ [DEFAULT] # The list of modules to copy from openstack-common -modules=cfg,local,iniparser +modules=cfg,local,iniparser,utils,exception # The base module to hold the copy of openstack.common base=heat