From 8b4ed96fa7787552897a5cf46a07f1ba7ce4f1f6 Mon Sep 17 00:00:00 2001 From: Michael James Hoppal Date: Mon, 6 Jun 2016 09:31:14 -0600 Subject: [PATCH] Rework Vertica plugin Adding the metrics to monitor resource pools, resources and projections per a node. Metrics for Vertica are now: vertica.connection_status vertica.node_status vertica.projection.ros_count vertica.projection.tuple_mover_mergeouts vertica.projection.tuple_mover_moveouts vertica.projection.wos_used_bytes vertica.resource.disk_space_rejections vertica.resource.pool.memory_inuse_kb vertica.resource.pool.memory_size_actual_kb vertica.resource.pool.rejection_count vertica.resource.pool.running_query_count vertica.resource.request_queue_depth vertica.resource.resource_rejections vertica.resource.wos_used_bytes Change-Id: I6e30d524ca77f900f231b9a626bfa853e31dbeb1 --- conf.d/vertica.yaml.example | 1 + docs/Plugins.md | 15 ++- monasca_agent/collector/checks_d/vertica.py | 137 +++++++++++++++++--- monasca_agent/common/util.py | 26 ++-- monasca_setup/detection/plugins/vertica.py | 83 ++++-------- 5 files changed, 172 insertions(+), 90 deletions(-) diff --git a/conf.d/vertica.yaml.example b/conf.d/vertica.yaml.example index b68c873b..d0179c92 100644 --- a/conf.d/vertica.yaml.example +++ b/conf.d/vertica.yaml.example @@ -6,5 +6,6 @@ instances: # - name: localhost # user: username # password: my_password +# node_name: v_mon_node0001 # service: monasca # Optional # timeout: 3 # Optional (secs) diff --git a/docs/Plugins.md b/docs/Plugins.md index ed49a76b..2e18f89a 100644 --- a/docs/Plugins.md +++ b/docs/Plugins.md @@ -1432,7 +1432,20 @@ instances: | Metric Name | Dimensions | Semantics | | ----------- | ---------- | --------- | -| vertica.db.connection_status | hostname, service=vertica | Value of DB connection status (0=Healthy). +| vertica.connection_status | hostname, node_name, service=vertica | Value of DB connection status (0=Healthy). 
| +| vertica.node_status | hostname, node_name, service=vertica | Status of node connection (0=UP). | +| vertica.projection.ros_count | hostname, node_name, projection_name, service=vertica | The number of ROS containers in the projection. | +| vertica.projection.tuple_mover_mergeouts | hostname, node_name, projection_name, service=vertica | Number of current tuple mover mergeouts on this projection. | +| vertica.projection.tuple_mover_moveouts | hostname, node_name, projection_name, service=vertica | Number of current tuple mover moveouts on this projection. | +| vertica.projection.wos_used_bytes | hostname, node_name, projection_name, service=vertica | The number of WOS bytes in the projection. | +| vertica.resource.disk_space_rejections | hostname, node_name, service=vertica | The number of rejected disk write requests. | +| vertica.resource.pool.memory_inuse_kb | hostname, node_name, resource_pool, service=vertica | Amount of memory, in kilobytes, acquired by requests running against this pool. | +| vertica.resource.pool.memory_size_actual_kb | hostname, node_name, resource_pool, service=vertica | Current amount of memory, in kilobytes, allocated to the pool by the resource manager. | +| vertica.resource.pool.rejection_count | hostname, node_name, resource_pool, service=vertica | Number of resource rejections for this pool. | +| vertica.resource.pool.running_query_count | hostname, node_name, resource_pool, service=vertica | Number of queries actually running using this pool. | +| vertica.resource.request_queue_depth | hostname, node_name, service=vertica | The cumulative number of requests for threads, file handles, and memory. | +| vertica.resource.resource_rejections | hostname, node_name, service=vertica | The number of rejected plan requests. | +| vertica.resource.wos_used_bytes | hostname, node_name, service=vertica | The size of the WOS in bytes. 
| ## Win32 Event Log diff --git a/monasca_agent/collector/checks_d/vertica.py b/monasca_agent/collector/checks_d/vertica.py index 4eaca9e9..99c4544a 100644 --- a/monasca_agent/collector/checks_d/vertica.py +++ b/monasca_agent/collector/checks_d/vertica.py @@ -3,7 +3,40 @@ import monasca_agent.collector.checks as checks from monasca_agent.common.util import timeout_command -VSQL_PATH = '/opt/vertica/bin/vsql' +NODE_METRICS_QUERY = "SELECT node_state " \ + "FROM NODES " \ + "WHERE node_name = '{0}';" + +RESOURCE_METRICS_QUERY = "SELECT COALESCE(request_queue_depth, 0) request_queue_depth, " \ + "wos_used_bytes, " \ + "COALESCE(resource_request_reject_count, 0) resource_rejections, " \ + "COALESCE(disk_space_request_reject_count, 0) disk_space_rejections " \ + "FROM resource_usage " \ + "WHERE node_name = '{0}';" + +PROJECTION_METRICS_QUERY = "SELECT projection_name, wos_used_bytes, ros_count, " \ + "COALESCE(tuple_mover_moveouts, 0) tuple_mover_moveouts, " \ + "COALESCE(tuple_mover_mergeouts, 0) tuple_mover_mergeouts " \ + "FROM projection_storage " \ + "LEFT JOIN (SELECT projection_id, " \ + "SUM(case when operation_name = 'Moveout' then 1 else 0 end) tuple_mover_moveouts, " \ + "SUM(case when operation_name = 'Mergeout' then 1 else 0 end) tuple_mover_mergeouts " \ + "FROM tuple_mover_operations " \ + "WHERE node_name = '{0}' and is_executing = 't' " \ + "GROUP BY projection_id) tm " \ + "ON projection_storage.projection_id = tm.projection_id " \ + "WHERE node_name = '{0}';" + +RESOURCE_POOL_METRICS_QUERY = "SELECT pool_name, memory_size_actual_kb, memory_inuse_kb, running_query_count, " \ + "COALESCE(rejection_count, 0) rejection_count " \ + "FROM resource_pool_status " \ + "LEFT JOIN (" \ + "SELECT pool_id, COUNT(*) rejection_count " \ + "FROM resource_rejections " \ + "WHERE node_name = '{0}' " \ + "GROUP BY pool_id) rj " \ + "ON resource_pool_status.POOL_OID = rj.POOL_ID " \ + "WHERE node_name = '{0}'" class Vertica(checks.AgentCheck): @@ -13,26 +46,98 @@ class 
Vertica(checks.AgentCheck): @staticmethod def _get_config(instance): - user = instance.get('user', 'mon_api') - password = instance.get('password', 'password') - service = instance.get('service', '') - timeout = int(instance.get('timeout', 3)) + user = instance.get('user') + password = instance.get('password') + service = instance.get('service') + node_name = instance.get('node_name') + timeout = int(instance.get('timeout')) - return user, password, service, timeout + return user, password, service, node_name, timeout def check(self, instance): - user, password, service, timeout = self._get_config(instance) + user, password, service, node_name, timeout = self._get_config(instance) dimensions = self._set_dimensions({'component': 'vertica', 'service': service}, instance) + query = self._build_query(node_name) - value = self._connect_health(user, password, timeout) - self.gauge('vertica.db.connection_status', value, dimensions=dimensions) + results, connection_status = self._query_database(user, password, timeout, query) - def _connect_health(self, user, password, timeout): - output = timeout_command( - [VSQL_PATH, "-U", user, "-w", password, "-c", "select version();"], timeout) - if (output is not None) and ('Vertica Analytic Database' in output): - # healthy - return 0 + if connection_status != 0: + self.gauge('vertica.db.connection_status', 1, dimensions=dimensions) else: - return 1 + results = results.split('\n') + self._report_node_status(results[0], dimensions) + + self._report_resource_metrics(results[1], dimensions) + + self._report_projection_metrics(results[2], dimensions) + + self._report_resource_pool_metrics(results[3], dimensions) + + def _query_database(self, user, password, timeout, query): + stdout, stderr, return_code = timeout_command(["/opt/vertica/bin/vsql", "-U", user, "-w", password, "-A", "-R", + "|", "-t", "-F", ",", "-x"], timeout, command_input=query) + if return_code == 0: + # remove trailing newline + stdout = stdout.rstrip() + return 
stdout, 0 + else: + self.log.error("Error querying vertica with return code of {0} and error {1}".format(return_code, stderr)) + return stderr, 1 + + def _build_query(self, node_name): + query = '' + query += NODE_METRICS_QUERY.format(node_name) + query += RESOURCE_METRICS_QUERY.format(node_name) + query += PROJECTION_METRICS_QUERY.format(node_name) + query += RESOURCE_POOL_METRICS_QUERY.format(node_name) + return query + + def _results_to_dict(self, results): + return [dict(entry.split(',') for entry in dictionary.split('|')) for dictionary in results.split('||')] + + def _report_node_status(self, results, dimensions): + result = self._results_to_dict(results) + node_status = result[0]['node_state'] + status_metric = 0 if node_status == 'UP' else 1 + self.gauge('vertica.node_status', status_metric, dimensions=dimensions, value_meta=result[0]) + self.gauge('vertica.connection_status', 0, dimensions=dimensions) + + def _report_projection_metrics(self, results, dimensions): + results = self._results_to_dict(results) + projection_metric_name = 'vertica.projection.' + for result in results: + projection_dimensions = dimensions.copy() + projection_dimensions['projection_name'] = result['projection_name'] + self.gauge(projection_metric_name + 'wos_used_bytes', int(result['wos_used_bytes']), + dimensions=projection_dimensions) + self.gauge(projection_metric_name + 'ros_count', int(result['ros_count']), dimensions=projection_dimensions) + self.rate(projection_metric_name + 'tuple_mover_moveouts', int(result['tuple_mover_moveouts']), + dimensions=projection_dimensions) + self.rate(projection_metric_name + 'tuple_mover_mergeouts', int(result['tuple_mover_mergeouts']), + dimensions=projection_dimensions) + + def _report_resource_metrics(self, results, dimensions): + results = self._results_to_dict(results) + resource_metric_name = 'vertica.resource.' 
+ resource_metrics = results[0] + for metric_name, metric_value in resource_metrics.iteritems(): + if metric_name in ['resource_rejections', 'disk_space_rejections']: + self.rate(resource_metric_name + metric_name, int(metric_value), dimensions=dimensions) + else: + self.gauge(resource_metric_name + metric_name, int(metric_value), dimensions=dimensions) + + def _report_resource_pool_metrics(self, results, dimensions): + results = self._results_to_dict(results) + resource_pool_metric_name = 'vertica.resource.pool.' + for result in results: + resource_pool_dimensions = dimensions.copy() + resource_pool_dimensions['resource_pool'] = result['pool_name'] + self.gauge(resource_pool_metric_name + 'memory_size_actual_kb', int(result['memory_size_actual_kb']), + dimensions=resource_pool_dimensions) + self.gauge(resource_pool_metric_name + 'memory_inuse_kb', int(result['memory_inuse_kb']), + dimensions=resource_pool_dimensions) + self.gauge(resource_pool_metric_name + 'running_query_count', int(result['running_query_count']), + dimensions=resource_pool_dimensions) + self.rate(resource_pool_metric_name + 'rejection_count', int(result['rejection_count']), + dimensions=resource_pool_dimensions) diff --git a/monasca_agent/common/util.py b/monasca_agent/common/util.py index 4c409d52..0dd38889 100644 --- a/monasca_agent/common/util.py +++ b/monasca_agent/common/util.py @@ -1,6 +1,5 @@ # (C) Copyright 2015-2016 Hewlett Packard Enterprise Development Company LP -import datetime import glob import hashlib import imp @@ -11,11 +10,11 @@ import optparse import os import platform import re -import signal import socket import subprocess import sys import tempfile +import threading import time import traceback import uuid @@ -294,19 +293,18 @@ def get_uuid(): return uuid.uuid5(uuid.NAMESPACE_DNS, platform.node() + str(uuid.getnode())).hex -def timeout_command(command, timeout): - # call shell-command with timeout (in seconds). 
+def timeout_command(command, timeout, command_input=None): + # call shell-command with timeout (in seconds) and stdinput for the command (optional) # returns None if timeout or the command output. - start = datetime.datetime.now() - process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - while process.poll() is None: - time.sleep(0.1) - now = datetime.datetime.now() - if (now - start).seconds > timeout: - os.kill(process.pid, signal.SIGKILL) - os.waitpid(-1, os.WNOHANG) - return None - return process.stdout.read() + process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE) + command_timer = threading.Timer(timeout, process.kill) + try: + command_timer.start() + stdout, stderr = process.communicate(input=command_input.encode() if command_input else None) + return_code = process.returncode + return stdout, stderr, return_code + finally: + command_timer.cancel() def get_os(): diff --git a/monasca_setup/detection/plugins/vertica.py b/monasca_setup/detection/plugins/vertica.py index 1da44659..d9956681 100644 --- a/monasca_setup/detection/plugins/vertica.py +++ b/monasca_setup/detection/plugins/vertica.py @@ -10,30 +10,22 @@ from monasca_setup.detection.utils import watch_process_by_username log = logging.getLogger(__name__) -VERTICA_CONF = '/root/.vertica.cnf' -VSQL_PATH = '/opt/vertica/bin/vsql' VERTICA_SERVICE = 'vertica' CONNECTION_TIMEOUT = 3 +SERVICE = 'vertica' +USER = 'monitor' +USER_PASSWORD = 'password' class Vertica(monasca_setup.detection.Plugin): """Detect Vertica process running and DB connection status - This plugin needs the Vertica username, password. - The other arguments are optional. 
- There are two ways to provide this, either by a file placed in - /root/.vertica.cnf or by passing the following arguments: - - user - - password - - service (optional) - - timeout (optional - timeout for connection attempt in seconds) - /root/.vertica.cnf in a format such as - [client] - user = user1 - password = yourpassword - service = monitoring - timeout = 3 + This plugin has the following options (each optional) that you can pass in via command line: + - user (optional - user to connect with) - Defaults to monitor user + - password (optional - password to use when connecting) - Defaults to password + - service (optional - dimensions service to be set for the metrics coming out of the plugin) + - timeout (optional - timeout for vertica connection in seconds) - Defaults to 3 second """ def _detect(self): @@ -50,58 +42,33 @@ class Vertica(monasca_setup.detection.Plugin): """ # Set defaults and read config or use arguments if self.args is None: - self.user = 'mon_api' - self.password = 'password' + self.user = USER + self.password = USER_PASSWORD self.service = VERTICA_SERVICE self.timeout = CONNECTION_TIMEOUT - - self._read_config(VERTICA_CONF) else: - self.user = self.args.get('user', 'mon_api') - self.password = self.args.get('password', 'password') + self.user = self.args.get('user', USER) + self.password = self.args.get('password', USER_PASSWORD) self.service = self.args.get('service', VERTICA_SERVICE) - self.timeout = self.args.get('timeout', CONNECTION_TIMEOUT) + self.timeout = int(self.args.get('timeout', CONNECTION_TIMEOUT)) def _connection_test(self): """Attempt to connect to Vertica DB to verify credentials. 
:return: bool status of the test """ log.info("\tVertica connection test.") - output = timeout_command( - [VSQL_PATH, "-U", self.user, "-w", self.password, "-c", "select version();"], self.timeout) - if (output is not None) and ('Vertica Analytic Database' in output): + stdout, stderr, return_code = timeout_command( + ["/opt/vertica/bin/vsql", "-U", self.user, "-w", self.password, "-t", "-A", "-c", + "SELECT node_name FROM current_session"], self.timeout) + # remove trailing newline + stdout = stdout.rstrip() + if return_code == 0: + self.node_name = stdout return True else: + log.error("Error querying vertica with return code of {0} and the error {1}".format(return_code, stderr)) return False - def _read_config(self, config_file): - """Read the configuration setting member variables as appropriate. - :param config_file: The filename of the configuration to read and parse - """ - # Read the Vertica config file to extract the needed variables. - client_section = False - try: - with open(config_file, "r") as conf: - for row in conf: - if "[client]" in row: - client_section = True - log.info("\tUsing client credentials from {:s}".format(config_file)) - continue - if client_section: - if "user" in row: - self.user = row.split("=")[1].strip() - if "password" in row: - self.password = row.split("=")[1].strip() - if "vsql_path" in row: - self.vsql_path = row.split("=")[1].strip() - if "service" in row: - self.service = row.split("=")[1].strip() - if "timeout" in row: - self.timeout = int(row.split("=")[1].strip()) - except IOError: - log.warn('Unable to open Vertica config file {0}. ' - 'Using default credentials to try to connect.'.format(VERTICA_CONF)) - def build_config(self): """Build the config as a Plugins object and return. 
@@ -117,6 +84,7 @@ class Vertica(monasca_setup.detection.Plugin): 'user': self.user, 'password': self.password, 'service': self.service, + 'node_name': self.node_name, 'timeout': self.timeout} config['vertica'] = {'init_config': None, 'instances': [instance_config]} else: @@ -125,12 +93,9 @@ class Vertica(monasca_setup.detection.Plugin): 'Please correct and re-run monasca-setup.' log.error(exception_msg) raise Exception(exception_msg) - except Exception: - exception_msg = 'Error configuring the Vertica check plugin' + except Exception as e: + exception_msg = 'Error configuring the Vertica check plugin - {0}'.format(e) log.error(exception_msg) raise Exception(exception_msg) return config - - def dependencies_installed(self): - return True