Rework Vertica plugin

Add metrics to monitor resource pools, resources,
and projections per node.

Metrics for Vertica are now:

vertica.connection_status
vertica.node_status
vertica.projection.ros_count
vertica.projection.tuple_mover_mergeouts
vertica.projection.tuple_mover_moveouts
vertica.projection.wos_used_bytes
vertica.resource.disk_space_rejections
vertica.resource.pool.memory_inuse_kb
vertica.resource.pool.memory_size_actual_kb
vertica.resource.pool.rejection_count
vertica.resource.pool.running_query_count
vertica.resource.request_queue_depth
vertica.resource.resource_rejections
vertica.resource.wos_used_bytes

Change-Id: I6e30d524ca77f900f231b9a626bfa853e31dbeb1
This commit is contained in:
Michael James Hoppal 2016-06-06 09:31:14 -06:00
parent f98e87a81e
commit 8b4ed96fa7
5 changed files with 172 additions and 90 deletions

View File

@ -6,5 +6,6 @@ instances:
# - name: localhost # - name: localhost
# user: username # user: username
# password: my_password # password: my_password
# node_name: v_mon_node0001
# service: monasca # Optional # service: monasca # Optional
# timeout: 3 # Optional (secs) # timeout: 3 # Optional (secs)

View File

@ -1432,7 +1432,20 @@ instances:
| Metric Name | Dimensions | Semantics | | Metric Name | Dimensions | Semantics |
| ----------- | ---------- | --------- | | ----------- | ---------- | --------- |
| vertica.db.connection_status | hostname, service=vertica | Value of DB connection status (0=Healthy). | vertica.connection_status | hostname, node_name, service=vertica | Value of DB connection status (0=Healthy). |
| vertica.node_status | hostname, node_name, service=vertica | Status of node connection (0=UP). |
| vertica.projection.ros_count | hostname, node_name, projection_name, service=vertica | The number of ROS containers in the projection. |
| vertica.projection.tuple_mover_mergeouts | hostname, node_name, projection_name, service=vertica | Number of current tuple mover mergeouts on this projection. |
| vertica.projection.tuple_mover_moveouts | hostname, node_name, projection_name, service=vertica | Number of current tuple mover moveouts on this projection. |
| vertica.projection.wos_used_bytes | hostname, node_name, projection_name, service=vertica | The number of WOS bytes in the projection. |
| vertica.resource.disk_space_rejections | hostname, node_name, service=vertica | The number of rejected disk write requests. |
| vertica.resource.pool.memory_inuse_kb | hostname, node_name, resource_pool, service=vertica | Amount of memory, in kilobytes, acquired by requests running against this pool. |
| vertica.resource.pool.memory_size_actual_kb | hostname, node_name, resource_pool, service=vertica | Current amount of memory, in kilobytes, allocated to the pool by the resource manager. |
| vertica.resource.pool.rejection_count | hostname, node_name, resource_pool, service=vertica | Number of resource rejections for this pool. |
| vertica.resource.pool.running_query_count | hostname, node_name, resource_pool, service=vertica | Number of queries actually running using this pool. |
| vertica.resource.request_queue_depth | hostname, node_name, service=vertica | The cumulative number of requests for threads, file handles, and memory. |
| vertica.resource.resource_rejections | hostname, node_name, service=vertica | The number of rejected plan requests. |
| vertica.resource.wos_used_bytes | hostname, node_name, service=vertica | The size of the WOS in bytes. |
## Win32 Event Log ## Win32 Event Log

View File

@ -3,7 +3,40 @@
import monasca_agent.collector.checks as checks import monasca_agent.collector.checks as checks
from monasca_agent.common.util import timeout_command from monasca_agent.common.util import timeout_command
VSQL_PATH = '/opt/vertica/bin/vsql' NODE_METRICS_QUERY = "SELECT node_state " \
"FROM NODES " \
"WHERE node_name = '{0}';"
# Node-level resource counters read from resource_usage.
# '{0}' is replaced with the node name via str.format(); the nullable
# counters are wrapped in COALESCE so they come back as 0 instead of NULL.
RESOURCE_METRICS_QUERY = (
    "SELECT COALESCE(request_queue_depth, 0) request_queue_depth, wos_used_bytes, "
    "COALESCE(resource_request_reject_count, 0) resource_rejections, "
    "COALESCE(disk_space_request_reject_count, 0) disk_space_rejections "
    "FROM resource_usage WHERE node_name = '{0}';"
)
# Per-projection storage figures for one node, read from projection_storage.
# The LEFT JOIN sub-select counts the tuple mover operations that are
# currently executing (is_executing = 't') per projection, split into
# Moveout and Mergeout; COALESCE turns "no matching operations" into 0.
# '{0}' (used twice) is replaced with the node name via str.format().
PROJECTION_METRICS_QUERY = (
    "SELECT projection_name, wos_used_bytes, ros_count, "
    "COALESCE(tuple_mover_moveouts, 0) tuple_mover_moveouts, "
    "COALESCE(tuple_mover_mergeouts, 0) tuple_mover_mergeouts "
    "FROM projection_storage "
    "LEFT JOIN ("
    "SELECT projection_id, "
    "SUM(case when operation_name = 'Moveout' then 1 else 0 end) tuple_mover_moveouts, "
    "SUM(case when operation_name = 'Mergeout' then 1 else 0 end) tuple_mover_mergeouts "
    "FROM tuple_mover_operations "
    "WHERE node_name = '{0}' and is_executing = 't' "
    "GROUP BY projection_id"
    ") tm "
    "ON projection_storage.projection_id = tm.projection_id "
    "WHERE node_name = '{0}';"
)
# Per-resource-pool memory and query figures for one node, read from
# resource_pool_status. The LEFT JOIN sub-select counts the rows in
# resource_rejections per pool; COALESCE reports pools with none as 0.
# '{0}' (used twice) is replaced with the node name via str.format().
# NOTE: unlike the other query templates this one has no trailing ';'.
RESOURCE_POOL_METRICS_QUERY = (
    "SELECT pool_name, memory_size_actual_kb, memory_inuse_kb, running_query_count, "
    "COALESCE(rejection_count, 0) rejection_count "
    "FROM resource_pool_status "
    "LEFT JOIN (SELECT pool_id, COUNT(*) rejection_count "
    "FROM resource_rejections WHERE node_name = '{0}' GROUP BY pool_id) rj "
    "ON resource_pool_status.POOL_OID = rj.POOL_ID "
    "WHERE node_name = '{0}'"
)
class Vertica(checks.AgentCheck): class Vertica(checks.AgentCheck):
@ -13,26 +46,98 @@ class Vertica(checks.AgentCheck):
@staticmethod @staticmethod
def _get_config(instance): def _get_config(instance):
user = instance.get('user', 'mon_api') user = instance.get('user')
password = instance.get('password', 'password') password = instance.get('password')
service = instance.get('service', '') service = instance.get('service')
timeout = int(instance.get('timeout', 3)) node_name = instance.get('node_name')
timeout = int(instance.get('timeout'))
return user, password, service, timeout return user, password, service, node_name, timeout
def check(self, instance): def check(self, instance):
user, password, service, timeout = self._get_config(instance) user, password, service, node_name, timeout = self._get_config(instance)
dimensions = self._set_dimensions({'component': 'vertica', 'service': service}, instance) dimensions = self._set_dimensions({'component': 'vertica', 'service': service}, instance)
query = self._build_query(node_name)
value = self._connect_health(user, password, timeout) results, connection_status = self._query_database(user, password, timeout, query)
self.gauge('vertica.db.connection_status', value, dimensions=dimensions)
def _connect_health(self, user, password, timeout): if connection_status != 0:
output = timeout_command( self.gauge('vertica.db.connection_status', 1, dimensions=dimensions)
[VSQL_PATH, "-U", user, "-w", password, "-c", "select version();"], timeout)
if (output is not None) and ('Vertica Analytic Database' in output):
# healthy
return 0
else: else:
return 1 results = results.split('\n')
self._report_node_status(results[0], dimensions)
self._report_resource_metrics(results[1], dimensions)
self._report_projection_metrics(results[2], dimensions)
self._report_resource_pool_metrics(results[3], dimensions)
def _query_database(self, user, password, timeout, query):
    """Run ``query`` through the vsql client and report whether it succeeded.

    The SQL text is handed to ``timeout_command`` as ``command_input``; the
    credentials are passed as vsql command-line options.

    :param user: database user passed to vsql with ``-U``
    :param password: password passed to vsql with ``-w``
    :param timeout: timeout forwarded to ``timeout_command``
    :param query: SQL text to execute
    :return: ``(stdout, 0)`` with trailing whitespace stripped when vsql
             exits with 0, otherwise ``(stderr, 1)`` after logging the error
    """
    # NOTE(review): the option meanings below (unaligned output, '|' record
    # separator, tuples only, ',' field separator, expanded rows) are taken
    # from the vsql option names -- confirm against the vsql reference.
    vsql_command = ["/opt/vertica/bin/vsql", "-U", user, "-w", password,
                    "-A", "-R", "|", "-t", "-F", ",", "-x"]
    stdout, stderr, return_code = timeout_command(vsql_command, timeout, command_input=query)

    if return_code != 0:
        self.log.error("Error querying vertica with return code of {0} and error {1}".format(return_code, stderr))
        return stderr, 1

    # remove trailing newline
    return stdout.rstrip(), 0
