A Zuul reporter for Elasticsearch
It has the capability to index build and buildset results. With the help of
tools like Kibana, advanced analytics dashboards can be built on top of the
Zuul Elasticsearch index. Optionally, a job's variables and zuul_return data
can be exported along with build results under the job_vars and
job_returned_vars fields.

Change-Id: I5315483c55c10de63a3cd995ef681d0b64b98513
This commit is contained in:
parent d691eb9db4
commit 89b6803085
@ -0,0 +1,150 @@
:title: Elasticsearch Driver

Elasticsearch
=============

The Elasticsearch driver supports reporters only. The purpose of the driver is
to export build and buildset results to an Elasticsearch index.

If the index does not exist in Elasticsearch then the driver will create it
with an appropriate mapping for static fields.

The driver can add a job's variables and any data returned to Zuul
via zuul_return into the `job_vars` and `job_returned_vars` fields
of the exported build doc, respectively. Elasticsearch will apply dynamic
data type detection for those fields.

Elasticsearch supports a number of different data types for the fields in a
document. Please refer to its `documentation`_.

The Elasticsearch reporter uses the new ES client, which only supports the
`current version`_ of Elasticsearch. The reporter has been tested against an
ES cluster version 7. Lower versions may work, but we cannot give any
guarantee of that.

.. _documentation: https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html
.. _current version: https://www.elastic.co/support/eol

Connection Configuration
------------------------

The connection options for the Elasticsearch driver are:

.. attr:: <Elasticsearch connection>

   .. attr:: driver
      :required:

      .. value:: elasticsearch

         The connection must set ``driver=elasticsearch``.

   .. attr:: uri
      :required:

      Database connection information in the form of a comma separated
      list of ``host:port``. The information can also include the protocol
      (http/https) or the username and password required to authenticate to
      Elasticsearch.

      Example::

         uri=elasticsearch1.domain:9200,elasticsearch2.domain:9200

      or::

         uri=https://user:password@elasticsearch:9200

      where the user and password are optional.

   .. attr:: use_ssl
      :default: true

      Turn on SSL. This option is not required if you set ``https`` in
      the uri param.

   .. attr:: verify_certs
      :default: true

      Verify SSL certificates.

   .. attr:: ca_certs
      :default: ''

      Path to CA certs on disk.

   .. attr:: client_cert
      :default: ''

      Path to the PEM formatted SSL client certificate.

   .. attr:: client_key
      :default: ''

      Path to the PEM formatted SSL client key.

Example of driver configuration:

.. code-block:: text

   [connection elasticsearch]
   driver=elasticsearch
   uri=https://managesf.sftests.com:9200

Additional parameters to authenticate to the Elasticsearch server can be
found in the `client`_ class.

.. _client: https://github.com/elastic/elasticsearch-py/blob/master/elasticsearch/client/__init__.py

Reporter Configuration
----------------------

This reporter is used to store build results in an Elasticsearch index.

The Elasticsearch reporter does nothing on :attr:`pipeline.start` or
:attr:`pipeline.merge-failure`; it only acts on
:attr:`pipeline.success` or :attr:`pipeline.failure` reporting stages.

.. attr:: pipeline.<reporter>.<elasticsearch source>

   The reporter supports the following attributes:

   .. attr:: index
      :default: zuul

      The Elasticsearch index to be used to index the data. To prevent
      any name collisions between Zuul tenants, the tenant name is included
      in the index name. The real index name will be:

      .. code-block:: text

         <index-name>.<tenant-name>-<YYYY>.<MM>.<DD>

      The index will be created if it does not exist.

   .. attr:: index-vars
      :default: false

      Boolean value that determines if the reporter should add the job's
      variables to the exported build doc.

      .. note:: The indexed job variables do not include secrets.

   .. attr:: index-returned-vars
      :default: false

      Boolean value that determines if the reporter should add the
      zuul_return variables to the exported build doc.

For example:

.. code-block:: yaml

   - pipeline:
       name: check
       success:
         elasticsearch:
           index: 'zuul-index'
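The per-tenant, per-day index naming scheme documented above can be sketched in Python; the helper name below is illustrative, not part of Zuul:

```python
import time

def build_index_name(index, tenant, when=None):
    # Mirrors the documented scheme: <index-name>.<tenant-name>-<YYYY>.<MM>.<DD>
    day = time.strftime("%Y.%m.%d", when if when is not None else time.localtime())
    return "%s.%s-%s" % (index, tenant, day)

# A fixed date makes the result deterministic:
print(build_index_name("zuul-index", "tenant-one",
                       time.strptime("2020-06-01", "%Y-%m-%d")))
# → zuul-index.tenant-one-2020.06.01
```

Because the date is part of the index name, each day's builds land in a fresh index, which keeps per-day retention and Kibana index patterns simple.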
@ -24,6 +24,7 @@ Zuul includes the following drivers:
   gitlab
   git
   mqtt
   elasticsearch
   smtp
   sql
   timer
@ -35,3 +35,4 @@ routes
jsonpath-rw
urllib3!=1.25.4,!=1.25.5  # https://github.com/urllib3/urllib3/pull/1684
cheroot!=8.1.*,!=8.2.*,!=8.3.0  # https://github.com/cherrypy/cheroot/issues/263
elasticsearch
@ -81,6 +81,7 @@ from zuul.driver.pagure import PagureDriver
from zuul.driver.gitlab import GitlabDriver
from zuul.driver.gerrit import GerritDriver
from zuul.driver.github.githubconnection import GithubClientManager
from zuul.driver.elasticsearch import ElasticsearchDriver
from zuul.lib.connections import ConnectionRegistry
from psutil import Popen
@ -92,6 +93,7 @@ import zuul.driver.github.githubconnection as githubconnection
import zuul.driver.pagure.pagureconnection as pagureconnection
import zuul.driver.gitlab.gitlabconnection as gitlabconnection
import zuul.driver.github
import zuul.driver.elasticsearch.connection as elconnection
import zuul.driver.sql
import zuul.scheduler
import zuul.executor.server
@ -335,6 +337,7 @@ class TestConnectionRegistry(ConnectionRegistry):
            self, changes, upstream_root, additional_event_queues, rpcclient))
        self.registerDriver(GitlabDriverMock(
            self, changes, upstream_root, additional_event_queues, rpcclient))
        self.registerDriver(ElasticsearchDriver())


class FakeAnsibleManager(zuul.lib.ansible.AnsibleManager):
@ -1070,6 +1073,19 @@ class FakeGerritRefWatcher(gitwatcher.GitWatcher):
        return r


class FakeElasticsearchConnection(elconnection.ElasticsearchConnection):

    log = logging.getLogger("zuul.test.FakeElasticsearchConnection")

    def __init__(self, driver, connection_name, connection_config):
        self.driver = driver
        self.source_it = None

    def add_docs(self, source_it, index):
        self.source_it = source_it
        self.index = index


class FakeGerritConnection(gerritconnection.GerritConnection):
    """A Fake Gerrit connection for use in tests.
@ -4130,6 +4146,7 @@ class ZuulTestCase(BaseTestCase):
        self.poller_events = {}
        self._configureSmtp()
        self._configureMqtt()
        self._configureElasticsearch()

        executor_connections = TestConnectionRegistry(
            self.changes, self.config, self.additional_event_queues,
@ -4197,6 +4214,17 @@ class ZuulTestCase(BaseTestCase):
            'zuul.driver.mqtt.mqttconnection.MQTTConnection.publish',
            fakeMQTTPublish))

    def _configureElasticsearch(self):
        # Set up Elasticsearch related fakes
        def getElasticsearchConnection(driver, name, config):
            con = FakeElasticsearchConnection(
                driver, name, config)
            return con

        self.useFixture(fixtures.MonkeyPatch(
            'zuul.driver.elasticsearch.ElasticsearchDriver.getConnection',
            getElasticsearchConnection))

    def setup_config(self, config_file: str):
        # This creates the per-test configuration object. It can be
        # overridden by subclasses, but should not need to be since it
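The fixture above swaps the driver's `getConnection` for one that returns a recording fake. The same pattern can be sketched with the standard library's `unittest.mock`; the class names here are illustrative stand-ins, not Zuul's actual types:

```python
from unittest import mock

class RealDriver:
    def getConnection(self, name, config):
        raise RuntimeError("would talk to a real Elasticsearch cluster")

class FakeConnection:
    def __init__(self, name, config):
        self.name = name
        self.source_it = None

    def add_docs(self, source_it, index):
        # Record instead of indexing, so tests can assert on the docs.
        self.source_it = source_it
        self.index = index

# Patch the factory method for the duration of the test:
with mock.patch.object(RealDriver, 'getConnection',
                       lambda self, name, config: FakeConnection(name, config)):
    con = RealDriver().getConnection('elasticsearch', {})
    con.add_docs([{'uuid': 'abc'}], 'zuul.tenant-one-2020.06.01')
    print(con.index)  # → zuul.tenant-one-2020.06.01
```

Patching at the driver boundary means the whole reporter path runs unchanged while the network-facing call is intercepted.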
tests/fixtures/config/elasticsearch-driver/git/common-config/playbooks/test.yaml (new file, 5 lines)
@ -0,0 +1,5 @@
- hosts: all
  tasks:
    - zuul_return:
        data:
          foo: 'bar'
tests/fixtures/config/elasticsearch-driver/git/common-config/zuul.d/config.yaml (new file, 57 lines)
@ -0,0 +1,57 @@
- pipeline:
    name: check
    manager: independent
    trigger:
      gerrit:
        - event: patchset-created
    success:
      gerrit:
        Verified: 1
      elasticsearch:
        index: zuul-index
        index-vars: true
        index-returned-vars: true

- secret:
    name: test_secret
    data:
      username: test-username
      password: !encrypted/pkcs1-oaep |
        BFhtdnm8uXx7kn79RFL/zJywmzLkT1GY78P3bOtp4WghUFWobkifSu7ZpaV4NeO0s71YUsi1wGZZ
        L0LveZjUN0t6OU1VZKSG8R5Ly7urjaSo1pPVIq5Rtt/H7W14Lecd+cUeKb4joeusC9drN3AA8a4o
        ykcVpt1wVqUnTbMGC9ARMCQP6eopcs1l7tzMseprW4RDNhIuz3CRgd0QBMPl6VDoFgBPB8vxtJw+
        3m0rqBYZCLZgCXekqlny8s2s92nJMuUABbJOEcDRarzibDsSXsfJt1y+5n7yOURsC7lovMg4GF/v
        Cl/0YMKjBO5bpv9EM5fToeKYyPGSKQoHOnCYceb3cAVcv5UawcCic8XjhEhp4K7WPdYf2HVAC/qt
        xhbpjTxG4U5Q/SoppOJ60WqEkQvbXs6n5Dvy7xmph6GWmU/bAv3eUK3pdD3xa2Ue1lHWz3U+rsYr
        aI+AKYsMYx3RBlfAmCeC1ve2BXPrqnOo7G8tnUvfdYPbK4Aakk0ds/AVqFHEZN+S6hRBmBjLaRFW
        Z3QSO1NjbBxWnaHKZYT7nkrJm8AMCgZU0ZArFLpaufKCeiK5ECSsDxic4FIsY1OkWT42qEUfL0Wd
        +150AKGNZpPJnnP3QYY4W/MWcKH/zdO400+zWN52WevbSqZy90tqKDJrBkMl1ydqbuw1E4ZHvIs=

- job:
    name: base
    parent: null
    nodeset:
      nodes:
        - name: test_node
          label: test_label

- job:
    name: test
    run: playbooks/test.yaml
    vars:
      bar: foo
    secrets:
      - test_secret

- project:
    name: org/project
    check:
      jobs:
        - test:
            vars:
              bar2: foo2

- project:
    name: common-config
    check:
      jobs: []
@ -0,0 +1 @@
test
@ -0,0 +1,8 @@
- tenant:
    name: tenant-one
    source:
      gerrit:
        config-projects:
          - common-config
        untrusted-projects:
          - org/project
@ -0,0 +1,25 @@
[gearman]
server=127.0.0.1

[scheduler]
tenant_config=main.yaml

[merger]
git_dir=/tmp/zuul-test/merger-git
git_user_email=zuul@example.com
git_user_name=zuul

[executor]
git_dir=/tmp/zuul-test/executor-git

[connection gerrit]
driver=gerrit
server=review.example.com
user=jenkins
sshkey=fake_id_rsa1

[connection elasticsearch]
driver=elasticsearch
uri=localhost:9200
use_ssl=true
verify_certs=false
@ -13,10 +13,11 @@
# under the License.

import textwrap
import time

import sqlalchemy as sa

from tests.base import ZuulTestCase, ZuulDBTestCase, AnsibleZuulTestCase


def _get_reporter_from_connection_name(reporters, connection_name):
@ -652,3 +653,79 @@ class TestMQTTConnectionBuildPage(ZuulTestCase):
                build_id
            ),
        )


class TestElasticsearchConnection(AnsibleZuulTestCase):
    config_file = 'zuul-elastic-driver.conf'
    tenant_config_file = 'config/elasticsearch-driver/main.yaml'

    def _getSecrets(self, job, pbtype):
        secrets = []
        build = self.getJobFromHistory(job)
        for pb in build.parameters[pbtype]:
            secrets.append(pb['secrets'])
        return secrets

    def test_elastic_reporter(self):
        "Test the Elasticsearch reporter"
        # Add a success result
        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
        self.waitUntilSettled()

        indexed_docs = self.scheds.first.connections.connections[
            'elasticsearch'].source_it
        index = self.scheds.first.connections.connections[
            'elasticsearch'].index

        self.assertEqual(len(indexed_docs), 2)
        self.assertEqual(index, ('zuul-index.tenant-one-%s' %
                                 time.strftime("%Y.%m.%d")))
        buildset_doc = [doc for doc in indexed_docs if
                        doc['build_type'] == 'buildset'][0]
        self.assertEqual(buildset_doc['tenant'], 'tenant-one')
        self.assertEqual(buildset_doc['pipeline'], 'check')
        self.assertEqual(buildset_doc['result'], 'SUCCESS')
        build_doc = [doc for doc in indexed_docs if
                     doc['build_type'] == 'build'][0]
        self.assertEqual(build_doc['buildset_uuid'], buildset_doc['uuid'])
        self.assertEqual(build_doc['result'], 'SUCCESS')
        self.assertEqual(build_doc['job_name'], 'test')
        self.assertEqual(build_doc['tenant'], 'tenant-one')
        self.assertEqual(build_doc['pipeline'], 'check')

        self.assertIn('job_vars', build_doc)
        self.assertDictEqual(
            build_doc['job_vars'], {'bar': 'foo', 'bar2': 'foo2'})

        self.assertIn('job_returned_vars', build_doc)
        self.assertDictEqual(
            build_doc['job_returned_vars'], {'foo': 'bar'})

        self.assertEqual(self.history[0].uuid, build_doc['uuid'])

    def test_elasticsearch_secret_leak(self):
        expected_secret = [{
            'test_secret': {
                'username': 'test-username',
                'password': 'test-password'
            }
        }]

        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
        self.waitUntilSettled()

        indexed_docs = self.scheds.first.connections.connections[
            'elasticsearch'].source_it

        build_doc = [doc for doc in indexed_docs if
                     doc['build_type'] == 'build'][0]

        # Ensure that the job includes the secret
        self.assertEqual(
            self._getSecrets('test', 'playbooks'),
            expected_secret)

        # Check that the secret does not leak into the indexed doc
        self.assertFalse('test_secret' in build_doc['job_vars'])
@ -0,0 +1,30 @@
# Copyright 2019 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from zuul.driver import Driver, ConnectionInterface, ReporterInterface
from zuul.driver.elasticsearch import connection as elconnection
from zuul.driver.elasticsearch import reporter as elreporter


class ElasticsearchDriver(Driver, ConnectionInterface, ReporterInterface):
    name = 'elasticsearch'

    def getConnection(self, name, config):
        return elconnection.ElasticsearchConnection(self, name, config)

    def getReporter(self, connection, pipeline, config=None):
        return elreporter.ElasticsearchReporter(self, connection, config)

    def getReporterSchema(self):
        return elreporter.getSchema()
@ -0,0 +1,155 @@
# Copyright 2019 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import yaml
import logging

from elasticsearch import Elasticsearch
from elasticsearch.client import IndicesClient
from elasticsearch.helpers import bulk
from elasticsearch.helpers import BulkIndexError

from zuul.connection import BaseConnection


class ElasticsearchConnection(BaseConnection):
    driver_name = 'elasticsearch'
    log = logging.getLogger("zuul.ElasticSearchConnection")
    properties = {
        # Common attributes
        "uuid": {"type": "keyword"},
        "build_type": {"type": "keyword"},
        "result": {"type": "keyword"},
        "duration": {"type": "integer"},
        # BuildSet type specific attributes
        "zuul_ref": {"type": "keyword"},
        "pipeline": {"type": "keyword"},
        "project": {"type": "keyword"},
        "branch": {"type": "keyword"},
        "change": {"type": "integer"},
        "patchset": {"type": "keyword"},
        "ref": {"type": "keyword"},
        "oldrev": {"type": "keyword"},
        "newrev": {"type": "keyword"},
        "ref_url": {"type": "keyword"},
        "message": {"type": "text"},
        "tenant": {"type": "keyword"},
        # Build type specific attributes
        "buildset_uuid": {"type": "keyword"},
        "job_name": {"type": "keyword"},
        "start_time": {"type": "date", "format": "epoch_second"},
        "end_time": {"type": "date", "format": "epoch_second"},
        "voting": {"type": "boolean"},
        "log_url": {"type": "keyword"},
        "node_name": {"type": "keyword"}
    }

    def __init__(self, driver, connection_name, connection_config):
        super(ElasticsearchConnection, self).__init__(
            driver, connection_name, connection_config)
        self.uri = self.connection_config.get('uri').split(',')
        self.cnx_opts = {}
        use_ssl = self.connection_config.get('use_ssl', True)
        if isinstance(use_ssl, str):
            if use_ssl.lower() == 'false':
                use_ssl = False
            else:
                use_ssl = True
        self.cnx_opts['use_ssl'] = use_ssl
        if use_ssl:
            verify_certs = self.connection_config.get('verify_certs', True)
            if isinstance(verify_certs, str):
                if verify_certs.lower() == 'false':
                    verify_certs = False
                else:
                    verify_certs = True
            self.cnx_opts['verify_certs'] = verify_certs
            self.cnx_opts['ca_certs'] = self.connection_config.get(
                'ca_certs', None)
            self.cnx_opts['client_cert'] = self.connection_config.get(
                'client_cert', None)
            self.cnx_opts['client_key'] = self.connection_config.get(
                'client_key', None)
        self.es = Elasticsearch(
            self.uri, **self.cnx_opts)
        try:
            self.log.debug("Elasticsearch info: %s" % self.es.info())
        except Exception as e:
            self.log.warning("An error occurred while establishing the "
                             "connection to Elasticsearch: %s" % e)
        self.ic = IndicesClient(self.es)

    def setIndex(self, index):
        settings = {
            'mappings': {
                'zuul': {
                    "properties": self.properties
                }
            }
        }
        try:
            self.ic.create(index=index, ignore=400, body=settings)
        except Exception:
            self.log.exception(
                "Unable to create the index %s on connection %s" % (
                    index, self.connection_name))

    def gen(self, it, index):
        for source in it:
            d = {}
            d['_index'] = index
            d['_type'] = 'zuul'
            d['_op_type'] = 'index'
            d['_source'] = source
            yield d

    def add_docs(self, source_it, index):
        self.setIndex(index)

        try:
            bulk(self.es, self.gen(source_it, index))
            self.es.indices.refresh(index=index)
            self.log.debug('%s docs indexed to %s' % (
                len(source_it), self.connection_name))
        except BulkIndexError as exc:
            self.log.warning("Some docs failed to be indexed (%s)" % exc)
            # We give flexibility by allowing job variables and zuul_return
            # data of any type to be indexed with ES dynamic mapping enabled.
            # A doc may own a field whose value does not match the data type
            # that ES previously detected for that field; in that case the
            # whole doc is rejected by ES. Mitigate this by indexing the
            # erroneous docs into an <index-name>.erroneous index, flattening
            # the doc data as YAML. This ensures the doc is indexed and can
            # be tracked, and eventually be modified and re-indexed by an
            # operator.
            erroneous_docs = []
            for d in exc.errors:
                if d['index']['error']['type'] == 'mapper_parsing_exception':
                    erroneous_doc = {
                        'uuid': d['index']['data']['uuid'],
                        'blob': yaml.dump(d['index']['data'])
                    }
                    erroneous_docs.append(erroneous_doc)
            try:
                mapping_erroneous_index = "%s.erroneous" % index
                bulk(
                    self.es,
                    self.gen(erroneous_docs, mapping_erroneous_index))
                self.es.indices.refresh(index=mapping_erroneous_index)
                self.log.info(
                    "%s erroneous docs indexed" % (len(erroneous_docs)))
            except BulkIndexError as exc:
                self.log.warning(
                    "Some erroneous docs failed to be indexed (%s)" % exc)
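The string-to-boolean handling in the connection's `__init__` is worth calling out: values from zuul.conf arrive as strings, so only a case-insensitive literal "false" disables an option. A minimal standalone sketch of that logic (the helper name is illustrative, not part of Zuul):

```python
def parse_config_bool(value, default=True):
    # zuul.conf options arrive as strings; anything other than a
    # case-insensitive "false" is treated as true, mirroring the
    # connection's use_ssl/verify_certs handling.
    if isinstance(value, str):
        return value.lower() != 'false'
    return default if value is None else bool(value)

print(parse_config_bool('False'))   # → False
print(parse_config_bool('true'))    # → True
print(parse_config_bool('0'))       # → True (only "false" disables)
print(parse_config_bool(None))      # → True (the default)
```

Note the consequence: setting `use_ssl=0` or `use_ssl=no` still enables SSL; only `use_ssl=false` turns it off.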
@ -0,0 +1,123 @@
# Copyright 2019 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import time
import logging

import voluptuous as v

from zuul.reporter import BaseReporter


class ElasticsearchReporter(BaseReporter):
    name = 'elasticsearch'
    log = logging.getLogger("zuul.ElasticsearchReporter")

    def __init__(self, driver, connection, config):
        super(ElasticsearchReporter, self).__init__(driver, connection, config)
        self.index = self.config.get('index', 'zuul')
        self.index_vars = self.config.get('index-vars')
        self.index_returned_vars = self.config.get('index-returned-vars')

    def report(self, item):
        """Create an entry into a database."""
        docs = []
        index = '%s.%s-%s' % (self.index, item.pipeline.tenant.name,
                              time.strftime("%Y.%m.%d"))
        buildset_doc = {
            "uuid": item.current_build_set.uuid,
            "build_type": "buildset",
            "tenant": item.pipeline.tenant.name,
            "pipeline": item.pipeline.name,
            "project": item.change.project.name,
            "change": getattr(item.change, 'number', None),
            "patchset": getattr(item.change, 'patchset', None),
            "ref": getattr(item.change, 'ref', ''),
            "oldrev": getattr(item.change, 'oldrev', ''),
            "newrev": getattr(item.change, 'newrev', ''),
            "branch": getattr(item.change, 'branch', ''),
            "zuul_ref": item.current_build_set.ref,
            "ref_url": item.change.url,
            "result": item.current_build_set.result,
            "message": self._formatItemReport(item, with_jobs=False)
        }

        for job in item.getJobs():
            build = item.current_build_set.getBuild(job.name)
            if not build:
                continue
            # Ensure end_time is defined
            if not build.end_time:
                build.end_time = time.time()
            # Ensure start_time is defined
            if not build.start_time:
                build.start_time = build.end_time

            (result, url) = item.formatJobResult(job)

            # Update the buildset time attributes from this build
            start_time = int(build.start_time)
            end_time = int(build.end_time)
            if ('start_time' not in buildset_doc or
                    buildset_doc['start_time'] > start_time):
                buildset_doc['start_time'] = start_time
            if ('end_time' not in buildset_doc or
                    buildset_doc['end_time'] < end_time):
                buildset_doc['end_time'] = end_time
            buildset_doc['duration'] = (
                buildset_doc['end_time'] - buildset_doc['start_time'])

            build_doc = {
                "uuid": build.uuid,
                "build_type": "build",
                "buildset_uuid": buildset_doc['uuid'],
                "job_name": build.job.name,
                "result": result,
                "start_time": str(start_time),
                "end_time": str(end_time),
                "duration": str(end_time - start_time),
                "voting": build.job.voting,
                "log_url": url,
            }

            # Extend the build doc with some buildset info
            for attr in (
                    'tenant', 'pipeline', 'project', 'change', 'patchset',
                    'ref', 'oldrev', 'newrev', 'branch'):
                build_doc[attr] = buildset_doc[attr]

            if self.index_vars:
                build_doc['job_vars'] = job.variables

            if self.index_returned_vars:
                build_doc['job_returned_vars'] = build.result_data

            docs.append(build_doc)

        docs.append(buildset_doc)
        self.connection.add_docs(docs, index)


def getSchema():
    el_reporter = v.Schema(
        v.Any(
            None,
            {
                'index': str,
                'index-vars': bool,
                'index-returned-vars': bool
            }
        )
    )
    return el_reporter
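The way `report()` derives the buildset's time attributes from its builds (earliest start, latest end, duration as their difference) can be sketched in isolation; the function name is illustrative, not part of Zuul:

```python
def aggregate_buildset_times(builds):
    # builds: list of (start_time, end_time) pairs as epoch seconds.
    # The buildset starts with its earliest build and ends with its
    # latest, matching the min/max accumulation in report().
    start = min(b[0] for b in builds)
    end = max(b[1] for b in builds)
    return {'start_time': start, 'end_time': end, 'duration': end - start}

print(aggregate_buildset_times([(100, 160), (110, 200), (105, 150)]))
# → {'start_time': 100, 'end_time': 200, 'duration': 100}
```

This is why the buildset `duration` can exceed any single build's duration: it spans the whole wall-clock window, including overlap and gaps between builds.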
@ -29,6 +29,7 @@ import zuul.driver.nullwrap
import zuul.driver.mqtt
import zuul.driver.pagure
import zuul.driver.gitlab
import zuul.driver.elasticsearch
from zuul.connection import BaseConnection
from zuul.driver import SourceInterface
@ -58,6 +59,7 @@ class ConnectionRegistry(object):
        self.registerDriver(zuul.driver.mqtt.MQTTDriver())
        self.registerDriver(zuul.driver.pagure.PagureDriver())
        self.registerDriver(zuul.driver.gitlab.GitlabDriver())
        self.registerDriver(zuul.driver.elasticsearch.ElasticsearchDriver())

    def registerDriver(self, driver):
        if driver.name in self.drivers: