Merge "Add ElasticSearch exporter plugin"

This commit is contained in:
Zuul 2017-10-20 09:10:54 +00:00 committed by Gerrit Code Review
commit 0d06ecd4d4
10 changed files with 1276 additions and 5 deletions

View File

@@ -1008,15 +1008,15 @@ class TaskCommands(object):
@cliutils.args("--type", dest="output_type", type=str,
required=True,
help="Report type. Out-of-the-box "
-"types: JSON, HTML, HTML-Static, JUnit-XML. "
-"HINT: You can list all types, executing `rally "
-"plugin list --plugin-base TaskExporter` "
+"types: JSON, HTML, HTML-Static, Elastic, JUnit-XML. "
+"HINT: You can list all types, executing "
+"`rally plugin list --plugin-base TaskExporter` "
"command.")
@cliutils.args("--to", dest="output_dest", type=str,
metavar="<dest>", required=False,
help="Report destination. Can be a path to a file (in case"
-" of JSON, HTML, HTML-Static, JUnit-XML, etc. types)"
-" to save the report to or a connection string."
+" of JSON, HTML, HTML-Static, JUnit-XML, Elastic etc. "
+"types) to save the report to or a connection string."
" It depends on the report type."
)
@envutils.with_default_task_id
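
For context: the new Elastic type registered above is exercised later in this change by the CI gate script, via `rally task export --type elastic --to rally-plot/elasticsearch.txt`.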

View File

@@ -0,0 +1,158 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import json
import requests
import six
from rally.common import logging
from rally import exceptions
LOG = logging.getLogger(__name__)
class ElasticSearchClient(object):
"""The helper class for communication with ElasticSearch 2.* and 5.*"""
# The maximum number of lines to push to the cluster at once. Each
# document occupies two lines (the action and the document itself),
# so this equals up to 5000 documents per request.
CHUNK_LENGTH = 10000
def __init__(self, url):
self._url = url or "http://localhost:9200"
self._version = None
@staticmethod
def _check_response(resp, action=None):
if resp.status_code in (200, 201):
return
# it is an error. let's try to find the reason
reason = None
try:
data = resp.json()
except ValueError:
# it is ok
pass
else:
if "error" in data:
if isinstance(data["error"], dict):
reason = data["error"].get("reason", "")
else:
reason = data["error"]
reason = reason or resp.text or "n/a"
action = action or "connect to"
raise exceptions.RallyException(
"[HTTP %s] Failed to %s ElasticSearch cluster: %s" %
(resp.status_code, action, reason))
def version(self):
"""Get version of the ElasticSearch cluster."""
if self._version is None:
self.info()
return self._version
def info(self):
"""Retrieve info about the ElasticSearch cluster."""
resp = requests.get(self._url)
self._check_response(resp)
err_msg = "Failed to retrieve info about the ElasticSearch cluster: %s"
try:
data = resp.json()
except ValueError:
LOG.debug("Return data from %s: %s" % (self._url, resp.text))
raise exceptions.RallyException(
err_msg % "The return data doesn't look like a json.")
version = data.get("version", {}).get("number")
if not version:
LOG.debug("Return data from %s: %s" % (self._url, resp.text))
raise exceptions.RallyException(
err_msg % "Failed to parse the received data.")
self._version = version
if self._version.startswith("2"):
data["version"]["build_date"] = data["version"].pop(
"build_timestamp")
return data
def push_documents(self, documents):
"""Push documents to the ElasticSearch cluster using bulk API.
:param documents: a list of documents to push
"""
LOG.debug("Pushing %s documents by chunks (up to %s documents at once)"
" to ElasticSearch." %
# divide the numbers by two, since each document occupies two
# lines in `documents` (the action and the document itself).
(len(documents) / 2, self.CHUNK_LENGTH / 2))
for pos in six.moves.range(0, len(documents), self.CHUNK_LENGTH):
data = "\n".join(documents[pos:pos + self.CHUNK_LENGTH]) + "\n"
raw_resp = requests.post(self._url + "/_bulk", data=data)
self._check_response(raw_resp, action="push documents to")
LOG.debug("Successfully pushed %s documents." %
len(raw_resp.json()["items"]))
def list_indices(self):
"""List all indices."""
resp = requests.get(self._url + "/_cat/indices?v")
self._check_response(resp, "list the indices at")
return resp.text.rstrip().split(" ")
def create_index(self, name, doc_type, properties):
"""Create an index.
There are two very different ways to search strings: you can either
search whole values, often referred to as keyword search, or
individual tokens, usually referred to as full-text search.
ElasticSearch 2.x uses the `string` data type for both cases, whereas
in ElasticSearch 5.0 the `string` data type was replaced by two new
types: `keyword` and `text`. Since it is hard to predict the destiny
of the `string` data type and to support two formats of input data,
the properties should be transmitted in the ElasticSearch 5.x format.
"""
if self.version().startswith("2."):
properties = copy.deepcopy(properties)
for spec in properties.values():
if spec.get("type", None) == "text":
spec["type"] = "string"
elif spec.get("type", None) == "keyword":
spec["type"] = "string"
spec["index"] = "not_analyzed"
data = json.dumps({"mappings": {doc_type: {"properties": properties}}})
resp = requests.put(self._url + "/%s" % name, data=data)
self._check_response(resp, "create index at")
def check_document(self, index, doc_id, doc_type="data"):
"""Check for the existence of a document.
:param index: The index of a document
:param doc_id: The ID of a document
:param doc_type: The type of a document (Defaults to data)
"""
resp = requests.head("%(url)s/%(index)s/%(type)s/%(id)s" %
{"url": self._url,
"index": index,
"type": doc_type,
"id": doc_id})
if resp.status_code == 200:
return True
elif resp.status_code == 404:
return False
else:
self._check_response(resp, "check the index at")

View File

@@ -0,0 +1,380 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import datetime as dt
import itertools
import json
import os
from rally.common import logging
from rally.common import validation
from rally import consts
from rally import exceptions
from rally.plugins.common.exporters.elastic import client
from rally.task import exporter
LOG = logging.getLogger(__name__)
@validation.configure("es_exporter_destination")
class Validator(validation.Validator):
"""Validates the destination for ElasticSearch exporter.
If the destination is an ElasticSearch cluster, its version should
be 2.* or 5.*
"""
def validate(self, context, config, plugin_cls, plugin_cfg):
destination = plugin_cfg["destination"]
if destination and (not destination.startswith("http://")
and not destination.startswith("https://")):
# it is a path to a local file
return
es = client.ElasticSearchClient(destination)
try:
version = es.version()
except exceptions.RallyException as e:
# re-raise a proper exception to hide redundant traceback
self.fail(e.format_message())
if not (version.startswith("2.") or version.startswith("5.")):
self.fail("The unsupported version detected %s." % version)
@validation.add("es_exporter_destination")
@exporter.configure("elastic")
class ElasticSearchExporter(exporter.TaskExporter):
"""Exports task results to the ElasticSearch 2.x or 5.x clusters.
The exported data includes:
* Task basic information such as title, description, status,
deployment uuid, etc.
See the rally_task_data_v1 index.
* Workload information such as the scenario name and configuration,
runner type and configuration, load start time, success rate, SLA
details in case of errors, etc.
See the rally_workload_data_v1 index.
* Separate documents for all atomic actions.
See the rally_atomic_action_data_v1 index.
The destination can be a remote server. In this case specify it like:
https://elastic:changeme@example.com
Or the documents can be dumped to a file, in which case the
destination should look like:
/home/foo/bar.txt
If the destination is empty, http://localhost:9200 will be used.
"""
TASK_INDEX = "rally_task_data_v1"
WORKLOAD_INDEX = "rally_workload_data_v1"
AA_INDEX = "rally_atomic_action_data_v1"
INDEX_SCHEMAS = {
TASK_INDEX: {
"task_uuid": {"type": "keyword"},
"deployment_uuid": {"type": "keyword"},
"title": {"type": "text"},
"description": {"type": "text"},
"status": {"type": "keyword"},
"pass_sla": {"type": "boolean"},
"tags": {"type": "keyword"}
},
WORKLOAD_INDEX: {
"scenario_name": {"type": "keyword"},
"scenario_cfg": {"type": "keyword"},
"description": {"type": "text"},
"runner_name": {"type": "keyword"},
"runner_cfg": {"type": "keyword"},
"contexts": {"type": "keyword"},
"task_uuid": {"type": "keyword"},
"subtask_uuid": {"type": "keyword"},
"started_at": {"type": "date"},
"load_duration": {"type": "long"},
"full_duration": {"type": "long"},
"pass_sla": {"type": "boolean"},
"success_rate": {"type": "float"},
"sla_details": {"type": "text"}
},
AA_INDEX: {
"action_name": {"type": "keyword"},
"workload_uuid": {"type": "keyword"},
"scenario_cfg": {"type": "keyword"},
"contexts": {"type": "keyword"},
"runner_name": {"type": "keyword"},
"runner_cfg": {"type": "keyword"},
"success": {"type": "boolean"},
"duration": {"type": "float"},
"started_at": {"type": "date"},
"finished_at": {"type": "date"},
"parent": {"type": "keyword"},
"error": {"type": "keyword"}
}
}
def __init__(self, tasks_results, output_destination, api=None):
super(ElasticSearchExporter, self).__init__(tasks_results,
output_destination,
api=api)
self._report = []
self._remote = (
output_destination is None or (
output_destination.startswith("http://")
or output_destination.startswith("https://")))
if self._remote:
self._client = client.ElasticSearchClient(self.output_destination)
@staticmethod
def _pack(obj):
import morph
return sorted(["%s=%s" % (k, v)
for k, v in morph.flatten(obj).items()])
def _add_index(self, index, body, doc_id=None, doc_type="data"):
"""Create a document for the specified index with specified id.
:param index: The name of the index
:param body: The document itself (here, a task, workload or atomic
action report)
:param doc_id: Document ID. Here we use a task/workload UUID or an
action ID
:param doc_type: The type of document
"""
self._report.append(
json.dumps(
# use OrderedDict to make the report more unified
{"index": collections.OrderedDict([
("_index", index),
("_type", doc_type),
("_id", doc_id)])},
sort_keys=False))
self._report.append(json.dumps(body))
def _ensure_indices(self):
"""Check available indices and create require ones if they missed."""
available_index = set(self._client.list_indices())
missed_index = {self.TASK_INDEX, self.WORKLOAD_INDEX,
self.AA_INDEX} - available_index
for index in missed_index:
LOG.debug("Creating '%s' index." % index)
self._client.create_index(index, doc_type="data",
properties=self.INDEX_SCHEMAS[index])
@staticmethod
def _make_action_report(name, workload_id, workload, duration,
started_at, finished_at, parent, error):
# NOTE(andreykurilin): actually, this method just creates a dict object,
# but we need the same format in two places, so the template was
# transformed into a method.
parent = parent[0] if parent else None
return {"action_name": name,
"workload_uuid": workload_id,
"scenario_cfg": workload["scenario_cfg"],
"contexts": workload["contexts"],
"runner_name": workload["runner_name"],
"runner_cfg": workload["runner_cfg"],
"success": not bool(error),
"duration": duration,
"started_at": started_at,
"finished_at": finished_at,
"parent": parent,
"error": error}
def _process_atomic_actions(self, itr, workload, workload_id,
atomic_actions=None, _parent=None, _depth=0,
_cache=None):
"""Process atomic actions of an iteration
:param atomic_actions: A list with an atomic actions
:param itr: The iteration data
:param workload: The workload report
:param workload_id: The workload UUID
:param _parent: An inner parameter which is used for pointing to the
parent atomic action
:param _depth: An inner parameter which is used to mark the level of
depth while parsing atomic action children
:param _cache: An inner parameter which is used to avoid conflicts in
IDs of atomic actions of a single iteration.
"""
if _depth >= 3:
return
cache = _cache or {}
if atomic_actions is None:
atomic_actions = itr["atomic_actions"]
act_id_tmpl = "%(itr_id)s_action_%(action_name)s_%(num)s"
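# e.g. "4dcd88a5-164b-4431-8b44-3979868116dd_iter_1_action_cinder.list_volumes_0"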
for action in atomic_actions:
cache.setdefault(action["name"], 0)
act_id = act_id_tmpl % {
"itr_id": itr["id"],
"action_name": action["name"],
"num": cache[action["name"]]}
cache[action["name"]] += 1
started_at = dt.datetime.utcfromtimestamp(action["started_at"])
finished_at = dt.datetime.utcfromtimestamp(action["finished_at"])
started_at = started_at.strftime(consts.TimeFormat.ISO8601)
finished_at = finished_at.strftime(consts.TimeFormat.ISO8601)
action_report = self._make_action_report(
name=action["name"],
workload_id=workload_id,
workload=workload,
duration=(action["finished_at"] - action["started_at"]),
started_at=started_at,
finished_at=finished_at,
parent=_parent,
error=(itr["error"] if action.get("failed", False) else None)
)
self._add_index(self.AA_INDEX, action_report,
doc_id=act_id)
self._process_atomic_actions(
atomic_actions=action["children"],
itr=itr,
workload=workload,
workload_id=workload_id,
_parent=(act_id, action_report),
_depth=(_depth + 1),
_cache=cache)
if itr["error"] and (
# the case when it is the top level of the scenario and the
# failed item is not wrapped by AtomicTimer
(not _parent and not atomic_actions) or
# the case when it is the top level of the scenario and
# the item fails after some atomic actions completed
(not _parent and atomic_actions and
not atomic_actions[-1].get("failed", False)) or
# the case when the item fails after some atomic actions
# completed and there is a root atomic
(_parent and atomic_actions and not _parent[1]["success"] and
not atomic_actions[-1].get("failed", False))):
act_id = act_id_tmpl % {
"itr_id": itr["id"],
"action_name": "no-name-action",
"num": 0}
# Since the action was not wrapped by AtomicTimer, we cannot make
# any assumptions about its duration (start time), so let's use the
# finished_at timestamp of the iteration with a 0 duration.
timestamp = (itr["timestamp"] + itr["duration"] +
itr["idle_duration"])
timestamp = dt.datetime.utcfromtimestamp(timestamp)
timestamp = timestamp.strftime(consts.TimeFormat.ISO8601)
action_report = self._make_action_report(
name="no-name-action",
workload_id=workload_id,
workload=workload,
duration=0,
started_at=timestamp,
finished_at=timestamp,
parent=_parent,
error=itr["error"]
)
self._add_index(self.AA_INDEX, action_report, doc_id=act_id)
def generate(self):
if self._remote:
self._ensure_indices()
for task in self.tasks_results:
if self._remote:
if self._client.check_document(self.TASK_INDEX, task["uuid"]):
raise exceptions.RallyException(
"Failed to push the task %s to the ElasticSearch "
"cluster. The document with such UUID already exists" %
task["uuid"])
task_report = {
"task_uuid": task["uuid"],
"deployment_uuid": task["deployment_uuid"],
"title": task["title"],
"description": task["description"],
"status": task["status"],
"pass_sla": task["pass_sla"],
"tags": task["tags"]
}
self._add_index(self.TASK_INDEX, task_report,
doc_id=task["uuid"])
# NOTE(andreykurilin): The subtasks do not have much logic now, so
# there is no reason to save the info about them.
for workload in itertools.chain(
*[s["workloads"] for s in task["subtasks"]]):
durations = workload["statistics"]["durations"]
success_rate = durations["total"]["data"]["success"]
# cut the % char and transform to a float value
success_rate = float(success_rate[:-1]) / 100.0
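# e.g. "80.0%" -> 0.8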
started_at = workload["start_time"]
if started_at:
started_at = dt.datetime.utcfromtimestamp(started_at)
started_at = started_at.strftime(consts.TimeFormat.ISO8601)
workload_report = {
"scenario_name": workload["name"],
"scenario_cfg": self._pack(workload["args"]),
"description": workload["description"],
"runner_name": workload["runner_type"],
"runner_cfg": self._pack(workload["runner"]),
"contexts": self._pack(workload["context"]),
"task_uuid": workload["task_uuid"],
"subtask_uuid": workload["subtask_uuid"],
"started_at": started_at,
"load_duration": workload["load_duration"],
"full_duration": workload["full_duration"],
"pass_sla": workload["pass_sla"],
"success_rate": success_rate,
"sla_details": [s["detail"]
for s in workload["sla_results"]["sla"]
if not s["success"]]}
# do we need to store hooks ?!
self._add_index(self.WORKLOAD_INDEX, workload_report,
doc_id=workload["uuid"])
# Iterations
for idx, itr in enumerate(workload.get("data", []), 1):
itr["id"] = "%(uuid)s_iter_%(num)s" % {
"uuid": workload["uuid"],
"num": str(idx)}
self._process_atomic_actions(
itr=itr,
workload=workload_report,
workload_id=workload["uuid"])
if self._remote:
LOG.debug("The info of ElasticSearch cluster to which the results "
"will be exported: %s" % self._client.info())
self._client.push_documents(self._report)
msg = ("Successfully exported results to ElasticSearch at url "
"'%s'" % self.output_destination)
return {"print": msg}
else:
# a newline is required at the end of the file.
report = "\n".join(self._report) + "\n"
return {"files": {self.output_destination: report},
"open": "file://" + os.path.abspath(
self.output_destination)}
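
As a side note, a minimal sketch (assuming the morph package added to the requirements below) of what _pack does to a nested configuration; the expected output matches the unit tests further down.

import morph

contexts = {"users@openstack": {"tenants": 2}}
# morph.flatten() collapses nested dicts into dotted keys;
# _pack() then renders them as sorted "key=value" strings.
packed = sorted("%s=%s" % (k, v)
                for k, v in morph.flatten(contexts).items())
print(packed)  # ['users@openstack.tenants=2']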

View File

@@ -58,6 +58,7 @@
<li><a href="rally-plot/results.json.gz">Raw results (JSON)</a> <code>$ rally task results</code>
<li><a href="rally-plot/new_results.json.gz">Raw results (JSON 1.0)</a> <code>$ rally task report --json</code>
<li><a href="rally-plot/junit.xml.gz">JUNIT-XML report</a> <code>$ rally task export --type junit-xml</code>
<li><a href="rally-plot/elasticsearch.txt.gz">ElasticSearch x5 dump file</a> <code>$ rally task export --type elastic</code>
</ul>
<h2>About Rally</h2>

View File

@@ -7,6 +7,7 @@ alembic>=0.8.10,<=0.9.5 # MIT
decorator>=3.4.0,<=4.1.2 # new BSD License
Jinja2>=2.8,!=2.9.0,!=2.9.1,!=2.9.2,!=2.9.3,!=2.9.4,<=2.9.6 # BSD
jsonschema>=2.0.0,!=2.5.0,<3.0.0 # MIT
+morph
netaddr>=0.7.13,!=0.7.16,<=0.7.19 # BSD
oslo.config>=4.0.0,!=4.3.0,!=4.4.0,<=4.12.0 # Apache Software License
oslo.db>=4.24.0,<=4.26.0 # Apache Software License

View File

@@ -139,6 +139,8 @@ function run () {
gzip -9 rally-plot/new_results.json
rally task export --type junit-xml --to rally-plot/junit.xml
gzip -9 rally-plot/junit.xml
+rally task export --type elastic --to rally-plot/elasticsearch.txt
+gzip -9 rally-plot/elasticsearch.txt
# NOTE(stpierre): if the sla check fails, we still want osresources.py
# to run, so we turn off -e and save the return value

View File

@@ -0,0 +1,223 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import mock
from rally import exceptions
from rally.plugins.common.exporters.elastic import client
from tests.unit import test
PATH = "rally.plugins.common.exporters.elastic.client"
class ElasticSearchClientTestCase(test.TestCase):
def test_check_response(self):
es = client.ElasticSearchClient(None)
resp = mock.Mock(status_code=200)
self.assertIsNone(es._check_response(resp))
resp.status_code = 300
resp.json.side_effect = ValueError("Foo!!!!")
resp.text = "something"
e = self.assertRaises(exceptions.RallyException,
es._check_response, resp)
self.assertEqual("[HTTP 300] Failed to connect to ElasticSearch "
"cluster: something", e.format_message())
resp.json = mock.Mock(return_value={"error": {
"reason": "I'm too lazy to process this request."}})
e = self.assertRaises(exceptions.RallyException,
es._check_response, resp)
self.assertEqual("[HTTP 300] Failed to connect to ElasticSearch "
"cluster: I'm too lazy to process this request.",
e.format_message())
@mock.patch("%s.ElasticSearchClient.info" % PATH)
def test_version(self, mock_info):
es = client.ElasticSearchClient(None)
es._version = "foo"
self.assertEqual("foo", es.version())
self.assertFalse(mock_info.called)
es._version = None
self.assertIsNone(es.version())
mock_info.assert_called_once_with()
@mock.patch("%s.requests.get" % PATH)
def test_info(self, mock_requests_get):
resp = mock_requests_get.return_value
resp.status_code = 200
data = {"version": {"number": "5.6.1"}}
resp.json.return_value = data
es = client.ElasticSearchClient(None)
self.assertEqual(data, es.info())
self.assertEqual("5.6.1", es._version)
mock_requests_get.assert_called_once_with("http://localhost:9200")
# check unification
data = {"version": {"number": "2.4.1",
"build_timestamp": "timestamp"}}
resp.json.return_value = data
self.assertEqual(
{"version": {"number": "2.4.1",
"build_date": "timestamp"}},
es.info())
@mock.patch("%s.ElasticSearchClient._check_response" % PATH)
@mock.patch("%s.requests.get" % PATH)
def test_info_fails(self, mock_requests_get, mock__check_response):
es = client.ElasticSearchClient(None)
# case #1 - _check_response raises exception. it should not be caught
exc = KeyError("foo")
mock__check_response.side_effect = exc
e = self.assertRaises(KeyError, es.info)
self.assertEqual(exc, e)
mock__check_response.reset_mock()
mock__check_response.side_effect = None
# case #2 - the response is ok, but the data is not a json-like obj
resp = mock_requests_get.return_value
resp.json.side_effect = ValueError()
es = client.ElasticSearchClient(None)
e = self.assertRaises(exceptions.RallyException, es.info)
self.assertIn("The return data doesn't look like a json.",
e.format_message())
resp.json.reset_mock()
resp.json.side_effect = None
# case #3 - the return data is a json, but doesn't include the version
resp.json.return_value = {}
e = self.assertRaises(exceptions.RallyException, es.info)
self.assertIn("Failed to parse the received data.",
e.format_message())
@mock.patch("%s.ElasticSearchClient._check_response" % PATH)
@mock.patch("%s.requests.head" % PATH)
def test_check_document(self, mock_requests_head, mock__check_response):
es = client.ElasticSearchClient(None)
resp = mock_requests_head.return_value
resp.status_code = 200
self.assertTrue(es.check_document("foo", "bar"))
mock_requests_head.assert_called_once_with(
"http://localhost:9200/foo/data/bar")
self.assertFalse(mock__check_response.called)
resp.status_code = 404
self.assertFalse(es.check_document("foo", "bar"))
self.assertFalse(mock__check_response.called)
resp.status_code = 300
self.assertIsNone(es.check_document("foo", "bar"))
mock__check_response.assert_called_once_with(resp, mock.ANY)
@mock.patch("%s.requests.post" % PATH)
def test_push_documents(self, mock_requests_post):
mock_requests_post.return_value.status_code = 200
es = client.ElasticSearchClient(None)
# decrease the chunk size so we do not need to generate > 10000 docs
es.CHUNK_LENGTH = 2
documents = ["doc1", "doc2", "doc3"]
es.push_documents(documents)
self.assertEqual(
[mock.call("http://localhost:9200/_bulk",
data="doc1\ndoc2\n"),
mock.call("http://localhost:9200/_bulk",
data="doc3\n")],
mock_requests_post.call_args_list
)
@mock.patch("%s.ElasticSearchClient._check_response" % PATH)
@mock.patch("%s.requests.get" % PATH)
def test_list_indices(self, mock_requests_get, mock__check_response):
mock_requests_get.return_value.text = "foo bar\n"
es = client.ElasticSearchClient(None)
self.assertEqual(["foo", "bar"], es.list_indices())
mock_requests_get.assert_called_once_with(
"http://localhost:9200/_cat/indices?v")
mock__check_response.assert_called_once_with(
mock_requests_get.return_value, mock.ANY)
@mock.patch("%s.json.dumps" % PATH)
@mock.patch("%s.ElasticSearchClient._check_response" % PATH)
@mock.patch("%s.requests.put" % PATH)
def test_create_index_es_3(self, mock_requests_put, mock__check_response,
mock_json_dumps):
es = client.ElasticSearchClient(None)
es._version = "3"
i_name = "foo"
i_type = "data"
o_properties = {"prop1": {"type": "text"},
"prop2": {"type": "keyword"}}
# ensure that no transformation of the properties is performed
properties = copy.deepcopy(o_properties)
es.create_index(i_name, i_type, properties)
mock_requests_put.assert_called_once_with(
"http://localhost:9200/%s" % i_name,
data=mock_json_dumps.return_value)
mock_json_dumps.assert_called_once_with(
{"mappings": {i_type: {"properties": o_properties}}})
mock__check_response.assert_called_once_with(
mock_requests_put.return_value, mock.ANY)
@mock.patch("%s.json.dumps" % PATH)
@mock.patch("%s.ElasticSearchClient._check_response" % PATH)
@mock.patch("%s.requests.put" % PATH)
def test_create_index_es_2(self, mock_requests_put, mock__check_response,
mock_json_dumps):
es = client.ElasticSearchClient(None)
es._version = "2.4.3"
i_name = "foo"
i_type = "data"
o_properties = {"prop1": {"type": "text"},
"prop2": {"type": "keyword"}}
# ensure that the original properties are not modified in place;
# create_index transforms its own copy
properties = copy.deepcopy(o_properties)
es.create_index(i_name, i_type, properties)
mock_requests_put.assert_called_once_with(
"http://localhost:9200/%s" % i_name,
data=mock_json_dumps.return_value)
e_properties = {"prop1": {"type": "string"},
"prop2": {"type": "string",
"index": "not_analyzed"}}
mock_json_dumps.assert_called_once_with(
{"mappings": {i_type: {"properties": e_properties}}})
mock__check_response.assert_called_once_with(
mock_requests_put.return_value, mock.ANY)

View File

@@ -0,0 +1,506 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import ddt
import json
import mock
from rally import exceptions
from rally.plugins.common.exporters.elastic import exporter as elastic
from tests.unit import test
PATH = "rally.plugins.common.exporters.elastic.exporter"
class ValidatorTestCase(test.TestCase):
@mock.patch("%s.client.ElasticSearchClient" % PATH)
def test_validate(self, mock_elastic_search_client):
validator = elastic.Validator()
client = mock_elastic_search_client.return_value
validator.validate({}, {}, None, {"destination": "/home/foo"})
self.assertFalse(mock_elastic_search_client.called)
client.version.return_value = "2.5.1"
validator.validate({}, {}, None, {"destination": None})
client.version.return_value = "5.6.2"
validator.validate({}, {}, None, {"destination": None})
client.version.return_value = "1.1.1"
e = self.assertRaises(
elastic.validation.ValidationError,
validator.validate, {}, {}, None, {"destination": None})
self.assertEqual("The unsupported version detected 1.1.1.",
e.message)
exp_e = exceptions.RallyException("foo")
client.version.side_effect = exp_e
actual_e = self.assertRaises(
elastic.validation.ValidationError,
validator.validate, {}, {}, None, {"destination": None})
self.assertEqual(exp_e.format_message(), actual_e.message)
def get_tasks_results():
task_uuid = "2fa4f5ff-7d23-4bb0-9b1f-8ee235f7f1c8"
subtask_uuid = "35166362-0b11-4e74-929d-6988377e2da2"
return [{
"id": 1,
"uuid": task_uuid,
"deployment_uuid": "deployment-uuu-iii-iii-ddd",
"title": "foo",
"description": "bar",
"status": "ok",
"pass_sla": "yup",
"task_duration": "dur",
"tags": ["tag-1", "tag-2"],
"subtasks": [{
"task_uuid": task_uuid,
"subtask_uuid": subtask_uuid,
"workloads": [{
"id": 3,
"position": 0,
"uuid": "4dcd88a5-164b-4431-8b44-3979868116dd",
"task_uuid": task_uuid,
"subtask_uuid": subtask_uuid,
"name": "CinderVolumes.list_volumes",
"args": {"key1": "value1"},
"description": "List all volumes.",
"runner_type": "constant",
"runner": {"type": "constant",
"times": 3},
"sla": {},
"context": {"users@openstack": {"tenants": 2}},
"created_at": "2017-07-28T23:35:46",
"updated_at": "2017-07-28T23:37:55",
"start_time": 1501284950.371992,
"failed_iteration_count": 2,
"load_duration": 97.82577991485596,
"full_duration": 127.59103488922119,
"pass_sla": False,
"sla_results": {
"sla": [{"criterion": "JoGo",
"success": False,
"detail": "because i do not like you"}]
},
"statistics": {
"durations": {
"total": {
"data": {"success": "80.0%"}
}
}
},
"data": [
# iteration where the unwrapped action failed
{"timestamp": 1501284950.371992,
"error": ["ERROR!"],
"duration": 10.096552848815918,
"idle_duration": 0,
"atomic_actions": [
{"finished_at": 1501284961.468537,
"started_at": 1501284950.372052,
"name": "cinder.list_volumes",
"children": []}]},
# iteration where the known action failed
{"timestamp": 1501284950.371992,
"error": ["ERROR!"],
"duration": 10.096552848815918,
"idle_duration": 0,
"atomic_actions": [
{"finished_at": 1501284961.468537,
"started_at": 1501284950.372052,
"name": "cinder.list_volumes",
"failed": True,
"children": []}]}
]}]
}]
}]
@ddt.ddt
class ElasticSearchExporterTestCase(test.TestCase):
def setUp(self):
super(ElasticSearchExporterTestCase, self).setUp()
self.patcher = mock.patch.object(elastic.client, "ElasticSearchClient")
self.es_cls = self.patcher.start()
self.addCleanup(self.patcher.stop)
def test_init(self):
exporter = elastic.ElasticSearchExporter(None, "http://example.com")
self.assertTrue(exporter._remote)
self.assertEqual(self.es_cls.return_value,
getattr(exporter, "_client"))
exporter = elastic.ElasticSearchExporter(None, None)
self.assertTrue(exporter._remote)
self.assertEqual(self.es_cls.return_value,
getattr(exporter, "_client"))
exporter = elastic.ElasticSearchExporter(None, "/foo/bar")
self.assertFalse(exporter._remote)
self.assertIsNone(getattr(exporter, "_client", None))
def test__pack(self):
exporter = elastic.ElasticSearchExporter(None, None)
self.assertEqual(
{"key1=value1", "key2=value2"},
set(exporter._pack({"key1": "value1", "key2": "value2"})))
self.assertEqual(
{"key1=value1", "key2.foo.bar=1", "key2.xxx=yyy"},
set(exporter._pack({"key1": "value1", "key2": {"foo": {"bar": 1},
"xxx": "yyy"}})))
@ddt.data(None, "/home/bar", "https://example.com")
def test__add_index(self, destination):
index = "foo"
doc_type = "bar"
body = {
"key1": "value1",
"key2": "value2"
}
doc_id = "2fa4f5ff-7d23-4bb0-9b1f-8ee235f7f1c8"
exporter = elastic.ElasticSearchExporter(None, destination)
exporter._add_index(index=index,
body=body,
doc_id=doc_id,
doc_type=doc_type)
self.assertEqual(2, len(exporter._report))
self.assertEqual({"index": {"_index": index,
"_type": doc_type,
"_id": doc_id}},
json.loads(exporter._report[0]))
@ddt.data(True, False)
@mock.patch("%s.ElasticSearchExporter._add_index" % PATH)
def test__process_atomic_actions(self, known_fail, mock__add_index):
es_exporter = elastic.ElasticSearchExporter({}, None)
itr_data = {"id": "foo_bar_uuid",
"error": ["I was forced to fail. Sorry"],
"timestamp": 1, "duration": 2, "idle_duration": 1}
workload = {"scenario_cfg": ["key1=value1"],
"runner_name": "foo",
"runner_cfg": ["times=3"],
"contexts": ["users@openstack.tenants=2"]}
atomic_actions = [
{"name": "do_something",
"started_at": 1, "finished_at": 2,
"children": []},
{"name": "fail_something",
"started_at": 3,
"finished_at": 4,
"children": [
{"name": "rm -rf", "started_at": 3, "finished_at": 4,
"children": []}
]},
]
if known_fail:
atomic_actions[-1]["failed"] = True
atomic_actions[-1]["children"][-1]["failed"] = True
es_exporter._process_atomic_actions(atomic_actions=atomic_actions,
itr=itr_data,
workload_id="wid",
workload=workload)
expected_calls = [
mock.call(
"rally_atomic_action_data_v1",
{
"action_name": "do_something",
"scenario_cfg": ["key1=value1"],
"contexts": ["users@openstack.tenants=2"],
"runner_name": "foo",
"runner_cfg": ["times=3"],
"started_at": "1970-01-01T00:00:01",
"finished_at": "1970-01-01T00:00:02",
"duration": 1,
"success": True,
"error": None,
"parent": None,
"workload_uuid": "wid"},
doc_id="foo_bar_uuid_action_do_something_0"),
mock.call(
"rally_atomic_action_data_v1",
{
"action_name": "fail_something",
"scenario_cfg": ["key1=value1"],
"contexts": ["users@openstack.tenants=2"],
"runner_name": "foo",
"runner_cfg": ["times=3"],
"started_at": "1970-01-01T00:00:03",
"finished_at": "1970-01-01T00:00:04",
"duration": 1,
"success": not known_fail,
"error": itr_data["error"] if known_fail else None,
"parent": None,
"workload_uuid": "wid"},
doc_id="foo_bar_uuid_action_fail_something_0"),
mock.call(
"rally_atomic_action_data_v1",
{
"action_name": "rm -rf",
"scenario_cfg": ["key1=value1"],
"contexts": ["users@openstack.tenants=2"],
"runner_name": "foo",
"runner_cfg": ["times=3"],
"started_at": "1970-01-01T00:00:03",
"finished_at": "1970-01-01T00:00:04",
"duration": 1,
"success": not known_fail,
"error": itr_data["error"] if known_fail else None,
"parent": "foo_bar_uuid_action_fail_something_0",
"workload_uuid": "wid"},
doc_id="foo_bar_uuid_action_rm -rf_0")]
if not known_fail:
expected_calls.append(mock.call(
"rally_atomic_action_data_v1",
{
"action_name": "no-name-action",
"scenario_cfg": ["key1=value1"],
"contexts": ["users@openstack.tenants=2"],
"runner_name": "foo",
"runner_cfg": ["times=3"],
"started_at": "1970-01-01T00:00:04",
"finished_at": "1970-01-01T00:00:04",
"duration": 0,
"success": False,
"error": itr_data["error"],
"parent": None,
"workload_uuid": "wid"},
doc_id="foo_bar_uuid_action_no-name-action_0"))
self.assertEqual(expected_calls, mock__add_index.call_args_list)
def test_generate_fails_on_doc_exists(self):
destination = "http://example.com"
client = self.es_cls.return_value
client.check_document.side_effect = (False, True)
tasks = get_tasks_results()
second_task = copy.deepcopy(tasks[-1])
second_task["subtasks"] = []
tasks.append(second_task)
exporter = elastic.ElasticSearchExporter(tasks, destination)
e = self.assertRaises(exceptions.RallyException, exporter.generate)
self.assertIn("Failed to push the task %s" % tasks[0]["uuid"],
e.format_message())
def test__ensure_indices(self):
es = mock.MagicMock()
exporter = elastic.ElasticSearchExporter([], None)
exporter._client = es
# case #1: everything exists
es.list_indices.return_value = [exporter.WORKLOAD_INDEX,
exporter.TASK_INDEX,
exporter.AA_INDEX]
exporter._ensure_indices()
self.assertFalse(es.create_index.called)
es.list_indices.assert_called_once_with()
# case #2: some indices exist
es.list_indices.reset_mock()
es.list_indices.return_value = [exporter.TASK_INDEX, exporter.AA_INDEX]
exporter._ensure_indices()
es.list_indices.assert_called_once_with()
es.create_index.assert_called_once_with(
exporter.WORKLOAD_INDEX, doc_type="data",
properties=exporter.INDEX_SCHEMAS[exporter.WORKLOAD_INDEX]
)
# case #3: none of indices exists
es.list_indices.reset_mock()
es.create_index.reset_mock()
es.list_indices.return_value = []
exporter._ensure_indices()
es.list_indices.assert_called_once_with()
self.assertEqual(3, es.create_index.call_count)
@ddt.data(True, False)
def test_generate(self, remote):
if remote:
destination = "http://example.com"
client = self.es_cls.return_value
client.check_document.return_value = False
else:
destination = "/home/bar.txt"
tasks = get_tasks_results()
second_task = copy.deepcopy(tasks[-1])
second_task["subtasks"] = []
tasks.append(second_task)
exporter = elastic.ElasticSearchExporter(tasks, destination)
result = exporter.generate()
if remote:
self.assertEqual(
[mock.call("rally_task_data_v1", second_task["uuid"]),
mock.call("rally_task_data_v1", second_task["uuid"])],
client.check_document.call_args_list
)
client.push_documents.assert_called_once_with(exporter._report)
client.list_indices.assert_called_once_with()
self.assertEqual(3, client.create_index.call_count)
else:
self.assertEqual({"files", "open"}, set(result.keys()))
self.assertEqual("file://%s" % destination, result["open"])
self.assertEqual({destination}, set(result["files"].keys()))
data = result["files"][destination].split("\n")
# there should always be an empty line at the end
self.assertEqual("", data[-1])
data = [json.loads(l) for l in exporter._report]
self.assertIsInstance(data, list)
expected = [
{
"index": {"_id": "2fa4f5ff-7d23-4bb0-9b1f-8ee235f7f1c8",
"_index": "rally_task_data_v1",
"_type": "data"}
},
{
"title": "foo",
"description": "bar",
"deployment_uuid": "deployment-uuu-iii-iii-ddd",
"status": "ok",
"pass_sla": "yup",
"task_uuid": "2fa4f5ff-7d23-4bb0-9b1f-8ee235f7f1c8",
"tags": ["tag-1", "tag-2"]
},
{
"index": {"_id": "4dcd88a5-164b-4431-8b44-3979868116dd",
"_index": "rally_workload_data_v1",
"_type": "data"}
},
{
"task_uuid": "2fa4f5ff-7d23-4bb0-9b1f-8ee235f7f1c8",
"subtask_uuid": "35166362-0b11-4e74-929d-6988377e2da2",
"scenario_name": "CinderVolumes.list_volumes",
"description": "List all volumes.",
"scenario_cfg": ["key1=value1"],
"contexts": ["users@openstack.tenants=2"],
"runner_name": "constant",
"runner_cfg": ["times=3", "type=constant"],
"full_duration": 127.59103488922119,
"load_duration": 97.82577991485596,
"started_at": "2017-07-28T23:35:50",
"pass_sla": False,
"success_rate": 0.8,
"sla_details": ["because i do not like you"]
},
{
"index": {
"_id": "4dcd88a5-164b-4431-8b44-3979868116dd_iter_1_action"
"_cinder.list_volumes_0",
"_index": "rally_atomic_action_data_v1",
"_type": "data"}
},
{
"action_name": "cinder.list_volumes",
"started_at": "2017-07-28T23:35:50",
"finished_at": "2017-07-28T23:36:01",
"duration": 11.096485137939453,
"contexts": ["users@openstack.tenants=2"],
"error": None,
"parent": None,
"runner_name": "constant",
"runner_cfg": ["times=3", "type=constant"],
"scenario_cfg": ["key1=value1"],
"success": True,
"workload_uuid": "4dcd88a5-164b-4431-8b44-3979868116dd"
},
{
"index": {
"_id": "4dcd88a5-164b-4431-8b44-3979868116dd_iter_1_action"
"_no-name-action_0",
"_index": "rally_atomic_action_data_v1",
"_type": "data"}
},
{
"action_name": "no-name-action",
"started_at": "2017-07-28T23:36:00",
"finished_at": "2017-07-28T23:36:00",
"duration": 0,
"contexts": ["users@openstack.tenants=2"],
"error": ["ERROR!"],
"parent": None,
"runner_name": "constant",
"runner_cfg": ["times=3", "type=constant"],
"scenario_cfg": ["key1=value1"],
"success": False,
"workload_uuid": "4dcd88a5-164b-4431-8b44-3979868116dd"
},
{
"index": {
"_id": "4dcd88a5-164b-4431-8b44-3979868116dd_iter_2_action"
"_cinder.list_volumes_0",
"_index": "rally_atomic_action_data_v1",
"_type": "data"}
},
{
"action_name": "cinder.list_volumes",
"started_at": "2017-07-28T23:35:50",
"finished_at": "2017-07-28T23:36:01",
"duration": 11.096485137939453,
"contexts": ["users@openstack.tenants=2"],
"error": ["ERROR!"],
"parent": None,
"runner_name": "constant",
"runner_cfg": ["times=3", "type=constant"],
"scenario_cfg": ["key1=value1"],
"success": False,
"workload_uuid": "4dcd88a5-164b-4431-8b44-3979868116dd"
},
{
"index": {"_id": "2fa4f5ff-7d23-4bb0-9b1f-8ee235f7f1c8",
"_index": "rally_task_data_v1",
"_type": "data"}
},
{
"title": "foo",
"description": "bar",
"deployment_uuid": "deployment-uuu-iii-iii-ddd",
"status": "ok",
"pass_sla": "yup",
"task_uuid": "2fa4f5ff-7d23-4bb0-9b1f-8ee235f7f1c8",
"tags": ["tag-1", "tag-2"]}
]
for i, line in enumerate(expected):
if i == len(data):
self.fail("The next line is missed: %s" % line)
self.assertEqual(line, data[i], "Line #%s is wrong." % (i + 1))