def _build_query(self, node_name):
    """Build the single SQL script that fetches every metric for one node.

    Each query template is formatted with ``node_name`` and the results are
    concatenated, with no separator, in the order: node status, resource
    metrics, projection metrics, resource pool metrics.

    :param node_name: Vertica node name substituted into each template
    :return: the concatenated SQL text
    """
    templates = (
        NODE_METRICS_QUERY,
        RESOURCE_METRICS_QUERY,
        PROJECTION_METRICS_QUERY,
        RESOURCE_POOL_METRICS_QUERY,
    )
    return ''.join(template.format(node_name) for template in templates)
def _results_to_dict(self, results):
return [dict(entry.split(',') for entry in dictionary.split('|')) for dictionary in results.split('||')]
def _report_node_status(self, results, dimensions):
    """Emit the node status and a healthy connection status.

    ``vertica.node_status`` is 0 when the first row's ``node_state`` is
    ``'UP'`` and 1 otherwise; that row is attached as ``value_meta``.
    ``vertica.connection_status`` is always emitted as 0 here.

    :param results: raw output of the node status query
    :param dimensions: dimensions attached to both metrics
    """
    node_row = self._results_to_dict(results)[0]
    node_is_up = node_row['node_state'] == 'UP'
    self.gauge('vertica.node_status', 0 if node_is_up else 1, dimensions=dimensions, value_meta=node_row)
    self.gauge('vertica.connection_status', 0, dimensions=dimensions)
def _report_projection_metrics(self, results, dimensions):
    """Emit the ``vertica.projection.*`` metrics, one set per result row.

    Every row is reported with a copy of ``dimensions`` extended by
    ``projection_name``. ``wos_used_bytes`` and ``ros_count`` are sent as
    gauges; ``tuple_mover_moveouts`` and ``tuple_mover_mergeouts`` are sent
    through ``self.rate``. All values are converted with ``int``.

    :param results: raw output of the projection metrics query
    :param dimensions: base dimensions; not modified
    """
    prefix = 'vertica.projection.'
    gauge_columns = ('wos_used_bytes', 'ros_count')
    rate_columns = ('tuple_mover_moveouts', 'tuple_mover_mergeouts')

    for row in self._results_to_dict(results):
        row_dimensions = dimensions.copy()
        row_dimensions['projection_name'] = row['projection_name']
        for column in gauge_columns:
            self.gauge(prefix + column, int(row[column]), dimensions=row_dimensions)
        for column in rate_columns:
            self.rate(prefix + column, int(row[column]), dimensions=row_dimensions)
def _report_resource_metrics(self, results, dimensions):
    """Emit the node-level ``vertica.resource.*`` metrics.

    Only the first parsed row is used. Each column becomes a metric named
    ``vertica.resource.<column>`` with its value converted by ``int``:
    ``resource_rejections`` and ``disk_space_rejections`` are sent through
    ``self.rate``, every other column as a gauge.

    :param results: raw output of the resource metrics query
    :param dimensions: dimensions attached to every metric
    """
    resource_metric_name = 'vertica.resource.'
    rate_metrics = ('resource_rejections', 'disk_space_rejections')
    resource_metrics = self._results_to_dict(results)[0]

    # items() instead of the Python-2-only iteritems(), so the loop runs
    # unchanged on both Python 2 and Python 3.
    for metric_name, metric_value in resource_metrics.items():
        report = self.rate if metric_name in rate_metrics else self.gauge
        report(resource_metric_name + metric_name, int(metric_value), dimensions=dimensions)
def _report_resource_pool_metrics(self, results, dimensions):
    """Emit the ``vertica.resource.pool.*`` metrics, one set per result row.

    Every row is reported with a copy of ``dimensions`` extended by
    ``resource_pool`` (taken from the row's ``pool_name``).
    ``memory_size_actual_kb``, ``memory_inuse_kb`` and
    ``running_query_count`` are sent as gauges; ``rejection_count`` is sent
    through ``self.rate``. All values are converted with ``int``.

    :param results: raw output of the resource pool metrics query
    :param dimensions: base dimensions; not modified
    """
    prefix = 'vertica.resource.pool.'
    gauge_columns = ('memory_size_actual_kb', 'memory_inuse_kb', 'running_query_count')

    for row in self._results_to_dict(results):
        pool_dimensions = dimensions.copy()
        pool_dimensions['resource_pool'] = row['pool_name']
        for column in gauge_columns:
            self.gauge(prefix + column, int(row[column]), dimensions=pool_dimensions)
        self.rate(prefix + 'rejection_count', int(row['rejection_count']), dimensions=pool_dimensions)

View File

@ -1,6 +1,5 @@
# (C) Copyright 2015-2016 Hewlett Packard Enterprise Development Company LP # (C) Copyright 2015-2016 Hewlett Packard Enterprise Development Company LP
import datetime
import glob import glob
import hashlib import hashlib
import imp import imp
@ -11,11 +10,11 @@ import optparse
import os import os
import platform import platform
import re import re
import signal
import socket import socket
import subprocess import subprocess
import sys import sys
import tempfile import tempfile
import threading
import time import time
import traceback import traceback
import uuid import uuid
@ -294,19 +293,18 @@ def get_uuid():
return uuid.uuid5(uuid.NAMESPACE_DNS, platform.node() + str(uuid.getnode())).hex return uuid.uuid5(uuid.NAMESPACE_DNS, platform.node() + str(uuid.getnode())).hex
def timeout_command(command, timeout): def timeout_command(command, timeout, command_input=None):
# call shell-command with timeout (in seconds). # call shell-command with timeout (in seconds) and stdinput for the command (optional)
# returns None if timeout or the command output. # returns None if timeout or the command output.
start = datetime.datetime.now() process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) command_timer = threading.Timer(timeout, process.kill)
while process.poll() is None: try:
time.sleep(0.1) command_timer.start()
now = datetime.datetime.now() stdout, stderr = process.communicate(input=command_input.encode() if command_input else None)
if (now - start).seconds > timeout: return_code = process.returncode
os.kill(process.pid, signal.SIGKILL) return stdout, stderr, return_code
os.waitpid(-1, os.WNOHANG) finally:
return None command_timer.cancel()
return process.stdout.read()
def get_os(): def get_os():

View File

