Merge "Improve docker plugin"
commit 97778d6b05
@@ -1,34 +1,20 @@
# (C) Copyright 2015 Hewlett Packard Enterprise Development Company LP
# (C) Copyright 2016 Hewlett Packard Enterprise Development LP

# Warning
# The user running the Datadog Agent (usually "dd-agent") must be part of the "docker" group
# The user running the monasca-agent (usually "mon-agent") must be part of the "docker" group

init_config:
  docker_root: /

  # Timeout on Docker api connection. You may have to increase it if you have many containers.
  # connection_timeout: 3

  # docker version to use when connecting to the docker api. Default is auto
  # version: "auto"

instances:
  # URL of the Docker daemon socket to reach the Docker API. HTTP also works.
  - url: "unix://var/run/docker.sock"

    # Include/Exclude rules
    #
    # To include or exclude containers based on their docker tags, use the include and
    # exclude keys in your instance.
    # The reasoning is: if a tag matches an exclude rule, it won't be included
    # unless it also matches an include rule.
    #
    # Examples:
    # exclude all, except ubuntu and debian:
    # instances:
    #   - url: "unix://var/run/docker.sock"
    #     include:
    #       - "image:ubuntu"
    #       - "image:debian"
    #     exclude:
    #       - ".*"
    #
    # include all, except ubuntu and debian:
    # instances:
    #   - url: "unix://var/run/docker.sock"
    #     include: []
    #     exclude:
    #       - "image:ubuntu"
    #       - "image:debian"

    # Set to true if you want the kubernetes namespace and pod name to be set as dimensions
    add_kubernetes_dimensions: True

@@ -644,6 +644,61 @@ The directory checks return the following metrics:
| directory.files_count | path, hostname, service |

## Docker
This plugin gathers metrics on Docker containers.

A YAML file (docker.yaml) contains the URL of the Docker API to connect to and the Docker root directory that is used when looking up Docker proc metrics.

For this check, the user running the monasca agent (usually the mon-agent user) must be part of the docker group.
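
As a quick sanity check before enabling the plugin, you can verify that the agent user can actually reach the Docker socket. This is a minimal sketch (not part of the plugin) using the same docker-py client and defaults the check uses; run it as the mon-agent user:

```
# Minimal connectivity sketch: run as the mon-agent user.
# A permission error here usually means the user is not in the "docker" group.
import docker

client = docker.Client(base_url="unix://var/run/docker.sock",
                       version="auto", timeout=3)
print(client.containers())  # prints the running containers if access works
```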

Also, if you want to attach Kubernetes dimensions to each metric, you can set add_kubernetes_dimensions to true in the yaml file. This will add the kubernetes_pod_name and kubernetes_namespace dimensions.

Sample config:

Without kubernetes dimensions
```
init_config:
  docker_root: /
  socket_timeout: 5
instances:
  - url: "unix://var/run/docker.sock"
```

With kubernetes dimensions
```
init_config:
  docker_root: /
  socket_timeout: 5
instances:
  - url: "unix://var/run/docker.sock"
    add_kubernetes_dimensions: True
```
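
When add_kubernetes_dimensions is enabled and a container carries Kubernetes pod labels, the check copies those labels onto the metric dimensions. The logic in this change looks roughly like the sketch below (the helper name is illustrative; in the plugin this happens inline in the per-container loop):

```
# Sketch of how the check maps Kubernetes container labels to dimensions
# (mirrors the per-container logic in this change; helper name is illustrative).
def add_k8s_dimensions(container, container_dimensions, add_kubernetes_dimensions):
    labels = container.get('Labels', {})
    if add_kubernetes_dimensions:
        if 'io.kubernetes.pod.name' in labels:
            container_dimensions['kubernetes_pod_name'] = labels['io.kubernetes.pod.name']
        if 'io.kubernetes.pod.namespace' in labels:
            container_dimensions['kubernetes_namespace'] = labels['io.kubernetes.pod.namespace']
    return container_dimensions
```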

Note that this plugin only supports one instance in the config file.

The docker check returns the following metrics:

| Metric Name | Metric Type | Dimensions | Optional Dimensions (set if add_kubernetes_dimensions is true and the container is running under Kubernetes) | Semantics |
| ----------- | ----------- | ---------- | ------------------- | --------- |
| container.containers.running_count | Gauge | hostname | | Number of containers running on the host |
| container.cpu.system_time | Gauge | hostname, name, image | kubernetes_pod_name, kubernetes_namespace | The total time the CPU has executed system calls on behalf of the processes in the container |
| container.cpu.system_time_sec | Rate | hostname, name, image | kubernetes_pod_name, kubernetes_namespace | The rate at which the CPU is executing system calls on behalf of the processes in the container |
| container.cpu.user_time | Gauge | hostname, name, image | kubernetes_pod_name, kubernetes_namespace | The total time the CPU is under direct control of the processes in the container |
| container.cpu.user_time_sec | Rate | hostname, name, image | kubernetes_pod_name, kubernetes_namespace | The rate at which the CPU is under direct control of the processes in the container |
| container.cpu.utilization_perc | Gauge | hostname, name, image | kubernetes_pod_name, kubernetes_namespace | The percentage of CPU used by the container |
| container.io.read_bytes | Gauge | hostname, name, image | kubernetes_pod_name, kubernetes_namespace | The total number of bytes read by the processes in the container |
| container.io.read_bytes_sec | Rate | hostname, name, image | kubernetes_pod_name, kubernetes_namespace | The rate of bytes read by the processes in the container |
| container.io.write_bytes | Gauge | hostname, name, image | kubernetes_pod_name, kubernetes_namespace | The total number of bytes written by the processes in the container |
| container.io.write_bytes_sec | Rate | hostname, name, image | kubernetes_pod_name, kubernetes_namespace | The rate of bytes written by the processes in the container |
| container.mem.cache | Gauge | hostname, name, image | kubernetes_pod_name, kubernetes_namespace | The amount of cached memory that belongs to the container's processes |
| container.mem.rss | Gauge | hostname, name, image | kubernetes_pod_name, kubernetes_namespace | The amount of non-cached memory used by the container's processes |
| container.mem.swap | Gauge | hostname, name, image | kubernetes_pod_name, kubernetes_namespace | The amount of swap memory used by the processes in the container |
| container.mem.used_perc | Gauge | hostname, name, image | kubernetes_pod_name, kubernetes_namespace | The percentage of memory used out of the container's memory limit |
| container.net.in_bytes | Gauge | hostname, name, image, interface | kubernetes_pod_name, kubernetes_namespace | The total number of bytes received by the container per interface |
| container.net.in_bytes_sec | Rate | hostname, name, image, interface | kubernetes_pod_name, kubernetes_namespace | The rate of bytes received by the container per interface |
| container.net.out_bytes | Gauge | hostname, name, image, interface | kubernetes_pod_name, kubernetes_namespace | The total number of bytes sent by the container per interface |
| container.net.out_bytes_sec | Rate | hostname, name, image, interface | kubernetes_pod_name, kubernetes_namespace | The rate of bytes sent by the container per interface |
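
The Gauge/Rate pairs in the table are produced by a single helper in the check: the raw counter is reported as a cumulative gauge, and the same value is handed to the agent's rate() call, which becomes the corresponding *_sec per-second metric. From the plugin in this change:

```
# How each cumulative Gauge / per-second Rate pair above is emitted:
# the same raw counter value feeds both metrics.
def _report_rate_gauge_metric(self, metric_name, value, dimensions):
    self.rate(metric_name + "_sec", value, dimensions=dimensions)  # e.g. container.io.read_bytes_sec
    self.gauge(metric_name, value, dimensions=dimensions)          # e.g. container.io.read_bytes
```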
## Elasticsearch Checks
This section describes the Elasticsearch check that can be performed by the Agent. The Elasticsearch check requires a configuration file called elastic.yaml to be available in the agent conf.d configuration directory.
@@ -1,252 +1,362 @@
|
||||
# (C) Copyright 2015 Hewlett Packard Enterprise Development Company LP
|
||||
# (C) Copyright 2015,2016 Hewlett Packard Enterprise Development LP
|
||||
|
||||
from __future__ import absolute_import
|
||||
|
||||
import httplib
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import socket
|
||||
import urllib
|
||||
import urllib2
|
||||
from urlparse import urlsplit
|
||||
|
||||
from monasca_agent.collector.checks import AgentCheck
|
||||
import docker
|
||||
|
||||
from monasca_agent.collector import checks
|
||||
|
||||
CONTAINER_ID_RE = re.compile('[0-9a-f]{64}')
|
||||
DEFAULT_BASE_URL = "unix://var/run/docker.sock"
|
||||
DEFAULT_VERSION = "auto"
|
||||
DEFAULT_TIMEOUT = 3
|
||||
DEFAULT_ADD_KUBERNETES_DIMENSIONS = False
|
||||
JIFFY_HZ = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
|
||||
CGROUPS = ['cpuacct', 'memory', 'blkio']
|
||||
|
||||
|
||||
DEFAULT_MAX_CONTAINERS = 20
|
||||
class Docker(checks.AgentCheck):
|
||||
"""Collect metrics and events from Docker API and cgroups"""
|
||||
|
||||
LXC_METRICS = [
|
||||
{
|
||||
"cgroup": "memory",
|
||||
"file": "lxc/%s/memory.stat",
|
||||
"metrics": {
|
||||
"active_anon": ("docker.mem.active_anon", "gauge"),
|
||||
"active_file": ("docker.mem.active_file", "gauge"),
|
||||
"cache": ("docker.mem.cache", "gauge"),
|
||||
"hierarchical_memory_limit": ("docker.mem.hierarchical_memory_limit", "gauge"),
|
||||
"hierarchical_memsw_limit": ("docker.mem.hierarchical_memsw_limit", "gauge"),
|
||||
"inactive_anon": ("docker.mem.inactive_anon", "gauge"),
|
||||
"inactive_file": ("docker.mem.inactive_file", "gauge"),
|
||||
"mapped_file": ("docker.mem.mapped_file", "gauge"),
|
||||
"pgfault": ("docker.mem.pgfault", "gauge"),
|
||||
"pgmajfault": ("docker.mem.pgmajfault", "gauge"),
|
||||
"pgpgin": ("docker.mem.pgpgin", "gauge"),
|
||||
"pgpgout": ("docker.mem.pgpgout", "gauge"),
|
||||
"rss": ("docker.mem.rss", "gauge"),
|
||||
"swap": ("docker.mem.swap", "gauge"),
|
||||
"unevictable": ("docker.mem.unevictable", "gauge"),
|
||||
"total_active_anon": ("docker.mem.total_active_anon", "gauge"),
|
||||
"total_active_file": ("docker.mem.total_active_file", "gauge"),
|
||||
"total_cache": ("docker.mem.total_cache", "gauge"),
|
||||
"total_inactive_anon": ("docker.mem.total_inactive_anon", "gauge"),
|
||||
"total_inactive_file": ("docker.mem.total_inactive_file", "gauge"),
|
||||
"total_mapped_file": ("docker.mem.total_mapped_file", "gauge"),
|
||||
"total_pgfault": ("docker.mem.total_pgfault", "gauge"),
|
||||
"total_pgmajfault": ("docker.mem.total_pgmajfault", "gauge"),
|
||||
"total_pgpgin": ("docker.mem.total_pgpgin", "gauge"),
|
||||
"total_pgpgout": ("docker.mem.total_pgpgout", "gauge"),
|
||||
"total_rss": ("docker.mem.total_rss", "gauge"),
|
||||
"total_swap": ("docker.mem.total_swap", "gauge"),
|
||||
"total_unevictable": ("docker.mem.total_unevictable", "gauge"),
|
||||
}
|
||||
},
|
||||
{
|
||||
"cgroup": "cpuacct",
|
||||
"file": "lxc/%s/cpuacct.stat",
|
||||
"metrics": {
|
||||
"user": ("docker.cpu.user", "gauge"),
|
||||
"system": ("docker.cpu.system", "gauge"),
|
||||
},
|
||||
},
|
||||
]
|
||||
def __init__(self, name, init_config, agent_config, instances=None):
|
||||
checks.AgentCheck.__init__(self, name, init_config, agent_config, instances)
|
||||
|
||||
DOCKER_METRICS = {
|
||||
"SizeRw": ("docker.disk.size", "gauge"),
|
||||
}
|
||||
if instances is not None and len(instances) > 1:
|
||||
raise Exception('Docker check only supports one configured instance.')
|
||||
|
||||
DOCKER_TAGS = [
|
||||
"Command",
|
||||
"Image",
|
||||
]
|
||||
|
||||
|
||||
class UnixHTTPConnection(httplib.HTTPConnection, object):
|
||||
|
||||
"""Class used in conjunction with UnixSocketHandler to make urllib2
|
||||
|
||||
compatible with Unix sockets.
|
||||
"""
|
||||
|
||||
def __init__(self, unix_socket):
|
||||
self._unix_socket = unix_socket
|
||||
|
||||
def connect(self):
|
||||
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
sock.connect(self._unix_socket)
|
||||
self.sock = sock
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
httplib.HTTPConnection.__init__(self, *args, **kwargs)
|
||||
return self
|
||||
|
||||
|
||||
class UnixSocketHandler(urllib2.AbstractHTTPHandler):
|
||||
|
||||
"""Class that makes Unix sockets work with urllib2 without any additional dependencies.
|
||||
|
||||
"""
|
||||
|
||||
def unix_open(self, req):
|
||||
full_path = "%s%s" % urlsplit(req.get_full_url())[1:3]
|
||||
path = os.path.sep
|
||||
for part in full_path.split("/"):
|
||||
path = os.path.join(path, part)
|
||||
if not os.path.exists(path):
|
||||
break
|
||||
unix_socket = path
|
||||
# add a host or else urllib2 complains
|
||||
url = req.get_full_url().replace(unix_socket, "/localhost")
|
||||
new_req = urllib2.Request(url, req.get_data(), dict(req.header_items()))
|
||||
new_req.timeout = req.timeout
|
||||
return self.do_open(UnixHTTPConnection(unix_socket), new_req)
|
||||
|
||||
unix_request = urllib2.AbstractHTTPHandler.do_request_
|
||||
|
||||
|
||||
class Docker(AgentCheck):
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(Docker, self).__init__(*args, **kwargs)
|
||||
urllib2.install_opener(urllib2.build_opener(UnixSocketHandler()))
|
||||
self._mounpoints = {}
|
||||
for metric in LXC_METRICS:
|
||||
self._mounpoints[metric["cgroup"]] = self._find_cgroup(metric["cgroup"])
|
||||
self.connection_timeout = int(init_config.get('connection_timeout', DEFAULT_TIMEOUT))
|
||||
self.docker_version = init_config.get('version', DEFAULT_VERSION)
|
||||
self.docker_root = init_config.get('docker_root', '/')
|
||||
# Locate cgroups directories
|
||||
self._mount_points = {}
|
||||
self._cgroup_filename_pattern = None
|
||||
for cgroup in CGROUPS:
|
||||
self._mount_points[cgroup] = self._find_cgroup(cgroup)
|
||||
self._prev_cpu = {}
|
||||
self._curr_cpu = {}
|
||||
self._cpu_count = None
|
||||
self._prev_system_cpu = None
|
||||
|
||||
def check(self, instance):
|
||||
docker_url = instance.get('url', DEFAULT_BASE_URL)
|
||||
try:
|
||||
docker_client = docker.Client(base_url=docker_url, version=self.docker_version,
|
||||
timeout=self.connection_timeout)
|
||||
running_containers = {container['Id']: container for container in self._get_containers(docker_client)}
|
||||
except Exception as e:
|
||||
self.log.error("Could not get containers from Docker API skipping Docker check - {}".format(e))
|
||||
return
|
||||
add_kubernetes_dimensions = instance.get('add_kubernetes_dimensions', DEFAULT_ADD_KUBERNETES_DIMENSIONS)
|
||||
dimensions = self._set_dimensions(None, instance)
|
||||
containers = self._get_containers(instance)
|
||||
if not containers:
|
||||
self.log.warn("No containers are running.")
|
||||
self.gauge("container.running_count", len(running_containers), dimensions=dimensions)
|
||||
self._set_container_pids(running_containers)
|
||||
# Report container metrics from cgroups
|
||||
self._report_container_metrics(running_containers, add_kubernetes_dimensions, dimensions)
|
||||
|
||||
max_containers = instance.get('max_containers', DEFAULT_MAX_CONTAINERS)
|
||||
def _report_rate_gauge_metric(self, metric_name, value, dimensions):
|
||||
self.rate(metric_name + "_sec", value, dimensions=dimensions)
|
||||
self.gauge(metric_name, value, dimensions=dimensions)
|
||||
|
||||
if not instance.get("exclude") or not instance.get("include"):
|
||||
if len(containers) > max_containers:
|
||||
self.log.warn("Too many containers to collect. Please refine the containers to collect by editing the "
|
||||
"configuration file. Truncating to %s containers" % max_containers)
|
||||
containers = containers[:max_containers]
|
||||
def _report_container_metrics(self, container_dict, add_kubernetes_dimensions, dimensions):
|
||||
self._curr_system_cpu, self._cpu_count = self._get_system_cpu_ns()
|
||||
system_memory = self._get_total_memory()
|
||||
for container in container_dict.itervalues():
|
||||
try:
|
||||
container_dimensions = dimensions.copy()
|
||||
container_id = container['Id']
|
||||
container['name'] = self._get_container_name(container['Names'], container_id)
|
||||
container_dimensions['image'] = container['Image']
|
||||
container_labels = container['Labels']
|
||||
if add_kubernetes_dimensions:
|
||||
if 'io.kubernetes.pod.name' in container_labels:
|
||||
container_dimensions['kubernetes_pod_name'] = container_labels['io.kubernetes.pod.name']
|
||||
if 'io.kubernetes.pod.namespace' in container_labels:
|
||||
container_dimensions['kubernetes_namespace'] = container_labels['io.kubernetes.pod.namespace']
|
||||
self._report_cgroup_cpuacct(container_id, container_dimensions)
|
||||
self._report_cgroup_memory(container_id, container_dimensions, system_memory)
|
||||
self._report_cgroup_blkio(container_id, container_dimensions)
|
||||
if "_proc_root" in container:
|
||||
self._report_net_metrics(container, container_dimensions)
|
||||
|
||||
collected_containers = 0
|
||||
for container in containers:
|
||||
container_dimensions = dimensions.copy()
|
||||
container_dimensions['container_names'] = ' '.join(container["Names"])
|
||||
container_dimensions['docker_tags'] = ' '.join(DOCKER_TAGS)
|
||||
self._report_cgroup_cpu_pct(container_id, container_dimensions)
|
||||
except IOError as err:
|
||||
# It is possible that the container got stopped between the
|
||||
# API call and now
|
||||
self.log.info("IO error while collecting cgroup metrics, "
|
||||
"skipping container...", exc_info=err)
|
||||
except Exception as err:
|
||||
self.log.error("Error when collecting data about container {}".format(err))
|
||||
self._prev_system_cpu = self._curr_system_cpu
|
||||
|
||||
# Check if the container is included/excluded
|
||||
if not self._is_container_included(instance, container["Names"]):
|
||||
continue
|
||||
def _get_container_name(self, container_names, container_id):
|
||||
container_name = None
|
||||
if container_names:
|
||||
for name in container_names:
|
||||
# if there is more than one / the name is actually an alias
|
||||
if name.count('/') <= 1:
|
||||
container_name = str(name).lstrip('/')
|
||||
break
|
||||
return container_name if container_name else container_id
|
||||
|
||||
collected_containers += 1
|
||||
if collected_containers > max_containers:
|
||||
self.log.warn("Too many containers are matching the current configuration. Some containers will not "
|
||||
"be collected. Please refine your configuration")
|
||||
break
|
||||
def _report_cgroup_cpuacct(self, container_id, container_dimensions):
|
||||
stat_file = self._get_cgroup_file('cpuacct', container_id, 'cpuacct.stat')
|
||||
stats = self._parse_cgroup_pairs(stat_file)
|
||||
self._report_rate_gauge_metric('container.cpu.user_time', stats['user'], container_dimensions)
|
||||
self._report_rate_gauge_metric('container.cpu.system_time', stats['system'], container_dimensions)
|
||||
|
||||
for key, (dd_key, metric_type) in DOCKER_METRICS.items():
|
||||
if key in container:
|
||||
getattr(self, metric_type)(
|
||||
dd_key, int(container[key]), dimensions=container_dimensions)
|
||||
for metric in LXC_METRICS:
|
||||
mountpoint = self._mounpoints[metric["cgroup"]]
|
||||
stat_file = os.path.join(mountpoint, metric["file"] % container["Id"])
|
||||
stats = self._parse_cgroup_file(stat_file)
|
||||
for key, (dd_key, metric_type) in metric["metrics"].items():
|
||||
if key in stats:
|
||||
getattr(self, metric_type)(
|
||||
dd_key, int(stats[key]), dimensions=container_dimensions)
|
||||
def _report_cgroup_memory(self, container_id, container_dimensions, system_memory_limit):
|
||||
stat_file = self._get_cgroup_file('memory', container_id, 'memory.stat')
|
||||
stats = self._parse_cgroup_pairs(stat_file)
|
||||
|
||||
@staticmethod
|
||||
def _make_tag(key, value):
|
||||
return "%s:%s" % (key.lower(), value.strip())
|
||||
cache_memory = stats['cache']
|
||||
rss_memory = stats['rss']
|
||||
self.gauge('container.mem.cache', cache_memory, dimensions=container_dimensions)
|
||||
self.gauge('container.mem.rss', rss_memory, dimensions=container_dimensions)
|
||||
|
||||
@staticmethod
|
||||
def _is_container_included(instance, tags):
|
||||
def _is_tag_included(tag):
|
||||
for exclude_rule in instance.get("exclude") or []:
|
||||
if re.match(exclude_rule, tag):
|
||||
for include_rule in instance.get("include") or []:
|
||||
if re.match(include_rule, tag):
|
||||
return True
|
||||
return False
|
||||
return True
|
||||
for tag in tags:
|
||||
if _is_tag_included(tag):
|
||||
return True
|
||||
return False
|
||||
swap_memory = 0
|
||||
if 'swap' in stats:
|
||||
swap_memory = stats['swap']
|
||||
self.gauge('container.mem.swap', swap_memory, dimensions=container_dimensions)
|
||||
|
||||
def _get_containers(self, instance):
|
||||
"""Gets the list of running containers in Docker.
|
||||
# Get container max memory
|
||||
memory_limit_file = self._get_cgroup_file('memory', container_id, 'memory.limit_in_bytes')
|
||||
memory_limit = self._parse_cgroup_value(memory_limit_file, convert=float)
|
||||
if memory_limit > system_memory_limit:
|
||||
memory_limit = float(system_memory_limit)
|
||||
used_perc = round((((cache_memory + rss_memory + swap_memory) / memory_limit) * 100), 2)
|
||||
self.gauge('container.mem.used_perc', used_perc, dimensions=container_dimensions)
|
||||
|
||||
"""
|
||||
return self._get_json("%(url)s/containers/json" % instance, params={"size": 1})
|
||||
def _report_cgroup_blkio(self, container_id, container_dimensions):
|
||||
stat_file = self._get_cgroup_file('blkio', container_id,
|
||||
'blkio.throttle.io_service_bytes')
|
||||
stats = self._parse_cgroup_blkio_metrics(stat_file)
|
||||
self._report_rate_gauge_metric('container.io.read_bytes', stats['io_read'], container_dimensions)
|
||||
self._report_rate_gauge_metric('container.io.write_bytes', stats['io_write'], container_dimensions)
|
||||
|
||||
def _get_container(self, instance, cid):
|
||||
"""Get container information from Docker, gived a container Id.
|
||||
def _report_cgroup_cpu_pct(self, container_id, container_dimensions):
|
||||
usage_file = self._get_cgroup_file('cpuacct', container_id, 'cpuacct.usage')
|
||||
|
||||
"""
|
||||
return self._get_json("%s/containers/%s/json" % (instance["url"], cid))
|
||||
prev_cpu = self._prev_cpu.get(container_id, None)
|
||||
curr_cpu = self._parse_cgroup_value(usage_file)
|
||||
self._prev_cpu[container_id] = curr_cpu
|
||||
|
||||
def _get_json(self, uri, params=None):
|
||||
"""Utility method to get and parse JSON streams.
|
||||
if prev_cpu is None:
|
||||
# probably first run, we need 2 data points
|
||||
return
|
||||
|
||||
"""
|
||||
if params:
|
||||
uri = "%s?%s" % (uri, urllib.urlencode(params))
|
||||
self.log.debug("Connecting to: %s" % uri)
|
||||
req = urllib2.Request(uri, None)
|
||||
system_cpu_delta = float(self._curr_system_cpu - self._prev_system_cpu)
|
||||
container_cpu_delta = float(curr_cpu - prev_cpu)
|
||||
if system_cpu_delta > 0 and container_cpu_delta > 0:
|
||||
cpu_pct = (container_cpu_delta / system_cpu_delta) * self._cpu_count * 100
|
||||
self.gauge('container.cpu.utilization_perc', cpu_pct, dimensions=container_dimensions)
|
||||
|
||||
def _report_net_metrics(self, container, container_dimensions):
|
||||
"""Find container network metrics by looking at /proc/$PID/net/dev of the container process."""
|
||||
proc_net_file = os.path.join(container['_proc_root'], 'net/dev')
|
||||
try:
|
||||
request = urllib2.urlopen(req)
|
||||
except urllib2.URLError as e:
|
||||
if "Errno 13" in str(e):
|
||||
raise Exception(
|
||||
"Unable to connect to socket. dd-agent user must be part of the 'docker' group")
|
||||
raise
|
||||
response = request.read()
|
||||
return json.loads(response)
|
||||
with open(proc_net_file, 'r') as f:
|
||||
lines = f.readlines()
|
||||
"""Two first lines are headers:
|
||||
Inter-| Receive | Transmit
|
||||
face |bytes packets errs drop fifo frame compressed multicast|bytes packets errs drop fifo colls carrier compressed
|
||||
"""
|
||||
for line in lines[2:]:
|
||||
cols = line.split(':', 1)
|
||||
interface_name = str(cols[0]).strip()
|
||||
if interface_name != 'lo':
|
||||
container_network_dimensions = container_dimensions.copy()
|
||||
container_network_dimensions['interface'] = interface_name
|
||||
network_values = cols[1].split()
|
||||
self._report_rate_gauge_metric("container.net.in_bytes", long(network_values[0]),
|
||||
container_network_dimensions)
|
||||
self._report_rate_gauge_metric("container.net.out_bytes", long(network_values[8]),
|
||||
container_network_dimensions)
|
||||
break
|
||||
except Exception as e:
|
||||
self.log.error("Failed to report network metrics from file {0}. Exception: {1}".format(proc_net_file, e))
|
||||
|
||||
@staticmethod
|
||||
def _find_cgroup(hierarchy):
|
||||
"""Finds the mount point for a specified cgroup hierarchy.
|
||||
# Docker API
|
||||
def _get_containers(self, docker_client):
|
||||
"""Gets the list of running containers in Docker."""
|
||||
return docker_client.containers()
|
||||
|
||||
Works with old style and new style mounts.
|
||||
def _find_cgroup_filename_pattern(self, container_id):
|
||||
# We try with different cgroups so that it works even if only one is properly working
|
||||
for mountpoint in self._mount_points.itervalues():
|
||||
stat_file_path_lxc = os.path.join(mountpoint, "lxc")
|
||||
stat_file_path_docker = os.path.join(mountpoint, "docker")
|
||||
stat_file_path_coreos = os.path.join(mountpoint, "system.slice")
|
||||
stat_file_path_kubernetes = os.path.join(mountpoint, container_id)
|
||||
stat_file_path_kubernetes_docker = os.path.join(mountpoint, "system", "docker", container_id)
|
||||
stat_file_path_docker_daemon = os.path.join(mountpoint, "docker-daemon", "docker", container_id)
|
||||
|
||||
if os.path.exists(stat_file_path_lxc):
|
||||
return '%(mountpoint)s/lxc/%(id)s/%(file)s'
|
||||
elif os.path.exists(stat_file_path_docker):
|
||||
return '%(mountpoint)s/docker/%(id)s/%(file)s'
|
||||
elif os.path.exists(stat_file_path_coreos):
|
||||
return '%(mountpoint)s/system.slice/docker-%(id)s.scope/%(file)s'
|
||||
elif os.path.exists(stat_file_path_kubernetes):
|
||||
return '%(mountpoint)s/%(id)s/%(file)s'
|
||||
elif os.path.exists(stat_file_path_kubernetes_docker):
|
||||
return '%(mountpoint)s/system/docker/%(id)s/%(file)s'
|
||||
elif os.path.exists(stat_file_path_docker_daemon):
|
||||
return '%(mountpoint)s/docker-daemon/docker/%(id)s/%(file)s'
|
||||
|
||||
raise Exception("Cannot find Docker cgroup directory. Be sure your system is supported.")
|
||||
|
||||
def _get_cgroup_file(self, cgroup, container_id, filename):
|
||||
# This can't be initialized at startup because cgroups may not be mounted yet
|
||||
if not self._cgroup_filename_pattern:
|
||||
self._cgroup_filename_pattern = self._find_cgroup_filename_pattern(container_id)
|
||||
|
||||
return self._cgroup_filename_pattern % (dict(
|
||||
mountpoint=self._mount_points[cgroup],
|
||||
id=container_id,
|
||||
file=filename,
|
||||
))
|
||||
|
||||
def _get_total_memory(self):
|
||||
with open(os.path.join(self.docker_root, '/proc/meminfo')) as f:
|
||||
for line in f.readlines():
|
||||
tokens = line.split()
|
||||
if tokens[0] == 'MemTotal:':
|
||||
return int(tokens[1]) * 1024
|
||||
|
||||
raise Exception('Invalid formatting in /proc/meminfo: unable to '
|
||||
'determine MemTotal')
|
||||
|
||||
def _get_system_cpu_ns(self):
|
||||
# see also: getSystemCPUUsage of docker's stats_collector_unix.go
|
||||
total_jiffies = None
|
||||
cpu_count = 0
|
||||
|
||||
with open(os.path.join(self.docker_root, '/proc/stat'), 'r') as f:
|
||||
for line in f.readlines():
|
||||
tokens = line.split()
|
||||
|
||||
if tokens[0] == 'cpu':
|
||||
if len(tokens) < 8:
|
||||
raise Exception("Invalid formatting in /proc/stat")
|
||||
|
||||
total_jiffies = sum(map(lambda t: int(t), tokens[1:8]))
|
||||
elif tokens[0].startswith('cpu'):
|
||||
# startswith but does not equal implies /cpu\d+/ or so
|
||||
# we don't need full per-cpu usage to calculate %,
|
||||
# so just count cores
|
||||
cpu_count += 1
|
||||
|
||||
if not total_jiffies:
|
||||
raise Exception("Unable to find CPU usage in /proc/stat")
|
||||
|
||||
cpu_time_ns = (total_jiffies / JIFFY_HZ) * 1e9
|
||||
return cpu_time_ns, cpu_count
|
||||
|
||||
def _find_cgroup(self, hierarchy):
|
||||
"""Finds the mount point for a specified cgroup hierarchy. Works with
|
||||
old style and new style mounts.
|
||||
"""
|
||||
try:
|
||||
fp = open("/proc/mounts")
|
||||
mounts = map(lambda x: x.split(), fp.read().splitlines())
|
||||
finally:
|
||||
fp.close()
|
||||
with open(os.path.join(self.docker_root, "/proc/mounts"), 'r') as f:
|
||||
mounts = map(lambda x: x.split(), f.read().splitlines())
|
||||
|
||||
cgroup_mounts = filter(lambda x: x[2] == "cgroup", mounts)
|
||||
if len(cgroup_mounts) == 0:
|
||||
raise Exception("Can't find mounted cgroups. If you run the Agent inside a container,"
|
||||
" please refer to the documentation.")
|
||||
# Old cgroup style
|
||||
if len(cgroup_mounts) == 1:
|
||||
return cgroup_mounts[0][1]
|
||||
return os.path.join(self.docker_root, cgroup_mounts[0][1])
|
||||
|
||||
candidate = None
|
||||
for _, mountpoint, _, opts, _, _ in cgroup_mounts:
|
||||
if hierarchy in opts:
|
||||
return mountpoint
|
||||
if mountpoint.startswith("/host/"):
|
||||
return os.path.join(self.docker_root, mountpoint)
|
||||
candidate = mountpoint
|
||||
if candidate is not None:
|
||||
return os.path.join(self.docker_root, candidate)
|
||||
raise Exception("Can't find mounted %s cgroups." % hierarchy)
|
||||
|
||||
def _parse_cgroup_file(self, file_):
|
||||
"""Parses a cgroup pseudo file for key/values.
|
||||
def _parse_cgroup_value(self, stat_file, convert=int):
|
||||
"""Parse a cgroup info file containing a single value."""
|
||||
with open(stat_file, 'r') as f:
|
||||
return convert(f.read().strip())
|
||||
|
||||
"""
|
||||
fp = None
|
||||
try:
|
||||
self.log.debug("Opening file: %s" % file_)
|
||||
def _parse_cgroup_pairs(self, stat_file, convert=int):
|
||||
"""Parse a cgroup file for key/values."""
|
||||
with open(stat_file, 'r') as f:
|
||||
split_lines = map(lambda x: x.split(' ', 1), f.readlines())
|
||||
return {k: convert(v) for k, v in split_lines}
|
||||
|
||||
def _parse_cgroup_blkio_metrics(self, stat_file):
|
||||
"""Parse the blkio metrics."""
|
||||
with open(stat_file, 'r') as f:
|
||||
stats = f.read().splitlines()
|
||||
metrics = {
|
||||
'io_read': 0,
|
||||
'io_write': 0,
|
||||
}
|
||||
for line in stats:
|
||||
if 'Read' in line:
|
||||
metrics['io_read'] += int(line.split()[2])
|
||||
if 'Write' in line:
|
||||
metrics['io_write'] += int(line.split()[2])
|
||||
return metrics
|
||||
|
||||
# checking if cgroup is a container cgroup
|
||||
def _is_container_cgroup(self, line, selinux_policy):
|
||||
if line[1] not in ('cpu,cpuacct', 'cpuacct,cpu', 'cpuacct') or line[2] == '/docker-daemon':
|
||||
return False
|
||||
if 'docker' in line[2]:
|
||||
return True
|
||||
if 'docker' in selinux_policy:
|
||||
return True
|
||||
if line[2].startswith('/') and re.match(CONTAINER_ID_RE, line[2][1:]): # kubernetes
|
||||
return True
|
||||
return False
|
||||
|
||||
def _set_container_pids(self, containers):
|
||||
"""Find all proc paths for running containers."""
|
||||
proc_path = os.path.join(self.docker_root, 'proc')
|
||||
pid_dirs = [_dir for _dir in os.listdir(proc_path) if _dir.isdigit()]
|
||||
|
||||
for pid_dir in pid_dirs:
|
||||
try:
|
||||
fp = open(file_)
|
||||
except IOError:
|
||||
raise IOError(
|
||||
"Can't open %s. If you are using Docker 0.9.0 or higher, the Datadog agent is not yet compatible with these versions. Please get in touch with Datadog Support for more information" %
|
||||
file_)
|
||||
return dict(map(lambda x: x.split(), fp.read().splitlines()))
|
||||
path = os.path.join(proc_path, pid_dir, 'cgroup')
|
||||
with open(path, 'r') as f:
|
||||
content = [line.strip().split(':') for line in f.readlines()]
|
||||
|
||||
finally:
|
||||
if fp is not None:
|
||||
fp.close()
|
||||
selinux_policy = ''
|
||||
path = os.path.join(proc_path, pid_dir, 'attr', 'current')
|
||||
if os.path.exists(path):
|
||||
with open(path, 'r') as f:
|
||||
selinux_policy = f.readlines()[0]
|
||||
except IOError as e:
|
||||
self.log.debug("Cannot read %s, "
|
||||
"process likely raced to finish : %s" %
|
||||
(path, str(e)))
|
||||
continue
|
||||
except Exception as e:
|
||||
self.log.warning("Cannot read %s : %s" % (path, str(e)))
|
||||
continue
|
||||
|
||||
try:
|
||||
cpuacct = None
|
||||
for line in content:
|
||||
if self._is_container_cgroup(line, selinux_policy):
|
||||
cpuacct = line[2]
|
||||
break
|
||||
matches = re.findall(CONTAINER_ID_RE, cpuacct) if cpuacct else None
|
||||
if matches:
|
||||
container_id = matches[-1]
|
||||
if container_id not in containers:
|
||||
self.log.debug("Container %s not in container_dict, it's likely excluded", container_id)
|
||||
continue
|
||||
containers[container_id]['_pid'] = pid_dir
|
||||
containers[container_id]['_proc_root'] = os.path.join(proc_path, pid_dir)
|
||||
except Exception as e:
|
||||
self.log.warning("Cannot parse %s content: %s" % (path, str(e)))
|
||||
continue
|
||||
|