From 76ac86993d9237206a301dd3eda81089edd5b04b Mon Sep 17 00:00:00 2001 From: Alexey Weyl Date: Tue, 16 Feb 2016 10:14:56 +0200 Subject: [PATCH] infrastructure changes for synchronizers and consistency Change-Id: I6854feca155893356e9509d47021e42322074538 --- vitrage/common/constants.py | 1 + vitrage/common/file_utils.py | 24 ++-- vitrage/entity_graph/api_handler/service.py | 5 +- vitrage/entity_graph/processor/base.py | 5 + vitrage/entity_graph/processor/processor.py | 10 +- vitrage/entity_graph/transformer_manager.py | 33 +++-- vitrage/synchronizer/__init__.py | 9 ++ vitrage/synchronizer/launcher.py | 29 +++-- .../plugins/nagios/synchronizer.py | 16 ++- vitrage/synchronizer/plugins/nova/base.py | 2 +- .../plugins/nova/host/synchronizer.py | 12 +- .../plugins/nova/instance/synchronizer.py | 9 +- .../plugins/nova/zone/synchronizer.py | 9 +- .../plugins/static_physical/synchronizer.py | 118 +++++++++++++++-- .../plugins/static_physical/transformer.py | 5 +- .../{base.py => plugins/synchronizer_base.py} | 30 ++++- vitrage/synchronizer/services.py | 49 +++++-- .../switch_to_host_plugin.yaml | 41 ++++++ .../switch_to_switch_plugin.yaml | 19 +++ .../test_static_physical_synchronizer.py | 120 ++++++++++++++++++ .../test_static_physical_transformer.py | 13 +- .../static_plugin/test_static_plugin.py | 63 --------- 22 files changed, 484 insertions(+), 138 deletions(-) rename vitrage/synchronizer/{base.py => plugins/synchronizer_base.py} (61%) create mode 100644 vitrage/tests/resources/static_plugins/changes_plugins/switch_to_host_plugin.yaml create mode 100644 vitrage/tests/resources/static_plugins/changes_plugins/switch_to_switch_plugin.yaml create mode 100644 vitrage/tests/unit/synchronizer/static_plugin/test_static_physical_synchronizer.py delete mode 100644 vitrage/tests/unit/synchronizer/static_plugin/test_static_plugin.py diff --git a/vitrage/common/constants.py b/vitrage/common/constants.py index a74163c98..bf086d8b6 100644 --- a/vitrage/common/constants.py +++ b/vitrage/common/constants.py @@ -70,3 +70,4 @@ class EventAction(object): CREATE = 'create' DELETE = 'delete' UPDATE = 'update' + END_MESSAGE = 'end_message' diff --git a/vitrage/common/file_utils.py b/vitrage/common/file_utils.py index 1c25408e2..60f3753c5 100644 --- a/vitrage/common/file_utils.py +++ b/vitrage/common/file_utils.py @@ -13,9 +13,9 @@ # under the License. import os -import yaml from oslo_log import log +import yaml LOG = log.getLogger(__name__) @@ -35,15 +35,21 @@ def load_yaml_files(dir_path, with_exception=False): yaml_files = [] for file in files: full_path = dir_path + '/' + file - with open(full_path, 'r') as stream: - try: - config = yaml.load(stream, Loader=yaml.BaseLoader) - except Exception as e: - if with_exception: - raise e - else: - LOG.error("Fails to parse file: %s. %s" % (full_path, e)) + config = load_yaml_file(full_path, with_exception) + if config: yaml_files.append(config) return yaml_files + + +def load_yaml_file(full_path, with_exception=False): + with open(full_path, 'r') as stream: + try: + return yaml.load(stream, Loader=yaml.BaseLoader) + except Exception as e: + if with_exception: + raise e + else: + LOG.error("Fails to parse file: %s. %s" % (full_path, e)) + return None diff --git a/vitrage/entity_graph/api_handler/service.py b/vitrage/entity_graph/api_handler/service.py index 9565d5399..ef25d07a0 100644 --- a/vitrage/entity_graph/api_handler/service.py +++ b/vitrage/entity_graph/api_handler/service.py @@ -14,6 +14,7 @@ import json +import eventlet from oslo_config import cfg from oslo_log import log import oslo_messaging @@ -28,6 +29,8 @@ from vitrage.graph import Direction LOG = log.getLogger(__name__) +eventlet.monkey_patch() + class VitrageApiHandlerService(os_service.Service): @@ -61,7 +64,7 @@ class VitrageApiHandlerService(os_service.Service): # TODO(Dany) use eventlet instead of threading server = oslo_messaging.get_rpc_server(transport, target, - endpoints, executor='threading') + endpoints, executor='eventlet') server.start() diff --git a/vitrage/entity_graph/processor/base.py b/vitrage/entity_graph/processor/base.py index 4994035f1..21fcb4442 100644 --- a/vitrage/entity_graph/processor/base.py +++ b/vitrage/entity_graph/processor/base.py @@ -13,6 +13,7 @@ # under the License. import abc + import six @@ -37,3 +38,7 @@ class ProcessorBase(object): @abc.abstractmethod def delete_entity(self, deleted_vertex, neighbors): pass + + @abc.abstractmethod + def handle_end_message(self, event): + pass diff --git a/vitrage/entity_graph/processor/processor.py b/vitrage/entity_graph/processor/processor.py index 31475ea84..b7a930c9e 100644 --- a/vitrage/entity_graph/processor/processor.py +++ b/vitrage/entity_graph/processor/processor.py @@ -125,6 +125,9 @@ class Processor(processor.ProcessorBase): LOG.info("Delete event arrived on invalid resource: %s", deleted_vertex) + def handle_end_message(self, vertex, neighbors): + return + def transform_entity(self, event): return self.transformer.transform(event) @@ -146,8 +149,8 @@ class Processor(processor.ProcessorBase): neighbors, valid_edges) for (vertex, edge) in neighbors: graph_vertex = self.entity_graph.get_vertex(vertex.vertex_id) - if (not graph_vertex) or self.entity_graph.check_update_validation( - graph_vertex, vertex): + if not graph_vertex or \ + not self.entity_graph.is_vertex_deleted(graph_vertex): if self.entity_graph.can_update_vertex(graph_vertex, vertex): LOG.debug("Updates vertex: %s", vertex) self.entity_graph.update_vertex(vertex) @@ -215,5 +218,6 @@ class Processor(processor.ProcessorBase): self.actions = { EventAction.CREATE: self.create_entity, EventAction.UPDATE: self.update_entity, - EventAction.DELETE: self.delete_entity + EventAction.DELETE: self.delete_entity, + EventAction.END_MESSAGE: self.handle_end_message } diff --git a/vitrage/entity_graph/transformer_manager.py b/vitrage/entity_graph/transformer_manager.py index 314f42e56..d7b9c5b3d 100644 --- a/vitrage/entity_graph/transformer_manager.py +++ b/vitrage/entity_graph/transformer_manager.py @@ -16,7 +16,9 @@ from oslo_log import log as logging from oslo_utils import importutils from vitrage.common.constants import EntityType +from vitrage.common.constants import EventAction from vitrage.common.constants import SynchronizerProperties as SyncProps +from vitrage.common.constants import SyncMode from vitrage.common.exception import VitrageTransformerError from vitrage.synchronizer.plugins.nagios.transformer import NagiosTransformer from vitrage.synchronizer.plugins.nova.host.transformer import HostTransformer @@ -24,8 +26,8 @@ from vitrage.synchronizer.plugins.nova.instance.transformer import \ InstanceTransformer from vitrage.synchronizer.plugins.nova.zone.transformer import ZoneTransformer from vitrage.synchronizer.plugins.static_physical.transformer import \ - StaticPhysical - + StaticPhysicalTransformer +from vitrage.synchronizer.plugins import transformer_base LOG = logging.getLogger(__name__) @@ -54,7 +56,8 @@ class TransformerManager(object): transformers) transformers[EntityType.SWITCH] = importutils.import_object( - "%s.%s" % (StaticPhysical.__module__, StaticPhysical.__name__), + "%s.%s" % (StaticPhysicalTransformer.__module__, + StaticPhysicalTransformer.__name__), transformers) transformers[EntityType.NAGIOS] = importutils.import_object( @@ -75,14 +78,18 @@ class TransformerManager(object): return transformer def transform(self, entity_event): + if not self._is_end_message(entity_event): + try: + sync_type = entity_event[SyncProps.SYNC_TYPE] + except KeyError: + raise VitrageTransformerError( + 'Entity Event must contains sync_type field.') - try: - sync_type = entity_event[SyncProps.SYNC_TYPE] - except KeyError: - raise VitrageTransformerError( - 'Entity Event must contains sync_type field.') - - return self.get_transformer(sync_type).transform(entity_event) + return self.get_transformer(sync_type).transform(entity_event) + else: + return transformer_base.EntityWrapper(None, + None, + EventAction.END_MESSAGE) def extract_key(self, entity_event): @@ -93,3 +100,9 @@ class TransformerManager(object): 'Entity Event must contains sync_type field.') return self.get_transformer(sync_type).extract_key() + + @staticmethod + def _is_end_message(entity_event): + return entity_event[SyncProps.SYNC_MODE] == SyncMode.INIT_SNAPSHOT and\ + SyncProps.EVENT_TYPE in entity_event and \ + entity_event[SyncProps.EVENT_TYPE] == EventAction.END_MESSAGE diff --git a/vitrage/synchronizer/__init__.py b/vitrage/synchronizer/__init__.py index 31fbea826..7e5b736f2 100644 --- a/vitrage/synchronizer/__init__.py +++ b/vitrage/synchronizer/__init__.py @@ -19,4 +19,13 @@ OPTS = [ default=600, min=10, help='interval between full snapshots'), + cfg.IntOpt('nagios_changes_interval', + default=30, + min=30, + help='interval between checking changes in nagios plugin'), + cfg.IntOpt('static_physical_changes_interval', + default=30, + min=30, + help='interval between checking changes in the configuration ' + 'files of the physical topology plugin'), ] diff --git a/vitrage/synchronizer/launcher.py b/vitrage/synchronizer/launcher.py index 2b1d9502a..e44c0db4c 100644 --- a/vitrage/synchronizer/launcher.py +++ b/vitrage/synchronizer/launcher.py @@ -1,4 +1,5 @@ # Copyright 2016 - Alcatel-Lucent +# Copyright 2016 - Nokia # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -15,6 +16,7 @@ from oslo_log import log from oslo_service import service as os_service +from services import ChangesService from services import SnapshotsService from vitrage.synchronizer.plugins.nagios.synchronizer import NagiosSynchronizer from vitrage.synchronizer.plugins.nova.host.synchronizer import \ @@ -24,8 +26,7 @@ from vitrage.synchronizer.plugins.nova.instance.synchronizer import \ from vitrage.synchronizer.plugins.nova.zone.synchronizer import \ ZoneSynchronizer from vitrage.synchronizer.plugins.static_physical.synchronizer import \ - StaticPhysical - + StaticPhysicalSynchronizer LOG = log.getLogger(__name__) @@ -40,11 +41,10 @@ def create_send_to_queue_callback(queue): class Launcher(object): def __init__(self, conf, callback): - self.conf = conf self.callback = callback - self.plugins = self._init_registered_plugins() - self.services = [SnapshotsService(conf, self.plugins)] + self.snapshot_plugins = self._register_snapshot_plugins() + self.services = self._register_services() def launch(self): launcher = os_service.ProcessLauncher(self.conf) @@ -52,7 +52,7 @@ class Launcher(object): service.set_callback(self.callback) launcher.launch_service(service, 1) - def _init_registered_plugins(self): + def _register_snapshot_plugins(self): version = 2.0 user = 'admin' password = 'password' @@ -63,6 +63,19 @@ class Launcher(object): HostSynchronizer(version, user, password, project, auth_url), InstanceSynchronizer(version, user, password, project, auth_url), NagiosSynchronizer(self.conf), - StaticPhysical(self.conf) - ] + StaticPhysicalSynchronizer(self.conf)] return registered_plugins + + def _register_services(self): + nagios_changes_interval = self.conf.synchronizer.\ + nagios_changes_interval + static_physical_changes_interval = self.conf.synchronizer.\ + static_physical_changes_interval + + return [SnapshotsService(self.conf, self.snapshot_plugins), + ChangesService(self.conf, + [NagiosSynchronizer(self.conf)], + nagios_changes_interval), + ChangesService(self.conf, + [StaticPhysicalSynchronizer(self.conf)], + static_physical_changes_interval)] diff --git a/vitrage/synchronizer/plugins/nagios/synchronizer.py b/vitrage/synchronizer/plugins/nagios/synchronizer.py index dff620805..dac684913 100644 --- a/vitrage/synchronizer/plugins/nagios/synchronizer.py +++ b/vitrage/synchronizer/plugins/nagios/synchronizer.py @@ -11,7 +11,9 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. + from collections import namedtuple + from oslo_log import log import requests @@ -19,11 +21,11 @@ from vitrage.common.constants import EntityType from vitrage.common.constants import SynchronizerProperties as SyncProps from vitrage.i18n import _LE from vitrage.i18n import _LW -from vitrage.synchronizer.base import SynchronizerBase from vitrage.synchronizer.plugins.nagios.parser import NagiosParser from vitrage.synchronizer.plugins.nagios.properties import NagiosProperties \ as NagiosProps from vitrage.synchronizer.plugins.nagios.properties import NagiosStatus +from vitrage.synchronizer.plugins.synchronizer_base import SynchronizerBase LOG = log.getLogger(__name__) @@ -36,8 +38,13 @@ class NagiosSynchronizer(SynchronizerBase): self.conf = conf self.cache = dict() - def get_all(self): - return self.make_pickleable(self._get_services(), EntityType.NAGIOS) + def get_all(self, sync_mode): + return self.make_pickleable(self._get_services(), + EntityType.NAGIOS, + sync_mode) + + def get_changes(self, sync_mode): + return [] def _get_services(self): nagios_user = self.conf.synchronizer_plugins.nagios_user @@ -71,7 +78,8 @@ class NagiosSynchronizer(SynchronizerBase): response.status_code) return [] - def _enrich_services(self, nagios_services): + @staticmethod + def _enrich_services(nagios_services): for service in nagios_services: # TODO(ifat_afek) - add a configuration file for resource types service[NagiosProps.RESOURCE_TYPE] = EntityType.NOVA_HOST diff --git a/vitrage/synchronizer/plugins/nova/base.py b/vitrage/synchronizer/plugins/nova/base.py index 83abe8c95..113007ef6 100644 --- a/vitrage/synchronizer/plugins/nova/base.py +++ b/vitrage/synchronizer/plugins/nova/base.py @@ -15,7 +15,7 @@ from novaclient import client -from vitrage.synchronizer.base import SynchronizerBase +from vitrage.synchronizer.plugins.synchronizer_base import SynchronizerBase class NovaBase(SynchronizerBase): diff --git a/vitrage/synchronizer/plugins/nova/host/synchronizer.py b/vitrage/synchronizer/plugins/nova/host/synchronizer.py index 279fe52d5..576e8f5ed 100644 --- a/vitrage/synchronizer/plugins/nova/host/synchronizer.py +++ b/vitrage/synchronizer/plugins/nova/host/synchronizer.py @@ -34,6 +34,12 @@ class HostSynchronizer(NovaBase): compute_hosts.append(host_dict) return compute_hosts - def get_all(self): - return self.make_pickleable(self.filter_none_compute_hosts( - self.client.hosts.list()), EntityType.NOVA_HOST, ['manager']) + def get_all(self, sync_mode): + return self.make_pickleable( + self.filter_none_compute_hosts(self.client.hosts.list()), + EntityType.NOVA_HOST, + sync_mode, + ['manager']) + + def get_changes(self, sync_mode): + pass diff --git a/vitrage/synchronizer/plugins/nova/instance/synchronizer.py b/vitrage/synchronizer/plugins/nova/instance/synchronizer.py index 39df7426a..2e8df9436 100644 --- a/vitrage/synchronizer/plugins/nova/instance/synchronizer.py +++ b/vitrage/synchronizer/plugins/nova/instance/synchronizer.py @@ -31,7 +31,12 @@ class InstanceSynchronizer(NovaBase): instances_res.append(instance.__dict__) return instances_res - def get_all(self): + def get_all(self, sync_mode): return self.make_pickleable( self.filter_instances(self.client.servers.list()), - EntityType.NOVA_INSTANCE, ['manager']) + EntityType.NOVA_INSTANCE, + sync_mode, + ['manager']) + + def get_changes(self, sync_mode): + pass diff --git a/vitrage/synchronizer/plugins/nova/zone/synchronizer.py b/vitrage/synchronizer/plugins/nova/zone/synchronizer.py index 7e21782d3..2f53d7d41 100644 --- a/vitrage/synchronizer/plugins/nova/zone/synchronizer.py +++ b/vitrage/synchronizer/plugins/nova/zone/synchronizer.py @@ -33,7 +33,12 @@ class ZoneSynchronizer(NovaBase): zones_res.append(zone_dict) return zones_res - def get_all(self): + def get_all(self, sync_mode): return self.make_pickleable(self.filter_internal_zone( self.client.availability_zones.list()), - EntityType.NOVA_ZONE, ['manager']) + EntityType.NOVA_ZONE, + sync_mode, + ['manager']) + + def get_changes(self, sync_mode): + pass diff --git a/vitrage/synchronizer/plugins/static_physical/synchronizer.py b/vitrage/synchronizer/plugins/static_physical/synchronizer.py index 26e82ef41..073b7a384 100644 --- a/vitrage/synchronizer/plugins/static_physical/synchronizer.py +++ b/vitrage/synchronizer/plugins/static_physical/synchronizer.py @@ -14,26 +14,120 @@ import os +from vitrage.common.constants import EventAction + +from vitrage.common.constants import SynchronizerProperties as SyncProps +from vitrage.common.constants import VertexProperties as VProps from vitrage.common import file_utils -from vitrage.synchronizer.base import SynchronizerBase +from vitrage.synchronizer.plugins.synchronizer_base import SynchronizerBase -class StaticPhysical(SynchronizerBase): +class StaticPhysicalSynchronizer(SynchronizerBase): + STATIC_PHYSICAL = 'static_physical' + ENTITIES_SECTION = 'entities' + def __init__(self, conf): - super(StaticPhysical, self).__init__() + super(StaticPhysicalSynchronizer, self).__init__() self.cfg = conf + self.cache = {} - def get_all(self): - return self.make_pickleable(self.get_instances(), None, []) + def get_all(self, sync_mode): + return self.make_pickleable(self._get_all_entities(), + self.STATIC_PHYSICAL, + sync_mode) - def get_instances(self): + def get_changes(self, sync_mode): + return self.make_pickleable(self._get_changes_entities(), + self.STATIC_PHYSICAL, + sync_mode) + + def _get_all_entities(self): static_entities = [] - if os.path.isdir(self.cfg.synchronizer_plugins.static_plugins_dir): - static_plugin_configs = file_utils.load_yaml_files( - self.cfg.synchronizer_plugins.static_plugins_dir) - for config in static_plugin_configs: - for entity in config['entities']: - static_entities.append(entity) + if os.path.isdir(self.cfg.synchronizer_plugins.static_plugins_dir): + files = file_utils.load_files( + self.cfg.synchronizer_plugins.static_plugins_dir, '.yaml') + + for file in files: + full_path = self.cfg.synchronizer_plugins.static_plugins_dir \ + + '/' + file + static_entities += self._get_entities_from_file(file, + full_path) return static_entities + + def _get_entities_from_file(self, file, path): + static_entities = [] + config = file_utils.load_yaml_file(path) + + for entity in config[self.ENTITIES_SECTION]: + static_entities.append(entity.copy()) + + self.cache[file] = config + + return static_entities + + def _get_changes_entities(self): + entities_updates = [] + + if os.path.isdir(self.cfg.synchronizer_plugins.static_plugins_dir): + entities_updates = [] + files = file_utils.load_files( + self.cfg.synchronizer_plugins.static_plugins_dir, '.yaml') + + for file in files: + full_path = self.cfg.synchronizer_plugins.static_plugins_dir +\ + '/' + file + config = file_utils.load_yaml_file(full_path) + if config: + if file in self.cache: + if str(config) != str(self.cache[file]): + # TODO(alexey_weyl): need also to remove deleted + # files from cache + + self._update_on_existing_entities( + self.cache[file][self.ENTITIES_SECTION], + config[self.ENTITIES_SECTION], + entities_updates) + + self._update_on_new_entities( + config[self.ENTITIES_SECTION], + self.cache[file][self.ENTITIES_SECTION], + entities_updates) + + self.cache[file] = config + else: + self.cache[file] = config + entities_updates += \ + self._get_entities_from_file(file, full_path) + + return entities_updates + + def _update_on_existing_entities(self, old_entities, + new_entities, updates): + for old_entity in old_entities: + if old_entity not in new_entities: + new_entity = self._find_entity(old_entity, new_entities) + if not new_entity: + self._delete_event(old_entity) + updates.append(old_entity.copy()) + else: + updates.append(new_entity.copy()) + + @staticmethod + def _find_entity(new_entity, entities): + for entity in entities: + if entity[SyncProps.SYNC_TYPE] == new_entity[SyncProps.SYNC_TYPE] \ + and entity[VProps.ID] == new_entity[VProps.ID]: + return entity + return None + + @staticmethod + def _update_on_new_entities(new_entities, old_entities, updates): + for entity in new_entities: + if entity not in updates and entity not in old_entities: + updates.append(entity.copy()) + + @staticmethod + def _delete_event(entity): + entity[SyncProps.EVENT_TYPE] = EventAction.DELETE diff --git a/vitrage/synchronizer/plugins/static_physical/transformer.py b/vitrage/synchronizer/plugins/static_physical/transformer.py index 3f2cddc84..f4f815595 100644 --- a/vitrage/synchronizer/plugins/static_physical/transformer.py +++ b/vitrage/synchronizer/plugins/static_physical/transformer.py @@ -24,9 +24,10 @@ from vitrage.synchronizer.plugins import transformer_base LOG = logging.getLogger(__name__) -class StaticPhysical(transformer_base.TransformerBase): +class StaticPhysicalTransformer(transformer_base.TransformerBase): RELATION_TYPE = 'relation_type' + RELATIONSHIPS_SECTION = 'relationships' def __init__(self, transformers): self.transformers = transformers @@ -55,7 +56,7 @@ class StaticPhysical(transformer_base.TransformerBase): entity_key = self.extract_key(entity_event) timestamp = entity_event[SyncProps.SAMPLE_DATE] - for neighbor_details in entity_event['relationships']: + for neighbor_details in entity_event[self.RELATIONSHIPS_SECTION]: # TODO(alexey): need to decide what to do if one of the entities # fails neighbor = self._create_neighbor(neighbor_details, entity_type, diff --git a/vitrage/synchronizer/base.py b/vitrage/synchronizer/plugins/synchronizer_base.py similarity index 61% rename from vitrage/synchronizer/base.py rename to vitrage/synchronizer/plugins/synchronizer_base.py index 81f605274..cbc53cc37 100644 --- a/vitrage/synchronizer/base.py +++ b/vitrage/synchronizer/plugins/synchronizer_base.py @@ -16,7 +16,10 @@ import abc import six +from vitrage.common.constants import EventAction + from vitrage.common.constants import SynchronizerProperties as SyncProps +from vitrage.common.constants import SyncMode import vitrage.common.datetime_utils @@ -27,11 +30,24 @@ class SynchronizerBase(object): pass @abc.abstractmethod - def get_all(self): + def get_all(self, sync_mode): pass - def make_pickleable(self, entities, sync_type, fields_to_remove=[]): + @staticmethod + def _get_end_message(sync_type): + end_message = { + SyncProps.SYNC_TYPE: sync_type, + SyncProps.SYNC_MODE: SyncMode.INIT_SNAPSHOT, + SyncProps.EVENT_TYPE: EventAction.END_MESSAGE + } + return end_message + @abc.abstractmethod + def get_changes(self, sync_mode): + pass + + def make_pickleable(self, entities, sync_type, + sync_mode, fields_to_remove=[]): pickleable_entities = [] for entity in entities: @@ -39,17 +55,25 @@ class SynchronizerBase(object): entity.pop(field) self._add_sync_type(entity, sync_type) + self._add_sync_mode(entity, sync_mode) self._add_sampling_time(entity) pickleable_entities.append(entity) + if sync_mode == SyncMode.INIT_SNAPSHOT: + pickleable_entities.append(self._get_end_message(sync_type)) + return pickleable_entities @staticmethod def _add_sync_type(entity, sync_type): - if sync_type: + if SyncProps.SYNC_TYPE not in entity: entity[SyncProps.SYNC_TYPE] = sync_type @staticmethod def _add_sampling_time(entity): entity[SyncProps.SAMPLE_DATE] = str( vitrage.common.datetime_utils.utcnow()) + + @staticmethod + def _add_sync_mode(entity, sync_mode): + entity[SyncProps.SYNC_MODE] = sync_mode diff --git a/vitrage/synchronizer/services.py b/vitrage/synchronizer/services.py index 3583b131c..258ba83a5 100644 --- a/vitrage/synchronizer/services.py +++ b/vitrage/synchronizer/services.py @@ -1,4 +1,5 @@ # Copyright 2016 - Alcatel-Lucent +# Copyright 2016 - Nokia # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -15,7 +16,6 @@ from oslo_log import log from oslo_service import service as os_service -from vitrage.common.constants import SynchronizerProperties as SyncProps from vitrage.common.constants import SyncMode LOG = log.getLogger(__name__) @@ -40,17 +40,19 @@ class SnapshotsService(SynchronizerService): def start(self): LOG.info("Start VitrageSnapshotsService") + super(SnapshotsService, self).start() interval = self.conf.synchronizer.snapshots_interval self.tg.add_timer(interval, self._get_all) + LOG.info("Finish start VitrageSnapshotsService") def stop(self): - LOG.info("Stop VitrageSynchronizerService") + LOG.info("Stop VitrageSnapshotsService") super(SnapshotsService, self).stop() - LOG.info("Finish stop VitrageSynchronizerService") + LOG.info("Finish stop VitrageSnapshotsService") def _get_all(self): sync_mode = SyncMode.INIT_SNAPSHOT \ @@ -58,15 +60,44 @@ class SnapshotsService(SynchronizerService): LOG.debug("start get all with sync mode %s" % sync_mode) for plugin in self.registered_plugins: - entities_dictionaries = \ - self._mark_snapshot_entities(plugin.get_all(), sync_mode) + entities_dictionaries = plugin.get_all(sync_mode) for entity in entities_dictionaries: self.callback_function(entity) LOG.debug("end get all with sync mode %s" % sync_mode) self.first_time = False - @staticmethod - def _mark_snapshot_entities(dicts, sync_mode): - [x.setdefault(SyncProps.SYNC_MODE, sync_mode) for x in dicts] - return dicts + +class ChangesService(SynchronizerService): + + def __init__(self, conf, registered_plugins, changes_interval): + super(ChangesService, self).__init__(conf, registered_plugins) + self.changes_interval = changes_interval + + def start(self): + LOG.info("Start VitrageChangesService - %s", + self.registered_plugins[0].__class__.__name__) + + super(ChangesService, self).start() + self.tg.add_timer(self.changes_interval, self._get_changes) + + LOG.info("Finish start VitrageChangesService - %s", + self.registered_plugins[0].__class__.__name__) + + def stop(self): + LOG.info("Stop VitrageChangesService - %s", + self.registered_plugins[0].__class__.__name__) + + super(ChangesService, self).stop() + + LOG.info("Finish stop VitrageChangesService - %s", + self.registered_plugins[0].__class__.__name__) + + def _get_changes(self): + LOG.debug("start get changes") + + for plugin in self.registered_plugins: + for entity in plugin.get_changes(SyncMode.UPDATE): + self.callback_function(entity) + + LOG.debug("end get changes") diff --git a/vitrage/tests/resources/static_plugins/changes_plugins/switch_to_host_plugin.yaml b/vitrage/tests/resources/static_plugins/changes_plugins/switch_to_host_plugin.yaml new file mode 100644 index 000000000..181ad7b0e --- /dev/null +++ b/vitrage/tests/resources/static_plugins/changes_plugins/switch_to_host_plugin.yaml @@ -0,0 +1,41 @@ +entities: + - name: switch-1 + id: 12345 + sync_type: switch + state: available + relationships: + - sync_type: nova.host + name: host-1 + id: 1 + relation_type: contains + - sync_type: nova.host + name: host-2 + id: 2 + relation_type: contains + - name: switch-3 + id: 34567 + sync_type: switch + state: available + relationships: + - sync_type: nova.host + name: host-4 + id: 4 + relation_type: contains + - name: switch-4 + id: 45678 + sync_type: switch + state: error + relationships: + - sync_type: nova.host + name: host-2 + id: 2 + relation_type: contains + - name: switch-5 + id: 56789 + sync_type: switch + state: error + relationships: + - sync_type: nova.host + name: host-3 + id: 3 + relation_type: contains \ No newline at end of file diff --git a/vitrage/tests/resources/static_plugins/changes_plugins/switch_to_switch_plugin.yaml b/vitrage/tests/resources/static_plugins/changes_plugins/switch_to_switch_plugin.yaml new file mode 100644 index 000000000..c80dbcfeb --- /dev/null +++ b/vitrage/tests/resources/static_plugins/changes_plugins/switch_to_switch_plugin.yaml @@ -0,0 +1,19 @@ +entities: + - name: switch-1 + id: 12345 + sync_type: switch + state: available + relationships: + - sync_type: switch + name: switch-2 + id: 23456 + relation_type: contains + - name: switch-2 + id: 23456 + sync_type: switch + state: available + relationships: + - sync_type: switch + name: switch-3 + id: 34567 + relation_type: contains \ No newline at end of file diff --git a/vitrage/tests/unit/synchronizer/static_plugin/test_static_physical_synchronizer.py b/vitrage/tests/unit/synchronizer/static_plugin/test_static_physical_synchronizer.py new file mode 100644 index 000000000..a077c2aba --- /dev/null +++ b/vitrage/tests/unit/synchronizer/static_plugin/test_static_physical_synchronizer.py @@ -0,0 +1,120 @@ +# Copyright 2016 - Nokia +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os + +from oslo_config import cfg +from oslo_log import log as logging + +from vitrage.common.constants import EntityType +from vitrage.common.constants import EventAction +from vitrage.common.constants import SynchronizerProperties as SyncProps +from vitrage.common.constants import SyncMode +from vitrage.common.constants import VertexProperties as VProps +from vitrage.common import file_utils +from vitrage.synchronizer.plugins.static_physical import synchronizer +from vitrage.tests import base +from vitrage.tests.mocks import utils + + +LOG = logging.getLogger(__name__) + + +class TestStaticPhysicalSynchronizer(base.BaseTest): + + OPTS = [ + cfg.StrOpt('static_plugins_dir', + default=utils.get_resources_dir() + '/static_plugins', + ), + ] + + CHANGES_OPTS = [ + cfg.StrOpt('static_plugins_dir', + default=utils.get_resources_dir() + '/static_plugins/' + 'changes_plugins', + ), + ] + + def setUp(self): + super(TestStaticPhysicalSynchronizer, self).setUp() + self.conf = cfg.ConfigOpts() + self.conf.register_opts(self.OPTS, group='synchronizer_plugins') + self.static_physical_synchronizer = \ + synchronizer.StaticPhysicalSynchronizer(self.conf) + + def test_static_plugins_loader(self): + # Setup + total_static_plugins = \ + os.listdir(self.conf.synchronizer_plugins.static_plugins_dir) + + # Action + static_configs = file_utils.load_yaml_files( + self.conf.synchronizer_plugins.static_plugins_dir) + + # Test assertions + # -1 is because there are 2 files and a folder in static_plugins_dir + self.assertEqual(len(total_static_plugins) - 1, len(static_configs)) + + def test_get_all(self): + # Action + static_entities = self.static_physical_synchronizer.get_all( + SyncMode.UPDATE) + + # Test assertions + self.assertEqual(5, len(static_entities)) + + def test_get_changes(self): + # Setup + entities = self.static_physical_synchronizer.get_all(SyncMode.UPDATE) + self.assertEqual(5, len(entities)) + + self.conf = cfg.ConfigOpts() + self.conf.register_opts(self.CHANGES_OPTS, + group='synchronizer_plugins') + self.static_physical_synchronizer.cfg = self.conf + + # Action + changes = self.static_physical_synchronizer.get_changes( + EventAction.UPDATE) + + # Test Assertions + status = any(change[SyncProps.SYNC_TYPE] == EntityType.SWITCH and + change[VProps.ID] == '12345' for change in changes) + self.assertEqual(False, status) + + status = any(change[SyncProps.SYNC_TYPE] == EntityType.SWITCH and + change[VProps.ID] == '23456' and + change[SyncProps.EVENT_TYPE] == 'delete' + for change in changes) + self.assertEqual(True, status) + + status = any(change[SyncProps.SYNC_TYPE] == EntityType.SWITCH and + change[VProps.ID] == '34567' for change in changes) + self.assertEqual(True, status) + + status = any(change[SyncProps.SYNC_TYPE] == EntityType.SWITCH and + change[VProps.ID] == '45678' for change in changes) + self.assertEqual(True, status) + status = any(change[SyncProps.SYNC_TYPE] == EntityType.SWITCH and + change[VProps.ID] == '56789' for change in changes) + self.assertEqual(True, status) + + self.assertEqual(4, len(changes)) + + # Action + changes = self.static_physical_synchronizer.get_changes( + EventAction.UPDATE) + + # Test Assertions + self.assertEqual(0, len(changes)) diff --git a/vitrage/tests/unit/synchronizer/static_plugin/test_static_physical_transformer.py b/vitrage/tests/unit/synchronizer/static_plugin/test_static_physical_transformer.py index c7000c8e8..c33488336 100644 --- a/vitrage/tests/unit/synchronizer/static_plugin/test_static_physical_transformer.py +++ b/vitrage/tests/unit/synchronizer/static_plugin/test_static_physical_transformer.py @@ -23,7 +23,7 @@ from vitrage.common.constants import SynchronizerProperties as SyncProps from vitrage.common.constants import VertexProperties as VProps from vitrage.synchronizer.plugins.nova.host.transformer import HostTransformer from vitrage.synchronizer.plugins.static_physical.transformer \ - import StaticPhysical + import StaticPhysicalTransformer from vitrage.synchronizer.plugins.transformer_base import TransformerBase from vitrage.tests import base from vitrage.tests.mocks import mock_syncronizer as mock_sync @@ -38,7 +38,7 @@ class TestStaticPhysicalTransformer(base.BaseTest): self.transformers = {} host_transformer = HostTransformer(self.transformers) - static_transformer = StaticPhysical(self.transformers) + static_transformer = StaticPhysicalTransformer(self.transformers) self.transformers[EntityType.NOVA_HOST] = host_transformer self.transformers[EntityType.SWITCH] = static_transformer @@ -51,7 +51,7 @@ class TestStaticPhysicalTransformer(base.BaseTest): switch_type = EntityType.SWITCH switch_name = 'switch-1' timestamp = datetime.datetime.utcnow() - static_transformer = StaticPhysical(self.transformers) + static_transformer = StaticPhysicalTransformer(self.transformers) # Test action properties = { @@ -65,7 +65,7 @@ class TestStaticPhysicalTransformer(base.BaseTest): observed_id_values = placeholder.vertex_id.split( TransformerBase.KEY_SEPARATOR) expected_id_values = \ - StaticPhysical(self.transformers).key_values( + StaticPhysicalTransformer(self.transformers).key_values( [switch_type, switch_name]) self.assertEqual(observed_id_values, expected_id_values) @@ -90,7 +90,7 @@ class TestStaticPhysicalTransformer(base.BaseTest): # Test setup switch_type = EntityType.SWITCH switch_name = 'switch-1' - static_transformer = StaticPhysical(self.transformers) + static_transformer = StaticPhysicalTransformer(self.transformers) # Test action observed_key_fields = static_transformer.key_values( @@ -112,7 +112,8 @@ class TestStaticPhysicalTransformer(base.BaseTest): for event in static_events: # Test action - wrapper = StaticPhysical(self.transformers).transform(event) + wrapper = StaticPhysicalTransformer(self.transformers).\ + transform(event) # Test assertions vertex = wrapper.vertex diff --git a/vitrage/tests/unit/synchronizer/static_plugin/test_static_plugin.py b/vitrage/tests/unit/synchronizer/static_plugin/test_static_plugin.py deleted file mode 100644 index a4b20fd16..000000000 --- a/vitrage/tests/unit/synchronizer/static_plugin/test_static_plugin.py +++ /dev/null @@ -1,63 +0,0 @@ -# Copyright 2016 - Nokia -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import os - -from oslo_config import cfg -from oslo_log import log as logging - -from vitrage.common import file_utils -from vitrage.tests import base -from vitrage.tests.mocks import utils - - -LOG = logging.getLogger(__name__) - - -class TestStaticPlugin(base.BaseTest): - - OPTS = [ - cfg.StrOpt('static_plugins_dir', - default=utils.get_resources_dir() + '/static_plugins', - ), - ] - - def setUp(self): - super(TestStaticPlugin, self).setUp() - self.static_dir_path = utils.get_resources_dir() + '/static_plugins' - self.conf = cfg.ConfigOpts() - self.conf.register_opts(self.OPTS, group='synchronizer_plugins') - - def test_static_plugins_loader(self): - # Setup - total_static_plugins = os.listdir(self.static_dir_path) - - # Action - static_configs = file_utils.load_yaml_files( - self.conf.synchronizer_plugins.static_plugins_dir) - - # Test assertions - self.assertEqual(len(total_static_plugins), len(static_configs)) - - def test_number_of_entities(self): - static_entities = [] - static_plugin_configs = file_utils.load_yaml_files( - self.conf.synchronizer_plugins.static_plugins_dir) - - for config in static_plugin_configs: - for entity in config['entities']: - static_entities.append(entity) - - # Test assertions - self.assertEqual(5, len(static_entities))