Merge "implement sqlalchemy dbengine backend"
This commit is contained in:
commit
dc466535d4
298
ceilometer/storage/impl_sqlalchemy.py
Normal file
298
ceilometer/storage/impl_sqlalchemy.py
Normal file
@ -0,0 +1,298 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
#
|
||||
# Author: John Tran <jhtran@att.com>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
"""SQLAlchemy storage backend
|
||||
"""
|
||||
|
||||
import copy
|
||||
import datetime
|
||||
|
||||
from ceilometer.openstack.common import log
|
||||
from ceilometer.openstack.common import cfg
|
||||
from ceilometer.storage import base
|
||||
from ceilometer.storage.sqlalchemy.models import Meter, Project, Resource
|
||||
from ceilometer.storage.sqlalchemy.models import Source, User
|
||||
from ceilometer.storage.sqlalchemy.session import get_session
|
||||
import ceilometer.storage.sqlalchemy.session as session
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class SQLAlchemyStorage(base.StorageEngine):
|
||||
"""Put the data into a SQLAlchemy database
|
||||
|
||||
Tables:
|
||||
|
||||
- user
|
||||
- { _id: user id
|
||||
source: [ array of source ids reporting for the user ]
|
||||
}
|
||||
- project
|
||||
- { _id: project id
|
||||
source: [ array of source ids reporting for the project ]
|
||||
}
|
||||
- meter
|
||||
- the raw incoming data
|
||||
- resource
|
||||
- the metadata for resources
|
||||
- { _id: uuid of resource,
|
||||
metadata: metadata dictionaries
|
||||
timestamp: datetime of last update
|
||||
user_id: uuid
|
||||
project_id: uuid
|
||||
meter: [ array of {counter_name: string, counter_type: string} ]
|
||||
}
|
||||
"""
|
||||
|
||||
OPTIONS = []
|
||||
|
||||
def register_opts(self, conf):
|
||||
"""Register any configuration options used by this engine.
|
||||
"""
|
||||
conf.register_opts(self.OPTIONS)
|
||||
|
||||
def get_connection(self, conf):
|
||||
"""Return a Connection instance based on the configuration settings.
|
||||
"""
|
||||
return Connection(conf)
|
||||
|
||||
|
||||
def make_query_from_filter(query, event_filter, require_meter=True):
|
||||
"""Return a query dictionary based on the settings in the filter.
|
||||
|
||||
:param filter: EventFilter instance
|
||||
:param require_meter: If true and the filter does not have a meter,
|
||||
raise an error.
|
||||
"""
|
||||
|
||||
if event_filter.meter:
|
||||
query = query.filter(Meter.counter_name == event_filter.meter)
|
||||
elif require_meter:
|
||||
raise RuntimeError('Missing required meter specifier')
|
||||
if event_filter.source:
|
||||
query = query.filter_by(source=event_filter.source)
|
||||
if event_filter.start:
|
||||
query = query = query.filter(Meter.timestamp >= event_filter.start)
|
||||
if event_filter.end:
|
||||
query = query = query.filter(Meter.timestamp < event_filter.end)
|
||||
if event_filter.user:
|
||||
query = query.filter_by(user_id=event_filter.user)
|
||||
elif event_filter.project:
|
||||
query = query.filter_by(project_id=event_filter.project)
|
||||
if event_filter.resource:
|
||||
query = query.filter_by(resource_id=event_filter.resource)
|
||||
|
||||
return query
|
||||
|
||||
|
||||
class Connection(base.Connection):
|
||||
"""SqlAlchemy connection.
|
||||
"""
|
||||
|
||||
def __init__(self, conf):
|
||||
LOG.info('connecting to %s', conf.database_connection)
|
||||
self.session = self._get_connection(conf)
|
||||
return
|
||||
|
||||
def _get_connection(self, conf):
|
||||
"""Return a connection to the database.
|
||||
"""
|
||||
return session.get_session()
|
||||
|
||||
def record_metering_data(self, data):
|
||||
"""Write the data to the backend storage system.
|
||||
|
||||
:param data: a dictionary such as returned by
|
||||
ceilometer.meter.meter_message_from_counter
|
||||
"""
|
||||
if data['source']:
|
||||
source = self.session.query(Source).get(data['source'])
|
||||
if not source:
|
||||
source = Source(id=data['source'])
|
||||
self.session.add(source)
|
||||
else:
|
||||
source = None
|
||||
|
||||
# create/update user && project, add/update their sources list
|
||||
if data['user_id']:
|
||||
user = self.session.merge(User(id=data['user_id']))
|
||||
if not filter(lambda x: x.id == source.id, user.sources):
|
||||
user.sources.append(source)
|
||||
else:
|
||||
user = None
|
||||
|
||||
if data['project_id']:
|
||||
project = self.session.merge(Project(id=data['project_id']))
|
||||
if not filter(lambda x: x.id == source.id, project.sources):
|
||||
project.sources.append(source)
|
||||
else:
|
||||
project = None
|
||||
|
||||
# Record the updated resource metadata
|
||||
rtimestamp = datetime.datetime.utcnow()
|
||||
rmetadata = data['resource_metadata']
|
||||
|
||||
resource = self.session.merge(Resource(id=data['resource_id']))
|
||||
if not filter(lambda x: x.id == source.id, resource.sources):
|
||||
resource.sources.append(source)
|
||||
resource.project = project
|
||||
resource.user = user
|
||||
resource.timestamp = data['timestamp']
|
||||
resource.received_timestamp = rtimestamp
|
||||
# Current metadata being used and when it was last updated.
|
||||
resource.resource_metadata = rmetadata
|
||||
# autoflush didn't catch this one, requires manual flush
|
||||
self.session.flush()
|
||||
|
||||
# Record the raw data for the event.
|
||||
meter = Meter(counter_type=data['counter_type'],
|
||||
counter_name=data['counter_name'], resource=resource)
|
||||
self.session.add(meter)
|
||||
if not filter(lambda x: x.id == source.id, meter.sources):
|
||||
meter.sources.append(source)
|
||||
meter.project = project
|
||||
meter.user = user
|
||||
meter.timestamp = data['timestamp']
|
||||
meter.resource_metadata = rmetadata
|
||||
meter.counter_duration = data['counter_duration']
|
||||
meter.counter_volume = data['counter_volume']
|
||||
meter.message_signature = data['message_signature']
|
||||
meter.message_id = data['message_id']
|
||||
|
||||
return
|
||||
|
||||
def get_users(self, source=None):
|
||||
"""Return an iterable of user id strings.
|
||||
|
||||
:param source: Optional source filter.
|
||||
"""
|
||||
query = model_query(User.id, session=self.session)
|
||||
if source is not None:
|
||||
query = query.filter(User.sources.any(id=source))
|
||||
return (x[0] for x in query.all())
|
||||
|
||||
def get_projects(self, source=None):
|
||||
"""Return an iterable of project id strings.
|
||||
|
||||
:param source: Optional source filter.
|
||||
"""
|
||||
query = model_query(Project.id, session=self.session)
|
||||
if source:
|
||||
query = query.filter(Project.sources.any(id=source))
|
||||
return (x[0] for x in query.all())
|
||||
|
||||
def get_resources(self, user=None, project=None, source=None,
|
||||
start_timestamp=None, end_timestamp=None,
|
||||
session=None):
|
||||
"""Return an iterable of dictionaries containing resource information.
|
||||
|
||||
{ 'resource_id': UUID of the resource,
|
||||
'project_id': UUID of project owning the resource,
|
||||
'user_id': UUID of user owning the resource,
|
||||
'timestamp': UTC datetime of last update to the resource,
|
||||
'metadata': most current metadata for the resource,
|
||||
'meter': list of the meters reporting data for the resource,
|
||||
}
|
||||
|
||||
:param user: Optional ID for user that owns the resource.
|
||||
:param project: Optional ID for project that owns the resource.
|
||||
:param source: Optional source filter.
|
||||
:param start_timestamp: Optional modified timestamp start range.
|
||||
:param end_timestamp: Optional modified timestamp end range.
|
||||
"""
|
||||
query = model_query(Resource, session=session)
|
||||
if user is not None:
|
||||
query = query.filter(Resource.user_id == user)
|
||||
if source is not None:
|
||||
query = query.filter(Resource.sources.any(id=source))
|
||||
if start_timestamp is not None:
|
||||
query = query.filter(Resource.timestamp >= start_timestamp)
|
||||
if end_timestamp:
|
||||
query = query.filter(Resource.timestamp < end_timestamp)
|
||||
if project is not None:
|
||||
query = query.filter(Resource.project_id == project)
|
||||
|
||||
for resource in query.all():
|
||||
r = row2dict(resource)
|
||||
# Replace the '_id' key with 'resource_id' to meet the
|
||||
# caller's expectations.
|
||||
r['resource_id'] = r['id']
|
||||
del r['id']
|
||||
yield r
|
||||
|
||||
def get_raw_events(self, event_filter):
|
||||
"""Return an iterable of raw event data as created by
|
||||
:func:`ceilometer.meter.meter_message_from_counter`.
|
||||
"""
|
||||
query = model_query(Meter, session=self.session)
|
||||
query = make_query_from_filter(query, event_filter,
|
||||
require_meter=False)
|
||||
events = query.all()
|
||||
|
||||
for e in events:
|
||||
# Remove the id generated by the database when
|
||||
# the event was inserted. It is an implementation
|
||||
# detail that should not leak outside of the driver.
|
||||
e = row2dict(e)
|
||||
del e['id']
|
||||
yield e
|
||||
|
||||
def get_volume_sum(self, event_filter):
|
||||
# it isn't clear these are used
|
||||
pass
|
||||
|
||||
def get_volume_max(self, event_filter):
|
||||
# it isn't clear these are used
|
||||
pass
|
||||
|
||||
def get_event_interval(self, event_filter):
|
||||
"""Return the min and max timestamps from events,
|
||||
using the event_filter to limit the events seen.
|
||||
|
||||
( datetime.datetime(), datetime.datetime() )
|
||||
"""
|
||||
func = session.func()
|
||||
query = self.session.query(func.min(Meter.timestamp),
|
||||
func.max(Meter.timestamp))
|
||||
query = make_query_from_filter(query, event_filter)
|
||||
results = query.all()
|
||||
a_min, a_max = results[0]
|
||||
return (a_min, a_max)
|
||||
|
||||
|
||||
############################
|
||||
|
||||
|
||||
def model_query(*args, **kwargs):
|
||||
"""Query helper
|
||||
|
||||
:param session: if present, the session to use
|
||||
"""
|
||||
session = kwargs.get('session') or get_session()
|
||||
query = session.query(*args)
|
||||
return query
|
||||
|
||||
|
||||
def row2dict(row, srcflag=None):
|
||||
"""Convert User, Project, Meter, Resource instance to dictionary object
|
||||
with nested Source(s)
|
||||
"""
|
||||
d = copy.copy(row.__dict__)
|
||||
for col in ['_sa_instance_state', 'sources']:
|
||||
if col in d:
|
||||
del d[col]
|
||||
if not srcflag:
|
||||
d['sources'] = map(lambda x: row2dict(x, True), row.sources)
|
||||
return d
|
29
ceilometer/storage/migration.py
Normal file
29
ceilometer/storage/migration.py
Normal file
@ -0,0 +1,29 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
#
|
||||
# Author: John Tran <jhtran@att.com>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Database setup and migration commands."""
|
||||
|
||||
import ceilometer.storage.sqlalchemy.migration as IMPL
|
||||
|
||||
|
||||
def db_sync(version=None):
|
||||
"""Migrate the database to `version` or the most recent version."""
|
||||
return IMPL.db_sync(version=version)
|
||||
|
||||
|
||||
def db_version():
|
||||
"""Display the current database version."""
|
||||
return IMPL.db_version()
|
0
ceilometer/storage/sqlalchemy/__init__.py
Normal file
0
ceilometer/storage/sqlalchemy/__init__.py
Normal file
4
ceilometer/storage/sqlalchemy/migrate_repo/README
Normal file
4
ceilometer/storage/sqlalchemy/migrate_repo/README
Normal file
@ -0,0 +1,4 @@
|
||||
This is a database migration repository.
|
||||
|
||||
More information at
|
||||
http://code.google.com/p/sqlalchemy-migrate/
|
1
ceilometer/storage/sqlalchemy/migrate_repo/__init__.py
Normal file
1
ceilometer/storage/sqlalchemy/migrate_repo/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
# template repository default module
|
5
ceilometer/storage/sqlalchemy/migrate_repo/manage.py
Normal file
5
ceilometer/storage/sqlalchemy/migrate_repo/manage.py
Normal file
@ -0,0 +1,5 @@
|
||||
#!/usr/bin/env python
|
||||
from migrate.versioning.shell import main
|
||||
|
||||
if __name__ == '__main__':
|
||||
main(debug='False')
|
25
ceilometer/storage/sqlalchemy/migrate_repo/migrate.cfg
Normal file
25
ceilometer/storage/sqlalchemy/migrate_repo/migrate.cfg
Normal file
@ -0,0 +1,25 @@
|
||||
[db_settings]
|
||||
# Used to identify which repository this database is versioned under.
|
||||
# You can use the name of your project.
|
||||
repository_id=ceilometer
|
||||
|
||||
# The name of the database table used to track the schema version.
|
||||
# This name shouldn't already be used by your project.
|
||||
# If this is changed once a database is under version control, you'll need to
|
||||
# change the table name in each database too.
|
||||
version_table=migrate_version
|
||||
|
||||
# When committing a change script, Migrate will attempt to generate the
|
||||
# sql for all supported databases; normally, if one of them fails - probably
|
||||
# because you don't have that database installed - it is ignored and the
|
||||
# commit continues, perhaps ending successfully.
|
||||
# Databases in this list MUST compile successfully during a commit, or the
|
||||
# entire commit will fail. List the databases your application will actually
|
||||
# be using to ensure your updates to that database work properly.
|
||||
# This must be a list; example: ['postgres','sqlite']
|
||||
required_dbs=[]
|
||||
|
||||
# When creating new change scripts, Migrate will stamp the new script with
|
||||
# a version number. By default this is latest_version + 1. You can set this
|
||||
# to 'true' to tell Migrate to use the UTC timestamp instead.
|
||||
use_timestamp_numbering=False
|
@ -0,0 +1,90 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
#
|
||||
# Author: John Tran <jhtran@att.com>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from sqlalchemy import *
|
||||
from ceilometer.openstack.common import timeutils
|
||||
|
||||
meta = MetaData()
|
||||
|
||||
meter = Table(
|
||||
'meter', meta,
|
||||
Column('id', Integer, primary_key=True, index=True),
|
||||
Column('counter_name', String(255)),
|
||||
Column('user_id', String(255), index=True),
|
||||
Column('project_id', String(255), index=True),
|
||||
Column('resource_id', String(255)),
|
||||
Column('resource_metadata', String(5000)),
|
||||
Column('counter_type', String(255)),
|
||||
Column('counter_volume', Integer),
|
||||
Column('counter_duration', Integer),
|
||||
Column('timestamp', DateTime(timezone=False), index=True),
|
||||
Column('message_signature', String(1000)),
|
||||
Column('message_id', String(1000))
|
||||
)
|
||||
|
||||
resource = Table(
|
||||
'resource', meta,
|
||||
Column('id', String(255), primary_key=True, index=True),
|
||||
Column('resource_metadata', String(5000)),
|
||||
Column('project_id', String(255), index=True),
|
||||
Column('received_timestamp', DateTime(timezone=False)),
|
||||
Column('timestamp', DateTime(timezone=False), index=True),
|
||||
Column('user_id', String(255), index=True)
|
||||
)
|
||||
|
||||
user = Table(
|
||||
'user', meta,
|
||||
Column('id', String(255), primary_key=True, index=True),
|
||||
)
|
||||
|
||||
project = Table(
|
||||
'project', meta,
|
||||
Column('id', String(255), primary_key=True, index=True),
|
||||
)
|
||||
|
||||
sourceassoc = Table(
|
||||
'sourceassoc', meta,
|
||||
Column('source_id', String(255), index=True),
|
||||
Column('user_id', String(255)),
|
||||
Column('project_id', String(255)),
|
||||
Column('resource_id', String(255)),
|
||||
Column('meter_id', Integer),
|
||||
Index('idx_su', 'source_id', 'user_id'),
|
||||
Index('idx_sp', 'source_id', 'project_id'),
|
||||
Index('idx_sr', 'source_id', 'resource_id'),
|
||||
Index('idx_sm', 'source_id', 'meter_id')
|
||||
)
|
||||
|
||||
source = Table(
|
||||
'source', meta,
|
||||
Column('id', String(255), primary_key=True, index=True),
|
||||
UniqueConstraint('id')
|
||||
)
|
||||
|
||||
|
||||
tables = [meter, project, resource, user, source, sourceassoc]
|
||||
|
||||
|
||||
def upgrade(migrate_engine):
|
||||
meta.bind = migrate_engine
|
||||
for i in sorted(tables):
|
||||
i.create()
|
||||
|
||||
|
||||
def downgrade(migrate_engine):
|
||||
meta.bind = migrate_engine
|
||||
for i in sorted(tables, reverse=True):
|
||||
i.drop()
|
@ -0,0 +1 @@
|
||||
# template repository default versions module
|
106
ceilometer/storage/sqlalchemy/migration.py
Normal file
106
ceilometer/storage/sqlalchemy/migration.py
Normal file
@ -0,0 +1,106 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
#
|
||||
# Author: John Tran <jhtran@att.com>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import distutils.version as dist_version
|
||||
import os
|
||||
|
||||
from ceilometer.storage.sqlalchemy.session import get_engine
|
||||
from ceilometer.openstack.common import log as logging
|
||||
|
||||
|
||||
import migrate
|
||||
from migrate.versioning import util as migrate_util
|
||||
import sqlalchemy
|
||||
|
||||
INIT_VERSION = 1
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@migrate_util.decorator
|
||||
def patched_with_engine(f, *a, **kw):
|
||||
url = a[0]
|
||||
engine = migrate_util.construct_engine(url, **kw)
|
||||
|
||||
try:
|
||||
kw['engine'] = engine
|
||||
return f(*a, **kw)
|
||||
finally:
|
||||
if isinstance(engine, migrate_util.Engine) and engine is not url:
|
||||
migrate_util.log.debug('Disposing SQLAlchemy engine %s', engine)
|
||||
engine.dispose()
|
||||
|
||||
|
||||
# TODO(jkoelker) When migrate 0.7.3 is released and nova depends
|
||||
# on that version or higher, this can be removed
|
||||
MIN_PKG_VERSION = dist_version.StrictVersion('0.7.3')
|
||||
if (not hasattr(migrate, '__version__') or
|
||||
dist_version.StrictVersion(migrate.__version__) < MIN_PKG_VERSION):
|
||||
migrate_util.with_engine = patched_with_engine
|
||||
|
||||
|
||||
# NOTE(jkoelker) Delay importing migrate until we are patched
|
||||
from migrate import exceptions as versioning_exceptions
|
||||
from migrate.versioning import api as versioning_api
|
||||
from migrate.versioning.repository import Repository
|
||||
|
||||
_REPOSITORY = None
|
||||
|
||||
|
||||
def db_sync(version=None):
|
||||
if version is not None:
|
||||
try:
|
||||
version = int(version)
|
||||
except ValueError:
|
||||
raise Exception(_("version should be an integer"))
|
||||
|
||||
current_version = db_version()
|
||||
repository = _find_migrate_repo()
|
||||
if version is None or version > current_version:
|
||||
return versioning_api.upgrade(get_engine(), repository, version)
|
||||
else:
|
||||
return versioning_api.downgrade(get_engine(), repository,
|
||||
version)
|
||||
|
||||
|
||||
def db_version():
|
||||
repository = _find_migrate_repo()
|
||||
try:
|
||||
return versioning_api.db_version(get_engine(), repository)
|
||||
except versioning_exceptions.DatabaseNotControlledError:
|
||||
meta = sqlalchemy.MetaData()
|
||||
engine = get_engine()
|
||||
meta.reflect(bind=engine)
|
||||
tables = meta.tables
|
||||
if len(tables) == 0:
|
||||
db_version_control(0)
|
||||
return versioning_api.db_version(get_engine(), repository)
|
||||
|
||||
|
||||
def db_version_control(version=None):
|
||||
repository = _find_migrate_repo()
|
||||
versioning_api.version_control(get_engine(), repository, version)
|
||||
return version
|
||||
|
||||
|
||||
def _find_migrate_repo():
|
||||
"""Get the path for the migrate repository."""
|
||||
global _REPOSITORY
|
||||
path = os.path.join(os.path.abspath(os.path.dirname(__file__)),
|
||||
'migrate_repo')
|
||||
assert os.path.exists(path)
|
||||
if _REPOSITORY is None:
|
||||
_REPOSITORY = Repository(path)
|
||||
return _REPOSITORY
|
141
ceilometer/storage/sqlalchemy/models.py
Normal file
141
ceilometer/storage/sqlalchemy/models.py
Normal file
@ -0,0 +1,141 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
#
|
||||
# Author: John Tran <jhtran@att.com>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
SQLAlchemy models for nova data.
|
||||
"""
|
||||
|
||||
import json
|
||||
from sqlalchemy import Column, Integer, String, Table
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
from sqlalchemy import ForeignKey, DateTime
|
||||
from sqlalchemy.orm import relationship, backref
|
||||
from sqlalchemy.types import TypeDecorator, VARCHAR
|
||||
from urlparse import urlparse
|
||||
|
||||
import ceilometer.openstack.common.cfg as cfg
|
||||
from ceilometer.openstack.common import timeutils
|
||||
|
||||
sql_opts = [
|
||||
cfg.IntOpt('mysql_engine',
|
||||
default='InnoDB',
|
||||
help='MySQL engine')
|
||||
]
|
||||
|
||||
cfg.CONF.register_opts(sql_opts)
|
||||
|
||||
|
||||
def table_args():
|
||||
engine_name = urlparse(cfg.CONF.database_connection).scheme
|
||||
if engine_name == 'mysql':
|
||||
return {'mysql_engine': cfg.CONF.mysql_engine}
|
||||
return None
|
||||
|
||||
|
||||
class JSONEncodedDict(TypeDecorator):
|
||||
"Represents an immutable structure as a json-encoded string."
|
||||
|
||||
impl = VARCHAR
|
||||
|
||||
def process_bind_param(self, value, dialect):
|
||||
if value is not None:
|
||||
value = json.dumps(value)
|
||||
return value
|
||||
|
||||
def process_result_value(self, value, dialect):
|
||||
if value is not None:
|
||||
value = json.loads(value)
|
||||
return value
|
||||
|
||||
|
||||
class CeilometerBase(object):
|
||||
"""Base class for Ceilometer Models."""
|
||||
__table_args__ = table_args()
|
||||
__table_initialized__ = False
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
setattr(self, key, value)
|
||||
|
||||
def __getitem__(self, key):
|
||||
return getattr(self, key)
|
||||
|
||||
|
||||
Base = declarative_base(cls=CeilometerBase)
|
||||
|
||||
|
||||
sourceassoc = Table('sourceassoc', Base.metadata,
|
||||
Column('meter_id', Integer, ForeignKey("meter.id")),
|
||||
Column('project_id', String(255), ForeignKey("project.id")),
|
||||
Column('resource_id', String(255), ForeignKey("resource.id")),
|
||||
Column('user_id', String(255), ForeignKey("user.id")),
|
||||
Column('source_id', String(255), ForeignKey("source.id"))
|
||||
)
|
||||
|
||||
|
||||
class Source(Base):
|
||||
__tablename__ = 'source'
|
||||
id = Column(String(255), primary_key=True)
|
||||
|
||||
|
||||
class Meter(Base):
|
||||
"""Metering data"""
|
||||
|
||||
__tablename__ = 'meter'
|
||||
id = Column(Integer, primary_key=True)
|
||||
counter_name = Column(String(255))
|
||||
sources = relationship("Source", secondary=lambda: sourceassoc,
|
||||
lazy='joined')
|
||||
user_id = Column(String(255), ForeignKey('user.id'))
|
||||
project_id = Column(String(255), ForeignKey('project.id'))
|
||||
resource_id = Column(String(255), ForeignKey('resource.id'))
|
||||
resource_metadata = Column(JSONEncodedDict)
|
||||
counter_type = Column(String(255))
|
||||
counter_volume = Column(Integer)
|
||||
counter_duration = Column(Integer)
|
||||
timestamp = Column(DateTime, default=timeutils.utcnow)
|
||||
message_signature = Column(String)
|
||||
message_id = Column(String)
|
||||
|
||||
|
||||
class User(Base):
|
||||
__tablename__ = 'user'
|
||||
id = Column(String(255), primary_key=True)
|
||||
sources = relationship("Source", secondary=lambda: sourceassoc,
|
||||
lazy='joined')
|
||||
resources = relationship("Resource", backref='user', lazy='joined')
|
||||
meters = relationship("Meter", backref='user', lazy='joined')
|
||||
|
||||
|
||||
class Project(Base):
|
||||
__tablename__ = 'project'
|
||||
id = Column(String(255), primary_key=True)
|
||||
sources = relationship("Source", secondary=lambda: sourceassoc,
|
||||
lazy='joined')
|
||||
resources = relationship("Resource", backref='project', lazy='joined')
|
||||
meters = relationship("Meter", backref='project', lazy='joined')
|
||||
|
||||
|
||||
class Resource(Base):
|
||||
__tablename__ = 'resource'
|
||||
id = Column(String(255), primary_key=True)
|
||||
sources = relationship("Source", secondary=lambda: sourceassoc,
|
||||
lazy='joined')
|
||||
timestamp = Column(DateTime)
|
||||
resource_metadata = Column(JSONEncodedDict)
|
||||
received_timestamp = Column(DateTime, default=timeutils.utcnow)
|
||||
user_id = Column(String(255), ForeignKey('user.id'))
|
||||
project_id = Column(String(255), ForeignKey('project.id'))
|
||||
meters = relationship("Meter", backref='resource', lazy='joined')
|
196
ceilometer/storage/sqlalchemy/session.py
Normal file
196
ceilometer/storage/sqlalchemy/session.py
Normal file
@ -0,0 +1,196 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2010 United States Government as represented by the
|
||||
# Administrator of the National Aeronautics and Space Administration.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Session Handling for SQLAlchemy backend."""
|
||||
|
||||
import re
|
||||
import time
|
||||
|
||||
import sqlalchemy
|
||||
from sqlalchemy.exc import DisconnectionError, OperationalError
|
||||
import sqlalchemy.orm
|
||||
from sqlalchemy.pool import NullPool, StaticPool
|
||||
|
||||
import ceilometer.openstack.common.cfg as cfg
|
||||
import ceilometer.openstack.common.log as logging
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
_MAKER = None
|
||||
_ENGINE = None
|
||||
|
||||
sql_opts = [
|
||||
cfg.IntOpt('sql_connection_debug',
|
||||
default=0,
|
||||
help='Verbosity of SQL debugging information. 0=None, '
|
||||
'100=Everything'),
|
||||
cfg.BoolOpt('sql_connection_trace',
|
||||
default=False,
|
||||
help='Add python stack traces to SQL as comment strings'),
|
||||
cfg.BoolOpt('sqlite_synchronous',
|
||||
default=True,
|
||||
help='If passed, use synchronous mode for sqlite'),
|
||||
cfg.IntOpt('sql_idle_timeout',
|
||||
default=3600,
|
||||
help='timeout before idle sql connections are reaped'),
|
||||
cfg.IntOpt('sql_max_retries',
|
||||
default=10,
|
||||
help='maximum db connection retries during startup. '
|
||||
'(setting -1 implies an infinite retry count)'),
|
||||
cfg.IntOpt('sql_retry_interval',
|
||||
default=10,
|
||||
help='interval between retries of opening a sql connection'),
|
||||
]
|
||||
|
||||
cfg.CONF.register_opts(sql_opts)
|
||||
|
||||
|
||||
def get_session(autocommit=True, expire_on_commit=False, autoflush=True):
|
||||
"""Return a SQLAlchemy session."""
|
||||
global _MAKER
|
||||
|
||||
if _MAKER is None:
|
||||
engine = get_engine()
|
||||
_MAKER = get_maker(engine, autocommit, expire_on_commit, autoflush)
|
||||
|
||||
session = _MAKER()
|
||||
return session
|
||||
|
||||
|
||||
def synchronous_switch_listener(dbapi_conn, connection_rec):
|
||||
"""Switch sqlite connections to non-synchronous mode"""
|
||||
dbapi_conn.execute("PRAGMA synchronous = OFF")
|
||||
|
||||
|
||||
def add_regexp_listener(dbapi_con, con_record):
|
||||
"""Add REGEXP function to sqlite connections."""
|
||||
|
||||
def regexp(expr, item):
|
||||
reg = re.compile(expr)
|
||||
return reg.search(unicode(item)) is not None
|
||||
dbapi_con.create_function('regexp', 2, regexp)
|
||||
|
||||
|
||||
def ping_listener(dbapi_conn, connection_rec, connection_proxy):
|
||||
"""
|
||||
Ensures that MySQL connections checked out of the
|
||||
pool are alive.
|
||||
|
||||
Borrowed from:
|
||||
http://groups.google.com/group/sqlalchemy/msg/a4ce563d802c929f
|
||||
"""
|
||||
try:
|
||||
dbapi_conn.cursor().execute('select 1')
|
||||
except dbapi_conn.OperationalError, ex:
|
||||
if ex.args[0] in (2006, 2013, 2014, 2045, 2055):
|
||||
LOG.warn('Got mysql server has gone away: %s', ex)
|
||||
raise DisconnectionError("Database server went away")
|
||||
else:
|
||||
raise
|
||||
|
||||
|
||||
def is_db_connection_error(args):
|
||||
"""Return True if error in connecting to db."""
|
||||
# NOTE(adam_g): This is currently MySQL specific and needs to be extended
|
||||
# to support Postgres and others.
|
||||
conn_err_codes = ('2002', '2003', '2006')
|
||||
for err_code in conn_err_codes:
|
||||
if args.find(err_code) != -1:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def get_engine():
|
||||
"""Return a SQLAlchemy engine."""
|
||||
global _ENGINE
|
||||
if _ENGINE is None:
|
||||
connection_dict = sqlalchemy.engine.url.make_url(
|
||||
cfg.CONF.database_connection)
|
||||
|
||||
engine_args = {
|
||||
"pool_recycle": cfg.CONF.sql_idle_timeout,
|
||||
"echo": False,
|
||||
'convert_unicode': True,
|
||||
}
|
||||
|
||||
# Map our SQL debug level to SQLAlchemy's options
|
||||
if cfg.CONF.sql_connection_debug >= 100:
|
||||
engine_args['echo'] = 'debug'
|
||||
elif cfg.CONF.sql_connection_debug >= 50:
|
||||
engine_args['echo'] = True
|
||||
|
||||
if "sqlite" in connection_dict.drivername:
|
||||
engine_args["poolclass"] = NullPool
|
||||
|
||||
if cfg.CONF.database_connection == "sqlite://":
|
||||
engine_args["poolclass"] = StaticPool
|
||||
engine_args["connect_args"] = {'check_same_thread': False}
|
||||
|
||||
_ENGINE = sqlalchemy.create_engine(cfg.CONF.database_connection,
|
||||
**engine_args)
|
||||
|
||||
if 'mysql' in connection_dict.drivername:
|
||||
sqlalchemy.event.listen(_ENGINE, 'checkout', ping_listener)
|
||||
elif "sqlite" in connection_dict.drivername:
|
||||
if not cfg.CONF.sqlite_synchronous:
|
||||
sqlalchemy.event.listen(_ENGINE, 'connect',
|
||||
synchronous_switch_listener)
|
||||
sqlalchemy.event.listen(_ENGINE, 'connect', add_regexp_listener)
|
||||
|
||||
if (cfg.CONF.sql_connection_trace and
|
||||
_ENGINE.dialect.dbapi.__name__ == 'MySQLdb'):
|
||||
import MySQLdb.cursors
|
||||
_do_query = debug_mysql_do_query()
|
||||
setattr(MySQLdb.cursors.BaseCursor, '_do_query', _do_query)
|
||||
|
||||
try:
|
||||
_ENGINE.connect()
|
||||
except OperationalError, e:
|
||||
if not is_db_connection_error(e.args[0]):
|
||||
raise
|
||||
|
||||
remaining = cfg.CONF.sql_max_retries
|
||||
if remaining == -1:
|
||||
remaining = 'infinite'
|
||||
while True:
|
||||
msg = _('SQL connection failed. %s attempts left.')
|
||||
LOG.warn(msg % remaining)
|
||||
if remaining != 'infinite':
|
||||
remaining -= 1
|
||||
time.sleep(cfg.CONF.sql_retry_interval)
|
||||
try:
|
||||
_ENGINE.connect()
|
||||
break
|
||||
except OperationalError, e:
|
||||
if (remaining != 'infinite' and remaining == 0) or \
|
||||
not is_db_connection_error(e.args[0]):
|
||||
raise
|
||||
return _ENGINE
|
||||
|
||||
|
||||
def get_maker(engine, autocommit=True, expire_on_commit=False, autoflush=True):
|
||||
"""Return a SQLAlchemy sessionmaker using the given engine."""
|
||||
return sqlalchemy.orm.sessionmaker(bind=engine,
|
||||
autocommit=autocommit,
|
||||
autoflush=autoflush,
|
||||
expire_on_commit=expire_on_commit)
|
||||
|
||||
|
||||
def func():
|
||||
# ugly hack sqlalchemy name conflict from impl_sqlalchemy
|
||||
return sqlalchemy.func
|
3
setup.py
3
setup.py
@ -70,5 +70,8 @@ setuptools.setup(
|
||||
[ceilometer.storage]
|
||||
log = ceilometer.storage.impl_log:LogStorage
|
||||
mongodb = ceilometer.storage.impl_mongodb:MongoDBStorage
|
||||
mysql = ceilometer.storage.impl_sqlalchemy:SQLAlchemyStorage
|
||||
postgresql = ceilometer.storage.impl_sqlalchemy:SQLAlchemyStorage
|
||||
sqlite = ceilometer.storage.impl_sqlalchemy:SQLAlchemyStorage
|
||||
"""),
|
||||
)
|
||||
|
466
tests/storage/test_impl_sqlalchemy.py
Normal file
466
tests/storage/test_impl_sqlalchemy.py
Normal file
@ -0,0 +1,466 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
#
|
||||
# Author: John Tran <jhtran@att.com>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
"""Tests for ceilometer/storage/impl_sqlalchemy.py
|
||||
"""
|
||||
|
||||
import datetime
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import unittest
|
||||
|
||||
import mox
|
||||
|
||||
from nose.plugins import skip
|
||||
from sqlalchemy import MetaData, text
|
||||
|
||||
from ceilometer import counter
|
||||
from ceilometer import meter
|
||||
from ceilometer import storage
|
||||
from ceilometer.storage import migration
|
||||
import ceilometer.openstack.common.cfg as cfg
|
||||
from ceilometer.storage import impl_sqlalchemy
|
||||
from ceilometer.storage.sqlalchemy.models import Meter, Project, Resource
|
||||
from ceilometer.storage.sqlalchemy.models import Source, User
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
CEILOMETER_TEST_LIVE = bool(int(os.environ.get('CEILOMETER_TEST_LIVE', 0)))
|
||||
if CEILOMETER_TEST_LIVE:
|
||||
MYSQL_DBNAME = 'ceilometer_test'
|
||||
MYSQL_URL = 'mysql://ceilometer:somepass@localhost/%s' % MYSQL_DBNAME
|
||||
|
||||
|
||||
class Connection(impl_sqlalchemy.Connection):
|
||||
|
||||
def _get_connection(self, conf):
|
||||
try:
|
||||
return super(Connection, self)._get_connection(conf)
|
||||
except:
|
||||
LOG.debug('Unable to connect to %s' % conf.database_connection)
|
||||
raise
|
||||
|
||||
|
||||
class SQLAlchemyEngineTestBase(unittest.TestCase):
|
||||
|
||||
def tearDown(self):
|
||||
super(SQLAlchemyEngineTestBase, self).tearDown()
|
||||
engine_conn = self.session.bind.connect()
|
||||
if CEILOMETER_TEST_LIVE:
|
||||
engine_conn.execute(text('drop database %s' % MYSQL_DBNAME))
|
||||
engine_conn.execute(text('create database %s' % MYSQL_DBNAME))
|
||||
# needed for sqlite in-memory db to destroy
|
||||
self.session.close_all()
|
||||
self.session.bind.dispose()
|
||||
|
||||
def setUp(self):
|
||||
super(SQLAlchemyEngineTestBase, self).setUp()
|
||||
|
||||
self.conf = cfg.CONF
|
||||
self.conf.database_connection = 'sqlite://'
|
||||
# Use a real MySQL server if we can connect, but fall back
|
||||
# to a Sqlite in-memory connection if we cannot.
|
||||
if CEILOMETER_TEST_LIVE:
|
||||
# should pull from conf file but for now manually specified
|
||||
# just make sure ceilometer_test db exists in mysql
|
||||
self.conf.database_connection = MYSQL_URL
|
||||
|
||||
self.conn = Connection(self.conf)
|
||||
self.session = self.conn.session
|
||||
|
||||
migration.db_sync()
|
||||
|
||||
self.counter = counter.Counter(
|
||||
'test-1',
|
||||
'instance',
|
||||
'cumulative',
|
||||
volume=1,
|
||||
user_id='user-id',
|
||||
project_id='project-id',
|
||||
resource_id='resource-id',
|
||||
timestamp=datetime.datetime(2012, 7, 2, 10, 40),
|
||||
duration=0,
|
||||
resource_metadata={'display_name': 'test-server',
|
||||
'tag': 'self.counter',
|
||||
}
|
||||
)
|
||||
self.msg1 = meter.meter_message_from_counter(self.counter)
|
||||
self.conn.record_metering_data(self.msg1)
|
||||
|
||||
self.counter2 = counter.Counter(
|
||||
'test-2',
|
||||
'instance',
|
||||
'cumulative',
|
||||
volume=1,
|
||||
user_id='user-id',
|
||||
project_id='project-id',
|
||||
resource_id='resource-id-alternate',
|
||||
timestamp=datetime.datetime(2012, 7, 2, 10, 41),
|
||||
duration=0,
|
||||
resource_metadata={'display_name': 'test-server',
|
||||
'tag': 'self.counter2',
|
||||
}
|
||||
)
|
||||
self.msg2 = meter.meter_message_from_counter(self.counter2)
|
||||
self.conn.record_metering_data(self.msg2)
|
||||
|
||||
self.counter3 = counter.Counter(
|
||||
'test-3',
|
||||
'instance',
|
||||
'cumulative',
|
||||
volume=1,
|
||||
user_id='user-id-alternate',
|
||||
project_id='project-id',
|
||||
resource_id='resource-id-alternate',
|
||||
timestamp=datetime.datetime(2012, 7, 2, 10, 41),
|
||||
duration=0,
|
||||
resource_metadata={'display_name': 'test-server',
|
||||
'tag': 'self.counter3',
|
||||
}
|
||||
)
|
||||
self.msg3 = meter.meter_message_from_counter(self.counter3)
|
||||
self.conn.record_metering_data(self.msg3)
|
||||
|
||||
for i in range(2, 4):
|
||||
c = counter.Counter(
|
||||
'test',
|
||||
'instance',
|
||||
'cumulative',
|
||||
1,
|
||||
'user-id-%s' % i,
|
||||
'project-id-%s' % i,
|
||||
'resource-id-%s' % i,
|
||||
timestamp=datetime.datetime(2012, 7, 2, 10, 40 + i),
|
||||
duration=0,
|
||||
resource_metadata={'display_name': 'test-server',
|
||||
'tag': 'counter-%s' % i,
|
||||
}
|
||||
)
|
||||
msg = meter.meter_message_from_counter(c)
|
||||
self.conn.record_metering_data(msg)
|
||||
|
||||
|
||||
class UserTest(SQLAlchemyEngineTestBase):
|
||||
|
||||
def test_new_user(self):
|
||||
user = self.session.query(User).get('user-id')
|
||||
assert user is not None
|
||||
|
||||
def test_new_user_source(self):
|
||||
user = self.session.query(User).get('user-id')
|
||||
assert hasattr(user, 'sources')
|
||||
sources = user.sources
|
||||
assert map(lambda x: x.id, user.sources) == ['test-1', 'test-2']
|
||||
|
||||
def test_get_users(self):
|
||||
users = self.conn.get_users()
|
||||
xpct = set(['user-id', 'user-id-alternate', 'user-id-2', 'user-id-3'])
|
||||
assert set(self.conn.get_users()) == xpct
|
||||
|
||||
def test_get_users_by_source(self):
|
||||
assert set(self.conn.get_users(source='test-1')) == set(['user-id'])
|
||||
|
||||
|
||||
class ProjectTest(SQLAlchemyEngineTestBase):
|
||||
|
||||
def test_new_project(self):
|
||||
project = self.session.query(Project).get('project-id')
|
||||
assert project is not None
|
||||
|
||||
def test_new_project_source(self):
|
||||
project = self.session.query(Project).get('project-id')
|
||||
assert hasattr(project, 'sources')
|
||||
expected = ['test-1', 'test-2', 'test-3']
|
||||
assert map(lambda x: x.id, project.sources) == expected
|
||||
|
||||
def test_get_projects(self):
|
||||
projects = self.session.query(Project).all()
|
||||
projects = map(lambda x: x.id, projects)
|
||||
expect = set(['project-id', 'project-id-2', 'project-id-3'])
|
||||
assert set(projects) == expect
|
||||
|
||||
def test_get_projects_by_source(self):
|
||||
projects = self.conn.get_projects(source='test-1')
|
||||
assert list(projects) == ['project-id']
|
||||
|
||||
|
||||
class ResourceTest(SQLAlchemyEngineTestBase):
|
||||
|
||||
def test_new_resource(self):
|
||||
resource = self.session.query(Resource).get('resource-id')
|
||||
assert resource is not None
|
||||
|
||||
def test_new_resource_project(self):
|
||||
resource = self.session.query(Resource).get('resource-id')
|
||||
assert hasattr(resource, 'project')
|
||||
assert resource.project.id == 'project-id'
|
||||
|
||||
def test_new_resource_user(self):
|
||||
resource = self.session.query(Resource).get('resource-id')
|
||||
assert hasattr(resource, 'user')
|
||||
assert resource.user.id == 'user-id'
|
||||
|
||||
def test_new_resource_meter(self):
|
||||
resource = self.session.query(Resource).filter_by(id='resource-id').\
|
||||
filter(Meter.counter_name == 'instance').\
|
||||
filter(Meter.counter_type == 'cumulative').first()
|
||||
assert len(set(resource.meters)) == 1
|
||||
foo = map(lambda x: [x.counter_name, x.counter_type], resource.meters)
|
||||
assert ['instance', 'cumulative'] in foo
|
||||
|
||||
def test_new_resource_metadata(self):
|
||||
resource = self.session.query(Resource).get('resource-id')
|
||||
assert hasattr(resource, 'metadata')
|
||||
metadata = resource.resource_metadata
|
||||
assert metadata['display_name'] == 'test-server'
|
||||
|
||||
def test_get_resources(self):
|
||||
resources = list(self.conn.get_resources())
|
||||
assert len(resources) == 4
|
||||
for resource in resources:
|
||||
if resource['resource_id'] != 'resource-id':
|
||||
continue
|
||||
assert resource['resource_id'] == 'resource-id'
|
||||
assert resource['project_id'] == 'project-id'
|
||||
assert resource['user_id'] == 'user-id'
|
||||
assert 'resource_metadata' in resource
|
||||
assert 'meters' in resource
|
||||
foo = map(lambda x: [x['counter_name'], x['counter_type']],
|
||||
resource['meters'])
|
||||
assert ['instance', 'cumulative'] in foo
|
||||
break
|
||||
else:
|
||||
assert False, 'Never found resource-id'
|
||||
|
||||
def test_get_resources_start_timestamp(self):
|
||||
timestamp = datetime.datetime(2012, 7, 2, 10, 42)
|
||||
resources = list(self.conn.get_resources(start_timestamp=timestamp))
|
||||
resource_ids = [r['resource_id'] for r in resources]
|
||||
expected = set(['resource-id-2', 'resource-id-3'])
|
||||
assert set(resource_ids) == expected
|
||||
|
||||
def test_get_resources_end_timestamp(self):
|
||||
timestamp = datetime.datetime(2012, 7, 2, 10, 42)
|
||||
resources = list(self.conn.get_resources(end_timestamp=timestamp))
|
||||
resource_ids = [r['resource_id'] for r in resources]
|
||||
expected = set(['resource-id', 'resource-id-alternate'])
|
||||
assert set(resource_ids) == expected
|
||||
|
||||
def test_get_resources_both_timestamps(self):
|
||||
start_ts = datetime.datetime(2012, 7, 2, 10, 42)
|
||||
end_ts = datetime.datetime(2012, 7, 2, 10, 43)
|
||||
resources = list(self.conn.get_resources(start_timestamp=start_ts,
|
||||
end_timestamp=end_ts)
|
||||
)
|
||||
resource_ids = [r['resource_id'] for r in resources]
|
||||
assert set(resource_ids) == set(['resource-id-2'])
|
||||
|
||||
def test_get_resources_by_source(self):
|
||||
resources = list(self.conn.get_resources(source='test-1'))
|
||||
assert len(resources) == 1
|
||||
ids = set(r['resource_id'] for r in resources)
|
||||
assert ids == set(['resource-id'])
|
||||
|
||||
def test_get_resources_by_user(self):
|
||||
resources = list(self.conn.get_resources(user='user-id'))
|
||||
assert len(resources) == 1
|
||||
ids = set(r['resource_id'] for r in resources)
|
||||
assert ids == set(['resource-id'])
|
||||
|
||||
def test_get_resources_by_project(self):
|
||||
resources = list(self.conn.get_resources(project='project-id'))
|
||||
assert len(resources) == 2
|
||||
ids = set(r['resource_id'] for r in resources)
|
||||
assert ids == set(['resource-id', 'resource-id-alternate'])
|
||||
|
||||
|
||||
class MeterTest(SQLAlchemyEngineTestBase):
|
||||
|
||||
def _compare_raw(self, msg_dict, result_dict):
|
||||
for k, v in msg_dict.items():
|
||||
if k in ['timestamp', 'source']:
|
||||
continue
|
||||
if k == 'resource_metadata':
|
||||
key = result_dict[k]
|
||||
value = v
|
||||
else:
|
||||
key = str(result_dict[k])
|
||||
value = str(v)
|
||||
assert key == value
|
||||
|
||||
def _iterate_msgs(self, results):
|
||||
for meter in results:
|
||||
labels = map(lambda x: x['id'], meter['sources'])
|
||||
# should only have one source
|
||||
assert len(labels) == 1
|
||||
count = re.match('test-(\d+)', labels[0]).group(1)
|
||||
self._compare_raw(getattr(self, 'msg' + count), meter)
|
||||
|
||||
def test_new_meter(self):
|
||||
meter = self.session.query(Meter).first()
|
||||
assert meter is not None
|
||||
|
||||
def test_get_raw_events_by_user(self):
|
||||
f = storage.EventFilter(user='user-id')
|
||||
results = list(self.conn.get_raw_events(f))
|
||||
assert len(results) == 2
|
||||
self._iterate_msgs(results)
|
||||
|
||||
def test_get_raw_events_by_project(self):
|
||||
f = storage.EventFilter(project='project-id')
|
||||
results = list(self.conn.get_raw_events(f))
|
||||
assert len(results) == 3
|
||||
self._iterate_msgs(results)
|
||||
|
||||
def test_get_raw_events_by_resource(self):
|
||||
f = storage.EventFilter(user='user-id', resource='resource-id')
|
||||
results = list(self.conn.get_raw_events(f))
|
||||
assert len(results) == 1
|
||||
self._compare_raw(self.msg1, results[0])
|
||||
|
||||
def test_get_raw_events_by_start_time(self):
|
||||
f = storage.EventFilter(
|
||||
user='user-id',
|
||||
start=datetime.datetime(2012, 7, 2, 10, 41),
|
||||
)
|
||||
results = list(self.conn.get_raw_events(f))
|
||||
assert len(results) == 1
|
||||
assert results[0]['timestamp'] == datetime.datetime(2012, 7, 2, 10, 41)
|
||||
|
||||
def test_get_raw_events_by_end_time(self):
|
||||
f = storage.EventFilter(
|
||||
user='user-id',
|
||||
end=datetime.datetime(2012, 7, 2, 10, 41),
|
||||
)
|
||||
results = list(self.conn.get_raw_events(f))
|
||||
length = len(results)
|
||||
assert length == 1
|
||||
assert results[0]['timestamp'] == datetime.datetime(2012, 7, 2, 10, 40)
|
||||
|
||||
def test_get_raw_events_by_both_times(self):
|
||||
f = storage.EventFilter(
|
||||
start=datetime.datetime(2012, 7, 2, 10, 42),
|
||||
end=datetime.datetime(2012, 7, 2, 10, 43),
|
||||
)
|
||||
results = list(self.conn.get_raw_events(f))
|
||||
length = len(results)
|
||||
assert length == 1
|
||||
assert results[0]['timestamp'] == datetime.datetime(2012, 7, 2, 10, 42)
|
||||
|
||||
def test_get_raw_events_by_meter(self):
|
||||
f = storage.EventFilter(user='user-id', meter='no-such-meter')
|
||||
results = list(self.conn.get_raw_events(f))
|
||||
assert not results
|
||||
|
||||
def test_get_raw_events_by_meter2(self):
|
||||
f = storage.EventFilter(user='user-id', meter='instance')
|
||||
results = list(self.conn.get_raw_events(f))
|
||||
assert results
|
||||
|
||||
|
||||
class TestGetEventInterval(SQLAlchemyEngineTestBase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestGetEventInterval, self).setUp()
|
||||
|
||||
# Create events relative to the range and pretend
|
||||
# that the intervening events exist.
|
||||
|
||||
self.start = datetime.datetime(2012, 8, 28, 0, 0)
|
||||
self.end = datetime.datetime(2012, 8, 29, 0, 0)
|
||||
|
||||
self.early1 = self.start - datetime.timedelta(minutes=20)
|
||||
self.early2 = self.start - datetime.timedelta(minutes=10)
|
||||
|
||||
self.middle1 = self.start + datetime.timedelta(minutes=10)
|
||||
self.middle2 = self.end - datetime.timedelta(minutes=10)
|
||||
|
||||
self.late1 = self.end + datetime.timedelta(minutes=10)
|
||||
self.late2 = self.end + datetime.timedelta(minutes=20)
|
||||
|
||||
self._filter = storage.EventFilter(
|
||||
resource='111',
|
||||
meter='instance',
|
||||
start=self.start,
|
||||
end=self.end,
|
||||
)
|
||||
|
||||
def _make_events(self, *timestamps):
|
||||
for t in timestamps:
|
||||
c = counter.Counter(
|
||||
'test',
|
||||
'instance',
|
||||
'cumulative',
|
||||
1,
|
||||
'11',
|
||||
'1',
|
||||
'111',
|
||||
timestamp=t,
|
||||
duration=0,
|
||||
resource_metadata={'display_name': 'test-server',
|
||||
}
|
||||
)
|
||||
msg = meter.meter_message_from_counter(c)
|
||||
self.conn.record_metering_data(msg)
|
||||
|
||||
def test_before_range(self):
|
||||
self._make_events(self.early1, self.early2)
|
||||
s, e = self.conn.get_event_interval(self._filter)
|
||||
assert s is None
|
||||
assert e is None
|
||||
|
||||
def test_overlap_range_start(self):
|
||||
self._make_events(self.early1, self.start, self.middle1)
|
||||
s, e = self.conn.get_event_interval(self._filter)
|
||||
assert s == self.start
|
||||
assert e == self.middle1
|
||||
|
||||
def test_within_range(self):
|
||||
self._make_events(self.middle1, self.middle2)
|
||||
s, e = self.conn.get_event_interval(self._filter)
|
||||
assert s == self.middle1
|
||||
assert e == self.middle2
|
||||
|
||||
def test_within_range_zero_duration(self):
|
||||
self._make_events(self.middle1)
|
||||
s, e = self.conn.get_event_interval(self._filter)
|
||||
assert s == self.middle1
|
||||
assert e == self.middle1
|
||||
|
||||
def test_within_range_zero_duration_two_events(self):
|
||||
self._make_events(self.middle1, self.middle1)
|
||||
s, e = self.conn.get_event_interval(self._filter)
|
||||
assert s == self.middle1
|
||||
assert e == self.middle1
|
||||
|
||||
def test_overlap_range_end(self):
|
||||
self._make_events(self.middle2, self.end, self.late1)
|
||||
s, e = self.conn.get_event_interval(self._filter)
|
||||
assert s == self.middle2
|
||||
assert e == self.middle2
|
||||
|
||||
def test_overlap_range_end_with_offset(self):
|
||||
self._make_events(self.middle2, self.end, self.late1)
|
||||
self._filter.end = self.late1
|
||||
s, e = self.conn.get_event_interval(self._filter)
|
||||
assert s == self.middle2
|
||||
assert e == self.end
|
||||
|
||||
def test_after_range(self):
|
||||
self._make_events(self.late1, self.late2)
|
||||
s, e = self.conn.get_event_interval(self._filter)
|
||||
assert s is None
|
||||
assert e is None
|
27
tools/dbsync
Executable file
27
tools/dbsync
Executable file
@ -0,0 +1,27 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- encoding: utf-8 -*-
|
||||
#
|
||||
# Author: John Tran <jhtran@att.com>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
"""Run SQLAlchemy db migration.
|
||||
"""
|
||||
|
||||
import sys
|
||||
from ceilometer import storage
|
||||
from ceilometer.storage import migration
|
||||
from ceilometer.openstack.common import cfg
|
||||
|
||||
if __name__ == '__main__':
|
||||
cfg.CONF(sys.argv[1:])
|
||||
migration.db_sync()
|
Loading…
Reference in New Issue
Block a user