separate api handler to a dedicated process

Event processing and api handling should be able to run in parallel.

Change-Id: Ia70fe97f926f854a7c9c3c860064cb0997f639ed
Depends-On: I84268a8ff458374f40d09854fa64f6e66034ca27
Depends-On: Id1b7470cae1d85a0b6cc1a9c9cf0c4cf97bae641
This commit is contained in:
Idan Hefetz 2018-05-17 12:56:03 +00:00
parent 3c576e4e96
commit 7d484d979c
3 changed files with 63 additions and 74 deletions

View File

@ -1,69 +0,0 @@
# Copyright 2016 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log
import oslo_messaging
from vitrage.common.utils import spawn
from vitrage.entity_graph import EVALUATOR_TOPIC
from vitrage.messaging import VitrageNotifier
from vitrage.api_handler.apis.alarm import AlarmApis
from vitrage.api_handler.apis.event import EventApis
from vitrage.api_handler.apis.rca import RcaApis
from vitrage.api_handler.apis.resource import ResourceApis
from vitrage.api_handler.apis.template import TemplateApis
from vitrage.api_handler.apis.topology import TopologyApis
from vitrage.api_handler.apis.webhook import WebhookApis
from vitrage import messaging
from vitrage import rpc as vitrage_rpc
from vitrage import storage
LOG = log.getLogger(__name__)
class VitrageApiHandlerService(object):
def __init__(self, conf, e_graph):
super(VitrageApiHandlerService, self).__init__()
self.conf = conf
self.entity_graph = e_graph
self.notifier = VitrageNotifier(self.conf, "vitrage.api",
[EVALUATOR_TOPIC])
self.db = storage.get_connection_from_config(conf)
def start(self):
spawn(self._start)
def _start(self):
LOG.info("Vitrage Api Handler Service - Starting...")
transport = messaging.get_rpc_transport(self.conf)
rabbit_hosts = self.conf.oslo_messaging_rabbit.rabbit_hosts
target = oslo_messaging.Target(topic=self.conf.rpc_topic,
server=rabbit_hosts)
endpoints = [TopologyApis(self.entity_graph, self.conf),
AlarmApis(self.entity_graph, self.conf),
RcaApis(self.entity_graph, self.conf),
TemplateApis(self.notifier, self.db),
EventApis(self.conf),
ResourceApis(self.entity_graph, self.conf),
WebhookApis(self.conf)]
server = vitrage_rpc.get_server(target, endpoints, transport)
server.start()
LOG.info("Vitrage Api Handler Service - Started!")

View File

@ -15,7 +15,6 @@
import sys
from vitrage.api_handler.service import VitrageApiHandlerService
from vitrage.cli import VITRAGE_TITLE
from vitrage.entity_graph import get_graph_driver
from vitrage.entity_graph.graph_init import VitrageGraphInit
@ -32,7 +31,6 @@ def main():
db_connection = storage.get_connection_from_config(conf)
clear_active_actions_table(db_connection)
VitrageApiHandlerService(conf, e_graph).start()
VitrageGraphInit(conf, e_graph, db_connection).run()

View File

@ -17,7 +17,15 @@ import multiprocessing
from oslo_concurrency import processutils as ps
from oslo_log import log
import oslo_messaging
from vitrage.api_handler.apis.alarm import AlarmApis
from vitrage.api_handler.apis.event import EventApis
from vitrage.api_handler.apis.rca import RcaApis
from vitrage.api_handler.apis.resource import ResourceApis
from vitrage.api_handler.apis.template import TemplateApis
from vitrage.api_handler.apis.topology import TopologyApis
from vitrage.api_handler.apis.webhook import WebhookApis
from vitrage.common.constants import TemplateStatus as TStatus
from vitrage.common.constants import TemplateTypes as TType
from vitrage.common.exception import VitrageError
@ -25,7 +33,9 @@ from vitrage.entity_graph import EVALUATOR_TOPIC
from vitrage.evaluator.actions.base import ActionMode
from vitrage.evaluator.scenario_evaluator import ScenarioEvaluator
from vitrage.evaluator.scenario_repository import ScenarioRepository
from vitrage.messaging import VitrageNotifier
from vitrage import messaging
from vitrage import rpc as vitrage_rpc
from vitrage import storage
LOG = log.getLogger(__name__)
@ -53,9 +63,11 @@ class GraphWorkersManager(cotyledon.ServiceManager):
self._db = db
self._evaluator_queues = []
self._template_queues = []
self._api_queues = []
self._all_queues = []
self.add_evaluator_workers()
self.add_template_workers()
self.add_api_workers()
def add_evaluator_workers(self):
"""Add evaluator workers
@ -101,6 +113,24 @@ class GraphWorkersManager(cotyledon.ServiceManager):
self._template_queues = queues
self._all_queues.extend(queues)
def add_api_workers(self):
"""Add Api workers
Api workers receive all graph updates, hence are updated.
Each template worker holds a disabled scenario-evaluator that does
not process changes.
These also hold a rpc server and process the incoming Api calls
"""
if self._api_queues:
raise VitrageError('add_api_workers called more than once')
workers = self._conf.api.workers
queues = [multiprocessing.JoinableQueue() for i in range(workers)]
self.add(ApiWorker,
args=(self._conf, queues, self._entity_graph),
workers=workers)
self._api_queues = queues
self._all_queues.extend(queues)
def submit_graph_update(self, before, current, is_vertex, *args, **kwargs):
"""Graph update all workers
@ -243,7 +273,7 @@ class EvaluatorWorker(GraphCloneWorkerBase):
def _init_instance(self):
scenario_repo = ScenarioRepository(self._conf, self.worker_id,
self._workers_num)
actions_callback = VitrageNotifier(
actions_callback = messaging.VitrageNotifier(
conf=self._conf,
publisher_id='vitrage_evaluator',
topics=[EVALUATOR_TOPIC]).notify
@ -286,7 +316,7 @@ class TemplateLoaderWorker(GraphCloneWorkerBase):
name = 'TemplateLoaderWorker'
def _init_instance(self):
actions_callback = VitrageNotifier(
actions_callback = messaging.VitrageNotifier(
conf=self._conf,
publisher_id='vitrage_evaluator',
topics=[EVALUATOR_TOPIC]).notify
@ -323,3 +353,33 @@ class TemplateLoaderWorker(GraphCloneWorkerBase):
def _disable_evaluator(self):
self._entity_graph.notifier._subscriptions = [] # Quick n dirty
self._evaluator.enabled = False
class ApiWorker(GraphCloneWorkerBase):
name = 'ApiWorker'
def _init_instance(self):
conf = self._conf
LOG.info("Vitrage Api Handler Service - Starting...")
notifier = messaging.VitrageNotifier(conf, "vitrage.api",
[EVALUATOR_TOPIC])
db = storage.get_connection_from_config(conf)
transport = messaging.get_rpc_transport(conf)
rabbit_hosts = conf.oslo_messaging_rabbit.rabbit_hosts
target = oslo_messaging.Target(topic=conf.rpc_topic,
server=rabbit_hosts)
endpoints = [TopologyApis(self._entity_graph, conf),
AlarmApis(self._entity_graph, conf),
RcaApis(self._entity_graph, conf),
TemplateApis(notifier, db),
EventApis(conf),
ResourceApis(self._entity_graph, conf),
WebhookApis(conf)]
server = vitrage_rpc.get_server(target, endpoints, transport)
server.start()
LOG.info("Vitrage Api Handler Service - Started!")