Sort the statistics by the period start date before returning them, and restore the test that verifies this behavior. bug 1151345 Change-Id: I4f05ea049b7609cea9dab947738b7aabf3f062ef Signed-off-by: Doug Hellmann <doug.hellmann@dreamhost.com>
		
			
				
	
	
		
			596 lines
		
	
	
		
			21 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			596 lines
		
	
	
		
			21 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# -*- encoding: utf-8 -*-
 | 
						|
#
 | 
						|
# Copyright © 2012 New Dream Network, LLC (DreamHost)
 | 
						|
#
 | 
						|
# Author: Doug Hellmann <doug.hellmann@dreamhost.com>
 | 
						|
#
 | 
						|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
 | 
						|
# not use this file except in compliance with the License. You may obtain
 | 
						|
# a copy of the License at
 | 
						|
#
 | 
						|
#      http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
#
 | 
						|
# Unless required by applicable law or agreed to in writing, software
 | 
						|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 | 
						|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 | 
						|
# License for the specific language governing permissions and limitations
 | 
						|
# under the License.
 | 
						|
"""MongoDB storage backend
 | 
						|
"""
 | 
						|
 | 
						|
import copy
 | 
						|
import datetime
 | 
						|
import operator
 | 
						|
import re
 | 
						|
import urlparse
 | 
						|
 | 
						|
import bson.code
 | 
						|
import pymongo
 | 
						|
 | 
						|
from ceilometer.openstack.common import log
 | 
						|
from ceilometer.storage import base
 | 
						|
 | 
						|
 | 
						|
LOG = log.getLogger(__name__)
 | 
						|
 | 
						|
 | 
						|
class MongoDBStorage(base.StorageEngine):
 | 
						|
    """Put the data into a MongoDB database
 | 
						|
 | 
						|
    Collections::
 | 
						|
 | 
						|
        - user
 | 
						|
          - { _id: user id
 | 
						|
              source: [ array of source ids reporting for the user ]
 | 
						|
              }
 | 
						|
        - project
 | 
						|
          - { _id: project id
 | 
						|
              source: [ array of source ids reporting for the project ]
 | 
						|
              }
 | 
						|
        - meter
 | 
						|
          - the raw incoming data
 | 
						|
        - resource
 | 
						|
          - the metadata for resources
 | 
						|
          - { _id: uuid of resource,
 | 
						|
              metadata: metadata dictionaries
 | 
						|
              timestamp: datetime of last update
 | 
						|
              user_id: uuid
 | 
						|
              project_id: uuid
 | 
						|
              meter: [ array of {counter_name: string, counter_type: string,
 | 
						|
                                 counter_unit: string} ]
 | 
						|
            }
 | 
						|
    """
 | 
						|
 | 
						|
    OPTIONS = []
 | 
						|
 | 
						|
    def register_opts(self, conf):
 | 
						|
        """Register any configuration options used by this engine.
 | 
						|
        """
 | 
						|
        conf.register_opts(self.OPTIONS)
 | 
						|
 | 
						|
    def get_connection(self, conf):
 | 
						|
        """Return a Connection instance based on the configuration settings.
 | 
						|
        """
 | 
						|
        return Connection(conf)
 | 
						|
 | 
						|
 | 
						|
def make_timestamp_range(start, end):
 | 
						|
    """Given two possible datetimes, create the query
 | 
						|
    document to find timestamps within that range
 | 
						|
    using $gte for the lower bound and $lt for the
 | 
						|
    upper bound.
 | 
						|
    """
 | 
						|
    ts_range = {}
 | 
						|
    if start:
 | 
						|
        ts_range['$gte'] = start
 | 
						|
    if end:
 | 
						|
        ts_range['$lt'] = end
 | 
						|
    return ts_range
 | 
						|
 | 
						|
 | 
						|
