Vitrage Persistor Service

Change-Id: I4c70157ef8380825bcef844b572b1747ce95b262
This commit is contained in:
Muhamad Najjar 2017-11-19 10:00:42 +00:00
parent cb5b6c8a30
commit c708a6784e
13 changed files with 437 additions and 10 deletions

View File

@ -305,6 +305,7 @@ function start_vitrage {
run_process vitrage-graph "$VITRAGE_BIN_DIR/vitrage-graph --config-file $VITRAGE_CONF"
run_process vitrage-notifier "$VITRAGE_BIN_DIR/vitrage-notifier --config-file $VITRAGE_CONF"
run_process vitrage-ml "$VITRAGE_BIN_DIR/vitrage-ml --config-file $VITRAGE_CONF"
run_process vitrage-persistor "$VITRAGE_BIN_DIR/vitrage-persistor --config-file $VITRAGE_CONF"
write_systemd_dependency vitrage-graph vitrage-collector
@ -334,7 +335,7 @@ function stop_vitrage {
disable_apache_site vitrage
restart_apache_server
fi
for serv in vitrage-api vitrage-collector vitrage-graph vitrage-notifier; do
for serv in vitrage-api vitrage-collector vitrage-graph vitrage-notifier vitrage-persistor; do
stop_process $serv
done
}

View File

@ -9,7 +9,8 @@ enable_service vitrage-notifier
enable_service vitrage-collector
# machine_learning
enable_service vitrage-ml
# Persistor
enable_service vitrage-persistor
# Default directories
VITRAGE_DIR=$DEST/vitrage

View File

@ -29,6 +29,7 @@ console_scripts =
vitrage-graph = vitrage.cli.graph:main
vitrage-notifier = vitrage.cli.notifier:main
vitrage-collector = vitrage.cli.collector:main
vitrage-persistor = vitrage.cli.persistor:main
vitrage-ml = vitrage.cli.machine_learning:main
vitrage-dbsync = vitrage.cli.storage:dbsync

37
vitrage/cli/persistor.py Normal file
View File

@ -0,0 +1,37 @@
# Copyright 2017 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
from oslo_log import log
from oslo_service import service as os_service
from vitrage.cli import VITRAGE_TITLE
from vitrage.persistor.service import PersistorService
from vitrage import service
from vitrage import storage
LOG = log.getLogger(__name__)
def main():
print(VITRAGE_TITLE)
conf = service.prepare_service()
db_connection = storage.get_connection_from_config(conf)
launcher = os_service.ServiceLauncher(conf)
launcher.launch_service(PersistorService(conf, db_connection))
launcher.wait()
if __name__ == "__main__":
sys.exit(main())

View File

@ -26,12 +26,17 @@ class CollectorNotifier(object):
def __init__(self, conf):
self.oslo_notifier = None
try:
topic = conf.datasources.notification_topic_collector
topics = [conf.datasources.notification_topic_collector]
if conf.persistor.persist_events:
topics.append(conf.persistor.persistor_topic)
else:
LOG.warning("Not persisting events")
self.oslo_notifier = oslo_messaging.Notifier(
get_transport(conf),
driver='messagingv2',
publisher_id='datasources.events',
topics=[topic])
topics=topics)
except Exception as e:
LOG.info('Collector notifier - missing configuration %s'
% str(e))

View File

