Merge "add mandatory limit value to event list"
This commit is contained in:
commit
e5562b7e73
ceilometer
api/controllers/v2
event/storage
tests/api/v2
@ -238,13 +238,15 @@ class EventsController(rest.RestController):
|
||||
"""Works on Events."""
|
||||
|
||||
@v2_utils.requires_admin
|
||||
@wsme_pecan.wsexpose([Event], [EventQuery])
|
||||
def get_all(self, q=None):
|
||||
@wsme_pecan.wsexpose([Event], [EventQuery], int)
|
||||
def get_all(self, q=None, limit=None):
|
||||
"""Return all events matching the query filters.
|
||||
|
||||
:param q: Filter arguments for which Events to return
|
||||
:param limit: Maximum number of samples to be returned.
|
||||
"""
|
||||
q = q or []
|
||||
limit = v2_utils.enforce_limit(limit)
|
||||
event_filter = _event_query_to_event_filter(q)
|
||||
return [Event(message_id=event.message_id,
|
||||
event_type=event.event_type,
|
||||
@ -252,7 +254,8 @@ class EventsController(rest.RestController):
|
||||
traits=event.traits,
|
||||
raw=event.raw)
|
||||
for event in
|
||||
pecan.request.event_storage_conn.get_events(event_filter)]
|
||||
pecan.request.event_storage_conn.get_events(event_filter,
|
||||
limit)]
|
||||
|
||||
@v2_utils.requires_admin
|
||||
@wsme_pecan.wsexpose(Event, wtypes.text)
|
||||
|
@ -45,7 +45,7 @@ class Connection(object):
|
||||
raise ceilometer.NotImplementedError('Events not implemented.')
|
||||
|
||||
@staticmethod
|
||||
def get_events(event_filter):
|
||||
def get_events(event_filter, limit=None):
|
||||
"""Return an iterable of model.Event objects."""
|
||||
raise ceilometer.NotImplementedError('Events not implemented.')
|
||||
|
||||
|
@ -172,11 +172,15 @@ class Connection(base.Connection):
|
||||
{'filter': {'bool': {'must': filters}}}}}
|
||||
return q_args
|
||||
|
||||
def get_events(self, event_filter):
|
||||
def get_events(self, event_filter, limit=None):
|
||||
if limit == 0:
|
||||
return
|
||||
iclient = es.client.IndicesClient(self.conn)
|
||||
indices = iclient.get_mapping('%s_*' % self.index_name).keys()
|
||||
if indices:
|
||||
filter_args = self._make_dsl_from_filter(indices, event_filter)
|
||||
if limit is not None:
|
||||
filter_args['size'] = limit
|
||||
results = self.conn.search(fields=['_id', 'timestamp',
|
||||
'_type', '_source'],
|
||||
sort='timestamp:asc',
|
||||
|
@ -123,18 +123,21 @@ class Connection(hbase_base.Connection, base.Connection):
|
||||
if error:
|
||||
raise error
|
||||
|
||||
def get_events(self, event_filter):
|
||||
def get_events(self, event_filter, limit=None):
|
||||
"""Return an iter of models.Event objects.
|
||||
|
||||
:param event_filter: storage.EventFilter object, consists of filters
|
||||
for events that are stored in database.
|
||||
"""
|
||||
if limit == 0:
|
||||
return
|
||||
q, start, stop = hbase_utils.make_events_query_from_filter(
|
||||
event_filter)
|
||||
with self.conn_pool.connection() as conn:
|
||||
events_table = conn.table(self.EVENT_TABLE)
|
||||
|
||||
gen = events_table.scan(filter=q, row_start=start, row_stop=stop)
|
||||
gen = events_table.scan(filter=q, row_start=start, row_stop=stop,
|
||||
limit=limit)
|
||||
|
||||
for event_id, data in gen:
|
||||
traits = []
|
||||
|
@ -203,12 +203,13 @@ class Connection(base.Connection):
|
||||
if error:
|
||||
raise error
|
||||
|
||||
def get_events(self, event_filter):
|
||||
def get_events(self, event_filter, limit=None):
|
||||
"""Return an iterable of model.Event objects.
|
||||
|
||||
:param event_filter: EventFilter instance
|
||||
"""
|
||||
|
||||
if limit == 0:
|
||||
return
|
||||
session = self._engine_facade.get_session()
|
||||
with session.begin():
|
||||
# Build up the join conditions
|
||||
@ -260,46 +261,49 @@ class Connection(base.Connection):
|
||||
if event_filter_conditions:
|
||||
query = query.filter(sa.and_(*event_filter_conditions))
|
||||
|
||||
query = query.order_by(models.Event.generated).limit(limit)
|
||||
event_list = {}
|
||||
# get a list of all events that match filters
|
||||
for (id_, generated, message_id,
|
||||
desc, raw) in query.add_columns(
|
||||
models.Event.generated, models.Event.message_id,
|
||||
models.EventType.desc, models.Event.raw).order_by(
|
||||
models.Event.generated).all():
|
||||
models.EventType.desc, models.Event.raw).all():
|
||||
event_list[id_] = api_models.Event(message_id, desc,
|
||||
generated, [], raw)
|
||||
# Query all traits related to events.
|
||||
# NOTE (gordc): cast is done because pgsql defaults to TEXT when
|
||||
# handling unknown values such as null.
|
||||
trait_q = (
|
||||
query.join(
|
||||
models.TraitDatetime,
|
||||
models.TraitDatetime.event_id == models.Event.id)
|
||||
.add_columns(
|
||||
session.query(
|
||||
models.TraitDatetime.event_id,
|
||||
models.TraitDatetime.key, models.TraitDatetime.value,
|
||||
sa.cast(sa.null(), sa.Integer),
|
||||
sa.cast(sa.null(), sa.Float(53)),
|
||||
sa.cast(sa.null(), sa.String(255)))
|
||||
.filter(sa.exists().where(
|
||||
models.TraitDatetime.event_id == query.subquery().c.id))
|
||||
).union(
|
||||
query.join(
|
||||
models.TraitInt,
|
||||
models.TraitInt.event_id == models.Event.id)
|
||||
.add_columns(models.TraitInt.key, sa.null(),
|
||||
models.TraitInt.value, sa.null(), sa.null()),
|
||||
query.join(
|
||||
models.TraitFloat,
|
||||
models.TraitFloat.event_id == models.Event.id)
|
||||
.add_columns(models.TraitFloat.key, sa.null(),
|
||||
sa.null(), models.TraitFloat.value, sa.null()),
|
||||
query.join(
|
||||
models.TraitText,
|
||||
models.TraitText.event_id == models.Event.id)
|
||||
.add_columns(models.TraitText.key, sa.null(),
|
||||
sa.null(), sa.null(), models.TraitText.value))
|
||||
session.query(
|
||||
models.TraitInt.event_id,
|
||||
models.TraitInt.key, sa.null(),
|
||||
models.TraitInt.value, sa.null(), sa.null())
|
||||
.filter(sa.exists().where(
|
||||
models.TraitInt.event_id == query.subquery().c.id)),
|
||||
session.query(
|
||||
models.TraitFloat.event_id,
|
||||
models.TraitFloat.key, sa.null(), sa.null(),
|
||||
models.TraitFloat.value, sa.null())
|
||||
.filter(sa.exists().where(
|
||||
models.TraitFloat.event_id == query.subquery().c.id)),
|
||||
session.query(
|
||||
models.TraitText.event_id,
|
||||
models.TraitText.key, sa.null(), sa.null(), sa.null(),
|
||||
models.TraitText.value)
|
||||
.filter(sa.exists().where(
|
||||
models.TraitText.event_id == query.subquery().c.id)))
|
||||
|
||||
for id_, key, t_date, t_int, t_float, t_text in (
|
||||
trait_q.order_by('2')).all():
|
||||
trait_q.order_by(models.TraitDatetime.key)).all():
|
||||
if t_int is not None:
|
||||
dtype = api_models.Trait.INT_TYPE
|
||||
val = t_int
|
||||
|
@ -71,14 +71,21 @@ class Connection(base.Connection):
|
||||
if error:
|
||||
raise error
|
||||
|
||||
def get_events(self, event_filter):
|
||||
def get_events(self, event_filter, limit=None):
|
||||
"""Return an iter of models.Event objects.
|
||||
|
||||
:param event_filter: storage.EventFilter object, consists of filters
|
||||
for events that are stored in database.
|
||||
:param limit: Maximum number of results to return.
|
||||
"""
|
||||
if limit == 0:
|
||||
return
|
||||
q = pymongo_utils.make_events_query_from_filter(event_filter)
|
||||
for event in self.db.event.find(q):
|
||||
if limit is not None:
|
||||
results = self.db.event.find(q, limit=limit)
|
||||
else:
|
||||
results = self.db.event.find(q)
|
||||
for event in results:
|
||||
traits = []
|
||||
for trait in event['traits']:
|
||||
traits.append(models.Trait(name=trait['trait_name'],
|
||||
|
@ -15,6 +15,7 @@
|
||||
"""Test event, event_type and trait retrieval."""
|
||||
|
||||
import datetime
|
||||
import uuid
|
||||
|
||||
import webtest.app
|
||||
|
||||
@ -450,3 +451,57 @@ class TestEventAPI(EventTestBase):
|
||||
'value': '1',
|
||||
'type': 'integer',
|
||||
'op': 'el'}])
|
||||
|
||||
|
||||
class EventRestrictionTestBase(v2.FunctionalTest,
|
||||
tests_db.MixinTestsWithBackendScenarios):
|
||||
|
||||
def setUp(self):
|
||||
super(EventRestrictionTestBase, self).setUp()
|
||||
self.CONF.set_override('default_api_return_limit', 10, group='api')
|
||||
self._generate_models()
|
||||
|
||||
def _generate_models(self):
|
||||
event_models = []
|
||||
base = 0
|
||||
self.s_time = datetime.datetime(2013, 12, 31, 5, 0)
|
||||
self.trait_time = datetime.datetime(2013, 12, 31, 5, 0)
|
||||
for i in range(20):
|
||||
trait_models = [models.Trait(name, type, value)
|
||||
for name, type, value in [
|
||||
('trait_A', models.Trait.TEXT_TYPE,
|
||||
"my_text"),
|
||||
('trait_B', models.Trait.INT_TYPE,
|
||||
base + 1),
|
||||
('trait_C', models.Trait.FLOAT_TYPE,
|
||||
float(base) + 0.123456),
|
||||
('trait_D', models.Trait.DATETIME_TYPE,
|
||||
self.trait_time)]]
|
||||
|
||||
event_models.append(
|
||||
models.Event(message_id=str(uuid.uuid4()),
|
||||
event_type='foo.bar',
|
||||
generated=self.trait_time,
|
||||
traits=trait_models,
|
||||
raw={'status': {'nested': 'started'}}))
|
||||
self.trait_time += datetime.timedelta(seconds=1)
|
||||
self.event_conn.record_events(event_models)
|
||||
|
||||
|
||||
class TestEventRestriction(EventRestrictionTestBase):
|
||||
|
||||
def test_get_limit(self):
|
||||
data = self.get_json('/events?limit=1', headers=headers)
|
||||
self.assertEqual(1, len(data))
|
||||
|
||||
def test_get_limit_negative(self):
|
||||
self.assertRaises(webtest.app.AppError,
|
||||
self.get_json, '/events?limit=-2', headers=headers)
|
||||
|
||||
def test_get_limit_bigger(self):
|
||||
data = self.get_json('/events?limit=100', headers=headers)
|
||||
self.assertEqual(20, len(data))
|
||||
|
||||
def test_get_default_limit(self):
|
||||
data = self.get_json('/events', headers=headers)
|
||||
self.assertEqual(10, len(data))
|
||||
|
Loading…
x
Reference in New Issue
Block a user