def make_query_from_filter(event_filter, require_meter=True):
 | 
						|
    """Return a query dictionary based on the settings in the filter.
 | 
						|
 | 
						|
    :param filter: EventFilter instance
 | 
						|
    :param require_meter: If true and the filter does not have a meter,
 | 
						|
                          raise an error.
 | 
						|
    """
 | 
						|
    q = {}
 | 
						|
 | 
						|
    if event_filter.user:
 | 
						|
        q['user_id'] = event_filter.user
 | 
						|
    if event_filter.project:
 | 
						|
        q['project_id'] = event_filter.project
 | 
						|
 | 
						|
    if event_filter.meter:
 | 
						|
        q['counter_name'] = event_filter.meter
 | 
						|
    elif require_meter:
 | 
						|
        raise RuntimeError('Missing required meter specifier')
 | 
						|
 | 
						|
    ts_range = make_timestamp_range(event_filter.start, event_filter.end)
 | 
						|
    if ts_range:
 | 
						|
        q['timestamp'] = ts_range
 | 
						|
 | 
						|
    if event_filter.resource:
 | 
						|
        q['resource_id'] = event_filter.resource
 | 
						|
    if event_filter.source:
 | 
						|
        q['source'] = event_filter.source
 | 
						|
 | 
						|
    # so the events call metadata resource_metadata, so we convert
 | 
						|
    # to that.
 | 
						|
    q.update(dict(('resource_%s' % k, v)
 | 
						|
                  for (k, v) in event_filter.metaquery.iteritems()))
 | 
						|
    return q
 | 
						|
 | 
						|
 | 
						|
