Get rid of ElasticSearch in export to CSV

ElasticSearch removed from requirements.txt.
Generation of installation structures added.
Network configuration added to skeleton.
Tox configuration fixed.

Change-Id: I533519d14ce0f49c70a6e2c454ae488e593d76ae
Implements: blueprint openstack-workload-statistics
This commit is contained in:
Alexander Kislitsky 2015-02-20 12:16:29 +03:00
parent 92ed03cac4
commit e129befac8
10 changed files with 295 additions and 270 deletions

View File

@ -19,21 +19,22 @@ from fuel_analytics.api.app import app
from fuel_analytics.api.app import db
from fuel_analytics.api.db.model import InstallationStructure as IS
from fuel_analytics.api.db.model import OpenStackWorkloadStats as OSWS
from fuel_analytics.api.resources.utils.es_client import ElasticSearchClient
from fuel_analytics.api.resources.utils.oswl_stats_to_csv import OswlStatsToCsv
from fuel_analytics.api.resources.utils.stats_to_csv import StatsToCsv
bp = Blueprint('clusters_to_csv', __name__)
def get_inst_structures(yield_per=1000):
    """Queries installation structures from the DB ordered by id.

    :param yield_per: number of rows fetched from the DB per round trip
    :return: query object yielding InstallationStructure records
    """
    return db.session.query(IS).order_by(IS.id).yield_per(yield_per)
@bp.route('/clusters', methods=['GET'])
def clusters_to_csv():
app.logger.debug("Handling clusters_to_csv get request")
es_client = ElasticSearchClient()
structures = es_client.get_structures()
inst_structures = get_inst_structures()
exporter = StatsToCsv()
result = exporter.export_clusters(structures)
result = exporter.export_clusters(inst_structures)
# NOTE: result - is generator, but streaming can not work with some
# WSGI middlewares: http://flask.pocoo.org/docs/0.10/patterns/streaming/

View File

@ -1,70 +0,0 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from elasticsearch import Elasticsearch
from fuel_analytics.api.app import app
class ElasticSearchClient(object):
    """Thin client for fetching documents from Elasticsearch.

    Connection parameters (host, port, SSL) are taken from the Flask
    application config.
    """

    def __init__(self):
        self.es = Elasticsearch(hosts=[
            {'host': app.config['ELASTIC_HOST'],
             'port': app.config['ELASTIC_PORT'],
             'use_ssl': app.config['ELASTIC_USE_SSL']}
        ])

    def fetch_all_data(self, query, doc_type, show_fields=(),
                       sort=({"_id": {"order": "asc"}},), chunk_size=100):
        """Gets structures from the Elasticsearch by querying by chunk_size
        number of structures

        :param query: Elasticsearch query
        :param doc_type: requested document type
        :param show_fields: tuple of selected fields.
        All fields will be fetched, if show_fields is not set
        :param sort: tuple of fields for sorting
        :param chunk_size: size of fetched structures chunk
        :return: generator of fetched structures (documents' _source)
        """
        received = 0
        # Copy so the caller's query dict is not mutated by the
        # pagination keys added below.
        paged_query = query.copy()
        paged_query["from"] = received
        paged_query["size"] = chunk_size
        if sort:
            paged_query["sort"] = sort
        if show_fields:
            paged_query["_source"] = show_fields
        while True:
            app.logger.debug("Fetching chunk from ElasticSearch. "
                             "From: %d, size: %d",
                             paged_query["from"], chunk_size)
            response = self.es.search(index=app.config['ELASTIC_INDEX_FUEL'],
                                      doc_type=doc_type, body=paged_query)
            total = response["hits"]["total"]
            # NOTE(review): offset advances by chunk_size, not by the
            # number of hits actually returned -- assumes ES always
            # fills the requested page; confirm before reuse.
            received += chunk_size
            paged_query["from"] = received
            for d in response["hits"]["hits"]:
                yield d["_source"]
            app.logger.debug("Chunk from ElasticSearch is fetched. "
                             "From: %d, size: %d",
                             paged_query["from"], chunk_size)
            if total <= received:
                break

    def get_structures(self):
        """Fetches all installation structure documents.

        :return: generator of structure dicts
        """
        app.logger.debug("Fetching structures info from ElasticSearch")
        query = {"query": {"match_all": {}}}
        doc_type = app.config['ELASTIC_DOC_TYPE_STRUCTURE']
        return self.fetch_all_data(query, doc_type)

View File

@ -14,98 +14,104 @@
from fuel_analytics.api.common import consts
INSTALLATION_INFO_SKELETON = {
'allocated_nodes_num': None,
'clusters': [
{
'attributes': {
'assign_public_to_all_nodes': None,
'ceilometer': None,
'debug_mode': None,
'ephemeral_ceph': None,
'heat': None,
'images_ceph': None,
'images_vcenter': None,
'iser': None,
'kernel_params': None,
'libvirt_type': None,
'mellanox': None,
'mellanox_vf_num': None,
'murano': None,
'nsx': None,
'nsx_replication': None,
'nsx_transport': None,
'objects_ceph': None,
'osd_pool_size': None,
'provision_method': None,
'sahara': None,
'syslog_transport': None,
'use_cow_images': None,
'vcenter': None,
'vlan_splinters': None,
'vlan_splinters_ovs': None,
'volumes_ceph': None,
'volumes_lvm': None,
'volumes_vmdk': None
},
'fuel_version': None,
'id': None,
'is_customized': None,
'mode': None,
'net_provider': None,
'node_groups': [{'id': None, 'nodes': [{}]}],
'nodes': [
{
'bond_interfaces': [
{'id': None, 'slaves': [{}]}
],
'error_type': None,
'group_id': None,
'id': None,
'manufacturer': None,
'nic_interfaces': [{'id': None}],
'online': None,
'os': None,
'pending_addition': None,
'pending_deletion': None,
'pending_roles': [{}],
'platform_name': None,
'roles': [{}],
'status': None
}
],
'nodes_num': None,
'openstack_info': {
'images': [{'size': None, 'unit': None}],
'nova_servers_count': None
},
'release': {'name': None, 'os': None, 'version': None},
'status': None
'structure': {
'allocated_nodes_num': None,
'clusters': [
{
'attributes': {
'assign_public_to_all_nodes': None,
'ceilometer': None,
'debug_mode': None,
'ephemeral_ceph': None,
'heat': None,
'images_ceph': None,
'images_vcenter': None,
'iser': None,
'kernel_params': None,
'libvirt_type': None,
'mellanox': None,
'mellanox_vf_num': None,
'murano': None,
'nsx': None,
'nsx_replication': None,
'nsx_transport': None,
'objects_ceph': None,
'osd_pool_size': None,
'provision_method': None,
'sahara': None,
'syslog_transport': None,
'use_cow_images': None,
'vcenter': None,
'vlan_splinters': None,
'vlan_splinters_ovs': None,
'volumes_ceph': None,
'volumes_lvm': None,
'volumes_vmdk': None
},
'fuel_version': None,
'id': None,
'is_customized': None,
'mode': None,
'net_provider': None,
'node_groups': [{'id': None, 'nodes': [{}]}],
'nodes': [
{
'bond_interfaces': [
{'id': None, 'slaves': [{}]}
],
'error_type': None,
'group_id': None,
'id': None,
'manufacturer': None,
'nic_interfaces': [{'id': None}],
'online': None,
'os': None,
'pending_addition': None,
'pending_deletion': None,
'pending_roles': [{}],
'platform_name': None,
'roles': [{}],
'status': None
}
],
'nodes_num': None,
'network_configuration': {
'segmentation_type': None,
'net_l23_provider': None,
'net_manager': None,
'fixed_networks_vlan_start': None,
'fixed_network_size': None,
'fixed_networks_amount': None
},
'release': {'name': None, 'os': None, 'version': None},
'status': None
}
],
'clusters_num': None,
'fuel_release': {
'api': None,
'astute_sha': None,
'build_id': None,
'build_number': None,
'feature_groups': [{}],
'fuellib_sha': None,
'fuelmain_sha': None,
'nailgun_sha': None,
'ostf_sha': None,
'production': None,
'release': None
},
'unallocated_nodes_num': None,
'user_information': {
'company': None,
'contact_info_provided': None,
'email': None,
'name': None
}
],
'clusters_num': None,
'creation_date': None,
'fuel_release': {
'api': None,
'astute_sha': None,
'build_id': None,
'build_number': None,
'feature_groups': [{}],
'fuellib_sha': None,
'fuelmain_sha': None,
'nailgun_sha': None,
'ostf_sha': None,
'production': None,
'release': None
},
'master_node_uid': None,
'modification_date': None,
'unallocated_nodes_num': None,
'user_information': {
'company': None,
'contact_info_provided': None,
'email': None,
'name': None
}
'creation_date': None
}
OSWL_SKELETONS = {

View File

@ -29,15 +29,12 @@ class StatsToCsv(object):
app.logger.debug("Getting cluster keys paths")
structure_skeleton = INSTALLATION_INFO_SKELETON
structure_key_paths = export_utils.get_keys_paths(structure_skeleton)
clusters = structure_skeleton.get('clusters')
if not clusters:
clusters = [{}]
clusters = structure_skeleton['structure']['clusters']
cluster_skeleton = clusters[0]
# Removing lists of dicts from cluster skeleton
cluster_skeleton.pop('nodes', None)
cluster_skeleton.pop('node_groups', None)
cluster_skeleton.pop('openstack_info', None)
cluster_key_paths = export_utils.get_keys_paths(cluster_skeleton)
result_key_paths = cluster_key_paths + structure_key_paths
@ -66,8 +63,8 @@ class StatsToCsv(object):
return structure_key_paths, cluster_key_paths, result_key_paths
def get_flatten_clusters(self, structure_keys_paths, cluster_keys_paths,
structures):
"""Gets flatten clusters data
inst_structures):
"""Gets flatten clusters data form installation structures collection
:param structure_keys_paths: list of keys paths in the
installation structure
:param cluster_keys_paths: list of keys paths in the cluster
@ -91,10 +88,12 @@ class StatsToCsv(object):
def extract_nodes_platform_name(nodes):
return extract_nodes_fields('platform_name', nodes)
for structure in structures:
for inst_structure in inst_structures:
structure = inst_structure.structure
clusters = structure.pop('clusters', [])
flatten_structure = export_utils.get_flatten_data(
structure_keys_paths, structure)
structure_keys_paths, inst_structure)
for cluster in clusters:
flatten_cluster = export_utils.get_flatten_data(
cluster_keys_paths, cluster)
@ -114,14 +113,13 @@ class StatsToCsv(object):
app.logger.debug("Flatten clusters info is got")
def export_clusters(self, structures):
def export_clusters(self, inst_structures):
app.logger.info("Export clusters info into CSV started")
structure_keys_paths, cluster_keys_paths, csv_keys_paths = \
self.get_cluster_keys_paths()
flatten_clusters = self.get_flatten_clusters(structure_keys_paths,
cluster_keys_paths,
structures)
result = export_utils.flatten_data_as_csv(csv_keys_paths,
flatten_clusters)
flatten_clusters = self.get_flatten_clusters(
structure_keys_paths, cluster_keys_paths, inst_structures)
result = export_utils.flatten_data_as_csv(
csv_keys_paths, flatten_clusters)
app.logger.info("Export clusters info into CSV finished")
return result

View File

@ -0,0 +1,163 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from datetime import datetime
from datetime import timedelta
import random
import uuid
from fuel_analytics.test.base import BaseTest
from fuel_analytics.api.app import db
from fuel_analytics.api.db.model import InstallationStructure
class InstStructureTest(BaseTest):
    """Generates random installation structures and persists them into
    the DB for the CSV export tests.
    """

    def gen_id(self, id_range=(0, 1000000)):
        """Returns a random integer id from the inclusive range."""
        return random.randint(*id_range)

    def generate_node(
            self,
            roles_range=(0, 5),
            node_roles=('compute', 'controller', 'cinder', 'ceph-osd',
                        'zabbix', 'mongo'),
            oses=('Ubuntu', 'CentOs', 'Ubuntu LTS XX'),
            node_statuses=('ready', 'discover', 'provisioning',
                           'provisioned', 'deploying', 'error'),
            manufacturers=('Dell Inc.', 'VirtualBox', 'QEMU',
                           'VirtualBox', 'Supermicro', 'Cisco Systems Inc',
                           'KVM', 'VMWARE', 'HP')
    ):
        """Generates one node description with random roles, os, status
        and manufacturer.

        :return: dict with node info
        """
        roles = []
        for _ in xrange(random.randint(*roles_range)):
            roles.append(random.choice(node_roles))
        node = {
            'id': self.gen_id(),
            'roles': roles,
            'os': random.choice(oses),
            'status': random.choice(node_statuses),
            'manufacturer': random.choice(manufacturers)
        }
        return node

    def generate_cluster(
            self,
            nodes_range=(0, 100),
            oses=('Ubuntu', 'CentOs', 'Ubuntu LTS XX'),
            release_names=('Juno on CentOS 6.5', 'Juno on Ubuntu 12.04.4'),
            release_versions=('6.0 TechPreview', '6.0 GA', '6.1'),
            cluster_statuses=('new', 'deployment', 'stopped', 'operational',
                              'error', 'remove', 'update', 'update_error'),
            libvirt_names=('qemu', 'kvm', 'vCenter'),
            segmentation_types=('vlan', 'gre')
    ):
        """Generates one cluster description with random nodes,
        attributes and network configuration.

        :return: dict with cluster info
        """
        nodes_num = random.randint(*nodes_range)
        cluster = {
            'id': self.gen_id(),
            'nodes_num': nodes_num,
            'release': {
                'os': random.choice(oses),
                'name': random.choice(release_names),
                'version': random.choice(release_versions),
            },
            'status': random.choice(cluster_statuses),
            'nodes': [],
            'attributes': {
                'libvirt_type': random.choice(libvirt_names),
                'heat': random.choice((True, False)),
            },
            # Default network configuration; it survives only when
            # generate_network_configuration() returns an empty dict.
            'network_configuration': {
                'segmentation_type': random.choice(segmentation_types)
            }
        }
        network_configuration = self.generate_network_configuration()
        cluster.update(network_configuration)
        for _ in xrange(nodes_num):
            cluster['nodes'].append(self.generate_node())
        return cluster

    def generate_network_configuration(self):
        """Randomly picks one of the possible network configuration
        shapes (neutron-like, nova-network-like, empty, or absent).
        """
        return random.choice((
            {'network_configuration': {
                'segmentation_type': random.choice(("gre", "vlan")),
                # NOTE(review): "ovw" looks like a typo for "ovs" --
                # kept as-is to preserve generated data; confirm.
                'net_l23_provider': random.choice(("ovw", "nsx")),
            }},
            {'network_configuration': {
                'net_manager': random.choice(('FlatDHCPManager',
                                              'VlanManager')),
                'fixed_networks_vlan_start': random.choice((2, 3, None)),
                'fixed_network_size': random.randint(0, 255),
                'fixed_networks_amount': random.randint(0, 10),
            }},
            {'network_configuration': {}},
            {}
        ))

    def generate_structure(self, clusters_num_range=(0, 10),
                           unallocated_nodes_num_range=(0, 20)):
        """Generates the whole installation structure dict.

        :return: dict with installation info
        """
        clusters_num = random.randint(*clusters_num_range)
        fuel_release = {
            'release': random.choice(("6.0-techpreview", "6.0-ga")),
            'api': 1,
            'nailgun_sha': "Unknown build",
            'astute_sha': "Unknown build",
            'fuellib_sha': "Unknown build",
            'ostf_sha': "Unknown build",
            'feature_groups': ['experimental', 'mirantis']
        }
        structure = {
            'fuel_release': fuel_release,
            'clusters_num': clusters_num,
            'clusters': [],
            # Fixed: the key was mistakenly named
            # 'unallocated_nodes_num_range', so the exported
            # 'unallocated_nodes_num' field was never populated.
            'unallocated_nodes_num': random.randint(
                *unallocated_nodes_num_range),
            'allocated_nodes_num': 0
        }
        for _ in xrange(clusters_num):
            cluster = self.generate_cluster()
            structure['clusters'].append(cluster)
            structure['allocated_nodes_num'] += cluster['nodes_num']
        return structure

    def generate_inst_structures(self, installations_num=100,
                                 creation_date_range=(1, 10),
                                 modification_date_range=(1, 10)):
        """Yields InstallationStructure DB objects with random
        structures and random creation/modification dates in the past.
        """
        for _ in xrange(installations_num):
            mn_uid = '{}'.format(uuid.uuid4())
            structure = self.generate_structure()
            creation_date = datetime.utcnow() - timedelta(
                days=random.randint(*creation_date_range))
            modification_date = datetime.utcnow() - timedelta(
                days=random.randint(*modification_date_range))
            obj = InstallationStructure(
                master_node_uid=mn_uid,
                structure=structure,
                creation_date=creation_date,
                modification_date=modification_date
            )
            yield obj

    def get_saved_inst_structures(self, *args, **kwargs):
        """Generates installation structures and commits them to the DB.

        :return: list of saved InstallationStructure objects
        """
        inst_structs = self.generate_inst_structures(*args, **kwargs)
        result = []
        for inst_struct in inst_structs:
            db.session.add(inst_struct)
            result.append(inst_struct)
        db.session.commit()
        return result

View File

@ -1,42 +0,0 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from fuel_analytics.test.base import ElasticTest
from fuel_analytics.api.app import app
from fuel_analytics.api.resources.utils.es_client import ElasticSearchClient
class EsClientTest(ElasticTest):
    """Checks that ElasticSearchClient fetches every generated
    installation document.
    """

    def test_fetch_all_data(self):
        total = 160
        self.generate_data(installations_num=total)
        client = ElasticSearchClient()
        match_all = {"query": {"match_all": {}}}
        structure_type = app.config['ELASTIC_DOC_TYPE_STRUCTURE']
        # Chunk smaller than the data set forces several fetches.
        rows = client.fetch_all_data(match_all, structure_type,
                                     show_fields=('master_node_uid',),
                                     chunk_size=total / 10 + 1)
        uids = set(row['master_node_uid'] for row in rows)
        self.assertEquals(total, len(uids))

    def test_get_structures(self):
        total = 100
        self.generate_data(installations_num=total)
        client = ElasticSearchClient()
        rows = client.get_structures()
        uids = set(row['master_node_uid'] for row in rows)
        self.assertEquals(total, len(uids))

View File

@ -18,14 +18,15 @@ import csv
import six
import types
from fuel_analytics.test.base import ElasticTest
from fuel_analytics.test.api.resources.utils.inst_structure_test import \
InstStructureTest
from fuel_analytics.test.base import DbTest
from fuel_analytics.api.resources.utils.es_client import ElasticSearchClient
from fuel_analytics.api.resources.utils import export_utils
from fuel_analytics.api.resources.utils.stats_to_csv import StatsToCsv
class StatsToCsvExportTest(ElasticTest):
class StatsToCsvExportTest(InstStructureTest, DbTest):
def test_get_cluster_keys_paths(self):
exporter = StatsToCsv()
@ -40,48 +41,29 @@ class StatsToCsvExportTest(ElasticTest):
self.assertTrue(['manufacturer_2' in csv_keys_paths])
self.assertTrue(['attributes', 'heat'] in csv_keys_paths)
def test_new_param_handled_by_structures_skeleton(self):
installations_num = 5
self.generate_data(installations_num=installations_num)
# Mixing new param into structures
es_client = ElasticSearchClient()
structures = es_client.get_structures()
self.assertTrue(isinstance(structures, types.GeneratorType))
structures = list(structures)
structures[-1]['mixed_param'] = 'xx'
skeleton = export_utils.get_data_skeleton(structures)
self.assertTrue('mixed_param' in skeleton)
def test_get_flatten_clusters(self):
installations_num = 200
self.generate_data(installations_num=installations_num)
es_client = ElasticSearchClient()
structures = es_client.get_structures()
inst_structures = self.get_saved_inst_structures(
installations_num=installations_num)
exporter = StatsToCsv()
structure_paths, cluster_paths, csv_paths = \
exporter.get_cluster_keys_paths()
flatten_clusters = exporter.get_flatten_clusters(structure_paths,
cluster_paths,
structures)
flatten_clusters = exporter.get_flatten_clusters(
structure_paths, cluster_paths, inst_structures)
self.assertTrue(isinstance(flatten_clusters, types.GeneratorType))
for flatten_cluster in flatten_clusters:
self.assertEquals(len(csv_paths), len(flatten_cluster))
def test_flatten_data_as_csv(self):
installations_num = 100
self.generate_data(installations_num=installations_num)
es_client = ElasticSearchClient()
structures = es_client.get_structures()
inst_structures = self.get_saved_inst_structures(
installations_num=installations_num)
exporter = StatsToCsv()
structure_paths, cluster_paths, csv_paths = \
exporter.get_cluster_keys_paths()
flatten_clusters = exporter.get_flatten_clusters(structure_paths,
cluster_paths,
structures)
flatten_clusters = exporter.get_flatten_clusters(
structure_paths, cluster_paths, inst_structures)
self.assertTrue(isinstance(flatten_clusters, types.GeneratorType))
result = export_utils.flatten_data_as_csv(csv_paths, flatten_clusters)
self.assertTrue(isinstance(result, types.GeneratorType))
@ -101,26 +83,22 @@ class StatsToCsvExportTest(ElasticTest):
def test_unicode_as_csv(self):
installations_num = 10
self.generate_data(installations_num=installations_num)
es_client = ElasticSearchClient()
structures = es_client.get_structures()
inst_structures = self.get_saved_inst_structures(
installations_num=installations_num)
exporter = StatsToCsv()
structure_paths, cluster_paths, csv_paths = \
exporter.get_cluster_keys_paths()
flatten_clusters = exporter.get_flatten_clusters(structure_paths,
cluster_paths,
structures)
flatten_clusters = exporter.get_flatten_clusters(
structure_paths, cluster_paths, inst_structures)
flatten_clusters = list(flatten_clusters)
flatten_clusters[1][0] = u'эюя'
list(export_utils.flatten_data_as_csv(csv_paths, flatten_clusters))
def test_export_clusters(self):
installations_num = 100
self.generate_data(installations_num=installations_num)
es_client = ElasticSearchClient()
structures = es_client.get_structures()
inst_structures = self.get_saved_inst_structures(
installations_num=installations_num)
exporter = StatsToCsv()
result = exporter.export_clusters(structures)
result = exporter.export_clusters(inst_structures)
self.assertTrue(isinstance(result, types.GeneratorType))

View File

@ -24,8 +24,6 @@ from fuel_analytics.api.log import init_logger
app.config.from_object('fuel_analytics.api.config.Testing')
init_logger()
from migration.test.base import ElasticTest as MigrationElasticTest
class BaseTest(TestCase):
@ -40,10 +38,6 @@ class BaseTest(TestCase):
self.assertEquals(code, resp.status_code)
class ElasticTest(MigrationElasticTest):
pass
class DbTest(BaseTest):
def setUp(self):

View File

@ -1,4 +1,3 @@
elasticsearch==1.2.0
psycopg2==2.5.4
uWSGI==2.0.9
Flask==0.10.1

View File

@ -6,9 +6,7 @@ envlist = py27,pep8
[testenv]
usedevelop = True
install_command = pip install {packages}
setenv =
VIRTUAL_ENV={envdir}
PYTHONPATH={toxinidir}/../migration
setenv = VIRTUAL_ENV={envdir}
deps = -r{toxinidir}/test-requirements.txt
commands =
nosetests {posargs:fuel_analytics/test}