cleanup problem events logic in event db storage
when an error occurs in event storage, we capture the errors and related events but do nothing with it. this patch logs corresponding errors and raises it if the error is unknown so that dispatcher can requeue payload. Change-Id: Id2493b8073646cad9dece626937c8e8af96e1e43
This commit is contained in:
parent
72977d4e2b
commit
4fd6ab0516
@ -22,8 +22,13 @@ import six
|
|||||||
|
|
||||||
from ceilometer.event.storage import base
|
from ceilometer.event.storage import base
|
||||||
from ceilometer.event.storage import models
|
from ceilometer.event.storage import models
|
||||||
|
from ceilometer.i18n import _LE, _LI
|
||||||
|
from ceilometer.openstack.common import log
|
||||||
|
from ceilometer import storage
|
||||||
from ceilometer import utils
|
from ceilometer import utils
|
||||||
|
|
||||||
|
LOG = log.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
AVAILABLE_CAPABILITIES = {
|
AVAILABLE_CAPABILITIES = {
|
||||||
'events': {'query': {'simple': True}},
|
'events': {'query': {'simple': True}},
|
||||||
@ -93,23 +98,24 @@ class Connection(base.Connection):
|
|||||||
'traits': traits,
|
'traits': traits,
|
||||||
'raw': ev.raw}}
|
'raw': ev.raw}}
|
||||||
|
|
||||||
problem_events = []
|
error = None
|
||||||
for ok, result in helpers.streaming_bulk(
|
for ok, result in helpers.streaming_bulk(
|
||||||
self.conn, _build_bulk_index(events)):
|
self.conn, _build_bulk_index(events)):
|
||||||
if not ok:
|
if not ok:
|
||||||
__, result = result.popitem()
|
__, result = result.popitem()
|
||||||
if result['status'] == 409:
|
if result['status'] == 409:
|
||||||
problem_events.append((models.Event.DUPLICATE,
|
LOG.info(_LI('Duplicate event detected, skipping it: %s')
|
||||||
result['_id']))
|
% result)
|
||||||
else:
|
else:
|
||||||
problem_events.append((models.Event.UNKNOWN_PROBLEM,
|
LOG.exception(_LE('Failed to record event: %s') % result)
|
||||||
result['_id']))
|
error = storage.StorageUnknownWriteError(result)
|
||||||
|
|
||||||
if self._refresh_on_write:
|
if self._refresh_on_write:
|
||||||
self.conn.indices.refresh(index='%s_*' % self.index_name)
|
self.conn.indices.refresh(index='%s_*' % self.index_name)
|
||||||
while self.conn.cluster.pending_tasks(local=True)['tasks']:
|
while self.conn.cluster.pending_tasks(local=True)['tasks']:
|
||||||
pass
|
pass
|
||||||
return problem_events
|
if error:
|
||||||
|
raise error
|
||||||
|
|
||||||
def _make_dsl_from_filter(self, indices, ev_filter):
|
def _make_dsl_from_filter(self, indices, ev_filter):
|
||||||
q_args = {}
|
q_args = {}
|
||||||
|
@ -15,7 +15,7 @@ import operator
|
|||||||
|
|
||||||
from ceilometer.event.storage import base
|
from ceilometer.event.storage import base
|
||||||
from ceilometer.event.storage import models
|
from ceilometer.event.storage import models
|
||||||
from ceilometer.i18n import _
|
from ceilometer.i18n import _, _LE
|
||||||
from ceilometer.openstack.common import log
|
from ceilometer.openstack.common import log
|
||||||
from ceilometer.storage.hbase import base as hbase_base
|
from ceilometer.storage.hbase import base as hbase_base
|
||||||
from ceilometer.storage.hbase import utils as hbase_utils
|
from ceilometer.storage.hbase import utils as hbase_utils
|
||||||
@ -92,12 +92,8 @@ class Connection(hbase_base.Connection, base.Connection):
|
|||||||
"""Write the events to Hbase.
|
"""Write the events to Hbase.
|
||||||
|
|
||||||
:param event_models: a list of models.Event objects.
|
:param event_models: a list of models.Event objects.
|
||||||
:return problem_events: a list of events that could not be saved in a
|
|
||||||
(reason, event) tuple. From the reasons that are enumerated in
|
|
||||||
storage.models.Event only the UNKNOWN_PROBLEM is applicable here.
|
|
||||||
"""
|
"""
|
||||||
problem_events = []
|
error = None
|
||||||
|
|
||||||
with self.conn_pool.connection() as conn:
|
with self.conn_pool.connection() as conn:
|
||||||
events_table = conn.table(self.EVENT_TABLE)
|
events_table = conn.table(self.EVENT_TABLE)
|
||||||
for event_model in event_models:
|
for event_model in event_models:
|
||||||
@ -121,10 +117,10 @@ class Connection(hbase_base.Connection, base.Connection):
|
|||||||
try:
|
try:
|
||||||
events_table.put(row, record)
|
events_table.put(row, record)
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
LOG.debug(_("Failed to record event: %s") % ex)
|
LOG.exception(_LE("Failed to record event: %s") % ex)
|
||||||
problem_events.append((models.Event.UNKNOWN_PROBLEM,
|
error = ex
|
||||||
event_model))
|
if error:
|
||||||
return problem_events
|
raise error
|
||||||
|
|
||||||
def get_events(self, event_filter):
|
def get_events(self, event_filter):
|
||||||
"""Return an iter of models.Event objects.
|
"""Return an iter of models.Event objects.
|
||||||
|
@ -25,7 +25,7 @@ import sqlalchemy as sa
|
|||||||
|
|
||||||
from ceilometer.event.storage import base
|
from ceilometer.event.storage import base
|
||||||
from ceilometer.event.storage import models as api_models
|
from ceilometer.event.storage import models as api_models
|
||||||
from ceilometer.i18n import _, _LI
|
from ceilometer.i18n import _LE, _LI
|
||||||
from ceilometer.openstack.common import log
|
from ceilometer.openstack.common import log
|
||||||
from ceilometer.storage.sqlalchemy import models
|
from ceilometer.storage.sqlalchemy import models
|
||||||
from ceilometer import utils
|
from ceilometer import utils
|
||||||
@ -166,16 +166,9 @@ class Connection(base.Connection):
|
|||||||
"""Write the events to SQL database via sqlalchemy.
|
"""Write the events to SQL database via sqlalchemy.
|
||||||
|
|
||||||
:param event_models: a list of model.Event objects.
|
:param event_models: a list of model.Event objects.
|
||||||
|
|
||||||
Returns a list of events that could not be saved in a
|
|
||||||
(reason, event) tuple. Reasons are enumerated in
|
|
||||||
storage.model.Event
|
|
||||||
|
|
||||||
Flush when they're all added, unless new EventTypes or
|
|
||||||
TraitTypes are added along the way.
|
|
||||||
"""
|
"""
|
||||||
session = self._engine_facade.get_session()
|
session = self._engine_facade.get_session()
|
||||||
problem_events = []
|
error = None
|
||||||
for event_model in event_models:
|
for event_model in event_models:
|
||||||
event = None
|
event = None
|
||||||
try:
|
try:
|
||||||
@ -202,18 +195,14 @@ class Connection(base.Connection):
|
|||||||
session.execute(model.__table__.insert(),
|
session.execute(model.__table__.insert(),
|
||||||
trait_map[dtype])
|
trait_map[dtype])
|
||||||
except dbexc.DBDuplicateEntry as e:
|
except dbexc.DBDuplicateEntry as e:
|
||||||
LOG.exception(_("Failed to record duplicated event: %s") % e)
|
LOG.info(_LI("Duplicate event detected, skipping it: %s") % e)
|
||||||
problem_events.append((api_models.Event.DUPLICATE,
|
|
||||||
event_model))
|
|
||||||
except KeyError as e:
|
except KeyError as e:
|
||||||
LOG.exception(_('Failed to record event: %s') % e)
|
LOG.exception(_LE('Failed to record event: %s') % e)
|
||||||
problem_events.append((api_models.Event.INCOMPATIBLE_TRAIT,
|
|
||||||
event_model))
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
LOG.exception(_('Failed to record event: %s') % e)
|
LOG.exception(_LE('Failed to record event: %s') % e)
|
||||||
problem_events.append((api_models.Event.UNKNOWN_PROBLEM,
|
error = e
|
||||||
event_model))
|
if error:
|
||||||
return problem_events
|
raise error
|
||||||
|
|
||||||
def get_events(self, event_filter):
|
def get_events(self, event_filter):
|
||||||
"""Return an iterable of model.Event objects.
|
"""Return an iterable of model.Event objects.
|
||||||
|
@ -16,7 +16,7 @@ import pymongo
|
|||||||
|
|
||||||
from ceilometer.event.storage import base
|
from ceilometer.event.storage import base
|
||||||
from ceilometer.event.storage import models
|
from ceilometer.event.storage import models
|
||||||
from ceilometer.i18n import _
|
from ceilometer.i18n import _LE, _LI
|
||||||
from ceilometer.openstack.common import log
|
from ceilometer.openstack.common import log
|
||||||
from ceilometer.storage.mongo import utils as pymongo_utils
|
from ceilometer.storage.mongo import utils as pymongo_utils
|
||||||
from ceilometer import utils
|
from ceilometer import utils
|
||||||
@ -47,14 +47,9 @@ class Connection(base.Connection):
|
|||||||
def record_events(self, event_models):
|
def record_events(self, event_models):
|
||||||
"""Write the events to database.
|
"""Write the events to database.
|
||||||
|
|
||||||
Return a list of events of type models.Event.DUPLICATE in case of
|
|
||||||
trying to write an already existing event to the database, or
|
|
||||||
models.Event.UNKONW_PROBLEM in case of any failures with recording the
|
|
||||||
event in the database.
|
|
||||||
|
|
||||||
:param event_models: a list of models.Event objects.
|
:param event_models: a list of models.Event objects.
|
||||||
"""
|
"""
|
||||||
problem_events = []
|
error = None
|
||||||
for event_model in event_models:
|
for event_model in event_models:
|
||||||
traits = []
|
traits = []
|
||||||
if event_model.traits:
|
if event_model.traits:
|
||||||
@ -69,14 +64,12 @@ class Connection(base.Connection):
|
|||||||
'timestamp': event_model.generated,
|
'timestamp': event_model.generated,
|
||||||
'traits': traits, 'raw': event_model.raw})
|
'traits': traits, 'raw': event_model.raw})
|
||||||
except pymongo.errors.DuplicateKeyError as ex:
|
except pymongo.errors.DuplicateKeyError as ex:
|
||||||
LOG.exception(_("Failed to record duplicated event: %s") % ex)
|
LOG.info(_LI("Duplicate event detected, skipping it: %s") % ex)
|
||||||
problem_events.append((models.Event.DUPLICATE,
|
|
||||||
event_model))
|
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
LOG.exception(_("Failed to record event: %s") % ex)
|
LOG.exception(_LE("Failed to record event: %s") % ex)
|
||||||
problem_events.append((models.Event.UNKNOWN_PROBLEM,
|
error = ex
|
||||||
event_model))
|
if error:
|
||||||
return problem_events
|
raise error
|
||||||
|
|
||||||
def get_events(self, event_filter):
|
def get_events(self, event_filter):
|
||||||
"""Return an iter of models.Event objects.
|
"""Return an iter of models.Event objects.
|
||||||
|
@ -93,6 +93,10 @@ cfg.CONF.register_cli_opts(CLI_OPTS)
|
|||||||
db_options.set_defaults(cfg.CONF)
|
db_options.set_defaults(cfg.CONF)
|
||||||
|
|
||||||
|
|
||||||
|
class StorageUnknownWriteError(Exception):
|
||||||
|
"""Error raised when an unknown error occurs while recording."""
|
||||||
|
|
||||||
|
|
||||||
class StorageBadVersion(Exception):
|
class StorageBadVersion(Exception):
|
||||||
"""Error raised when the storage backend version is not good enough."""
|
"""Error raised when the storage backend version is not good enough."""
|
||||||
|
|
||||||
|
@ -65,10 +65,6 @@ class EventTypeTest(tests_db.TestBase):
|
|||||||
self.assertTrue(repr.repr(et2))
|
self.assertTrue(repr.repr(et2))
|
||||||
|
|
||||||
|
|
||||||
class MyException(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
@tests_db.run_with('sqlite', 'mysql', 'pgsql')
|
@tests_db.run_with('sqlite', 'mysql', 'pgsql')
|
||||||
class EventTest(tests_db.TestBase):
|
class EventTest(tests_db.TestBase):
|
||||||
def _verify_data(self, trait, trait_table):
|
def _verify_data(self, trait, trait_table):
|
||||||
@ -101,19 +97,6 @@ class EventTest(tests_db.TestBase):
|
|||||||
model = models.Trait("Foo", models.Trait.DATETIME_TYPE, now)
|
model = models.Trait("Foo", models.Trait.DATETIME_TYPE, now)
|
||||||
self._verify_data(model, sql_models.TraitDatetime)
|
self._verify_data(model, sql_models.TraitDatetime)
|
||||||
|
|
||||||
def test_bad_event(self):
|
|
||||||
now = datetime.datetime.utcnow()
|
|
||||||
m = [models.Event("1", "Foo", now, [], {}),
|
|
||||||
models.Event("2", "Zoo", now, [], {})]
|
|
||||||
|
|
||||||
with mock.patch.object(self.event_conn,
|
|
||||||
"_get_or_create_event_type") as mock_save:
|
|
||||||
mock_save.side_effect = MyException("Boom")
|
|
||||||
problem_events = self.event_conn.record_events(m)
|
|
||||||
self.assertEqual(2, len(problem_events))
|
|
||||||
for bad, event in problem_events:
|
|
||||||
self.assertEqual(bad, models.Event.UNKNOWN_PROBLEM)
|
|
||||||
|
|
||||||
def test_event_repr(self):
|
def test_event_repr(self):
|
||||||
ev = sql_models.Event('msg_id', None, False, {})
|
ev = sql_models.Event('msg_id', None, False, {})
|
||||||
ev.id = 100
|
ev.id = 100
|
||||||
|
@ -3325,10 +3325,22 @@ class EventTest(EventTestBase):
|
|||||||
now = datetime.datetime.utcnow()
|
now = datetime.datetime.utcnow()
|
||||||
m = [event_models.Event("1", "Foo", now, None, {}),
|
m = [event_models.Event("1", "Foo", now, None, {}),
|
||||||
event_models.Event("1", "Zoo", now, [], {})]
|
event_models.Event("1", "Zoo", now, [], {})]
|
||||||
problem_events = self.event_conn.record_events(m)
|
with mock.patch('%s.LOG' %
|
||||||
self.assertEqual(1, len(problem_events))
|
self.event_conn.record_events.__module__) as log:
|
||||||
bad = problem_events[0]
|
self.event_conn.record_events(m)
|
||||||
self.assertEqual(event_models.Event.DUPLICATE, bad[0])
|
self.assertEqual(1, log.info.call_count)
|
||||||
|
|
||||||
|
def test_bad_event(self):
|
||||||
|
now = datetime.datetime.utcnow()
|
||||||
|
broken_event = event_models.Event("1", "Foo", now, None, {})
|
||||||
|
del(broken_event.__dict__['raw'])
|
||||||
|
m = [broken_event, broken_event]
|
||||||
|
with mock.patch('%s.LOG' %
|
||||||
|
self.event_conn.record_events.__module__) as log:
|
||||||
|
self.assertRaises(AttributeError, self.event_conn.record_events, m)
|
||||||
|
# ensure that record_events does not break on first error but
|
||||||
|
# delays exception and tries to record each event.
|
||||||
|
self.assertEqual(2, log.exception.call_count)
|
||||||
|
|
||||||
|
|
||||||
class GetEventTest(EventTestBase):
|
class GetEventTest(EventTestBase):
|
||||||
@ -3634,10 +3646,9 @@ class GetEventTest(EventTestBase):
|
|||||||
def test_simple_get_event_no_traits(self):
|
def test_simple_get_event_no_traits(self):
|
||||||
new_events = [event_models.Event("id_notraits", "NoTraits",
|
new_events = [event_models.Event("id_notraits", "NoTraits",
|
||||||
self.start, [], {})]
|
self.start, [], {})]
|
||||||
bad_events = self.event_conn.record_events(new_events)
|
self.event_conn.record_events(new_events)
|
||||||
event_filter = storage.EventFilter(self.start, self.end, "NoTraits")
|
event_filter = storage.EventFilter(self.start, self.end, "NoTraits")
|
||||||
events = [event for event in self.event_conn.get_events(event_filter)]
|
events = [event for event in self.event_conn.get_events(event_filter)]
|
||||||
self.assertEqual(0, len(bad_events))
|
|
||||||
self.assertEqual(1, len(events))
|
self.assertEqual(1, len(events))
|
||||||
self.assertEqual("id_notraits", events[0].message_id)
|
self.assertEqual("id_notraits", events[0].message_id)
|
||||||
self.assertEqual("NoTraits", events[0].event_type)
|
self.assertEqual("NoTraits", events[0].event_type)
|
||||||
@ -3654,10 +3665,9 @@ class GetEventTest(EventTestBase):
|
|||||||
self.start,
|
self.start,
|
||||||
[], {})]
|
[], {})]
|
||||||
|
|
||||||
bad_events = self.event_conn.record_events(new_events)
|
self.event_conn.record_events(new_events)
|
||||||
event_filter = storage.EventFilter(message_id="id_testid")
|
event_filter = storage.EventFilter(message_id="id_testid")
|
||||||
events = [event for event in self.event_conn.get_events(event_filter)]
|
events = [event for event in self.event_conn.get_events(event_filter)]
|
||||||
self.assertEqual(0, len(bad_events))
|
|
||||||
self.assertEqual(1, len(events))
|
self.assertEqual(1, len(events))
|
||||||
event = events[0]
|
event = events[0]
|
||||||
self.assertEqual("id_testid", event.message_id)
|
self.assertEqual("id_testid", event.message_id)
|
||||||
|
Loading…
Reference in New Issue
Block a user