This change allow to get a disabled alarm. To do this, it: * add the ability to query the alarm-list API with the 'enabled' fields. * change the default visibility of alarm-list API from enabled only to all. Fixes bug #1231328 Change-Id: I80cd691696f7630d2ac9e9b2a69b5b0ae5ee7968
		
			
				
	
	
		
			976 lines
		
	
	
		
			36 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			976 lines
		
	
	
		
			36 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# -*- encoding: utf-8 -*-
 | 
						|
#
 | 
						|
# Copyright © 2012 New Dream Network, LLC (DreamHost)
 | 
						|
# Copyright © 2013 eNovance
 | 
						|
#
 | 
						|
# Author: Doug Hellmann <doug.hellmann@dreamhost.com>
 | 
						|
#         Julien Danjou <julien@danjou.info>
 | 
						|
#
 | 
						|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
 | 
						|
# not use this file except in compliance with the License. You may obtain
 | 
						|
# a copy of the License at
 | 
						|
#
 | 
						|
#      http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
#
 | 
						|
# Unless required by applicable law or agreed to in writing, software
 | 
						|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 | 
						|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 | 
						|
# License for the specific language governing permissions and limitations
 | 
						|
# under the License.
 | 
						|
"""MongoDB storage backend
 | 
						|
"""
 | 
						|
 | 
						|
import calendar
 | 
						|
import copy
 | 
						|
import operator
 | 
						|
import weakref
 | 
						|
 | 
						|
import bson.code
 | 
						|
import bson.objectid
 | 
						|
import json
 | 
						|
import pymongo
 | 
						|
 | 
						|
from oslo.config import cfg
 | 
						|
 | 
						|
from ceilometer.openstack.common import log
 | 
						|
from ceilometer import storage
 | 
						|
from ceilometer.storage import base
 | 
						|
from ceilometer.storage import models
 | 
						|
from ceilometer.openstack.common.gettextutils import _
 | 
						|
 | 
						|
cfg.CONF.import_opt('time_to_live', 'ceilometer.storage',
 | 
						|
                    group="database")
 | 
						|
 | 
						|
LOG = log.getLogger(__name__)
 | 
						|
 | 
						|
 | 
						|
class MongoDBStorage(base.StorageEngine):
 | 
						|
    """Put the data into a MongoDB database
 | 
						|
 | 
						|
    Collections::
 | 
						|
 | 
						|
        - user
 | 
						|
          - { _id: user id
 | 
						|
              source: [ array of source ids reporting for the user ]
 | 
						|
              }
 | 
						|
        - project
 | 
						|
          - { _id: project id
 | 
						|
              source: [ array of source ids reporting for the project ]
 | 
						|
              }
 | 
						|
        - meter
 | 
						|
          - the raw incoming data
 | 
						|
        - resource
 | 
						|
          - the metadata for resources
 | 
						|
          - { _id: uuid of resource,
 | 
						|
              metadata: metadata dictionaries
 | 
						|
              user_id: uuid
 | 
						|
              project_id: uuid
 | 
						|
              meter: [ array of {counter_name: string, counter_type: string,
 | 
						|
                                 counter_unit: string} ]
 | 
						|
            }
 | 
						|
    """
 | 
						|
 | 
						|
    def get_connection(self, conf):
 | 
						|
        """Return a Connection instance based on the configuration settings.
 | 
						|
        """
 | 
						|
        return Connection(conf)
 | 
						|
 | 
						|
 | 
						|
def make_timestamp_range(start, end,
 | 
						|
                         start_timestamp_op=None, end_timestamp_op=None):
 | 
						|
    """Given two possible datetimes and their operations, create the query
 | 
						|
    document to find timestamps within that range.
 | 
						|
    By default, using $gte for the lower bound and $lt for the
 | 
						|
    upper bound.
 | 
						|
    """
 | 
						|
    ts_range = {}
 | 
						|
 | 
						|
    if start:
 | 
						|
        if start_timestamp_op == 'gt':
 | 
						|
            start_timestamp_op = '$gt'
 | 
						|
        else:
 | 
						|
            start_timestamp_op = '$gte'
 | 
						|
        ts_range[start_timestamp_op] = start
 | 
						|
 | 
						|
    if end:
 | 
						|
        if end_timestamp_op == 'le':
 | 
						|
            end_timestamp_op = '$lte'
 | 
						|
        else:
 | 
						|
            end_timestamp_op = '$lt'
 | 
						|
        ts_range[end_timestamp_op] = end
 | 
						|
    return ts_range
 | 
						|
 | 
						|
 | 
						|
def make_query_from_filter(sample_filter, require_meter=True):
 | 
						|
    """Return a query dictionary based on the settings in the filter.
 | 
						|
 | 
						|
    :param filter: SampleFilter instance
 | 
						|
    :param require_meter: If true and the filter does not have a meter,
 | 
						|
                          raise an error.
 | 
						|
    """
 | 
						|
    q = {}
 | 
						|
 | 
						|
    if sample_filter.user:
 | 
						|
        q['user_id'] = sample_filter.user
 | 
						|
    if sample_filter.project:
 | 
						|
        q['project_id'] = sample_filter.project
 | 
						|
 | 
						|
    if sample_filter.meter:
 | 
						|
        q['counter_name'] = sample_filter.meter
 | 
						|
    elif require_meter:
 | 
						|
        raise RuntimeError('Missing required meter specifier')
 | 
						|
 | 
						|
    ts_range = make_timestamp_range(sample_filter.start, sample_filter.end,
 | 
						|
                                    sample_filter.start_timestamp_op,
 | 
						|
                                    sample_filter.end_timestamp_op)
 | 
						|
    if ts_range:
 | 
						|
        q['timestamp'] = ts_range
 | 
						|
 | 
						|
    if sample_filter.resource:
 | 
						|
        q['resource_id'] = sample_filter.resource
 | 
						|
    if sample_filter.source:
 | 
						|
        q['source'] = sample_filter.source
 | 
						|
 | 
						|
    # so the samples call metadata resource_metadata, so we convert
 | 
						|
    # to that.
 | 
						|
    q.update(dict(('resource_%s' % k, v)
 | 
						|
                  for (k, v) in sample_filter.metaquery.iteritems()))
 | 
						|
    return q
 | 
						|
 | 
						|
 | 
						|
