From 3734c3805bbf324b254335ba45de00d20d129247 Mon Sep 17 00:00:00 2001 From: David Schroeder Date: Fri, 3 Oct 2014 14:26:55 -0600 Subject: [PATCH] Add support for VM monitoring This patch gives the Monasca Agent the ability to monitor virtual machines (VMs) provisioned under Nova. It bundles Ceilometer's virtualization inspector to gather the actual metrics, includes a monasca-setup plugin, and adds cross-tenant support. The latter enables the Agent to submit metrics on behalf of a different tenant/project, giving the VM's owner a set of metrics without each VM having to run its own Agent. This plugin collects two sets of measurements per metric: one for the VM's owner (in-cloud) and for the Operations team (infrastructure). They differ in the following ways: - Operations metric names are prefixed with "vm." in order to group VM and overcloud metrics separately - Operations metrics include "tenant_id" as a dimension - Operations metrics include "cloud_tier" dimension with the value "overcloud" - The "hostname" dimension for Operations contains the name of the compute server; for the tenant, "hostname" is the name of the VM The metrics gathered by this plugin include: - Disk I/O: read/write operations/bytes per second - Disk I/O: errors per second - Network I/O: in/out packets/bytes per second - CPU utilization as a percentage of CPU time over polling time Change-Id: I23781a1ba552ef2551e6ff1caea21dd8f515d73d --- README.md | 111 ++++++++ monagent/collector/checks/check.py | 33 ++- monagent/collector/checks/collector.py | 6 +- monagent/collector/checks/system/unix.py | 5 +- monagent/collector/checks_d/libvirt.py | 254 ++++++++++++++++++ monagent/collector/virt/__init__.py | 0 monagent/collector/virt/hyperv/__init__.py | 0 monagent/collector/virt/hyperv/inspector.py | 80 ++++++ monagent/collector/virt/hyperv/utilsv2.py | 204 ++++++++++++++ monagent/collector/virt/inspector.py | 224 +++++++++++++++ monagent/collector/virt/libvirt/__init__.py | 0 
monagent/collector/virt/libvirt/inspector.py | 169 ++++++++++++ monagent/collector/virt/vmware/__init__.py | 0 monagent/collector/virt/vmware/inspector.py | 182 +++++++++++++ .../virt/vmware/vsphere_operations.py | 229 ++++++++++++++++ monagent/collector/virt/xenapi/__init__.py | 0 monagent/collector/virt/xenapi/inspector.py | 192 +++++++++++++ monagent/common/aggregator.py | 64 +++-- monagent/common/metrics.py | 30 ++- monagent/forwarder/api/mon.py | 18 +- monsetup/detection/plugins/libvirt.py | 72 +++++ monsetup/main.py | 30 ++- setup.cfg | 6 + 23 files changed, 1852 insertions(+), 57 deletions(-) create mode 100644 monagent/collector/checks_d/libvirt.py create mode 100644 monagent/collector/virt/__init__.py create mode 100644 monagent/collector/virt/hyperv/__init__.py create mode 100644 monagent/collector/virt/hyperv/inspector.py create mode 100644 monagent/collector/virt/hyperv/utilsv2.py create mode 100644 monagent/collector/virt/inspector.py create mode 100644 monagent/collector/virt/libvirt/__init__.py create mode 100644 monagent/collector/virt/libvirt/inspector.py create mode 100644 monagent/collector/virt/vmware/__init__.py create mode 100644 monagent/collector/virt/vmware/inspector.py create mode 100644 monagent/collector/virt/vmware/vsphere_operations.py create mode 100644 monagent/collector/virt/xenapi/__init__.py create mode 100644 monagent/collector/virt/xenapi/inspector.py create mode 100644 monsetup/detection/plugins/libvirt.py diff --git a/README.md b/README.md index f93502be..e3ded4c2 100644 --- a/README.md +++ b/README.md @@ -64,6 +64,7 @@ - [Ceilometer Checks](#ceilometer-checks) - [Ceilometer Processes Monitored](#ceilometer-processes-monitored) - [Example Ceilometer Metrics](#example-ceilometer-metrics) + - [Libvirt VM Monitoring](#libvirt-vm-monitoring) - [Statsd](#statsd) - [Log Parsing](#log-parsing) - [License](#license) @@ -939,6 +940,116 @@ The following ceilometer processes are monitored, if they exist when the monasca | ceilometer-api 
| processes.process_pid_count | Gauge | Passive | service=ceilometer, component=ceilometer-api | process | ceilometer-api process pid count | This is only one of the process checks performed | | ceilometer-api | http_status | Gauge | Active | service=ceilometer, component=ceilometer-api url=url_to_ceilometer_api | http_status | ceilometer-api http endpoint is alive | This check should be executed on multiple systems.| +## Libvirt VM Monitoring + +### Overview +The Libvirt plugin provides metrics for virtual machines when run on the hypervisor server. It provides two sets of metrics per measurement: one designed for the owner of the VM, and one intended for the owner of the hypervisor server. + +### Configuration +The `monasca-setup` program will configure the Libvirt plugin if `nova-api` is running, `/etc/nova/nova.conf` exists, and `python-novaclient` is installed. It uses a cache directory to persist data, which is `/dev/shm` by default. On non-Linux systems (BSD, Mac OSX), `/dev/shm` may not exist, so `cache_dir` would need to be changed accordingly, either in `monsetup/detection/plugins/libvirt.py` prior to running `monasca-setup`, or `/etc/monasca/agent/conf.d/libvirt.yaml` afterwards. + +`nova_refresh` specifies the number of seconds between calls to the Nova API to refresh the instance cache. This is helpful for updating VM hostname and pruning deleted instances from the cache. By default, it is set to 14,400 seconds (four hours). Set to 0 to refresh every time the Collector runs, or to None to disable regular refreshes entirely (though the instance cache will still be refreshed if a new instance is detected). + +`vm_probation` specifies a period of time (in seconds) in which to suspend metrics from a newly-created VM. This is to prevent quickly-obsolete metrics in an environment with a high amount of instance churn (VMs created and destroyed in rapid succession). The default probation length is 300 seconds (five minutes). 
Setting to 0 disables VM probation, and metrics will be recorded as soon as possible after a VM is created. + +Example config: +``` +init_config: + admin_password: pass + admin_tenant_name: service + admin_user: nova + identity_uri: 'http://192.168.10.5:35357/v2.0' + cache_dir: /dev/shm + nova_refresh: 14400 + vm_probation: 300 +instances: + - {} +``` +`instances` are null in `libvirt.yaml` because the libvirt plugin detects and runs against all provisioned VM instances; specifying them in `libvirt.yaml` is unnecessary. + +Note: If the Nova service login credentials are changed, `monasca-setup` would need to be re-run to use the new credentials. Alternately, `/etc/monasca/agent/conf.d/libvirt.yaml` could be modified directly. + +### Instance Cache +The instance cache (`/dev/shm/libvirt_instances.yaml` by default) contains data that is not available to libvirt, but queried from Nova. To limit calls to the Nova API, the cache is only updated if a new instance is detected (libvirt sees an instance not already in the cache), or every `nova_refresh` seconds (see Configuration above). + +Example cache: +``` +instance-00000003: {created: '2014-10-14T17:30:03Z', hostname: vm01.testboy.net, + instance_uuid: 54272a41-cf12-4243-b6f4-6e0c5ecbd777, tenant_id: 09afcd6d22bf4de0aea02de6e0724d41, + zone: nova} +instance-00000005: {created: '2014-10-15T18:39:44Z', hostname: vm02.testboy.net, + instance_uuid: aa04fa03-93c5-4a70-be01-3ddd9a529710, tenant_id: 09afcd6d22bf4de0aea02de6e0724d41, + zone: nova} +last_update: 1413398407 +``` + +### Metrics Cache +The libvirt inspector returns *counters*, but it is much more useful to use *rates* instead. To convert counters to rates, a metrics cache is used, stored in `/dev/shm/libvirt_metrics.yaml` by default. For each measurement gathered, the current value and timestamp (UNIX epoch) are recorded in the cache. The subsequent run of the Monasca Agent Collector compares current values against prior ones, and computes the rate. 
+ +Since CPU Time is provided in nanoseconds, the timestamp recorded has nanosecond resolution. Otherwise, integer seconds are used. + +Example cache (excerpt, see next section for complete list of available metrics): +``` +instance-00000003: + cpu.time: {timestamp: 1413327252.150278, value: 191890000000} + io.read_bytes: + hdd: {timestamp: 1413327252, value: 139594} + vda: {timestamp: 1413327252, value: 1604608} + net.rx_packets: + vnet0: {timestamp: 1413327252, value: 24} +instance-00000004: + cpu.time: {timestamp: 1413327252.196404, value: 34870000000} + io.write_requests: + hdd: {timestamp: 1413327252, value: 0} + vda: {timestamp: 1413327252, value: 447} + net.tx_bytes: + vnet1: {timestamp: 1413327252, value: 2260} +``` + +### Metrics + +| Name | Description | Associated Dimensions | +| -------------------- | -------------------------------------- | ---------------------- | +| cpu.utilization_perc | Overall CPU utilization (percentage) | | +| io.read_ops_sec | Disk I/O read operations per second | 'device' (ie, 'hdd') | +| io.write_ops_sec | Disk I/O write operations per second | 'device' (ie, 'hdd') | +| io.read_bytes_sec | Disk I/O read bytes per second | 'device' (ie, 'hdd') | +| io.write_bytes_sec | Disk I/O write bytes per second | 'device' (ie, 'hdd') | +| io.errors_sec | Disk I/O errors per second | 'device' (ie, 'hdd') | +| net.in_packets_sec | Network received packets per second | 'device' (ie, 'vnet0') | +| net.out_packets_sec | Network transmitted packets per second | 'device' (ie, 'vnet0') | +| net.in_bytes_sec | Network received bytes per second | 'device' (ie, 'vnet0') | +| net.out_bytes_sec | Network transmitted bytes per second | 'device' (ie, 'vnet0') | + +Since separate metrics are sent to the VM's owner as well as Operations, all metric names designed for Operations are prefixed with "vm." to easily distinguish between VM metrics and compute host's metrics. 
+ +### Dimensions +All metrics include `resource_id` and `zone` (availability zone) dimensions. Because there is a separate set of metrics for the two target audiences (VM customers and Operations), other dimensions may differ. + +| Dimension Name | Customer Value | Operations Value | +| -------------- | ------------------------- | ----------------------- | +| hostname | name of VM as provisioned | hypervisor's hostname | +| zone | availability zone | availability zone | +| resource_id | resource ID of VM | resource ID of VM | +| service | "compute" | "compute" | +| component | "vm" | "vm" | +| device | name of net or disk dev | name of net or disk dev | +| tenant_id | (N/A) | owner of VM | + +### Cross-Tenant Metric Submission +If the owner of the VM is to receive his or her own metrics, the Agent needs to be able to submit metrics on their behalf. This is called cross-tenant metric submission. For this to work, a keystone role called "monitoring-delegate" needs to be created, and the monasca-agent user assigned to it. +``` +keystone role-create --name=monitoring-delegate + +user_id=`keystone user-list |grep monasca-agent |cut -d'|' -f2` +role_id=`keystone role-list |grep monitoring-delegate |cut -d'|' -f2` +tenant_id=`keystone tenant-list |grep mini-mon |cut -d'|' -f2` + +keystone user-role-add --user=${user_id// /} --role=${role_id// /} --tenant_id=${tenant_id// /} +``` +The tenant name "mini-mon" in the example above may differ depending on your installation (it is set by the `--project-name` parameter of `monasca-setup` and can be referenced as `project_name` in `/etc/monasca/agent/agent.conf`). Once assigned to the `monitoring-delegate` group, the Agent can submit metrics for other tenants. + + # Statsd The Monasca Agent ships with a Statsd daemon implementation called monasca-statsd. A statsd client can be used to send metrics to the Forwarder via the Statsd daemon. 
diff --git a/monagent/collector/checks/check.py b/monagent/collector/checks/check.py index c314215e..8174349e 100644 --- a/monagent/collector/checks/check.py +++ b/monagent/collector/checks/check.py @@ -314,42 +314,45 @@ class AgentCheck(object): """ return len(self.instances) - def gauge(self, metric, value, dimensions=None, + def gauge(self, metric, value, dimensions=None, delegated_tenant=None, hostname=None, device_name=None, timestamp=None): """Record the value of a gauge, with optional dimensions, hostname and device name. :param metric: The name of the metric :param value: The value of the gauge :param dimensions: (optional) A dictionary of dimensions for this metric + :param delegated_tenant: (optional) Submit metrics on behalf of this tenant ID. :param hostname: (optional) A hostname for this metric. Defaults to the current hostname. :param device_name: (optional) The device name for this metric :param timestamp: (optional) The timestamp for this metric value """ - self.aggregator.gauge(metric, value, dimensions, hostname, device_name, timestamp) + self.aggregator.gauge(metric, value, dimensions, delegated_tenant, hostname, device_name, timestamp) - def increment(self, metric, value=1, dimensions=None, hostname=None, device_name=None): + def increment(self, metric, value=1, dimensions=None, delegated_tenant=None, hostname=None, device_name=None): """Increment a counter with optional dimensions, hostname and device name. :param metric: The name of the metric :param value: The value to increment by :param dimensions: (optional) A dictionary of dimensions for this metric + :param delegated_tenant: (optional) Submit metrics on behalf of this tenant ID. :param hostname: (optional) A hostname for this metric. Defaults to the current hostname. 
:param device_name: (optional) The device name for this metric """ - self.aggregator.increment(metric, value, dimensions, hostname, device_name) + self.aggregator.increment(metric, value, dimensions, delegated_tenant, hostname, device_name) - def decrement(self, metric, value=-1, dimensions=None, hostname=None, device_name=None): + def decrement(self, metric, value=-1, dimensions=None, delegated_tenant=None, hostname=None, device_name=None): """Decrement a counter with optional dimensions, hostname and device name. :param metric: The name of the metric :param value: The value to decrement by :param dimensions: (optional) A dictionary of dimensions for this metric + :param delegated_tenant: (optional) Submit metrics on behalf of this tenant ID. :param hostname: (optional) A hostname for this metric. Defaults to the current hostname. :param device_name: (optional) The device name for this metric """ - self.aggregator.decrement(metric, value, dimensions, hostname, device_name) + self.aggregator.decrement(metric, value, dimensions, delegated_tenant, hostname, device_name) - def rate(self, metric, value, dimensions=None, hostname=None, device_name=None): + def rate(self, metric, value, dimensions=None, delegated_tenant=None, hostname=None, device_name=None): """Submit a point for a metric that will be calculated as a rate on flush. Values will persist across each call to `check` if there is not enough @@ -358,32 +361,35 @@ class AgentCheck(object): :param metric: The name of the metric :param value: The value of the rate :param dimensions: (optional) A dictionary of dimensions for this metric + :param delegated_tenant: (optional) Submit metrics on behalf of this tenant ID. :param hostname: (optional) A hostname for this metric. Defaults to the current hostname. 
:param device_name: (optional) The device name for this metric """ - self.aggregator.rate(metric, value, dimensions, hostname, device_name) + self.aggregator.rate(metric, value, dimensions, delegated_tenant, hostname, device_name) - def histogram(self, metric, value, dimensions=None, hostname=None, device_name=None): + def histogram(self, metric, value, dimensions=None, delegated_tenant=None, hostname=None, device_name=None): """Sample a histogram value, with optional dimensions, hostname and device name. :param metric: The name of the metric :param value: The value to sample for the histogram :param dimensions: (optional) A dictionary of dimensions for this metric + :param delegated_tenant: (optional) Submit metrics on behalf of this tenant ID. :param hostname: (optional) A hostname for this metric. Defaults to the current hostname. :param device_name: (optional) The device name for this metric """ - self.aggregator.histogram(metric, value, dimensions, hostname, device_name) + self.aggregator.histogram(metric, value, dimensions, delegated_tenant, hostname, device_name) - def set(self, metric, value, dimensions=None, hostname=None, device_name=None): + def set(self, metric, value, dimensions=None, delegated_tenant=None, hostname=None, device_name=None): """Sample a set value, with optional dimensions, hostname and device name. :param metric: The name of the metric :param value: The value for the set :param dimensions: (optional) A dictionary of dimensions for this metric + :param delegated_tenant: (optional) Submit metrics on behalf of this tenant ID. :param hostname: (optional) A hostname for this metric. Defaults to the current hostname. :param device_name: (optional) The device name for this metric """ - self.aggregator.set(metric, value, dimensions, hostname, device_name) + self.aggregator.set(metric, value, dimensions, delegated_tenant, hostname, device_name) def event(self, event): """Save an event. 
@@ -428,6 +434,9 @@ class AgentCheck(object): print(" Timestamp: {}".format(metric.timestamp)) print(" Name: {}".format(metric.name)) print(" Value: {}".format(metric.value)) + if (metric.delegated_tenant): + print(" Delegtd ID: {}".format(metric.delegated_tenant)) + print(" Dimensions: ", end='') line = 0 for name in metric.dimensions: diff --git a/monagent/collector/checks/collector.py b/monagent/collector/checks/collector.py index 5516ec4a..b37c27b6 100644 --- a/monagent/collector/checks/collector.py +++ b/monagent/collector/checks/collector.py @@ -142,7 +142,8 @@ class Collector(object): for check_type in self._legacy_checks: try: for name, value in check_type.check().iteritems(): - metrics_list.append(monagent.common.metrics.Measurement(name, timestamp, value, {})) + metrics_list.append(monagent.common.metrics.Measurement(name, timestamp, + value, {}, None)) except Exception: log.exception('Error running check.') @@ -164,7 +165,8 @@ class Collector(object): metrics_list.append(monagent.common.metrics.Measurement(name, timestamp, value, - {'component': 'collector'})) + {'component': 'collector'}, + None)) emitter_statuses = self._emit(metrics_list) self.emit_duration = timer.step() diff --git a/monagent/collector/checks/system/unix.py b/monagent/collector/checks/system/unix.py index c864792c..060f2fcf 100644 --- a/monagent/collector/checks/system/unix.py +++ b/monagent/collector/checks/system/unix.py @@ -63,7 +63,8 @@ class Disk(monagent.collector.checks.check.Check): measurements = [monagent.common.metrics.Measurement(key.split('.', 1)[1], timestamp, value, - {'device': key.split('.', 1)[0]}) + {'device': key.split('.', 1)[0]}, + None) for key, value in stats.iteritems()] return measurements @@ -399,7 +400,7 @@ class IO(monagent.collector.checks.check.Check): for dev_name, stats in filtered_io.iteritems(): filtered_stats = {stat: stats[stat] for stat in stats.iterkeys() if stat not in self.stat_blacklist} - m_list = [monagent.common.metrics.Measurement(key, 
timestamp, value, {'device': dev_name}) + m_list = [monagent.common.metrics.Measurement(key, timestamp, value, {'device': dev_name}, None) for key, value in filtered_stats.iteritems()] measurements.extend(m_list) diff --git a/monagent/collector/checks_d/libvirt.py b/monagent/collector/checks_d/libvirt.py new file mode 100644 index 00000000..03ae4afd --- /dev/null +++ b/monagent/collector/checks_d/libvirt.py @@ -0,0 +1,254 @@ +#!/bin/env python + +# Copyright (c) 2014 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Monasca Agent interface for libvirt metrics""" + +import os +import stat +import time +import yaml + +from calendar import timegm +from datetime import datetime +from monagent.collector.virt import inspector +from monagent.collector.checks import AgentCheck + + +class LibvirtCheck(AgentCheck): + + """Inherit Agent class and gather libvirt metrics""" + + def __init__(self, name, init_config, agent_config): + AgentCheck.__init__(self, name, init_config, agent_config) + self.instance_cache_file = "{}/{}".format(self.init_config.get('cache_dir'), + 'libvirt_instances.yaml') + self.metric_cache_file = "{}/{}".format(self.init_config.get('cache_dir'), + 'libvirt_metrics.yaml') + + def _test_vm_probation(self, created): + """Test to see if a VM was created within the probation period. + + Convert an ISO-8601 timestamp into UNIX epoch timestamp from now + and compare that against configured vm_probation. 
Return the + number of seconds this VM will remain in probation. + """ + dt = datetime.strptime(created, '%Y-%m-%dT%H:%M:%SZ') + created_sec = (time.time() - timegm(dt.timetuple())) + probation_time = self.init_config.get('vm_probation') - created_sec + return int(probation_time) + + def _update_instance_cache(self): + """Collect instance_id, project_id, and AZ for all instance UUIDs + """ + from novaclient.v3 import client + id_cache = {} + # Get a list of all instances from the Nova API + nova_client = client.Client(self.init_config.get('admin_user'), + self.init_config.get('admin_password'), + self.init_config.get('admin_tenant_name'), + self.init_config.get('identity_uri'), + service_type="compute") + instances = nova_client.servers.list(search_opts={'all_tenants': 1}) + + for instance in instances: + inst_name = instance.__getattr__('OS-EXT-SRV-ATTR:instance_name') + inst_az = instance.__getattr__('OS-EXT-AZ:availability_zone') + id_cache[inst_name] = {'instance_uuid': instance.id, + 'hostname': instance.name, + 'zone': inst_az, + 'created': instance.created, + 'tenant_id': instance.tenant_id} + id_cache['last_update'] = int(time.time()) + + # Write the updated cache + try: + with open(self.instance_cache_file, 'w') as cache_yaml: + yaml.safe_dump(id_cache, cache_yaml) + if stat.S_IMODE(os.stat(self.instance_cache_file).st_mode) != 0600: + os.chmod(self.instance_cache_file, 0600) + except IOError as e: + self.log.error("Cannot write to {}: {}".format(self.instance_cache_file, e)) + + return id_cache + + def _load_instance_cache(self): + """Load the cache of instance names to IDs. + + If the cache does not yet exist, return an empty one. + """ + instance_cache = {} + try: + with open(self.instance_cache_file, 'r') as cache_yaml: + instance_cache = yaml.safe_load(cache_yaml) + + # Is it time to force a refresh of this data?
+ if self.init_config.get('nova_refresh') is not None: + time_diff = time.time() - instance_cache['last_update'] + if time_diff > self.init_config.get('nova_refresh'): + self._update_instance_cache() + except IOError: + # The file may not exist yet, and that's OK. Build it now. + instance_cache = self._update_instance_cache() + pass + + return instance_cache + + def _load_metric_cache(self): + """Load the counter metrics from the previous collection iteration + """ + metric_cache = {} + try: + with open(self.metric_cache_file, 'r') as cache_yaml: + metric_cache = yaml.safe_load(cache_yaml) + except IOError: + # The file may not exist yet. + pass + + return metric_cache + + def _update_metric_cache(self, metric_cache): + try: + with open(self.metric_cache_file, 'w') as cache_yaml: + yaml.safe_dump(metric_cache, cache_yaml) + if stat.S_IMODE(os.stat(self.metric_cache_file).st_mode) != 0600: + os.chmod(self.metric_cache_file, 0600) + except IOError as e: + self.log.error("Cannot write to {}: {}".format(self.metric_cache_file, e)) + + def check(self, instance): + """Gather VM metrics for each instance""" + + # Load metric cache + metric_cache = self._load_metric_cache() + + # Load the nova-obtained instance data cache + instance_cache = self._load_instance_cache() + + # Build dimensions for both the customer and for operations + dims_base = {'service': 'compute', 'component': 'vm'} + + insp = inspector.get_hypervisor_inspector() + for inst in insp.inspect_instances(): + # Verify that this instance exists in the cache. Add if necessary. 
+ if inst.name not in instance_cache: + instance_cache = self._update_instance_cache() + if inst.name not in metric_cache: + metric_cache[inst.name] = {} + + # Skip instances created within the probation period + vm_probation_remaining = self._test_vm_probation(instance_cache.get(inst.name)['created']) + if (vm_probation_remaining >= 0): + self.log.info("Libvirt: {} in probation for another {} seconds".format(instance_cache.get(inst.name)['hostname'], + vm_probation_remaining)) + continue + + # Build customer dimensions + dims_customer = dims_base.copy() + dims_customer['resource_id'] = instance_cache.get(inst.name)['instance_uuid'] + dims_customer['zone'] = instance_cache.get(inst.name)['zone'] + # Add dimensions that would be helpful for operations + dims_operations = dims_customer.copy() + dims_operations['tenant_id'] = instance_cache.get(inst.name)['tenant_id'] + dims_operations['cloud_tier'] = 'overcloud' + + # CPU utilization percentage + sample_time = float("{:9f}".format(time.time())) + if 'cpu.time' in metric_cache[inst.name]: + # I have a prior value, so calculate the rate & push the metric + cpu_diff = insp.inspect_cpus(inst.name).time - metric_cache[inst.name]['cpu.time']['value'] + time_diff = sample_time - float(metric_cache[inst.name]['cpu.time']['timestamp']) + # Convert time_diff to nanoseconds, and calculate percentage + rate = (cpu_diff / (time_diff * 1000000000)) * 100 + + self.gauge('cpu.utilization_perc', int(round(rate, 0)), + dimensions=dims_customer, + delegated_tenant=instance_cache.get(inst.name)['tenant_id'], + hostname=instance_cache.get(inst.name)['hostname']) + self.gauge('vm.cpu.utilization_perc', int(round(rate, 0)), + dimensions=dims_operations) + + metric_cache[inst.name]['cpu.time'] = {'timestamp': sample_time, + 'value': insp.inspect_cpus(inst.name).time} + + # Disk utilization + for disk in insp.inspect_disks(inst.name): + sample_time = int(time.time()) + disk_dimensions = {'device': disk[0].device} + for metric in 
disk[1]._fields: + metric_name = "io.{}".format(metric) + if metric_name not in metric_cache[inst.name]: + metric_cache[inst.name][metric_name] = {} + + value = int(disk[1].__getattribute__(metric)) + if disk[0].device in metric_cache[inst.name][metric_name]: + time_diff = sample_time - metric_cache[inst.name][metric_name][disk[0].device]['timestamp'] + val_diff = value - metric_cache[inst.name][metric_name][disk[0].device]['value'] + # Change the metric name to a rate, ie. "io.read_requests" + # gets converted to "io.read_ops_sec" + rate_name = "{}_sec".format(metric_name.replace('requests', 'ops')) + # Customer + this_dimensions = disk_dimensions.copy() + this_dimensions.update(dims_customer) + self.gauge(rate_name, val_diff, dimensions=this_dimensions, + delegated_tenant=instance_cache.get(inst.name)['tenant_id'], + hostname=instance_cache.get(inst.name)['hostname']) + # Operations (metric name prefixed with "vm." + this_dimensions = disk_dimensions.copy() + this_dimensions.update(dims_operations) + self.gauge("vm.{}".format(rate_name), val_diff, + dimensions=this_dimensions) + # Save this metric to the cache + metric_cache[inst.name][metric_name][disk[0].device] = { + 'timestamp': sample_time, + 'value': value} + + # Network utilization + for vnic in insp.inspect_vnics(inst.name): + sample_time = int(time.time()) + vnic_dimensions = {'device': vnic[0].name} + for metric in vnic[1]._fields: + metric_name = "net.{}".format(metric) + if metric_name not in metric_cache[inst.name]: + metric_cache[inst.name][metric_name] = {} + + value = int(vnic[1].__getattribute__(metric)) + if vnic[0].name in metric_cache[inst.name][metric_name]: + time_diff = sample_time - metric_cache[inst.name][metric_name][vnic[0].name]['timestamp'] + val_diff = value - metric_cache[inst.name][metric_name][vnic[0].name]['value'] + # Change the metric name to a rate, ie. 
"net.rx_bytes" + # gets converted to "net.rx_bytes_sec" + rate_name = "{}_sec".format(metric_name) + # Rename "tx" to "out" and "rx" to "in" + rate_name = rate_name.replace("tx", "out") + rate_name = rate_name.replace("rx", "in") + # Customer + this_dimensions = vnic_dimensions.copy() + this_dimensions.update(dims_customer) + self.gauge(rate_name, val_diff, + dimensions=this_dimensions, + delegated_tenant=instance_cache.get(inst.name)['tenant_id'], + hostname=instance_cache.get(inst.name)['hostname']) + # Operations (metric name prefixed with "vm." + this_dimensions = vnic_dimensions.copy() + this_dimensions.update(dims_operations) + self.gauge("vm.{}".format(rate_name), val_diff, + dimensions=this_dimensions) + # Save this metric to the cache + metric_cache[inst.name][metric_name][vnic[0].name] = { + 'timestamp': sample_time, + 'value': value} + + # Save these metrics for the next collector invocation + self._update_metric_cache(metric_cache) diff --git a/monagent/collector/virt/__init__.py b/monagent/collector/virt/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/monagent/collector/virt/hyperv/__init__.py b/monagent/collector/virt/hyperv/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/monagent/collector/virt/hyperv/inspector.py b/monagent/collector/virt/hyperv/inspector.py new file mode 100644 index 00000000..81c37b73 --- /dev/null +++ b/monagent/collector/virt/hyperv/inspector.py @@ -0,0 +1,80 @@ +# Copyright 2013 Cloudbase Solutions Srl +# +# Author: Claudiu Belu +# Alessandro Pilotti +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Implementation of Inspector abstraction for Hyper-V""" + +from oslo.utils import units + +from monagent.collector.virt.hyperv import utilsv2 +from monagent.collector.virt import inspector as virt_inspector + + +class HyperVInspector(virt_inspector.Inspector): + + def __init__(self): + super(HyperVInspector, self).__init__() + self._utils = utilsv2.UtilsV2() + + def inspect_instances(self): + for element_name, name in self._utils.get_all_vms(): + yield virt_inspector.Instance( + name=element_name, + UUID=name) + + def inspect_cpus(self, instance_name): + (cpu_clock_used, + cpu_count, uptime) = self._utils.get_cpu_metrics(instance_name) + host_cpu_clock, host_cpu_count = self._utils.get_host_cpu_info() + + cpu_percent_used = (cpu_clock_used / + float(host_cpu_clock * cpu_count)) + # Nanoseconds + cpu_time = (long(uptime * cpu_percent_used) * units.k) + + return virt_inspector.CPUStats(number=cpu_count, time=cpu_time) + + def inspect_vnics(self, instance_name): + for vnic_metrics in self._utils.get_vnic_metrics(instance_name): + interface = virt_inspector.Interface( + name=vnic_metrics["element_name"], + mac=vnic_metrics["address"], + fref=None, + parameters=None) + + stats = virt_inspector.InterfaceStats( + rx_bytes=vnic_metrics['rx_mb'] * units.Mi, + rx_packets=0, + tx_bytes=vnic_metrics['tx_mb'] * units.Mi, + tx_packets=0) + + yield (interface, stats) + + def inspect_disks(self, instance_name): + for disk_metrics in self._utils.get_disk_metrics(instance_name): + device = dict([(i, disk_metrics[i]) + for i in ['instance_id', 'host_resource'] + if i in 
disk_metrics]) + + disk = virt_inspector.Disk(device=device) + stats = virt_inspector.DiskStats( + read_requests=0, + # Return bytes + read_bytes=disk_metrics['read_mb'] * units.Mi, + write_requests=0, + write_bytes=disk_metrics['write_mb'] * units.Mi, + errors=0) + + yield (disk, stats) diff --git a/monagent/collector/virt/hyperv/utilsv2.py b/monagent/collector/virt/hyperv/utilsv2.py new file mode 100644 index 00000000..0973a5af --- /dev/null +++ b/monagent/collector/virt/hyperv/utilsv2.py @@ -0,0 +1,204 @@ +# Copyright 2013 Cloudbase Solutions Srl +# +# Author: Claudiu Belu +# Alessandro Pilotti +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +""" +Utility class for VM related operations. +Based on the "root/virtualization/v2" namespace available starting with +Hyper-V Server / Windows Server 2012. 
+""" + +import sys + +if sys.platform == 'win32': + import wmi + +from monagent.collector.virt import inspector + + +class HyperVException(inspector.InspectorException): + pass + + +class UtilsV2(object): + + _VIRTUAL_SYSTEM_TYPE_REALIZED = 'Microsoft:Hyper-V:System:Realized' + + _PROC_SETTING = 'Msvm_ProcessorSettingData' + _SYNTH_ETH_PORT = 'Msvm_SyntheticEthernetPortSettingData' + _ETH_PORT_ALLOC = 'Msvm_EthernetPortAllocationSettingData' + _PORT_ACL_SET_DATA = 'Msvm_EthernetSwitchPortAclSettingData' + _STORAGE_ALLOC = 'Msvm_StorageAllocationSettingData' + _VS_SETTING_DATA = 'Msvm_VirtualSystemSettingData' + _METRICS_ME = 'Msvm_MetricForME' + _BASE_METRICS_VALUE = 'Msvm_BaseMetricValue' + + _CPU_METRIC_NAME = 'Aggregated Average CPU Utilization' + _NET_IN_METRIC_NAME = 'Filtered Incoming Network Traffic' + _NET_OUT_METRIC_NAME = 'Filtered Outgoing Network Traffic' + # Disk metrics are supported from Hyper-V 2012 R2 + _DISK_RD_METRIC_NAME = 'Disk Data Read' + _DISK_WR_METRIC_NAME = 'Disk Data Written' + + def __init__(self, host='.'): + if sys.platform == 'win32': + self._init_hyperv_wmi_conn(host) + self._init_cimv2_wmi_conn(host) + self._host_cpu_info = None + + def _init_hyperv_wmi_conn(self, host): + self._conn = wmi.WMI(moniker='//%s/root/virtualization/v2' % host) + + def _init_cimv2_wmi_conn(self, host): + self._conn_cimv2 = wmi.WMI(moniker='//%s/root/cimv2' % host) + + def get_host_cpu_info(self): + if not self._host_cpu_info: + host_cpus = self._conn_cimv2.Win32_Processor() + self._host_cpu_info = (host_cpus[0].MaxClockSpeed, len(host_cpus)) + return self._host_cpu_info + + def get_all_vms(self): + vms = [(v.ElementName, v.Name) for v in + self._conn.Msvm_ComputerSystem(['ElementName', 'Name'], + Caption="Virtual Machine")] + return vms + + def get_cpu_metrics(self, vm_name): + vm = self._lookup_vm(vm_name) + cpu_sd = self._get_vm_resources(vm, self._PROC_SETTING)[0] + cpu_metrics_def = self._get_metric_def(self._CPU_METRIC_NAME) + cpu_metric_aggr = 
self._get_metrics(vm, cpu_metrics_def) + + cpu_used = 0 + if cpu_metric_aggr: + cpu_used = long(cpu_metric_aggr[0].MetricValue) + + return (cpu_used, + int(cpu_sd.VirtualQuantity), + long(vm.OnTimeInMilliseconds)) + + def get_vnic_metrics(self, vm_name): + vm = self._lookup_vm(vm_name) + ports = self._get_vm_resources(vm, self._ETH_PORT_ALLOC) + vnics = self._get_vm_resources(vm, self._SYNTH_ETH_PORT) + + metric_def_in = self._get_metric_def(self._NET_IN_METRIC_NAME) + metric_def_out = self._get_metric_def(self._NET_OUT_METRIC_NAME) + + for port in ports: + vnic = [v for v in vnics if port.Parent == v.path_()][0] + + metric_value_instances = self._get_metric_value_instances( + port.associators(wmi_result_class=self._PORT_ACL_SET_DATA), + self._BASE_METRICS_VALUE) + metric_values = self._sum_metric_values_by_defs( + metric_value_instances, [metric_def_in, metric_def_out]) + + yield { + 'rx_mb': metric_values[0], + 'tx_mb': metric_values[1], + 'element_name': vnic.ElementName, + 'address': vnic.Address + } + + def get_disk_metrics(self, vm_name): + vm = self._lookup_vm(vm_name) + metric_def_r = self._get_metric_def(self._DISK_RD_METRIC_NAME) + metric_def_w = self._get_metric_def(self._DISK_WR_METRIC_NAME) + + disks = self._get_vm_resources(vm, self._STORAGE_ALLOC) + for disk in disks: + metric_values = self._get_metric_values( + disk, [metric_def_r, metric_def_w]) + + # This is e.g. 
the VHD file location + if disk.HostResource: + host_resource = disk.HostResource[0] + + yield { + # Values are in megabytes + 'read_mb': metric_values[0], + 'write_mb': metric_values[1], + 'instance_id': disk.InstanceID, + 'host_resource': host_resource + } + + def _sum_metric_values(self, metrics): + tot_metric_val = 0 + for metric in metrics: + tot_metric_val += long(metric.MetricValue) + return tot_metric_val + + def _sum_metric_values_by_defs(self, element_metrics, metric_defs): + metric_values = [] + for metric_def in metric_defs: + if metric_def: + metrics = self._filter_metrics(element_metrics, metric_def) + metric_values.append(self._sum_metric_values(metrics)) + else: + # In case the metric is not defined on this host + metric_values.append(0) + return metric_values + + def _get_metric_value_instances(self, elements, result_class): + instances = [] + for el in elements: + associators = el.associators(wmi_result_class=result_class) + if associators: + instances.append(associators[0]) + + return instances + + def _get_metric_values(self, element, metric_defs): + element_metrics = element.associators( + wmi_association_class=self._METRICS_ME) + return self._sum_metric_values_by_defs(element_metrics, metric_defs) + + def _lookup_vm(self, vm_name): + vms = self._conn.Msvm_ComputerSystem(ElementName=vm_name) + n = len(vms) + if n == 0: + raise inspector.InstanceNotFoundException( + _('VM %s not found on Hyper-V') % vm_name) + elif n > 1: + raise HyperVException(_('Duplicate VM name found: %s') % vm_name) + else: + return vms[0] + + def _get_metrics(self, element, metric_def): + return self._filter_metrics( + element.associators( + wmi_association_class=self._METRICS_ME), metric_def) + + def _filter_metrics(self, all_metrics, metric_def): + return [v for v in all_metrics if + v.MetricDefinitionId == metric_def.Id] + + def _get_metric_def(self, metric_def): + metric = self._conn.CIM_BaseMetricDefinition(ElementName=metric_def) + if metric: + return metric[0] + + 
def _get_vm_setting_data(self, vm): + vm_settings = vm.associators( + wmi_result_class=self._VS_SETTING_DATA) + # Avoid snapshots + return [s for s in vm_settings if + s.VirtualSystemType == self._VIRTUAL_SYSTEM_TYPE_REALIZED][0] + + def _get_vm_resources(self, vm, resource_class): + setting_data = self._get_vm_setting_data(vm) + return setting_data.associators(wmi_result_class=resource_class) diff --git a/monagent/collector/virt/inspector.py b/monagent/collector/virt/inspector.py new file mode 100644 index 00000000..0777bfea --- /dev/null +++ b/monagent/collector/virt/inspector.py @@ -0,0 +1,224 @@ +# +# Copyright 2012 Red Hat, Inc +# +# Author: Eoghan Glynn +# Doug Hellmann +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Inspector abstraction for read-only access to hypervisors.""" + +import collections + +from oslo.config import cfg +from stevedore import driver + +OPTS = [ + cfg.StrOpt('hypervisor_inspector', + default='libvirt', + help='Inspector to use for inspecting the hypervisor layer.'), +] + +cfg.CONF.register_opts(OPTS) + + +# Named tuple representing instances. +# +# name: the name of the instance +# uuid: the UUID associated with the instance +# +Instance = collections.namedtuple('Instance', ['name', 'UUID']) + + +# Named tuple representing CPU statistics. +# +# number: number of CPUs +# time: cumulative CPU time +# +CPUStats = collections.namedtuple('CPUStats', ['number', 'time']) + +# Named tuple representing CPU Utilization statistics. 
+# +# util: CPU utilization in percentage +# +CPUUtilStats = collections.namedtuple('CPUUtilStats', ['util']) + +# Named tuple representing Memory usage statistics. +# +# usage: Amount of memory used +# +MemoryUsageStats = collections.namedtuple('MemoryUsageStats', ['usage']) + + +# Named tuple representing vNICs. +# +# name: the name of the vNIC +# mac: the MAC address +# fref: the filter ref +# parameters: miscellaneous parameters +# +Interface = collections.namedtuple('Interface', ['name', 'mac', + 'fref', 'parameters']) + + +# Named tuple representing vNIC statistics. +# +# rx_bytes: number of received bytes +# rx_packets: number of received packets +# tx_bytes: number of transmitted bytes +# tx_packets: number of transmitted packets +# +InterfaceStats = collections.namedtuple('InterfaceStats', + ['rx_bytes', 'rx_packets', + 'tx_bytes', 'tx_packets']) + + +# Named tuple representing vNIC rate statistics. +# +# rx_bytes_rate: rate of received bytes +# tx_bytes_rate: rate of transmitted bytes +# +InterfaceRateStats = collections.namedtuple('InterfaceRateStats', + ['rx_bytes_rate', 'tx_bytes_rate']) + + +# Named tuple representing disks. +# +# device: the device name for the disk +# +Disk = collections.namedtuple('Disk', ['device']) + + +# Named tuple representing disk statistics. +# +# read_bytes: number of bytes read +# read_requests: number of read operations +# write_bytes: number of bytes written +# write_requests: number of write operations +# errors: number of errors +# +DiskStats = collections.namedtuple('DiskStats', + ['read_bytes', 'read_requests', + 'write_bytes', 'write_requests', + 'errors']) + +# Named tuple representing disk rate statistics. 
+# +# read_bytes_rate: number of bytes read per second +# read_requests_rate: number of read operations per second +# write_bytes_rate: number of bytes written per second +# write_requests_rate: number of write operations per second +# +DiskRateStats = collections.namedtuple('DiskRateStats', + ['read_bytes_rate', + 'read_requests_rate', + 'write_bytes_rate', + 'write_requests_rate']) + + +# Exception types +# +class InspectorException(Exception): + def __init__(self, message=None): + super(InspectorException, self).__init__(message) + + +class InstanceNotFoundException(InspectorException): + pass + + +# Main virt inspector abstraction layering over the hypervisor API. +# +class Inspector(object): + + def inspect_instances(self): + """List the instances on the current host.""" + raise NotImplementedError() + + def inspect_cpus(self, instance_name): + """Inspect the CPU statistics for an instance. + + :param instance_name: the name of the target instance + :return: the number of CPUs and cumulative CPU time + """ + raise NotImplementedError() + + def inspect_cpu_util(self, instance, duration=None): + """Inspect the CPU Utilization (%) for an instance. + + :param instance: the target instance + :param duration: the last 'n' seconds, over which the value should be + inspected + :return: the percentage of CPU utilization + """ + raise NotImplementedError() + + def inspect_vnics(self, instance_name): + """Inspect the vNIC statistics for an instance. + + :param instance_name: the name of the target instance + :return: for each vNIC, the number of bytes & packets + received and transmitted + """ + raise NotImplementedError() + + def inspect_vnic_rates(self, instance, duration=None): + """Inspect the vNIC rate statistics for an instance. 
+ + :param instance: the target instance + :param duration: the last 'n' seconds, over which the value should be + inspected + :return: for each vNIC, the rate of bytes & packets + received and transmitted + """ + raise NotImplementedError() + + def inspect_disks(self, instance_name): + """Inspect the disk statistics for an instance. + + :param instance_name: the name of the target instance + :return: for each disk, the number of bytes & operations + read and written, and the error count + """ + raise NotImplementedError() + + def inspect_memory_usage(self, instance, duration=None): + """Inspect the memory usage statistics for an instance. + + :param instance: the target instance + :param duration: the last 'n' seconds, over which the value should be + inspected + :return: the amount of memory used + """ + raise NotImplementedError() + + def inspect_disk_rates(self, instance, duration=None): + """Inspect the disk statistics as rates for an instance. + + :param instance: the target instance + :param duration: the last 'n' seconds, over which the value should be + inspected + :return: for each disk, the number of bytes & operations + read and written per second, with the error count + """ + raise NotImplementedError() + + +def get_hypervisor_inspector(): + try: + namespace = 'monagent.collector.virt' + mgr = driver.DriverManager(namespace, + cfg.CONF.hypervisor_inspector, + invoke_on_load=True) + return mgr.driver + except ImportError as e: + return Inspector() diff --git a/monagent/collector/virt/libvirt/__init__.py b/monagent/collector/virt/libvirt/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/monagent/collector/virt/libvirt/inspector.py b/monagent/collector/virt/libvirt/inspector.py new file mode 100644 index 00000000..6e3009a2 --- /dev/null +++ b/monagent/collector/virt/libvirt/inspector.py @@ -0,0 +1,169 @@ +# +# Copyright 2012 Red Hat, Inc +# +# Author: Eoghan Glynn +# +# Licensed under the Apache License, Version 2.0 (the "License"); 
you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Implementation of Inspector abstraction for libvirt.""" + +from lxml import etree +from oslo.config import cfg +import six + +from monagent.collector.virt import inspector as virt_inspector + +libvirt = None + + +libvirt_opts = [ + cfg.StrOpt('libvirt_type', + default='kvm', + help='Libvirt domain type (valid options are: ' + 'kvm, lxc, qemu, uml, xen).'), + cfg.StrOpt('libvirt_uri', + default='', + help='Override the default libvirt URI ' + '(which is dependent on libvirt_type).'), +] + +CONF = cfg.CONF +CONF.register_opts(libvirt_opts) + + +def retry_on_disconnect(function): + def decorator(self, *args, **kwargs): + try: + return function(self, *args, **kwargs) + except libvirt.libvirtError as e: + if (e.get_error_code() == libvirt.VIR_ERR_SYSTEM_ERROR and + e.get_error_domain() in (libvirt.VIR_FROM_REMOTE, + libvirt.VIR_FROM_RPC)): + self.connection = None + return function(self, *args, **kwargs) + else: + raise + return decorator + + +class LibvirtInspector(virt_inspector.Inspector): + + per_type_uris = dict(uml='uml:///system', xen='xen:///', lxc='lxc:///') + + def __init__(self): + self.uri = self._get_uri() + self.connection = None + + def _get_uri(self): + return CONF.libvirt_uri or self.per_type_uris.get(CONF.libvirt_type, + 'qemu:///system') + + def _get_connection(self): + if not self.connection: + global libvirt + if libvirt is None: + libvirt = __import__('libvirt') + self.connection = libvirt.openReadOnly(self.uri) + + return self.connection + + @retry_on_disconnect + def 
_lookup_by_name(self, instance_name): + try: + return self._get_connection().lookupByName(instance_name) + except Exception as ex: + if not libvirt or not isinstance(ex, libvirt.libvirtError): + raise virt_inspector.InspectorException(six.text_type(ex)) + error_code = ex.get_error_code() + if (error_code == libvirt.VIR_ERR_SYSTEM_ERROR and + ex.get_error_domain() in (libvirt.VIR_FROM_REMOTE, + libvirt.VIR_FROM_RPC)): + raise + msg = ("Error from libvirt while looking up %(instance_name)s: " + "[Error Code %(error_code)s] " + "%(ex)s" % {'instance_name': instance_name, + 'error_code': error_code, + 'ex': ex}) + raise virt_inspector.InstanceNotFoundException(msg) + + @retry_on_disconnect + def inspect_instance(self, domain_id): + domain = self._get_connection().lookupByID(domain_id) + return virt_inspector.Instance(name=domain.name(), + UUID=domain.UUIDString()) + + @retry_on_disconnect + def inspect_instances(self): + if self._get_connection().numOfDomains() > 0: + for domain_id in self._get_connection().listDomainsID(): + if domain_id != 0: + try: + yield self.inspect_instance(domain_id) + except libvirt.libvirtError: + # Instance was deleted while listing... 
ignore it + pass + + def inspect_cpus(self, instance_name): + domain = self._lookup_by_name(instance_name) + dom_info = domain.info() + return virt_inspector.CPUStats(number=dom_info[3], time=dom_info[4]) + + def inspect_vnics(self, instance_name): + domain = self._lookup_by_name(instance_name) + state = domain.info()[0] + if state == libvirt.VIR_DOMAIN_SHUTOFF: + return + tree = etree.fromstring(domain.XMLDesc(0)) + for iface in tree.findall('devices/interface'): + target = iface.find('target') + if target is not None: + name = target.get('dev') + else: + continue + mac = iface.find('mac') + if mac is not None: + mac_address = mac.get('address') + else: + continue + fref = iface.find('filterref') + if fref is not None: + fref = fref.get('filter') + + params = dict((p.get('name').lower(), p.get('value')) + for p in iface.findall('filterref/parameter')) + interface = virt_inspector.Interface(name=name, mac=mac_address, + fref=fref, parameters=params) + dom_stats = domain.interfaceStats(name) + stats = virt_inspector.InterfaceStats(rx_bytes=dom_stats[0], + rx_packets=dom_stats[1], + tx_bytes=dom_stats[4], + tx_packets=dom_stats[5]) + yield (interface, stats) + + def inspect_disks(self, instance_name): + domain = self._lookup_by_name(instance_name) + state = domain.info()[0] + if state == libvirt.VIR_DOMAIN_SHUTOFF: + return + tree = etree.fromstring(domain.XMLDesc(0)) + for device in filter( + bool, + [target.get("dev") + for target in tree.findall('devices/disk/target')]): + disk = virt_inspector.Disk(device=device) + block_stats = domain.blockStats(device) + stats = virt_inspector.DiskStats(read_requests=block_stats[0], + read_bytes=block_stats[1], + write_requests=block_stats[2], + write_bytes=block_stats[3], + errors=block_stats[4]) + yield (disk, stats) diff --git a/monagent/collector/virt/vmware/__init__.py b/monagent/collector/virt/vmware/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/monagent/collector/virt/vmware/inspector.py 
b/monagent/collector/virt/vmware/inspector.py new file mode 100644 index 00000000..e8cc674e --- /dev/null +++ b/monagent/collector/virt/vmware/inspector.py @@ -0,0 +1,182 @@ +# Copyright (c) 2014 VMware, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Implementation of Inspector abstraction for VMware vSphere""" + +from oslo.config import cfg +from oslo.utils import units +from oslo.vmware import api + +from monagent.collector.virt import inspector as virt_inspector +from monagent.collector.virt.vmware import vsphere_operations + + +opt_group = cfg.OptGroup(name='vmware', + title='Options for VMware') + +OPTS = [ + cfg.StrOpt('host_ip', + default='', + help='IP address of the VMware Vsphere host'), + cfg.StrOpt('host_username', + default='', + help='Username of VMware Vsphere'), + cfg.StrOpt('host_password', + default='', + help='Password of VMware Vsphere', + secret=True), + cfg.IntOpt('api_retry_count', + default=10, + help='Number of times a VMware Vsphere API must be retried'), + cfg.FloatOpt('task_poll_interval', + default=0.5, + help='Sleep time in seconds for polling an ongoing async ' + 'task'), + cfg.StrOpt('wsdl_location', + help='Optional vim service WSDL location ' + 'e.g http:///vimService.wsdl. 
' + 'Optional over-ride to default location for bug ' + 'work-arounds'), +] + +cfg.CONF.register_group(opt_group) +cfg.CONF.register_opts(OPTS, group=opt_group) + +VC_AVERAGE_MEMORY_CONSUMED_CNTR = 'mem:consumed:average' +VC_AVERAGE_CPU_CONSUMED_CNTR = 'cpu:usage:average' +VC_NETWORK_RX_COUNTER = 'net:received:average' +VC_NETWORK_TX_COUNTER = 'net:transmitted:average' +VC_DISK_READ_RATE_CNTR = "disk:read:average" +VC_DISK_READ_REQUESTS_RATE_CNTR = "disk:numberReadAveraged:average" +VC_DISK_WRITE_RATE_CNTR = "disk:write:average" +VC_DISK_WRITE_REQUESTS_RATE_CNTR = "disk:numberWriteAveraged:average" + + +def get_api_session(): + api_session = api.VMwareAPISession( + cfg.CONF.vmware.host_ip, + cfg.CONF.vmware.host_username, + cfg.CONF.vmware.host_password, + cfg.CONF.vmware.api_retry_count, + cfg.CONF.vmware.task_poll_interval, + wsdl_loc=cfg.CONF.vmware.wsdl_location) + return api_session + + +class VsphereInspector(virt_inspector.Inspector): + + def __init__(self): + super(VsphereInspector, self).__init__() + self._ops = vsphere_operations.VsphereOperations( + get_api_session(), 1000) + + def inspect_cpu_util(self, instance, duration=None): + vm_moid = self._ops.get_vm_moid(instance.id) + if vm_moid is None: + raise virt_inspector.InstanceNotFoundException( + _('VM %s not found in VMware Vsphere') % instance.id) + cpu_util_counter_id = self._ops.get_perf_counter_id( + VC_AVERAGE_CPU_CONSUMED_CNTR) + cpu_util = self._ops.query_vm_aggregate_stats( + vm_moid, cpu_util_counter_id, duration) + + # For this counter vSphere returns values scaled-up by 100, since the + # corresponding API can't return decimals, but only longs. + # For e.g. if the utilization is 12.34%, the value returned is 1234. + # Hence, dividing by 100. 
+ cpu_util = cpu_util / 100 + return virt_inspector.CPUUtilStats(util=cpu_util) + + def inspect_vnic_rates(self, instance, duration=None): + vm_moid = self._ops.get_vm_moid(instance.id) + if not vm_moid: + raise virt_inspector.InstanceNotFoundException( + _('VM %s not found in VMware Vsphere') % instance.id) + + vnic_stats = {} + vnic_ids = set() + + for net_counter in (VC_NETWORK_RX_COUNTER, VC_NETWORK_TX_COUNTER): + net_counter_id = self._ops.get_perf_counter_id(net_counter) + vnic_id_to_stats_map = self._ops.query_vm_device_stats( + vm_moid, net_counter_id, duration) + vnic_stats[net_counter] = vnic_id_to_stats_map + vnic_ids.update(vnic_id_to_stats_map.iterkeys()) + + # Stats provided from vSphere are in KB/s, converting it to B/s. + for vnic_id in vnic_ids: + rx_bytes_rate = (vnic_stats[VC_NETWORK_RX_COUNTER] + .get(vnic_id, 0) * units.Ki) + tx_bytes_rate = (vnic_stats[VC_NETWORK_TX_COUNTER] + .get(vnic_id, 0) * units.Ki) + + stats = virt_inspector.InterfaceRateStats(rx_bytes_rate, + tx_bytes_rate) + interface = virt_inspector.Interface( + name=vnic_id, + mac=None, + fref=None, + parameters=None) + yield (interface, stats) + + def inspect_memory_usage(self, instance, duration=None): + vm_moid = self._ops.get_vm_moid(instance.id) + if vm_moid is None: + raise virt_inspector.InstanceNotFoundException( + _('VM %s not found in VMware Vsphere') % instance.id) + mem_counter_id = self._ops.get_perf_counter_id( + VC_AVERAGE_MEMORY_CONSUMED_CNTR) + memory = self._ops.query_vm_aggregate_stats( + vm_moid, mem_counter_id, duration) + # Stat provided from vSphere is in KB, converting it to MB. 
+ memory = memory / units.Ki + return virt_inspector.MemoryUsageStats(usage=memory) + + def inspect_disk_rates(self, instance, duration=None): + vm_moid = self._ops.get_vm_moid(instance.id) + if not vm_moid: + raise virt_inspector.InstanceNotFoundException( + _('VM %s not found in VMware Vsphere') % instance.id) + + disk_stats = {} + disk_ids = set() + disk_counters = [ + VC_DISK_READ_RATE_CNTR, + VC_DISK_READ_REQUESTS_RATE_CNTR, + VC_DISK_WRITE_RATE_CNTR, + VC_DISK_WRITE_REQUESTS_RATE_CNTR + ] + + for disk_counter in disk_counters: + disk_counter_id = self._ops.get_perf_counter_id(disk_counter) + disk_id_to_stat_map = self._ops.query_vm_device_stats( + vm_moid, disk_counter_id, duration) + disk_stats[disk_counter] = disk_id_to_stat_map + disk_ids.update(disk_id_to_stat_map.iterkeys()) + + for disk_id in disk_ids: + + def stat_val(counter_name): + return disk_stats[counter_name].get(disk_id, 0) + + disk = virt_inspector.Disk(device=disk_id) + # Stats provided from vSphere are in KB/s, converting it to B/s. + disk_rate_info = virt_inspector.DiskRateStats( + read_bytes_rate=stat_val(VC_DISK_READ_RATE_CNTR) * units.Ki, + read_requests_rate=stat_val(VC_DISK_READ_REQUESTS_RATE_CNTR), + write_bytes_rate=stat_val(VC_DISK_WRITE_RATE_CNTR) * units.Ki, + write_requests_rate=stat_val(VC_DISK_WRITE_REQUESTS_RATE_CNTR) + ) + yield(disk, disk_rate_info) diff --git a/monagent/collector/virt/vmware/vsphere_operations.py b/monagent/collector/virt/vmware/vsphere_operations.py new file mode 100644 index 00000000..2531e1af --- /dev/null +++ b/monagent/collector/virt/vmware/vsphere_operations.py @@ -0,0 +1,229 @@ +# Copyright (c) 2014 VMware, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo.vmware import vim_util + + +PERF_MANAGER_TYPE = "PerformanceManager" +PERF_COUNTER_PROPERTY = "perfCounter" +VM_INSTANCE_ID_PROPERTY = 'config.extraConfig["nvp.vm-uuid"].value' + +# ESXi Servers sample performance data every 20 seconds. 20-second interval +# data is called instance data or real-time data. To retrieve instance data, +# we need to specify a value of 20 seconds for the "PerfQuerySpec.intervalId" +# property. In that case the "QueryPerf" method operates as a raw data feed +# that bypasses the vCenter database and instead retrieves performance data +# from an ESXi host. +# The following value is time interval for real-time performance stats +# in seconds and it is not configurable. +VC_REAL_TIME_SAMPLING_INTERVAL = 20 + + +class VsphereOperations(object): + """Class to invoke vSphere APIs calls. + + vSphere APIs calls are required by various pollsters, collecting data from + VMware infrastructure. + """ + def __init__(self, api_session, max_objects): + self._api_session = api_session + self._max_objects = max_objects + # Mapping between "VM's Nova instance Id" -> "VM's MOID" + # In case a VM is deployed by Nova, then its name is instance ID. + # So this map essentially has VM names as keys. 
+ self._vm_moid_lookup_map = {} + + # Mapping from full name -> ID, for VC Performance counters + self._perf_counter_id_lookup_map = None + + def _init_vm_moid_lookup_map(self): + session = self._api_session + result = session.invoke_api(vim_util, "get_objects", session.vim, + "VirtualMachine", self._max_objects, + [VM_INSTANCE_ID_PROPERTY], + False) + while result: + for vm_object in result.objects: + vm_moid = vm_object.obj.value + # propSet will be set only if the server provides value + if hasattr(vm_object, 'propSet') and vm_object.propSet: + vm_instance_id = vm_object.propSet[0].val + if vm_instance_id: + self._vm_moid_lookup_map[vm_instance_id] = vm_moid + + result = session.invoke_api(vim_util, "continue_retrieval", + session.vim, result) + + def get_vm_moid(self, vm_instance_id): + """Method returns VC MOID of the VM by its NOVA instance ID.""" + if vm_instance_id not in self._vm_moid_lookup_map: + self._init_vm_moid_lookup_map() + + return self._vm_moid_lookup_map.get(vm_instance_id, None) + + def _init_perf_counter_id_lookup_map(self): + + # Query details of all the performance counters from VC + session = self._api_session + client_factory = session.vim.client.factory + perf_manager = session.vim.service_content.perfManager + + prop_spec = vim_util.build_property_spec( + client_factory, PERF_MANAGER_TYPE, [PERF_COUNTER_PROPERTY]) + + obj_spec = vim_util.build_object_spec( + client_factory, perf_manager, None) + + filter_spec = vim_util.build_property_filter_spec( + client_factory, [prop_spec], [obj_spec]) + + options = client_factory.create('ns0:RetrieveOptions') + options.maxObjects = 1 + + prop_collector = session.vim.service_content.propertyCollector + result = session.invoke_api(session.vim, "RetrievePropertiesEx", + prop_collector, specSet=[filter_spec], + options=options) + + perf_counter_infos = result.objects[0].propSet[0].val.PerfCounterInfo + + # Extract the counter Id for each counter and populate the map + self._perf_counter_id_lookup_map = 
{} + for perf_counter_info in perf_counter_infos: + + counter_group = perf_counter_info.groupInfo.key + counter_name = perf_counter_info.nameInfo.key + counter_rollup_type = perf_counter_info.rollupType + counter_id = perf_counter_info.key + + counter_full_name = (counter_group + ":" + counter_name + ":" + + counter_rollup_type) + self._perf_counter_id_lookup_map[counter_full_name] = counter_id + + def get_perf_counter_id(self, counter_full_name): + """Method returns the ID of VC performance counter by its full name. + + A VC performance counter is uniquely identified by the + tuple {'Group Name', 'Counter Name', 'Rollup Type'}. + It will have an id - counter ID (changes from one VC to another), + which is required to query performance stats from that VC. + This method returns the ID for a counter, + assuming 'CounterFullName' => 'Group Name:CounterName:RollupType'. + """ + if not self._perf_counter_id_lookup_map: + self._init_perf_counter_id_lookup_map() + return self._perf_counter_id_lookup_map[counter_full_name] + + # TODO(akhils@vmware.com) Move this method to common library + # when it gets checked-in + def query_vm_property(self, vm_moid, property_name): + """Method returns the value of specified property for a VM. + + :param vm_moid: moid of the VM whose property is to be queried + :param property_name: path of the property + """ + vm_mobj = vim_util.get_moref(vm_moid, "VirtualMachine") + session = self._api_session + return session.invoke_api(vim_util, "get_object_property", + session.vim, vm_mobj, property_name) + + def query_vm_aggregate_stats(self, vm_moid, counter_id, duration): + """Method queries the aggregated real-time stat value for a VM. + + This method should be used for aggregate counters. 
+ + :param vm_moid: moid of the VM + :param counter_id: id of the perf counter in VC + :param duration: in seconds from current time, + over which the stat value was applicable + :return: the aggregated stats value for the counter + """ + # For aggregate counters, device_name should be "" + stats = self._query_vm_perf_stats(vm_moid, counter_id, "", duration) + + # Performance manager provides the aggregated stats value + # with device name -> None + return stats.get(None, 0) + + def query_vm_device_stats(self, vm_moid, counter_id, duration): + """Method queries the real-time stat values for a VM, for all devices. + + This method should be used for device(non-aggregate) counters. + + :param vm_moid: moid of the VM + :param counter_id: id of the perf counter in VC + :param duration: in seconds from current time, + over which the stat value was applicable + :return: a map containing the stat values keyed by the device ID/name + """ + # For device counters, device_name should be "*" to get stat values + # for all devices. + stats = self._query_vm_perf_stats(vm_moid, counter_id, "*", duration) + + # For some device counters, in addition to the per device value + # the Performance manager also returns the aggregated value. + # Just to be consistent, deleting the aggregated value if present. + stats.pop(None, None) + return stats + + def _query_vm_perf_stats(self, vm_moid, counter_id, device_name, duration): + """Method queries the real-time stat values for a VM. + + :param vm_moid: moid of the VM for which stats are needed + :param counter_id: id of the perf counter in VC + :param device_name: name of the device for which stats are to be + queried. For aggregate counters pass empty string (""). + For device counters pass "*", if stats are required over all + devices. 
+ :param duration: in seconds from current time, + over which the stat value was applicable + :return: a map containing the stat values keyed by the device ID/name + """ + + session = self._api_session + client_factory = session.vim.client.factory + + # Construct the QuerySpec + metric_id = client_factory.create('ns0:PerfMetricId') + metric_id.counterId = counter_id + metric_id.instance = device_name + + query_spec = client_factory.create('ns0:PerfQuerySpec') + query_spec.entity = vim_util.get_moref(vm_moid, "VirtualMachine") + query_spec.metricId = [metric_id] + query_spec.intervalId = VC_REAL_TIME_SAMPLING_INTERVAL + # We query all samples which are applicable over the specified duration + samples_cnt = (int(duration / VC_REAL_TIME_SAMPLING_INTERVAL) + if duration else 1) + query_spec.maxSample = samples_cnt + + perf_manager = session.vim.service_content.perfManager + perf_stats = session.invoke_api(session.vim, 'QueryPerf', perf_manager, + querySpec=[query_spec]) + + stat_values = {} + if perf_stats: + entity_metric = perf_stats[0] + sample_infos = entity_metric.sampleInfo + + if len(sample_infos) > 0: + for metric_series in entity_metric.value: + # Take the average of all samples to improve the accuracy + # of the stat value + stat_value = float(sum(metric_series.value)) / samples_cnt + device_id = metric_series.id.instance + stat_values[device_id] = stat_value + + return stat_values diff --git a/monagent/collector/virt/xenapi/__init__.py b/monagent/collector/virt/xenapi/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/monagent/collector/virt/xenapi/inspector.py b/monagent/collector/virt/xenapi/inspector.py new file mode 100644 index 00000000..5124718c --- /dev/null +++ b/monagent/collector/virt/xenapi/inspector.py @@ -0,0 +1,192 @@ +# Copyright 2014 Intel +# +# Author: Ren Qiaowei +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Implementation of Inspector abstraction for XenAPI.""" + +from eventlet import timeout +from oslo.config import cfg +from oslo.utils import units +try: + import XenAPI as api +except ImportError: + api = None + +from ceilometer.compute.pollsters import util +from monagent.collector.virt import inspector as virt_inspector + +opt_group = cfg.OptGroup(name='xenapi', + title='Options for XenAPI') + +xenapi_opts = [ + cfg.StrOpt('connection_url', + help='URL for connection to XenServer/Xen Cloud Platform'), + cfg.StrOpt('connection_username', + default='root', + help='Username for connection to XenServer/Xen Cloud Platform'), + cfg.StrOpt('connection_password', + help='Password for connection to XenServer/Xen Cloud Platform', + secret=True), + cfg.IntOpt('login_timeout', + default=10, + help='Timeout in seconds for XenAPI login.'), +] + +CONF = cfg.CONF +CONF.register_group(opt_group) +CONF.register_opts(xenapi_opts, group=opt_group) + + +class XenapiException(virt_inspector.InspectorException): + pass + + +def get_api_session(): + if not api: + raise ImportError(_('XenAPI not installed')) + + url = CONF.xenapi.connection_url + username = CONF.xenapi.connection_username + password = CONF.xenapi.connection_password + if not url or password is None: + raise XenapiException(_('Must specify connection_url, and ' + 'connection_password to use')) + + exception = api.Failure(_("Unable to log in to XenAPI " + "(is the Dom0 disk full?)")) + try: + session = api.Session(url) + with timeout.Timeout(CONF.xenapi.login_timeout, exception): + 
session.login_with_password(username, password) + except api.Failure as e: + msg = _("Could not connect to XenAPI: %s") % e.details[0] + raise XenapiException(msg) + return session + + +class XenapiInspector(virt_inspector.Inspector): + + def __init__(self): + super(XenapiInspector, self).__init__() + self.session = get_api_session() + + def _get_host_ref(self): + """Return the xenapi host on which nova-compute runs on.""" + return self.session.xenapi.session.get_this_host(self.session.handle) + + def _call_xenapi(self, method, *args): + return self.session.xenapi_request(method, args) + + def _list_vms(self): + host_ref = self._get_host_ref() + vms = self._call_xenapi("VM.get_all_records_where", + 'field "is_control_domain"="false" and ' + 'field "is_a_template"="false" and ' + 'field "resident_on"="%s"' % host_ref) + for vm_ref in vms.keys(): + yield vm_ref, vms[vm_ref] + + def _lookup_by_name(self, instance_name): + vm_refs = self._call_xenapi("VM.get_by_name_label", instance_name) + n = len(vm_refs) + if n == 0: + raise virt_inspector.InstanceNotFoundException( + _('VM %s not found in XenServer') % instance_name) + elif n > 1: + raise XenapiException( + _('Multiple VM %s found in XenServer') % instance_name) + else: + return vm_refs[0] + + def inspect_instances(self): + for vm_ref, vm_rec in self._list_vms(): + name = vm_rec['name_label'] + other_config = vm_rec['other_config'] + uuid = other_config.get('nova_uuid') + if uuid: + yield virt_inspector.Instance(name, uuid) + + def inspect_cpu_util(self, instance, duration=None): + instance_name = util.instance_name(instance) + vm_ref = self._lookup_by_name(instance_name) + metrics_ref = self._call_xenapi("VM.get_metrics", vm_ref) + metrics_rec = self._call_xenapi("VM_metrics.get_record", + metrics_ref) + vcpus_number = metrics_rec['VCPUs_number'] + vcpus_utils = metrics_rec['VCPUs_utilisation'] + if len(vcpus_utils) == 0: + msg = _("Could not get VM %s CPU Utilization") % instance_name + raise XenapiException(msg) 
+ + utils = 0.0 + for num in range(int(vcpus_number)): + utils += vcpus_utils.get(str(num)) + utils = utils / int(vcpus_number) * 100 + return virt_inspector.CPUUtilStats(util=utils) + + def inspect_memory_usage(self, instance, duration=None): + instance_name = util.instance_name(instance) + vm_ref = self._lookup_by_name(instance_name) + metrics_ref = self._call_xenapi("VM.get_metrics", vm_ref) + metrics_rec = self._call_xenapi("VM_metrics.get_record", + metrics_ref) + # Stat provided from XenServer is in B, converting it to MB. + memory = long(metrics_rec['memory_actual']) / units.Mi + return virt_inspector.MemoryUsageStats(usage=memory) + + def inspect_vnic_rates(self, instance, duration=None): + instance_name = util.instance_name(instance) + vm_ref = self._lookup_by_name(instance_name) + vif_refs = self._call_xenapi("VM.get_VIFs", vm_ref) + if vif_refs: + for vif_ref in vif_refs: + vif_rec = self._call_xenapi("VIF.get_record", vif_ref) + vif_metrics_ref = self._call_xenapi( + "VIF.get_metrics", vif_ref) + vif_metrics_rec = self._call_xenapi( + "VIF_metrics.get_record", vif_metrics_ref) + + interface = virt_inspector.Interface( + name=vif_rec['uuid'], + mac=vif_rec['MAC'], + fref=None, + parameters=None) + rx_rate = float(vif_metrics_rec['io_read_kbs']) * units.Ki + tx_rate = float(vif_metrics_rec['io_write_kbs']) * units.Ki + stats = virt_inspector.InterfaceRateStats(rx_rate, tx_rate) + yield (interface, stats) + + def inspect_disk_rates(self, instance, duration=None): + instance_name = util.instance_name(instance) + vm_ref = self._lookup_by_name(instance_name) + vbd_refs = self._call_xenapi("VM.get_VBDs", vm_ref) + if vbd_refs: + for vbd_ref in vbd_refs: + vbd_rec = self._call_xenapi("VBD.get_record", vbd_ref) + vbd_metrics_ref = self._call_xenapi("VBD.get_metrics", + vbd_ref) + vbd_metrics_rec = self._call_xenapi("VBD_metrics.get_record", + vbd_metrics_ref) + + disk = virt_inspector.Disk(device=vbd_rec['device']) + # Stats provided from XenServer are in KB/s, 
+ # converting it to B/s. + read_rate = float(vbd_metrics_rec['io_read_kbs']) * units.Ki + write_rate = float(vbd_metrics_rec['io_write_kbs']) * units.Ki + disk_rate_info = virt_inspector.DiskRateStats( + read_bytes_rate=read_rate, + read_requests_rate=0, + write_bytes_rate=write_rate, + write_requests_rate=0) + yield(disk, disk_rate_info) diff --git a/monagent/common/aggregator.py b/monagent/common/aggregator.py index a9a1e486..0aae873f 100644 --- a/monagent/common/aggregator.py +++ b/monagent/common/aggregator.py @@ -42,7 +42,8 @@ class Aggregator(object): @staticmethod def formatter(metric, value, timestamp, dimensions, hostname, - device_name=None, metric_type=None, interval=None): + delegated_tenant=None, device_name=None, metric_type=None, + interval=None): """ Formats metrics, put them into a Measurement class (metric, timestamp, value, {"dimensions": {"name1": "value1", "name2": "value2"}, ...}) dimensions should be a dictionary @@ -54,7 +55,8 @@ class Aggregator(object): if device_name: dimensions['device_name'] = device_name - return Measurement(metric, int(timestamp), value, dimensions) + return Measurement(metric, int(timestamp), value, dimensions, + delegated_tenant) def packets_per_second(self, interval): if interval == 0: @@ -151,8 +153,9 @@ class MetricsBucketAggregator(Aggregator): def calculate_bucket_start(self, timestamp): return timestamp - (timestamp % self.interval) - def submit_metric(self, name, value, mtype, dimensions=None, hostname=None, - device_name=None, timestamp=None, sample_rate=1): + def submit_metric(self, name, value, mtype, dimensions=None, + delegated_tenant=None, hostname=None, device_name=None, + timestamp=None, sample_rate=1): # Avoid calling extra functions to dedupe dimensions if there are none # Note: if you change the way that context is created, please also change create_empty_metrics, # which counts on this order @@ -184,7 +187,8 @@ class MetricsBucketAggregator(Aggregator): if context not in metric_by_context: 
metric_class = self.metric_type_to_class[mtype] - metric_by_context[context] = metric_class(self.formatter, name, new_dimensions, + metric_by_context[context] = metric_class(self.formatter, name, + new_dimensions, delegated_tenant, hostname or self.hostname, device_name) metric_by_context[context].sample(value, sample_rate, timestamp) @@ -283,19 +287,23 @@ class MetricsAggregator(Aggregator): '_dd-r': Rate, } - def submit_metric(self, name, value, mtype, dimensions=None, hostname=None, - device_name=None, timestamp=None, sample_rate=1): + def submit_metric(self, name, value, mtype, dimensions=None, + delegated_tenant=None, hostname=None, device_name=None, + timestamp=None, sample_rate=1): + # Avoid calling extra functions to dedupe dimensions if there are none if dimensions is not None: new_dimensions = dimensions.copy() - context = (name, tuple(new_dimensions.items()), hostname, device_name) + context = (name, tuple(new_dimensions.items()), delegated_tenant, + hostname, device_name) else: new_dimensions = None - context = (name, new_dimensions, hostname, device_name) + context = (name, new_dimensions, delegated_tenant, + hostname, device_name) if context not in self.metrics: metric_class = self.metric_type_to_class[mtype] - self.metrics[context] = metric_class(self.formatter, name, new_dimensions, + self.metrics[context] = metric_class(self.formatter, name, new_dimensions, delegated_tenant, hostname or self.hostname, device_name) cur_time = time() if timestamp is not None and cur_time - int(timestamp) > self.recent_point_threshold: @@ -304,23 +312,35 @@ class MetricsAggregator(Aggregator): else: self.metrics[context].sample(value, sample_rate, timestamp) - def gauge(self, name, value, dimensions=None, hostname=None, device_name=None, timestamp=None): - self.submit_metric(name, value, 'g', dimensions, hostname, device_name, timestamp) + def gauge(self, name, value, dimensions=None, delegated_tenant=None, + hostname=None, device_name=None, timestamp=None): + 
self.submit_metric(name, value, 'g', dimensions, delegated_tenant, + hostname, device_name, timestamp) - def increment(self, name, value=1, dimensions=None, hostname=None, device_name=None): - self.submit_metric(name, value, 'c', dimensions, hostname, device_name) + def increment(self, name, value=1, dimensions=None, delegated_tenant=None, + hostname=None, device_name=None): + self.submit_metric(name, value, 'c', dimensions, delegated_tenant, + hostname, device_name) - def decrement(self, name, value=-1, dimensions=None, hostname=None, device_name=None): - self.submit_metric(name, value, 'c', dimensions, hostname, device_name) + def decrement(self, name, value=-1, dimensions=None, delegated_tenant=None, + hostname=None, device_name=None): + self.submit_metric(name, value, 'c', dimensions, delegated_tenant, + hostname, device_name) - def rate(self, name, value, dimensions=None, hostname=None, device_name=None): - self.submit_metric(name, value, '_dd-r', dimensions, hostname, device_name) + def rate(self, name, value, dimensions=None, delegated_tenant=None, + hostname=None, device_name=None): + self.submit_metric(name, value, '_dd-r', dimensions, delegated_tenant, + hostname, device_name) - def histogram(self, name, value, dimensions=None, hostname=None, device_name=None): - self.submit_metric(name, value, 'h', dimensions, hostname, device_name) + def histogram(self, name, value, dimensions=None, delegated_tenant=None, + hostname=None, device_name=None): + self.submit_metric(name, value, 'h', dimensions, delegated_tenant, + hostname, device_name) - def set(self, name, value, dimensions=None, hostname=None, device_name=None): - self.submit_metric(name, value, 's', dimensions, hostname, device_name) + def set(self, name, value, dimensions=None, delegated_tenant=None, + hostname=None, device_name=None): + self.submit_metric(name, value, 's', dimensions, delegated_tenant, + hostname, device_name) def flush(self): timestamp = time() diff --git a/monagent/common/metrics.py 
b/monagent/common/metrics.py index c43ea73e..eb7cdd10 100644 --- a/monagent/common/metrics.py +++ b/monagent/common/metrics.py @@ -14,7 +14,8 @@ log = logging.getLogger(__name__) # in the current setup both the emitter and the mon api are converting to json in for loops # A Measurement is the standard format used to pass data from the # collector and monstatsd to the forwarder -Measurement = namedtuple('Measurement', ['name', 'timestamp', 'value', 'dimensions']) +Measurement = namedtuple('Measurement', ['name', 'timestamp', 'value', + 'dimensions', 'delegated_tenant']) class MetricTypes(object): @@ -43,11 +44,13 @@ class Gauge(Metric): """ A metric that tracks a value at particular points in time. """ - def __init__(self, formatter, name, dimensions, hostname, device_name): + def __init__(self, formatter, name, dimensions, delegated_tenant, + hostname, device_name): self.formatter = formatter self.name = name self.value = None self.dimensions = dimensions + self.delegated_tenant = delegated_tenant self.hostname = hostname self.device_name = device_name self.last_sample_time = None @@ -65,6 +68,7 @@ class Gauge(Metric): timestamp=self.timestamp or timestamp, value=self.value, dimensions=self.dimensions, + delegated_tenant=self.delegated_tenant, hostname=self.hostname, device_name=self.device_name, metric_type=MetricTypes.GAUGE, @@ -92,6 +96,7 @@ class BucketGauge(Gauge): timestamp=timestamp, value=self.value, dimensions=self.dimensions, + delegated_tenant=self.delegated_tenant, hostname=self.hostname, device_name=self.device_name, metric_type=MetricTypes.GAUGE, @@ -107,11 +112,13 @@ class Counter(Metric): """ A metric that tracks a counter value. 
""" - def __init__(self, formatter, name, dimensions, hostname, device_name): + def __init__(self, formatter, name, dimensions, delegated_tenant, + hostname, device_name): self.formatter = formatter self.name = name self.value = 0 self.dimensions = dimensions + self.delegated_tenant = delegated_tenant self.hostname = hostname self.device_name = device_name self.last_sample_time = None @@ -128,6 +135,7 @@ class Counter(Metric): value=value, timestamp=timestamp, dimensions=self.dimensions, + delegated_tenant=self.delegated_tenant, hostname=self.hostname, device_name=self.device_name, metric_type=MetricTypes.RATE, @@ -141,13 +149,15 @@ class Histogram(Metric): """ A metric to track the distribution of a set of values. """ - def __init__(self, formatter, name, dimensions, hostname, device_name): + def __init__(self, formatter, name, dimensions, delegated_tenant, + hostname, device_name): self.formatter = formatter self.name = name self.count = 0 self.samples = [] self.percentiles = [0.95] self.dimensions = dimensions + self.delegated_tenant = delegated_tenant self.hostname = hostname self.device_name = device_name self.last_sample_time = None @@ -179,6 +189,7 @@ class Histogram(Metric): hostname=self.hostname, device_name=self.device_name, dimensions=self.dimensions, + delegated_tenant=self.delegated_tenant, metric='%s.%s' % (self.name, suffix), value=value, timestamp=ts, @@ -193,6 +204,7 @@ class Histogram(Metric): metrics.append(self.formatter( hostname=self.hostname, dimensions=self.dimensions, + delegated_tenant=self.delegated_tenant, metric=name, value=val, timestamp=ts, @@ -211,10 +223,12 @@ class Set(Metric): """ A metric to track the number of unique elements in a set. 
""" - def __init__(self, formatter, name, dimensions, hostname, device_name): + def __init__(self, formatter, name, dimensions, delegated_tenant, + hostname, device_name): self.formatter = formatter self.name = name self.dimensions = dimensions + self.delegated_tenant = delegated_tenant self.hostname = hostname self.device_name = device_name self.values = set() @@ -232,6 +246,7 @@ class Set(Metric): hostname=self.hostname, device_name=self.device_name, dimensions=self.dimensions, + delegated_tenant=self.delegated_tenant, metric=self.name, value=len(self.values), timestamp=timestamp, @@ -246,10 +261,12 @@ class Rate(Metric): """ Track the rate of metrics over each flush interval """ - def __init__(self, formatter, name, dimensions, hostname, device_name): + def __init__(self, formatter, name, dimensions, delegated_tenant, + hostname, device_name): self.formatter = formatter self.name = name self.dimensions = dimensions + self.delegated_tenant = delegated_tenant self.hostname = hostname self.device_name = device_name self.samples = [] @@ -286,6 +303,7 @@ class Rate(Metric): hostname=self.hostname, device_name=self.device_name, dimensions=self.dimensions, + delegated_tenant=self.delegated_tenant, metric=self.name, value=val, timestamp=timestamp, diff --git a/monagent/forwarder/api/mon.py b/monagent/forwarder/api/mon.py index d3725530..23e3268d 100644 --- a/monagent/forwarder/api/mon.py +++ b/monagent/forwarder/api/mon.py @@ -45,15 +45,16 @@ class MonAPI(object): self.backlog_send_rate = config['backlog_send_rate'] self.message_queue = deque(maxlen=self.max_buffer_size) - def _post(self, measurements): + def _post(self, measurements, delegated_tenant=None): """Does the actual http post measurements is a list of Measurement """ - data = [m.__dict__ for m in measurements] kwargs = { - 'jsonbody': data + 'jsonbody': measurements } + if delegated_tenant is not None: + kwargs['tenant_id'] = delegated_tenant if not self.mon_client: # construct the monasca client 
self.mon_client = self.get_client() @@ -91,7 +92,16 @@ class MonAPI(object): else: measurement.dimensions = self.default_dimensions.copy() - self._post(measurements) + # Split out separate POSTs for each delegated tenant (includes 'None') + tenant_group = {} + for measurement in measurements: + m_dict = measurement.__dict__ + delegated_tenant = m_dict.pop('delegated_tenant') + if delegated_tenant not in tenant_group: + tenant_group[delegated_tenant] = [] + tenant_group[delegated_tenant].extend([m_dict.copy()]) + for tenant in tenant_group: + self._post(tenant_group[tenant], tenant) def get_client(self): """get_client diff --git a/monsetup/detection/plugins/libvirt.py b/monsetup/detection/plugins/libvirt.py new file mode 100644 index 00000000..9e5d6e8b --- /dev/null +++ b/monsetup/detection/plugins/libvirt.py @@ -0,0 +1,72 @@ +import logging +import os.path +import ConfigParser +import monsetup.detection +import monsetup.agent_config + +log = logging.getLogger(__name__) + +# Location of nova.conf to read sql_connect string +nova_conf = "/etc/nova/nova.conf" +# Directory to use for instance and metric caches (preferred tmpfs "/dev/shm") +cache_dir = "/dev/shm" +# Maximum age of instance cache before automatic refresh (in seconds) +nova_refresh = 60 * 60 * 4 # Four hours +# Probation period before metrics are gathered for a VM (in seconds) +vm_probation = 60 * 5 # Five minutes + + +class Libvirt(monsetup.detection.Plugin): + """Configures VM monitoring through Nova""" + + def _detect(self): + """Run detection, set self.available True if the service is detected. + """ + if (monsetup.detection.find_process_name('nova-api') is not None and + os.path.isfile(nova_conf)): + self.available = True + + def build_config(self): + """Build the config as a Plugins object and return back. 
+ """ + config = monsetup.agent_config.Plugins() + + if self.dependencies_installed(): + nova_cfg = ConfigParser.SafeConfigParser() + nova_cfg.read(nova_conf) + sql_conn = nova_cfg.get('DEFAULT', 'sql_connection') + # Which configuration options are needed for the plugin YAML? + cfg_needed = ['admin_user', 'admin_password', + 'admin_tenant_name', 'identity_uri'] + cfg_section = 'keystone_authtoken' + + # Start with plugin-specific configuration parameters + init_config = {'cache_dir': cache_dir, + 'nova_refresh': nova_refresh, + 'vm_probation': vm_probation} + + for option in cfg_needed: + init_config[option] = nova_cfg.get(cfg_section, option) + + # Add version to identity_uri + init_config['identity_uri'] += '/v2.0' + + config['libvirt'] = {'init_config': init_config, + 'instances': [{}]} + + return config + + def dependencies_installed(self): + try: + import time + import yaml + import novaclient.v3.client + import monagent.collector.virt.inspector + except ImportError: + log.warn("\tDependencies not satisfied; plugin not configured.") + return False + if os.path.isdir(cache_dir) is False: + log.warn("\tCache directory {} not found;" + + " plugin not configured.".format(cache_dir)) + return False + return True diff --git a/monsetup/main.py b/monsetup/main.py index 67806bfa..8e6658cc 100644 --- a/monsetup/main.py +++ b/monsetup/main.py @@ -2,26 +2,38 @@ """ Detect running daemons then configure and start the agent. 
""" +import agent_config import argparse import logging import os +import platform import pwd import socket import subprocess import sys import yaml -import platform - -import agent_config -from detection.plugins import kafka_consumer, mon, mysql, network, zookeeper -from detection.plugins import nova, glance, cinder, neutron, swift -from detection.plugins import keystone, ceilometer +from detection.plugins import ceilometer +from detection.plugins import cinder +from detection.plugins import glance +from detection.plugins import kafka_consumer +from detection.plugins import keystone +from detection.plugins import libvirt +from detection.plugins import mon +from detection.plugins import mysql +from detection.plugins import network +from detection.plugins import neutron +from detection.plugins import nova +from detection.plugins import rabbitmq +from detection.plugins import swift +from detection.plugins import zookeeper from service import sysv # List of all detection plugins to run -DETECTION_PLUGINS = [kafka_consumer.Kafka, mon.MonAPI, mon.MonPersister, mon.MonThresh, mysql.MySQL, - network.Network, nova.Nova, cinder.Cinder, swift.Swift, glance.Glance, - ceilometer.Ceilometer, neutron.Neutron, keystone.Keystone, zookeeper.Zookeeper] +DETECTION_PLUGINS = [ceilometer.Ceilometer, cinder.Cinder, glance.Glance, + kafka_consumer.Kafka, keystone.Keystone, libvirt.Libvirt, + mon.MonAPI, mon.MonPersister, mon.MonThresh, mysql.MySQL, + network.Network, neutron.Neutron, nova.Nova, + rabbitmq.RabbitMQ, swift.Swift, zookeeper.Zookeeper] # Map OS to service type OS_SERVICE_MAP = {'Linux': sysv.SysV} diff --git a/setup.cfg b/setup.cfg index 3e6dfd0d..d0f51723 100644 --- a/setup.cfg +++ b/setup.cfg @@ -32,6 +32,12 @@ console_scripts = monasca-collector = monagent.collector.daemon:main monasca-statsd = monagent.monstatsd.daemon:main monasca-setup = monsetup.main:main +monagent.collector.virt = + libvirt = monagent.collector.virt.libvirt.inspector:LibvirtInspector + hyperv = 
monagent.collector.virt.hyperv.inspector:HyperVInspector + vsphere = monagent.collector.virt.vmware.inspector:VsphereInspector + xenapi = monagent.collector.virt.xenapi.inspector:XenapiInspector + [global] setup-hooks =