Merge "use messaging instead of multiprocessing queue"

This commit is contained in:
Zuul 2017-12-05 19:04:20 +00:00 committed by Gerrit Code Review
commit c044180f00
15 changed files with 254 additions and 141 deletions

View File

@ -5,10 +5,8 @@
- vitrage-dsvm-api-py27 - vitrage-dsvm-api-py27
- vitrage-dsvm-datasources-py27 - vitrage-dsvm-datasources-py27
- vitrage-dsvm-api-py35: - vitrage-dsvm-api-py35:
voting: false
branches: ^(?!stable/(newton|ocata|pike)).*$ branches: ^(?!stable/(newton|ocata|pike)).*$
- vitrage-dsvm-datasources-py35: - vitrage-dsvm-datasources-py35:
voting: false
branches: ^(?!stable/(newton|ocata|pike)).*$ branches: ^(?!stable/(newton|ocata|pike)).*$
gate: gate:
jobs: jobs:

View File

@ -13,7 +13,6 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import multiprocessing
import sys import sys
from oslo_service import service as os_service from oslo_service import service as os_service
@ -38,20 +37,17 @@ def main():
print(VITRAGE_TITLE) print(VITRAGE_TITLE)
conf = service.prepare_service() conf = service.prepare_service()
evaluator_queue = multiprocessing.Queue()
e_graph = entity_graph.get_graph_driver(conf)('Entity Graph') e_graph = entity_graph.get_graph_driver(conf)('Entity Graph')
launcher = os_service.ServiceLauncher(conf) launcher = os_service.ServiceLauncher(conf)
full_scenario_repo = ScenarioRepository(conf) full_scenario_repo = ScenarioRepository(conf)
clear_db(conf) clear_db(conf)
launcher.launch_service(VitrageGraphService( launcher.launch_service(VitrageGraphService(conf, e_graph))
conf, evaluator_queue, e_graph))
launcher.launch_service(VitrageApiHandlerService( launcher.launch_service(VitrageApiHandlerService(
conf, e_graph, full_scenario_repo)) conf, e_graph, full_scenario_repo))
launcher.launch_service(VitrageConsistencyService( launcher.launch_service(VitrageConsistencyService(conf, e_graph))
conf, evaluator_queue, e_graph))
launcher.wait() launcher.wait()

View File

@ -32,6 +32,8 @@ OPTS = [
help='graph driver implementation class'), help='graph driver implementation class'),
] ]
EVALUATOR_TOPIC = 'vitrage.evaluator'
def get_graph_driver(conf): def get_graph_driver(conf):
try: try:

View File

@ -35,10 +35,10 @@ class ConsistencyEnforcer(object):
def __init__(self, def __init__(self,
conf, conf,
evaluator_queue, actions_callback,
entity_graph): entity_graph):
self.conf = conf self.conf = conf
self.evaluator_queue = evaluator_queue self.actions_callback = actions_callback
self.graph = entity_graph self.graph = entity_graph
def periodic_process(self): def periodic_process(self):
@ -104,7 +104,7 @@ class ConsistencyEnforcer(object):
VProps.VITRAGE_CATEGORY: vertex[VProps.VITRAGE_CATEGORY], VProps.VITRAGE_CATEGORY: vertex[VProps.VITRAGE_CATEGORY],
VProps.IS_REAL_VITRAGE_ID: True VProps.IS_REAL_VITRAGE_ID: True
} }
self.evaluator_queue.put(event) self.actions_callback('consistency', event)
@staticmethod @staticmethod
def _filter_vertices_to_be_deleted(vertices): def _filter_vertices_to_be_deleted(vertices):

View File

@ -17,6 +17,8 @@ from oslo_service import service as os_service
from vitrage.entity_graph.consistency.consistency_enforcer \ from vitrage.entity_graph.consistency.consistency_enforcer \
import ConsistencyEnforcer import ConsistencyEnforcer
from vitrage.entity_graph import EVALUATOR_TOPIC
from vitrage.messaging import VitrageNotifier
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
@ -25,21 +27,22 @@ class VitrageConsistencyService(os_service.Service):
def __init__(self, def __init__(self,
conf, conf,
evaluator_queue,
entity_graph): entity_graph):
super(VitrageConsistencyService, self).__init__() super(VitrageConsistencyService, self).__init__()
self.conf = conf self.conf = conf
self.evaluator_queue = evaluator_queue
self.entity_graph = entity_graph self.entity_graph = entity_graph
self.actions_notifier = VitrageNotifier(
conf, 'vitrage_consistency', EVALUATOR_TOPIC)
def start(self): def start(self):
LOG.info("Vitrage Consistency Service - Starting...") LOG.info("Vitrage Consistency Service - Starting...")
super(VitrageConsistencyService, self).start() super(VitrageConsistencyService, self).start()
consistency_enf = ConsistencyEnforcer(self.conf, consistency_enf = ConsistencyEnforcer(
self.evaluator_queue, conf=self.conf,
self.entity_graph) actions_callback=self.actions_notifier.notify,
entity_graph=self.entity_graph)
self.tg.add_timer(self.conf.datasources.snapshots_interval, self.tg.add_timer(self.conf.datasources.snapshots_interval,
consistency_enf.periodic_process, consistency_enf.periodic_process,
initial_delay=60 + initial_delay=60 +

View File

@ -11,15 +11,15 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import datetime
import threading import threading
import time
from oslo_log import log from oslo_log import log
import oslo_messaging import oslo_messaging
from oslo_service import service as os_service from oslo_service import service as os_service
from vitrage.entity_graph.processor import processor as proc from vitrage.entity_graph import EVALUATOR_TOPIC
from vitrage.entity_graph.processor.processor import Processor
from vitrage.entity_graph.vitrage_init import VitrageInit from vitrage.entity_graph.vitrage_init import VitrageInit
from vitrage.evaluator.evaluator_service import EvaluatorManager from vitrage.evaluator.evaluator_service import EvaluatorManager
from vitrage import messaging from vitrage import messaging
@ -31,30 +31,31 @@ class VitrageGraphService(os_service.Service):
def __init__(self, def __init__(self,
conf, conf,
evaluator_queue,
graph): graph):
super(VitrageGraphService, self).__init__() super(VitrageGraphService, self).__init__()
self.conf = conf self.conf = conf
self.evaluator_queue = evaluator_queue
self.graph = graph self.graph = graph
self.evaluator = EvaluatorManager(conf, graph, evaluator_queue) self.evaluator = EvaluatorManager(conf, graph)
self.init = VitrageInit(conf, graph, self.evaluator, evaluator_queue) self.init = VitrageInit(conf, graph, self.evaluator)
self.processor = proc.Processor(self.conf, self.processor = Processor(self.conf, self.init, e_graph=graph)
self.init, self.listener = self._init_listener()
e_graph=graph)
self.processor_lock = threading.RLock() def _init_listener(self):
self.listener = self._create_datasources_event_listener() collector_topic = self.conf.datasources.notification_topic_collector
evaluator_topic = EVALUATOR_TOPIC
return TwoPriorityListener(
self.conf,
self.processor.process_event,
collector_topic,
evaluator_topic)
def start(self): def start(self):
LOG.info("Vitrage Graph Service - Starting...") LOG.info("Vitrage Graph Service - Starting...")
super(VitrageGraphService, self).start() super(VitrageGraphService, self).start()
self.tg.add_timer(0.1, self._process_event_non_blocking)
self.tg.add_thread( self.tg.add_thread(
self.init.initializing_process, self.init.initializing_process,
on_end_messages_func=self.processor.on_recieved_all_end_messages) on_end_messages_func=self.processor.on_recieved_all_end_messages)
self.listener.start() self.listener.start()
LOG.info("Vitrage Graph Service - Started!") LOG.info("Vitrage Graph Service - Started!")
def stop(self, graceful=False): def stop(self, graceful=False):
@ -66,50 +67,66 @@ class VitrageGraphService(os_service.Service):
LOG.info("Vitrage Graph Service - Stopped!") LOG.info("Vitrage Graph Service - Stopped!")
def _process_event_non_blocking(self):
"""Process events received from datasource
In order that other services (such as graph consistency, api handler) PRIORITY_DELAY = 0.05
could get work time as well, the work processing performed for 2
seconds and goes to sleep for 1 second. if there are more events in
the queue they are done when timer returns. class TwoPriorityListener(object):
""" def __init__(self, conf, do_work_func, topic_low, topic_high):
with self.processor_lock: self._conf = conf
start_time = datetime.datetime.now() self._do_work_func = do_work_func
while not self.evaluator_queue.empty(): self._lock = threading.Lock()
time_delta = datetime.datetime.now() - start_time self._high_event_finish_time = 0
if time_delta.total_seconds() >= 2:
self._low_pri_listener = self._init_listener(
topic_low, self._do_low_priority_work)
self._high_pri_listener = self._init_listener(
topic_high, self._do_high_priority_work)
def start(self):
self._high_pri_listener.start()
self._low_pri_listener.start()
def stop(self):
self._low_pri_listener.stop()
self._high_pri_listener.stop()
def wait(self):
self._low_pri_listener.wait()
self._high_pri_listener.wait()
def _do_high_priority_work(self, event):
self._lock.acquire()
self._do_work_func(event)
self._high_event_finish_time = time.time()
self._lock.release()
def _do_low_priority_work(self, event):
while True:
self._lock.acquire()
if (time.time() - self._high_event_finish_time) < PRIORITY_DELAY:
self._lock.release()
time.sleep(PRIORITY_DELAY)
else:
break break
if not self.evaluator_queue.empty(): self._do_work_func(event)
self.do_process(self.evaluator_queue) self._lock.release()
def do_process(self, queue): def _init_listener(self, topic, callback):
try: if not topic:
event = queue.get() return
self.processor.process_event(event)
except Exception as e:
LOG.exception("Exception: %s", e)
def _create_datasources_event_listener(self):
topic = self.conf.datasources.notification_topic_collector
transport = messaging.get_transport(self.conf)
targets = [oslo_messaging.Target(topic=topic)]
return messaging.get_notification_listener( return messaging.get_notification_listener(
transport, transport=messaging.get_transport(self._conf),
targets, targets=[oslo_messaging.Target(topic=topic)],
[PushNotificationsEndpoint(self.processor.process_event, endpoints=[PushNotificationsEndpoint(callback)])
self.processor_lock)])
class PushNotificationsEndpoint(object): class PushNotificationsEndpoint(object):
def __init__(self, process_event_callback):
def __init__(self, process_event_callback, processor_lock):
self.process_event_callback = process_event_callback self.process_event_callback = process_event_callback
self.processor_lock = processor_lock
def info(self, ctxt, publisher_id, event_type, payload, metadata): def info(self, ctxt, publisher_id, event_type, payload, metadata):
try: try:
with self.processor_lock:
self.process_event_callback(payload) self.process_event_callback(payload)
except Exception as e: except Exception as e:
LOG.exception(e) LOG.exception(e)

View File

@ -14,13 +14,7 @@
from oslo_log import log from oslo_log import log
import time import time
from vitrage.common.constants import DatasourceAction
from vitrage.common.constants import DatasourceProperties as DSProps
from vitrage.common.constants import EntityCategory
from vitrage.common.constants import GraphAction
from vitrage.common.constants import VertexProperties as VProps from vitrage.common.constants import VertexProperties as VProps
from vitrage.datasources.consistency import CONSISTENCY_DATASOURCE
from vitrage.utils.datetime import utcnow
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
@ -30,11 +24,10 @@ class VitrageInit(object):
RECEIVED_ALL_END_MESSAGES = 'received_all_end_messages' RECEIVED_ALL_END_MESSAGES = 'received_all_end_messages'
FINISHED = 'finished' FINISHED = 'finished'
def __init__(self, conf, graph=None, evaluator=None, evaluator_queue=None): def __init__(self, conf, graph=None, evaluator=None):
self.conf = conf self.conf = conf
self.graph = graph self.graph = graph
self.evaluator = evaluator self.evaluator = evaluator
self.evaluator_queue = evaluator_queue
self.status = self.STARTED self.status = self.STARTED
self.end_messages = {} self.end_messages = {}
@ -50,14 +43,14 @@ class VitrageInit(object):
on_end_messages_func() on_end_messages_func()
timestamp = str(utcnow())
self.evaluator.run_evaluator() self.evaluator.run_evaluator()
if not self._wait_for_action(self.evaluator_queue.empty): # TODO(idan_hefetz) As vitrage is not yet persistent, there aren't
LOG.error('Evaluator Queue Not Empty') # TODO(idan_hefetz) any deduced alarms to be removed during init
# if not self._wait_for_action(self.evaluator_queue.empty):
self._mark_old_deduced_alarms_as_deleted(timestamp, self.graph, # LOG.error('Evaluator Queue Not Empty')
self.evaluator_queue) # self._mark_old_deduced_alarms_as_deleted(timestamp, self.graph,
# self.evaluator_queue)
self.status = self.FINISHED self.status = self.FINISHED
LOG.info('Init Finished') LOG.info('Init Finished')
@ -87,30 +80,30 @@ class VitrageInit(object):
count_retries += 1 count_retries += 1
time.sleep(self.conf.consistency.initialization_interval) time.sleep(self.conf.consistency.initialization_interval)
def _mark_old_deduced_alarms_as_deleted(self, timestamp, graph, out_queue): # def _mark_old_deduced_alarms_as_deleted(self, timestamp,graph,out_queue):
query = { # query = {
'and': [ # 'and': [
{'==': {VProps.VITRAGE_CATEGORY: EntityCategory.ALARM}}, # {'==': {VProps.VITRAGE_CATEGORY: EntityCategory.ALARM}},
{'==': {VProps.VITRAGE_TYPE: VProps.VITRAGE_TYPE}}, # {'==': {VProps.VITRAGE_TYPE: VProps.VITRAGE_TYPE}},
{'<': {VProps.VITRAGE_SAMPLE_TIMESTAMP: timestamp}} # {'<': {VProps.VITRAGE_SAMPLE_TIMESTAMP: timestamp}}
] # ]
} # }
old_deduced_alarms = graph.get_vertices(query_dict=query) # old_deduced_alarms = graph.get_vertices(query_dict=query)
self._push_events_to_queue(old_deduced_alarms, # self._push_events_to_queue(old_deduced_alarms,
GraphAction.DELETE_ENTITY, # GraphAction.DELETE_ENTITY,
out_queue) # out_queue)
#
def _push_events_to_queue(self, vertices, action, out_queue): # def _push_events_to_queue(self, vertices, action, out_queue):
for vertex in vertices: # for vertex in vertices:
event = { # event = {
DSProps.ENTITY_TYPE: CONSISTENCY_DATASOURCE, # DSProps.ENTITY_TYPE: CONSISTENCY_DATASOURCE,
DSProps.DATASOURCE_ACTION: DatasourceAction.UPDATE, # DSProps.DATASOURCE_ACTION: DatasourceAction.UPDATE,
DSProps.SAMPLE_DATE: str(utcnow()), # DSProps.SAMPLE_DATE: str(utcnow()),
DSProps.EVENT_TYPE: action, # DSProps.EVENT_TYPE: action,
VProps.VITRAGE_ID: vertex[VProps.VITRAGE_ID], # VProps.VITRAGE_ID: vertex[VProps.VITRAGE_ID],
VProps.ID: vertex.get(VProps.ID, None), # VProps.ID: vertex.get(VProps.ID, None),
VProps.VITRAGE_TYPE: vertex[VProps.VITRAGE_TYPE], # VProps.VITRAGE_TYPE: vertex[VProps.VITRAGE_TYPE],
VProps.VITRAGE_CATEGORY: vertex[VProps.VITRAGE_CATEGORY], # VProps.VITRAGE_CATEGORY: vertex[VProps.VITRAGE_CATEGORY],
VProps.IS_REAL_VITRAGE_ID: True # VProps.IS_REAL_VITRAGE_ID: True
} # }
out_queue.put(event) # out_queue.put(event)

View File

@ -44,11 +44,14 @@ from vitrage.utils import datetime as datetime_utils
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
EVALUATOR_EVENT = 'evaluator.event'
class ActionExecutor(object): class ActionExecutor(object):
def __init__(self, conf, event_queue): def __init__(self, conf, actions_callback):
self.event_queue = event_queue
self.actions_callback = actions_callback
self.notifier = EvaluatorNotifier(conf) self.notifier = EvaluatorNotifier(conf)
self.action_recipes = ActionExecutor._register_action_recipes() self.action_recipes = ActionExecutor._register_action_recipes()
@ -78,7 +81,7 @@ class ActionExecutor(object):
ActionExecutor._add_default_properties(event) ActionExecutor._add_default_properties(event)
event[EVALUATOR_EVENT_TYPE] = ADD_VERTEX event[EVALUATOR_EVENT_TYPE] = ADD_VERTEX
self.event_queue.put(event) self.actions_callback(EVALUATOR_EVENT, event)
def _update_vertex(self, params): def _update_vertex(self, params):
@ -86,14 +89,14 @@ class ActionExecutor(object):
ActionExecutor._add_default_properties(event) ActionExecutor._add_default_properties(event)
event[EVALUATOR_EVENT_TYPE] = UPDATE_VERTEX event[EVALUATOR_EVENT_TYPE] = UPDATE_VERTEX
self.event_queue.put(event) self.actions_callback(EVALUATOR_EVENT, event)
def _remove_vertex(self, params): def _remove_vertex(self, params):
event = copy.deepcopy(params) event = copy.deepcopy(params)
ActionExecutor._add_default_properties(event) ActionExecutor._add_default_properties(event)
event[EVALUATOR_EVENT_TYPE] = REMOVE_VERTEX event[EVALUATOR_EVENT_TYPE] = REMOVE_VERTEX
self.event_queue.put(event) self.actions_callback(EVALUATOR_EVENT, event)
def _add_edge(self, params): def _add_edge(self, params):
@ -101,7 +104,7 @@ class ActionExecutor(object):
ActionExecutor._add_default_properties(event) ActionExecutor._add_default_properties(event)
event[EVALUATOR_EVENT_TYPE] = ADD_EDGE event[EVALUATOR_EVENT_TYPE] = ADD_EDGE
self.event_queue.put(event) self.actions_callback(EVALUATOR_EVENT, event)
def _remove_edge(self, params): def _remove_edge(self, params):
@ -109,7 +112,7 @@ class ActionExecutor(object):
ActionExecutor._add_default_properties(event) ActionExecutor._add_default_properties(event)
event[EVALUATOR_EVENT_TYPE] = REMOVE_EDGE event[EVALUATOR_EVENT_TYPE] = REMOVE_EDGE
self.event_queue.put(event) self.actions_callback(EVALUATOR_EVENT, event)
def _execute_external(self, params): def _execute_external(self, params):

View File

@ -18,11 +18,10 @@ import six
@six.add_metaclass(abc.ABCMeta) @six.add_metaclass(abc.ABCMeta)
class EvaluatorBase(object): class EvaluatorBase(object):
def __init__(self, conf, entity_graph, evaluator_queue): def __init__(self, conf, entity_graph):
super(EvaluatorBase, self).__init__() super(EvaluatorBase, self).__init__()
self._conf = conf self._conf = conf
self._entity_graph = entity_graph self._entity_graph = entity_graph
self._evaluator_queue = evaluator_queue
@abc.abstractmethod @abc.abstractmethod
def run_evaluator(self): def run_evaluator(self):

View File

@ -18,10 +18,13 @@ import time
from oslo_concurrency import processutils from oslo_concurrency import processutils
from oslo_log import log from oslo_log import log
from oslo_service import service as os_service from oslo_service import service as os_service
from vitrage.entity_graph import EVALUATOR_TOPIC
from vitrage.evaluator.evaluator_base import EvaluatorBase from vitrage.evaluator.evaluator_base import EvaluatorBase
from vitrage.evaluator.scenario_evaluator import ScenarioEvaluator from vitrage.evaluator.scenario_evaluator import ScenarioEvaluator
from vitrage.evaluator.scenario_repository import ScenarioRepository from vitrage.evaluator.scenario_repository import ScenarioRepository
from vitrage.messaging import VitrageNotifier
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
@ -32,9 +35,8 @@ POISON_PILL = None
class EvaluatorManager(EvaluatorBase): class EvaluatorManager(EvaluatorBase):
def __init__(self, conf, entity_graph, evaluator_queue): def __init__(self, conf, entity_graph):
super(EvaluatorManager, self).__init__(conf, entity_graph, super(EvaluatorManager, self).__init__(conf, entity_graph)
evaluator_queue)
self._workers_num = conf.evaluator.workers or \ self._workers_num = conf.evaluator.workers or \
processutils.get_worker_count() processutils.get_worker_count()
self._worker_queues = list() self._worker_queues = list()
@ -63,7 +65,6 @@ class EvaluatorManager(EvaluatorBase):
tasks_queue, tasks_queue,
self._entity_graph, self._entity_graph,
scenario_repo, scenario_repo,
self._evaluator_queue,
enabled) enabled)
self._p_launcher.launch_service(w) self._p_launcher.launch_service(w)
self._worker_queues.append(tasks_queue) self._worker_queues.append(tasks_queue)
@ -103,25 +104,27 @@ class EvaluatorWorker(os_service.Service):
task_queue, task_queue,
entity_graph, entity_graph,
scenario_repo, scenario_repo,
evaluator_queue,
enabled=False): enabled=False):
super(EvaluatorWorker, self).__init__() super(EvaluatorWorker, self).__init__()
self._conf = conf self._conf = conf
self._task_queue = task_queue self._task_queue = task_queue
self._entity_graph = entity_graph self._entity_graph = entity_graph
self._scenario_repo = scenario_repo self._scenario_repo = scenario_repo
self._evaluator_queue = evaluator_queue
self._enabled = enabled self._enabled = enabled
self._evaluator = None self._evaluator = None
def start(self): def start(self):
super(EvaluatorWorker, self).start() super(EvaluatorWorker, self).start()
actions_callback = VitrageNotifier(
conf=self._conf,
publisher_id='vitrage_evaluator',
topic=EVALUATOR_TOPIC).notify
self._entity_graph.notifier._subscriptions = [] # Quick n dirty self._entity_graph.notifier._subscriptions = [] # Quick n dirty
self._evaluator = ScenarioEvaluator( self._evaluator = ScenarioEvaluator(
self._conf, self._conf,
self._entity_graph, self._entity_graph,
self._scenario_repo, self._scenario_repo,
self._evaluator_queue, actions_callback,
self._enabled) self._enabled)
self.tg.add_thread(self._read_queue) self.tg.add_thread(self._read_queue)
LOG.info("EvaluatorWorkerService - Started!") LOG.info("EvaluatorWorkerService - Started!")

