Implements metadata query for complex query feature
A subfield in the resource_metadata field of the Sample can be used in filter expression as metadata.<subfield_name> Validation of the names of the metadata is not implemented Implements: blueprint complex-filter-expressions-in-api-queries Change-Id: Icf198f740191b5fff0008e9aa7b054c1cc33d75d
This commit is contained in:
		@@ -180,27 +180,9 @@ def make_query_from_filter(session, query, sample_filter, require_meter=True):
 | 
			
		||||
    return query
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def operator_in(field_name, field_value):
 | 
			
		||||
    return field_name.in_(field_value)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Connection(base.Connection):
 | 
			
		||||
    """SqlAlchemy connection."""
 | 
			
		||||
 | 
			
		||||
    operators = {"=": operator.eq,
 | 
			
		||||
                 "<": operator.lt,
 | 
			
		||||
                 ">": operator.gt,
 | 
			
		||||
                 "<=": operator.le,
 | 
			
		||||
                 "=<": operator.le,
 | 
			
		||||
                 ">=": operator.ge,
 | 
			
		||||
                 "=>": operator.ge,
 | 
			
		||||
                 "!=": operator.ne,
 | 
			
		||||
                 "in": operator_in}
 | 
			
		||||
    complex_operators = {"or": or_,
 | 
			
		||||
                         "and": and_}
 | 
			
		||||
    ordering_functions = {"asc": asc,
 | 
			
		||||
                          "desc": desc}
 | 
			
		||||
 | 
			
		||||
    def __init__(self, conf):
 | 
			
		||||
        url = conf.database.connection
 | 
			
		||||
        if url == 'sqlite://':
 | 
			
		||||
@@ -622,12 +604,10 @@ class Connection(base.Connection):
 | 
			
		||||
        query = session.query(table)
 | 
			
		||||
        query = make_query_from_filter(session, query, sample_filter,
 | 
			
		||||
                                       require_meter=False)
 | 
			
		||||
 | 
			
		||||
        query = self._apply_options(query,
 | 
			
		||||
                                    None,
 | 
			
		||||
                                    limit,
 | 
			
		||||
                                    table)
 | 
			
		||||
        return self._retrieve_samples(query)
 | 
			
		||||
        transformer = QueryTransformer(table, query)
 | 
			
		||||
        transformer.apply_options(None,
 | 
			
		||||
                                  limit)
 | 
			
		||||
        return self._retrieve_samples(transformer.get_query())
 | 
			
		||||
 | 
			
		||||
    def _retrieve_data(self, filter_expr, orderby, limit, table):
 | 
			
		||||
        if limit == 0:
 | 
			
		||||