class Connection(base.Connection):
 | 
						|
    """MongoDB connection.
 | 
						|
    """
 | 
						|
 | 
						|
    # JavaScript function for doing map-reduce to get a counter volume
 | 
						|
    # total.
 | 
						|
    MAP_COUNTER_VOLUME = bson.code.Code("""
 | 
						|
        function() {
 | 
						|
            emit(this.resource_id, this.counter_volume);
 | 
						|
        }
 | 
						|
        """)
 | 
						|
 | 
						|
    # JavaScript function for doing map-reduce to get a maximum value
 | 
						|
    # from a range.  (from
 | 
						|
    # http://cookbook.mongodb.org/patterns/finding_max_and_min/)
 | 
						|
    REDUCE_MAX = bson.code.Code("""
 | 
						|
        function (key, values) {
 | 
						|
            return Math.max.apply(Math, values);
 | 
						|
        }
 | 
						|
        """)
 | 
						|
 | 
						|
    # JavaScript function for doing map-reduce to get a sum.
 | 
						|
    REDUCE_SUM = bson.code.Code("""
 | 
						|
        function (key, values) {
 | 
						|
            var total = 0;
 | 
						|
            for (var i = 0; i < values.length; i++) {
 | 
						|
                total += values[i];
 | 
						|
            }
 | 
						|
            return total;
 | 
						|
        }
 | 
						|
        """)
 | 
						|
 | 
						|
    # MAP_TIMESTAMP and REDUCE_MIN_MAX are based on the recipe
 | 
						|
    # http://cookbook.mongodb.org/patterns/finding_max_and_min_values_for_a_key
 | 
						|
    MAP_TIMESTAMP = bson.code.Code("""
 | 
						|
    function () {
 | 
						|
        emit('timestamp', { min : this.timestamp,
 | 
						|
                            max : this.timestamp } )
 | 
						|
    }
 | 
						|
    """)
 | 
						|
 | 
						|
    REDUCE_MIN_MAX = bson.code.Code("""
 | 
						|
    function (key, values) {
 | 
						|
        var res = values[0];
 | 
						|
        for ( var i=1; i<values.length; i++ ) {
 | 
						|
            if ( values[i].min < res.min )
 | 
						|
               res.min = values[i].min;
 | 
						|
            if ( values[i].max > res.max )
 | 
						|
               res.max = values[i].max;
 | 
						|
        }
 | 
						|
        return res;
 | 
						|
    }
 | 
						|
    """)
 | 
						|
 | 
						|
    MAP_STATS = bson.code.Code("""
 | 
						|
    function () {
 | 
						|
        emit('statistics', { min : this.counter_volume,
 | 
						|
                             max : this.counter_volume,
 | 
						|
                             sum : this.counter_volume,
 | 
						|
                             count : NumberInt(1),
 | 
						|
                             duration_start : this.timestamp,
 | 
						|
                             duration_end : this.timestamp,
 | 
						|
                             period_start : this.timestamp,
 | 
						|
                             period_end : this.timestamp} )
 | 
						|
    }
 | 
						|
    """)
 | 
						|
 | 
						|
    MAP_STATS_PERIOD = bson.code.Code("""
 | 
						|
    function () {
 | 
						|
        var period = %d * 1000;
 | 
						|
        var period_first = %d * 1000;
 | 
						|
        var period_start = period_first
 | 
						|
                           + (Math.floor(new Date(this.timestamp.getTime()
 | 
						|
                                         - period_first) / period)
 | 
						|
                              * period);
 | 
						|
        emit(period_start,
 | 
						|
             { min : this.counter_volume,
 | 
						|
               max : this.counter_volume,
 | 
						|
               sum : this.counter_volume,
 | 
						|
               count : NumberInt(1),
 | 
						|
               duration_start : this.timestamp,
 | 
						|
               duration_end : this.timestamp,
 | 
						|
               period_start : new Date(period_start),
 | 
						|
               period_end : new Date(period_start + period) } )
 | 
						|
    }
 | 
						|
    """)
 | 
						|
 | 
						|
    REDUCE_STATS = bson.code.Code("""
 | 
						|
    function (key, values) {
 | 
						|
        var res = values[0];
 | 
						|
        for ( var i=1; i<values.length; i++ ) {
 | 
						|
            if ( values[i].min < res.min )
 | 
						|
               res.min = values[i].min;
 | 
						|
            if ( values[i].max > res.max )
 | 
						|
               res.max = values[i].max;
 | 
						|
            res.count += values[i].count;
 | 
						|
            res.sum += values[i].sum;
 | 
						|
            if ( values[i].duration_start < res.duration_start )
 | 
						|
               res.duration_start = values[i].duration_start;
 | 
						|
            if ( values[i].duration_end > res.duration_end )
 | 
						|
               res.duration_end = values[i].duration_end;
 | 
						|
        }
 | 
						|
        return res;
 | 
						|
    }
 | 
						|
    """)
 | 
						|
 | 
						|
    FINALIZE_STATS = bson.code.Code("""
 | 
						|
    function (key, value) {
 | 
						|
        value.avg = value.sum / value.count;
 | 
						|
        value.duration = (value.duration_end - value.duration_start) / 1000;
 | 
						|
        value.period = NumberInt((value.period_end - value.period_start)
 | 
						|
                                  / 1000);
 | 
						|
        return value;
 | 
						|
    }""")
 | 
						|
 | 
						|
    def __init__(self, conf):
 | 
						|
        opts = self._parse_connection_url(conf.database_connection)
 | 
						|
        LOG.info('connecting to MongoDB on %s:%s', opts['host'], opts['port'])
 | 
						|
        self.conn = self._get_connection(opts)
 | 
						|
        self.db = getattr(self.conn, opts['dbname'])
 | 
						|
        if 'username' in opts:
 | 
						|
            self.db.authenticate(opts['username'], opts['password'])
 | 
						|
 | 
						|
        # Establish indexes
 | 
						|
        #
 | 
						|
        # We need variations for user_id vs. project_id because of the
 | 
						|
        # way the indexes are stored in b-trees. The user_id and
 | 
						|
        # project_id values are usually mutually exclusive in the
 | 
						|
        # queries, so the database won't take advantage of an index
 | 
						|
        # including both.
 | 
						|
        for primary in ['user_id', 'project_id']:
 | 
						|
            self.db.resource.ensure_index([
 | 
						|
                (primary, pymongo.ASCENDING),
 | 
						|
                ('source', pymongo.ASCENDING),
 | 
						|
            ], name='resource_idx')
 | 
						|
            self.db.meter.ensure_index([
 | 
						|
                ('resource_id', pymongo.ASCENDING),
 | 
						|
                (primary, pymongo.ASCENDING),
 | 
						|
                ('counter_name', pymongo.ASCENDING),
 | 
						|
                ('timestamp', pymongo.ASCENDING),
 | 
						|
                ('source', pymongo.ASCENDING),
 | 
						|
            ], name='meter_idx')
 | 
						|
 | 
						|
    def upgrade(self, version=None):
 | 
						|
        pass
 | 
						|
 | 
						|
    def clear(self):
 | 
						|
        self.conn.drop_database(self.db)
 | 
						|
 | 
						|
    def _get_connection(self, opts):
 | 
						|
        """Return a connection to the database.
 | 
						|
 | 
						|
        .. note::
 | 
						|
 | 
						|
          The tests use a subclass to override this and return an
 | 
						|
          in-memory connection.
 | 
						|
        """
 | 
						|
        return pymongo.Connection(opts['host'], opts['port'], safe=True)
 | 
						|
 | 
						|
    def _parse_connection_url(self, url):
 | 
						|
        opts = {}
 | 
						|
        result = urlparse.urlparse(url)
 | 
						|
        opts['dbtype'] = result.scheme
 | 
						|
        opts['dbname'] = result.path.replace('/', '')
 | 
						|
        netloc_match = re.match(r'(?:(\w+:\w+)@)?(.*)', result.netloc)
 | 
						|
        auth = netloc_match.group(1)
 | 
						|
        netloc = netloc_match.group(2)
 | 
						|
        if auth:
 | 
						|
            opts['username'], opts['password'] = auth.split(':')
 | 
						|
        if ':' in netloc:
 | 
						|
            opts['host'], port = netloc.split(':')
 | 
						|
        else:
 | 
						|
            opts['host'] = netloc
 | 
						|
            port = 27017
 | 
						|
        opts['port'] = port and int(port) or 27017
 | 
						|
        return opts
 | 
						|
 | 
						|
    def record_metering_data(self, data):
 | 
						|
        """Write the data to the backend storage system.
 | 
						|
 | 
						|
        :param data: a dictionary such as returned by
 | 
						|
                     ceilometer.meter.meter_message_from_counter
 | 
						|
        """
 | 
						|
        # Make sure we know about the user and project
 | 
						|
        self.db.user.update(
 | 
						|
            {'_id': data['user_id']},
 | 
						|
            {'$addToSet': {'source': data['source'],
 | 
						|
                           },
 | 
						|
             },
 | 
						|
            upsert=True,
 | 
						|
        )
 | 
						|
        self.db.project.update(
 | 
						|
            {'_id': data['project_id']},
 | 
						|
            {'$addToSet': {'source': data['source'],
 | 
						|
                           },
 | 
						|
             },
 | 
						|
            upsert=True,
 | 
						|
        )
 | 
						|
 | 
						|
        # Record the updated resource metadata
 | 
						|
        received_timestamp = datetime.datetime.utcnow()
 | 
						|
        self.db.resource.update(
 | 
						|
            {'_id': data['resource_id']},
 | 
						|
            {'$set': {'project_id': data['project_id'],
 | 
						|
                      'user_id': data['user_id'],
 | 
						|
                      # Current metadata being used and when it was
 | 
						|
                      # last updated.
 | 
						|
                      'timestamp': data['timestamp'],
 | 
						|
                      'received_timestamp': received_timestamp,
 | 
						|
                      'metadata': data['resource_metadata'],
 | 
						|
                      'source': data['source'],
 | 
						|
                      },
 | 
						|
             '$addToSet': {'meter': {'counter_name': data['counter_name'],
 | 
						|
                                     'counter_type': data['counter_type'],
 | 
						|
                                     'counter_unit': data['counter_unit'],
 | 
						|
                                     },
 | 
						|
                           },
 | 
						|
             },
 | 
						|
            upsert=True,
 | 
						|
        )
 | 
						|
 | 
						|
        # Record the raw data for the event. Use a copy so we do not
 | 
						|
        # modify a data structure owned by our caller (the driver adds
 | 
						|
        # a new key '_id').
 | 
						|
        record = copy.copy(data)
 | 
						|
        self.db.meter.insert(record)
 | 
						|
        return
 | 
						|
 | 
						|
    def get_users(self, source=None):
 | 
						|
        """Return an iterable of user id strings.
 | 
						|
 | 
						|
        :param source: Optional source filter.
 | 
						|
        """
 | 
						|
        q = {}
 | 
						|
        if source is not None:
 | 
						|
            q['source'] = source
 | 
						|
        return sorted(self.db.user.find(q).distinct('_id'))
 | 
						|
 | 
						|
    def get_projects(self, source=None):
 | 
						|
        """Return an iterable of project id strings.
 | 
						|
 | 
						|
        :param source: Optional source filter.
 | 
						|
        """
 | 
						|
        q = {}
 | 
						|
        if source is not None:
 | 
						|
            q['source'] = source
 | 
						|
        return sorted(self.db.project.find(q).distinct('_id'))
 | 
						|
 | 
						|
    def get_resources(self, user=None, project=None, source=None,
 | 
						|
                      start_timestamp=None, end_timestamp=None,
 | 
						|
                      metaquery={}, resource=None):
 | 
						|
        """Return an iterable of dictionaries containing resource information.
 | 
						|
 | 
						|
        { 'resource_id': UUID of the resource,
 | 
						|
          'project_id': UUID of project owning the resource,
 | 
						|
          'user_id': UUID of user owning the resource,
 | 
						|
          'timestamp': UTC datetime of last update to the resource,
 | 
						|
          'metadata': most current metadata for the resource,
 | 
						|
          'meter': list of the meters reporting data for the resource,
 | 
						|
          }
 | 
						|
 | 
						|
        :param user: Optional ID for user that owns the resource.
 | 
						|
        :param project: Optional ID for project that owns the resource.
 | 
						|
        :param source: Optional source filter.
 | 
						|
        :param start_timestamp: Optional modified timestamp start range.
 | 
						|
        :param end_timestamp: Optional modified timestamp end range.
 | 
						|
        :param metaquery: Optional dict with metadata to match on.
 | 
						|
        :param resource: Optional resource filter.
 | 
						|
        """
 | 
						|
        q = {}
 | 
						|
        if user is not None:
 | 
						|
            q['user_id'] = user
 | 
						|
        if project is not None:
 | 
						|
            q['project_id'] = project
 | 
						|
        if source is not None:
 | 
						|
            q['source'] = source
 | 
						|
        if resource is not None:
 | 
						|
            q['_id'] = resource
 | 
						|
        q.update(metaquery)
 | 
						|
 | 
						|
        # FIXME(dhellmann): This may not perform very well,
 | 
						|
        # but doing any better will require changing the database
 | 
						|
        # schema and that will need more thought than I have time
 | 
						|
        # to put into it today.
 | 
						|
        if start_timestamp or end_timestamp:
 | 
						|
            # Look for resources matching the above criteria and with
 | 
						|
            # events in the time range we care about, then change the
 | 
						|
            # resource query to return just those resources by id.
 | 
						|
            ts_range = make_timestamp_range(start_timestamp, end_timestamp)
 | 
						|
            if ts_range:
 | 
						|
                q['timestamp'] = ts_range
 | 
						|
            resource_ids = self.db.meter.find(q).distinct('resource_id')
 | 
						|
            # Overwrite the query to just filter on the ids
 | 
						|
            # we have discovered to be interesting.
 | 
						|
            q = {'_id': {'$in': resource_ids}}
 | 
						|
        for resource in self.db.resource.find(q):
 | 
						|
            r = {}
 | 
						|
            r.update(resource)
 | 
						|
            # Replace the '_id' key with 'resource_id' to meet the
 | 
						|
            # caller's expectations.
 | 
						|
            r['resource_id'] = r['_id']
 | 
						|
            del r['_id']
 | 
						|
            yield r
 | 
						|
 | 
						|
    def get_meters(self, user=None, project=None, resource=None, source=None,
 | 
						|
                   metaquery={}):
 | 
						|
        """Return an iterable of dictionaries containing meter information.
 | 
						|
 | 
						|
        { 'name': name of the meter,
 | 
						|
          'type': type of the meter (guage, counter),
 | 
						|
          'unit': unit of the meter,
 | 
						|
          'resource_id': UUID of the resource,
 | 
						|
          'project_id': UUID of project owning the resource,
 | 
						|
          'user_id': UUID of user owning the resource,
 | 
						|
          }
 | 
						|
 | 
						|
        :param user: Optional ID for user that owns the resource.
 | 
						|
        :param project: Optional ID for project that owns the resource.
 | 
						|
        :param resource: Optional resource filter.
 | 
						|
        :param source: Optional source filter.
 | 
						|
        :param metaquery: Optional dict with metadata to match on.
 | 
						|
        """
 | 
						|
        q = {}
 | 
						|
        if user is not None:
 | 
						|
            q['user_id'] = user
 | 
						|
        if project is not None:
 | 
						|
            q['project_id'] = project
 | 
						|
        if resource is not None:
 | 
						|
            q['_id'] = resource
 | 
						|
        if source is not None:
 | 
						|
            q['source'] = source
 | 
						|
        q.update(metaquery)
 | 
						|
 | 
						|
        for r in self.db.resource.find(q):
 | 
						|
            for r_meter in r['meter']:
 | 
						|
                m = {}
 | 
						|
                m['name'] = r_meter['counter_name']
 | 
						|
                m['type'] = r_meter['counter_type']
 | 
						|
                # Return empty string if 'counter_unit' is not valid for
 | 
						|
                # backward compaitiblity.
 | 
						|
                m['unit'] = r_meter.get('counter_unit', '')
 | 
						|
                m['resource_id'] = r['_id']
 | 
						|
                m['project_id'] = r['project_id']
 | 
						|
                m['user_id'] = r['user_id']
 | 
						|
                yield m
 | 
						|
 | 
						|
    def get_raw_events(self, event_filter):
 | 
						|
        """Return an iterable of raw event data as created by
 | 
						|
        :func:`ceilometer.meter.meter_message_from_counter`.
 | 
						|
        """
 | 
						|
        q = make_query_from_filter(event_filter, require_meter=False)
 | 
						|
        events = self.db.meter.find(q)
 | 
						|
        for e in events:
 | 
						|
            # Remove the ObjectId generated by the database when
 | 
						|
            # the event was inserted. It is an implementation
 | 
						|
            # detail that should not leak outside of the driver.
 | 
						|
            del e['_id']
 | 
						|
            yield e
 | 
						|
 | 
						|
    def get_meter_statistics(self, event_filter, period=None):
 | 
						|
        """Return a dictionary containing meter statistics.
 | 
						|
        described by the query parameters.
 | 
						|
 | 
						|
        The filter must have a meter value set.
 | 
						|
 | 
						|
        { 'min':
 | 
						|
          'max':
 | 
						|
          'avg':
 | 
						|
          'sum':
 | 
						|
          'count':
 | 
						|
          'period':
 | 
						|
          'period_start':
 | 
						|
          'period_end':
 | 
						|
          'duration':
 | 
						|
          'duration_start':
 | 
						|
          'duration_end':
 | 
						|
          }
 | 
						|
 | 
						|
        """
 | 
						|
        q = make_query_from_filter(event_filter)
 | 
						|
 | 
						|
        if period:
 | 
						|
            map_stats = self.MAP_STATS_PERIOD % \
 | 
						|
                (period,
 | 
						|
                 int(event_filter.start.strftime('%s'))
 | 
						|
                 if event_filter.start else 0)
 | 
						|
        else:
 | 
						|
            map_stats = self.MAP_STATS
 | 
						|
 | 
						|
        results = self.db.meter.map_reduce(
 | 
						|
            map_stats,
 | 
						|
            self.REDUCE_STATS,
 | 
						|
            {'inline': 1},
 | 
						|
            finalize=self.FINALIZE_STATS,
 | 
						|
            query=q,
 | 
						|
        )
 | 
						|
 | 
						|
        return sorted((r['value'] for r in results['results']),
 | 
						|
                      key=operator.itemgetter('period_start'))
 | 
						|
 | 
						|
    def get_volume_sum(self, event_filter):
 | 
						|
        """Return the sum of the volume field for the events
 | 
						|
        described by the query parameters.
 | 
						|
        """
 | 
						|
        q = make_query_from_filter(event_filter)
 | 
						|
        results = self.db.meter.map_reduce(self.MAP_COUNTER_VOLUME,
 | 
						|
                                           self.REDUCE_SUM,
 | 
						|
                                           {'inline': 1},
 | 
						|
                                           query=q,
 | 
						|
                                           )
 | 
						|
        return ({'resource_id': r['_id'], 'value': r['value']}
 | 
						|
                for r in results['results'])
 | 
						|
 | 
						|
    def get_volume_max(self, event_filter):
 | 
						|
        """Return the maximum of the volume field for the events
 | 
						|
        described by the query parameters.
 | 
						|
        """
 | 
						|
        q = make_query_from_filter(event_filter)
 | 
						|
        results = self.db.meter.map_reduce(self.MAP_COUNTER_VOLUME,
 | 
						|
                                           self.REDUCE_MAX,
 | 
						|
                                           {'inline': 1},
 | 
						|
                                           query=q,
 | 
						|
                                           )
 | 
						|
        return ({'resource_id': r['_id'], 'value': r['value']}
 | 
						|
                for r in results['results'])
 | 
						|
 | 
						|
    def _fix_interval_min_max(self, a_min, a_max):
 | 
						|
        if hasattr(a_min, 'valueOf') and a_min.valueOf is not None:
 | 
						|
            # NOTE (dhellmann): HACK ALERT
 | 
						|
            #
 | 
						|
            # The real MongoDB server can handle Date objects and
 | 
						|
            # the driver converts them to datetime instances
 | 
						|
            # correctly but the in-memory implementation in MIM
 | 
						|
            # (used by the tests) returns a spidermonkey.Object
 | 
						|
            # representing the "value" dictionary and there
 | 
						|
            # doesn't seem to be a way to recursively introspect
 | 
						|
            # that object safely to convert the min and max values
 | 
						|
            # back to datetime objects. In this method, we know
 | 
						|
            # what type the min and max values are expected to be,
 | 
						|
            # so it is safe to do the conversion
 | 
						|
            # here. JavaScript's time representation uses
 | 
						|
            # different units than Python's, so we divide to
 | 
						|
            # convert to the right units and then create the
 | 
						|
            # datetime instances to return.
 | 
						|
            #
 | 
						|
            # The issue with MIM is documented at
 | 
						|
            # https://sourceforge.net/p/merciless/bugs/3/
 | 
						|
            #
 | 
						|
            a_min = datetime.datetime.fromtimestamp(
 | 
						|
                a_min.valueOf() // 1000)
 | 
						|
            a_max = datetime.datetime.fromtimestamp(
 | 
						|
                a_max.valueOf() // 1000)
 | 
						|
        return (a_min, a_max)
 | 
						|
 | 
						|
    def get_event_interval(self, event_filter):
 | 
						|
        """Return the min and max timestamps from events,
 | 
						|
        using the event_filter to limit the events seen.
 | 
						|
 | 
						|
        ( datetime.datetime(), datetime.datetime() )
 | 
						|
        """
 | 
						|
        q = make_query_from_filter(event_filter)
 | 
						|
        results = self.db.meter.map_reduce(self.MAP_TIMESTAMP,
 | 
						|
                                           self.REDUCE_MIN_MAX,
 | 
						|
                                           {'inline': 1},
 | 
						|
                                           query=q,
 | 
						|
                                           )
 | 
						|
        if results['results']:
 | 
						|
            answer = results['results'][0]['value']
 | 
						|
            return self._fix_interval_min_max(answer['min'], answer['max'])
 | 
						|
        return (None, None)
 |