Merge "graph cloning logic extracted to base class"

This commit is contained in:
Zuul 2018-01-16 12:09:59 +00:00 committed by Gerrit Code Review
commit 6fb1e323e1
5 changed files with 168 additions and 124 deletions

View File

@ -1,4 +1,4 @@
# Copyright 2017 - Nokia
# Copyright 2018 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -11,19 +11,4 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class EvaluatorBase(object):
def __init__(self, conf, entity_graph):
super(EvaluatorBase, self).__init__()
self._conf = conf
self._entity_graph = entity_graph
@abc.abstractmethod
def run_evaluator(self):
"""Start evaluation """
pass
__author__ = 'stack'

View File

@ -0,0 +1,126 @@
# Copyright 2018 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import time
from oslo_log import log
from oslo_service import service as os_service
LOG = log.getLogger(__name__)
GRAPH_UPDATE = 'graph_update'
POISON_PILL = None
class GraphCloneManagerBase(object):
def __init__(self, conf, entity_graph, worker_num):
self._conf = conf
self._entity_graph = entity_graph
self._workers_num = worker_num
self._worker_queues = list()
self._p_launcher = os_service.ProcessLauncher(conf)
def start(self):
LOG.info('%s start %s processes', self.__class__.__name__,
self._workers_num)
for i in range(self._workers_num):
worker_queue = self._run_worker(i, self._workers_num)
self._worker_queues.append(worker_queue)
self.before_subscribe()
self._entity_graph.subscribe(self.notify_graph_update)
@abc.abstractmethod
def _run_worker(self, worker_index, workers_num):
raise NotImplementedError
@abc.abstractmethod
def before_subscribe(self):
pass
def notify_graph_update(self, before, current, is_vertex, *args, **kwargs):
"""Notify all workers
This method is subscribed to entity graph changes.
Per each change in the main entity graph, this method will notify
each of the evaluators, causing them to update their own graph.
"""
self._notify_and_wait((GRAPH_UPDATE, before, current, is_vertex))
def _notify_and_wait(self, payload):
for q in self._worker_queues:
q.put(payload)
time.sleep(0) # context switch before join
for q in self._worker_queues:
q.join()
def stop_all_workers(self):
self._notify_and_wait(POISON_PILL)
for q in self._worker_queues:
q.close()
self._worker_queues = list()
class GraphCloneWorkerBase(os_service.Service):
def __init__(self,
conf,
task_queue,
entity_graph):
super(GraphCloneWorkerBase, self).__init__()
self._conf = conf
self._task_queue = task_queue
self._entity_graph = entity_graph
def start(self):
super(GraphCloneWorkerBase, self).start()
self._entity_graph.notifier._subscriptions = [] # Quick n dirty
self.tg.add_thread(self._read_queue)
LOG.info("%s - Started!", self.__class__.__name__)
def _read_queue(self):
while True:
next_task = self._task_queue.get()
if next_task is POISON_PILL:
self._task_queue.task_done()
break
try:
self.do_task(next_task)
except Exception as e:
LOG.exception("Graph may not be in sync: exception %s", e)
self._task_queue.task_done()
# Evaluator queue may have been updated, thus the sleep:
time.sleep(0)
def do_task(self, task):
action = task[0]
if action == GRAPH_UPDATE:
(action, before, current, is_vertex) = task
self._graph_update(before, current, is_vertex)
def _graph_update(self, before, current, is_vertex):
if current:
if is_vertex:
self._entity_graph.add_vertex(current)
else:
self._entity_graph.add_edge(current)
else:
if is_vertex:
self._entity_graph.delete_vertex(before)
else:
self._entity_graph.delete_edge(before)
def stop(self, graceful=False):
super(GraphCloneWorkerBase, self).stop(graceful)
LOG.info("%s - Stopped!", self.__class__.__name__)

View File

@ -43,7 +43,7 @@ class VitrageInit(object):
on_end_messages_func()
self.evaluator.run_evaluator()
self.evaluator.start()
# TODO(idan_hefetz) As vitrage is not yet persistent, there aren't
# TODO(idan_hefetz) any deduced alarms to be removed during init

View File

