Database read write in bulk

1. Each processor event creates a few graph changes.
   Instead of persisting these seperatly, these are now persisted in bulk.

2. Scenario Evaluator read and write (active_actions table) are performed
   outside of the loop.

3. Evaluator and consistency generated actions are sent to the processor
   as lists, not as individual messages.

Depends-On: https://review.openstack.org/#/c/639330/
Change-Id: Iad3851d6969635730ffc0fb6a728a87e282d05c0
This commit is contained in:
Idan Hefetz 2019-02-24 13:39:11 +00:00
parent ca66ef5855
commit 12bac362ee
13 changed files with 236 additions and 141 deletions

View File

@ -100,6 +100,7 @@ class ConsistencyEnforcer(object):
return self._filter_vertices_to_be_deleted(vertices)
def _push_events_to_queue(self, vertices, action):
events = []
for vertex in vertices:
event = {
DSProps.ENTITY_TYPE: CONSISTENCY_DATASOURCE,
@ -112,7 +113,8 @@ class ConsistencyEnforcer(object):
VProps.VITRAGE_CATEGORY: vertex[VProps.VITRAGE_CATEGORY],
VProps.IS_REAL_VITRAGE_ID: True
}
self.actions_callback('consistency', event)
events.append(event)
self.actions_callback('consistency', events)
@staticmethod
def _filter_vertices_to_be_deleted(vertices):

View File

@ -93,11 +93,15 @@ class VitrageGraphInit(object):
self.events_coordination.start()
def process_event(self, event):
if event.get('template_action'):
if isinstance(event, list):
for e in event:
self.processor.process_event(e)
elif event.get('template_action'):
self.workers.submit_template_event(event)
self.workers.submit_evaluators_reload_templates()
else:
self.processor.process_event(event)
self.persist.flush_events()
def _recreate_transformers_id_cache(self):
for v in self.graph.get_vertices():

View File

@ -28,6 +28,7 @@ class GraphPersistency(object):
self.conf = conf
self.db = db
self.graph = graph
self.events_buffer = []
def store_graph(self):
LOG.info('Persisting graph...')
@ -91,7 +92,13 @@ class GraphPersistency(object):
event_row = models.Event(payload=curr, is_vertex=is_vertex,
event_id=event_id)
self.db.events.create(event_row)
self.events_buffer.append(event_row)
def flush_events(self):
if not self.events_buffer:
return
self.db.events.bulk_create(self.events_buffer)
self.events_buffer = []
@staticmethod
def is_important_change(before, curr, *args):

View File

