Merge "Add Prometheus monitoring metrics into logscraper"
@@ -23,6 +23,7 @@
         logsender_wait_time: 10
         skip_debug: true
         performance_index_prefix: "performance-"
+        monitoring_port: 9128
   roles:
     - logscraper
     - logsender
@@ -44,3 +45,10 @@
         state: enabled
         permanent: true
         immediate: true
+
+    - name: Expose Prometheus Logscraper exporter metrics for softwarefactory-project.io
+      firewalld:
+        rich_rule: 'rule family=ipv4 source address=38.102.83.250/32 port port=9128 protocol=tcp accept'
+        state: enabled
+        permanent: true
+        immediate: true

@@ -31,4 +31,5 @@ container_images:
 #     debug: true
 #     logscraper_wait_time: 120
 #     custom_ca_crt: ""
+#     monitoring_port: ""
 tenant_builds: []

@@ -11,6 +11,7 @@ directory: {{ item['download_dir'] | default('/tmp/logscraper') }}
 wait_time: {{ item['logscraper_wait_time'] | default(120) }}
 insecure: {{ item['insecure'] | default(false) }}
 ca_file: {{ item['custom_ca_crt'] | default('') }}
+monitoring_port: {{ item['monitoring_port'] | default('') }}
 #deprecated
 gearman_server: {{ item['gearman_server'] | default('') }}
 gearman_port: {{ item['gearman_port'] | default(4730) }}

@@ -75,6 +75,8 @@ import yaml
 
 from concurrent.futures import ThreadPoolExecutor
 from distutils.version import StrictVersion as s_version
+from prometheus_client import Gauge
+from prometheus_client import start_http_server
 import tenacity
 from urllib.parse import urljoin
 
@@ -139,6 +141,10 @@ def get_arguments():
     parser.add_argument("--wait-time", help="Pause time for the next "
                         "iteration", type=int)
     parser.add_argument("--ca-file", help="Provide custom CA certificate")
+    parser.add_argument("--monitoring-port", help="Expose a Prometheus "
+                        "exporter to collect monitoring metrics. "
+                        "NOTE: when no port is set, monitoring is disabled.",
+                        type=int)
     args = parser.parse_args()
     return args
@@ -262,6 +268,18 @@ class BuildCache:
         return uid in self.builds
 
 
+class Monitoring:
+    def __init__(self):
+        self.job_count = Gauge('logscraper_job_count',
+                               'Number of jobs processed by logscraper',
+                               ['job_name'])
+
+    def parse_metrics(self, builds):
+        self.job_count.labels('summary').inc(len(builds))
+        for build in builds:
+            self.job_count.labels(build['job_name']).inc()
+
+
 ###############################################################################
 #                             Log Processing                                  #
 ###############################################################################
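
Note: for readers unfamiliar with prometheus_client, here is a minimal standalone sketch of the labelled-Gauge pattern the new Monitoring class relies on; the metric name and the builds list are illustrative, not part of this change.

# Minimal sketch of the labelled-Gauge pattern used by Monitoring above.
# The metric name and the builds list are illustrative only.
from prometheus_client import Gauge, generate_latest

job_count = Gauge('demo_job_count', 'Number of jobs processed',
                  ['job_name'])

# Mirrors Monitoring.parse_metrics: one 'summary' series counting all
# builds, plus one series per job name seen.
builds = [{'job_name': 'tox-py38'}, {'job_name': 'tox-py38'}]
job_count.labels('summary').inc(len(builds))
for build in builds:
    job_count.labels(build['job_name']).inc()

# generate_latest() renders the text exposition format that the HTTP
# exporter serves; with no argument it dumps the default registry.
print(generate_latest().decode())
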
@@ -650,7 +668,7 @@ def check_connection(logstash_url):
         return s.connect_ex((host, port)) == 0
 
 
-def run_scraping(args, zuul_api_url, job_name=None):
+def run_scraping(args, zuul_api_url, job_name=None, monitoring=None):
     """Get latest job results and push them into log processing service.
 
     At the end, write build_cache file, so in the future
@@ -684,8 +702,11 @@ def run_scraping(args, zuul_api_url, job_name=None):
         finally:
             config.save()
 
+    if monitoring:
+        monitoring.parse_metrics(builds)
+
 
-def run(args):
+def run(args, monitoring):
     if args.ca_file:
         validate_ca = args.ca_file
     else:
@@ -700,10 +721,10 @@ def run(args):
             for job_name in jobs_in_zuul:
                 logging.info("Starting checking logs for job %s in %s" % (
                     job_name, zuul_api_url))
-                run_scraping(args, zuul_api_url, job_name)
+                run_scraping(args, zuul_api_url, job_name, monitoring)
         else:
             logging.info("Starting checking logs for %s" % zuul_api_url)
-            run_scraping(args, zuul_api_url)
+            run_scraping(args, zuul_api_url, monitoring=monitoring)
 
 
 def main():
@@ -712,12 +733,19 @@ def main():
     args = parse_args(app_args, config_args)
 
     setup_logging(args.debug)
 
+    monitoring = None
+    if args.monitoring_port:
+        monitoring = Monitoring()
+        start_http_server(args.monitoring_port)
+
     if args.download and args.gearman_server and args.gearman_port:
         logging.critical("Can not use logscraper to send logs to gearman "
                          "and download logs. Choose one")
         sys.exit(1)
     while True:
-        run(args)
+        run(args, monitoring)
 
         if not args.follow:
             break
         time.sleep(args.wait_time)

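Note: once the scraper runs with a monitoring port configured, start_http_server() serves the metrics over plain HTTP. A quick smoke test might look like the sketch below; the host, port, and the sample value in the comment are assumptions based on the defaults in this change.

# Sketch: poll the exporter started by start_http_server(9128).
# localhost:9128 follows the defaults above; the sample output value
# in the comment is invented for illustration.
from urllib.request import urlopen

with urlopen('http://localhost:9128/metrics') as resp:
    for line in resp.read().decode().splitlines():
        if line.startswith('logscraper_job_count'):
            print(line)  # e.g. logscraper_job_count{job_name="summary"} 42.0
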
@@ -13,6 +13,7 @@ directory: /tmp/logscraper
 wait_time: 120
 insecure: false
 ca_file: ""
+monitoring_port: 9128
 #deprecated
 gearman_server: ""
 gearman_port: 4730

@@ -150,7 +150,7 @@ class FakeArgs(object):
                  logstash_url=None, workers=None, max_skipped=None,
                  job_name=None, download=None, directory=None,
                  config=None, wait_time=None, ca_file=None,
-                 file_list=None):
+                 file_list=None, monitoring_port=None):
 
         self.zuul_api_url = zuul_api_url
         self.gearman_server = gearman_server
@@ -169,6 +169,7 @@
         self.wait_time = wait_time
         self.ca_file = ca_file
         self.file_list = file_list
+        self.monitoring_port = monitoring_port
 
 
 class TestScraper(base.TestCase):
@@ -215,10 +216,11 @@
             'http://somehost.com/api/tenant/tenant1', job_names, False)
         self.assertEqual(['openstack-tox-py38'], result)
 
+    @mock.patch('logscraper.logscraper.Monitoring')
     @mock.patch('logscraper.logscraper.filter_available_jobs',
                 side_effect=[['testjob1', 'testjob2'], [], []])
     @mock.patch('logscraper.logscraper.run_scraping')
-    def test_run_with_jobs(self, mock_scraping, mock_jobs):
+    def test_run_with_jobs(self, mock_scraping, mock_jobs, mock_monitoring):
         # when multiple job names are provided, iterate over zuul jobs
         # if such a job is available.
         with mock.patch('argparse.ArgumentParser.parse_args') as mock_args:
@@ -229,7 +231,7 @@
                 gearman_server='localhost',
                 job_name=['testjob1', 'testjob2'])
             args = logscraper.get_arguments()
-            logscraper.run(args)
+            logscraper.run(args, mock_monitoring)
             self.assertEqual(2, mock_scraping.call_count)
 
     @mock.patch('socket.socket')
@@ -315,8 +317,9 @@
                                  mock_specified_files.call_args.args[0])
                 self.assertFalse(mock_save_buildinfo.called)
 
+    @mock.patch('logscraper.logscraper.Monitoring')
     @mock.patch('logscraper.logscraper.run_scraping')
-    def test_run(self, mock_scraping):
+    def test_run(self, mock_scraping, mock_monitoring):
         with mock.patch('argparse.ArgumentParser.parse_args') as mock_args:
             mock_args.return_value = FakeArgs(
                 zuul_api_url=['http://somehost.com/api/tenant/tenant1',
@@ -324,7 +327,7 @@
                               'http://somehost.com/api/tenant/tenant3'],
                 gearman_server='localhost')
             args = logscraper.get_arguments()
-            logscraper.run(args)
+            logscraper.run(args, mock_monitoring)
             self.assertEqual(3, mock_scraping.call_count)
 
     @mock.patch('logscraper.logscraper.load_config')
@@ -362,6 +365,45 @@
                              mock_specified_files.call_args.args[0])
             self.assertTrue(mock_save_buildinfo.called)
 
+    @mock.patch('logscraper.logscraper.load_config')
+    @mock.patch('logscraper.logscraper.save_build_info')
+    @mock.patch('logscraper.logscraper.check_specified_files')
+    @mock.patch('builtins.open', new_callable=mock.mock_open())
+    @mock.patch('os.path.isfile')
+    @mock.patch('logscraper.logscraper.check_specified_files',
+                return_value=['job-output.txt'])
+    @mock.patch('logscraper.logscraper.LogMatcher.submitJobs')
+    @mock.patch('argparse.ArgumentParser.parse_args',
+                return_value=FakeArgs(
+                    zuul_api_url=['http://somehost.com/api/tenant/tenant1'],
+                    workers=1, download=True, directory="/tmp/testdir"))
+    def test_run_scraping_monitoring(self, mock_args, mock_submit, mock_files,
+                                     mock_isfile, mock_readfile,
+                                     mock_specified_files, mock_save_buildinfo,
+                                     mock_config):
+        with mock.patch('logscraper.logscraper.get_last_job_results'
+                        ) as mock_job_results:
+            with mock.patch(
+                    'multiprocessing.pool.Pool.map_async',
+                    lambda self, func, iterable, chunksize=None, callback=None,
+                    error_callback=None: _MockedPoolMapAsyncResult(
+                        func, iterable),
+            ):
+                args = logscraper.get_arguments()
+                mock_job_results.return_value = [builds_result[0]]
+                monitoring = logscraper.Monitoring()
+                logscraper.run_scraping(
+                    args, 'http://somehost.com/api/tenant/tenant1',
+                    monitoring=monitoring)
+
+            self.assertEqual('job_name', monitoring.job_count._labelnames[0])
+            self.assertEqual(2, len(monitoring.job_count._metrics))
+            self.assertFalse(mock_submit.called)
+            self.assertTrue(mock_specified_files.called)
+            self.assertEqual(builds_result[0],
+                             mock_specified_files.call_args.args[0])
+            self.assertTrue(mock_save_buildinfo.called)
+
     @mock.patch('logscraper.logscraper.create_custom_result')
     @mock.patch('logscraper.logscraper.check_specified_files')
     @mock.patch('logscraper.logscraper.LogMatcher.submitJobs')

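Note: the two assertions on job_count internals follow directly from parse_metrics(): a single build produces exactly two labelled children, 'summary' plus the build's own job_name. A standalone sketch of that invariant (the registry and metric names below are illustrative; _labelnames and _metrics are the same private prometheus_client attributes the test itself inspects):

# Why test_run_scraping_monitoring expects two series for one build:
# parse_metrics() always bumps the 'summary' label and one label per
# distinct job name. Isolated registry so the sketch is self-contained.
from prometheus_client import CollectorRegistry, Gauge

registry = CollectorRegistry()
job_count = Gauge('sketch_job_count', 'demo', ['job_name'],
                  registry=registry)
job_count.labels('summary').inc(1)    # one build in total
job_count.labels('some-job').inc()    # that build's job_name

assert job_count._labelnames[0] == 'job_name'
assert len(job_count._metrics) == 2   # 'summary' + 'some-job'
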
@@ -5,3 +5,4 @@ PyYAML<6.1       # MIT
 tenacity
 opensearch-py<=1.0.0   # Apache-2.0
 ruamel.yaml
+prometheus_client