View File

@ -57,12 +57,12 @@ class ScenarioEvaluator(EvaluatorBase):
conf, conf,
e_graph, e_graph,
scenario_repo, scenario_repo,
event_queue, actions_callback,
enabled=False): enabled=False):
super(ScenarioEvaluator, self).__init__(conf, e_graph, event_queue) super(ScenarioEvaluator, self).__init__(conf, e_graph)
self._db_connection = storage.get_connection_from_config(self._conf) self._db_connection = storage.get_connection_from_config(self._conf)
self._scenario_repo = scenario_repo self._scenario_repo = scenario_repo
self._action_executor = ActionExecutor(self._conf, event_queue) self._action_executor = ActionExecutor(self._conf, actions_callback)
self._entity_graph.subscribe(self.process_event) self._entity_graph.subscribe(self.process_event)
self._active_actions_tracker = ActiveActionsTracker( self._active_actions_tracker = ActiveActionsTracker(
self._conf, self._db_connection) self._conf, self._db_connection)

View File

@ -88,14 +88,25 @@ class TestConsistencyFunctional(TestFunctionalBase):
cls.graph) cls.graph)
cls.event_queue = queue.Queue() cls.event_queue = queue.Queue()
def actions_callback(event_type, data):
"""Mock notify method
Mocks vitrage.messaging.VitrageNotifier.notify(event_type, data)
:param event_type: is currently always the same and is ignored
:param data:
"""
cls.event_queue.put(data)
scenario_repo = ScenarioRepository(cls.conf) scenario_repo = ScenarioRepository(cls.conf)
cls.evaluator = ScenarioEvaluator(cls.conf, cls.evaluator = ScenarioEvaluator(cls.conf,
cls.processor.entity_graph, cls.processor.entity_graph,
scenario_repo, scenario_repo,
cls.event_queue) actions_callback)
cls.consistency_enforcer = ConsistencyEnforcer( cls.consistency_enforcer = ConsistencyEnforcer(
cls.conf, cls.conf,
cls.event_queue, actions_callback,
cls.processor.entity_graph) cls.processor.entity_graph)
@unittest.skip("test_initializing_process skipping") @unittest.skip("test_initializing_process skipping")