@ -18,7 +18,6 @@ import cotyledon
import multiprocessing
import multiprocessing.queues
import os
from oslo_concurrency import processutils as ps
from oslo_log import log
import oslo_messaging
from oslo_utils import uuidutils
@ -94,7 +93,7 @@ class GraphWorkersManager(cotyledon.ServiceManager):
"""
if self._evaluator_queues:
raise VitrageError('add_evaluator_workers called more than once')
workers = self._conf.evaluator.workers or ps.get_worker_count()
workers = self._conf.evaluator.workers
queues = [multiprocessing.JoinableQueue() for i in range(workers)]
self.add(EvaluatorWorker,
args=(self._conf, queues, workers),

View File

@ -19,12 +19,10 @@ from vitrage.evaluator.template_schemas import init_template_schemas
# Register options for the service
OPTS = [
cfg.IntOpt('workers',
default=None,
default=1,
min=1,
max=32,
help='Number of workers for template evaluator, default is '
'equal to the number of CPUs available if that can be '
'determined, else a default worker count of 1 is returned.'
help='Number of workers for template evaluator.'
),
]

View File

@ -45,6 +45,8 @@ from vitrage.utils import datetime as datetime_utils
LOG = log.getLogger(__name__)
EVALUATOR_EVENT = 'evaluator.event'
TARGET = 'target'
SOURCE = 'source'
class ActionExecutor(object):
@ -64,7 +66,17 @@ class ActionExecutor(object):
EXECUTE_EXTERNAL: self._execute_external,
}
def execute(self, action_spec, action_mode):
def execute(self, actions):
if not actions:
return
events = []
for action in actions:
LOG.info('Action: %s', self._action_str(action))
events.extend(self._execute(action.specs, action.mode))
self.actions_callback(EVALUATOR_EVENT, events)
def _execute(self, action_spec, action_mode):
action_recipe = self.action_recipes[action_spec.type]
if action_mode == ActionMode.DO:
@ -72,8 +84,10 @@ class ActionExecutor(object):
else:
steps = action_recipe.get_undo_recipe(action_spec)
events = []
for step in steps:
self.action_step_defs[step.type](step.params)
events.append(self.action_step_defs[step.type](step.params))
return events
def _add_vertex(self, params):
@ -81,7 +95,7 @@ class ActionExecutor(object):
ActionExecutor._add_default_properties(event)
event[EVALUATOR_EVENT_TYPE] = ADD_VERTEX
self.actions_callback(EVALUATOR_EVENT, event)
return event
def _update_vertex(self, params):
@ -89,14 +103,14 @@ class ActionExecutor(object):
ActionExecutor._add_default_properties(event)
event[EVALUATOR_EVENT_TYPE] = UPDATE_VERTEX
self.actions_callback(EVALUATOR_EVENT, event)
return event
def _remove_vertex(self, params):
event = copy.deepcopy(params)
ActionExecutor._add_default_properties(event)
event[EVALUATOR_EVENT_TYPE] = REMOVE_VERTEX
self.actions_callback(EVALUATOR_EVENT, event)
return event
def _add_edge(self, params):
@ -104,7 +118,7 @@ class ActionExecutor(object):
ActionExecutor._add_default_properties(event)
event[EVALUATOR_EVENT_TYPE] = ADD_EDGE
self.actions_callback(EVALUATOR_EVENT, event)
return event
def _remove_edge(self, params):
@ -112,7 +126,7 @@ class ActionExecutor(object):
ActionExecutor._add_default_properties(event)
event[EVALUATOR_EVENT_TYPE] = REMOVE_EDGE
self.actions_callback(EVALUATOR_EVENT, event)
return event
def _execute_external(self, params):
@ -158,3 +172,11 @@ class ActionExecutor(object):
"%s.%s" % (ExecuteMistral.__module__, ExecuteMistral.__name__))
return recipes
@staticmethod
def _action_str(action):
s = action.specs.targets.get(SOURCE, {}).get(VProps.VITRAGE_ID, '')
t = action.specs.targets.get(TARGET, {}).get(VProps.VITRAGE_ID, '')
return '%s %s \'%s\' targets (%s,%s)' % (action.mode.upper(),
action.specs.type,
action.action_id, s, t)

View File

@ -39,6 +39,8 @@ from vitrage.graph.algo_driver.sub_graph_matching import \
NEG_CONDITION
from vitrage.graph.driver import Vertex
from vitrage import storage
from vitrage.storage.sqlalchemy import models
from vitrage.utils.datetime import utcnow
LOG = log.getLogger(__name__)
@ -65,12 +67,10 @@ class ScenarioEvaluator(object):
enabled=False):
self._conf = conf
self._entity_graph = e_graph
self._db_connection = storage.get_connection_from_config(self._conf)
self._db = storage.get_connection_from_config(self._conf)
self._scenario_repo = scenario_repo
self._action_executor = ActionExecutor(self._conf, actions_callback)
self._entity_graph.subscribe(self.process_event)
self._active_actions_tracker = ActiveActionsTracker(
self._conf, self._db_connection)
self.enabled = enabled
self.connected_component_cache = defaultdict(dict)
@ -138,10 +138,7 @@ class ScenarioEvaluator(object):
LOG.exception("Evaluator error, will not execute actions %s",
str(actions))
for action in actions_to_preform:
LOG.info('Action: %s', self._action_str(action))
self._action_executor.execute(action.specs, action.mode)
self._action_executor.execute(actions_to_preform)
LOG.debug('Process event - completed')
def _get_element_scenarios(self, element, is_vertex):
@ -314,33 +311,23 @@ class ScenarioEvaluator(object):
def _analyze_and_filter_actions(self, actions):
LOG.debug("Actions before filtering: %s", actions)
if not actions:
return []
actions_to_perform = []
active_actions = ActiveActionsTracker(self._conf, self._db, actions)
for action_info in actions:
if action_info.mode == ActionMode.DO:
is_highest_score, exists = \
self._active_actions_tracker.calc_do_action(action_info)
if is_highest_score and not exists:
actions_to_perform.append(action_info)
active_actions.calc_do_action(action_info)
elif action_info.mode == ActionMode.UNDO:
is_highest_score, second_highest = \
self._active_actions_tracker.calc_undo_action(action_info)
if is_highest_score:
# We should 'DO' the Second highest scored action so
# to override the existing dominant action.
# or, if there is no second highest scored action
# So we just 'UNDO' the existing dominant action
if second_highest:
action_to_perform = self._db_action_to_action_info(
second_highest)
actions_to_perform.append(action_to_perform)
else:
active_actions.calc_undo_action(action_info)
actions_to_perform.append(action_info)
active_actions.flush_db_updates()
unique_ordered_actions = OrderedDict()
for action in actions_to_perform:
id_ = ScenarioEvaluator._generate_action_id(action.specs)
for action in active_actions.actions_to_perform:
if isinstance(action, models.ActiveAction):
action = self._db_action_to_action_info(action)
id_ = self._generate_action_id(action.specs)
unique_ordered_actions[id_] = action
return unique_ordered_actions.values()
@ -472,14 +459,6 @@ class ScenarioEvaluator(object):
for v_id in ver_to_remove:
del match[v_id]
@staticmethod
def _action_str(action):
s = action.specs.targets.get(SOURCE, {}).get(VProps.VITRAGE_ID, '')
t = action.specs.targets.get(TARGET, {}).get(VProps.VITRAGE_ID, '')
return '%s %s \'%s\' targets (%s,%s)' % (action.mode.upper(),
action.specs.type,
action.action_id, s, t)
class ActiveActionsTracker(object):
"""Keeps track of all active actions and relative dominance/priority.
@ -502,13 +481,31 @@ class ActiveActionsTracker(object):
The score is used to determine which action in each group of similar
actions to be executed next.
"""
action_tools = None
def __init__(self, conf, db_connection):
def __init__(self, conf, db, actions):
self.db = db
self.data = defaultdict(set)
self.actions_to_create = {}
self.actions_to_remove = set()
self.actions_to_perform = [] # use a list to keep the insertion order
self._init_action_tools(conf)
# Query DB for all actions with same properties
actions_keys = set([self._get_key(action) for action in actions])
db_rows = self.db.active_actions.query_similar(actions_keys) or []
for db_row in db_rows:
self.data[(db_row.source_vertex_id, db_row.target_vertex_id,
db_row.extra_info, db_row.action_type)].add(db_row)
@classmethod
def _init_action_tools(cls, conf):
if cls.action_tools:
return
info_mapper = DatasourceInfoMapper(conf)
self._db = db_connection
alarms_score = info_mapper.get_datasource_priorities('vitrage')
all_scores = info_mapper.get_datasource_priorities()
self._action_tools = {
cls.action_tools = {
ActionType.SET_STATE: pt.SetStateTools(all_scores),
ActionType.RAISE_ALARM: pt.RaiseAlarmTools(alarms_score),
ActionType.ADD_CAUSAL_RELATIONSHIP: pt.BaselineTools,
@ -523,50 +520,78 @@ class ActiveActionsTracker(object):
Only a top scored action that is new should be performed
:return: (is top score, is it already existing)
"""
# TODO(idan_hefetz): DB read and write not in a transaction
active_actions = self._query_similar_actions(action_info)
similar_actions = self._get_similar(action_info)
exists = any(
a.action_id == action_info.action_id and
a.trigger == action_info.trigger_id for a in active_actions)
a.trigger == action_info.trigger_id for a in similar_actions)
if not exists:
db_row = self._to_db_row(action_info)
active_actions.append(db_row)
LOG.debug("DB Insert active_actions %s", str(db_row))
self._db.active_actions.create(db_row)
return self._is_highest_score(active_actions, action_info), exists
self._add(action_info)
if not exists and self._is_highest_score(similar_actions, action_info):
self.actions_to_perform.append(action_info)
def calc_undo_action(self, action_info):
"""Delete this action form active_actions table, if exists
return value to help decide if action should be performed
decide if action should be performed
A top scored action should be 'undone' if there is not a second action.
If there is a second, it should now be 'done' and become the dominant
:param action_info: action to delete
:return: is_highest_score, second highest action if exists
"""
# TODO(idan_hefetz): DB read and write not in a transaction
active_actions = self._query_similar_actions(action_info)
similar_actions = self._get_similar(action_info)
if not self._is_highest_score(similar_actions, action_info):
self._remove(action_info)
return
LOG.debug("DB delete active_actions %s %s",
action_info.action_id,
str(action_info.trigger_id))
self._db.active_actions.delete(
action_id=action_info.action_id,
trigger=action_info.trigger_id)
is_highest_score = self._is_highest_score(active_actions, action_info)
if is_highest_score and len(active_actions) > 1:
return is_highest_score, self._sort_db_actions(active_actions)[1]
second_highest = self._sort_db_actions(similar_actions)[1]\
if len(similar_actions) > 1 else None
# We should 'DO' the Second highest scored action so
# to override the existing dominant action.
# or, if there is no second highest scored action
# So we just 'UNDO' the existing dominant action
if second_highest:
self.actions_to_perform.append(second_highest)
else:
return is_highest_score, None
self.actions_to_perform.append(action_info)
self._remove(action_info)
def flush_db_updates(self):
self.db.active_actions.bulk_create(self.actions_to_create.values())
self.db.active_actions.bulk_delete(self.actions_to_remove)
def _add(self, action_info):
db_row = self._to_db_row(action_info)
self._get_similar(action_info).add(db_row)
id_ = ScenarioEvaluator._generate_action_id(action_info.specs)
if id_ not in self.actions_to_create:
self.actions_to_create[id_] = db_row
def _remove(self, action_info):
similar_actions = self._get_similar(action_info)
for action in similar_actions:
if action.trigger == action_info.trigger_id and \
action.action_id == action_info.action_id:
similar_actions.remove(action)
break
self.actions_to_remove.add(
(action_info.trigger_id, action_info.action_id))
def _get_similar(self, action_info):
return self.data.get(self._get_key(action_info), set())
def _get_key(self, action_info):
src = action_info.specs.targets.get(SOURCE, {}).get(VProps.VITRAGE_ID)
trg = action_info.specs.targets.get(TARGET, {}).get(VProps.VITRAGE_ID)
extra_info = self.action_tools[action_info.specs.type].get_extra_info(
action_info.specs)
action_type = action_info.specs.type
return src, trg, extra_info, action_type
def _to_db_row(self, action_info):
source = action_info.specs.targets.get(SOURCE, {})
target = action_info.specs.targets.get(TARGET, {})
action_score = self._action_tools[action_info.specs.type].\
action_score = self.action_tools[action_info.specs.type]. \
get_score(action_info)
extra_info = self._action_tools[action_info.specs.type].\
extra_info = self.action_tools[action_info.specs.type]. \
get_extra_info(action_info.specs)
return storage.sqlalchemy.models.ActiveAction(
action_type=action_info.specs.type,
@ -577,18 +602,6 @@ class ActiveActionsTracker(object):
trigger=action_info.trigger_id,
score=action_score)
def _query_similar_actions(self, action_info):
"""Query DB for all actions with same properties"""
source = action_info.specs.targets.get(SOURCE, {})
target = action_info.specs.targets.get(TARGET, {})
extra_info = self._action_tools[action_info.specs.type].get_extra_info(
action_info.specs)
return self._db.active_actions.query(
action_type=action_info.specs.type,
extra_info=extra_info,
source_vertex_id=source.get(VProps.VITRAGE_ID),
target_vertex_id=target.get(VProps.VITRAGE_ID))
@classmethod
def _is_highest_score(cls, db_actions, action_info):
"""Get the top action from the list and compare to action_info
@ -600,7 +613,8 @@ class ActiveActionsTracker(object):
if not db_actions:
return True
highest_score_action = min(
db_actions, key=lambda action: (-action.score, action.created_at))
db_actions, key=lambda action: (-action.score, action.created_at
or utcnow(False)))
return highest_score_action.trigger == action_info.trigger_id and \
highest_score_action.action_id == action_info.action_id

View File

@ -17,7 +17,7 @@ from __future__ import absolute_import
from oslo_db.sqlalchemy import session as db_session
from oslo_log import log
from sqlalchemy import and_
from sqlalchemy import and_, or_
from sqlalchemy.engine import url as sqlalchemy_url
from sqlalchemy import func
@ -146,6 +146,14 @@ class BaseTableConn(object):
super(BaseTableConn, self).__init__()
self._engine_facade = engine_facade
def bulk_create(self, items):
if not items:
return
session = self._engine_facade.get_session()
with session.begin():
session.bulk_save_objects(items)
def query_filter(self, model, **kwargs):
session = self._engine_facade.get_session()
query = session.query(model)
@ -225,6 +233,21 @@ class ActiveActionsConnection(base.ActiveActionsConnection, BaseTableConn):
trigger=trigger)
return query.all()
def query_similar(self, actions):
"""Query DB for all actions with same properties"""
session = self._engine_facade.get_session()
query = session.query(models.ActiveAction)
filters = []
for source, target, extra_info, action_type in actions:
filters.append(
and_(models.ActiveAction.action_type == action_type,
models.ActiveAction.extra_info == extra_info,
models.ActiveAction.source_vertex_id == source,
models.ActiveAction.target_vertex_id == target,))
query = query.filter(or_(*filters))
return query.all()
def delete(self,
action_type=None,
extra_info=None,
@ -244,6 +267,20 @@ class ActiveActionsConnection(base.ActiveActionsConnection, BaseTableConn):
trigger=trigger)
return query.delete()
def bulk_delete(self, actions):
if not actions:
return
session = self._engine_facade.get_session()
query = session.query(models.ActiveAction)
filters = []
for trigger, action_id in actions:
filters.append(
and_(models.ActiveAction.trigger == trigger,
models.ActiveAction.action_id == action_id))
query = query.filter(or_(*filters))
return query.delete()
class WebhooksConnection(base.WebhooksConnection,
BaseTableConn):

View File

@ -51,3 +51,13 @@ class TestFunctionalBase(TestEntityGraphUnitBase):
snap_vals={DSProps.DATASOURCE_ACTION:
DatasourceAction.INIT_SNAPSHOT})
return mock_driver.generate_sequential_events_list(gen_list)
@staticmethod
def _consume_queue(event_queue, processor):
while not event_queue.empty():
data = event_queue.get()
if isinstance(data, list):
for event in data:
processor.process_event(event)
else:
processor.process_event(data)

View File

@ -260,8 +260,12 @@ class TestConsistencyFunctional(TestFunctionalBase, TestConfiguration):
count = 0
while not self.event_queue.empty():
count += 1
event = self.event_queue.get()
self.processor.process_event(event)
data = self.event_queue.get()
if isinstance(data, list):
for event in data:
self.processor.process_event(event)
else:
self.processor.process_event(data)
return
num_retries += 1

View File

@ -73,6 +73,7 @@ class TestGraphPersistor(TestFunctionalBase, TestConfiguration):
edge = g.get_edges(vertices[0].vertex_id).pop()
edge[EdgeProperties.VITRAGE_IS_DELETED] = True
g.update_edge(edge)
graph_persistor.flush_events()
# Store graph:
graph_persistor.store_graph()
@ -85,6 +86,7 @@ class TestGraphPersistor(TestFunctionalBase, TestConfiguration):
edge = g.get_edges(vertices[2].vertex_id).pop()
edge[EdgeProperties.RELATIONSHIP_TYPE] = 'kuku'
g.update_edge(edge)
graph_persistor.flush_events()
self.assertIsNone(self.fail_msg, 'callback failed')

View File

@ -38,6 +38,7 @@ from vitrage.evaluator.actions.evaluator_event_transformer \
import VITRAGE_DATASOURCE
from vitrage.evaluator.actions.recipes.action_steps import ADD_VERTEX
from vitrage.evaluator.actions.recipes.base import EVALUATOR_EVENT_TYPE
from vitrage.evaluator.scenario_evaluator import ActionInfo
from vitrage.evaluator.template_data import ActionSpecs
from vitrage.evaluator.template_fields import TemplateFields as TFields
from vitrage.opts import register_opts
@ -84,8 +85,9 @@ class TestActionExecutor(TestFunctionalBase, TestConfiguration):
event_queue, action_executor = self._init_executer()
# Test Action - do
action_executor.execute(action_spec, ActionMode.DO)
processor.process_event(event_queue.get())
action_executor.execute(
[ActionInfo(action_spec, ActionMode.DO, None, None)])
self._consume_queue(event_queue, processor)
host_vertex_after = processor.entity_graph.get_vertex(
host_vertex_before.vertex_id)
@ -104,8 +106,9 @@ class TestActionExecutor(TestFunctionalBase, TestConfiguration):
self.assertEqual(v_state_after, OperationalResourceState.SUBOPTIMAL)
# Test Action - undo
action_executor.execute(action_spec, ActionMode.UNDO)
processor.process_event(event_queue.get())
action_executor.execute(
[ActionInfo(action_spec, ActionMode.UNDO, None, None)])
self._consume_queue(event_queue, processor)
host_vertex_after_undo = processor.entity_graph.get_vertex(
host_vertex_before.vertex_id)
@ -134,8 +137,9 @@ class TestActionExecutor(TestFunctionalBase, TestConfiguration):
event_queue, action_executor = self._init_executer()
# Test Action - do
action_executor.execute(action_spec, ActionMode.DO)
processor.process_event(event_queue.get())
action_executor.execute(
[ActionInfo(action_spec, ActionMode.DO, None, None)])
self._consume_queue(event_queue, processor)
instance_vertex_after = processor.entity_graph.get_vertex(
instance_vertex_before.vertex_id)
@ -144,8 +148,9 @@ class TestActionExecutor(TestFunctionalBase, TestConfiguration):
self.assertTrue(instance_vertex_after.get(VProps.IS_MARKED_DOWN))
# Test Action - undo
action_executor.execute(action_spec, ActionMode.UNDO)
processor.process_event(event_queue.get())
action_executor.execute(
[ActionInfo(action_spec, ActionMode.UNDO, None, None)])
self._consume_queue(event_queue, processor)
instance_vertex_after_undo = processor.entity_graph.get_vertex(
instance_vertex_before.vertex_id)
@ -170,8 +175,9 @@ class TestActionExecutor(TestFunctionalBase, TestConfiguration):
event_queue, action_executor = self._init_executer()
# Test Action - do
action_executor.execute(action_spec, ActionMode.DO)
processor.process_event(event_queue.get())
action_executor.execute(
[ActionInfo(action_spec, ActionMode.DO, None, None)])
self._consume_queue(event_queue, processor)
host_vertex_after = processor.entity_graph.get_vertex(
host_vertex_before.vertex_id)
@ -180,8 +186,9 @@ class TestActionExecutor(TestFunctionalBase, TestConfiguration):
self.assertTrue(host_vertex_after.get(VProps.IS_MARKED_DOWN))
# Test Action - undo
action_executor.execute(action_spec, ActionMode.UNDO)
processor.process_event(event_queue.get())
action_executor.execute(
[ActionInfo(action_spec, ActionMode.UNDO, None, None)])
self._consume_queue(event_queue, processor)
host_vertex_after_undo = processor.entity_graph.get_vertex(
host_vertex_before.vertex_id)
@ -227,8 +234,9 @@ class TestActionExecutor(TestFunctionalBase, TestConfiguration):
alarm1.vertex_id,
EdgeLabel.CAUSES)
# Test Action - do
action_executor.execute(action_spec, ActionMode.DO)
processor.process_event(event_queue.get())
action_executor.execute(
[ActionInfo(action_spec, ActionMode.DO, None, None)])
self._consume_queue(event_queue, processor)
new_edge = processor.entity_graph.get_edge(alarm2.vertex_id,
alarm1.vertex_id,
@ -267,8 +275,9 @@ class TestActionExecutor(TestFunctionalBase, TestConfiguration):
event_queue, action_executor = self._init_executer()
# Test Action
action_executor.execute(action_spec, ActionMode.DO)
processor.process_event(event_queue.get())
action_executor.execute(
[ActionInfo(action_spec, ActionMode.DO, None, None)])
self._consume_queue(event_queue, processor)
after_alarms = processor.entity_graph.get_vertices(
vertex_attr_filter=alarm_vertex_attrs)
@ -330,9 +339,9 @@ class TestActionExecutor(TestFunctionalBase, TestConfiguration):
event_queue, action_executor = self._init_executer()
# Test Action - undo
action_executor.execute(action_spec, ActionMode.UNDO)
event = event_queue.get()
processor.process_event(event)
action_executor.execute(
[ActionInfo(action_spec, ActionMode.UNDO, None, None)])
self._consume_queue(event_queue, processor)
after_alarms = processor.entity_graph.get_vertices(
vertex_attr_filter=alarm_vertex_attrs)

View File

@ -450,8 +450,7 @@ class TestScenarioEvaluator(TestFunctionalBase, TestConfiguration):
port_vertex = entity_graph.get_vertices(
vertex_attr_filter={VProps.VITRAGE_TYPE:
NEUTRON_PORT_DATASOURCE})[0]
while not event_queue.empty():
processor.process_event(event_queue.get())
self._consume_queue(event_queue, processor)
# test asserts
query = {VProps.VITRAGE_CATEGORY: EntityCategory.ALARM}
@ -482,8 +481,7 @@ class TestScenarioEvaluator(TestFunctionalBase, TestConfiguration):
nagios_event = mock_driver.generate_random_events_list(generator)[0]
processor.process_event(nagios_event)
while not event_queue.empty():
processor.process_event(event_queue.get())
self._consume_queue(event_queue, processor)
# test asserts
self.assertEqual(num_orig_vertices + num_added_vertices +
@ -527,8 +525,7 @@ class TestScenarioEvaluator(TestFunctionalBase, TestConfiguration):
nagios_vertex.vertex_id)][0]
nagios_edge[EProps.VITRAGE_IS_DELETED] = True
processor.entity_graph.update_edge(nagios_edge)
while not event_queue.empty():
processor.process_event(event_queue.get())
self._consume_queue(event_queue, processor)
# test asserts
self.assertEqual(num_orig_vertices + num_added_vertices +
@ -583,8 +580,7 @@ class TestScenarioEvaluator(TestFunctionalBase, TestConfiguration):
nagios_vertex.vertex_id)][0]
nagios_edge[EProps.VITRAGE_IS_DELETED] = False
processor.entity_graph.update_edge(nagios_edge)
while not event_queue.empty():
processor.process_event(event_queue.get())
self._consume_queue(event_queue, processor)
# test asserts
self.assertEqual(num_orig_vertices + num_added_vertices +
@ -638,8 +634,7 @@ class TestScenarioEvaluator(TestFunctionalBase, TestConfiguration):
# disable PORT_PROBLEM alarm
nagios_event[NagiosProperties.STATUS] = NagiosTestStatus.OK
processor.process_event(nagios_event)
while not event_queue.empty():
processor.process_event(event_queue.get())
self._consume_queue(event_queue, processor)
# test asserts
self.assertEqual(num_orig_vertices + num_added_vertices +
@ -756,8 +751,7 @@ class TestScenarioEvaluator(TestFunctionalBase, TestConfiguration):
EdgeLabel.ATTACHED)
entity_graph.add_edge(edge)
while not event_queue.empty():
processor.process_event(event_queue.get())
self._consume_queue(event_queue, processor)
# test asserts
query = {VProps.VITRAGE_CATEGORY: EntityCategory.ALARM}
@ -789,8 +783,7 @@ class TestScenarioEvaluator(TestFunctionalBase, TestConfiguration):
nagios_event = mock_driver.generate_random_events_list(generator)[0]
processor.process_event(nagios_event)
while not event_queue.empty():
processor.process_event(event_queue.get())
self._consume_queue(event_queue, processor)
self.assertEqual(num_orig_vertices + num_added_vertices +
num_deduced_vertices + num_network_alarm_vertices,
@ -825,8 +818,7 @@ class TestScenarioEvaluator(TestFunctionalBase, TestConfiguration):
# delete NETWORK_PROBLEM alarm
nagios_event[NagiosProperties.STATUS] = NagiosTestStatus.OK
processor.process_event(nagios_event)
while not event_queue.empty():
processor.process_event(event_queue.get())
self._consume_queue(event_queue, processor)
self.assertEqual(num_orig_vertices + num_added_vertices +
num_deduced_vertices + num_network_alarm_vertices +
@ -907,8 +899,7 @@ class TestScenarioEvaluator(TestFunctionalBase, TestConfiguration):
volume_event1['attachments'][0]['server_id'] = instances[0][VProps.ID]
processor.process_event(volume_event1)
while not event_queue.empty():
processor.process_event(event_queue.get())
self._consume_queue(event_queue, processor)
# test asserts
num_volumes = 1
@ -955,8 +946,7 @@ class TestScenarioEvaluator(TestFunctionalBase, TestConfiguration):
volume_event2['attachments'][0]['server_id'] = instances[1][VProps.ID]
processor.process_event(volume_event2)
while not event_queue.empty():
processor.process_event(event_queue.get())
self._consume_queue(event_queue, processor)
# test asserts
num_volumes = 2
@ -1021,8 +1011,7 @@ class TestScenarioEvaluator(TestFunctionalBase, TestConfiguration):
volume_event2['volume_attachment'][0]['instance_uuid'] = \
volume_event2['attachments'][0]['server_id']
processor.process_event(volume_event2)
while not event_queue.empty():
processor.process_event(event_queue.get())
self._consume_queue(event_queue, processor)
# test asserts
self.assertEqual(num_orig_vertices + num_volumes + num_deduced_alarms +
@ -1106,8 +1095,7 @@ class TestScenarioEvaluator(TestFunctionalBase, TestConfiguration):
volume_event1['volume_attachment'][0]['instance_uuid'] = \
volume_event1['attachments'][0]['server_id']
processor.process_event(volume_event1)
while not event_queue.empty():
processor.process_event(event_queue.get())
self._consume_queue(event_queue, processor)
# test asserts
self.assertEqual(num_orig_vertices + num_volumes + num_deduced_alarms +
@ -1352,8 +1340,7 @@ class TestScenarioEvaluator(TestFunctionalBase, TestConfiguration):
def get_host_after_event(self, event_queue, nagios_event,
processor, target_host):
processor.process_event(nagios_event)
while not event_queue.empty():
processor.process_event(event_queue.get())
self._consume_queue(event_queue, processor)
host_v = self._get_entity_from_graph(NOVA_HOST_DATASOURCE,
target_host,
target_host,