7fc0d1e6c7
Enable a listener in the collector that listens to the events queue. This patch also renames the 'metering_*' options, which are now used by both events and samples. Change-Id: I6250c91e913864c3973e2d93022e38ad8bed8328 Implements: blueprint notification-pipelines
261 lines
8.1 KiB
Python
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2011 Justin Santa Barbara
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Utilities and helper functions."""

import bisect
import calendar
import copy
import datetime
import decimal
import hashlib
import multiprocessing
import struct

from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_utils import timeutils
from oslo_utils import units
import six


OPTS = [
    cfg.StrOpt('rootwrap_config',
               default="/etc/ceilometer/rootwrap.conf",
               help='Path to the rootwrap configuration file to '
                    'use for running commands as root'),
]
CONF = cfg.CONF
CONF.register_opts(OPTS)
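
# The option is registered on the global CONF object without a group, so it
# can be overridden in the [DEFAULT] section of ceilometer.conf, for example:
#   [DEFAULT]
#   rootwrap_config = /etc/ceilometer/rootwrap.conf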


def _get_root_helper():
    return 'sudo ceilometer-rootwrap %s' % CONF.rootwrap_config


def execute(*cmd, **kwargs):
    """Convenience wrapper around oslo's execute() method."""
    if 'run_as_root' in kwargs and 'root_helper' not in kwargs:
        kwargs['root_helper'] = _get_root_helper()
    return processutils.execute(*cmd, **kwargs)
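
# For example, execute('df', '-h', run_as_root=True) runs the command via the
# configured ceilometer-rootwrap helper, while a plain execute('df', '-h') is
# passed straight through to processutils.execute().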


def decode_unicode(input):
    """Decode the unicode of the message, and encode it into utf-8."""
    if isinstance(input, dict):
        temp = {}
        # If the input data is a dict, create an equivalent dict with a
        # predictable insertion order to avoid inconsistencies in the
        # message signature computation for equivalent payloads modulo
        # ordering
        for key, value in sorted(six.iteritems(input)):
            temp[decode_unicode(key)] = decode_unicode(value)
        return temp
    elif isinstance(input, (tuple, list)):
        # A pair of JSON encode/decode operations turns a tuple into a list,
        # so generate the value as a list here.
        return [decode_unicode(element) for element in input]
    elif isinstance(input, six.text_type):
        return input.encode('utf-8')
    else:
        return input
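
# For example, decode_unicode({u'name': u'résumé'}) returns a dict whose key
# and value are UTF-8 encoded byte strings; dict keys are processed in sorted
# order so equivalent payloads produce identical signatures.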


def recursive_keypairs(d, separator=':'):
    """Generator that produces sequence of keypairs for nested dictionaries."""
    for name, value in sorted(six.iteritems(d)):
        if isinstance(value, dict):
            for subname, subvalue in recursive_keypairs(value, separator):
                yield ('%s%s%s' % (name, separator, subname), subvalue)
        elif isinstance(value, (tuple, list)):
            yield name, decode_unicode(value)
        else:
            yield name, value
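
# Illustrative example:
#   list(recursive_keypairs({'a': {'b': 1}, 'c': 2}))
#   -> [('a:b', 1), ('c', 2)]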


def restore_nesting(d, separator=':'):
    """Unwinds a flattened dict to restore nesting."""
    d = copy.copy(d) if any([separator in k for k in d.keys()]) else d
    for k, v in d.copy().items():
        if separator in k:
            top, rem = k.split(separator, 1)
            nest = d[top] if isinstance(d.get(top), dict) else {}
            nest[rem] = v
            d[top] = restore_nesting(nest, separator)
            del d[k]
    return d
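
# Illustrative example (the inverse of recursive_keypairs for dict values):
#   restore_nesting({'a:b': 1, 'c': 2})
#   -> {'a': {'b': 1}, 'c': 2}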


def dt_to_decimal(utc):
    """Datetime to Decimal.

    Some databases don't store microseconds in datetime
    so we always store as Decimal unixtime.
    """
    if utc is None:
        return None

    decimal.getcontext().prec = 30
    return (decimal.Decimal(str(calendar.timegm(utc.utctimetuple()))) +
            (decimal.Decimal(str(utc.microsecond)) /
             decimal.Decimal("1000000.0")))


def decimal_to_dt(dec):
    """Return a datetime from Decimal unixtime format."""
    if dec is None:
        return None

    integer = int(dec)
    micro = (dec - decimal.Decimal(integer)) * decimal.Decimal(units.M)
    daittyme = datetime.datetime.utcfromtimestamp(integer)
    return daittyme.replace(microsecond=int(round(micro)))
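
# Round-trip example:
#   decimal_to_dt(decimal.Decimal('1420113600.5'))
#   -> datetime.datetime(2015, 1, 1, 12, 0, 0, 500000)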


def sanitize_timestamp(timestamp):
    """Return a naive utc datetime object."""
    if not timestamp:
        return timestamp
    if not isinstance(timestamp, datetime.datetime):
        timestamp = timeutils.parse_isotime(timestamp)
    return timeutils.normalize_time(timestamp)
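
# For example, sanitize_timestamp('2015-01-01T12:00:00+02:00') returns the
# naive UTC datetime 2015-01-01 10:00:00.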


def stringify_timestamps(data):
    """Stringify any datetimes in given dict."""
    isa_timestamp = lambda v: isinstance(v, datetime.datetime)
    return dict((k, v.isoformat() if isa_timestamp(v) else v)
                for (k, v) in six.iteritems(data))
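
# For example, stringify_timestamps({'ts': datetime.datetime(2015, 1, 1)})
# returns {'ts': '2015-01-01T00:00:00'}; non-datetime values pass through
# unchanged.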


def dict_to_keyval(value, key_base=None):
    """Expand a given dict to its corresponding key-value pairs.

    Generated keys are fully qualified, delimited using dot notation,
    e.g. key = 'key.child_key.grandchild_key[0]'
    """
    val_iter, key_func = None, None
    if isinstance(value, dict):
        val_iter = six.iteritems(value)
        key_func = lambda k: key_base + '.' + k if key_base else k
    elif isinstance(value, (tuple, list)):
        val_iter = enumerate(value)
        key_func = lambda k: key_base + '[%d]' % k

    if val_iter:
        for k, v in val_iter:
            key_gen = key_func(k)
            if isinstance(v, (dict, tuple, list)):
                for key_gen, v in dict_to_keyval(v, key_gen):
                    yield key_gen, v
            else:
                yield key_gen, v
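
# Illustrative example (iteration order of plain dicts is not guaranteed):
#   sorted(dict_to_keyval({'a': {'b': 1}, 'c': [2, 3]}))
#   -> [('a.b', 1), ('c[0]', 2), ('c[1]', 3)]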


def lowercase_keys(mapping):
    """Converts the keys in mapping to lowercase."""
    # Snapshot the items so the dict can be modified during iteration.
    items = list(mapping.items())
    for key, value in items:
        del mapping[key]
        mapping[key.lower()] = value


def lowercase_values(mapping):
    """Converts the values in the mapping dict to lowercase."""
    items = mapping.items()
    for key, value in items:
        mapping[key] = value.lower()
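
# Both helpers mutate the mapping in place. For example, with m = {'Foo': 'BAR'}:
#   lowercase_keys(m)    -> m == {'foo': 'BAR'}
#   lowercase_values(m)  -> m == {'foo': 'bar'}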


def update_nested(original_dict, updates):
    """Updates the leaf nodes in a nested dict.

    Updates occur without replacing entire sub-dicts.
    """
    dict_to_update = copy.deepcopy(original_dict)
    for key, value in six.iteritems(updates):
        if isinstance(value, dict):
            sub_dict = update_nested(dict_to_update.get(key, {}), value)
            dict_to_update[key] = sub_dict
        else:
            dict_to_update[key] = updates[key]
    return dict_to_update
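
# Illustrative example:
#   update_nested({'a': {'b': 1, 'c': 2}}, {'a': {'b': 5}})
#   -> {'a': {'b': 5, 'c': 2}}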


def cpu_count():
    try:
        return multiprocessing.cpu_count() or 1
    except NotImplementedError:
        return 1


def uniq(dupes, attrs):
    """Exclude elements of dupes with a duplicated set of attribute values."""
    key = lambda d: '/'.join([getattr(d, a) or '' for a in attrs])
    keys = []
    deduped = []
    for d in dupes:
        if key(d) not in keys:
            deduped.append(d)
            keys.append(key(d))
    return deduped
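
# For example, given objects that each expose a 'project_id' attribute,
# uniq(resources, ['project_id']) keeps only the first object seen for each
# distinct project_id ('resources' and 'project_id' are illustrative names).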


def hash_of_set(s):
    return str(hash(frozenset(s)))


class HashRing(object):

    def __init__(self, nodes, replicas=100):
        self._ring = dict()
        self._sorted_keys = []

        for node in nodes:
            for r in six.moves.range(replicas):
                hashed_key = self._hash('%s-%s' % (node, r))
                self._ring[hashed_key] = node
                self._sorted_keys.append(hashed_key)
        self._sorted_keys.sort()

    @staticmethod
    def _hash(key):
        return struct.unpack_from('>I',
                                  hashlib.md5(str(key).encode()).digest())[0]

    def _get_position_on_ring(self, key):
        hashed_key = self._hash(key)
        position = bisect.bisect(self._sorted_keys, hashed_key)
        return position if position < len(self._sorted_keys) else 0

    def get_node(self, key):
        if not self._ring:
            return None
        pos = self._get_position_on_ring(key)
        return self._ring[self._sorted_keys[pos]]
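
# Illustrative usage (node names are placeholders):
#   ring = HashRing(['agent-1', 'agent-2', 'agent-3'])
#   ring.get_node('resource-1234')
# get_node() always maps the same key to the same node, and with the default
# 100 replicas per node, adding or removing a node remaps only roughly
# 1/len(nodes) of the keys.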


def kill_listeners(listeners):
    # NOTE(gordc): correct usage of oslo.messaging listener is to stop(),
    # which stops new messages, and wait(), which processes remaining
    # messages and closes connection
    for listener in listeners:
        listener.stop()
        listener.wait()