Add Prometheus monitoring metrics to logscraper

We would like to collect statistics about processed jobs; this should
help us estimate storage usage on the Opensearch service.

Change-Id: Iae7e2fa7a5ed6d09855a2cb042ce68f7152f9cbc
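For context, the change builds on the prometheus_client library's Gauge and
start_http_server; a minimal sketch of that pattern follows (the metric name
and port 9128 mirror the diff below, the label values are illustrative):

    # Minimal sketch of the prometheus_client pattern this change relies on.
    from prometheus_client import Gauge, start_http_server

    job_count = Gauge('logscraper_job_count',
                      'Number of jobs processed by logscraper',
                      ['job_name'])

    start_http_server(9128)            # serve /metrics on 0.0.0.0:9128
    job_count.labels('summary').inc(2)              # aggregate over all builds
    job_count.labels('openstack-tox-py38').inc()    # one counter per job name
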
@@ -23,6 +23,7 @@
         logsender_wait_time: 10
         skip_debug: true
         performance_index_prefix: "performance-"
+        monitoring_port: 9128
   roles:
     - logscraper
     - logsender

@@ -44,3 +45,10 @@
         state: enabled
         permanent: true
         immediate: true
+
+    - name: Expose Prometheus Logscraper exporter metrics for softwarefactory-project.io
+      firewalld:
+        rich_rule: 'rule family=ipv4 source address=38.102.83.250/32 port port=9128 protocol=tcp accept'
+        state: enabled
+        permanent: true
+        immediate: true
 
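With the firewalld rule in place, the exporter can be sanity-checked from the
whitelisted address; a sketch, assuming a placeholder hostname for the scraper
machine:

    # Reachability check for the exporter endpoint; 'logscraper.example.com'
    # is a placeholder for the host the firewalld rule above protects.
    import urllib.request

    with urllib.request.urlopen(
            'http://logscraper.example.com:9128/metrics', timeout=5) as resp:
        body = resp.read().decode()
    # The metric family is registered at startup, so its HELP/TYPE lines
    # appear even before any build has been counted.
    assert 'logscraper_job_count' in body
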
@@ -31,4 +31,5 @@ container_images:
 #     debug: true
 #     logscraper_wait_time: 120
 #     custom_ca_crt: ""
+#     monitoring_port: ""
 tenant_builds: []

@@ -11,6 +11,7 @@ directory: {{ item['download_dir'] | default('/tmp/logscraper') }}
 wait_time: {{ item['logscraper_wait_time'] | default(120) }}
 insecure: {{ item['insecure'] | default(false) }}
 ca_file: {{ item['custom_ca_crt'] | default('') }}
+monitoring_port: {{ item['monitoring_port'] | default('') }}
 #deprecated
 gearman_server: {{ item['gearman_server'] | default('') }}
 gearman_port: {{ item['gearman_port'] | default(4730) }}

@@ -75,6 +75,8 @@ import yaml
 
 from concurrent.futures import ThreadPoolExecutor
 from distutils.version import StrictVersion as s_version
+from prometheus_client import Gauge
+from prometheus_client import start_http_server
 import tenacity
 from urllib.parse import urljoin
 
@@ -139,6 +141,10 @@ def get_arguments():
     parser.add_argument("--wait-time", help="Pause time for the next "
                         "iteration", type=int)
     parser.add_argument("--ca-file", help="Provide custom CA certificate")
+    parser.add_argument("--monitoring-port", help="Expose a Prometheus "
+                        "exporter to collect monitoring metrics. "
+                        "NOTE: when no port is set, monitoring is disabled.",
+                        type=int)
     args = parser.parse_args()
     return args
 
@@ -262,6 +268,18 @@ class BuildCache:
         return uid in self.builds
 
 
+class Monitoring:
+    def __init__(self):
+        self.job_count = Gauge('logscraper_job_count',
+                               'Number of jobs processed by logscraper',
+                               ['job_name'])
+
+    def parse_metrics(self, builds):
+        self.job_count.labels('summary').inc(len(builds))
+        for build in builds:
+            self.job_count.labels(build['job_name']).inc()
+
+
 ###############################################################################
 #                             Log Processing                                  #
 ###############################################################################
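As a sketch of what the new Gauge renders once parse_metrics() has seen a
couple of builds, the default registry can be dumped the same way the HTTP
endpoint serves it (the values shown assume two builds with distinct job
names):

    # Render the default registry exactly as the /metrics endpoint would.
    from prometheus_client import generate_latest

    print(generate_latest().decode())
    # Expected shape of the output:
    #   # HELP logscraper_job_count Number of jobs processed by logscraper
    #   # TYPE logscraper_job_count gauge
    #   logscraper_job_count{job_name="summary"} 2.0
    #   logscraper_job_count{job_name="openstack-tox-py38"} 1.0
    #   logscraper_job_count{job_name="openstack-tox-py39"} 1.0
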
@@ -650,7 +668,7 @@ def check_connection(logstash_url):
         return s.connect_ex((host, port)) == 0
 
 
-def run_scraping(args, zuul_api_url, job_name=None):
+def run_scraping(args, zuul_api_url, job_name=None, monitoring=None):
     """Get latest job results and push them into log processing service.
 
     On the end, write build_cache file, so in the future
@@ -684,8 +702,11 @@ def run_scraping(args, zuul_api_url, job_name=None):
         finally:
             config.save()
 
+    if monitoring:
+        monitoring.parse_metrics(builds)
+
 
-def run(args):
+def run(args, monitoring):
     if args.ca_file:
         validate_ca = args.ca_file
     else:
@@ -700,10 +721,10 @@ def run(args):
             for job_name in jobs_in_zuul:
                 logging.info("Starting checking logs for job %s in %s" % (
                     job_name, zuul_api_url))
-                run_scraping(args, zuul_api_url, job_name)
+                run_scraping(args, zuul_api_url, job_name, monitoring)
         else:
             logging.info("Starting checking logs for %s" % zuul_api_url)
-            run_scraping(args, zuul_api_url)
+            run_scraping(args, zuul_api_url, monitoring=monitoring)
 
 
 def main():
@@ -712,12 +733,19 @@ def main():
     args = parse_args(app_args, config_args)
 
     setup_logging(args.debug)
 
+    monitoring = None
+    if args.monitoring_port:
+        monitoring = Monitoring()
+        start_http_server(args.monitoring_port)
+
     if args.download and args.gearman_server and args.gearman_port:
         logging.critical("Can not use logscraper to send logs to gearman "
                          "and download logs. Choose one")
         sys.exit(1)
     while True:
-        run(args)
+        run(args, monitoring)
 
         if not args.follow:
             break
         time.sleep(args.wait_time)

@@ -12,6 +12,7 @@ directory: /tmp/logscraper
 wait_time: 120
 insecure: false
 ca_file: ""
+monitoring_port: 9128
 #deprecated
 gearman_server: ""
 gearman_port: 4730

@@ -150,7 +150,7 @@ class FakeArgs(object):
                  logstash_url=None, workers=None, max_skipped=None,
                  job_name=None, download=None, directory=None,
                  config=None, wait_time=None, ca_file=None,
-                 file_list=None):
+                 file_list=None, monitoring_port=None):
 
         self.zuul_api_url = zuul_api_url
         self.gearman_server = gearman_server
@@ -169,6 +169,7 @@ class FakeArgs(object):
         self.wait_time = wait_time
         self.ca_file = ca_file
         self.file_list = file_list
+        self.monitoring_port = monitoring_port
 
 
 class TestScraper(base.TestCase):
@@ -215,10 +216,11 @@ class TestScraper(base.TestCase):
             'http://somehost.com/api/tenant/tenant1', job_names, False)
         self.assertEqual(['openstack-tox-py38'], result)
 
+    @mock.patch('logscraper.logscraper.Monitoring')
     @mock.patch('logscraper.logscraper.filter_available_jobs',
                 side_effect=[['testjob1', 'testjob2'], [], []])
     @mock.patch('logscraper.logscraper.run_scraping')
-    def test_run_with_jobs(self, mock_scraping, mock_jobs):
+    def test_run_with_jobs(self, mock_scraping, mock_jobs, mock_monitoring):
         # when multiple job name provied, its iterate on zuul jobs
         # if such job is available.
         with mock.patch('argparse.ArgumentParser.parse_args') as mock_args:
@@ -229,7 +231,7 @@ class TestScraper(base.TestCase):
                 gearman_server='localhost',
                 job_name=['testjob1', 'testjob2'])
             args = logscraper.get_arguments()
-            logscraper.run(args)
+            logscraper.run(args, mock_monitoring)
             self.assertEqual(2, mock_scraping.call_count)
 
     @mock.patch('socket.socket')
@@ -315,8 +317,9 @@ class TestScraper(base.TestCase):
                                  mock_specified_files.call_args.args[0])
                 self.assertFalse(mock_save_buildinfo.called)
 
+    @mock.patch('logscraper.logscraper.Monitoring')
     @mock.patch('logscraper.logscraper.run_scraping')
-    def test_run(self, mock_scraping):
+    def test_run(self, mock_scraping, mock_monitoring):
         with mock.patch('argparse.ArgumentParser.parse_args') as mock_args:
             mock_args.return_value = FakeArgs(
                 zuul_api_url=['http://somehost.com/api/tenant/tenant1',
@@ -324,7 +327,7 @@ class TestScraper(base.TestCase):
                               'http://somehost.com/api/tenant/tenant3'],
                 gearman_server='localhost')
             args = logscraper.get_arguments()
-            logscraper.run(args)
+            logscraper.run(args, mock_monitoring)
             self.assertEqual(3, mock_scraping.call_count)
 
     @mock.patch('logscraper.logscraper.load_config')
@@ -362,6 +365,45 @@ class TestScraper(base.TestCase):
                              mock_specified_files.call_args.args[0])
             self.assertTrue(mock_save_buildinfo.called)
 
+    @mock.patch('logscraper.logscraper.load_config')
+    @mock.patch('logscraper.logscraper.save_build_info')
+    @mock.patch('logscraper.logscraper.check_specified_files')
+    @mock.patch('builtins.open', new_callable=mock.mock_open())
+    @mock.patch('os.path.isfile')
+    @mock.patch('logscraper.logscraper.check_specified_files',
+                return_value=['job-output.txt'])
+    @mock.patch('logscraper.logscraper.LogMatcher.submitJobs')
+    @mock.patch('argparse.ArgumentParser.parse_args',
+                return_value=FakeArgs(
+                    zuul_api_url=['http://somehost.com/api/tenant/tenant1'],
+                    workers=1, download=True, directory="/tmp/testdir"))
+    def test_run_scraping_monitoring(self, mock_args, mock_submit, mock_files,
+                                     mock_isfile, mock_readfile,
+                                     mock_specified_files, mock_save_buildinfo,
+                                     mock_config):
+        with mock.patch('logscraper.logscraper.get_last_job_results'
+                        ) as mock_job_results:
+            with mock.patch(
+                    'multiprocessing.pool.Pool.map_async',
+                    lambda self, func, iterable, chunksize=None, callback=None,
+                    error_callback=None: _MockedPoolMapAsyncResult(
+                        func, iterable),
+            ):
+                args = logscraper.get_arguments()
+                mock_job_results.return_value = [builds_result[0]]
+                monitoring = logscraper.Monitoring()
+                logscraper.run_scraping(
+                    args, 'http://somehost.com/api/tenant/tenant1',
+                    monitoring=monitoring)
+
+            self.assertEqual('job_name', monitoring.job_count._labelnames[0])
+            self.assertEqual(2, len(monitoring.job_count._metrics))
+            self.assertFalse(mock_submit.called)
+            self.assertTrue(mock_specified_files.called)
+            self.assertEqual(builds_result[0],
+                             mock_specified_files.call_args.args[0])
+            self.assertTrue(mock_save_buildinfo.called)
+
     @mock.patch('logscraper.logscraper.create_custom_result')
     @mock.patch('logscraper.logscraper.check_specified_files')
     @mock.patch('logscraper.logscraper.LogMatcher.submitJobs')
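The assertions in the new test reach into Gauge internals (_labelnames,
_metrics); should those private attributes ever change, a public alternative
is the default-registry lookup, sketched here against the same single-build
scenario:

    # Public alternative to inspecting Gauge internals in the test above.
    from prometheus_client import REGISTRY

    value = REGISTRY.get_sample_value('logscraper_job_count',
                                      {'job_name': 'summary'})
    assert value == 1.0  # a single build is scraped in this test
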
 
@@ -5,3 +5,4 @@ PyYAML<6.1       # MIT
 tenacity
 opensearch-py<=1.0.0   # Apache-2.0
 ruamel.yaml
+prometheus_client