rpc collector
active-active Collector service, detached from vitrage-graph service, exposing it's methods by RPC, so to only run when needed. Change-Id: I4a72420b5096e8483e5e99cceb16d99798c3bbe7 implements: blueprint rpc-collector
This commit is contained in:
parent
e85af78a76
commit
d8f14571ac
|
@ -308,8 +308,6 @@ function start_vitrage {
|
|||
run_process vitrage-persistor "$VITRAGE_BIN_DIR/vitrage-persistor --config-file $VITRAGE_CONF"
|
||||
run_process vitrage-snmp-parsing "$VITRAGE_BIN_DIR/vitrage-snmp-parsing --config-file $VITRAGE_CONF"
|
||||
|
||||
write_systemd_dependency vitrage-graph vitrage-collector
|
||||
|
||||
change_systemd_kill_mode vitrage-graph vitrage-collector
|
||||
|
||||
}
|
||||
|
@ -322,24 +320,6 @@ function change_systemd_kill_mode {
|
|||
iniset -sudo $unitfile "Service" "KillMode" "control-group"
|
||||
}
|
||||
|
||||
function write_systemd_dependency {
|
||||
local service_after=$1
|
||||
local service_before=$2
|
||||
local systemd_service_after="devstack@$service_after.service"
|
||||
local systemd_service_before="devstack@$service_before.service"
|
||||
|
||||
local unitfile_after="$SYSTEMD_DIR/$systemd_service_after"
|
||||
local unitfile_before="$SYSTEMD_DIR/$systemd_service_before"
|
||||
|
||||
iniset -sudo $unitfile_after "Unit" "Requires" "$systemd_service_before"
|
||||
iniset -sudo $unitfile_after "Unit" "After" "$systemd_service_before"
|
||||
|
||||
iniset -sudo $unitfile_before "Unit" "Requires" "$systemd_service_after"
|
||||
iniset -sudo $unitfile_before "Unit" "Before" "$systemd_service_after"
|
||||
|
||||
$SYSTEMCTL daemon-reload
|
||||
}
|
||||
|
||||
# stop_vitrage() - Stop running processes
|
||||
function stop_vitrage {
|
||||
if [ "$VITRAGE_DEPLOY" == "mod_wsgi" ]; then
|
||||
|
|
|
@ -46,4 +46,4 @@ eventlet!=0.18.3,!=0.20.1,<0.21.0,>=0.18.2 # MIT
|
|||
six>=1.10.0 # MIT
|
||||
debtcollector>=1.2.0 # Apache-2.0
|
||||
cotyledon>=1.3.0 # Apache-2.0
|
||||
|
||||
futures>=3.0.0;python_version=='2.7' or python_version=='2.6' # BSD
|
||||
|
|
|
@ -17,10 +17,7 @@ import sys
|
|||
from oslo_service import service as os_service
|
||||
from vitrage.cli import VITRAGE_TITLE
|
||||
from vitrage.datasources.listener_service import ListenerService
|
||||
|
||||
from vitrage.datasources.collector_notifier import CollectorNotifier
|
||||
from vitrage.datasources import launcher as datasource_launcher
|
||||
from vitrage.entity_graph import utils
|
||||
from vitrage.datasources.rpc_service import CollectorRpcHandlerService
|
||||
from vitrage import service
|
||||
|
||||
|
||||
|
@ -31,15 +28,8 @@ def main():
|
|||
print(VITRAGE_TITLE)
|
||||
conf = service.prepare_service()
|
||||
launcher = os_service.ServiceLauncher(conf)
|
||||
rabbitmq = CollectorNotifier(conf)
|
||||
callback = datasource_launcher.create_send_to_queue_callback(rabbitmq)
|
||||
launcher.launch_service(ListenerService(conf,
|
||||
utils.get_drivers(conf),
|
||||
callback))
|
||||
|
||||
datasources = datasource_launcher.Launcher(conf, callback)
|
||||
datasources.launch()
|
||||
|
||||
launcher.launch_service(ListenerService(conf))
|
||||
launcher.launch_service(CollectorRpcHandlerService(conf))
|
||||
launcher.wait()
|
||||
|
||||
|
||||
|
|
|
@ -1,58 +0,0 @@
|
|||
# Copyright 2016 - Alcatel-Lucent
|
||||
# Copyright 2016 - Nokia
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import itertools
|
||||
|
||||
from oslo_service import service as os_service
|
||||
|
||||
from vitrage.datasources.services import ChangesService
|
||||
from vitrage.datasources.services import SnapshotsService
|
||||
from vitrage.entity_graph import utils
|
||||
|
||||
|
||||
def create_send_to_queue_callback(rabbitq):
|
||||
def send_to_queue_callback(event):
|
||||
rabbitq.notify_when_applicable(event)
|
||||
|
||||
return send_to_queue_callback
|
||||
|
||||
|
||||
class Launcher(object):
|
||||
def __init__(self, conf, callback):
|
||||
self.conf = conf
|
||||
self.callback = callback
|
||||
self.drivers = utils.get_drivers(conf)
|
||||
self.services = self._register_services()
|
||||
|
||||
def launch(self):
|
||||
launcher = os_service.ProcessLauncher(self.conf)
|
||||
for service in self.services:
|
||||
launcher.launch_service(service, 1)
|
||||
|
||||
def _register_services(self):
|
||||
pull_datasources = utils.get_pull_datasources(self.conf)
|
||||
changes_services = \
|
||||
(ChangesService(self.conf,
|
||||
[self.drivers[datasource]],
|
||||
self.conf[datasource].changes_interval,
|
||||
self.callback)
|
||||
for datasource in pull_datasources)
|
||||
|
||||
snapshot_service = (SnapshotsService(self.conf,
|
||||
self.drivers,
|
||||
self.callback),)
|
||||
|
||||
return itertools.chain(changes_services,
|
||||
snapshot_service)
|
|
@ -18,25 +18,27 @@ from oslo_log import log
|
|||
import oslo_messaging
|
||||
from oslo_service import service as os_service
|
||||
|
||||
from vitrage.entity_graph import utils
|
||||
from vitrage.datasources import utils
|
||||
from vitrage import messaging
|
||||
|
||||
from vitrage.messaging import VitrageNotifier
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class ListenerService(os_service.Service):
|
||||
|
||||
def __init__(self, conf, drivers, callback):
|
||||
def __init__(self, conf):
|
||||
super(ListenerService, self).__init__()
|
||||
|
||||
self.enrich_callbacks_by_events = \
|
||||
self._create_callbacks_by_events_dict(drivers, conf)
|
||||
self._create_callbacks_by_events_dict(conf)
|
||||
|
||||
topics = conf.datasources.notification_topics
|
||||
exchange = conf.datasources.notification_exchange
|
||||
self.listener = self._get_topics_listener(
|
||||
conf, topics, callback, exchange)
|
||||
topics = [conf.datasources.notification_topic_collector]
|
||||
if conf.persistency.enable_persistency:
|
||||
topics.append(conf.persistency.persistor_topic)
|
||||
else:
|
||||
LOG.warning("Not persisting events")
|
||||
notifier = VitrageNotifier(conf, 'driver.events', topics)
|
||||
self.listener = self._get_topics_listener(conf, notifier.notify)
|
||||
|
||||
def start(self):
|
||||
LOG.info("Vitrage data source Listener Service - Starting...")
|
||||
|
@ -57,9 +59,10 @@ class ListenerService(os_service.Service):
|
|||
LOG.info("Vitrage data source Listener Service - Stopped!")
|
||||
|
||||
@classmethod
|
||||
def _create_callbacks_by_events_dict(cls, drivers, conf):
|
||||
def _create_callbacks_by_events_dict(cls, conf):
|
||||
ret = defaultdict(list)
|
||||
push_drivers = utils.get_push_datasources(drivers, conf)
|
||||
driver_names = utils.get_push_drivers_names(conf)
|
||||
push_drivers = utils.get_drivers_by_name(conf, driver_names)
|
||||
|
||||
for driver in push_drivers:
|
||||
for event in driver.get_event_types():
|
||||
|
@ -67,8 +70,9 @@ class ListenerService(os_service.Service):
|
|||
|
||||
return ret
|
||||
|
||||
def _get_topics_listener(self, conf, topics, callback, exchange=None):
|
||||
# Create a listener for each topic
|
||||
def _get_topics_listener(self, conf, callback):
|
||||
topics = conf.datasources.notification_topics
|
||||
exchange = conf.datasources.notification_exchange
|
||||
transport = messaging.get_transport(conf)
|
||||
targets = [oslo_messaging.Target(exchange=exchange, topic=topic)
|
||||
for topic in topics]
|
||||
|
@ -102,5 +106,5 @@ class NotificationsEndpoint(object):
|
|||
def _enqueue_events(self, enriched_events):
|
||||
for event in enriched_events:
|
||||
if event is not None:
|
||||
self.enqueue_callback(event)
|
||||
self.enqueue_callback(event_type='', data=event)
|
||||
LOG.debug('EVENT ENQUEUED: \n' + str(event))
|
||||
|
|
|
@ -0,0 +1,113 @@
|
|||
# Copyright 2018 - Nokia
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from concurrent import futures
|
||||
import time
|
||||
|
||||
from oslo_log import log
|
||||
from oslo_service import service as os_service
|
||||
|
||||
from vitrage.common.constants import DatasourceAction
|
||||
from vitrage.datasources import utils
|
||||
from vitrage.messaging import VitrageNotifier
|
||||
from vitrage import rpc as vitrage_rpc
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class CollectorRpcHandlerService(os_service.Service):
|
||||
|
||||
def __init__(self, conf):
|
||||
super(CollectorRpcHandlerService, self).__init__()
|
||||
self.conf = conf
|
||||
async_persistor = self.create_async_persistor(conf)
|
||||
self.server = vitrage_rpc.get_default_server(
|
||||
conf,
|
||||
conf.rpc_topic_collector,
|
||||
[DriversEndpoint(conf, async_persistor)])
|
||||
|
||||
def start(self):
|
||||
LOG.info("Collector Rpc Handler Service - Starting...")
|
||||
super(CollectorRpcHandlerService, self).start()
|
||||
self.server.start()
|
||||
LOG.info("Collector Rpc Handler Service - Started!")
|
||||
|
||||
def stop(self, graceful=False):
|
||||
LOG.info("Collector Rpc Handler Service - Stopping...")
|
||||
super(CollectorRpcHandlerService, self).stop(graceful)
|
||||
self.server.stop()
|
||||
LOG.info("Collector Rpc Handler Service - Stopped!")
|
||||
|
||||
@staticmethod
|
||||
def create_async_persistor(conf):
|
||||
if not conf.persistency.enable_persistency:
|
||||
return None
|
||||
topics = [conf.persistency.persistor_topic]
|
||||
notifier = VitrageNotifier(conf, 'driver.events', topics)
|
||||
persist_worker = futures.ThreadPoolExecutor(max_workers=1)
|
||||
|
||||
def do_persist(events):
|
||||
for e in events:
|
||||
notifier.notify('', e)
|
||||
|
||||
def do_work(events):
|
||||
persist_worker.submit(do_persist, events)
|
||||
|
||||
return do_work
|
||||
|
||||
|
||||
class DriversEndpoint(object):
|
||||
|
||||
def __init__(self, conf, async_persistor):
|
||||
self.conf = conf
|
||||
self.async_persistor = async_persistor
|
||||
self.pool = futures.ThreadPoolExecutor(
|
||||
max_workers=len(self.conf.datasources.types))
|
||||
|
||||
def driver_get_all(self, ctx, driver_names, action, retry_on_fault=False):
|
||||
"""Call get_all for specified drivers"""
|
||||
LOG.debug("run drivers get_all: %s %s", driver_names, action)
|
||||
drivers = utils.get_drivers_by_name(self.conf, driver_names)
|
||||
fault_interval = self.conf.datasources.snapshot_interval_on_fault
|
||||
|
||||
def run_driver(driver):
|
||||
ok = True
|
||||
try:
|
||||
return ok, driver.get_all(action)
|
||||
except Exception as e:
|
||||
LOG.exception('driver failed %s', e)
|
||||
return not ok, driver
|
||||
|
||||
result = list(self.pool.map(run_driver, drivers))
|
||||
failed_drivers = [driver for success, driver in result if not success]
|
||||
if failed_drivers and retry_on_fault:
|
||||
LOG.info('retrying failed drivers in %s seconds', fault_interval)
|
||||
time.sleep(fault_interval)
|
||||
result.extend(list(self.pool.map(run_driver, failed_drivers)))
|
||||
|
||||
events = [e for success, events in result if success for e in events]
|
||||
if self.async_persistor:
|
||||
self.async_persistor(events)
|
||||
LOG.debug("run drivers get_all done.")
|
||||
return events
|
||||
|
||||
def driver_get_changes(self, ctx, driver_name):
|
||||
"""Call get_changes for a specific driver"""
|
||||
LOG.debug("run driver get_changes: %s", driver_name)
|
||||
drivers = utils.get_drivers_by_name(self.conf, [driver_name])
|
||||
events = drivers[0].get_changes(DatasourceAction.UPDATE)
|
||||
if self.async_persistor:
|
||||
self.async_persistor(events)
|
||||
LOG.debug("run driver get_changes: %s done.", driver_name)
|
||||
return events
|
|
@ -1,148 +0,0 @@
|
|||
# Copyright 2016 - Nokia
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import setproctitle
|
||||
import time
|
||||
|
||||
from oslo_log import log
|
||||
from oslo_service import service as os_service
|
||||
from vitrage.common.constants import DatasourceAction
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class DatasourceService(os_service.Service):
|
||||
def __init__(self, conf, registered_datasources, send_to_queue_func):
|
||||
super(DatasourceService, self).__init__()
|
||||
self.conf = conf
|
||||
self.registered_datasources = registered_datasources
|
||||
self.send_to_queue = send_to_queue_func
|
||||
|
||||
def name(self):
|
||||
return ''
|
||||
|
||||
def start(self):
|
||||
super(DatasourceService, self).start()
|
||||
try:
|
||||
setproctitle.setproctitle('{} {} {}'.format(
|
||||
'vitrage-collector',
|
||||
self.__class__.__name__,
|
||||
self.name()))
|
||||
except Exception:
|
||||
LOG.warning('failed to set process name')
|
||||
|
||||
|
||||
class SnapshotsService(DatasourceService):
|
||||
def __init__(self, conf, registered_datasources, callback_function):
|
||||
super(SnapshotsService, self).__init__(conf,
|
||||
registered_datasources,
|
||||
callback_function)
|
||||
|
||||
def name(self):
|
||||
names = [name for name in self.registered_datasources]
|
||||
return ','.join(names)
|
||||
|
||||
def start(self):
|
||||
LOG.info("Vitrage datasources Snapshot Service - Starting...")
|
||||
super(SnapshotsService, self).start()
|
||||
|
||||
standard_interval = self.conf.datasources.snapshots_interval
|
||||
fault_interval = self.conf.datasources.snapshot_interval_on_fault
|
||||
init_ttl = self.conf.consistency.initialization_max_retries * \
|
||||
self.conf.consistency.initialization_interval
|
||||
|
||||
for ds_driver in self.registered_datasources.values():
|
||||
callback = self.entities_to_queue(
|
||||
ds_driver,
|
||||
DatasourceAction.INIT_SNAPSHOT,
|
||||
fault_interval,
|
||||
init_ttl)
|
||||
self.tg.add_thread(callback)
|
||||
|
||||
callback = self.entities_to_queue(
|
||||
ds_driver,
|
||||
DatasourceAction.SNAPSHOT,
|
||||
fault_interval,
|
||||
standard_interval)
|
||||
self.tg.add_timer(standard_interval, callback, standard_interval)
|
||||
|
||||
LOG.info('Vitrage datasources Snapshot Service - Started!')
|
||||
|
||||
def entities_to_queue(self, driver, action, fault_interval, timeout):
|
||||
def _entities_to_queue():
|
||||
endtime = time.time() + timeout
|
||||
while time.time() < endtime:
|
||||
try:
|
||||
LOG.info('Driver %s - %s', type(driver).__name__, action)
|
||||
items = driver.get_all(action)
|
||||
for entity in items:
|
||||
self.send_to_queue(entity)
|
||||
break
|
||||
except Exception as e:
|
||||
LOG.exception('Driver Exception: {0}'.format(e))
|
||||
time.sleep(fault_interval)
|
||||
driver.callback_on_fault(e)
|
||||
return _entities_to_queue
|
||||
|
||||
def stop(self, graceful=False):
|
||||
LOG.info("Vitrage datasources Snapshot Service - Stopping...")
|
||||
|
||||
super(SnapshotsService, self).stop(graceful)
|
||||
|
||||
LOG.info("Vitrage datasources Snapshot Service - Stopped!")
|
||||
|
||||
|
||||
class ChangesService(DatasourceService):
|
||||
def __init__(self, conf,
|
||||
registered_datasources,
|
||||
changes_interval,
|
||||
callback_function):
|
||||
super(ChangesService, self).__init__(conf,
|
||||
registered_datasources,
|
||||
callback_function)
|
||||
self.changes_interval = changes_interval
|
||||
|
||||
def name(self):
|
||||
names = [d.__class__.__name__ for d in self.registered_datasources]
|
||||
return ','.join(names)
|
||||
|
||||
def start(self):
|
||||
LOG.info("Vitrage Datasource Changes Service For: %s - Starting...",
|
||||
self.registered_datasources[0].__class__.__name__)
|
||||
|
||||
super(ChangesService, self).start()
|
||||
self.tg.add_timer(interval=self.changes_interval,
|
||||
callback=self._get_changes,
|
||||
initial_delay=self.changes_interval)
|
||||
|
||||
LOG.info("Vitrage Datasource Changes Service For: %s - Started!",
|
||||
self.registered_datasources[0].__class__.__name__)
|
||||
|
||||
def stop(self, graceful=False):
|
||||
LOG.info("Vitrage Datasource Changes Service For: %s - Stopping...",
|
||||
self.registered_datasources[0].__class__.__name__)
|
||||
|
||||
super(ChangesService, self).stop(graceful)
|
||||
|
||||
LOG.info("Vitrage Datasource Changes Service For: %s - Stopped!",
|
||||
self.registered_datasources[0].__class__.__name__)
|
||||
|
||||
def _get_changes(self):
|
||||
LOG.debug("start get changes")
|
||||
for datasource in self.registered_datasources:
|
||||
try:
|
||||
for entity in datasource.get_changes(DatasourceAction.UPDATE):
|
||||
self.send_to_queue(entity)
|
||||
except Exception as e:
|
||||
LOG.exception("Get changes Failed - %s", e)
|
||||
LOG.debug("end get changes")
|
|
@ -1,4 +1,4 @@
|
|||
# Copyright 2017 - Nokia
|
||||
# Copyright 2018 - Nokia
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
|
@ -11,7 +11,6 @@
|
|||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_utils import importutils as utils
|
||||
|
||||
from vitrage.common.constants import DatasourceOpts as DSOpts
|
||||
|
@ -19,17 +18,17 @@ from vitrage.common.constants import UpdateMethod
|
|||
from vitrage.utils import opt_exists
|
||||
|
||||
|
||||
def get_drivers(conf):
|
||||
return {datasource: utils.import_object(conf[datasource].driver, conf)
|
||||
for datasource in conf.datasources.types}
|
||||
def get_drivers_by_name(conf, driver_names):
|
||||
return [utils.import_object(conf[d_name].driver, conf)
|
||||
for d_name in driver_names]
|
||||
|
||||
|
||||
def get_pull_datasources(conf):
|
||||
return (datasource for datasource in conf.datasources.types
|
||||
if conf[datasource].update_method.lower() == UpdateMethod.PULL
|
||||
and opt_exists(conf[datasource], DSOpts.CHANGES_INTERVAL))
|
||||
def get_pull_drivers_names(conf):
|
||||
return [name for name in conf.datasources.types
|
||||
if conf[name].update_method.lower() == UpdateMethod.PULL
|
||||
and opt_exists(conf[name], DSOpts.CHANGES_INTERVAL)]
|
||||
|
||||
|
||||
def get_push_datasources(drivers, conf):
|
||||
return (driver_cls for datasource, driver_cls in drivers.items()
|
||||
if conf[datasource].update_method.lower() == UpdateMethod.PUSH)
|
||||
def get_push_drivers_names(conf):
|
||||
return [name for name in conf.datasources.types
|
||||
if conf[name].update_method.lower() == UpdateMethod.PUSH]
|
|
@ -0,0 +1,68 @@
|
|||
# Copyright 2018 - Nokia
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import time
|
||||
|
||||
from oslo_log import log
|
||||
import oslo_messaging
|
||||
|
||||
from vitrage import messaging
|
||||
from vitrage import rpc as vitrage_rpc
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
def create_rpc_client_instance(conf):
|
||||
transport = messaging.get_rpc_transport(conf)
|
||||
target = oslo_messaging.Target(topic=conf.rpc_topic_collector)
|
||||
client = vitrage_rpc.get_client(transport, target)
|
||||
return client
|
||||
|
||||
|
||||
def get_all(rpc_client, events_coordination, driver_names, action,
|
||||
retry_on_fault=False, first_call_timeout=None):
|
||||
LOG.info('get_all starting for %s', driver_names)
|
||||
t1 = time.time()
|
||||
|
||||
def _call(_client):
|
||||
return _client.call(
|
||||
{},
|
||||
'driver_get_all',
|
||||
driver_names=driver_names,
|
||||
action=action,
|
||||
retry_on_fault=retry_on_fault)
|
||||
|
||||
try:
|
||||
if first_call_timeout:
|
||||
# create a temporary client instance with a timeout
|
||||
client = rpc_client.prepare(timeout=first_call_timeout)
|
||||
events = _call(client)
|
||||
else:
|
||||
events = _call(rpc_client)
|
||||
except oslo_messaging.MessagingTimeout as e:
|
||||
LOG.exception('Got MessagingTimeout %s', e)
|
||||
events = _call(rpc_client) if retry_on_fault else []
|
||||
t2 = time.time()
|
||||
events_coordination.handle_multiple_low_priority(events)
|
||||
t3 = time.time()
|
||||
LOG.info('get_all took %s, processing took %s for %s events',
|
||||
t2 - t1, t3 - t2, len(events))
|
||||
|
||||
|
||||
def get_changes(rpc_client, events_coordination, driver_name):
|
||||
LOG.info('get_changes starting %s', driver_name)
|
||||
events = rpc_client.call(
|
||||
{},
|
||||
'driver_get_changes',
|
||||
driver_name=driver_name)
|
||||
events_coordination.handle_multiple_low_priority(events)
|
|
@ -17,7 +17,9 @@ import time
|
|||
from oslo_log import log
|
||||
import oslo_messaging
|
||||
|
||||
from vitrage.common.constants import DatasourceAction
|
||||
from vitrage.common.utils import spawn
|
||||
from vitrage.entity_graph import datasource_rpc as ds_rpc
|
||||
from vitrage.entity_graph import EVALUATOR_TOPIC
|
||||
from vitrage.entity_graph.processor.processor import Processor
|
||||
from vitrage.entity_graph.scheduler import Scheduler
|
||||
|
@ -36,20 +38,22 @@ class VitrageGraphInit(object):
|
|||
self.process_event,
|
||||
conf.datasources.notification_topic_collector,
|
||||
EVALUATOR_TOPIC)
|
||||
self.end_messages = {}
|
||||
self.scheduler = Scheduler(conf, graph)
|
||||
self.processor = Processor(conf,
|
||||
self._handle_end_message,
|
||||
graph,
|
||||
self.scheduler.graph_persistor)
|
||||
self.scheduler = Scheduler(conf, graph, self.events_coordination)
|
||||
self.processor = Processor(conf, graph, self.scheduler.graph_persistor)
|
||||
|
||||
def run(self):
|
||||
LOG.info('Init Started')
|
||||
self.events_coordination.start()
|
||||
self._wait_for_all_end_messages()
|
||||
self.scheduler.start_periodic_tasks()
|
||||
ds_rpc.get_all(
|
||||
ds_rpc.create_rpc_client_instance(self.conf),
|
||||
self.events_coordination,
|
||||
self.conf.datasources.types,
|
||||
action=DatasourceAction.INIT_SNAPSHOT,
|
||||
retry_on_fault=True,
|
||||
first_call_timeout=10)
|
||||
self.processor.start_notifier()
|
||||
self.events_coordination.start()
|
||||
spawn(self.workers.submit_start_evaluations)
|
||||
self.scheduler.start_periodic_tasks()
|
||||
self.workers.run()
|
||||
|
||||
def process_event(self, event):
|
||||
|
@ -59,21 +63,6 @@ class VitrageGraphInit(object):
|
|||
else:
|
||||
self.processor.process_event(event)
|
||||
|
||||
def _handle_end_message(self, vitrage_type):
|
||||
self.end_messages[vitrage_type] = True
|
||||
|
||||
def _wait_for_all_end_messages(self):
|
||||
start = time.time()
|
||||
timeout = self.conf.consistency.initialization_max_retries * \
|
||||
self.conf.consistency.initialization_interval
|
||||
while time.time() < start + timeout:
|
||||
if len(self.end_messages) == len(self.conf.datasources.types):
|
||||
LOG.info('end messages received')
|
||||
return True
|
||||
time.sleep(0.2)
|
||||
LOG.warning('Missing end messages %s', self.end_messages.keys())
|
||||
return False
|
||||
|
||||
|
||||
PRIORITY_DELAY = 0.05
|
||||
|
||||
|
@ -128,6 +117,10 @@ class EventsCoordination(object):
|
|||
self._do_work_func(event)
|
||||
self._lock.release()
|
||||
|
||||
def handle_multiple_low_priority(self, events):
|
||||
for e in events:
|
||||
self._do_low_priority_work(e)
|
||||
|
||||
def _init_listener(self, topic, callback):
|
||||
if not topic:
|
||||
return
|
||||
|
|
|
@ -38,7 +38,3 @@ class ProcessorBase(object):
|
|||
@abc.abstractmethod
|
||||
def delete_entity(self, deleted_vertex, neighbors):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def handle_end_message(self, vertex, neighbors):
|
||||
pass
|
||||
|
|
|
@ -32,14 +32,12 @@ LOG = log.getLogger(__name__)
|
|||
|
||||
class Processor(processor.ProcessorBase):
|
||||
|
||||
def __init__(self, conf, end_messages_func=None, e_graph=None,
|
||||
graph_persistor=None):
|
||||
def __init__(self, conf, e_graph=None, graph_persistor=None):
|
||||
super(Processor, self).__init__()
|
||||
self.conf = conf
|
||||
self.transformer_manager = TransformerManager(self.conf)
|
||||
self.info_mapper = DatasourceInfoMapper(self.conf)
|
||||
self._initialize_events_actions()
|
||||
self.end_messages_func = end_messages_func
|
||||
self.entity_graph = e_graph
|
||||
self._notifier = GraphNotifier(conf)
|
||||
self._graph_persistor = graph_persistor
|
||||
|
@ -59,6 +57,9 @@ class Processor(processor.ProcessorBase):
|
|||
|
||||
self._enrich_event(event)
|
||||
entity = self.transformer_manager.transform(event)
|
||||
if entity.action not in self.actions.keys():
|
||||
LOG.debug('deprecated or unknown entity %s ignored', str(entity))
|
||||
return
|
||||
self._calculate_vitrage_aggregated_values(entity.vertex, entity.action)
|
||||
self.actions[entity.action](entity.vertex, entity.neighbors)
|
||||
if self._graph_persistor:
|
||||
|
@ -189,9 +190,6 @@ class Processor(processor.ProcessorBase):
|
|||
"deleted_vertex - %s, graph_vertex - %s",
|
||||
vertex, graph_vertex)
|
||||
|
||||
def handle_end_message(self, vertex, neighbors):
|
||||
self.end_messages_func(vertex[VProps.VITRAGE_TYPE])
|
||||
|
||||
def start_notifier(self):
|
||||
if self._notifier and self._notifier.enabled:
|
||||
self.entity_graph.subscribe(self._notifier.notify_when_applicable)
|
||||
|
@ -298,8 +296,6 @@ class Processor(processor.ProcessorBase):
|
|||
GraphAction.UPDATE_RELATIONSHIP: self.update_relationship,
|
||||
GraphAction.DELETE_RELATIONSHIP: self.delete_relationship,
|
||||
GraphAction.REMOVE_DELETED_ENTITY: self.remove_deleted_entity,
|
||||
# should not be called explicitly
|
||||
GraphAction.END_MESSAGE: self.handle_end_message
|
||||
}
|
||||
|
||||
def _calculate_vitrage_aggregated_values(self, vertex, action):
|
||||
|
|
|
@ -15,10 +15,14 @@ from concurrent.futures import ThreadPoolExecutor
|
|||
from futurist import periodics
|
||||
|
||||
from oslo_log import log
|
||||
from vitrage.datasources import utils
|
||||
|
||||
from vitrage.common.constants import DatasourceAction
|
||||
from vitrage.common.utils import spawn
|
||||
|
||||
from vitrage.entity_graph.consistency.consistency_enforcer import\
|
||||
ConsistencyEnforcer
|
||||
from vitrage.entity_graph import datasource_rpc as ds_rpc
|
||||
from vitrage.persistency.graph_persistor import GraphPersistor
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
@ -26,12 +30,11 @@ LOG = log.getLogger(__name__)
|
|||
|
||||
class Scheduler(object):
|
||||
|
||||
def __init__(self,
|
||||
conf,
|
||||
graph):
|
||||
def __init__(self, conf, graph, events_coordination):
|
||||
super(Scheduler, self).__init__()
|
||||
self.conf = conf
|
||||
self.graph = graph
|
||||
self.events_coordination = events_coordination
|
||||
self.graph_persistor = GraphPersistor(conf) if \
|
||||
self.conf.persistency.enable_persistency else None
|
||||
self.consistency = ConsistencyEnforcer(conf, graph)
|
||||
|
@ -41,33 +44,72 @@ class Scheduler(object):
|
|||
self.periodic = periodics.PeriodicWorker.create(
|
||||
[], executor_factory=lambda: ThreadPoolExecutor(max_workers=10))
|
||||
|
||||
self.add_persistor_timer()
|
||||
self.add_persist_timer()
|
||||
self.add_consistency_timer()
|
||||
self.add_rpc_datasources_timers()
|
||||
spawn(self.periodic.start)
|
||||
|
||||
def add_persistor_timer(self):
|
||||
def add_persist_timer(self):
|
||||
if not self.graph_persistor:
|
||||
return
|
||||
spacing = self.conf.persistency.graph_persistency_interval
|
||||
|
||||
@periodics.periodic(spacing=spacing)
|
||||
def persist():
|
||||
def persist_periodic():
|
||||
if self.graph_persistor:
|
||||
try:
|
||||
self.graph_persistor.store_graph(graph=self.graph)
|
||||
except Exception as e:
|
||||
LOG.exception('persist failed %s', e)
|
||||
|
||||
self.periodic.add(persist)
|
||||
LOG.info("periodic task - persistor %s", spacing)
|
||||
self.periodic.add(persist_periodic)
|
||||
LOG.info("added persist_periodic (spacing=%s)", spacing)
|
||||
|
||||
def add_consistency_timer(self):
|
||||
spacing = self.conf.datasources.snapshots_interval
|
||||
|
||||
@periodics.periodic(spacing=spacing)
|
||||
def run_consistency():
|
||||
def consistency_periodic():
|
||||
try:
|
||||
self.consistency.periodic_process()
|
||||
except Exception as e:
|
||||
LOG.exception('run_consistency failed %s', e)
|
||||
|
||||
self.periodic.add(run_consistency)
|
||||
LOG.info("periodic task - run_consistency %s", spacing)
|
||||
self.periodic.add(consistency_periodic)
|
||||
LOG.info("added consistency_periodic (spacing=%s)", spacing)
|
||||
|
||||
def add_rpc_datasources_timers(self):
|
||||
spacing = self.conf.datasources.snapshots_interval
|
||||
rpc_client = ds_rpc.create_rpc_client_instance(self.conf)
|
||||
|
||||
@periodics.periodic(spacing=spacing)
|
||||
def get_all_periodic():
|
||||
try:
|
||||
ds_rpc.get_all(rpc_client,
|
||||
self.events_coordination,
|
||||
self.conf.datasources.types,
|
||||
DatasourceAction.SNAPSHOT)
|
||||
except Exception as e:
|
||||
LOG.exception('get_all_periodic failed %s', e)
|
||||
|
||||
self.periodic.add(get_all_periodic)
|
||||
LOG.info("added get_all_periodic (spacing=%s)", spacing)
|
||||
|
||||
driver_names = utils.get_pull_drivers_names(self.conf)
|
||||
for d_name in driver_names:
|
||||
spacing = self.conf[d_name].changes_interval
|
||||
rpc_client = ds_rpc.create_rpc_client_instance(self.conf)
|
||||
|
||||
@periodics.periodic(spacing=spacing)
|
||||
def get_changes_periodic(driver_name=d_name):
|
||||
try:
|
||||
ds_rpc.get_changes(rpc_client,
|
||||
self.events_coordination,
|
||||
driver_name)
|
||||
except Exception as e:
|
||||
LOG.exception('get_changes_periodic %s failed %s',
|
||||
driver_name, e)
|
||||
|
||||
self.periodic.add(get_changes_periodic)
|
||||
LOG.info("added get_changes_periodic %s (spacing=%s)",
|
||||
d_name, spacing)
|
||||
|
|
|
@ -35,7 +35,7 @@ def get_rpc_transport(conf, url=None, optional=False, cache=True):
|
|||
def get_transport(conf, url=None, optional=False, cache=True, rpc=False):
|
||||
"""Initialise the oslo_messaging layer."""
|
||||
global TRANSPORTS, DEFAULT_URL
|
||||
cache_key = url or DEFAULT_URL
|
||||
cache_key = url or DEFAULT_URL + '_rpc' if rpc else ''
|
||||
transport = TRANSPORTS.get(cache_key)
|
||||
if not transport or not cache:
|
||||
try:
|
||||
|
|
|
@ -24,6 +24,9 @@ OPTS = [
|
|||
cfg.StrOpt('rpc_topic',
|
||||
default='rpcapiv1',
|
||||
help='The topic vitrage listens on'),
|
||||
cfg.StrOpt('rpc_topic_collector',
|
||||
default='rpc-collector',
|
||||
help='The topic vitrage-collector listens on'),
|
||||
]
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
@ -85,6 +88,13 @@ def get_client(transport, target, version_cap=None, serializer=None):
|
|||
serializer=serializer)
|
||||
|
||||
|
||||
def get_default_server(conf, topic, endpoints):
|
||||
target = messaging.Target(
|
||||
topic=topic,
|
||||
server=conf.oslo_messaging_rabbit.rabbit_hosts)
|
||||
return get_server(target, endpoints, messaging.get_rpc_transport(conf))
|
||||
|
||||
|
||||
def get_server(target, endpoints, transport, serializer=None):
|
||||
assert transport is not None
|
||||
|
||||
|
|
|
@ -39,8 +39,8 @@ class TestListenerService(base.BaseTest):
|
|||
def setUpClass(cls):
|
||||
super(TestListenerService, cls).setUpClass()
|
||||
|
||||
def _add_event_to_actual_events(self, event):
|
||||
self.actual_events.append(event)
|
||||
def _add_event_to_actual_events(self, event_type, data):
|
||||
self.actual_events.append(data)
|
||||
|
||||
def _set_excepted_events(self, events):
|
||||
self.excepted_events = events
|
||||
|
|
|
@ -77,7 +77,7 @@ class TestConsistencyFunctional(TestFunctionalBase, TestConfiguration):
|
|||
cls.add_db(cls.conf)
|
||||
cls.load_datasources(cls.conf)
|
||||
cls.graph = NXGraph("Entity Graph")
|
||||
cls.processor = Processor(cls.conf, lambda x: x, cls.graph)
|
||||
cls.processor = Processor(cls.conf, cls.graph)
|
||||
|
||||
cls.event_queue = queue.Queue()
|
||||
|
||||
|
|
|
@ -20,8 +20,8 @@ from vitrage.common.constants import UpdateMethod
|
|||
from vitrage.datasources.nagios import NAGIOS_DATASOURCE
|
||||
from vitrage.datasources.nova.host import NOVA_HOST_DATASOURCE
|
||||
from vitrage.datasources.nova.instance import NOVA_INSTANCE_DATASOURCE
|
||||
from vitrage.datasources import utils as ds_utils
|
||||
from vitrage.datasources.zabbix import ZABBIX_DATASOURCE
|
||||
from vitrage.entity_graph import utils as graph_utils
|
||||
from vitrage.tests import base
|
||||
|
||||
|
||||
|
@ -172,23 +172,18 @@ class DatasourceUpdateMethod(base.BaseTest):
|
|||
ZABBIX_DATASOURCE_NONE))
|
||||
|
||||
def test_datasource_update_method_push(self):
|
||||
drivers = {driver: utils.import_class(self.conf[driver].driver)
|
||||
for driver in self.conf.datasources.types}
|
||||
push_drivers = graph_utils.get_push_datasources(drivers=drivers,
|
||||
conf=self.conf)
|
||||
self.assertSequenceEqual(set(push_drivers), {utils.import_class(
|
||||
driver_names = ds_utils.get_push_drivers_names(self.conf)
|
||||
push_drivers = ds_utils.get_drivers_by_name(self.conf, driver_names)
|
||||
self.assertSequenceEqual({utils.import_class(
|
||||
self.conf[NOVA_INSTANCE_DATASOURCE].driver), utils.import_class(
|
||||
self.conf[ZABBIX_DATASOURCE_PUSH].driver)})
|
||||
self.conf[ZABBIX_DATASOURCE_PUSH].driver)},
|
||||
set(d.__class__ for d in push_drivers))
|
||||
|
||||
def test_datasource_update_method_pull(self):
|
||||
pull_drivers = tuple(graph_utils.get_pull_datasources(self.conf))
|
||||
self.assertSequenceEqual(pull_drivers,
|
||||
(NAGIOS_DATASOURCE,
|
||||
ZABBIX_DATASOURCE_PULL))
|
||||
|
||||
def test_datasource_update_method_pull_with_no_changes_interval(self):
|
||||
pull_drivers = tuple(graph_utils.get_pull_datasources(self.conf))
|
||||
self.assertNotIn(ZABBIX_DATASOURCE_PULL_NO_INTERVAL, pull_drivers)
|
||||
driver_names = ds_utils.get_pull_drivers_names(self.conf)
|
||||
self.assertSequenceEqual(
|
||||
set([NAGIOS_DATASOURCE, ZABBIX_DATASOURCE_PULL]),
|
||||
set(driver_names))
|
||||
|
||||
def test_datasources_notification_topic(self):
|
||||
self.assertEqual('vitrage_notifications',
|
||||
|
|
Loading…
Reference in New Issue