@ -10,30 +10,22 @@ from monasca_setup.detection.utils import watch_process_by_username
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
VERTICA_CONF = '/root/.vertica.cnf'
VSQL_PATH = '/opt/vertica/bin/vsql'
VERTICA_SERVICE = 'vertica' VERTICA_SERVICE = 'vertica'
CONNECTION_TIMEOUT = 3 CONNECTION_TIMEOUT = 3
SERVICE = 'vertica'
USER = 'monitor'
USER_PASSWORD = 'password'
class Vertica(monasca_setup.detection.Plugin): class Vertica(monasca_setup.detection.Plugin):
"""Detect Vertica process running and DB connection status """Detect Vertica process running and DB connection status
This plugin needs the Vertica username, password. This plugin has the following options (each optional) that you can pass in via command line:
The other arguments are optional. - user (optional - user to connect with) - Defaults to monitor user
There are two ways to provide this, either by a file placed in - password (optional - password to use when connecting) - Defaults to password
/root/.vertica.cnf or by passing the following arguments: - service (optional - dimensions service to be set for the metrics coming out of the plugin)
- user - timeout (optional - timeout for vertica connection in seconds) - Defaults to 3 second
- password
- service (optional)
- timeout (optional - timeout for connection attempt in seconds)
/root/.vertica.cnf in a format such as
[client]
user = user1
password = yourpassword
service = monitoring
timeout = 3
""" """
def _detect(self): def _detect(self):
@ -50,58 +42,33 @@ class Vertica(monasca_setup.detection.Plugin):
""" """
# Set defaults and read config or use arguments # Set defaults and read config or use arguments
if self.args is None: if self.args is None:
self.user = 'mon_api' self.user = USER
self.password = 'password' self.password = USER_PASSWORD
self.service = VERTICA_SERVICE self.service = VERTICA_SERVICE
self.timeout = CONNECTION_TIMEOUT self.timeout = CONNECTION_TIMEOUT
self._read_config(VERTICA_CONF)
else: else:
self.user = self.args.get('user', 'mon_api') self.user = self.args.get('user', USER)
self.password = self.args.get('password', 'password') self.password = self.args.get('password', USER_PASSWORD)
self.service = self.args.get('service', VERTICA_SERVICE) self.service = self.args.get('service', VERTICA_SERVICE)
self.timeout = self.args.get('timeout', CONNECTION_TIMEOUT) self.timeout = int(self.args.get('timeout', CONNECTION_TIMEOUT))
def _connection_test(self): def _connection_test(self):
"""Attempt to connect to Vertica DB to verify credentials. """Attempt to connect to Vertica DB to verify credentials.
:return: bool status of the test :return: bool status of the test
""" """
log.info("\tVertica connection test.") log.info("\tVertica connection test.")
output = timeout_command( stdout, stderr, return_code = timeout_command(
[VSQL_PATH, "-U", self.user, "-w", self.password, "-c", "select version();"], self.timeout) ["/opt/vertica/bin/vsql", "-U", self.user, "-w", self.password, "-t", "-A", "-c",
if (output is not None) and ('Vertica Analytic Database' in output): "SELECT node_name FROM current_session"], self.timeout)
# remove trailing newline
stdout = stdout.rstrip()
if return_code == 0:
self.node_name = stdout
return True return True
else: else:
log.error("Error querying vertica with return code of {0} and the error {1}".format(return_code, stderr))
return False return False
def _read_config(self, config_file):
"""Read the configuration setting member variables as appropriate.
:param config_file: The filename of the configuration to read and parse
"""
# Read the Vertica config file to extract the needed variables.
client_section = False
try:
with open(config_file, "r") as conf:
for row in conf:
if "[client]" in row:
client_section = True
log.info("\tUsing client credentials from {:s}".format(config_file))
continue
if client_section:
if "user" in row:
self.user = row.split("=")[1].strip()
if "password" in row:
self.password = row.split("=")[1].strip()
if "vsql_path" in row:
self.vsql_path = row.split("=")[1].strip()
if "service" in row:
self.service = row.split("=")[1].strip()
if "timeout" in row:
self.timeout = int(row.split("=")[1].strip())
except IOError:
log.warn('Unable to open Vertica config file {0}. '
'Using default credentials to try to connect.'.format(VERTICA_CONF))
def build_config(self): def build_config(self):
"""Build the config as a Plugins object and return. """Build the config as a Plugins object and return.
@ -117,6 +84,7 @@ class Vertica(monasca_setup.detection.Plugin):
'user': self.user, 'user': self.user,
'password': self.password, 'password': self.password,
'service': self.service, 'service': self.service,
'node_name': self.node_name,
'timeout': self.timeout} 'timeout': self.timeout}
config['vertica'] = {'init_config': None, 'instances': [instance_config]} config['vertica'] = {'init_config': None, 'instances': [instance_config]}
else: else:
@ -125,12 +93,9 @@ class Vertica(monasca_setup.detection.Plugin):
'Please correct and re-run monasca-setup.' 'Please correct and re-run monasca-setup.'
log.error(exception_msg) log.error(exception_msg)
raise Exception(exception_msg) raise Exception(exception_msg)
except Exception: except Exception as e:
exception_msg = 'Error configuring the Vertica check plugin' exception_msg = 'Error configuring the Vertica check plugin - {0}'.format(e)
log.error(exception_msg) log.error(exception_msg)
raise Exception(exception_msg) raise Exception(exception_msg)
return config return config
def dependencies_installed(self):
return True