class ConnectionPool(object):
 | 
						|
 | 
						|
    def __init__(self):
 | 
						|
        self._pool = {}
 | 
						|
 | 
						|
    def connect(self, url):
 | 
						|
        connection_options = pymongo.uri_parser.parse_uri(url)
 | 
						|
        del connection_options['database']
 | 
						|
        del connection_options['username']
 | 
						|
        del connection_options['password']
 | 
						|
        del connection_options['collection']
 | 
						|
        pool_key = tuple(connection_options)
 | 
						|
 | 
						|
        if pool_key in self._pool:
 | 
						|
            client = self._pool.get(pool_key)()
 | 
						|
            if client:
 | 
						|
                return client
 | 
						|
        LOG.info('connecting to MongoDB on %s', url)
 | 
						|
        client = pymongo.MongoClient(
 | 
						|
            url,
 | 
						|
            safe=True)
 | 
						|
        self._pool[pool_key] = weakref.ref(client)
 | 
						|
        return client
 | 
						|
 | 
						|
 | 
						|
class Connection(base.Connection):
 | 
						|
    """MongoDB connection.
 | 
						|
    """
 | 
						|
 | 
						|
    CONNECTION_POOL = ConnectionPool()
 | 
						|
 | 
						|
    REDUCE_GROUP_CLEAN = bson.code.Code("""
 | 
						|
    function ( curr, result ) {
 | 
						|
        if (result.resources.indexOf(curr.resource_id) < 0)
 | 
						|
            result.resources.push(curr.resource_id);
 | 
						|
        if (result.users.indexOf(curr.user_id) < 0)
 | 
						|
            result.users.push(curr.user_id);
 | 
						|
        if (result.projects.indexOf(curr.project_id) < 0)
 | 
						|
            result.projects.push(curr.project_id);
 | 
						|
    }
 | 
						|
    """)
 | 
						|
 | 
						|
    EMIT_STATS_COMMON = """
 | 
						|
        emit(%(key_val)s, { unit: this.counter_unit,
 | 
						|
                            min : this.counter_volume,
 | 
						|
                            max : this.counter_volume,
 | 
						|
                            sum : this.counter_volume,
 | 
						|
                            count : NumberInt(1),
 | 
						|
                            groupby : %(groupby_val)s,
 | 
						|
                            duration_start : this.timestamp,
 | 
						|
                            duration_end : this.timestamp,
 | 
						|
                            period_start : %(period_start_val)s,
 | 
						|
                            period_end : %(period_end_val)s} )
 | 
						|
    """
 | 
						|
 | 
						|
    MAP_STATS_PERIOD_VAR = """
 | 
						|
        var period = %(period)d * 1000;
 | 
						|
        var period_first = %(period_first)d * 1000;
 | 
						|
        var period_start = period_first
 | 
						|
                           + (Math.floor(new Date(this.timestamp.getTime()
 | 
						|
                                         - period_first) / period)
 | 
						|
                              * period);
 | 
						|
    """
 | 
						|
 | 
						|
    MAP_STATS_GROUPBY_VAR = """
 | 
						|
        var groupby_fields = %(groupby_fields)s;
 | 
						|
        var groupby = {};
 | 
						|
        var groupby_key = {};
 | 
						|
 | 
						|
        for ( var i=0; i<groupby_fields.length; i++ ) {
 | 
						|
            groupby[groupby_fields[i]] = this[groupby_fields[i]]
 | 
						|
            groupby_key[groupby_fields[i]] = this[groupby_fields[i]]
 | 
						|
        }
 | 
						|
    """
 | 
						|
 | 
						|
    PARAMS_MAP_STATS = {'key_val': '\'statistics\'',
 | 
						|
                        'groupby_val': 'null',
 | 
						|
                        'period_start_val': 'this.timestamp',
 | 
						|
                        'period_end_val': 'this.timestamp'}
 | 
						|
 | 
						|
    MAP_STATS = bson.code.Code("function () {" +
 | 
						|
                               EMIT_STATS_COMMON % PARAMS_MAP_STATS +
 | 
						|
                               "}")
 | 
						|
 | 
						|
    PARAMS_MAP_STATS_PERIOD = {
 | 
						|
        'key_val': 'period_start',
 | 
						|
        'groupby_val': 'null',
 | 
						|
        'period_start_val': 'new Date(period_start)',
 | 
						|
        'period_end_val': 'new Date(period_start + period)'
 | 
						|
    }
 | 
						|
 | 
						|
    MAP_STATS_PERIOD = bson.code.Code(
 | 
						|
        "function () {" +
 | 
						|
        MAP_STATS_PERIOD_VAR +
 | 
						|
        EMIT_STATS_COMMON % PARAMS_MAP_STATS_PERIOD +
 | 
						|
        "}")
 | 
						|
 | 
						|
    PARAMS_MAP_STATS_GROUPBY = {'key_val': 'groupby_key',
 | 
						|
                                'groupby_val': 'groupby',
 | 
						|
                                'period_start_val': 'this.timestamp',
 | 
						|
                                'period_end_val': 'this.timestamp'}
 | 
						|
 | 
						|
    MAP_STATS_GROUPBY = bson.code.Code(
 | 
						|
        "function () {" +
 | 
						|
        MAP_STATS_GROUPBY_VAR +
 | 
						|
        EMIT_STATS_COMMON % PARAMS_MAP_STATS_GROUPBY +
 | 
						|
        "}")
 | 
						|
 | 
						|
    PARAMS_MAP_STATS_PERIOD_GROUPBY = {
 | 
						|
        'key_val': 'groupby_key',
 | 
						|
        'groupby_val': 'groupby',
 | 
						|
        'period_start_val': 'new Date(period_start)',
 | 
						|
        'period_end_val': 'new Date(period_start + period)'
 | 
						|
    }
 | 
						|
 | 
						|
    MAP_STATS_PERIOD_GROUPBY = bson.code.Code(
 | 
						|
        "function () {" +
 | 
						|
        MAP_STATS_PERIOD_VAR +
 | 
						|
        MAP_STATS_GROUPBY_VAR +
 | 
						|
        "    groupby_key['period_start'] = period_start\n" +
 | 
						|
        EMIT_STATS_COMMON % PARAMS_MAP_STATS_PERIOD_GROUPBY +
 | 
						|
        "}")
 | 
						|
 | 
						|
    REDUCE_STATS = bson.code.Code("""
 | 
						|
    function (key, values) {
 | 
						|
        var res = { unit: values[0].unit,
 | 
						|
                    min: values[0].min,
 | 
						|
                    max: values[0].max,
 | 
						|
                    count: values[0].count,
 | 
						|
                    sum: values[0].sum,
 | 
						|
                    groupby: values[0].groupby,
 | 
						|
                    period_start: values[0].period_start,
 | 
						|
                    period_end: values[0].period_end,
 | 
						|
                    duration_start: values[0].duration_start,
 | 
						|
                    duration_end: values[0].duration_end };
 | 
						|
        for ( var i=1; i<values.length; i++ ) {
 | 
						|
            if ( values[i].min < res.min )
 | 
						|
               res.min = values[i].min;
 | 
						|
            if ( values[i].max > res.max )
 | 
						|
               res.max = values[i].max;
 | 
						|
            res.count = NumberInt(res.count + values[i].count);
 | 
						|
            res.sum += values[i].sum;
 | 
						|
            if ( values[i].duration_start < res.duration_start )
 | 
						|
               res.duration_start = values[i].duration_start;
 | 
						|
            if ( values[i].duration_end > res.duration_end )
 | 
						|
               res.duration_end = values[i].duration_end;
 | 
						|
        }
 | 
						|
        return res;
 | 
						|
    }
 | 
						|
    """)
 | 
						|
 | 
						|
    FINALIZE_STATS = bson.code.Code("""
 | 
						|
    function (key, value) {
 | 
						|
        value.avg = value.sum / value.count;
 | 
						|
        value.duration = (value.duration_end - value.duration_start) / 1000;
 | 
						|
        value.period = NumberInt((value.period_end - value.period_start)
 | 
						|
                                  / 1000);
 | 
						|
        return value;
 | 
						|
    }""")
 | 
						|
 | 
						|
    SORT_OPERATION_MAPPING = {'desc': (pymongo.DESCENDING, '$lt'),
 | 
						|
                              'asc': (pymongo.ASCENDING, '$gt')}
 | 
						|
 | 
						|
    def __init__(self, conf):
 | 
						|
        url = conf.database.connection
 | 
						|
 | 
						|
        # NOTE(jd) Use our own connection pooling on top of the Pymongo one.
 | 
						|
        # We need that otherwise we overflow the MongoDB instance with new
 | 
						|
        # connection since we instanciate a Pymongo client each time someone
 | 
						|
        # requires a new storage connection.
 | 
						|
        self.conn = self.CONNECTION_POOL.connect(url)
 | 
						|
 | 
						|
        # Require MongoDB 2.2 to use aggregate() and TTL
 | 
						|
        if self.conn.server_info()['versionArray'] < [2, 2]:
 | 
						|
            raise storage.StorageBadVersion("Need at least MongoDB 2.2")
 | 
						|
 | 
						|
        connection_options = pymongo.uri_parser.parse_uri(url)
 | 
						|
        self.db = getattr(self.conn, connection_options['database'])
 | 
						|
        if connection_options.get('username'):
 | 
						|
            self.db.authenticate(connection_options['username'],
 | 
						|
                                 connection_options['password'])
 | 
						|
 | 
						|
        # NOTE(jd) Upgrading is just about creating index, so let's do this
 | 
						|
        # on connection to be sure at least the TTL is correcly updated if
 | 
						|
        # needed.
 | 
						|
        self.upgrade()
 | 
						|
 | 
						|
    def upgrade(self):
 | 
						|
        # Establish indexes
 | 
						|
        #
 | 
						|
        # We need variations for user_id vs. project_id because of the
 | 
						|
        # way the indexes are stored in b-trees. The user_id and
 | 
						|
        # project_id values are usually mutually exclusive in the
 | 
						|
        # queries, so the database won't take advantage of an index
 | 
						|
        # including both.
 | 
						|
        for primary in ['user_id', 'project_id']:
 | 
						|
            self.db.resource.ensure_index([
 | 
						|
                (primary, pymongo.ASCENDING),
 | 
						|
                ('source', pymongo.ASCENDING),
 | 
						|
            ], name='resource_idx')
 | 
						|
            self.db.meter.ensure_index([
 | 
						|
                ('resource_id', pymongo.ASCENDING),
 | 
						|
                (primary, pymongo.ASCENDING),
 | 
						|
                ('counter_name', pymongo.ASCENDING),
 | 
						|
                ('timestamp', pymongo.ASCENDING),
 | 
						|
                ('source', pymongo.ASCENDING),
 | 
						|
            ], name='meter_idx')
 | 
						|
        self.db.meter.ensure_index([('timestamp', pymongo.DESCENDING)],
 | 
						|
                                   name='timestamp_idx')
 | 
						|
 | 
						|
        indexes = self.db.meter.index_information()
 | 
						|
 | 
						|
        ttl = cfg.CONF.database.time_to_live
 | 
						|
 | 
						|
        if ttl <= 0:
 | 
						|
            if 'meter_ttl' in indexes:
 | 
						|
                self.db.meter.drop_index('meter_ttl')
 | 
						|
            return
 | 
						|
 | 
						|
        if 'meter_ttl' in indexes:
 | 
						|
            # NOTE(sileht): manually check expireAfterSeconds because
 | 
						|
            # ensure_index doesn't update index options if the index already
 | 
						|
            # exists
 | 
						|
            if ttl == indexes['meter_ttl'].get('expireAfterSeconds', -1):
 | 
						|
                return
 | 
						|
 | 
						|
            self.db.meter.drop_index('meter_ttl')
 | 
						|
 | 
						|
        self.db.meter.create_index(
 | 
						|
            [('timestamp', pymongo.ASCENDING)],
 | 
						|
            expireAfterSeconds=ttl,
 | 
						|
            name='meter_ttl'
 | 
						|
        )
 | 
						|
 | 
						|
    def clear(self):
 | 
						|
        self.conn.drop_database(self.db)
 | 
						|
        # Connection will be reopened automatically if needed
 | 
						|
        self.conn.close()
 | 
						|
 | 
						|
    def record_metering_data(self, data):
 | 
						|
        """Write the data to the backend storage system.
 | 
						|
 | 
						|
        :param data: a dictionary such as returned by
 | 
						|
                     ceilometer.meter.meter_message_from_counter
 | 
						|
        """
 | 
						|
        # Make sure we know about the user and project
 | 
						|
        self.db.user.update(
 | 
						|
            {'_id': data['user_id']},
 | 
						|
            {'$addToSet': {'source': data['source'],
 | 
						|
                           },
 | 
						|
             },
 | 
						|
            upsert=True,
 | 
						|
        )
 | 
						|
        self.db.project.update(
 | 
						|
            {'_id': data['project_id']},
 | 
						|
            {'$addToSet': {'source': data['source'],
 | 
						|
                           },
 | 
						|
             },
 | 
						|
            upsert=True,
 | 
						|
        )
 | 
						|
 | 
						|
        # Record the updated resource metadata
 | 
						|
        self.db.resource.update(
 | 
						|
            {'_id': data['resource_id']},
 | 
						|
            {'$set': {'project_id': data['project_id'],
 | 
						|
                      'user_id': data['user_id'],
 | 
						|
                      'metadata': data['resource_metadata'],
 | 
						|
                      'source': data['source'],
 | 
						|
                      },
 | 
						|
             '$addToSet': {'meter': {'counter_name': data['counter_name'],
 | 
						|
                                     'counter_type': data['counter_type'],
 | 
						|
                                     'counter_unit': data['counter_unit'],
 | 
						|
                                     },
 | 
						|
                           },
 | 
						|
             },
 | 
						|
            upsert=True,
 | 
						|
        )
 | 
						|
 | 
						|
        # Record the raw data for the meter. Use a copy so we do not
 | 
						|
        # modify a data structure owned by our caller (the driver adds
 | 
						|
        # a new key '_id').
 | 
						|
        record = copy.copy(data)
 | 
						|
        self.db.meter.insert(record)
 | 
						|
 | 
						|
    def clear_expired_metering_data(self, ttl):
 | 
						|
        """Clear expired data from the backend storage system according to the
 | 
						|
        time-to-live.
 | 
						|
 | 
						|
        :param ttl: Number of seconds to keep records for.
 | 
						|
 | 
						|
        """
 | 
						|
        results = self.db.meter.group(
 | 
						|
            key={},
 | 
						|
            condition={},
 | 
						|
            reduce=self.REDUCE_GROUP_CLEAN,
 | 
						|
            initial={
 | 
						|
                'resources': [],
 | 
						|
                'users': [],
 | 
						|
                'projects': [],
 | 
						|
            }
 | 
						|
        )[0]
 | 
						|
 | 
						|
        self.db.user.remove({'_id': {'$nin': results['users']}})
 | 
						|
        self.db.project.remove({'_id': {'$nin': results['projects']}})
 | 
						|
        self.db.resource.remove({'_id': {'$nin': results['resources']}})
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def _get_marker(db_collection, marker_pairs):
 | 
						|
        """Return the mark document according to the attribute-value pairs.
 | 
						|
 | 
						|
        :param db_collection: Database collection that be query.
 | 
						|
        :param maker_pairs: Attribute-value pairs filter.
 | 
						|
        """
 | 
						|
        if db_collection is None:
 | 
						|
            return
 | 
						|
        if not marker_pairs:
 | 
						|
            return
 | 
						|
        ret = db_collection.find(marker_pairs, limit=2)
 | 
						|
 | 
						|
        if ret.count() == 0:
 | 
						|
            raise base.NoResultFound
 | 
						|
        elif ret.count() > 1:
 | 
						|
            raise base.MultipleResultsFound
 | 
						|
        else:
 | 
						|
            _ret = ret.__getitem__(0)
 | 
						|
            return _ret
 | 
						|
 | 
						|
    @classmethod
 | 
						|
    def _recurse_sort_keys(cls, sort_keys, marker, flag):
 | 
						|
        _first = sort_keys[0]
 | 
						|
        value = marker[_first]
 | 
						|
        if len(sort_keys) == 1:
 | 
						|
            return {_first: {flag: value}}
 | 
						|
        else:
 | 
						|
            criteria_equ = {_first: {'eq': value}}
 | 
						|
            criteria_cmp = cls._recurse_sort_keys(sort_keys[1:], marker, flag)
 | 
						|
        return dict(criteria_equ, ** criteria_cmp)
 | 
						|
 | 
						|
    @classmethod
 | 
						|
    def _build_paginate_query(cls, marker, sort_keys=[], sort_dir='desc'):
 | 
						|
        """Returns a query with sorting / pagination.
 | 
						|
 | 
						|
        Pagination works by requiring sort_key and sort_dir.
 | 
						|
        We use the last item in previous page as the 'marker' for pagination.
 | 
						|
        So we return values that follow the passed marker in the order.
 | 
						|
        :param q: The query dict passed in.
 | 
						|
        :param marker: the last item of the previous page; we return the next
 | 
						|
                       results after this item.
 | 
						|
        :param sort_keys: array of attributes by which results be sorted.
 | 
						|
        :param sort_dir: direction in which results be sorted (asc, desc).
 | 
						|
        :return: sort parameters, query to use
 | 
						|
        """
 | 
						|
        all_sort = []
 | 
						|
        all_sort, _op = cls._build_sort_instructions(sort_keys, sort_dir)
 | 
						|
 | 
						|
        if marker is not None:
 | 
						|
            sort_criteria_list = []
 | 
						|
 | 
						|
            for i in range(0, len(sort_keys)):
 | 
						|
                sort_criteria_list.append(cls._recurse_sort_keys(
 | 
						|
                                          sort_keys[:(len(sort_keys) - i)],
 | 
						|
                                          marker, _op))
 | 
						|
 | 
						|
            metaquery = {"$or": sort_criteria_list}
 | 
						|
        else:
 | 
						|
            metaquery = {}
 | 
						|
 | 
						|
        return all_sort, metaquery
 | 
						|
 | 
						|
    @classmethod
 | 
						|
    def _build_sort_instructions(cls, sort_keys=[], sort_dir='desc'):
 | 
						|
        """Returns a sort_instruction and paging operator.
 | 
						|
 | 
						|
        Sort instructions are used in the query to determine what attributes
 | 
						|
        to sort on and what direction to use.
 | 
						|
        :param q: The query dict passed in.
 | 
						|
        :param sort_keys: array of attributes by which results be sorted.
 | 
						|
        :param sort_dir: direction in which results be sorted (asc, desc).
 | 
						|
        :return: sort instructions and paging operator
 | 
						|
        """
 | 
						|
        sort_instructions = []
 | 
						|
        _sort_dir, operation = cls.SORT_OPERATION_MAPPING.get(
 | 
						|
            sort_dir, cls.SORT_OPERATION_MAPPING['desc'])
 | 
						|
 | 
						|
        for _sort_key in sort_keys:
 | 
						|
            _instruction = (_sort_key, _sort_dir)
 | 
						|
            sort_instructions.append(_instruction)
 | 
						|
 | 
						|
        return sort_instructions, operation
 | 
						|
 | 
						|
    @classmethod
 | 
						|
    def paginate_query(cls, q, db_collection, limit=None, marker=None,
 | 
						|
                       sort_keys=[], sort_dir='desc'):
 | 
						|
        """Returns a query result with sorting / pagination.
 | 
						|
 | 
						|
        Pagination works by requiring sort_key and sort_dir.
 | 
						|
        We use the last item in previous page as the 'marker' for pagination.
 | 
						|
        So we return values that follow the passed marker in the order.
 | 
						|
        :param q: the query dict passed in.
 | 
						|
        :param db_collection: Database collection that be query.
 | 
						|
        :param limit: maximum number of items to return.
 | 
						|
        :param marker: the last item of the previous page; we return the next
 | 
						|
                       results after this item.
 | 
						|
        :param sort_keys: array of attributes by which results be sorted.
 | 
						|
        :param sort_dir: direction in which results be sorted (asc, desc).
 | 
						|
        return: The query with sorting/pagination added.
 | 
						|
        """
 | 
						|
 | 
						|
        all_sort, query = cls._build_paginate_query(marker,
 | 
						|
                                                    sort_keys,
 | 
						|
                                                    sort_dir)
 | 
						|
        q.update(query)
 | 
						|
 | 
						|
        #NOTE(Fengqian):MongoDB collection.find can not handle limit
 | 
						|
        #when it equals None, it will raise TypeError, so we treate
 | 
						|
        #None as 0 for the value of limit.
 | 
						|
        if limit is None:
 | 
						|
            limit = 0
 | 
						|
        return db_collection.find(q, limit=limit, sort=all_sort)
 | 
						|
 | 
						|
    def get_users(self, source=None):
 | 
						|
        """Return an iterable of user id strings.
 | 
						|
 | 
						|
        :param source: Optional source filter.
 | 
						|
        """
 | 
						|
        q = {}
 | 
						|
        if source is not None:
 | 
						|
            q['source'] = source
 | 
						|
 | 
						|
        return (doc['_id'] for doc in
 | 
						|
                self.db.user.find(q, fields=['_id'],
 | 
						|
                                  sort=[('_id', pymongo.ASCENDING)]))
 | 
						|
 | 
						|
    def get_projects(self, source=None):
 | 
						|
        """Return an iterable of project id strings.
 | 
						|
 | 
						|
        :param source: Optional source filter.
 | 
						|
        """
 | 
						|
        q = {}
 | 
						|
        if source is not None:
 | 
						|
            q['source'] = source
 | 
						|
 | 
						|
        return (doc['_id'] for doc in
 | 
						|
                self.db.project.find(q, fields=['_id'],
 | 
						|
                                     sort=[('_id', pymongo.ASCENDING)]))
 | 
						|
 | 
						|
    def get_resources(self, user=None, project=None, source=None,
 | 
						|
                      start_timestamp=None, start_timestamp_op=None,
 | 
						|
                      end_timestamp=None, end_timestamp_op=None,
 | 
						|
                      metaquery={}, resource=None, pagination=None):
 | 
						|
        """Return an iterable of models.Resource instances
 | 
						|
 | 
						|
        :param user: Optional ID for user that owns the resource.
 | 
						|
        :param project: Optional ID for project that owns the resource.
 | 
						|
        :param source: Optional source filter.
 | 
						|
        :param start_timestamp: Optional modified timestamp start range.
 | 
						|
        :param start_timestamp_op: Optional start time operator, like gt, ge.
 | 
						|
        :param end_timestamp: Optional modified timestamp end range.
 | 
						|
        :param end_timestamp_op: Optional end time operator, like lt, le.
 | 
						|
        :param metaquery: Optional dict with metadata to match on.
 | 
						|
        :param resource: Optional resource filter.
 | 
						|
        :param pagination: Optional pagination query.
 | 
						|
        """
 | 
						|
        if pagination:
 | 
						|
            raise NotImplementedError(_('Pagination not implemented'))
 | 
						|
 | 
						|
        q = {}
 | 
						|
        if user is not None:
 | 
						|
            q['user_id'] = user
 | 
						|
        if project is not None:
 | 
						|
            q['project_id'] = project
 | 
						|
        if source is not None:
 | 
						|
            q['source'] = source
 | 
						|
        if resource is not None:
 | 
						|
            q['resource_id'] = resource
 | 
						|
        # Add resource_ prefix so it matches the field in the db
 | 
						|
        q.update(dict(('resource_' + k, v)
 | 
						|
                      for (k, v) in metaquery.iteritems()))
 | 
						|
 | 
						|
        # FIXME(dhellmann): This may not perform very well,
 | 
						|
        # but doing any better will require changing the database
 | 
						|
        # schema and that will need more thought than I have time
 | 
						|
        # to put into it today.
 | 
						|
        if start_timestamp or end_timestamp:
 | 
						|
            # Look for resources matching the above criteria and with
 | 
						|
            # samples in the time range we care about, then change the
 | 
						|
            # resource query to return just those resources by id.
 | 
						|
            ts_range = make_timestamp_range(start_timestamp, end_timestamp,
 | 
						|
                                            start_timestamp_op,
 | 
						|
                                            end_timestamp_op)
 | 
						|
            if ts_range:
 | 
						|
                q['timestamp'] = ts_range
 | 
						|
 | 
						|
        sort_keys = base._handle_sort_key('resource')
 | 
						|
        sort_instructions = self._build_sort_instructions(sort_keys)[0]
 | 
						|
 | 
						|
        aggregate = self.db.meter.aggregate([
 | 
						|
            {"$match": q},
 | 
						|
            {"$sort": dict(sort_instructions)},
 | 
						|
            {"$group": {
 | 
						|
                "_id": "$resource_id",
 | 
						|
                "user_id": {"$first": "$user_id"},
 | 
						|
                "project_id": {"$first": "$project_id"},
 | 
						|
                "source": {"$first": "$source"},
 | 
						|
                "first_sample_timestamp": {"$min": "$timestamp"},
 | 
						|
                "last_sample_timestamp": {"$max": "$timestamp"},
 | 
						|
                "metadata": {"$first": "$resource_metadata"},
 | 
						|
                "meters_name": {"$push": "$counter_name"},
 | 
						|
                "meters_type": {"$push": "$counter_type"},
 | 
						|
                "meters_unit": {"$push": "$counter_unit"},
 | 
						|
            }},
 | 
						|
        ])
 | 
						|
 | 
						|
        for result in aggregate['result']:
 | 
						|
            yield models.Resource(
 | 
						|
                resource_id=result['_id'],
 | 
						|
                user_id=result['user_id'],
 | 
						|
                project_id=result['project_id'],
 | 
						|
                first_sample_timestamp=result['first_sample_timestamp'],
 | 
						|
                last_sample_timestamp=result['last_sample_timestamp'],
 | 
						|
                source=result['source'],
 | 
						|
                metadata=result['metadata'],
 | 
						|
                meter=[
 | 
						|
                    models.ResourceMeter(
 | 
						|
                        counter_name=m_n,
 | 
						|
                        counter_type=m_t,
 | 
						|
                        counter_unit=m_u,
 | 
						|
                    )
 | 
						|
                    for m_n, m_u, m_t in zip(result['meters_name'],
 | 
						|
                                             result['meters_unit'],
 | 
						|
                                             result['meters_type'])
 | 
						|
                ],
 | 
						|
            )
 | 
						|
 | 
						|
    def get_meters(self, user=None, project=None, resource=None, source=None,
 | 
						|
                   metaquery={}, pagination=None):
 | 
						|
        """Return an iterable of models.Meter instances
 | 
						|
 | 
						|
        :param user: Optional ID for user that owns the resource.
 | 
						|
        :param project: Optional ID for project that owns the resource.
 | 
						|
        :param resource: Optional resource filter.
 | 
						|
        :param source: Optional source filter.
 | 
						|
        :param metaquery: Optional dict with metadata to match on.
 | 
						|
        :param pagination: Optional pagination query.
 | 
						|
        """
 | 
						|
        if pagination:
 | 
						|
            raise NotImplementedError(_('Pagination not implemented'))
 | 
						|
 | 
						|
        q = {}
 | 
						|
        if user is not None:
 | 
						|
            q['user_id'] = user
 | 
						|
        if project is not None:
 | 
						|
            q['project_id'] = project
 | 
						|
        if resource is not None:
 | 
						|
            q['_id'] = resource
 | 
						|
        if source is not None:
 | 
						|
            q['source'] = source
 | 
						|
        q.update(metaquery)
 | 
						|
 | 
						|
        for r in self.db.resource.find(q):
 | 
						|
            for r_meter in r['meter']:
 | 
						|
                yield models.Meter(
 | 
						|
                    name=r_meter['counter_name'],
 | 
						|
                    type=r_meter['counter_type'],
 | 
						|
                    # Return empty string if 'counter_unit' is not valid for
 | 
						|
                    # backward compatibility.
 | 
						|
                    unit=r_meter.get('counter_unit', ''),
 | 
						|
                    resource_id=r['_id'],
 | 
						|
                    project_id=r['project_id'],
 | 
						|
                    source=r['source'],
 | 
						|
                    user_id=r['user_id'],
 | 
						|
                )
 | 
						|
 | 
						|
    def get_samples(self, sample_filter, limit=None):
 | 
						|
        """Return an iterable of model.Sample instances.
 | 
						|
 | 
						|
        :param sample_filter: Filter.
 | 
						|
        :param limit: Maximum number of results to return.
 | 
						|
        """
 | 
						|
        if limit == 0:
 | 
						|
            return
 | 
						|
        q = make_query_from_filter(sample_filter, require_meter=False)
 | 
						|
        if limit:
 | 
						|
            samples = self.db.meter.find(
 | 
						|
                q, limit=limit, sort=[("timestamp", pymongo.DESCENDING)])
 | 
						|
        else:
 | 
						|
            samples = self.db.meter.find(
 | 
						|
                q, sort=[("timestamp", pymongo.DESCENDING)])
 | 
						|
 | 
						|
        for s in samples:
 | 
						|
            # Remove the ObjectId generated by the database when
 | 
						|
            # the sample was inserted. It is an implementation
 | 
						|
            # detail that should not leak outside of the driver.
 | 
						|
            del s['_id']
 | 
						|
            # Backward compatibility for samples without units
 | 
						|
            s['counter_unit'] = s.get('counter_unit', '')
 | 
						|
            yield models.Sample(**s)
 | 
						|
 | 
						|
    def get_meter_statistics(self, sample_filter, period=None, groupby=None):
 | 
						|
        """Return an iterable of models.Statistics instance containing meter
 | 
						|
        statistics described by the query parameters.
 | 
						|
 | 
						|
        The filter must have a meter value set.
 | 
						|
 | 
						|
        """
 | 
						|
        if (groupby and
 | 
						|
                set(groupby) - set(['user_id', 'project_id',
 | 
						|
                                    'resource_id', 'source'])):
 | 
						|
            raise NotImplementedError("Unable to group by these fields")
 | 
						|
 | 
						|
        q = make_query_from_filter(sample_filter)
 | 
						|
 | 
						|
        if period:
 | 
						|
            if sample_filter.start:
 | 
						|
                period_start = sample_filter.start
 | 
						|
            else:
 | 
						|
                period_start = self.db.meter.find(
 | 
						|
                    limit=1, sort=[('timestamp',
 | 
						|
                                    pymongo.ASCENDING)])[0]['timestamp']
 | 
						|
            period_start = int(calendar.timegm(period_start.utctimetuple()))
 | 
						|
            params_period = {'period': period,
 | 
						|
                             'period_first': period_start,
 | 
						|
                             'groupby_fields': json.dumps(groupby)}
 | 
						|
            if groupby:
 | 
						|
                map_stats = self.MAP_STATS_PERIOD_GROUPBY % params_period
 | 
						|
            else:
 | 
						|
                map_stats = self.MAP_STATS_PERIOD % params_period
 | 
						|
        else:
 | 
						|
            if groupby:
 | 
						|
                params_groupby = {'groupby_fields': json.dumps(groupby)}
 | 
						|
                map_stats = self.MAP_STATS_GROUPBY % params_groupby
 | 
						|
            else:
 | 
						|
                map_stats = self.MAP_STATS
 | 
						|
 | 
						|
        results = self.db.meter.map_reduce(
 | 
						|
            map_stats,
 | 
						|
            self.REDUCE_STATS,
 | 
						|
            {'inline': 1},
 | 
						|
            finalize=self.FINALIZE_STATS,
 | 
						|
            query=q,
 | 
						|
        )
 | 
						|
 | 
						|
        # FIXME(terriyu) Fix get_meter_statistics() so we don't use sorted()
 | 
						|
        # to return the results
 | 
						|
        return sorted(
 | 
						|
            (models.Statistics(**(r['value'])) for r in results['results']),
 | 
						|
            key=operator.attrgetter('period_start'))
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def _decode_matching_metadata(matching_metadata):
 | 
						|
        if isinstance(matching_metadata, dict):
 | 
						|
            #note(sileht): keep compatibility with alarm
 | 
						|
            #with matching_metadata as a dict
 | 
						|
            return matching_metadata
 | 
						|
        else:
 | 
						|
            new_matching_metadata = {}
 | 
						|
            for elem in matching_metadata:
 | 
						|
                new_matching_metadata[elem['key']] = elem['value']
 | 
						|
            return new_matching_metadata
 | 
						|
 | 
						|
    @classmethod
 | 
						|
    def _ensure_encapsulated_rule_format(cls, alarm):
 | 
						|
        """This ensure the alarm returned by the storage have the correct
 | 
						|
        format. The previous format looks like:
 | 
						|
        {
 | 
						|
            'alarm_id': '0ld-4l3rt',
 | 
						|
            'enabled': True,
 | 
						|
            'name': 'old-alert',
 | 
						|
            'description': 'old-alert',
 | 
						|
            'timestamp': None,
 | 
						|
            'meter_name': 'cpu',
 | 
						|
            'user_id': 'me',
 | 
						|
            'project_id': 'and-da-boys',
 | 
						|
            'comparison_operator': 'lt',
 | 
						|
            'threshold': 36,
 | 
						|
            'statistic': 'count',
 | 
						|
            'evaluation_periods': 1,
 | 
						|
            'period': 60,
 | 
						|
            'state': "insufficient data",
 | 
						|
            'state_timestamp': None,
 | 
						|
            'ok_actions': [],
 | 
						|
            'alarm_actions': ['http://nowhere/alarms'],
 | 
						|
            'insufficient_data_actions': [],
 | 
						|
            'repeat_actions': False,
 | 
						|
            'matching_metadata': {'key': 'value'}
 | 
						|
            # or 'matching_metadata': [{'key': 'key', 'value': 'value'}]
 | 
						|
        }
 | 
						|
        """
 | 
						|
 | 
						|
        if isinstance(alarm.get('rule'), dict):
 | 
						|
            return
 | 
						|
 | 
						|
        alarm['type'] = 'threshold'
 | 
						|
        alarm['rule'] = {}
 | 
						|
        alarm['matching_metadata'] = cls._decode_matching_metadata(
 | 
						|
            alarm['matching_metadata'])
 | 
						|
        for field in ['period', 'evaluation_periods', 'threshold',
 | 
						|
                      'statistic', 'comparison_operator', 'meter_name']:
 | 
						|
            if field in alarm:
 | 
						|
                alarm['rule'][field] = alarm[field]
 | 
						|
                del alarm[field]
 | 
						|
 | 
						|
        query = []
 | 
						|
        for key in alarm['matching_metadata']:
 | 
						|
            query.append({'field': key,
 | 
						|
                          'op': 'eq',
 | 
						|
                          'value': alarm['matching_metadata'][key],
 | 
						|
                          'type': 'string'})
 | 
						|
        del alarm['matching_metadata']
 | 
						|
        alarm['rule']['query'] = query
 | 
						|
 | 
						|
    def get_alarms(self, name=None, user=None,
 | 
						|
                   project=None, enabled=None, alarm_id=None, pagination=None):
 | 
						|
        """Yields a lists of alarms that match filters
 | 
						|
        :param name: The Alarm name.
 | 
						|
        :param user: Optional ID for user that owns the resource.
 | 
						|
        :param project: Optional ID for project that owns the resource.
 | 
						|
        :param enabled: Optional boolean to list disable alarm.
 | 
						|
        :param alarm_id: Optional alarm_id to return one alarm.
 | 
						|
        :param pagination: Optional pagination query.
 | 
						|
        """
 | 
						|
        if pagination:
 | 
						|
            raise NotImplementedError(_('Pagination not implemented'))
 | 
						|
 | 
						|
        q = {}
 | 
						|
        if user is not None:
 | 
						|
            q['user_id'] = user
 | 
						|
        if project is not None:
 | 
						|
            q['project_id'] = project
 | 
						|
        if name is not None:
 | 
						|
            q['name'] = name
 | 
						|
        if enabled is not None:
 | 
						|
            q['enabled'] = enabled
 | 
						|
        if alarm_id is not None:
 | 
						|
            q['alarm_id'] = alarm_id
 | 
						|
 | 
						|
        for alarm in self.db.alarm.find(q):
 | 
						|
            a = {}
 | 
						|
            a.update(alarm)
 | 
						|
            del a['_id']
 | 
						|
            self._ensure_encapsulated_rule_format(a)
 | 
						|
            yield models.Alarm(**a)
 | 
						|
 | 
						|
    def update_alarm(self, alarm):
 | 
						|
        """update alarm
 | 
						|
        """
 | 
						|
        data = alarm.as_dict()
 | 
						|
 | 
						|
        self.db.alarm.update(
 | 
						|
            {'alarm_id': alarm.alarm_id},
 | 
						|
            {'$set': data},
 | 
						|
            upsert=True)
 | 
						|
 | 
						|
        stored_alarm = self.db.alarm.find({'alarm_id': alarm.alarm_id})[0]
 | 
						|
        del stored_alarm['_id']
 | 
						|
        self._ensure_encapsulated_rule_format(stored_alarm)
 | 
						|
        return models.Alarm(**stored_alarm)
 | 
						|
 | 
						|
    create_alarm = update_alarm
 | 
						|
 | 
						|
    def delete_alarm(self, alarm_id):
 | 
						|
        """Delete a alarm
 | 
						|
        """
 | 
						|
        self.db.alarm.remove({'alarm_id': alarm_id})
 | 
						|
 | 
						|
    def get_alarm_changes(self, alarm_id, on_behalf_of,
 | 
						|
                          user=None, project=None, type=None,
 | 
						|
                          start_timestamp=None, start_timestamp_op=None,
 | 
						|
                          end_timestamp=None, end_timestamp_op=None):
 | 
						|
        """Yields list of AlarmChanges describing alarm history
 | 
						|
 | 
						|
        Changes are always sorted in reverse order of occurence, given
 | 
						|
        the importance of currency.
 | 
						|
 | 
						|
        Segregation for non-administrative users is done on the basis
 | 
						|
        of the on_behalf_of parameter. This allows such users to have
 | 
						|
        visibility on both the changes initiated by themselves directly
 | 
						|
        (generally creation, rule changes, or deletion) and also on those
 | 
						|
        changes initiated on their behalf by the alarming service (state
 | 
						|
        transitions after alarm thresholds are crossed).
 | 
						|
 | 
						|
        :param alarm_id: ID of alarm to return changes for
 | 
						|
        :param on_behalf_of: ID of tenant to scope changes query (None for
 | 
						|
                             administrative user, indicating all projects)
 | 
						|
        :param user: Optional ID of user to return changes for
 | 
						|
        :param project: Optional ID of project to return changes for
 | 
						|
        :project type: Optional change type
 | 
						|
        :param start_timestamp: Optional modified timestamp start range
 | 
						|
        :param start_timestamp_op: Optional timestamp start range operation
 | 
						|
        :param end_timestamp: Optional modified timestamp end range
 | 
						|
        :param end_timestamp_op: Optional timestamp end range operation
 | 
						|
        """
 | 
						|
        q = dict(alarm_id=alarm_id)
 | 
						|
        if on_behalf_of is not None:
 | 
						|
            q['on_behalf_of'] = on_behalf_of
 | 
						|
        if user is not None:
 | 
						|
            q['user_id'] = user
 | 
						|
        if project is not None:
 | 
						|
            q['project_id'] = project
 | 
						|
        if type is not None:
 | 
						|
            q['type'] = type
 | 
						|
        if start_timestamp or end_timestamp:
 | 
						|
            ts_range = make_timestamp_range(start_timestamp, end_timestamp,
 | 
						|
                                            start_timestamp_op,
 | 
						|
                                            end_timestamp_op)
 | 
						|
            if ts_range:
 | 
						|
                q['timestamp'] = ts_range
 | 
						|
 | 
						|
        sort = [("timestamp", pymongo.DESCENDING)]
 | 
						|
        for alarm_change in self.db.alarm_history.find(q, sort=sort):
 | 
						|
            ac = {}
 | 
						|
            ac.update(alarm_change)
 | 
						|
            del ac['_id']
 | 
						|
            yield models.AlarmChange(**ac)
 | 
						|
 | 
						|
    def record_alarm_change(self, alarm_change):
 | 
						|
        """Record alarm change event.
 | 
						|
        """
 | 
						|
        self.db.alarm_history.insert(alarm_change)
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def record_events(events):
 | 
						|
        """Write the events.
 | 
						|
 | 
						|
        :param events: a list of model.Event objects.
 | 
						|
        """
 | 
						|
        raise NotImplementedError('Events not implemented.')
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def get_events(event_filter):
 | 
						|
        """Return an iterable of model.Event objects.
 | 
						|
 | 
						|
        :param event_filter: EventFilter instance
 | 
						|
        """
 | 
						|
        raise NotImplementedError('Events not implemented.')
 |