Implemented RPC messaging

API and processors now talk via oslo.messaging.
Updated openstack-common to add lockutils support.

Change-Id: I0b06a8c8b6612eda6a2ff8785a4f0d516347f09a
Author: Stéphane Albert 2014-08-26 17:48:55 +02:00
parent c6e9c4e903
commit 72bfda50d0
22 changed files with 999 additions and 77 deletions

View File

@@ -19,15 +19,18 @@ import os
from wsgiref import simple_server
from oslo.config import cfg
from oslo import messaging
from paste import deploy
import pecan
from cloudkitty.api import config as api_config
from cloudkitty.api import hooks
from cloudkitty.common import rpc
from cloudkitty import config # noqa
-from cloudkitty.openstack.common import log
+from cloudkitty.openstack.common import log as logging
-LOG = log.getLogger(__name__)
+LOG = logging.getLogger(__name__)
auth_opts = [
cfg.StrOpt('api_paste_config',
@@ -62,12 +65,22 @@ def setup_app(pecan_config=None, extra_hooks=None):
app_conf = get_pecan_config()
target = messaging.Target(topic='cloudkitty',
version='1.0')
client = rpc.get_client(target)
app_hooks = [
hooks.RPCHook(client)
]
return pecan.make_app(
app_conf.app.root,
static_root=app_conf.app.static_root,
template_path=app_conf.app.template_path,
debug=CONF.debug,
force_canonical=getattr(app_conf.app, 'force_canonical', True),
hooks=app_hooks,
guess_content_type_from_ext=False
)

View File

@@ -23,7 +23,6 @@ import wsmeext.pecan as wsme_pecan
from cloudkitty.api.controllers import v1
from cloudkitty.openstack.common import log as logging
LOG = logging.getLogger(__name__)

View File

@@ -16,6 +16,7 @@
# @author: Stéphane Albert
#
from oslo.config import cfg
import pecan
from pecan import rest
from stevedore import extension
from wsme import types as wtypes
@@ -113,19 +114,7 @@ class BillingController(rest.RestController):
:param res_data: List of resource descriptions.
:return: Total price for these descriptions.
"""
-# TODO(sheeprine): Send RPC request for quote
-from cloudkitty import extension_manager
-b_processors = {}
-processors = extension_manager.EnabledExtensionManager(
-'cloudkitty.billing.processors',
-)
-for processor in processors:
-b_name = processor.name
-b_obj = processor.obj
-b_processors[b_name] = b_obj
+client = pecan.request.rpc_client.prepare(namespace='billing')
res_dict = {}
for res in res_data:
if res.service not in res_dict:
@@ -133,14 +122,8 @@ class BillingController(rest.RestController):
json_data = res.to_json()
res_dict[res.service].extend(json_data[res.service])
-for processor in b_processors.values():
-processor.process([{'usage': res_dict}])
-price = 0.0
-for res in res_dict.values():
-for data in res:
-price += data.get('billing', {}).get('price', 0.0)
-return price
+res = client.call({}, 'quote', res_data=[{'usage': res_dict}])
+return res
class ReportController(rest.RestController):
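(For reference, the round trip this hunk introduces looks roughly like the sketch below. The payload shape is inferred from res_dict above and from Orchestrator.process_quote later in this commit; the 'compute' service and its desc/vol fields are illustrative, not taken from this change.)

res_data = [{
    'usage': {
        'compute': [{
            'desc': {'flavor': 'm1.small'},        # assumed description
            'vol': {'qty': 1, 'unit': 'instance'}  # assumed volume block
        }]
    }
}]
client = pecan.request.rpc_client.prepare(namespace='billing')
# Routed to BillingEndpoint.quote(ctxt, res_data) on the processor, which
# runs every enabled billing processor and returns the total price (float).
price = client.call({}, 'quote', res_data=res_data)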

26
cloudkitty/api/hooks.py Normal file
View File

@@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
# Copyright 2014 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Stéphane Albert
#
from pecan import hooks
class RPCHook(hooks.PecanHook):
def __init__(self, rpc_client):
self._rpc_client = rpc_client
def before(self, state):
state.request.rpc_client = self._rpc_client
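(The hook is deliberately thin: setup_app() creates a single RPCClient at startup and before() re-exposes it on every request. A minimal sketch of what any controller then sees, assuming the hook is registered as in app.py above; the module name is illustrative.)

# Inside any pecan controller method:
client = pecan.request.rpc_client.prepare(namespace='billing')
client.cast({}, 'reload_module', name='hashmap')  # illustrative module name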

View File

@@ -83,6 +83,13 @@ class BillingEnableController(rest.RestController):
"""
api = db_api.get_instance()
module_db = api.get_module_enable_state()
client = pecan.request.rpc_client.prepare(namespace='billing',
fanout=True)
if state:
operation = 'enable_module'
else:
operation = 'disable_module'
client.cast({}, operation, name=self.module_name)
return module_db.set_state(self.module_name, state)
@@ -92,6 +99,11 @@ class BillingConfigController(rest.RestController):
"""
def notify_reload(self):
client = pecan.request.rpc_client.prepare(namespace='billing',
fanout=True)
client.cast({}, 'reload_module', name=self.module_name)
def _not_configurable(self):
try:
raise BillingModuleNotConfigurable(self.module_name)
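(These hunks settle the commit's call/cast split: module state changes are fire-and-forget fanout casts delivered to every processor, while quotes are synchronous calls that wait for a single result. A sketch of the two patterns; res_data is as built by the quote controller above, and the module name is illustrative.)

# Synchronous: blocks until one processor returns a value (quotes).
client = pecan.request.rpc_client.prepare(namespace='billing')
price = client.call({}, 'quote', res_data=res_data)

# Fanout: every processor listening on the topic receives the cast and
# nothing is returned (enable/disable/reload).
fanout = pecan.request.rpc_client.prepare(namespace='billing', fanout=True)
fanout.cast({}, 'disable_module', name='hashmap')  # illustrative name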

View File

@@ -25,7 +25,6 @@ from cloudkitty.billing.hash.db import api
from cloudkitty.db import api as db_api
from cloudkitty.openstack.common import log as logging
LOG = logging.getLogger(__name__)
MAP_TYPE = wtypes.Enum(wtypes.text, 'flat', 'rate')
@@ -146,6 +145,7 @@ class BasicHashMapConfigController(billing.BillingConfigController):
pecan.response.headers['Location'] = pecan.request.path
except api.ServiceAlreadyExists as e:
pecan.abort(409, str(e))
self.notify_reload()
pecan.response.status = 201
@wsme_pecan.wsexpose(None, wtypes.text, wtypes.text, wtypes.text,

View File

@@ -26,7 +26,6 @@ from cloudkitty.billing.hash.db.sqlalchemy import models
from cloudkitty import db
from cloudkitty.openstack.common import log as logging
LOG = logging.getLogger(__name__)

View File

@@ -15,17 +15,14 @@
#
# @author: Stéphane Albert
#
-import sys
-from oslo.config import cfg
from cloudkitty.api import app
-from cloudkitty.openstack.common import log as logging
+from cloudkitty.common import rpc
+from cloudkitty import service
def main():
-cfg.CONF(sys.argv[1:], project='cloudkitty')
-logging.setup('cloudkitty')
+service.prepare_service()
+rpc.init()
server = app.build_server()
try:
server.serve_forever()

View File

@@ -15,14 +15,12 @@
#
# @author: Stéphane Albert
#
-import sys
from oslo.config import cfg
from stevedore import extension
from cloudkitty import config  # noqa
from cloudkitty.db import api as db_api
-from cloudkitty.openstack.common import log as logging
+from cloudkitty import service
CONF = cfg.CONF
@@ -147,6 +145,5 @@ CONF.register_cli_opt(command_opt)
def main():
-cfg.CONF(sys.argv[1:], project='cloudkitty')
-logging.setup('cloudkitty')
+service.prepare_service()
CONF.command.func()

View File

@@ -0,0 +1,41 @@
# -*- coding: utf-8 -*-
# Copyright 2014 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Stéphane Albert
#
from oslo.config import cfg
from cloudkitty.common import rpc
from cloudkitty.openstack.common import log as logging
from cloudkitty import orchestrator
from cloudkitty import service
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
def main():
service.prepare_service()
rpc.init()
processor = orchestrator.Orchestrator()
try:
processor.process()
except KeyboardInterrupt:
pass
if __name__ == '__main__':
main()

cloudkitty/common/rpc.py Normal file (41 lines added)
View File

@@ -0,0 +1,41 @@
# -*- coding: utf-8 -*-
# Copyright 2014 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Stéphane Albert
#
from oslo.config import cfg
from oslo import messaging
TRANSPORT = None
def init():
global TRANSPORT
TRANSPORT = messaging.get_transport(cfg.CONF)
def get_client(target, version_cap=None):
assert TRANSPORT is not None
return messaging.RPCClient(TRANSPORT,
target,
version_cap=version_cap)
def get_server(target, endpoints):
assert TRANSPORT is not None
return messaging.get_rpc_server(TRANSPORT,
target,
endpoints,
executor='eventlet')
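(A sketch of how the two halves of this helper pair up at runtime. Both processes must call rpc.init() first; the server name below is illustrative, since the processor actually passes CONF.host (see orchestrator.py further down), and BillingEndpoint is the endpoint class added there.)

from oslo import messaging
from cloudkitty.common import rpc

rpc.init()  # builds the shared transport from cfg.CONF

# API side: topic-routed client with no server identity.
client = rpc.get_client(messaging.Target(topic='cloudkitty', version='1.0'))

# Processor side: same topic plus a server identity; the endpoints'
# Target namespaces decide which methods a message can reach.
target = messaging.Target(topic='cloudkitty', server='node-1', version='1.0')
server = rpc.get_server(target, [BillingEndpoint(orchestrator)])
server.start()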

View File

@@ -0,0 +1,113 @@
# Copyright 2011 OpenStack Foundation.
# Copyright 2012, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Exception related utilities.
"""
import logging
import sys
import time
import traceback
import six
from cloudkitty.openstack.common.gettextutils import _LE
class save_and_reraise_exception(object):
"""Save current exception, run some code and then re-raise.
In some cases the exception context can be cleared, resulting in an
attempt to re-raise None after an exception handler is run. This can
happen when eventlet switches greenthreads, or when an exception handler
itself raises and then catches an exception. In both cases the exception
context is cleared.
To work around this, we save the exception state, run handler code, and
then re-raise the original exception. If another exception occurs, the
saved exception is logged and the new exception is re-raised.
In some cases the caller may not want to re-raise the exception, and
for those circumstances this context provides a reraise flag that
can be used to suppress the exception. For example::
except Exception:
with save_and_reraise_exception() as ctxt:
decide_if_need_reraise()
if not should_be_reraised:
ctxt.reraise = False
If another exception occurs and the reraise flag is False,
the saved exception is not logged.
If the caller wants to raise a new exception during exception handling,
they set reraise to False initially, with the option of setting it back
to True if needed::
except Exception:
with save_and_reraise_exception(reraise=False) as ctxt:
[if statements to determine whether to raise a new exception]
# Not raising a new exception, so reraise
ctxt.reraise = True
"""
def __init__(self, reraise=True):
self.reraise = reraise
def __enter__(self):
self.type_, self.value, self.tb, = sys.exc_info()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is not None:
if self.reraise:
logging.error(_LE('Original exception being dropped: %s'),
traceback.format_exception(self.type_,
self.value,
self.tb))
return False
if self.reraise:
six.reraise(self.type_, self.value, self.tb)
def forever_retry_uncaught_exceptions(infunc):
def inner_func(*args, **kwargs):
last_log_time = 0
last_exc_message = None
exc_count = 0
while True:
try:
return infunc(*args, **kwargs)
except Exception as exc:
this_exc_message = six.u(str(exc))
if this_exc_message == last_exc_message:
exc_count += 1
else:
exc_count = 1
# Do not log any more frequently than once a minute unless
# the exception message changes
cur_time = int(time.time())
if (cur_time - last_log_time > 60 or
this_exc_message != last_exc_message):
logging.exception(
_LE('Unexpected exception occurred %d time(s)... '
'retrying.') % exc_count)
last_log_time = cur_time
last_exc_message = this_exc_message
exc_count = 0
# This should be a very rare event. In case it isn't, do
# a sleep.
time.sleep(1)
return inner_func
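(Nothing in this commit calls these helpers directly; excutils is synced because fileutils below imports it. Roughly, they are used as follows; write_file() and poll_queue() are illustrative stand-ins.)

import os

from cloudkitty.openstack.common import excutils

try:
    write_file(path, data)  # illustrative helper
except Exception:
    with excutils.save_and_reraise_exception():
        os.unlink(path)     # cleanup runs, then the original error re-raises

@excutils.forever_retry_uncaught_exceptions
def worker():
    poll_queue()            # illustrative; failures are logged, then retried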

View File

@@ -0,0 +1,146 @@
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import errno
import os
import tempfile
from cloudkitty.openstack.common import excutils
from cloudkitty.openstack.common import log as logging
LOG = logging.getLogger(__name__)
_FILE_CACHE = {}
def ensure_tree(path):
"""Create a directory (and any ancestor directories required)
:param path: Directory to create
"""
try:
os.makedirs(path)
except OSError as exc:
if exc.errno == errno.EEXIST:
if not os.path.isdir(path):
raise
else:
raise
def read_cached_file(filename, force_reload=False):
"""Read from a file if it has been modified.
:param force_reload: Whether to reload the file.
:returns: A tuple (reloaded, data): a boolean telling whether the file
was re-read from disk, followed by the file's current content.
"""
global _FILE_CACHE
if force_reload:
delete_cached_file(filename)
reloaded = False
mtime = os.path.getmtime(filename)
cache_info = _FILE_CACHE.setdefault(filename, {})
if not cache_info or mtime > cache_info.get('mtime', 0):
LOG.debug("Reloading cached file %s" % filename)
with open(filename) as fap:
cache_info['data'] = fap.read()
cache_info['mtime'] = mtime
reloaded = True
return (reloaded, cache_info['data'])
def delete_cached_file(filename):
"""Delete cached file if present.
:param filename: filename to delete
"""
global _FILE_CACHE
if filename in _FILE_CACHE:
del _FILE_CACHE[filename]
def delete_if_exists(path, remove=os.unlink):
"""Delete a file, but ignore file not found error.
:param path: File to delete
:param remove: Optional function to remove passed path
"""
try:
remove(path)
except OSError as e:
if e.errno != errno.ENOENT:
raise
@contextlib.contextmanager
def remove_path_on_error(path, remove=delete_if_exists):
"""Protect code that wants to operate on PATH atomically.
Any exception will cause PATH to be removed.
:param path: File to work with
:param remove: Optional function to remove passed path
"""
try:
yield
except Exception:
with excutils.save_and_reraise_exception():
remove(path)
def file_open(*args, **kwargs):
"""Open file
see built-in open() documentation for more details
Note: The reason this is kept in a separate module is to easily
be able to provide a stub module that doesn't alter system
state at all (for unit tests)
"""
return open(*args, **kwargs)
def write_to_tempfile(content, path=None, suffix='', prefix='tmp'):
"""Create temporary file or use existing file.
This util is needed for creating temporary file with
specified content, suffix and prefix. If path is not None,
it will be used for writing content. If the path doesn't
exist it'll be created.
:param content: content for temporary file.
:param path: same as parameter 'dir' for mkstemp
:param suffix: same as parameter 'suffix' for mkstemp
:param prefix: same as parameter 'prefix' for mkstemp
For example: it can be used in database tests for creating
configuration files.
"""
if path:
ensure_tree(path)
(fd, path) = tempfile.mkstemp(suffix=suffix, dir=path, prefix=prefix)
try:
os.write(fd, content)
finally:
os.close(fd)
return path
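(A short usage sketch, assumed rather than taken from this commit; lockutils below relies on ensure_tree() when creating lock directories, and load_config() is an illustrative helper.)

from cloudkitty.openstack.common import fileutils

path = fileutils.write_to_tempfile(b'[DEFAULT]\ndebug = True\n',
                                   suffix='.conf')
with fileutils.remove_path_on_error(path):
    load_config(path)  # illustrative; the temp file is deleted on failure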

View File

@@ -38,11 +38,13 @@ import inspect
import itertools
import sys
is_simplejson = False
if sys.version_info < (2, 7):
# On Python <= 2.6, json module is not C boosted, so try to use
# simplejson module if available
try:
import simplejson as json
is_simplejson = True
except ImportError:
import json
else:
@@ -165,10 +167,14 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
def dumps(value, default=to_primitive, **kwargs):
if is_simplejson:
kwargs['namedtuple_as_object'] = False
return json.dumps(value, default=default, **kwargs)
def dump(obj, fp, *args, **kwargs):
if is_simplejson:
kwargs['namedtuple_as_object'] = False
return json.dump(obj, fp, *args, **kwargs)
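(Context for the new is_simplejson flag: simplejson, unlike the stdlib json module, serializes namedtuples as JSON objects by default, so forcing namedtuple_as_object=False keeps both backends emitting plain arrays. Roughly:)

import collections
Point = collections.namedtuple('Point', ['x', 'y'])
# stdlib json:                            dumps(Point(1, 2)) == '[1, 2]'
# simplejson, default settings:           dumps(Point(1, 2)) == '{"x": 1, "y": 2}'
# simplejson, namedtuple_as_object=False: dumps(Point(1, 2)) == '[1, 2]'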

View File

@@ -0,0 +1,377 @@
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import errno
import functools
import logging
import os
import shutil
import subprocess
import sys
import tempfile
import threading
import time
import weakref
from oslo.config import cfg
from cloudkitty.openstack.common import fileutils
from cloudkitty.openstack.common.gettextutils import _, _LE, _LI
LOG = logging.getLogger(__name__)
util_opts = [
cfg.BoolOpt('disable_process_locking', default=False,
help='Enables or disables inter-process locks.'),
cfg.StrOpt('lock_path',
default=os.environ.get("CLOUDKITTY_LOCK_PATH"),
help='Directory to use for lock files.')
]
CONF = cfg.CONF
CONF.register_opts(util_opts)
def set_defaults(lock_path):
cfg.set_defaults(util_opts, lock_path=lock_path)
class _FileLock(object):
"""Lock implementation which allows multiple locks, working around
issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
not require any cleanup. Since the lock is always held on a file
descriptor rather than outside of the process, the lock gets dropped
automatically if the process crashes, even if __exit__ is not executed.
There are no guarantees regarding usage by multiple green threads in a
single process here. This lock works only between processes. Exclusive
access between local threads should be achieved using the semaphores
in the @synchronized decorator.
Note these locks are released when the descriptor is closed, so it's not
safe to close the file descriptor while another green thread holds the
lock. Just opening and closing the lock file can break synchronisation,
so lock files must be accessed only using this abstraction.
"""
def __init__(self, name):
self.lockfile = None
self.fname = name
def acquire(self):
basedir = os.path.dirname(self.fname)
if not os.path.exists(basedir):
fileutils.ensure_tree(basedir)
LOG.info(_LI('Created lock path: %s'), basedir)
self.lockfile = open(self.fname, 'w')
while True:
try:
# Using non-blocking locks since green threads are not
# patched to deal with blocking locking calls.
# Also upon reading the MSDN docs for locking(), it seems
# to have a laughable 10 attempts "blocking" mechanism.
self.trylock()
LOG.debug('Got file lock "%s"', self.fname)
return True
except IOError as e:
if e.errno in (errno.EACCES, errno.EAGAIN):
# external locks synchronise things like iptables
# updates - give it some time to prevent busy spinning
time.sleep(0.01)
else:
raise threading.ThreadError(_("Unable to acquire lock on"
" `%(filename)s` due to"
" %(exception)s") %
{'filename': self.fname,
'exception': e})
def __enter__(self):
self.acquire()
return self
def release(self):
try:
self.unlock()
self.lockfile.close()
LOG.debug('Released file lock "%s"', self.fname)
except IOError:
LOG.exception(_LE("Could not release the acquired lock `%s`"),
self.fname)
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()
def exists(self):
return os.path.exists(self.fname)
def trylock(self):
raise NotImplementedError()
def unlock(self):
raise NotImplementedError()
class _WindowsLock(_FileLock):
def trylock(self):
msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_NBLCK, 1)
def unlock(self):
msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_UNLCK, 1)
class _FcntlLock(_FileLock):
def trylock(self):
fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
def unlock(self):
fcntl.lockf(self.lockfile, fcntl.LOCK_UN)
class _PosixLock(object):
def __init__(self, name):
# Hash the name because it's not valid to have POSIX semaphore
# names with things like / in them. Then use base64 to encode
# the digest() instead of taking the hexdigest() because the
# result is shorter and most systems can't have shm semaphore
# names longer than 31 characters.
h = hashlib.sha1()
h.update(name.encode('ascii'))
self.name = str((b'/' + base64.urlsafe_b64encode(
h.digest())).decode('ascii'))
def acquire(self, timeout=None):
self.semaphore = posix_ipc.Semaphore(self.name,
flags=posix_ipc.O_CREAT,
initial_value=1)
self.semaphore.acquire(timeout)
return self
def __enter__(self):
self.acquire()
return self
def release(self):
self.semaphore.release()
self.semaphore.close()
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()
def exists(self):
try:
semaphore = posix_ipc.Semaphore(self.name)
except posix_ipc.ExistentialError:
return False
else:
semaphore.close()
return True
if os.name == 'nt':
import msvcrt
InterProcessLock = _WindowsLock
FileLock = _WindowsLock
else:
import base64
import fcntl
import hashlib
import posix_ipc
InterProcessLock = _PosixLock
FileLock = _FcntlLock
_semaphores = weakref.WeakValueDictionary()
_semaphores_lock = threading.Lock()
def _get_lock_path(name, lock_file_prefix, lock_path=None):
# NOTE(mikal): the lock name cannot contain directory
# separators
name = name.replace(os.sep, '_')
if lock_file_prefix:
sep = '' if lock_file_prefix.endswith('-') else '-'
name = '%s%s%s' % (lock_file_prefix, sep, name)
local_lock_path = lock_path or CONF.lock_path
if not local_lock_path:
# NOTE(bnemec): Create a fake lock path for posix locks so we don't
# unnecessarily raise the RequiredOptError below.
if InterProcessLock is not _PosixLock:
raise cfg.RequiredOptError('lock_path')
local_lock_path = 'posixlock:/'
return os.path.join(local_lock_path, name)
def external_lock(name, lock_file_prefix=None, lock_path=None):
LOG.debug('Attempting to grab external lock "%(lock)s"',
{'lock': name})
lock_file_path = _get_lock_path(name, lock_file_prefix, lock_path)
# NOTE(bnemec): If an explicit lock_path was passed to us then it
# means the caller is relying on file-based locking behavior, so
# we can't use posix locks for those calls.
if lock_path:
return FileLock(lock_file_path)
return InterProcessLock(lock_file_path)
def remove_external_lock_file(name, lock_file_prefix=None):
"""Remove an external lock file when it's not used anymore
This will be helpful when we have a lot of lock files
"""
with internal_lock(name):
lock_file_path = _get_lock_path(name, lock_file_prefix)
try:
os.remove(lock_file_path)
except OSError:
LOG.info(_LI('Failed to remove file %(file)s'),
{'file': lock_file_path})
def internal_lock(name):
with _semaphores_lock:
try:
sem = _semaphores[name]
except KeyError:
sem = threading.Semaphore()
_semaphores[name] = sem
LOG.debug('Got semaphore "%(lock)s"', {'lock': name})
return sem
@contextlib.contextmanager
def lock(name, lock_file_prefix=None, external=False, lock_path=None):
"""Context based lock
This function yields a `threading.Semaphore` instance (if we don't use
eventlet.monkey_patch(), else `semaphore.Semaphore`) unless external is
True, in which case, it'll yield an InterProcessLock instance.
:param lock_file_prefix: The lock_file_prefix argument is used to provide
lock files on disk with a meaningful prefix.
:param external: The external keyword argument denotes whether this lock
should work across multiple processes. This means that if two different
workers both run a method decorated with @synchronized('mylock',
external=True), only one of them will execute at a time.
"""
int_lock = internal_lock(name)
with int_lock:
if external and not CONF.disable_process_locking:
ext_lock = external_lock(name, lock_file_prefix, lock_path)
with ext_lock:
yield ext_lock
else:
yield int_lock
LOG.debug('Released semaphore "%(lock)s"', {'lock': name})
def synchronized(name, lock_file_prefix=None, external=False, lock_path=None):
"""Synchronization decorator.
Decorating a method like so::
@synchronized('mylock')
def foo(self, *args):
...
ensures that only one thread will execute the foo method at a time.
Different methods can share the same lock::
@synchronized('mylock')
def foo(self, *args):
...
@synchronized('mylock')
def bar(self, *args):
...
This way only one of either foo or bar can be executing at a time.
"""
def wrap(f):
@functools.wraps(f)
def inner(*args, **kwargs):
try:
with lock(name, lock_file_prefix, external, lock_path):
LOG.debug('Got semaphore / lock "%(function)s"',
{'function': f.__name__})
return f(*args, **kwargs)
finally:
LOG.debug('Semaphore / lock released "%(function)s"',
{'function': f.__name__})
return inner
return wrap
def synchronized_with_prefix(lock_file_prefix):
"""Partial object generator for the synchronization decorator.
Redefine @synchronized in each project like so::
(in nova/utils.py)
from nova.openstack.common import lockutils
synchronized = lockutils.synchronized_with_prefix('nova-')
(in nova/foo.py)
from nova import utils
@utils.synchronized('mylock')
def bar(self, *args):
...
The lock_file_prefix argument is used to provide lock files on disk with a
meaningful prefix.
"""
return functools.partial(synchronized, lock_file_prefix=lock_file_prefix)
def main(argv):
"""Create a dir for locks and pass it to command from arguments
If you run this:
python -m openstack.common.lockutils python setup.py testr <etc>
a temporary directory will be created for all your locks and passed to all
your tests in an environment variable. The temporary dir will be deleted
afterwards and the return value will be preserved.
"""
lock_dir = tempfile.mkdtemp()
os.environ["CLOUDKITTY_LOCK_PATH"] = lock_dir
try:
ret_val = subprocess.call(argv[1:])
finally:
shutil.rmtree(lock_dir, ignore_errors=True)
return ret_val
if __name__ == '__main__':
sys.exit(main(sys.argv))
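(The new orchestrator code uses only the plain lock() context manager; a minimal sketch of both styles this module offers. 'module-reload' matches BillingEndpoint below; the other names are illustrative.)

from cloudkitty.openstack.common import lockutils

# Context-manager form, as in BillingEndpoint: a per-process semaphore.
with lockutils.lock('module-reload'):
    drain_pending_list()  # illustrative critical section

# Decorator form with a cross-process lock (file- or posix-IPC-based):
@lockutils.synchronized('state-db', external=True,
                        lock_file_prefix='cloudkitty-')
def update_state():
    pass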

View File

@@ -33,6 +33,7 @@ import logging
import logging.config
import logging.handlers
import os
import socket
import sys
import traceback
@@ -126,7 +127,9 @@ DEFAULT_LOG_LEVELS = ['amqp=WARN', 'amqplib=WARN', 'boto=WARN',
'qpid=WARN', 'sqlalchemy=WARN', 'suds=INFO',
'oslo.messaging=INFO', 'iso8601=WARN',
'requests.packages.urllib3.connectionpool=WARN',
-'urllib3.connectionpool=WARN', 'websocket=WARN']
+'urllib3.connectionpool=WARN', 'websocket=WARN',
+"keystonemiddleware=WARN", "routes.middleware=WARN",
+"stevedore=WARN"]
log_opts = [
cfg.StrOpt('logging_context_format_string',
@@ -300,11 +303,10 @@ class ContextAdapter(BaseLoggerAdapter):
self.warn(stdmsg, *args, **kwargs)
def process(self, msg, kwargs):
-# NOTE(mrodden): catch any Message/other object and
-# coerce to unicode before they can get
-# to the python logging and possibly
-# cause string encoding trouble
-if not isinstance(msg, six.string_types):
+# NOTE(jecarey): If msg is not unicode, coerce it into unicode
+# before it can get to the python logging and
+# possibly cause string encoding trouble
+if not isinstance(msg, six.text_type):
msg = six.text_type(msg)
if 'extra' not in kwargs:
@@ -483,18 +485,6 @@ def _setup_logging_from_conf(project, version):
for handler in log_root.handlers:
log_root.removeHandler(handler)
-if CONF.use_syslog:
-facility = _find_facility_from_conf()
-# TODO(bogdando) use the format provided by RFCSysLogHandler
-# after existing syslog format deprecation in J
-if CONF.use_syslog_rfc_format:
-syslog = RFCSysLogHandler(address='/dev/log',
-facility=facility)
-else:
-syslog = logging.handlers.SysLogHandler(address='/dev/log',
-facility=facility)
-log_root.addHandler(syslog)
logpath = _get_log_file_path()
if logpath:
filelog = logging.handlers.WatchedFileHandler(logpath)
@@ -553,6 +543,20 @@
else:
logger.setLevel(level_name)
+if CONF.use_syslog:
+try:
+facility = _find_facility_from_conf()
+# TODO(bogdando) use the format provided by RFCSysLogHandler
+# after existing syslog format deprecation in J
+if CONF.use_syslog_rfc_format:
+syslog = RFCSysLogHandler(facility=facility)
+else:
+syslog = logging.handlers.SysLogHandler(facility=facility)
+log_root.addHandler(syslog)
+except socket.error:
+log_root.error('Unable to add syslog handler. Verify that syslog '
+'is running.')
_loggers = {}
@@ -622,6 +626,12 @@ class ContextFormatter(logging.Formatter):
def format(self, record):
"""Uses contextstring if request_id is set, otherwise default."""
# NOTE(jecarey): If msg is not unicode, coerce it into unicode
# before it can get to the python logging and
# possibly cause string encoding trouble
if not isinstance(record.msg, six.text_type):
record.msg = six.text_type(record.msg)
# store project info
record.project = self.project
record.version = self.version

View File

@@ -16,29 +16,84 @@
#
# @author: Stéphane Albert
#
from __future__ import print_function
import datetime
import sys
import time
import eventlet
from keystoneclient.v2_0 import client as kclient
from oslo.config import cfg
from oslo import messaging
from stevedore import driver
from stevedore import named
from cloudkitty.common import rpc
from cloudkitty import config # NOQA
from cloudkitty import extension_manager
from cloudkitty.openstack.common import importutils as i_utils
from cloudkitty.openstack.common import lockutils
from cloudkitty.openstack.common import log as logging
from cloudkitty import state
from cloudkitty import write_orchestrator as w_orch
eventlet.monkey_patch()
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
PROCESSORS_NAMESPACE = 'cloudkitty.billing.processors'
WRITERS_NAMESPACE = 'cloudkitty.output.writers'
COLLECTORS_NAMESPACE = 'cloudkitty.collector.backends'
class BillingEndpoint(object):
target = messaging.Target(namespace='billing',
version='1.0')
def __init__(self, orchestrator):
self._pending_reload = []
self._module_state = {}
self._orchestrator = orchestrator
def get_reload_list(self):
lock = lockutils.lock('module-reload')
with lock:
reload_list = self._pending_reload
self._pending_reload = []
return reload_list
def get_module_state(self):
lock = lockutils.lock('module-state')
with lock:
module_list = self._module_state
self._module_state = {}
return module_list
def quote(self, ctxt, res_data):
LOG.debug('Received quote from RPC.')
return self._orchestrator.process_quote(res_data)
def reload_module(self, ctxt, name):
LOG.info('Received reload command for module {}.'.format(name))
lock = lockutils.lock('module-reload')
with lock:
if name not in self._pending_reload:
self._pending_reload.append(name)
def enable_module(self, ctxt, name):
LOG.info('Received enable command for module {}.'.format(name))
lock = lockutils.lock('module-state')
with lock:
self._module_state[name] = True
def disable_module(self, ctxt, name):
LOG.info('Received disable command for module {}.'.format(name))
lock = lockutils.lock('module-state')
with lock:
self._module_state[name] = False
if name in self._pending_reload:
self._pending_reload.remove(name)
class Orchestrator(object):
def __init__(self):
@@ -59,7 +114,7 @@ class Orchestrator(object):
'keystone_url': CONF.auth.url,
'period': CONF.collect.period}
self.collector = driver.DriverManager(
-'cloudkitty.collector.backends',
+COLLECTORS_NAMESPACE,
CONF.collect.collector,
invoke_on_load=True,
invoke_kwds=collector_args).driver
@@ -77,11 +132,26 @@ class Orchestrator(object):
# Output settings
output_pipeline = named.NamedExtensionManager(
-'cloudkitty.output.writers',
+WRITERS_NAMESPACE,
CONF.output.pipeline)
for writer in output_pipeline:
self.wo.add_writer(writer.plugin)
# RPC
self.server = None
self._billing_endpoint = BillingEndpoint(self)
self._init_messaging()
def _init_messaging(self):
target = messaging.Target(topic='cloudkitty',
server=CONF.host,
version='1.0')
endpoints = [
self._billing_endpoint,
]
self.server = rpc.get_server(target, endpoints)
self.server.start()
def _check_state(self):
def _get_this_month_timestamp():
now = datetime.datetime.now()
@@ -112,7 +182,7 @@ class Orchestrator(object):
def _load_billing_processors(self):
self.b_processors = {}
processors = extension_manager.EnabledExtensionManager(
-'cloudkitty.billing.processors',
+PROCESSORS_NAMESPACE,
)
for processor in processors:
@@ -120,12 +190,48 @@ class Orchestrator(object):
b_obj = processor.obj
self.b_processors[b_name] = b_obj
def process_quote(self, res_data):
for processor in self.b_processors.values():
processor.process(res_data)
price = 0.0
for res in res_data:
for res_usage in res['usage'].values():
for data in res_usage:
price += data.get('billing', {}).get('price', 0.0)
return price
def process_messages(self):
pending_reload = self._billing_endpoint.get_reload_list()
pending_states = self._billing_endpoint.get_module_state()
for name in pending_reload:
if name in self.b_processors:
LOG.info('Reloading configuration of {} module.'.format(
name))
self.b_processors[name].reload_config()
else:
LOG.info('Tried to reload a disabled module: {}.'.format(
name))
for name, status in pending_states.items():
if name in self.b_processors and not status:
LOG.info('Disabling {} module.'.format(name))
self.b_processors.pop(name)
elif status:
LOG.info('Enabling {} module.'.format(name))
processors = extension_manager.EnabledExtensionManager(
PROCESSORS_NAMESPACE)
for processor in processors:
if processor.name == name:
self.b_processors[processor.name] = processor.obj
def process(self):
while True:
self.process_messages()
timestamp = self._check_state()
if not timestamp:
-print("Nothing left to do.")
-break
+eventlet.sleep(CONF.collect.period)
+continue
for service in CONF.collect.services:
data = self._collect(service, timestamp)
@@ -141,14 +247,3 @@ class Orchestrator(object):
self.wo.commit()
self.wo.close()
-def main():
-CONF(sys.argv[1:], project='cloudkitty')
-logging.setup('cloudkitty')
-orchestrator = Orchestrator()
-orchestrator.process()
-if __name__ == "__main__":
-main()
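(Taken together, the flow this commit introduces is roughly the following; names come from the code above, and timing is approximate since casts are asynchronous and process() only drains the pending queues once per loop iteration.)

# 1. cloudkitty-api: BillingEnableController calls
#    rpc_client.prepare(namespace='billing', fanout=True)
#              .cast({}, 'enable_module', name=...)
# 2. oslo.messaging delivers the fanout cast on topic 'cloudkitty' to every
#    processor's RPC server (started in Orchestrator._init_messaging).
# 3. cloudkitty-processor: BillingEndpoint.enable_module() records the
#    change in _module_state under the 'module-state' lock.
# 4. Orchestrator.process() -> process_messages() applies pending module
#    changes and reloads before the next collection cycle.
# Quotes differ only in using a blocking call(): the API waits for
# process_quote() to run the billing pipeline and return the price.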

cloudkitty/service.py Normal file (42 lines added)
View File

@@ -0,0 +1,42 @@
# -*- coding: utf-8 -*-
# Copyright 2014 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Stéphane Albert
#
import socket
import sys
from oslo.config import cfg
from cloudkitty.openstack.common import log as logging
LOG = logging.getLogger(__name__)
service_opts = [
cfg.StrOpt('host',
default=socket.getfqdn(),
help='Name of this node. This can be an opaque identifier. '
'It is not necessarily a hostname, FQDN, or IP address. '
'However, the node name must be valid within '
'an AMQP key, and if using ZeroMQ, a valid '
'hostname, FQDN, or IP address.')
]
cfg.CONF.register_opts(service_opts)
def prepare_service():
cfg.CONF(sys.argv[1:], project='cloudkitty')
logging.setup('cloudkitty')

View File

@@ -194,6 +194,18 @@
#control_exchange=openstack
#
# Options defined in cloudkitty.service
#
# Name of this node. This can be an opaque identifier. It is
# not necessarily a hostname, FQDN, or IP address. However,
# the node name must be valid within an AMQP key, and if using
# ZeroMQ, a valid hostname, FQDN, or IP address. (string
# value)
#host=cloudkitty
#
# Options defined in cloudkitty.api.app
#
@@ -203,6 +215,17 @@
#api_paste_config=api_paste.ini
#
# Options defined in cloudkitty.openstack.common.lockutils
#
# Enables or disables inter-process locks. (boolean value)
#disable_process_locking=false
# Directory to use for lock files. (string value)
#lock_path=<None>
#
# Options defined in cloudkitty.openstack.common.log
#
@@ -235,7 +258,7 @@
#logging_exception_prefix=%(asctime)s.%(msecs)03d %(process)d TRACE %(name)s %(instance)s
# List of logger=LEVEL pairs. (list value)
-#default_log_levels=amqp=WARN,amqplib=WARN,boto=WARN,qpid=WARN,sqlalchemy=WARN,suds=INFO,oslo.messaging=INFO,iso8601=WARN,requests.packages.urllib3.connectionpool=WARN,urllib3.connectionpool=WARN,websocket=WARN
+#default_log_levels=amqp=WARN,amqplib=WARN,boto=WARN,qpid=WARN,sqlalchemy=WARN,suds=INFO,oslo.messaging=INFO,iso8601=WARN,requests.packages.urllib3.connectionpool=WARN,urllib3.connectionpool=WARN,websocket=WARN,keystonemiddleware=WARN,routes.middleware=WARN,stevedore=WARN
# Enables or disables publication of error events. (boolean
# value)

View File

@@ -4,6 +4,7 @@ base=cloudkitty
module=config
module=importutils
module=jsonutils
module=lockutils
module=log
script=tools/config/check_uptodate.sh

View File

@@ -2,6 +2,7 @@ python-ceilometerclient
python-keystoneclient
iso8601
PasteDeploy==1.5.2
posix_ipc
pecan==0.5.0
wsme
oslo.config>=1.2.0

View File

@@ -22,7 +22,7 @@ packages =
console_scripts =
cloudkitty-api = cloudkitty.cli.api:main
cloudkitty-dbsync = cloudkitty.cli.dbsync:main
-cloudkitty-processor = cloudkitty.orchestrator:main
+cloudkitty-processor = cloudkitty.cli.processor:main
cloudkitty.collector.backends =
ceilometer = cloudkitty.collector.ceilometer:CeilometerCollector