dispatcher: split the database dispatcher
Change-Id: I6973049554b19d29a524886a217021a2e32efec9
This commit is contained in:
parent
b92a0315e0
commit
28127f2d01
@ -24,8 +24,7 @@ from ceilometer import storage
|
|||||||
LOG = log.getLogger(__name__)
|
LOG = log.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class DatabaseDispatcher(dispatcher.MeterDispatcherBase,
|
class DatabaseDispatcher(object):
|
||||||
dispatcher.EventDispatcherBase):
|
|
||||||
"""Dispatcher class for recording metering data into database.
|
"""Dispatcher class for recording metering data into database.
|
||||||
|
|
||||||
The dispatcher class which records each meter into a database configured
|
The dispatcher class which records each meter into a database configured
|
||||||
@ -38,16 +37,13 @@ class DatabaseDispatcher(dispatcher.MeterDispatcherBase,
|
|||||||
meter_dispatchers = database
|
meter_dispatchers = database
|
||||||
event_dispatchers = database
|
event_dispatchers = database
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, conf):
|
def __init__(self, conf):
|
||||||
super(DatabaseDispatcher, self).__init__(conf)
|
self.conf = conf
|
||||||
|
self._conn = self._get_db_conn(conf, self.CONNECTION_TYPE, True)
|
||||||
|
|
||||||
self._meter_conn = self._get_db_conn('metering', True)
|
def _get_db_conn(self, conf, purpose, ignore_exception=False):
|
||||||
self._event_conn = self._get_db_conn('event', True)
|
|
||||||
|
|
||||||
def _get_db_conn(self, purpose, ignore_exception=False):
|
|
||||||
try:
|
try:
|
||||||
return storage.get_connection_from_config(self.conf, purpose)
|
return storage.get_connection_from_config(conf, purpose)
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
params = {"purpose": purpose, "err": err}
|
params = {"purpose": purpose, "err": err}
|
||||||
LOG.exception(_LE("Failed to connect to db, purpose %(purpose)s "
|
LOG.exception(_LE("Failed to connect to db, purpose %(purpose)s "
|
||||||
@ -56,18 +52,19 @@ class DatabaseDispatcher(dispatcher.MeterDispatcherBase,
|
|||||||
raise
|
raise
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def meter_conn(self):
|
def conn(self):
|
||||||
if not self._meter_conn:
|
if not self._conn:
|
||||||
self._meter_conn = self._get_db_conn('metering')
|
self._conn = self._get_db_conn(self.conf, self.CONNECTION_TYPE)
|
||||||
|
return self._conn
|
||||||
|
|
||||||
return self._meter_conn
|
|
||||||
|
|
||||||
@property
|
class MeterDatabaseDispatcher(dispatcher.MeterDispatcherBase,
|
||||||
def event_conn(self):
|
DatabaseDispatcher):
|
||||||
if not self._event_conn:
|
CONNECTION_TYPE = 'metering'
|
||||||
self._event_conn = self._get_db_conn('event')
|
|
||||||
|
|
||||||
return self._event_conn
|
def __init__(self, conf):
|
||||||
|
DatabaseDispatcher.__init__(self, conf)
|
||||||
|
dispatcher.MeterDispatcherBase.__init__(self, conf)
|
||||||
|
|
||||||
def record_metering_data(self, data):
|
def record_metering_data(self, data):
|
||||||
# We may have receive only one counter on the wire
|
# We may have receive only one counter on the wire
|
||||||
@ -91,12 +88,21 @@ class DatabaseDispatcher(dispatcher.MeterDispatcherBase,
|
|||||||
ts = timeutils.parse_isotime(meter['timestamp'])
|
ts = timeutils.parse_isotime(meter['timestamp'])
|
||||||
meter['timestamp'] = timeutils.normalize_time(ts)
|
meter['timestamp'] = timeutils.normalize_time(ts)
|
||||||
try:
|
try:
|
||||||
self.meter_conn.record_metering_data_batch(data)
|
self.conn.record_metering_data_batch(data)
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
LOG.error(_LE('Failed to record %(len)s: %(err)s.'),
|
LOG.error(_LE('Failed to record %(len)s: %(err)s.'),
|
||||||
{'len': len(data), 'err': err})
|
{'len': len(data), 'err': err})
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
class EventDatabaseDispatcher(dispatcher.EventDispatcherBase,
|
||||||
|
DatabaseDispatcher):
|
||||||
|
CONNECTION_TYPE = 'event'
|
||||||
|
|
||||||
|
def __init__(self, conf):
|
||||||
|
DatabaseDispatcher.__init__(self, conf)
|
||||||
|
dispatcher.EventDispatcherBase.__init__(self, conf)
|
||||||
|
|
||||||
def record_events(self, events):
|
def record_events(self, events):
|
||||||
if not isinstance(events, list):
|
if not isinstance(events, list):
|
||||||
events = [events]
|
events = [events]
|
||||||
@ -119,4 +125,4 @@ class DatabaseDispatcher(dispatcher.MeterDispatcherBase,
|
|||||||
except Exception:
|
except Exception:
|
||||||
LOG.exception(_LE("Error processing event and it will be "
|
LOG.exception(_LE("Error processing event and it will be "
|
||||||
"dropped: %s"), ev)
|
"dropped: %s"), ev)
|
||||||
self.event_conn.record_events(event_list)
|
self.conn.record_events(event_list)
|
||||||
|
@ -30,7 +30,8 @@ class TestDispatcherDB(base.BaseTestCase):
|
|||||||
super(TestDispatcherDB, self).setUp()
|
super(TestDispatcherDB, self).setUp()
|
||||||
self.CONF = self.useFixture(fixture_config.Config()).conf
|
self.CONF = self.useFixture(fixture_config.Config()).conf
|
||||||
self.CONF.set_override('connection', 'sqlite://', group='database')
|
self.CONF.set_override('connection', 'sqlite://', group='database')
|
||||||
self.dispatcher = database.DatabaseDispatcher(self.CONF)
|
self.meter_dispatcher = database.MeterDatabaseDispatcher(self.CONF)
|
||||||
|
self.event_dispatcher = database.EventDatabaseDispatcher(self.CONF)
|
||||||
self.ctx = None
|
self.ctx = None
|
||||||
|
|
||||||
def test_event_conn(self):
|
def test_event_conn(self):
|
||||||
@ -39,9 +40,9 @@ class TestDispatcherDB(base.BaseTestCase):
|
|||||||
[], {})
|
[], {})
|
||||||
event = utils.message_from_event(event,
|
event = utils.message_from_event(event,
|
||||||
self.CONF.publisher.telemetry_secret)
|
self.CONF.publisher.telemetry_secret)
|
||||||
with mock.patch.object(self.dispatcher.event_conn,
|
with mock.patch.object(self.event_dispatcher._conn,
|
||||||
'record_events') as record_events:
|
'record_events') as record_events:
|
||||||
self.dispatcher.record_events(event)
|
self.event_dispatcher.record_events(event)
|
||||||
self.assertEqual(1, len(record_events.call_args_list[0][0][0]))
|
self.assertEqual(1, len(record_events.call_args_list[0][0][0]))
|
||||||
|
|
||||||
def test_valid_message(self):
|
def test_valid_message(self):
|
||||||
@ -53,9 +54,9 @@ class TestDispatcherDB(base.BaseTestCase):
|
|||||||
msg, self.CONF.publisher.telemetry_secret,
|
msg, self.CONF.publisher.telemetry_secret,
|
||||||
)
|
)
|
||||||
|
|
||||||
with mock.patch.object(self.dispatcher.meter_conn,
|
with mock.patch.object(self.meter_dispatcher._conn,
|
||||||
'record_metering_data') as record_metering_data:
|
'record_metering_data') as record_metering_data:
|
||||||
self.dispatcher.record_metering_data(msg)
|
self.meter_dispatcher.record_metering_data(msg)
|
||||||
|
|
||||||
record_metering_data.assert_called_once_with(msg)
|
record_metering_data.assert_called_once_with(msg)
|
||||||
|
|
||||||
@ -72,9 +73,9 @@ class TestDispatcherDB(base.BaseTestCase):
|
|||||||
expected = msg.copy()
|
expected = msg.copy()
|
||||||
expected['timestamp'] = datetime.datetime(2012, 7, 2, 13, 53, 40)
|
expected['timestamp'] = datetime.datetime(2012, 7, 2, 13, 53, 40)
|
||||||
|
|
||||||
with mock.patch.object(self.dispatcher.meter_conn,
|
with mock.patch.object(self.meter_dispatcher._conn,
|
||||||
'record_metering_data') as record_metering_data:
|
'record_metering_data') as record_metering_data:
|
||||||
self.dispatcher.record_metering_data(msg)
|
self.meter_dispatcher.record_metering_data(msg)
|
||||||
|
|
||||||
record_metering_data.assert_called_once_with(expected)
|
record_metering_data.assert_called_once_with(expected)
|
||||||
|
|
||||||
@ -92,8 +93,8 @@ class TestDispatcherDB(base.BaseTestCase):
|
|||||||
expected['timestamp'] = datetime.datetime(2012, 9, 30, 23,
|
expected['timestamp'] = datetime.datetime(2012, 9, 30, 23,
|
||||||
31, 50, 262000)
|
31, 50, 262000)
|
||||||
|
|
||||||
with mock.patch.object(self.dispatcher.meter_conn,
|
with mock.patch.object(self.meter_dispatcher._conn,
|
||||||
'record_metering_data') as record_metering_data:
|
'record_metering_data') as record_metering_data:
|
||||||
self.dispatcher.record_metering_data(msg)
|
self.meter_dispatcher.record_metering_data(msg)
|
||||||
|
|
||||||
record_metering_data.assert_called_once_with(expected)
|
record_metering_data.assert_called_once_with(expected)
|
||||||
|
@ -19,16 +19,12 @@ from ceilometer import dispatcher
|
|||||||
from ceilometer.tests import base
|
from ceilometer.tests import base
|
||||||
|
|
||||||
|
|
||||||
class FakeDispatcherSample(dispatcher.MeterDispatcherBase):
|
class FakeMeterDispatcher(dispatcher.MeterDispatcherBase):
|
||||||
def record_metering_data(self, data):
|
def record_metering_data(self, data):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
class FakeDispatcher(dispatcher.MeterDispatcherBase,
|
class FakeEventDispatcher(dispatcher.EventDispatcherBase):
|
||||||
dispatcher.EventDispatcherBase):
|
|
||||||
def record_metering_data(self, data):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def record_events(self, events):
|
def record_events(self, events):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@ -41,10 +37,13 @@ class TestDispatchManager(base.BaseTestCase):
|
|||||||
event_dispatchers=['database'])
|
event_dispatchers=['database'])
|
||||||
self.useFixture(mockpatch.Patch(
|
self.useFixture(mockpatch.Patch(
|
||||||
'ceilometer.dispatcher.gnocchi.GnocchiDispatcher',
|
'ceilometer.dispatcher.gnocchi.GnocchiDispatcher',
|
||||||
new=FakeDispatcherSample))
|
new=FakeMeterDispatcher))
|
||||||
self.useFixture(mockpatch.Patch(
|
self.useFixture(mockpatch.Patch(
|
||||||
'ceilometer.dispatcher.database.DatabaseDispatcher',
|
'ceilometer.dispatcher.database.MeterDatabaseDispatcher',
|
||||||
new=FakeDispatcher))
|
new=FakeMeterDispatcher))
|
||||||
|
self.useFixture(mockpatch.Patch(
|
||||||
|
'ceilometer.dispatcher.database.EventDatabaseDispatcher',
|
||||||
|
new=FakeEventDispatcher))
|
||||||
|
|
||||||
def test_load(self):
|
def test_load(self):
|
||||||
sample_mg, event_mg = dispatcher.load_dispatcher_manager()
|
sample_mg, event_mg = dispatcher.load_dispatcher_manager()
|
||||||
|
@ -37,7 +37,7 @@ class TestEventDispatcherVerifier(base.BaseTestCase):
|
|||||||
'ceilometer.publisher.utils',
|
'ceilometer.publisher.utils',
|
||||||
'publisher')
|
'publisher')
|
||||||
self.useFixture(mockpatch.Patch(
|
self.useFixture(mockpatch.Patch(
|
||||||
'ceilometer.dispatcher.database.DatabaseDispatcher',
|
'ceilometer.dispatcher.database.EventDatabaseDispatcher',
|
||||||
new=FakeDispatcher))
|
new=FakeDispatcher))
|
||||||
|
|
||||||
@mock.patch('ceilometer.publisher.utils.verify_signature')
|
@mock.patch('ceilometer.publisher.utils.verify_signature')
|
||||||
@ -46,7 +46,7 @@ class TestEventDispatcherVerifier(base.BaseTestCase):
|
|||||||
return ev.get('message_signature') != 'bad_signature'
|
return ev.get('message_signature') != 'bad_signature'
|
||||||
mocked_verify.side_effect = _fake_verify
|
mocked_verify.side_effect = _fake_verify
|
||||||
sample = {"payload": [{"message_signature": "bad_signature"}]}
|
sample = {"payload": [{"message_signature": "bad_signature"}]}
|
||||||
manager = dispatcher.load_dispatcher_manager()[0]
|
manager = dispatcher.load_dispatcher_manager()[1]
|
||||||
v = collector.EventEndpoint("secret", manager)
|
v = collector.EventEndpoint("secret", manager)
|
||||||
v.sample([sample])
|
v.sample([sample])
|
||||||
self.assertEqual([], manager['database'].obj.events)
|
self.assertEqual([], manager['database'].obj.events)
|
||||||
|
@ -262,13 +262,13 @@ console_scripts =
|
|||||||
ceilometer-collector = ceilometer.cmd.collector:main
|
ceilometer-collector = ceilometer.cmd.collector:main
|
||||||
|
|
||||||
ceilometer.dispatcher.meter =
|
ceilometer.dispatcher.meter =
|
||||||
database = ceilometer.dispatcher.database:DatabaseDispatcher
|
database = ceilometer.dispatcher.database:MeterDatabaseDispatcher
|
||||||
file = ceilometer.dispatcher.file:FileDispatcher
|
file = ceilometer.dispatcher.file:FileDispatcher
|
||||||
http = ceilometer.dispatcher.http:HttpDispatcher
|
http = ceilometer.dispatcher.http:HttpDispatcher
|
||||||
gnocchi = ceilometer.dispatcher.gnocchi:GnocchiDispatcher
|
gnocchi = ceilometer.dispatcher.gnocchi:GnocchiDispatcher
|
||||||
|
|
||||||
ceilometer.dispatcher.event =
|
ceilometer.dispatcher.event =
|
||||||
database = ceilometer.dispatcher.database:DatabaseDispatcher
|
database = ceilometer.dispatcher.database:EventDatabaseDispatcher
|
||||||
file = ceilometer.dispatcher.file:FileDispatcher
|
file = ceilometer.dispatcher.file:FileDispatcher
|
||||||
http = ceilometer.dispatcher.http:HttpDispatcher
|
http = ceilometer.dispatcher.http:HttpDispatcher
|
||||||
gnocchi = ceilometer.dispatcher.gnocchi:GnocchiDispatcher
|
gnocchi = ceilometer.dispatcher.gnocchi:GnocchiDispatcher
|
||||||
|
Loading…
x
Reference in New Issue
Block a user