Kubernetes datasource

currently we support only k8s cluster that runs on top of Nova

Implements: k8s-datasource
Change-Id: I47057bac9cbac5d856fed74adb24583d2b090d06
This commit is contained in:
Idan Kinory 2018-05-31 10:35:17 +00:00
parent 3c576e4e96
commit 77edb8779b
11 changed files with 501 additions and 2 deletions

View File

@ -31,7 +31,8 @@ OPTS = [
NOVA_ZONE_DATASOURCE,
CINDER_VOLUME_DATASOURCE,
NEUTRON_PORT_DATASOURCE,
NEUTRON_NETWORK_DATASOURCE],
NEUTRON_NETWORK_DATASOURCE,
],
help='Names of supported data sources'),
cfg.ListOpt('path',
default=['vitrage.datasources'],

View File

@ -0,0 +1,44 @@
# Copyright 2018 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from vitrage.common.constants import DatasourceOpts as DSOpts
from vitrage.common.constants import UpdateMethod
OPTS = [
cfg.StrOpt(DSOpts.TRANSFORMER,
default='vitrage.datasources.kubernetes.transformer.'
'KubernetesTransformer',
help='Kubernetes transformer class path',
required=True),
cfg.StrOpt(DSOpts.DRIVER,
default='vitrage.datasources.kubernetes.driver.'
'KubernetesDriver',
help='Kubernetes driver class path',
required=True),
cfg.StrOpt(DSOpts.UPDATE_METHOD,
default=UpdateMethod.PULL,
help='None: updates only via Vitrage periodic snapshots.'
'Pull: updates every [changes_interval] seconds.'
'Push: updates by getting notifications from the'
' datasource itself.',
required=True),
cfg.IntOpt(DSOpts.CHANGES_INTERVAL,
default=20,
min=10,
help='interval between checking changes in kubernetes cluster'),
cfg.StrOpt(DSOpts.CONFIG_FILE,
help='kubernetes cluster configuration file'),
]

View File

@ -0,0 +1,75 @@
# Copyright 2018 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from kubernetes import client
from kubernetes import config
from oslo_log import log
from vitrage.datasources.driver_base import DriverBase
from vitrage.datasources.kubernetes.properties import KUBERNETES_DATASOURCE
from vitrage.datasources.kubernetes.properties import KubernetesProperties\
as kubProp
LOG = log.getLogger(__name__)
class KubernetesDriver(DriverBase):
def __init__(self, conf):
super(KubernetesDriver, self).__init__()
self._client = None
self.conf = conf
@property
def client(self):
if not self._client:
self._client = self._k8s_client(self.conf)
return self._client
@staticmethod
def _k8s_client(conf):
try:
if not conf.kubernetes.config_file:
LOG.warning('kubernetes config file is not defined')
return
kubeconf = conf.kubernetes.config_file
config.load_kube_config(config_file=kubeconf)
k8s_client = client.CoreV1Api()
if k8s_client is None:
LOG.warning('k8s client returns None')
return
return k8s_client
except Exception as e:
LOG.exception('Create k8s client - Got Exception: %s', e)
def get_all(self, datasource_action):
return self.make_pickleable(self._prepare_entities(
self.client.list_node()),
KUBERNETES_DATASOURCE,
datasource_action)
def _prepare_entities(self, nodes):
entities = []
for item in nodes.items:
metadata = item.metadata
node_details = {
kubProp.NAME: metadata.name,
kubProp.UID: metadata.uid,
kubProp.CREATION_TIMESTAMP: metadata.creation_timestamp,
kubProp.EXTERNALID: item.spec.external_id,
kubProp.PROVIDER_NAME: item.spec.provider_id.split(":///")[0],
}
entities.append(node_details)
return [{'resources': entities}]

View File

@ -0,0 +1,41 @@
# Copyright 2018 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
KUBERNETES_DATASOURCE = 'kubernetes'
class KubernetesProperties(object):
NAME = 'name'
METADATA = 'metadata'
NETWORK = 'network'
ADDRESS = 'address'
STATUS = 'status'
ADDRESSES = 'addresses'
TYPE = 'type'
INTERNALIP = 'internal_ip'
EXTERNALIP = 'external_ip'
UID = 'uid'
EXTERNALID = 'external_id'
PROVIDERID = 'provider_id'
PROVIDER_NAME = 'provider_name'
SPEC = 'spec'
CREATION_TIMESTAMP = 'creation_timestamp'
RESOURCES = 'resources'
class KubeClusterProperies(object):
KUBERNETES_CLUSTER = 'kubernetes.cluster'
CLUSTER_ID = 'Kubernetes Cluster'
AVAILABLE = 'available'

View File

@ -0,0 +1,111 @@
# Copyright 2018 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from vitrage.common.constants import DatasourceProperties as DSProps
from vitrage.common.constants import EdgeLabel
from vitrage.common.constants import EntityCategory
from vitrage.common.constants import VertexProperties as VProps
from vitrage.datasources.resource_transformer_base import \
ResourceTransformerBase
from vitrage.datasources.transformer_base import extract_field_value
import vitrage.graph.utils as graph_utils
from vitrage.datasources import NOVA_INSTANCE_DATASOURCE
from vitrage.datasources import transformer_base as tbase
from vitrage.datasources.kubernetes.properties import KUBERNETES_DATASOURCE
from vitrage.datasources.kubernetes.properties import \
KubernetesProperties as kubProp
from vitrage.utils import file as file_utils
LOG = logging.getLogger(__name__)
class KubernetesTransformer(ResourceTransformerBase):
def __init__(self, transformers, conf):
super(KubernetesTransformer, self).__init__(transformers, conf)
self.conf = conf
def _create_vertex(self, entity_event):
metadata = {
VProps.NAME: self._get_cluster_name(),
}
entity_key = self._create_entity_key(entity_event)
vitrage_sample_timestamp = entity_event[DSProps.SAMPLE_DATE]
update_timestamp = self._format_update_timestamp(
extract_field_value(entity_event, DSProps.SAMPLE_DATE),
vitrage_sample_timestamp)
return graph_utils.create_vertex(
entity_key,
vitrage_category=EntityCategory.RESOURCE,
vitrage_type=KUBERNETES_DATASOURCE,
vitrage_sample_timestamp=vitrage_sample_timestamp,
update_timestamp=update_timestamp,
metadata=metadata)
def _create_snapshot_entity_vertex(self, entity_event):
return self._create_vertex(entity_event)
def _create_update_entity_vertex(self, entity_event):
return self._create_vertex(entity_event)
def _create_snapshot_neighbors(self, entity_event):
return self._create_node_neighbors(entity_event)
def _create_update_neighbors(self, entity_event):
return self._create_node_neighbors(entity_event)
def _create_entity_key(self, event):
key_fields = self._key_values(KUBERNETES_DATASOURCE,
self._get_cluster_name())
key = tbase.build_key(key_fields)
return key
def get_vitrage_type(self):
return KUBERNETES_DATASOURCE
def _get_cluster_name(self):
kubeconf = file_utils.load_yaml_file(self.conf.kubernetes.config_file)
contexts = kubeconf['contexts']
for context in contexts:
if context['name'] == kubeconf['current-context']:
cluster_name = context['context']['cluster']
return cluster_name
def _create_node_neighbors(self, entity_event):
"""neighbors are existing Nova instances only"""
neighbors = []
for neighbor in entity_event[kubProp.RESOURCES]:
neighbor[DSProps.ENTITY_TYPE] = entity_event[DSProps.ENTITY_TYPE]
neighbor[DSProps.DATASOURCE_ACTION] = \
entity_event[DSProps.DATASOURCE_ACTION]
neighbor[DSProps.SAMPLE_DATE] = entity_event[DSProps.SAMPLE_DATE]
neighbor_id = neighbor[kubProp.EXTERNALID]
neighbor_datasource_type = NOVA_INSTANCE_DATASOURCE
neighbors.append(self._create_neighbor(neighbor,
neighbor_id,
neighbor_datasource_type,
EdgeLabel.COMPRISED,
is_entity_source=True))
return neighbors

View File

@ -78,7 +78,6 @@ class Processor(processor.ProcessorBase):
"""
LOG.debug('Add entity to entity graph:\n%s', new_vertex)
self._add_resource_details_to_alarm(new_vertex, neighbors)
self.entity_graph.add_vertex(new_vertex)
self._connect_neighbors(neighbors, set(), GraphAction.CREATE_ENTITY)

View File

@ -569,3 +569,19 @@ def simple_aodh_alarm_notification_generators(alarm_num,
]
return tg.get_trace_generators(test_entity_spec_list)
def simple_k8s_nodes_generators(nodes_num, snapshot_events=0):
mapping = ['vm-{0}'.format(index) for index in range(nodes_num)]
test_entity_spec_list = []
if snapshot_events:
test_entity_spec_list.append(
{tg.DYNAMIC_INFO_FKEY: tg.DRIVER_KUBE_SNAPSHOT_D,
tg.STATIC_INFO_FKEY: tg.DRIVER_INST_SNAPSHOT_S,
tg.MAPPING_KEY: mapping,
tg.NAME_KEY: 'Nodes snapshot generator',
tg.NUM_EVENTS: snapshot_events
}
)
return tg.get_trace_generators(test_entity_spec_list)

View File

@ -66,6 +66,7 @@ DRIVER_STACK_UPDATE_D = 'driver_stack_update_dynamic.json'
DRIVER_STACK_SNAPSHOT_D = 'driver_stack_snapshot_dynamic.json'
DRIVER_CONSISTENCY_UPDATE_D = 'driver_consistency_update_dynamic.json'
DRIVER_ZONE_SNAPSHOT_D = 'driver_zone_snapshot_dynamic.json'
DRIVER_KUBE_SNAPSHOT_D = 'driver_kubernetes_snapshot_dynamic.json'
# Mock transformer Specs (i.e., what the transformer outputs)
@ -119,6 +120,7 @@ class EventTraceGenerator(object):
{DRIVER_AODH_UPDATE_D: _get_aodh_alarm_update_driver_values,
DRIVER_DOCTOR_UPDATE_D: _get_doctor_update_driver_values,
DRIVER_COLLECTD_UPDATE_D: _get_collectd_update_driver_values,
DRIVER_KUBE_SNAPSHOT_D: _get_k8s_node_snapshot_driver_values,
DRIVER_INST_SNAPSHOT_D: _get_vm_snapshot_driver_values,
DRIVER_INST_UPDATE_D: _get_vm_update_driver_values,
DRIVER_HOST_SNAPSHOT_D: _get_host_snapshot_driver_values,
@ -257,6 +259,24 @@ def _get_host_snapshot_driver_values(spec):
return static_values
def _get_k8s_node_snapshot_driver_values(spec):
"""Generates the static driver values for each k8s node.
:param spec: specification of event generation.
:type spec: dict
:return: list of static driver values for each k8s node.
:rtype: list
"""
vm_host_mapping = spec[MAPPING_KEY]
static_values = []
for node_name in vm_host_mapping:
static_values.append(combine_data(node_name, None,
spec.get(EXTERNAL_INFO_KEY)))
return static_values
def _get_doctor_update_driver_values(spec):
"""Generates the static driver values for Doctor monitor notification.

View File

@ -0,0 +1,19 @@
apiVersion: v1
clusters:
- cluster:
certificate-authority-data:
server: https://127.0.0.1:8080
name: kubernetes
contexts:
- context:
cluster: kubernetes
user: kubernetes-admin
name: kubernetes-admin@kubernetes
current-context: kubernetes-admin@kubernetes
kind: Config
preferences: {}
users:
- name: kubernetes-admin
user:
client-certificate-data:
client-key-data:

View File

@ -0,0 +1,11 @@
{"resources": [{
"uid": "68011206-d4d6-11e7-9c63-fa163e2e2123",
"provider_name": "openstack",
"creation_timestamp": "e2017-11-29T07:24:59Z",
"external_id": "41c40aab-80e9-4bb6-a280-27976bfc811f",
"network": ["172.16.1.12", "10.5.138.49"],
"name": "vm-0"}],
"vitrage_datasource_action": "snapshot",
"vitrage_entity_type": "nova.instance",
"vitrage_sample_date": "2015-12-01T12:46:41Z"
}

View File

@ -0,0 +1,162 @@
# Copyright 2018 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from oslo_log import log as logging
from testtools import matchers
from vitrage.common.constants import DatasourceAction
from vitrage.common.constants import DatasourceOpts as DSOpts
from vitrage.common.constants import DatasourceProperties as DSProps
from vitrage.common.constants import EntityCategory
from vitrage.common.constants import GraphAction
from vitrage.common.constants import UpdateMethod
from vitrage.common.constants import VertexProperties as VProps
from vitrage.datasources.kubernetes.properties import KUBERNETES_DATASOURCE
from vitrage.datasources.kubernetes.properties import KubernetesProperties \
as kubProp
from vitrage.datasources.kubernetes.transformer import KubernetesTransformer
from vitrage.datasources.nova.instance import NOVA_INSTANCE_DATASOURCE
from vitrage.datasources.nova.instance.transformer import InstanceTransformer
from vitrage.datasources import transformer_base as tbase
from vitrage.datasources.transformer_base import TransformerBase
from vitrage.tests import base
from vitrage.tests.mocks import mock_driver as mock_sync
LOG = logging.getLogger(__name__)
cluster_name = 'kubernetes'
class KubernetesTransformerTest(base.BaseTest):
OPTS = [
cfg.StrOpt(DSOpts.UPDATE_METHOD,
default=UpdateMethod.PULL),
cfg.StrOpt(DSOpts.CONFIG_FILE,
default='/opt/stack/vitrage/vitrage/tests/resources/'
'kubernetes/kubernetes_config.yaml'),
]
# noinspection PyAttributeOutsideInit,PyPep8Naming
@classmethod
def setUpClass(cls):
super(KubernetesTransformerTest, cls).setUpClass()
cls.transformers = {}
cls.conf = cfg.ConfigOpts()
cls.conf.register_opts(cls.OPTS, group=KUBERNETES_DATASOURCE)
cls.transformers[KUBERNETES_DATASOURCE] = KubernetesTransformer(
cls.transformers, cls.conf)
cls.transformers[NOVA_INSTANCE_DATASOURCE] = \
InstanceTransformer(cls.transformers, cls.conf)
def test_snapshot_event_transform(self):
LOG.debug('Test tactual transform action for '
'snapshot and snapshot init events')
k8s_spec_list = \
mock_sync.simple_k8s_nodes_generators(nodes_num=2,
snapshot_events=1)
nodes_events = mock_sync.generate_random_events_list(k8s_spec_list)
for event in nodes_events:
k8s_wrapper = self.transformers[KUBERNETES_DATASOURCE].transform(
event)
# Test assertions
self.assertEqual(cluster_name, k8s_wrapper.vertex[VProps.NAME])
n_length = str(len(k8s_wrapper.neighbors))
self.assertThat(n_length, matchers.HasLength(1),
'Cluster vertex has one neighbor')
self._validate_cluster_neighbors(k8s_wrapper.neighbors, event)
datasource_action = event[DSProps.DATASOURCE_ACTION]
if datasource_action == DatasourceAction.INIT_SNAPSHOT:
self.assertEqual(GraphAction.CREATE_ENTITY, k8s_wrapper.action)
elif datasource_action == DatasourceAction.SNAPSHOT:
self.assertEqual(GraphAction.UPDATE_ENTITY, k8s_wrapper.action)
def test_build_cluster_key(self):
LOG.debug('Test build cluster key')
# Test setup
expected_key = 'RESOURCE:kubernetes:kubernetes'
instance_transformer = self.transformers[NOVA_INSTANCE_DATASOURCE]
# Test action
key_fields = instance_transformer._key_values(
KUBERNETES_DATASOURCE,
cluster_name)
# Test assertions
observed_key = tbase.build_key(key_fields)
self.assertEqual(expected_key, observed_key)
def _validate_cluster_neighbors(self, neighbor, event):
# Create expected neigbor
time = event[DSProps.SAMPLE_DATE]
external_id = event['resources'][0][kubProp.EXTERNALID]
properties = {
VProps.ID: external_id,
VProps.VITRAGE_TYPE: NOVA_INSTANCE_DATASOURCE,
VProps.VITRAGE_CATEGORY: EntityCategory.RESOURCE,
VProps.VITRAGE_SAMPLE_TIMESTAMP: time
}
nova_instance_tran = self.transformers[NOVA_INSTANCE_DATASOURCE]
expected_neighbor = \
nova_instance_tran.create_neighbor_placeholder_vertex(**properties)
self.assertEqual(expected_neighbor, neighbor[0].vertex)
# Validate neighbor edge
edge = neighbor[0].edge
entity_key = \
self.transformers[KUBERNETES_DATASOURCE]._create_entity_key(event)
entity_uuid = \
TransformerBase.uuid_from_deprecated_vitrage_id(entity_key)
self.assertEqual(edge.source_id, entity_uuid)
self.assertEqual(edge.target_id, neighbor[0].vertex.vertex_id)
def test_create_entity_key(self):
LOG.debug('Test get key from kubernetes transformer')
# Test setup
spec_list = mock_sync.simple_k8s_nodes_generators(nodes_num=1,
snapshot_events=1)
nodes_events = mock_sync.generate_random_events_list(spec_list)
kubernetes_transformer = self.transformers[KUBERNETES_DATASOURCE]
for event in nodes_events:
# Test action
observed_key = kubernetes_transformer._create_entity_key(event)
# Test assertions
observed_key_fields = observed_key.split(
TransformerBase.KEY_SEPARATOR)
self.assertEqual(EntityCategory.RESOURCE, observed_key_fields[0])
self.assertEqual(
KUBERNETES_DATASOURCE,
observed_key_fields[1]
)
key_values = kubernetes_transformer._key_values(
KUBERNETES_DATASOURCE,
cluster_name)
expected_key = tbase.build_key(key_values)
self.assertEqual(expected_key, observed_key)