diff --git a/cloudkitty/api/app.py b/cloudkitty/api/app.py index 8923d768..f7fd254d 100644 --- a/cloudkitty/api/app.py +++ b/cloudkitty/api/app.py @@ -19,15 +19,18 @@ import os from wsgiref import simple_server from oslo.config import cfg +from oslo import messaging from paste import deploy import pecan from cloudkitty.api import config as api_config +from cloudkitty.api import hooks +from cloudkitty.common import rpc from cloudkitty import config # noqa -from cloudkitty.openstack.common import log +from cloudkitty.openstack.common import log as logging -LOG = log.getLogger(__name__) +LOG = logging.getLogger(__name__) auth_opts = [ cfg.StrOpt('api_paste_config', @@ -62,12 +65,22 @@ def setup_app(pecan_config=None, extra_hooks=None): app_conf = get_pecan_config() + target = messaging.Target(topic='cloudkitty', + version='1.0') + + client = rpc.get_client(target) + + app_hooks = [ + hooks.RPCHook(client) + ] + return pecan.make_app( app_conf.app.root, static_root=app_conf.app.static_root, template_path=app_conf.app.template_path, debug=CONF.debug, force_canonical=getattr(app_conf.app, 'force_canonical', True), + hooks=app_hooks, guess_content_type_from_ext=False ) diff --git a/cloudkitty/api/controllers/root.py b/cloudkitty/api/controllers/root.py index cb2a4b2d..97034a4b 100644 --- a/cloudkitty/api/controllers/root.py +++ b/cloudkitty/api/controllers/root.py @@ -23,7 +23,6 @@ import wsmeext.pecan as wsme_pecan from cloudkitty.api.controllers import v1 from cloudkitty.openstack.common import log as logging - LOG = logging.getLogger(__name__) diff --git a/cloudkitty/api/controllers/v1.py b/cloudkitty/api/controllers/v1.py index 8bd87e00..a681da62 100644 --- a/cloudkitty/api/controllers/v1.py +++ b/cloudkitty/api/controllers/v1.py @@ -16,6 +16,7 @@ # @author: Stéphane Albert # from oslo.config import cfg +import pecan from pecan import rest from stevedore import extension from wsme import types as wtypes @@ -113,19 +114,7 @@ class BillingController(rest.RestController): :param res_data: List of resource descriptions. :return: Total price for these descriptions. """ - - # TODO(sheeprine): Send RPC request for quote - from cloudkitty import extension_manager - b_processors = {} - processors = extension_manager.EnabledExtensionManager( - 'cloudkitty.billing.processors', - ) - - for processor in processors: - b_name = processor.name - b_obj = processor.obj - b_processors[b_name] = b_obj - + client = pecan.request.rpc_client.prepare(namespace='billing') res_dict = {} for res in res_data: if res.service not in res_dict: @@ -133,14 +122,8 @@ class BillingController(rest.RestController): json_data = res.to_json() res_dict[res.service].extend(json_data[res.service]) - for processor in b_processors.values(): - processor.process([{'usage': res_dict}]) - - price = 0.0 - for res in res_dict.values(): - for data in res: - price += data.get('billing', {}).get('price', 0.0) - return price + res = client.call({}, 'quote', res_data=[{'usage': res_dict}]) + return res class ReportController(rest.RestController): diff --git a/cloudkitty/api/hooks.py b/cloudkitty/api/hooks.py new file mode 100644 index 00000000..e50f0ae8 --- /dev/null +++ b/cloudkitty/api/hooks.py @@ -0,0 +1,26 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 Objectif Libre +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Stéphane Albert +# +from pecan import hooks + + +class RPCHook(hooks.PecanHook): + def __init__(self, rcp_client): + self._rpc_client = rcp_client + + def before(self, state): + state.request.rpc_client = self._rpc_client diff --git a/cloudkitty/billing/__init__.py b/cloudkitty/billing/__init__.py index a076369f..777d783b 100644 --- a/cloudkitty/billing/__init__.py +++ b/cloudkitty/billing/__init__.py @@ -83,6 +83,13 @@ class BillingEnableController(rest.RestController): """ api = db_api.get_instance() module_db = api.get_module_enable_state() + client = pecan.request.rpc_client.prepare(namespace='billing', + fanout=True) + if state: + operation = 'enable_module' + else: + operation = 'disable_module' + client.cast({}, operation, name=self.module_name) return module_db.set_state(self.module_name, state) @@ -92,6 +99,11 @@ class BillingConfigController(rest.RestController): """ + def notify_reload(self): + client = pecan.request.rpc_client.prepare(namespace='billing', + fanout=True) + client.cast({}, 'reload_module', name=self.module_name) + def _not_configurable(self): try: raise BillingModuleNotConfigurable(self.module_name) diff --git a/cloudkitty/billing/hash/__init__.py b/cloudkitty/billing/hash/__init__.py index 880ecb24..38fb1a08 100644 --- a/cloudkitty/billing/hash/__init__.py +++ b/cloudkitty/billing/hash/__init__.py @@ -25,7 +25,6 @@ from cloudkitty.billing.hash.db import api from cloudkitty.db import api as db_api from cloudkitty.openstack.common import log as logging - LOG = logging.getLogger(__name__) MAP_TYPE = wtypes.Enum(wtypes.text, 'flat', 'rate') @@ -146,6 +145,7 @@ class BasicHashMapConfigController(billing.BillingConfigController): pecan.response.headers['Location'] = pecan.request.path except api.ServiceAlreadyExists as e: pecan.abort(409, str(e)) + self.notify_reload() pecan.response.status = 201 @wsme_pecan.wsexpose(None, wtypes.text, wtypes.text, wtypes.text, diff --git a/cloudkitty/billing/hash/db/sqlalchemy/api.py b/cloudkitty/billing/hash/db/sqlalchemy/api.py index 26c98d47..7df2040c 100644 --- a/cloudkitty/billing/hash/db/sqlalchemy/api.py +++ b/cloudkitty/billing/hash/db/sqlalchemy/api.py @@ -26,7 +26,6 @@ from cloudkitty.billing.hash.db.sqlalchemy import models from cloudkitty import db from cloudkitty.openstack.common import log as logging - LOG = logging.getLogger(__name__) diff --git a/cloudkitty/cli/api.py b/cloudkitty/cli/api.py index 214a1cac..094f52a8 100644 --- a/cloudkitty/cli/api.py +++ b/cloudkitty/cli/api.py @@ -15,17 +15,14 @@ # # @author: Stéphane Albert # -import sys - -from oslo.config import cfg - from cloudkitty.api import app -from cloudkitty.openstack.common import log as logging +from cloudkitty.common import rpc +from cloudkitty import service def main(): - cfg.CONF(sys.argv[1:], project='cloudkitty') - logging.setup('cloudkitty') + service.prepare_service() + rpc.init() server = app.build_server() try: server.serve_forever() diff --git a/cloudkitty/cli/dbsync.py b/cloudkitty/cli/dbsync.py index c65ad6a7..6e0d2dda 100644 --- a/cloudkitty/cli/dbsync.py +++ b/cloudkitty/cli/dbsync.py @@ -15,14 +15,12 @@ # # @author: Stéphane Albert # -import sys - from oslo.config import cfg from stevedore import extension from cloudkitty import config # noqa from cloudkitty.db import api as db_api -from cloudkitty.openstack.common import log as logging +from cloudkitty import service CONF = cfg.CONF @@ -147,6 +145,5 @@ CONF.register_cli_opt(command_opt) def main(): - cfg.CONF(sys.argv[1:], project='cloudkitty') - logging.setup('cloudkitty') + service.prepare_service() CONF.command.func() diff --git a/cloudkitty/cli/processor.py b/cloudkitty/cli/processor.py new file mode 100644 index 00000000..a20d9bc1 --- /dev/null +++ b/cloudkitty/cli/processor.py @@ -0,0 +1,41 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 Objectif Libre +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Stéphane Albert +# +from oslo.config import cfg + +from cloudkitty.common import rpc +from cloudkitty.openstack.common import log as logging +from cloudkitty import orchestrator +from cloudkitty import service + +LOG = logging.getLogger(__name__) + +CONF = cfg.CONF + + +def main(): + service.prepare_service() + rpc.init() + processor = orchestrator.Orchestrator() + try: + processor.process() + except KeyboardInterrupt: + pass + + +if __name__ == '__main__': + main() diff --git a/cloudkitty/common/rpc.py b/cloudkitty/common/rpc.py new file mode 100644 index 00000000..7f63e27f --- /dev/null +++ b/cloudkitty/common/rpc.py @@ -0,0 +1,41 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 Objectif Libre +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Stéphane Albert +# +from oslo.config import cfg +from oslo import messaging + +TRANSPORT = None + + +def init(): + global TRANSPORT + TRANSPORT = messaging.get_transport(cfg.CONF) + + +def get_client(target, version_cap=None): + assert TRANSPORT is not None + return messaging.RPCClient(TRANSPORT, + target, + version_cap=version_cap) + + +def get_server(target, endpoints): + assert TRANSPORT is not None + return messaging.get_rpc_server(TRANSPORT, + target, + endpoints, + executor='eventlet') diff --git a/cloudkitty/openstack/common/excutils.py b/cloudkitty/openstack/common/excutils.py new file mode 100644 index 00000000..4907c6b6 --- /dev/null +++ b/cloudkitty/openstack/common/excutils.py @@ -0,0 +1,113 @@ +# Copyright 2011 OpenStack Foundation. +# Copyright 2012, Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Exception related utilities. +""" + +import logging +import sys +import time +import traceback + +import six + +from cloudkitty.openstack.common.gettextutils import _LE + + +class save_and_reraise_exception(object): + """Save current exception, run some code and then re-raise. + + In some cases the exception context can be cleared, resulting in None + being attempted to be re-raised after an exception handler is run. This + can happen when eventlet switches greenthreads or when running an + exception handler, code raises and catches an exception. In both + cases the exception context will be cleared. + + To work around this, we save the exception state, run handler code, and + then re-raise the original exception. If another exception occurs, the + saved exception is logged and the new exception is re-raised. + + In some cases the caller may not want to re-raise the exception, and + for those circumstances this context provides a reraise flag that + can be used to suppress the exception. For example:: + + except Exception: + with save_and_reraise_exception() as ctxt: + decide_if_need_reraise() + if not should_be_reraised: + ctxt.reraise = False + + If another exception occurs and reraise flag is False, + the saved exception will not be logged. + + If the caller wants to raise new exception during exception handling + he/she sets reraise to False initially with an ability to set it back to + True if needed:: + + except Exception: + with save_and_reraise_exception(reraise=False) as ctxt: + [if statements to determine whether to raise a new exception] + # Not raising a new exception, so reraise + ctxt.reraise = True + """ + def __init__(self, reraise=True): + self.reraise = reraise + + def __enter__(self): + self.type_, self.value, self.tb, = sys.exc_info() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if exc_type is not None: + if self.reraise: + logging.error(_LE('Original exception being dropped: %s'), + traceback.format_exception(self.type_, + self.value, + self.tb)) + return False + if self.reraise: + six.reraise(self.type_, self.value, self.tb) + + +def forever_retry_uncaught_exceptions(infunc): + def inner_func(*args, **kwargs): + last_log_time = 0 + last_exc_message = None + exc_count = 0 + while True: + try: + return infunc(*args, **kwargs) + except Exception as exc: + this_exc_message = six.u(str(exc)) + if this_exc_message == last_exc_message: + exc_count += 1 + else: + exc_count = 1 + # Do not log any more frequently than once a minute unless + # the exception message changes + cur_time = int(time.time()) + if (cur_time - last_log_time > 60 or + this_exc_message != last_exc_message): + logging.exception( + _LE('Unexpected exception occurred %d time(s)... ' + 'retrying.') % exc_count) + last_log_time = cur_time + last_exc_message = this_exc_message + exc_count = 0 + # This should be a very rare event. In case it isn't, do + # a sleep. + time.sleep(1) + return inner_func diff --git a/cloudkitty/openstack/common/fileutils.py b/cloudkitty/openstack/common/fileutils.py new file mode 100644 index 00000000..d8eecde2 --- /dev/null +++ b/cloudkitty/openstack/common/fileutils.py @@ -0,0 +1,146 @@ +# Copyright 2011 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import contextlib +import errno +import os +import tempfile + +from cloudkitty.openstack.common import excutils +from cloudkitty.openstack.common import log as logging + +LOG = logging.getLogger(__name__) + +_FILE_CACHE = {} + + +def ensure_tree(path): + """Create a directory (and any ancestor directories required) + + :param path: Directory to create + """ + try: + os.makedirs(path) + except OSError as exc: + if exc.errno == errno.EEXIST: + if not os.path.isdir(path): + raise + else: + raise + + +def read_cached_file(filename, force_reload=False): + """Read from a file if it has been modified. + + :param force_reload: Whether to reload the file. + :returns: A tuple with a boolean specifying if the data is fresh + or not. + """ + global _FILE_CACHE + + if force_reload: + delete_cached_file(filename) + + reloaded = False + mtime = os.path.getmtime(filename) + cache_info = _FILE_CACHE.setdefault(filename, {}) + + if not cache_info or mtime > cache_info.get('mtime', 0): + LOG.debug("Reloading cached file %s" % filename) + with open(filename) as fap: + cache_info['data'] = fap.read() + cache_info['mtime'] = mtime + reloaded = True + return (reloaded, cache_info['data']) + + +def delete_cached_file(filename): + """Delete cached file if present. + + :param filename: filename to delete + """ + global _FILE_CACHE + + if filename in _FILE_CACHE: + del _FILE_CACHE[filename] + + +def delete_if_exists(path, remove=os.unlink): + """Delete a file, but ignore file not found error. + + :param path: File to delete + :param remove: Optional function to remove passed path + """ + + try: + remove(path) + except OSError as e: + if e.errno != errno.ENOENT: + raise + + +@contextlib.contextmanager +def remove_path_on_error(path, remove=delete_if_exists): + """Protect code that wants to operate on PATH atomically. + Any exception will cause PATH to be removed. + + :param path: File to work with + :param remove: Optional function to remove passed path + """ + + try: + yield + except Exception: + with excutils.save_and_reraise_exception(): + remove(path) + + +def file_open(*args, **kwargs): + """Open file + + see built-in open() documentation for more details + + Note: The reason this is kept in a separate module is to easily + be able to provide a stub module that doesn't alter system + state at all (for unit tests) + """ + return open(*args, **kwargs) + + +def write_to_tempfile(content, path=None, suffix='', prefix='tmp'): + """Create temporary file or use existing file. + + This util is needed for creating temporary file with + specified content, suffix and prefix. If path is not None, + it will be used for writing content. If the path doesn't + exist it'll be created. + + :param content: content for temporary file. + :param path: same as parameter 'dir' for mkstemp + :param suffix: same as parameter 'suffix' for mkstemp + :param prefix: same as parameter 'prefix' for mkstemp + + For example: it can be used in database tests for creating + configuration files. + """ + if path: + ensure_tree(path) + + (fd, path) = tempfile.mkstemp(suffix=suffix, dir=path, prefix=prefix) + try: + os.write(fd, content) + finally: + os.close(fd) + return path diff --git a/cloudkitty/openstack/common/jsonutils.py b/cloudkitty/openstack/common/jsonutils.py index 5d8ed23a..cdad06a8 100644 --- a/cloudkitty/openstack/common/jsonutils.py +++ b/cloudkitty/openstack/common/jsonutils.py @@ -38,11 +38,13 @@ import inspect import itertools import sys +is_simplejson = False if sys.version_info < (2, 7): # On Python <= 2.6, json module is not C boosted, so try to use # simplejson module if available try: import simplejson as json + is_simplejson = True except ImportError: import json else: @@ -165,10 +167,14 @@ def to_primitive(value, convert_instances=False, convert_datetime=True, def dumps(value, default=to_primitive, **kwargs): + if is_simplejson: + kwargs['namedtuple_as_object'] = False return json.dumps(value, default=default, **kwargs) def dump(obj, fp, *args, **kwargs): + if is_simplejson: + kwargs['namedtuple_as_object'] = False return json.dump(obj, fp, *args, **kwargs) diff --git a/cloudkitty/openstack/common/lockutils.py b/cloudkitty/openstack/common/lockutils.py new file mode 100644 index 00000000..815081be --- /dev/null +++ b/cloudkitty/openstack/common/lockutils.py @@ -0,0 +1,377 @@ +# Copyright 2011 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import contextlib +import errno +import functools +import logging +import os +import shutil +import subprocess +import sys +import tempfile +import threading +import time +import weakref + +from oslo.config import cfg + +from cloudkitty.openstack.common import fileutils +from cloudkitty.openstack.common.gettextutils import _, _LE, _LI + + +LOG = logging.getLogger(__name__) + + +util_opts = [ + cfg.BoolOpt('disable_process_locking', default=False, + help='Enables or disables inter-process locks.'), + cfg.StrOpt('lock_path', + default=os.environ.get("CLOUDKITTY_LOCK_PATH"), + help='Directory to use for lock files.') +] + + +CONF = cfg.CONF +CONF.register_opts(util_opts) + + +def set_defaults(lock_path): + cfg.set_defaults(util_opts, lock_path=lock_path) + + +class _FileLock(object): + """Lock implementation which allows multiple locks, working around + issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does + not require any cleanup. Since the lock is always held on a file + descriptor rather than outside of the process, the lock gets dropped + automatically if the process crashes, even if __exit__ is not executed. + + There are no guarantees regarding usage by multiple green threads in a + single process here. This lock works only between processes. Exclusive + access between local threads should be achieved using the semaphores + in the @synchronized decorator. + + Note these locks are released when the descriptor is closed, so it's not + safe to close the file descriptor while another green thread holds the + lock. Just opening and closing the lock file can break synchronisation, + so lock files must be accessed only using this abstraction. + """ + + def __init__(self, name): + self.lockfile = None + self.fname = name + + def acquire(self): + basedir = os.path.dirname(self.fname) + + if not os.path.exists(basedir): + fileutils.ensure_tree(basedir) + LOG.info(_LI('Created lock path: %s'), basedir) + + self.lockfile = open(self.fname, 'w') + + while True: + try: + # Using non-blocking locks since green threads are not + # patched to deal with blocking locking calls. + # Also upon reading the MSDN docs for locking(), it seems + # to have a laughable 10 attempts "blocking" mechanism. + self.trylock() + LOG.debug('Got file lock "%s"', self.fname) + return True + except IOError as e: + if e.errno in (errno.EACCES, errno.EAGAIN): + # external locks synchronise things like iptables + # updates - give it some time to prevent busy spinning + time.sleep(0.01) + else: + raise threading.ThreadError(_("Unable to acquire lock on" + " `%(filename)s` due to" + " %(exception)s") % + {'filename': self.fname, + 'exception': e}) + + def __enter__(self): + self.acquire() + return self + + def release(self): + try: + self.unlock() + self.lockfile.close() + LOG.debug('Released file lock "%s"', self.fname) + except IOError: + LOG.exception(_LE("Could not release the acquired lock `%s`"), + self.fname) + + def __exit__(self, exc_type, exc_val, exc_tb): + self.release() + + def exists(self): + return os.path.exists(self.fname) + + def trylock(self): + raise NotImplementedError() + + def unlock(self): + raise NotImplementedError() + + +class _WindowsLock(_FileLock): + def trylock(self): + msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_NBLCK, 1) + + def unlock(self): + msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_UNLCK, 1) + + +class _FcntlLock(_FileLock): + def trylock(self): + fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB) + + def unlock(self): + fcntl.lockf(self.lockfile, fcntl.LOCK_UN) + + +class _PosixLock(object): + def __init__(self, name): + # Hash the name because it's not valid to have POSIX semaphore + # names with things like / in them. Then use base64 to encode + # the digest() instead taking the hexdigest() because the + # result is shorter and most systems can't have shm sempahore + # names longer than 31 characters. + h = hashlib.sha1() + h.update(name.encode('ascii')) + self.name = str((b'/' + base64.urlsafe_b64encode( + h.digest())).decode('ascii')) + + def acquire(self, timeout=None): + self.semaphore = posix_ipc.Semaphore(self.name, + flags=posix_ipc.O_CREAT, + initial_value=1) + self.semaphore.acquire(timeout) + return self + + def __enter__(self): + self.acquire() + return self + + def release(self): + self.semaphore.release() + self.semaphore.close() + + def __exit__(self, exc_type, exc_val, exc_tb): + self.release() + + def exists(self): + try: + semaphore = posix_ipc.Semaphore(self.name) + except posix_ipc.ExistentialError: + return False + else: + semaphore.close() + return True + + +if os.name == 'nt': + import msvcrt + InterProcessLock = _WindowsLock + FileLock = _WindowsLock +else: + import base64 + import fcntl + import hashlib + + import posix_ipc + InterProcessLock = _PosixLock + FileLock = _FcntlLock + +_semaphores = weakref.WeakValueDictionary() +_semaphores_lock = threading.Lock() + + +def _get_lock_path(name, lock_file_prefix, lock_path=None): + # NOTE(mikal): the lock name cannot contain directory + # separators + name = name.replace(os.sep, '_') + if lock_file_prefix: + sep = '' if lock_file_prefix.endswith('-') else '-' + name = '%s%s%s' % (lock_file_prefix, sep, name) + + local_lock_path = lock_path or CONF.lock_path + + if not local_lock_path: + # NOTE(bnemec): Create a fake lock path for posix locks so we don't + # unnecessarily raise the RequiredOptError below. + if InterProcessLock is not _PosixLock: + raise cfg.RequiredOptError('lock_path') + local_lock_path = 'posixlock:/' + + return os.path.join(local_lock_path, name) + + +def external_lock(name, lock_file_prefix=None, lock_path=None): + LOG.debug('Attempting to grab external lock "%(lock)s"', + {'lock': name}) + + lock_file_path = _get_lock_path(name, lock_file_prefix, lock_path) + + # NOTE(bnemec): If an explicit lock_path was passed to us then it + # means the caller is relying on file-based locking behavior, so + # we can't use posix locks for those calls. + if lock_path: + return FileLock(lock_file_path) + return InterProcessLock(lock_file_path) + + +def remove_external_lock_file(name, lock_file_prefix=None): + """Remove an external lock file when it's not used anymore + This will be helpful when we have a lot of lock files + """ + with internal_lock(name): + lock_file_path = _get_lock_path(name, lock_file_prefix) + try: + os.remove(lock_file_path) + except OSError: + LOG.info(_LI('Failed to remove file %(file)s'), + {'file': lock_file_path}) + + +def internal_lock(name): + with _semaphores_lock: + try: + sem = _semaphores[name] + except KeyError: + sem = threading.Semaphore() + _semaphores[name] = sem + + LOG.debug('Got semaphore "%(lock)s"', {'lock': name}) + return sem + + +@contextlib.contextmanager +def lock(name, lock_file_prefix=None, external=False, lock_path=None): + """Context based lock + + This function yields a `threading.Semaphore` instance (if we don't use + eventlet.monkey_patch(), else `semaphore.Semaphore`) unless external is + True, in which case, it'll yield an InterProcessLock instance. + + :param lock_file_prefix: The lock_file_prefix argument is used to provide + lock files on disk with a meaningful prefix. + + :param external: The external keyword argument denotes whether this lock + should work across multiple processes. This means that if two different + workers both run a method decorated with @synchronized('mylock', + external=True), only one of them will execute at a time. + """ + int_lock = internal_lock(name) + with int_lock: + if external and not CONF.disable_process_locking: + ext_lock = external_lock(name, lock_file_prefix, lock_path) + with ext_lock: + yield ext_lock + else: + yield int_lock + LOG.debug('Released semaphore "%(lock)s"', {'lock': name}) + + +def synchronized(name, lock_file_prefix=None, external=False, lock_path=None): + """Synchronization decorator. + + Decorating a method like so:: + + @synchronized('mylock') + def foo(self, *args): + ... + + ensures that only one thread will execute the foo method at a time. + + Different methods can share the same lock:: + + @synchronized('mylock') + def foo(self, *args): + ... + + @synchronized('mylock') + def bar(self, *args): + ... + + This way only one of either foo or bar can be executing at a time. + """ + + def wrap(f): + @functools.wraps(f) + def inner(*args, **kwargs): + try: + with lock(name, lock_file_prefix, external, lock_path): + LOG.debug('Got semaphore / lock "%(function)s"', + {'function': f.__name__}) + return f(*args, **kwargs) + finally: + LOG.debug('Semaphore / lock released "%(function)s"', + {'function': f.__name__}) + return inner + return wrap + + +def synchronized_with_prefix(lock_file_prefix): + """Partial object generator for the synchronization decorator. + + Redefine @synchronized in each project like so:: + + (in nova/utils.py) + from nova.openstack.common import lockutils + + synchronized = lockutils.synchronized_with_prefix('nova-') + + + (in nova/foo.py) + from nova import utils + + @utils.synchronized('mylock') + def bar(self, *args): + ... + + The lock_file_prefix argument is used to provide lock files on disk with a + meaningful prefix. + """ + + return functools.partial(synchronized, lock_file_prefix=lock_file_prefix) + + +def main(argv): + """Create a dir for locks and pass it to command from arguments + + If you run this: + python -m openstack.common.lockutils python setup.py testr + + a temporary directory will be created for all your locks and passed to all + your tests in an environment variable. The temporary dir will be deleted + afterwards and the return value will be preserved. + """ + + lock_dir = tempfile.mkdtemp() + os.environ["CLOUDKITTY_LOCK_PATH"] = lock_dir + try: + ret_val = subprocess.call(argv[1:]) + finally: + shutil.rmtree(lock_dir, ignore_errors=True) + return ret_val + + +if __name__ == '__main__': + sys.exit(main(sys.argv)) diff --git a/cloudkitty/openstack/common/log.py b/cloudkitty/openstack/common/log.py index 60582708..65b93840 100644 --- a/cloudkitty/openstack/common/log.py +++ b/cloudkitty/openstack/common/log.py @@ -33,6 +33,7 @@ import logging import logging.config import logging.handlers import os +import socket import sys import traceback @@ -126,7 +127,9 @@ DEFAULT_LOG_LEVELS = ['amqp=WARN', 'amqplib=WARN', 'boto=WARN', 'qpid=WARN', 'sqlalchemy=WARN', 'suds=INFO', 'oslo.messaging=INFO', 'iso8601=WARN', 'requests.packages.urllib3.connectionpool=WARN', - 'urllib3.connectionpool=WARN', 'websocket=WARN'] + 'urllib3.connectionpool=WARN', 'websocket=WARN', + "keystonemiddleware=WARN", "routes.middleware=WARN", + "stevedore=WARN"] log_opts = [ cfg.StrOpt('logging_context_format_string', @@ -300,11 +303,10 @@ class ContextAdapter(BaseLoggerAdapter): self.warn(stdmsg, *args, **kwargs) def process(self, msg, kwargs): - # NOTE(mrodden): catch any Message/other object and - # coerce to unicode before they can get - # to the python logging and possibly - # cause string encoding trouble - if not isinstance(msg, six.string_types): + # NOTE(jecarey): If msg is not unicode, coerce it into unicode + # before it can get to the python logging and + # possibly cause string encoding trouble + if not isinstance(msg, six.text_type): msg = six.text_type(msg) if 'extra' not in kwargs: @@ -483,18 +485,6 @@ def _setup_logging_from_conf(project, version): for handler in log_root.handlers: log_root.removeHandler(handler) - if CONF.use_syslog: - facility = _find_facility_from_conf() - # TODO(bogdando) use the format provided by RFCSysLogHandler - # after existing syslog format deprecation in J - if CONF.use_syslog_rfc_format: - syslog = RFCSysLogHandler(address='/dev/log', - facility=facility) - else: - syslog = logging.handlers.SysLogHandler(address='/dev/log', - facility=facility) - log_root.addHandler(syslog) - logpath = _get_log_file_path() if logpath: filelog = logging.handlers.WatchedFileHandler(logpath) @@ -553,6 +543,20 @@ def _setup_logging_from_conf(project, version): else: logger.setLevel(level_name) + if CONF.use_syslog: + try: + facility = _find_facility_from_conf() + # TODO(bogdando) use the format provided by RFCSysLogHandler + # after existing syslog format deprecation in J + if CONF.use_syslog_rfc_format: + syslog = RFCSysLogHandler(facility=facility) + else: + syslog = logging.handlers.SysLogHandler(facility=facility) + log_root.addHandler(syslog) + except socket.error: + log_root.error('Unable to add syslog handler. Verify that syslog' + 'is running.') + _loggers = {} @@ -622,6 +626,12 @@ class ContextFormatter(logging.Formatter): def format(self, record): """Uses contextstring if request_id is set, otherwise default.""" + # NOTE(jecarey): If msg is not unicode, coerce it into unicode + # before it can get to the python logging and + # possibly cause string encoding trouble + if not isinstance(record.msg, six.text_type): + record.msg = six.text_type(record.msg) + # store project info record.project = self.project record.version = self.version diff --git a/cloudkitty/orchestrator.py b/cloudkitty/orchestrator.py index 07c13efb..8670dd5b 100644 --- a/cloudkitty/orchestrator.py +++ b/cloudkitty/orchestrator.py @@ -16,29 +16,84 @@ # # @author: Stéphane Albert # -from __future__ import print_function import datetime -import sys import time +import eventlet from keystoneclient.v2_0 import client as kclient from oslo.config import cfg +from oslo import messaging from stevedore import driver from stevedore import named +from cloudkitty.common import rpc from cloudkitty import config # NOQA from cloudkitty import extension_manager from cloudkitty.openstack.common import importutils as i_utils +from cloudkitty.openstack.common import lockutils from cloudkitty.openstack.common import log as logging from cloudkitty import state from cloudkitty import write_orchestrator as w_orch +eventlet.monkey_patch() LOG = logging.getLogger(__name__) - CONF = cfg.CONF +PROCESSORS_NAMESPACE = 'cloudkitty.billing.processors' +WRITERS_NAMESPACE = 'cloudkitty.output.writers' +COLLECTORS_NAMESPACE = 'cloudkitty.collector.backends' + + +class BillingEndpoint(object): + target = messaging.Target(namespace='billing', + version='1.0') + + def __init__(self, orchestrator): + self._pending_reload = [] + self._module_state = {} + self._orchestrator = orchestrator + + def get_reload_list(self): + lock = lockutils.lock('module-reload') + with lock: + reload_list = self._pending_reload + self._pending_reload = [] + return reload_list + + def get_module_state(self): + lock = lockutils.lock('module-state') + with lock: + module_list = self._module_state + self._module_state = {} + return module_list + + def quote(self, ctxt, res_data): + LOG.debug('Received quote from RPC.') + return self._orchestrator.process_quote(res_data) + + def reload_module(self, ctxt, name): + LOG.info('Received reload command for module {}.'.format(name)) + lock = lockutils.lock('module-reload') + with lock: + if name not in self._pending_reload: + self._pending_reload.append(name) + + def enable_module(self, ctxt, name): + LOG.info('Received enable command for module {}.'.format(name)) + lock = lockutils.lock('module-state') + with lock: + self._module_state[name] = True + + def disable_module(self, ctxt, name): + LOG.info('Received disable command for module {}.'.format(name)) + lock = lockutils.lock('module-state') + with lock: + self._module_state[name] = False + if name in self._pending_reload: + self._pending_reload.remove(name) + class Orchestrator(object): def __init__(self): @@ -59,7 +114,7 @@ class Orchestrator(object): 'keystone_url': CONF.auth.url, 'period': CONF.collect.period} self.collector = driver.DriverManager( - 'cloudkitty.collector.backends', + COLLECTORS_NAMESPACE, CONF.collect.collector, invoke_on_load=True, invoke_kwds=collector_args).driver @@ -77,11 +132,26 @@ class Orchestrator(object): # Output settings output_pipeline = named.NamedExtensionManager( - 'cloudkitty.output.writers', + WRITERS_NAMESPACE, CONF.output.pipeline) for writer in output_pipeline: self.wo.add_writer(writer.plugin) + # RPC + self.server = None + self._billing_endpoint = BillingEndpoint(self) + self._init_messaging() + + def _init_messaging(self): + target = messaging.Target(topic='cloudkitty', + server=CONF.host, + version='1.0') + endpoints = [ + self._billing_endpoint, + ] + self.server = rpc.get_server(target, endpoints) + self.server.start() + def _check_state(self): def _get_this_month_timestamp(): now = datetime.datetime.now() @@ -112,7 +182,7 @@ class Orchestrator(object): def _load_billing_processors(self): self.b_processors = {} processors = extension_manager.EnabledExtensionManager( - 'cloudkitty.billing.processors', + PROCESSORS_NAMESPACE, ) for processor in processors: @@ -120,12 +190,48 @@ class Orchestrator(object): b_obj = processor.obj self.b_processors[b_name] = b_obj + def process_quote(self, res_data): + for processor in self.b_processors.values(): + processor.process(res_data) + + price = 0.0 + for res in res_data: + for res_usage in res['usage'].values(): + for data in res_usage: + price += data.get('billing', {}).get('price', 0.0) + return price + + def process_messages(self): + pending_reload = self._billing_endpoint.get_reload_list() + pending_states = self._billing_endpoint.get_module_state() + for name in pending_reload: + if name in self.b_processors: + if name in self.b_processors.keys(): + LOG.info('Reloading configuration of {} module.'.format( + name)) + self.b_processors[name].reload_config() + else: + LOG.info('Tried to reload a disabled module: {}.'.format( + name)) + for name, status in pending_states.items(): + if name in self.b_processors and not status: + LOG.info('Disabling {} module.'.format(name)) + self.b_processors.pop(name) + else: + LOG.info('Enabling {} module.'.format(name)) + processors = extension_manager.EnabledExtensionManager( + PROCESSORS_NAMESPACE) + for processor in processors: + if processor.name == name: + self.b_processors.append(processor) + def process(self): while True: + self.process_messages() timestamp = self._check_state() if not timestamp: - print("Nothing left to do.") - break + eventlet.sleep(CONF.collect.period) + continue for service in CONF.collect.services: data = self._collect(service, timestamp) @@ -141,14 +247,3 @@ class Orchestrator(object): self.wo.commit() self.wo.close() - - -def main(): - CONF(sys.argv[1:], project='cloudkitty') - logging.setup('cloudkitty') - orchestrator = Orchestrator() - orchestrator.process() - - -if __name__ == "__main__": - main() diff --git a/cloudkitty/service.py b/cloudkitty/service.py new file mode 100644 index 00000000..5970e0c3 --- /dev/null +++ b/cloudkitty/service.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 Objectif Libre +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Stéphane Albert +# +import socket +import sys + +from oslo.config import cfg + +from cloudkitty.openstack.common import log as logging + +LOG = logging.getLogger(__name__) + +service_opts = [ + cfg.StrOpt('host', + default=socket.getfqdn(), + help='Name of this node. This can be an opaque identifier. ' + 'It is not necessarily a hostname, FQDN, or IP address. ' + 'However, the node name must be valid within ' + 'an AMQP key, and if using ZeroMQ, a valid ' + 'hostname, FQDN, or IP address.') +] + +cfg.CONF.register_opts(service_opts) + + +def prepare_service(): + cfg.CONF(sys.argv[1:], project='cloudkitty') + logging.setup('cloudkitty') diff --git a/etc/cloudkitty/cloudkitty.conf.sample b/etc/cloudkitty/cloudkitty.conf.sample index 0903440a..9bd32b5c 100644 --- a/etc/cloudkitty/cloudkitty.conf.sample +++ b/etc/cloudkitty/cloudkitty.conf.sample @@ -194,6 +194,18 @@ #control_exchange=openstack +# +# Options defined in cloudkitty.service +# + +# Name of this node. This can be an opaque identifier. It is +# not necessarily a hostname, FQDN, or IP address. However, +# the node name must be valid within an AMQP key, and if using +# ZeroMQ, a valid hostname, FQDN, or IP address. (string +# value) +#host=cloudkitty + + # # Options defined in cloudkitty.api.app # @@ -203,6 +215,17 @@ #api_paste_config=api_paste.ini +# +# Options defined in cloudkitty.openstack.common.lockutils +# + +# Enables or disables inter-process locks. (boolean value) +#disable_process_locking=false + +# Directory to use for lock files. (string value) +#lock_path= + + # # Options defined in cloudkitty.openstack.common.log # @@ -235,7 +258,7 @@ #logging_exception_prefix=%(asctime)s.%(msecs)03d %(process)d TRACE %(name)s %(instance)s # List of logger=LEVEL pairs. (list value) -#default_log_levels=amqp=WARN,amqplib=WARN,boto=WARN,qpid=WARN,sqlalchemy=WARN,suds=INFO,oslo.messaging=INFO,iso8601=WARN,requests.packages.urllib3.connectionpool=WARN,urllib3.connectionpool=WARN,websocket=WARN +#default_log_levels=amqp=WARN,amqplib=WARN,boto=WARN,qpid=WARN,sqlalchemy=WARN,suds=INFO,oslo.messaging=INFO,iso8601=WARN,requests.packages.urllib3.connectionpool=WARN,urllib3.connectionpool=WARN,websocket=WARN,keystonemiddleware=WARN,routes.middleware=WARN,stevedore=WARN # Enables or disables publication of error events. (boolean # value) diff --git a/openstack-common.conf b/openstack-common.conf index 930a4596..3e0fb87f 100644 --- a/openstack-common.conf +++ b/openstack-common.conf @@ -4,6 +4,7 @@ base=cloudkitty module=config module=importutils module=jsonutils +module=lockutils module=log script=tools/config/check_uptodate.sh diff --git a/requirements.txt b/requirements.txt index 9cd0606c..2fa0e9f3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,6 +2,7 @@ python-ceilometerclient python-keystoneclient iso8601 PasteDeploy==1.5.2 +posix_ipc pecan==0.5.0 wsme oslo.config>=1.2.0 diff --git a/setup.cfg b/setup.cfg index 0e504b8d..f2222bac 100644 --- a/setup.cfg +++ b/setup.cfg @@ -22,7 +22,7 @@ packages = console_scripts = cloudkitty-api = cloudkitty.cli.api:main cloudkitty-dbsync = cloudkitty.cli.dbsync:main - cloudkitty-processor = cloudkitty.orchestrator:main + cloudkitty-processor = cloudkitty.cli.processor:main cloudkitty.collector.backends = ceilometer = cloudkitty.collector.ceilometer:CeilometerCollector