View File

@ -66,6 +66,14 @@ class TestActionExecutor(TestFunctionalBase):
for datasource_name in cls.conf.datasources.types: for datasource_name in cls.conf.datasources.types:
register_opts(cls.conf, datasource_name, cls.conf.datasources.path) register_opts(cls.conf, datasource_name, cls.conf.datasources.path)
def _init_executer(self):
event_queue = queue.Queue()
def actions_callback(event_type, data):
event_queue.put(data)
return event_queue, ActionExecutor(self.conf, actions_callback)
def test_execute_set_state(self): def test_execute_set_state(self):
# Test Setup # Test Setup
@ -80,8 +88,7 @@ class TestActionExecutor(TestFunctionalBase):
props = {TFields.STATE: OperationalResourceState.SUBOPTIMAL} props = {TFields.STATE: OperationalResourceState.SUBOPTIMAL}
action_spec = ActionSpecs(0, ActionType.SET_STATE, targets, props) action_spec = ActionSpecs(0, ActionType.SET_STATE, targets, props)
event_queue = queue.Queue() event_queue, action_executor = self._init_executer()
action_executor = ActionExecutor(self.conf, event_queue)
# Test Action - do # Test Action - do
action_executor.execute(action_spec, ActionMode.DO) action_executor.execute(action_spec, ActionMode.DO)
@ -131,8 +138,7 @@ class TestActionExecutor(TestFunctionalBase):
props = {} props = {}
action_spec = ActionSpecs(0, ActionType.MARK_DOWN, targets, props) action_spec = ActionSpecs(0, ActionType.MARK_DOWN, targets, props)
event_queue = queue.Queue() event_queue, action_executor = self._init_executer()
action_executor = ActionExecutor(self.conf, event_queue)
# Test Action - do # Test Action - do
action_executor.execute(action_spec, ActionMode.DO) action_executor.execute(action_spec, ActionMode.DO)
@ -168,8 +174,7 @@ class TestActionExecutor(TestFunctionalBase):
props = {} props = {}
action_spec = ActionSpecs(0, ActionType.MARK_DOWN, targets, props) action_spec = ActionSpecs(0, ActionType.MARK_DOWN, targets, props)
event_queue = queue.Queue() event_queue, action_executor = self._init_executer()
action_executor = ActionExecutor(self.conf, event_queue)
# Test Action - do # Test Action - do
action_executor.execute(action_spec, ActionMode.DO) action_executor.execute(action_spec, ActionMode.DO)
@ -223,8 +228,7 @@ class TestActionExecutor(TestFunctionalBase):
action_spec = ActionSpecs( action_spec = ActionSpecs(
0, ActionType.ADD_CAUSAL_RELATIONSHIP, targets, {}) 0, ActionType.ADD_CAUSAL_RELATIONSHIP, targets, {})
event_queue = queue.Queue() event_queue, action_executor = self._init_executer()
action_executor = ActionExecutor(self.conf, event_queue)
before_edge = processor.entity_graph.get_edge(alarm2.vertex_id, before_edge = processor.entity_graph.get_edge(alarm2.vertex_id,
alarm1.vertex_id, alarm1.vertex_id,
@ -266,8 +270,8 @@ class TestActionExecutor(TestFunctionalBase):
alarm_vertex_attrs = {VProps.VITRAGE_TYPE: VITRAGE_DATASOURCE} alarm_vertex_attrs = {VProps.VITRAGE_TYPE: VITRAGE_DATASOURCE}
before_alarms = processor.entity_graph.get_vertices( before_alarms = processor.entity_graph.get_vertices(
vertex_attr_filter=alarm_vertex_attrs) vertex_attr_filter=alarm_vertex_attrs)
event_queue = queue.Queue()
action_executor = ActionExecutor(self.conf, event_queue) event_queue, action_executor = self._init_executer()
# Test Action # Test Action
action_executor.execute(action_spec, ActionMode.DO) action_executor.execute(action_spec, ActionMode.DO)
@ -330,8 +334,7 @@ class TestActionExecutor(TestFunctionalBase):
before_alarms = processor.entity_graph.get_vertices( before_alarms = processor.entity_graph.get_vertices(
vertex_attr_filter=alarm_vertex_attrs) vertex_attr_filter=alarm_vertex_attrs)
event_queue = queue.Queue() event_queue, action_executor = self._init_executer()
action_executor = ActionExecutor(self.conf, event_queue)
# Test Action - undo # Test Action - undo
action_executor.execute(action_spec, ActionMode.UNDO) action_executor.execute(action_spec, ActionMode.UNDO)

View File

@ -1370,8 +1370,21 @@ class TestScenarioEvaluator(TestFunctionalBase):
def _init_system(self): def _init_system(self):
processor = self._create_processor_with_graph(self.conf) processor = self._create_processor_with_graph(self.conf)
event_queue = queue.Queue() event_queue = queue.Queue()
evaluator = ScenarioEvaluator(self.conf, processor.entity_graph,
self.scenario_repository, event_queue, def actions_callback(event_type, data):
"""Mock notify method
Mocks vitrage.messaging.VitrageNotifier.notify(event_type, data)
:param event_type: is currently always the same and is ignored
:param data:
"""
event_queue.put(data)
evaluator = ScenarioEvaluator(self.conf,
processor.entity_graph,
self.scenario_repository,
actions_callback,
enabled=True) enabled=True)
return event_queue, processor, evaluator return event_queue, processor, evaluator

View File

@ -0,0 +1,72 @@
# Copyright 2017 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import threading
from vitrage.entity_graph.service import TwoPriorityListener
from vitrage.tests import base
class TwoPriorityListenerTest(base.BaseTest):
@classmethod
def setUpClass(cls):
super(TwoPriorityListenerTest, cls).setUpClass()
cls.calc_result = 0
def do_work(self, x):
if x:
self.calc_result = self.calc_result * 2
else:
self.calc_result = self.calc_result + 1
def test_queue_coordination(self):
explain = """
initially calc_result is 0.
each high priority call multiplies by *2
each low priority call adds +1
so, if all the high calls are performed first, and then all the low,
the result should be the number of low priority calls.
0*(2^n) + 1*n
"""
priority_listener = TwoPriorityListener(None, self.do_work, None, None)
def write_high():
for i in range(10000):
priority_listener._do_high_priority_work(True)
def write_low():
for i in range(10000):
priority_listener._do_low_priority_work(False)
self.calc_result = 0
t1 = threading.Thread(name='high_1', target=write_high)
t2 = threading.Thread(name='high_2', target=write_high)
t3 = threading.Thread(name='low_1', target=write_low)
t4 = threading.Thread(name='low_2', target=write_low)
self._start_and_join(t1, t2, t3, t4)
self.assertEqual(20000, self.calc_result, explain)
self.calc_result = 0
t1 = threading.Thread(name='high_1', target=write_high)
t2 = threading.Thread(name='low_1', target=write_low)
t3 = threading.Thread(name='low_2', target=write_low)
t4 = threading.Thread(name='high_2', target=write_high)
self._start_and_join(t1, t2, t3, t4)
self.assertEqual(20000, self.calc_result, explain)
def _start_and_join(self, *args):
for t in args:
t.start()
for t in args:
t.join()