@@ -635,21 +615,17 @@ class Connection(base.Connection):
 | 
			
		||||
 | 
			
		||||
        session = self._get_db_session()
 | 
			
		||||
        query = session.query(table)
 | 
			
		||||
 | 
			
		||||
        transformer = QueryTransformer(table, query)
 | 
			
		||||
        if filter_expr is not None:
 | 
			
		||||
            sql_condition = self._transform_expression(filter_expr,
 | 
			
		||||
                                                       table)
 | 
			
		||||
            query = query.filter(sql_condition)
 | 
			
		||||
            transformer.apply_filter(filter_expr)
 | 
			
		||||
 | 
			
		||||
        query = self._apply_options(query,
 | 
			
		||||
                                    orderby,
 | 
			
		||||
                                    limit,
 | 
			
		||||
                                    table)
 | 
			
		||||
        transformer.apply_options(orderby,
 | 
			
		||||
                                  limit)
 | 
			
		||||
 | 
			
		||||
        retrieve = {models.MeterSample: self._retrieve_samples,
 | 
			
		||||
                    models.Alarm: self._retrieve_alarms,
 | 
			
		||||
                    models.AlarmChange: self._retrieve_alarm_history}
 | 
			
		||||
        return retrieve[table](query)
 | 
			
		||||
        return retrieve[table](transformer.get_query())
 | 
			
		||||
 | 
			
		||||
    def query_samples(self, filter_expr=None, orderby=None, limit=None):
 | 
			
		||||
        return self._retrieve_data(filter_expr,
 | 
			
		||||
@@ -657,35 +633,6 @@ class Connection(base.Connection):
 | 
			
		||||
                                   limit,
 | 
			
		||||
                                   models.MeterSample)
 | 
			
		||||
 | 
			
		||||
    def _transform_expression(self, expression_tree, table):
 | 
			
		||||
 | 
			
		||||
        def transform(sub_tree):
 | 
			
		||||
            operator = sub_tree.keys()[0]
 | 
			
		||||
            nodes = sub_tree.values()[0]
 | 
			
		||||
            if operator in self.complex_operators:
 | 
			
		||||
                op = self.complex_operators[operator]
 | 
			
		||||
                element_list = []
 | 
			
		||||
                for node in nodes:
 | 
			
		||||
                    element = transform(node)
 | 
			
		||||
                    element_list.append(element)
 | 
			
		||||
                return op(*element_list)
 | 
			
		||||
            else:
 | 
			
		||||
                op = self.operators[operator]
 | 
			
		||||
                return op(getattr(table, nodes.keys()[0]), nodes.values()[0])
 | 
			
		||||
 | 
			
		||||
        return transform(expression_tree)
 | 
			
		||||
 | 
			
		||||
    def _apply_order_by(self, query, orderby, table):
 | 
			
		||||
 | 
			
		||||
        if orderby is not None:
 | 
			
		||||
            for field in orderby:
 | 
			
		||||
                ordering_function = self.ordering_functions[field.values()[0]]
 | 
			
		||||
                query = query.order_by(ordering_function(
 | 
			
		||||
                    getattr(table, field.keys()[0])))
 | 
			
		||||
        else:
 | 
			
		||||
            query = query.order_by(desc(table.timestamp))
 | 
			
		||||
        return query
 | 
			
		||||
 | 
			
		||||
    def _make_stats_query(self, sample_filter, groupby):
 | 
			
		||||
        select = [
 | 
			
		||||
            models.Meter.unit,
 | 
			
		||||
@@ -1246,3 +1193,88 @@ class Connection(base.Connection):
 | 
			
		||||
                yield api_models.Trait(name=type.desc,
 | 
			
		||||
                                       dtype=type.data_type,
 | 
			
		||||
                                       value=trait.get_value())
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class QueryTransformer(object):
 | 
			
		||||
    operators = {"=": operator.eq,
 | 
			
		||||
                 "<": operator.lt,
 | 
			
		||||
                 ">": operator.gt,
 | 
			
		||||
                 "<=": operator.le,
 | 
			
		||||
                 "=<": operator.le,
 | 
			
		||||
                 ">=": operator.ge,
 | 
			
		||||
                 "=>": operator.ge,
 | 
			
		||||
                 "!=": operator.ne,
 | 
			
		||||
                 "in": lambda field_name, values: field_name.in_(values)}
 | 
			
		||||
 | 
			
		||||
    complex_operators = {"or": or_,
 | 
			
		||||
                         "and": and_}
 | 
			
		||||
 | 
			
		||||
    ordering_functions = {"asc": asc,
 | 
			
		||||
                          "desc": desc}
 | 
			
		||||
 | 
			
		||||
    def __init__(self, table, query):
 | 
			
		||||
        self.table = table
 | 
			
		||||
        self.query = query
 | 
			
		||||
 | 
			
		||||
    def _handle_complex_op(self, complex_op, nodes):
 | 
			
		||||
        op = self.complex_operators[complex_op]
 | 
			
		||||
        element_list = []
 | 
			
		||||
        for node in nodes:
 | 
			
		||||
            element = self._transform(node)
 | 
			
		||||
            element_list.append(element)
 | 
			
		||||
        return op(*element_list)
 | 
			
		||||
 | 
			
		||||
    def _handle_simple_op(self, simple_op, nodes):
 | 
			
		||||
        op = self.operators[simple_op]
 | 
			
		||||
        field_name = nodes.keys()[0]
 | 
			
		||||
        value = nodes.values()[0]
 | 
			
		||||
        if field_name.startswith('resource_metadata.'):
 | 
			
		||||
            return self._handle_metadata(op, field_name, value)
 | 
			
		||||
        else:
 | 
			
		||||
            return op(getattr(self.table, field_name), value)
 | 
			
		||||
 | 
			
		||||
    def _handle_metadata(self, op, field_name, value):
 | 
			
		||||
        if op == self.operators["in"]:
 | 
			
		||||
            raise NotImplementedError(
 | 
			
		||||
                _("Metadata query with in operator is not implemented"))
 | 
			
		||||
 | 
			
		||||
        field_name = field_name[len('resource_metadata.'):]
 | 
			
		||||
        meta_table = META_TYPE_MAP[type(value)]
 | 
			
		||||
        meta_alias = aliased(meta_table)
 | 
			
		||||
        on_clause = and_(self.table.id == meta_alias.id,
 | 
			
		||||
                         meta_alias.meta_key == field_name)
 | 
			
		||||
        # outer join is needed to support metaquery
 | 
			
		||||
        # with or operator on non existent metadata field
 | 
			
		||||
        # see: test_query_non_existing_metadata_with_result
 | 
			
		||||
        # test case.
 | 
			
		||||
        self.query = self.query.outerjoin(meta_alias, on_clause)
 | 
			
		||||
        return op(meta_alias.value, value)
 | 
			
		||||
 | 
			
		||||
    def _transform(self, sub_tree):
 | 
			
		||||
        operator = sub_tree.keys()[0]
 | 
			
		||||
        nodes = sub_tree.values()[0]
 | 
			
		||||
        if operator in self.complex_operators:
 | 
			
		||||
            return self._handle_complex_op(operator, nodes)
 | 
			
		||||
        else:
 | 
			
		||||
            return self._handle_simple_op(operator, nodes)
 | 
			
		||||
 | 
			
		||||
    def apply_filter(self, expression_tree):
 | 
			
		||||
        condition = self._transform(expression_tree)
 | 
			
		||||
        self.query = self.query.filter(condition)
 | 
			
		||||
 | 
			
		||||
    def apply_options(self, orderby, limit):
 | 
			
		||||
        self._apply_order_by(orderby)
 | 
			
		||||
        if limit is not None:
 | 
			
		||||
            self.query = self.query.limit(limit)
 | 
			
		||||
 | 
			
		||||
    def _apply_order_by(self, orderby):
 | 
			
		||||
        if orderby is not None:
 | 
			
		||||
            for field in orderby:
 | 
			
		||||
                ordering_function = self.ordering_functions[field.values()[0]]
 | 
			
		||||
                self.query = self.query.order_by(ordering_function(
 | 
			
		||||
                    getattr(self.table, field.keys()[0])))
 | 
			
		||||
        else:
 | 
			
		||||
            self.query = self.query.order_by(desc(self.table.timestamp))
 | 
			
		||||
 | 
			
		||||
    def get_query(self):
 | 
			
		||||
        return self.query
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user