@ -28,6 +28,7 @@ import vitrage.machine_learning.plugins.jaccard_correlation
import vitrage.notifier
import vitrage.notifier.plugins.snmp
import vitrage.os_clients
import vitrage.persistor
import vitrage.rpc
import vitrage.storage
@ -46,6 +47,7 @@ def list_opts():
('evaluator', vitrage.evaluator.OPTS),
('consistency', vitrage.entity_graph.consistency.OPTS),
('database', vitrage.storage.OPTS),
('persistor', vitrage.persistor.OPTS),
('entity_graph', vitrage.entity_graph.OPTS),
('service_credentials', vitrage.keystone_client.OPTS),
('machine_learning',

View File

@ -0,0 +1,25 @@
# Copyright 2017 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
OPTS = [
cfg.StrOpt('persistor_topic',
default='vitrage_persistor',
help='The topic on which event will be sent from the '
'datasources to the persistor'),
cfg.BoolOpt('persist_events',
default=False,
help='Whether or not persistor is persisting the events'),
]

View File

@ -0,0 +1,78 @@
# Copyright 2017 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import print_function
import dateutil.parser
import oslo_messaging as oslo_m
from oslo_log import log
from oslo_serialization import jsonutils
from oslo_service import service as os_service
from vitrage.common.constants import DatasourceProperties as DSProps
from vitrage.common.constants import GraphAction
from vitrage import messaging
from vitrage.storage.sqlalchemy import models
LOG = log.getLogger(__name__)
class PersistorService(os_service.Service):
def __init__(self, conf, db_connection):
super(PersistorService, self).__init__()
self.conf = conf
self.db_connection = db_connection
transport = messaging.get_transport(conf)
target = \
oslo_m.Target(topic=conf.persistor.persistor_topic)
self.listener = messaging.get_notification_listener(
transport, [target],
[VitragePersistorEndpoint(self.db_connection)])
def start(self):
LOG.info("Vitrage Persistor Service - Starting...")
super(PersistorService, self).start()
self.listener.start()
LOG.info("Vitrage Persistor Service - Started!")
def stop(self, graceful=False):
LOG.info("Vitrage Persistor Service - Stopping...")
self.listener.stop()
self.listener.wait()
super(PersistorService, self).stop(graceful)
LOG.info("Vitrage Persistor Service - Stopped!")
class VitragePersistorEndpoint(object):
def __init__(self, db_connection):
self.db_connection = db_connection
def info(self, ctxt, publisher_id, event_type, payload, metadata):
LOG.debug('Vitrage Event Info: payload %s', payload)
self.process_event(payload)
def process_event(self, data):
""":param data: Serialized to a JSON formatted ``str`` """
if data.get(DSProps.EVENT_TYPE) == GraphAction.END_MESSAGE:
return
collector_timestamp = \
dateutil.parser.parse(data.get(DSProps.SAMPLE_DATE))
event_row = models.Event(payload=jsonutils.dumps(data),
collector_timestamp=collector_timestamp)
self.db_connection.events.create(event_row)

View File

@ -27,6 +27,10 @@ class Connection(object):
def active_actions(self):
return None
@property
def events(self):
return None
@abc.abstractmethod
def upgrade(self, nocreate=False):
raise NotImplementedError('upgrade not implemented')
@ -42,7 +46,6 @@ class Connection(object):
@six.add_metaclass(abc.ABCMeta)
class ActiveActionsConnection(object):
@abc.abstractmethod
def create(self, active_action):
"""Create a new action.
@ -87,3 +90,41 @@ class ActiveActionsConnection(object):
):
"""Delete all active actions that match the filters."""
raise NotImplementedError('delete active actions not implemented')
@six.add_metaclass(abc.ABCMeta)
class EventsConnection(object):
def create(self, event):
"""Create a new event.
:type event: vitrage.storage.sqlalchemy.models.Event
"""
raise NotImplementedError('create event not implemented')
def update(self, event):
"""Update an existing event.
:type event: vitrage.storage.sqlalchemy.models.Event
"""
raise NotImplementedError('update event not implemented')
def query(self,
event_id=None,
collector_timestamp=None,
payload=None,
gt_collector_timestamp=None,
lt_collector_timestamp=None):
"""Yields a lists of events that match filters.
:rtype: list of vitrage.storage.sqlalchemy.models.Event
"""
raise NotImplementedError('query events not implemented')
def delete(self,
event_id=None,
collector_timestamp=None,
payload=None,
gt_collector_timestamp=None,
lt_collector_timestamp=None):
"""Delete all events that match the filters."""
raise NotImplementedError('delete events not implemented')

View File

@ -27,7 +27,6 @@ LOG = log.getLogger(__name__)
class Connection(base.Connection):
def __init__(self, conf, url):
options = dict(conf.database.items())
# set retries to 0 , since reconnection is already implemented
@ -40,11 +39,16 @@ class Connection(base.Connection):
**options)
self.conf = conf
self._active_actions = ActiveActionsConnection(self._engine_facade)
self._events = EventsConnection(self._engine_facade)
@property
def active_actions(self):
return self._active_actions
@property
def events(self):
return self._events
@staticmethod
def _dress_url(url):
# If no explicit driver has been set, we default to pymysql
@ -135,3 +139,66 @@ class ActiveActionsConnection(base.ActiveActionsConnection, BaseTableConn):
score=score,
trigger=trigger)
return query.delete()
class EventsConnection(base.EventsConnection, BaseTableConn):
def __init__(self, engine_facade):
super(EventsConnection, self).__init__(engine_facade)
def create(self, event):
session = self._engine_facade.get_session()
with session.begin():
session.add(event)
def update(self, event):
session = self._engine_facade.get_session()
with session.begin():
session.merge(event)
def query(self,
event_id=None,
collector_timestamp=None,
payload=None,
gt_collector_timestamp=None,
lt_collector_timestamp=None):
query = self.query_filter(
models.Event,
event_id=event_id,
collector_timestamp=collector_timestamp,
payload=payload)
query = self._update_query_gt_lt(gt_collector_timestamp,
lt_collector_timestamp,
query)
return query.all()
@staticmethod
def _update_query_gt_lt(gt_collector_timestamp,
lt_collector_timestamp,
query):
if gt_collector_timestamp is not None:
query = query.filter(models.Event.collector_timestamp >=
gt_collector_timestamp)
if lt_collector_timestamp is not None:
query = query.filter(models.Event.collector_timestamp <=
lt_collector_timestamp)
return query
def delete(self,
event_id=None,
collector_timestamp=None,
payload=None,
gt_collector_timestamp=None,
lt_collector_timestamp=None):
query = self.query_filter(
models.Event,
event_id=event_id,
collector_timestamp=collector_timestamp,
payload=payload)
query = self._update_query_gt_lt(gt_collector_timestamp,
lt_collector_timestamp,
query)
query.delete()

View File

@ -13,12 +13,12 @@
# under the License.
from oslo_db.sqlalchemy import models
from sqlalchemy import Column, String, SmallInteger, BigInteger, Index
from sqlalchemy import Column, DateTime, INTEGER, String, \
SmallInteger, BigInteger, Index, Text
from sqlalchemy.ext.declarative import declarative_base
class VitrageBase(models.TimestampMixin, models.ModelBase):
class VitrageBase(models.ModelBase):
"""Base class for Vitrage Models."""
__table_args__ = {'mysql_charset': "utf8",
'mysql_engine': "InnoDB"}
@ -39,7 +39,29 @@ class VitrageBase(models.TimestampMixin, models.ModelBase):
Base = declarative_base(cls=VitrageBase)
class ActiveAction(Base):
class Event(Base):
__tablename__ = 'events'
event_id = Column("id", INTEGER, primary_key=True, nullable=False,
autoincrement=True)
collector_timestamp = Column(DateTime, index=True, nullable=False)
payload = Column(Text, nullable=False)
def __repr__(self):
return \
"<Event(" \
"id='%s', " \
"collector_timestamp='%s', " \
"payload='%s')>" %\
(
self.event_id,
self.collector_timestamp,
self.payload
)
class ActiveAction(Base, models.TimestampMixin):
__tablename__ = 'active_actions'
__table_args__ = (
# Index 'ix_active_action' on fields:

View File

@ -0,0 +1,14 @@
# Copyright 2017 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
__author__ = 'stack'

View File

@ -0,0 +1,133 @@
# Copyright 2017 Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import six
from oslo_log import log as logging
from oslo_serialization import jsonutils
from vitrage.common.constants import DatasourceProperties as DSProps
from vitrage.datasources import NEUTRON_PORT_DATASOURCE
from vitrage.datasources import NOVA_INSTANCE_DATASOURCE
from vitrage import storage
from vitrage_tempest_tests.tests.base import BaseVitrageTempest
from vitrage_tempest_tests.tests.common import nova_utils
LOG = logging.getLogger(__name__)
INSTANCE_NAME = 'test-persistor-vm'
INSTANCE_CREATE_EVENT = {
DSProps.ENTITY_TYPE: NOVA_INSTANCE_DATASOURCE,
DSProps.EVENT_TYPE: 'compute.instance.create.end',
'hostname': INSTANCE_NAME + '-0'
}
PORT_CREATE_EVENT = {
DSProps.ENTITY_TYPE: NEUTRON_PORT_DATASOURCE,
DSProps.EVENT_TYPE: 'port.create.end',
}
PORT_UPDATE_EVENT = {
DSProps.ENTITY_TYPE: NEUTRON_PORT_DATASOURCE,
DSProps.EVENT_TYPE: 'port.update.end',
}
def get_first_match(events, event):
for curr_event in events:
if six.viewitems(event) <= six.viewitems(curr_event.payload):
return curr_event
class TestEvents(BaseVitrageTempest):
"""Test class for Vitrage persisror service"""
# noinspection PyPep8Naming
@classmethod
def setUpClass(cls):
super(TestEvents, cls).setUpClass()
cls.db_connection = storage.get_connection_from_config(cls.conf)
def test_create_instance(self):
"""This function validates creating instance events.
Create instance generates three ordered events.
1. neutron port is created.
2. the port is updated to the created instance.
3. nova instance is created with the given hostname.
"""
try:
# Action
time_before_action = datetime.datetime.utcnow()
nova_utils.create_instances(num_instances=1,
name=INSTANCE_NAME)
writen_events = self._load_db_events(time_before_action)
port_create_event = get_first_match(writen_events,
PORT_CREATE_EVENT)
self.assertIsNotNone(port_create_event,
"port.create.end event is not writen to db")
port_update_event = get_first_match(writen_events,
PORT_UPDATE_EVENT)
self.assertIsNotNone(port_update_event,
"port.update.end event is not writen to db")
instance_create_event = get_first_match(writen_events,
INSTANCE_CREATE_EVENT)
self.assertIsNotNone(instance_create_event,
"compute.instance.create.end event is not "
"writen to db")
# Check correct timestamp order
events_timestamp_list = \
[port_create_event.collector_timestamp,
port_update_event.collector_timestamp,
instance_create_event.collector_timestamp]
self.assertEqual(sorted(events_timestamp_list),
events_timestamp_list,
"Events Timestamp order is wrong")
# Check correct event_id order
events_id_list = \
[port_create_event.event_id,
port_update_event.event_id,
instance_create_event.event_id]
self.assertEqual(sorted(events_id_list),
events_id_list,
"Events id order is wrong")
except Exception as e:
self._handle_exception(e)
raise
finally:
nova_utils.delete_all_instances()
def _load_db_events(self, time_before_action):
writen_events = self.db_connection.events.query(
gt_collector_timestamp=time_before_action)
for event in writen_events:
event.payload = jsonutils.loads(event.payload)
return writen_events