@ -13,157 +13,90 @@
# under the License.
import multiprocessing
import time
from oslo_concurrency import processutils
from oslo_log import log
from oslo_service import service as os_service
from vitrage.entity_graph import EVALUATOR_TOPIC
from vitrage.evaluator.evaluator_base import EvaluatorBase
from vitrage.entity_graph.graph_clone import base
from vitrage.evaluator.scenario_evaluator import ScenarioEvaluator
from vitrage.evaluator.scenario_repository import ScenarioRepository
from vitrage.messaging import VitrageNotifier
LOG = log.getLogger(__name__)
START_EVALUATION = 'start_evaluation'
POISON_PILL = None
RELOAD_TEMPLATES = 'reload_templates'
class EvaluatorManager(EvaluatorBase):
class EvaluatorManager(base.GraphCloneManagerBase):
def __init__(self, conf, entity_graph):
super(EvaluatorManager, self).__init__(conf, entity_graph)
self._workers_num = conf.evaluator.workers or \
processutils.get_worker_count()
self._worker_queues = list()
self._p_launcher = os_service.ProcessLauncher(conf)
workers_num = conf.evaluator.workers or processutils.get_worker_count()
super(EvaluatorManager, self).__init__(conf, entity_graph, workers_num)
def run_evaluator(self):
LOG.info('Starting %s Evaluator Processes', str(self._workers_num))
for i in range(self._workers_num):
self._add_worker(enabled=False)
self._notify_all(None, None, None, evaluator_action=START_EVALUATION)
self._entity_graph.subscribe(self._notify_all)
def before_subscribe(self):
self.start_evaluations()
def _add_worker(self, enabled=False):
def _run_worker(self, worker_index, workers_num):
"""Create an EvaluatorWorker and it's task queue
The new worker is initialized with a scenario repository
that only contains a portion of the templates
"""
scenario_repo = ScenarioRepository(
self._conf,
len(self._worker_queues),
self._workers_num)
tasks_queue = multiprocessing.JoinableQueue()
w = EvaluatorWorker(
self._conf,
tasks_queue,
self._entity_graph,
scenario_repo,
enabled)
worker_index,
workers_num)
self._p_launcher.launch_service(w)
self._worker_queues.append(tasks_queue)
return tasks_queue
def _notify_all(self, before, current, is_vertex, *args, **kwargs):
"""Notify all workers
def start_evaluations(self):
self._notify_and_wait((START_EVALUATION,))
This method is subscribed to entity graph changes.
Per each change in the main entity graph, this method will notify
each of the evaluators, causing them to update their own graph.
"""
evaluator_action = kwargs.get('evaluator_action', None)
self._notify_and_wait((before, current, is_vertex, evaluator_action))
def _notify_and_wait(self, payload):
for q in self._worker_queues:
q.put(payload)
time.sleep(0) # context switch before join
for q in self._worker_queues:
q.join()
def stop_all_workers(self):
self._notify_and_wait(POISON_PILL)
for q in self._worker_queues:
q.close()
self._worker_queues = list()
def reload_all_workers(self, enabled=True):
self.stop_all_workers()
for i in range(self._workers_num):
self._add_worker(enabled=enabled)
def reload_evaluators_templates(self):
self._notify_and_wait((RELOAD_TEMPLATES,))
class EvaluatorWorker(os_service.Service):
class EvaluatorWorker(base.GraphCloneWorkerBase):
def __init__(self,
conf,
task_queue,
entity_graph,
scenario_repo,
enabled=False):
super(EvaluatorWorker, self).__init__()
self._conf = conf
self._task_queue = task_queue
self._entity_graph = entity_graph
self._scenario_repo = scenario_repo
self._enabled = enabled
e_graph,
worker_index,
workers_num):
super(EvaluatorWorker, self).__init__(conf, task_queue, e_graph)
self._worker_index = worker_index
self._workers_num = workers_num
self._evaluator = None
def start(self):
super(EvaluatorWorker, self).start()
scenario_repo = ScenarioRepository(self._conf, self._worker_index,
self._workers_num)
actions_callback = VitrageNotifier(
conf=self._conf,
publisher_id='vitrage_evaluator',
topic=EVALUATOR_TOPIC).notify
self._entity_graph.notifier._subscriptions = [] # Quick n dirty
self._evaluator = ScenarioEvaluator(
self._conf,
self._entity_graph,
self._scenario_repo,
scenario_repo,
actions_callback,
self._enabled)
self.tg.add_thread(self._read_queue)
LOG.info("EvaluatorWorkerService - Started!")
enabled=False)
self._evaluator.scenario_repo.log_enabled_scenarios()
def _read_queue(self):
while True:
next_task = self._task_queue.get()
if next_task is POISON_PILL:
self._task_queue.task_done()
break
try:
self._do_task(next_task)
except Exception as e:
LOG.exception("Graph may not be in sync: exception %s", e)
self._task_queue.task_done()
# Evaluator queue may have been updated, thus the sleep:
time.sleep(0)
def do_task(self, task):
super(EvaluatorWorker, self).do_task(task)
action = task[0]
if action == START_EVALUATION:
self._evaluator.run_evaluator()
elif action == RELOAD_TEMPLATES:
self._reload_templates()
def _do_task(self, task):
(before, current, is_vertex, action) = task
if not action:
self._graph_update(before, current, is_vertex)
elif action == START_EVALUATION:
self._evaluator.run_evaluator()
def _graph_update(self, before, current, is_vertex):
if current:
if is_vertex:
self._entity_graph.add_vertex(current)
else:
self._entity_graph.add_edge(current)
else:
if is_vertex:
self._entity_graph.delete_vertex(before)
else:
self._entity_graph.delete_edge(before)
def stop(self, graceful=False):
super(EvaluatorWorker, self).stop(graceful)
self.tg.stop()
LOG.info("EvaluatorWorkerService - Stopped!")
def _reload_templates(self):
raise NotImplementedError()

View File

@ -28,7 +28,6 @@ from vitrage.evaluator.actions.action_executor import ActionExecutor
from vitrage.evaluator.actions.base import ActionMode
from vitrage.evaluator.actions.base import ActionType
import vitrage.evaluator.actions.priority_tools as pt
from vitrage.evaluator.evaluator_base import EvaluatorBase
from vitrage.evaluator.template_data import ActionSpecs
from vitrage.evaluator.template_data import EdgeDescription
from vitrage.graph.algo_driver.algorithm import Mapping
@ -52,7 +51,7 @@ TARGET = 'target'
SOURCE = 'source'
class ScenarioEvaluator(EvaluatorBase):
class ScenarioEvaluator(object):
def __init__(self,
conf,
@ -60,7 +59,8 @@ class ScenarioEvaluator(EvaluatorBase):
scenario_repo,
actions_callback,
enabled=False):
super(ScenarioEvaluator, self).__init__(conf, e_graph)
self._conf = conf
self._entity_graph = e_graph
self._db_connection = storage.get_connection_from_config(self._conf)
self._scenario_repo = scenario_repo
self._action_executor = ActionExecutor(self._conf, actions_callback)