diff --git a/ceilometer/collector/service.py b/ceilometer/collector/service.py index 682122c1..2d92f6ce 100644 --- a/ceilometer/collector/service.py +++ b/ceilometer/collector/service.py @@ -244,9 +244,9 @@ class CollectorService(CollectorBase, rpc_service.Service): source of the notification. This will have to get added back later. """ message_id = body.get('message_id') - event_name = body['event_type'] + event_type = body['event_type'] when = self._extract_when(body) - LOG.debug('Saving event "%s"', event_name) + LOG.debug('Saving event "%s"', event_type) publisher = body.get('publisher_id') request_id = body.get('_context_request_id') @@ -260,7 +260,7 @@ class CollectorService(CollectorBase, rpc_service.Service): # Only store non-None value traits ... traits = [trait for trait in all_traits if trait.value is not None] - event = models.Event(message_id, event_name, when, traits) + event = models.Event(message_id, event_type, when, traits) problem_events = [] for dispatcher in self.dispatcher_manager: diff --git a/ceilometer/storage/__init__.py b/ceilometer/storage/__init__.py index ab380e03..341f4b1d 100644 --- a/ceilometer/storage/__init__.py +++ b/ceilometer/storage/__init__.py @@ -116,7 +116,7 @@ class EventFilter(object): :param start: UTC start datetime (mandatory) :param end: UTC end datetime (mandatory) - :param event_name: the name of the event. None for all. + :param event_type: the name of the event. None for all. :param traits: the trait filter dict, all of which are optional {'key': , 't_string': , @@ -126,10 +126,10 @@ class EventFilter(object): currently, only one trait dict is supported. """ - def __init__(self, start, end, event_name=None, traits={}): + def __init__(self, start, end, event_type=None, traits={}): self.start = utils.sanitize_timestamp(start) self.end = utils.sanitize_timestamp(end) - self.event_name = event_name + self.event_type = event_type self.traits = traits diff --git a/ceilometer/storage/impl_sqlalchemy.py b/ceilometer/storage/impl_sqlalchemy.py index 135306f6..d0f6f0c4 100644 --- a/ceilometer/storage/impl_sqlalchemy.py +++ b/ceilometer/storage/impl_sqlalchemy.py @@ -849,15 +849,33 @@ class Connection(base.Connection): values[value_map[trait_model.dtype]] = value return models.Trait(name, event, trait_model.dtype, **values) + @staticmethod + def _get_or_create_event_type(event_type, session=None): + """Here, we check to see if an event type with the supplied + name already exists. If not, we create it and return the record. + + This may result in a flush. + """ + if session is None: + session = sqlalchemy_session.get_session() + with session.begin(subtransactions=True): + et = session.query(models.EventType).filter( + models.EventType.desc == event_type).first() + if not et: + et = models.EventType(event_type) + session.add(et) + session.flush() + return et + def _record_event(self, session, event_model): """Store a single Event, including related Traits. """ with session.begin(subtransactions=True): - unique = self._get_or_create_unique_name(event_model.event_name, - session=session) + event_type = self._get_or_create_event_type(event_model.event_type, + session=session) generated = utils.dt_to_decimal(event_model.generated) - event = models.Event(event_model.message_id, unique, generated) + event = models.Event(event_model.message_id, event_type, generated) session.add(event) new_traits = [] @@ -867,7 +885,7 @@ class Connection(base.Connection): session.add(t) new_traits.append(t) - # Note: we don't flush here, explicitly (unless a new uniquename + # Note: we don't flush here, explicitly (unless a new event_type # does it). Otherwise, just wait until all the Events are staged. return (event, new_traits) @@ -900,7 +918,9 @@ class Connection(base.Connection): return problem_events def get_events(self, event_filter): - """Return an iterable of model.Event objects. + """Return an iterable of model.Event objects. The event model objects + have their Trait model objects available -- filtered by any traits + in the event_filter. :param event_filter: EventFilter instance """ @@ -909,17 +929,18 @@ class Connection(base.Connection): end = utils.dt_to_decimal(event_filter.end) session = sqlalchemy_session.get_session() with session.begin(): - event_query_filters = [models.Event.generated >= start, - models.Event.generated <= end] sub_query = session.query(models.Event.id)\ - .join(models.Trait, models.Trait.event_id == models.Event.id) + .join(models.EventType, + models.Event.event_type_id == models.EventType.id)\ + .join(models.Trait, + models.Trait.event_id == models.Event.id)\ + .filter(models.Event.generated >= start, + models.Event.generated <= end) - if event_filter.event_name: - event_name = self._get_unique(session, event_filter.event_name) - event_query_filters.append( - models.Event.unique_name == event_name) - - sub_query = sub_query.filter(*event_query_filters) + if event_filter.event_type: + event_type = event_filter.event_type + sub_query = sub_query\ + .filter(models.EventType.desc == event_type) event_models_dict = {} if event_filter.traits: @@ -943,11 +964,19 @@ class Connection(base.Connection): else: # Pre-populate event_models_dict to cover Events without traits events = session.query(models.Event)\ - .filter(*event_query_filters) + .filter(models.Event.generated >= start)\ + .filter(models.Event.generated <= end) + if event_filter.event_type: + events = events\ + .join(models.EventType, + models.EventType.id == + models.Event.event_type_id)\ + .filter(models.EventType.desc == + event_filter.event_type) for db_event in events.all(): generated = utils.decimal_to_dt(db_event.generated) api_event = api_models.Event(db_event.message_id, - db_event.unique_name.key, + db_event.event_type.desc, generated, []) event_models_dict[db_event.id] = api_event @@ -962,7 +991,7 @@ class Connection(base.Connection): if not event: generated = utils.decimal_to_dt(trait.event.generated) event = api_models.Event(trait.event.message_id, - trait.event.unique_name.key, + trait.event.event_type.desc, generated, []) event_models_dict[trait.event_id] = event value = trait.get_value() diff --git a/ceilometer/storage/models.py b/ceilometer/storage/models.py index 40944eff..2222c09f 100644 --- a/ceilometer/storage/models.py +++ b/ceilometer/storage/models.py @@ -52,18 +52,18 @@ class Event(Model): DUPLICATE = 1 UNKNOWN_PROBLEM = 2 - def __init__(self, message_id, event_name, generated, traits): + def __init__(self, message_id, event_type, generated, traits): """Create a new event. :param message_id: Unique ID for the message this event stemmed from. This is different than the Event ID, which comes from the underlying storage system. - :param event_name: Name of the event. + :param event_type: The type of the event. :param generated: UTC time for when the event occured. :param traits: list of Traits on this Event. """ - Model.__init__(self, message_id=message_id, event_name=event_name, + Model.__init__(self, message_id=message_id, event_type=event_type, generated=generated, traits=traits) def append_trait(self, trait_model): @@ -74,7 +74,7 @@ class Event(Model): if self.traits: trait_list = [str(trait) for trait in self.traits] return "" % \ - (self.message_id, self.event_name, self.generated, + (self.message_id, self.event_type, self.generated, " ".join(trait_list)) diff --git a/ceilometer/storage/sqlalchemy/migrate_repo/versions/021_add_event_types.py b/ceilometer/storage/sqlalchemy/migrate_repo/versions/021_add_event_types.py new file mode 100644 index 00000000..8a1e50ec --- /dev/null +++ b/ceilometer/storage/sqlalchemy/migrate_repo/versions/021_add_event_types.py @@ -0,0 +1,120 @@ +# -*- encoding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from ceilometer.storage.sqlalchemy import migration +from migrate import ForeignKeyConstraint +from sqlalchemy import Column +from sqlalchemy import Integer +from sqlalchemy import MetaData +from sqlalchemy import select +from sqlalchemy import String +from sqlalchemy import Table + + +def upgrade(migrate_engine): + meta = MetaData(bind=migrate_engine) + event_type = Table( + 'event_type', meta, + Column('id', Integer, primary_key=True), + Column('desc', String(255), unique=True), + mysql_engine='InnoDB', + mysql_charset='utf8', + ) + event_type.create() + event = Table('event', meta, autoload=True) + unique_name = Table('unique_name', meta, autoload=True) + + # Event type is a specialization of Unique name, so + # we insert into the event_type table all the distinct + # unique names from the event.unique_name field along + # with the key from the unique_name table, and + # then rename the event.unique_name field to event.event_type + conn = migrate_engine.connect() + sql = ("INSERT INTO event_type " + "SELECT unique_name.id, unique_name.key FROM event " + "INNER JOIN unique_name " + "ON event.unique_name_id = unique_name.id " + "GROUP BY unique_name.id") + conn.execute(sql) + # Now we need to drop the foreign key constraint, rename + # the event.unique_name column, and re-add a new foreign + # key constraint + params = {'columns': [event.c.unique_name_id], + 'refcolumns': [unique_name.c.id]} + if migrate_engine.name == 'mysql': + params['name'] = "event_ibfk_1" + fkey = ForeignKeyConstraint(**params) + fkey.drop() + + Column('event_type_id', Integer).create(event) + + # Move data from unique_name_id column into event_type_id column + # and delete the entry from the unique_name table + query = select([event.c.id, event.c.unique_name_id]) + for key, value in migration.paged(query): + event.update().where(event.c.id == key)\ + .values({"event_type_id": value}).execute() + unique_name.delete()\ + .where(unique_name.c.id == key).execute() + + params = {'columns': [event.c.event_type_id], + 'refcolumns': [event_type.c.id]} + if migrate_engine.name == 'mysql': + params['name'] = "_".join(('fk', 'event_type', 'id')) + fkey = ForeignKeyConstraint(**params) + fkey.create() + + event.c.unique_name_id.drop() + conn.close() + + +def downgrade(migrate_engine): + meta = MetaData(bind=migrate_engine) + event_type = Table('event_type', meta, autoload=True) + event = Table('event', meta, autoload=True) + unique_name = Table('unique_name', meta, autoload=True) + # Re-insert the event type table records into the old + # unique_name table. + conn = migrate_engine.connect() + sql = ("INSERT INTO unique_name " + "SELECT event_type.id, event_type.desc FROM event_type") + conn.execute(sql) + # Drop the foreign key constraint to event_type, drop the + # event_type table, rename the event.event_type column to + # event.unique_name, and re-add the old foreign + # key constraint + params = {'columns': [event.c.event_type_id], + 'refcolumns': [event_type.c.id]} + if migrate_engine.name == 'mysql': + params['name'] = "_".join(('fk', 'event_type', 'id')) + fkey = ForeignKeyConstraint(**params) + fkey.drop() + + event_type.drop() + + Column('unique_name_id', Integer).create(event) + + # Move data from event_type_id column to unique_name_id column + query = select([event.c.id, event.c.event_type_id]) + for key, value in migration.paged(query): + event.update().where(event.c.id == key)\ + .values({"unique_name_id": value}).execute() + + event.c.event_type_id.drop() + params = {'columns': [event.c.unique_name_id], + 'refcolumns': [unique_name.c.id]} + if migrate_engine.name == 'mysql': + params['name'] = 'event_ibfk_1' + fkey = ForeignKeyConstraint(**params) + fkey.create() diff --git a/ceilometer/storage/sqlalchemy/migrate_repo/versions/021_sqlite_downgrade.sql b/ceilometer/storage/sqlalchemy/migrate_repo/versions/021_sqlite_downgrade.sql new file mode 100644 index 00000000..fe56e89d --- /dev/null +++ b/ceilometer/storage/sqlalchemy/migrate_repo/versions/021_sqlite_downgrade.sql @@ -0,0 +1,20 @@ +ALTER TABLE event RENAME TO event_orig; + +INSERT INTO unique_name +SELECT et.id, et.desc +FROM event_type et; + +CREATE TABLE event ( + id INTEGER PRIMARY KEY ASC, + generated FLOAT NOT NULL, + message_id VARCHAR(50) UNIQUE, + unique_name_id INTEGER NOT NULL, + FOREIGN KEY (unique_name_id) REFERENCES unique_name (id) +); + +INSERT INTO event +SELECT id, generated, message_id, event_type_id +FROM event_orig; + +DROP TABLE event_orig; +DROP TABLE event_type; diff --git a/ceilometer/storage/sqlalchemy/migrate_repo/versions/021_sqlite_upgrade.sql b/ceilometer/storage/sqlalchemy/migrate_repo/versions/021_sqlite_upgrade.sql new file mode 100644 index 00000000..19030113 --- /dev/null +++ b/ceilometer/storage/sqlalchemy/migrate_repo/versions/021_sqlite_upgrade.sql @@ -0,0 +1,29 @@ +CREATE TABLE event_type ( + id INTEGER PRIMARY KEY ASC, + desc STRING NOT NULL +); + +INSERT INTO event_type +SELECT un.id, un.key +FROM unique_name un +JOIN event e ON un.id = e.unique_name_id +GROUP BY un.id; + +ALTER TABLE event RENAME TO event_orig; + +CREATE TABLE event ( + id INTEGER PRIMARY KEY ASC, + generated FLOAT NOT NULL, + message_id VARCHAR(50) UNIQUE, + event_type_id INTEGER NOT NULL, + FOREIGN KEY (event_type_id) REFERENCES event_type (id) +); + +INSERT INTO event +SELECT id, generated, message_id, unique_name_id +FROM event_orig; + +DROP TABLE event_orig; + +DELETE FROM unique_name +WHERE id IN (SELECT id FROM event_type); diff --git a/ceilometer/storage/sqlalchemy/migration.py b/ceilometer/storage/sqlalchemy/migration.py index d7d4609a..2651e4a4 100644 --- a/ceilometer/storage/sqlalchemy/migration.py +++ b/ceilometer/storage/sqlalchemy/migration.py @@ -104,3 +104,21 @@ def _find_migrate_repo(): if _REPOSITORY is None: _REPOSITORY = Repository(path) return _REPOSITORY + + +def paged(query, size=1000): + """Page query results + + :param query: the SQLAlchemy query to execute + :param size: the max page size + return: generator with query data + """ + offset = 0 + while True: + page = query.offset(offset).limit(size).execute() + if page.rowcount <= 0: + # There are no more rows + break + for row in page: + yield row + offset += size diff --git a/ceilometer/storage/sqlalchemy/models.py b/ceilometer/storage/sqlalchemy/models.py index 8f890b30..faa8f26b 100644 --- a/ceilometer/storage/sqlalchemy/models.py +++ b/ceilometer/storage/sqlalchemy/models.py @@ -307,29 +307,44 @@ class UniqueName(Base): return "" % self.key +class EventType(Base): + """Types of event records.""" + __tablename__ = 'event_type' + + id = Column(Integer, primary_key=True) + desc = Column(String(255), unique=True) + + def __init__(self, event_type): + self.desc = event_type + + def __repr__(self): + return "" % self.desc + + class Event(Base): __tablename__ = 'event' __table_args__ = ( - Index('unique_name_id', 'unique_name_id'), Index('ix_event_message_id', 'message_id'), - Index('ix_event_generated', 'generated'), + Index('ix_event_type_id', 'event_type_id'), + Index('ix_event_generated', 'generated') ) id = Column(Integer, primary_key=True) message_id = Column(String(50), unique=True) generated = Column(Float(asdecimal=True)) - unique_name_id = Column(Integer, ForeignKey('unique_name.id')) - unique_name = relationship("UniqueName", backref=backref('unique_name', - order_by=id)) + event_type_id = Column(Integer, ForeignKey('event_type.id')) + event_type = relationship("EventType", backref=backref('event_type')) - def __init__(self, message_id, event, generated): + def __init__(self, message_id, event_type, generated): self.message_id = message_id - self.unique_name = event + self.event_type = event_type self.generated = generated def __repr__(self): - return "" % \ - (self.id, self.message_id, self.unique_name, self.generated) + return "" % (self.id, + self.message_id, + self.event_type, + self.generated) class Trait(Base): diff --git a/ceilometer/tests/collector/test_service.py b/ceilometer/tests/collector/test_service.py index f0204cea..a3ec3c8b 100644 --- a/ceilometer/tests/collector/test_service.py +++ b/ceilometer/tests/collector/test_service.py @@ -260,7 +260,7 @@ class TestCollectorService(TestCollector): events = mock_dispatcher.record_events.call_args[0] self.assertEqual(1, len(events)) event = events[0] - self.assertEqual("foo", event.event_name) + self.assertEqual("foo", event.event_type) self.assertEqual(now, event.generated) self.assertEqual(1, len(event.traits)) diff --git a/ceilometer/tests/storage/test_impl_sqlalchemy.py b/ceilometer/tests/storage/test_impl_sqlalchemy.py index 7b1a45b3..079f8051 100644 --- a/ceilometer/tests/storage/test_impl_sqlalchemy.py +++ b/ceilometer/tests/storage/test_impl_sqlalchemy.py @@ -68,6 +68,27 @@ class UniqueNameTest(EventTestBase): self.assertTrue(repr.repr(u2)) +class EventTypeTest(EventTestBase): + # EventType is a construct specific to sqlalchemy + # Not applicable to other drivers. + + def test_event_type_exists(self): + et1 = self.conn._get_or_create_event_type("foo") + self.assertTrue(et1.id >= 0) + et2 = self.conn._get_or_create_event_type("foo") + self.assertEqual(et1.id, et2.id) + self.assertEqual(et1.desc, et2.desc) + + def test_event_type_unique(self): + et1 = self.conn._get_or_create_event_type("foo") + self.assertTrue(et1.id >= 0) + et2 = self.conn._get_or_create_event_type("blah") + self.assertNotEqual(et1.id, et2.id) + self.assertNotEqual(et1.desc, et2.desc) + # Test the method __repr__ returns a string + self.assertTrue(repr.repr(et2)) + + class MyException(Exception): pass diff --git a/ceilometer/tests/storage/test_storage_scenarios.py b/ceilometer/tests/storage/test_storage_scenarios.py index 1741e276..df30fae5 100644 --- a/ceilometer/tests/storage/test_storage_scenarios.py +++ b/ceilometer/tests/storage/test_storage_scenarios.py @@ -2101,20 +2101,20 @@ class GetEventTest(EventTestBase): base = 0 self.start = datetime.datetime(2013, 12, 31, 5, 0) now = self.start - for event_name in ['Foo', 'Bar', 'Zoo']: + for event_type in ['Foo', 'Bar', 'Zoo']: trait_models = \ [models.Trait(name, dtype, value) for name, dtype, value in [ ('trait_A', models.Trait.TEXT_TYPE, - "my_%s_text" % event_name), + "my_%s_text" % event_type), ('trait_B', models.Trait.INT_TYPE, base + 1), ('trait_C', models.Trait.FLOAT_TYPE, float(base) + 0.123456), ('trait_D', models.Trait.DATETIME_TYPE, now)]] event_models.append( - models.Event("id_%s" % event_name, - event_name, now, trait_models)) + models.Event("id_%s" % event_type, + event_type, now, trait_models)) base += 100 now = now + datetime.timedelta(hours=1) self.end = now @@ -2127,7 +2127,7 @@ class GetEventTest(EventTestBase): self.assertEqual(3, len(events)) start_time = None for i, name in enumerate(["Foo", "Bar", "Zoo"]): - self.assertEqual(events[i].event_name, name) + self.assertEqual(events[i].event_type, name) self.assertEqual(4, len(events[i].traits)) # Ensure sorted results ... if start_time is not None: @@ -2135,11 +2135,11 @@ class GetEventTest(EventTestBase): self.assertTrue(start_time < events[i].generated) start_time = events[i].generated - def test_simple_get_event_name(self): + def test_simple_get_event_type(self): event_filter = storage.EventFilter(self.start, self.end, "Bar") events = self.conn.get_events(event_filter) self.assertEqual(1, len(events)) - self.assertEqual(events[0].event_name, "Bar") + self.assertEqual(events[0].event_type, "Bar") self.assertEqual(4, len(events[0].traits)) def test_get_event_trait_filter(self): @@ -2148,7 +2148,7 @@ class GetEventTest(EventTestBase): traits=trait_filters) events = self.conn.get_events(event_filter) self.assertEqual(1, len(events)) - self.assertEqual(events[0].event_name, "Bar") + self.assertEqual(events[0].event_type, "Bar") self.assertEqual(4, len(events[0].traits)) def test_simple_get_no_traits(self): @@ -2159,5 +2159,5 @@ class GetEventTest(EventTestBase): self.assertEqual(0, len(bad_events)) self.assertEqual(1, len(events)) self.assertEqual(events[0].message_id, "id_notraits") - self.assertEqual(events[0].event_name, "NoTraits") + self.assertEqual(events[0].event_type, "NoTraits") self.assertEqual(0, len(events[0].traits))