Improve docker plugin

Rewrote the plugin to read cgroups and the Docker API to
generate metrics for Docker containers

Added use of the docker Python library to connect to
the Docker daemon

Added the ability to set Kubernetes-specific dimensions if
running in a Kubernetes cluster

Change-Id: I322e1d983c3ec554732fbfe0e14ccb9e6b3b29f5
Co-Authored-By: Tim Buckley <timothy.jas.buckley@hpe.com>
Michael James Hoppal 2016-11-02 16:17:59 -06:00
parent 1544d4cb49
commit e7ca18385f
3 changed files with 391 additions and 240 deletions


@@ -1,34 +1,20 @@
# (C) Copyright 2015 Hewlett Packard Enterprise Development Company LP
# (C) Copyright 2016 Hewlett Packard Enterprise Development LP
# Warning
# The user running the Datadog Agent (usually "dd-agent") must be part of the "docker" group
# The user running the monasca-agent (usually "mon-agent") must be part of the "docker" group
init_config:
docker_root: /
# Timeout on Docker api connection. You may have to increase it if you have many containers.
# connection_timeout: 3
# docker version to use when connecting to the docker api. Default is auto
# version: "auto"
instances:
- url: "unix://var/run/docker.sock"
# URL of the Docker daemon socket to reach the Docker API. HTTP also works.
- url: "unix://var/run/docker.sock"
# Include/Exclude rules
#
# To include or exclude containers based on their docker tags, use the include and
# exclude keys in your instance.
# The reasoning is: if a tag matches an exclude rule, it won't be included
# unless it also matches an include rule.
#
# Examples:
# exclude all, except ubuntu and debian:
# instances:
# - url: "unix://var/run/docker.sock"
# include:
# - "image:ubuntu"
# - "image:debian"
# exclude:
# - ".*"
#
# include all, except ubuntu and debian:
# instances:
# - url: "unix://var/run/docker.sock"
# include: []
# exclude:
# - "image:ubuntu"
# - "image:debian"
# Set to true if you want the kubernetes namespace and pod name to be set as dimensions
add_kubernetes_dimensions: True


@@ -644,6 +644,61 @@ The directory checks return the following metrics:
| directory.files_count | path, hostname, service |
## Docker
This plugin gathers metrics on docker containers.
A YAML file (docker.yaml) contains the URL of the Docker API to connect to and the Docker root directory that is used when looking up proc metrics.
For this check, the user running the monasca agent (usually the mon-agent user) must be a member of the docker group.
Also, if you want to attach Kubernetes dimensions to each metric, set add_kubernetes_dimensions to true in the YAML file. This sets the kubernetes_pod_name and kubernetes_namespace dimensions.
Sample config:
Without kubernetes dimensions
```
init_config:
docker_root: /
socket_timeout: 5
instances:
- url: "unix://var/run/docker.sock"
```
With kubernetes dimensions
```
init_config:
docker_root: /
socket_timeout: 5
instances:
- url: "unix://var/run/docker.sock"
add_kubernetes_dimensions: True
```
Note this plugin only supports one instance in the config file.
The docker check returns the following metrics:
| Metric Name | Metric Type | Dimensions | Optional_dimensions (set if add_kubernetes_dimensions is true and container is running under kubernetes) | Semantics |
| ----------- | ----------- | ---------- | ------------------- | --------- |
| container.running_count | Gauge | hostname | | Number of containers running on the host |
| container.cpu.system_time | Gauge | hostname, name, image | kubernetes_pod_name, kubernetes_namespace | The total time the CPU has executed system calls on behalf of the processes in the container |
| container.cpu.system_time_sec | Rate | hostname, name, image | kubernetes_pod_name, kubernetes_namespace | The rate the CPU is executing system calls on behalf of the processes in the container |
| container.cpu.user_time | Gauge | hostname, name, image | kubernetes_pod_name, kubernetes_namespace | The total time the CPU is under direct control of the processes in this container |
| container.cpu.user_time_sec | Rate | hostname, name, image | kubernetes_pod_name, kubernetes_namespace | The rate the CPU is under direct control of the processes in this container |
| container.cpu.utilization_perc | Gauge | hostname, name, image | kubernetes_pod_name, kubernetes_namespace | The percentage of CPU used by the container |
| container.io.read_bytes | Gauge | hostname, name, image | kubernetes_pod_name, kubernetes_namespace | The total number of bytes read by the processes in the container |
| container.io.read_bytes_sec | Rate | hostname, name, image | kubernetes_pod_name, kubernetes_namespace | The rate of bytes read by the processes in the container |
| container.io.write_bytes | Gauge | hostname, name, image | kubernetes_pod_name, kubernetes_namespace | The total number of bytes written by the processes in the container |
| container.io.write_bytes_sec | Rate | hostname, name, image | kubernetes_pod_name, kubernetes_namespace | The rate of bytes written by the processes in the container |
| container.mem.cache | Gauge | hostname, name, image | kubernetes_pod_name, kubernetes_namespace | The amount of cached memory that belongs to the container's processes |
| container.mem.rss | Gauge | hostname, name, image | kubernetes_pod_name, kubernetes_namespace | The amount of non-cached memory used by the container's processes |
| container.mem.swap | Gauge | hostname, name, image | kubernetes_pod_name, kubernetes_namespace | The amount of swap memory used by the processes in the container |
| container.mem.used_perc | Gauge | hostname, name, image | kubernetes_pod_name, kubernetes_namespace | The percentage of memory used out of the given limit of the container |
| container.net.in_bytes | Gauge | hostname, name, image, interface | kubernetes_pod_name, kubernetes_namespace | The total amount of bytes received by the container per interface |
| container.net.in_bytes_sec | Rate | hostname, name, image, interface | kubernetes_pod_name, kubernetes_namespace | The rate of bytes received by the container per interface |
| container.net.out_bytes | Gauge | hostname, name, image, interface | kubernetes_pod_name, kubernetes_namespace | The total amount of bytes sent by the container per interface |
| container.net.out_bytes_sec | Rate | hostname, name, image, interface | kubernetes_pod_name, kubernetes_namespace | The rate of bytes sent by the container per interface |
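As a reference for the two percentage metrics above, here is a minimal sketch (helper names are illustrative, not part of the plugin) of the arithmetic the check performs on the counters it reads from cpuacct.usage, /proc/stat and memory.stat:
```
# Minimal sketch of the percentage calculations (illustrative helper names,
# not part of the plugin); the plugin reads these counters from cpuacct.usage,
# /proc/stat and memory.stat.

def cpu_utilization_perc(container_cpu_delta_ns, system_cpu_delta_ns, cpu_count):
    # Share of total system CPU time used by the container, scaled to all cores.
    if container_cpu_delta_ns <= 0 or system_cpu_delta_ns <= 0:
        return None
    return (float(container_cpu_delta_ns) / system_cpu_delta_ns) * cpu_count * 100


def mem_used_perc(cache, rss, swap, memory_limit):
    # cache + rss + swap measured against the container memory limit
    # (the plugin caps the limit at the host's MemTotal).
    return round(((cache + rss + swap) / float(memory_limit)) * 100, 2)
```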
## Elasticsearch Checks
This section describes the Elasticsearch check that can be performed by the Agent. The Elasticsearch check requires a configuration file called elastic.yaml to be available in the agent conf.d configuration directory.


@@ -1,252 +1,362 @@
# (C) Copyright 2015 Hewlett Packard Enterprise Development Company LP
# (C) Copyright 2015,2016 Hewlett Packard Enterprise Development LP
from __future__ import absolute_import
import httplib
import json
import os
import re
import socket
import urllib
import urllib2
from urlparse import urlsplit
from monasca_agent.collector.checks import AgentCheck
import docker
from monasca_agent.collector import checks
CONTAINER_ID_RE = re.compile('[0-9a-f]{64}')
DEFAULT_BASE_URL = "unix://var/run/docker.sock"
DEFAULT_VERSION = "auto"
DEFAULT_TIMEOUT = 3
DEFAULT_ADD_KUBERNETES_DIMENSIONS = False
JIFFY_HZ = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
CGROUPS = ['cpuacct', 'memory', 'blkio']
DEFAULT_MAX_CONTAINERS = 20
class Docker(checks.AgentCheck):
"""Collect metrics and events from Docker API and cgroups"""
LXC_METRICS = [
{
"cgroup": "memory",
"file": "lxc/%s/memory.stat",
"metrics": {
"active_anon": ("docker.mem.active_anon", "gauge"),
"active_file": ("docker.mem.active_file", "gauge"),
"cache": ("docker.mem.cache", "gauge"),
"hierarchical_memory_limit": ("docker.mem.hierarchical_memory_limit", "gauge"),
"hierarchical_memsw_limit": ("docker.mem.hierarchical_memsw_limit", "gauge"),
"inactive_anon": ("docker.mem.inactive_anon", "gauge"),
"inactive_file": ("docker.mem.inactive_file", "gauge"),
"mapped_file": ("docker.mem.mapped_file", "gauge"),
"pgfault": ("docker.mem.pgfault", "gauge"),
"pgmajfault": ("docker.mem.pgmajfault", "gauge"),
"pgpgin": ("docker.mem.pgpgin", "gauge"),
"pgpgout": ("docker.mem.pgpgout", "gauge"),
"rss": ("docker.mem.rss", "gauge"),
"swap": ("docker.mem.swap", "gauge"),
"unevictable": ("docker.mem.unevictable", "gauge"),
"total_active_anon": ("docker.mem.total_active_anon", "gauge"),
"total_active_file": ("docker.mem.total_active_file", "gauge"),
"total_cache": ("docker.mem.total_cache", "gauge"),
"total_inactive_anon": ("docker.mem.total_inactive_anon", "gauge"),
"total_inactive_file": ("docker.mem.total_inactive_file", "gauge"),
"total_mapped_file": ("docker.mem.total_mapped_file", "gauge"),
"total_pgfault": ("docker.mem.total_pgfault", "gauge"),
"total_pgmajfault": ("docker.mem.total_pgmajfault", "gauge"),
"total_pgpgin": ("docker.mem.total_pgpgin", "gauge"),
"total_pgpgout": ("docker.mem.total_pgpgout", "gauge"),
"total_rss": ("docker.mem.total_rss", "gauge"),
"total_swap": ("docker.mem.total_swap", "gauge"),
"total_unevictable": ("docker.mem.total_unevictable", "gauge"),
}
},
{
"cgroup": "cpuacct",
"file": "lxc/%s/cpuacct.stat",
"metrics": {
"user": ("docker.cpu.user", "gauge"),
"system": ("docker.cpu.system", "gauge"),
},
},
]
def __init__(self, name, init_config, agent_config, instances=None):
checks.AgentCheck.__init__(self, name, init_config, agent_config, instances)
DOCKER_METRICS = {
"SizeRw": ("docker.disk.size", "gauge"),
}
if instances is not None and len(instances) > 1:
raise Exception('Docker check only supports one configured instance.')
DOCKER_TAGS = [
"Command",
"Image",
]
class UnixHTTPConnection(httplib.HTTPConnection, object):
"""Class used in conjunction with UnixSocketHandler to make urllib2
compatible with Unix sockets.
"""
def __init__(self, unix_socket):
self._unix_socket = unix_socket
def connect(self):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(self._unix_socket)
self.sock = sock
def __call__(self, *args, **kwargs):
httplib.HTTPConnection.__init__(self, *args, **kwargs)
return self
class UnixSocketHandler(urllib2.AbstractHTTPHandler):
"""Class that makes Unix sockets work with urllib2 without any additional dependencies.
"""
def unix_open(self, req):
full_path = "%s%s" % urlsplit(req.get_full_url())[1:3]
path = os.path.sep
for part in full_path.split("/"):
path = os.path.join(path, part)
if not os.path.exists(path):
break
unix_socket = path
# add a host or else urllib2 complains
url = req.get_full_url().replace(unix_socket, "/localhost")
new_req = urllib2.Request(url, req.get_data(), dict(req.header_items()))
new_req.timeout = req.timeout
return self.do_open(UnixHTTPConnection(unix_socket), new_req)
unix_request = urllib2.AbstractHTTPHandler.do_request_
class Docker(AgentCheck):
def __init__(self, *args, **kwargs):
super(Docker, self).__init__(*args, **kwargs)
urllib2.install_opener(urllib2.build_opener(UnixSocketHandler()))
self._mounpoints = {}
for metric in LXC_METRICS:
self._mounpoints[metric["cgroup"]] = self._find_cgroup(metric["cgroup"])
self.connection_timeout = int(init_config.get('connection_timeout', DEFAULT_TIMEOUT))
self.docker_version = init_config.get('version', DEFAULT_VERSION)
self.docker_root = init_config.get('docker_root', '/')
# Locate cgroups directories
self._mount_points = {}
self._cgroup_filename_pattern = None
for cgroup in CGROUPS:
self._mount_points[cgroup] = self._find_cgroup(cgroup)
self._prev_cpu = {}
self._curr_cpu = {}
self._cpu_count = None
self._prev_system_cpu = None
def check(self, instance):
docker_url = instance.get('url', DEFAULT_BASE_URL)
try:
docker_client = docker.Client(base_url=docker_url, version=self.docker_version,
timeout=self.connection_timeout)
running_containers = {container['Id']: container for container in self._get_containers(docker_client)}
except Exception as e:
self.log.error("Could not get containers from Docker API skipping Docker check - {}".format(e))
return
add_kubernetes_dimensions = instance.get('add_kubernetes_dimensions', DEFAULT_ADD_KUBERNETES_DIMENSIONS)
dimensions = self._set_dimensions(None, instance)
containers = self._get_containers(instance)
if not containers:
self.log.warn("No containers are running.")
self.gauge("container.running_count", len(running_containers), dimensions=dimensions)
self._set_container_pids(running_containers)
# Report container metrics from cgroups
self._report_container_metrics(running_containers, add_kubernetes_dimensions, dimensions)
max_containers = instance.get('max_containers', DEFAULT_MAX_CONTAINERS)
def _report_rate_gauge_metric(self, metric_name, value, dimensions):
self.rate(metric_name + "_sec", value, dimensions=dimensions)
self.gauge(metric_name, value, dimensions=dimensions)
if not instance.get("exclude") or not instance.get("include"):
if len(containers) > max_containers:
self.log.warn("Too many containers to collect. Please refine the containers to collect by editing the "
"configuration file. Truncating to %s containers" % max_containers)
containers = containers[:max_containers]
def _report_container_metrics(self, container_dict, add_kubernetes_dimensions, dimensions):
self._curr_system_cpu, self._cpu_count = self._get_system_cpu_ns()
system_memory = self._get_total_memory()
for container in container_dict.itervalues():
try:
container_dimensions = dimensions.copy()
container_id = container['Id']
container['name'] = self._get_container_name(container['Names'], container_id)
container_dimensions['image'] = container['Image']
container_labels = container['Labels']
if add_kubernetes_dimensions:
if 'io.kubernetes.pod.name' in container_labels:
container_dimensions['kubernetes_pod_name'] = container_labels['io.kubernetes.pod.name']
if 'io.kubernetes.pod.namespace' in container_labels:
container_dimensions['kubernetes_namespace'] = container_labels['io.kubernetes.pod.namespace']
self._report_cgroup_cpuacct(container_id, container_dimensions)
self._report_cgroup_memory(container_id, container_dimensions, system_memory)
self._report_cgroup_blkio(container_id, container_dimensions)
if "_proc_root" in container:
self._report_net_metrics(container, container_dimensions)
collected_containers = 0
for container in containers:
container_dimensions = dimensions.copy()
container_dimensions['container_names'] = ' '.join(container["Names"])
container_dimensions['docker_tags'] = ' '.join(DOCKER_TAGS)
self._report_cgroup_cpu_pct(container_id, container_dimensions)
except IOError as err:
# It is possible that the container got stopped between the
# API call and now
self.log.info("IO error while collecting cgroup metrics, "
"skipping container...", exc_info=err)
except Exception as err:
self.log.error("Error when collecting data about container {}".format(err))
self._prev_system_cpu = self._curr_system_cpu
# Check if the container is included/excluded
if not self._is_container_included(instance, container["Names"]):
continue
def _get_container_name(self, container_names, container_id):
container_name = None
if container_names:
for name in container_names:
# if there is more than one / the name is actually an alias
if name.count('/') <= 1:
container_name = str(name).lstrip('/')
break
return container_name if container_name else container_id
collected_containers += 1
if collected_containers > max_containers:
self.log.warn("Too many containers are matching the current configuration. Some containers will not "
"be collected. Please refine your configuration")
break
def _report_cgroup_cpuacct(self, container_id, container_dimensions):
stat_file = self._get_cgroup_file('cpuacct', container_id, 'cpuacct.stat')
stats = self._parse_cgroup_pairs(stat_file)
self._report_rate_gauge_metric('container.cpu.user_time', stats['user'], container_dimensions)
self._report_rate_gauge_metric('container.cpu.system_time', stats['system'], container_dimensions)
for key, (dd_key, metric_type) in DOCKER_METRICS.items():
if key in container:
getattr(self, metric_type)(
dd_key, int(container[key]), dimensions=container_dimensions)
for metric in LXC_METRICS:
mountpoint = self._mounpoints[metric["cgroup"]]
stat_file = os.path.join(mountpoint, metric["file"] % container["Id"])
stats = self._parse_cgroup_file(stat_file)
for key, (dd_key, metric_type) in metric["metrics"].items():
if key in stats:
getattr(self, metric_type)(
dd_key, int(stats[key]), dimensions=container_dimensions)
def _report_cgroup_memory(self, container_id, container_dimensions, system_memory_limit):
stat_file = self._get_cgroup_file('memory', container_id, 'memory.stat')
stats = self._parse_cgroup_pairs(stat_file)
@staticmethod
def _make_tag(key, value):
return "%s:%s" % (key.lower(), value.strip())
cache_memory = stats['cache']
rss_memory = stats['rss']
self.gauge('container.mem.cache', cache_memory, dimensions=container_dimensions)
self.gauge('container.mem.rss', rss_memory, dimensions=container_dimensions)
@staticmethod
def _is_container_included(instance, tags):
def _is_tag_included(tag):
for exclude_rule in instance.get("exclude") or []:
if re.match(exclude_rule, tag):
for include_rule in instance.get("include") or []:
if re.match(include_rule, tag):
return True
return False
return True
for tag in tags:
if _is_tag_included(tag):
return True
return False
swap_memory = 0
if 'swap' in stats:
swap_memory = stats['swap']
self.gauge('container.mem.swap', swap_memory, dimensions=container_dimensions)
def _get_containers(self, instance):
"""Gets the list of running containers in Docker.
# Get container max memory
memory_limit_file = self._get_cgroup_file('memory', container_id, 'memory.limit_in_bytes')
memory_limit = self._parse_cgroup_value(memory_limit_file, convert=float)
if memory_limit > system_memory_limit:
memory_limit = float(system_memory_limit)
used_perc = round((((cache_memory + rss_memory + swap_memory) / memory_limit) * 100), 2)
self.gauge('container.mem.used_perc', used_perc, dimensions=container_dimensions)
"""
return self._get_json("%(url)s/containers/json" % instance, params={"size": 1})
def _report_cgroup_blkio(self, container_id, container_dimensions):
stat_file = self._get_cgroup_file('blkio', container_id,
'blkio.throttle.io_service_bytes')
stats = self._parse_cgroup_blkio_metrics(stat_file)
self._report_rate_gauge_metric('container.io.read_bytes', stats['io_read'], container_dimensions)
self._report_rate_gauge_metric('container.io.write_bytes', stats['io_write'], container_dimensions)
def _get_container(self, instance, cid):
"""Get container information from Docker, gived a container Id.
def _report_cgroup_cpu_pct(self, container_id, container_dimensions):
usage_file = self._get_cgroup_file('cpuacct', container_id, 'cpuacct.usage')
"""
return self._get_json("%s/containers/%s/json" % (instance["url"], cid))
prev_cpu = self._prev_cpu.get(container_id, None)
curr_cpu = self._parse_cgroup_value(usage_file)
self._prev_cpu[container_id] = curr_cpu
def _get_json(self, uri, params=None):
"""Utility method to get and parse JSON streams.
if prev_cpu is None:
# probably first run, we need 2 data points
return
"""
if params:
uri = "%s?%s" % (uri, urllib.urlencode(params))
self.log.debug("Connecting to: %s" % uri)
req = urllib2.Request(uri, None)
system_cpu_delta = float(self._curr_system_cpu - self._prev_system_cpu)
container_cpu_delta = float(curr_cpu - prev_cpu)
if system_cpu_delta > 0 and container_cpu_delta > 0:
cpu_pct = (container_cpu_delta / system_cpu_delta) * self._cpu_count * 100
self.gauge('container.cpu.utilization_perc', cpu_pct, dimensions=container_dimensions)
def _report_net_metrics(self, container, container_dimensions):
"""Find container network metrics by looking at /proc/$PID/net/dev of the container process."""
proc_net_file = os.path.join(container['_proc_root'], 'net/dev')
try:
request = urllib2.urlopen(req)
except urllib2.URLError as e:
if "Errno 13" in str(e):
raise Exception(
"Unable to connect to socket. dd-agent user must be part of the 'docker' group")
raise
response = request.read()
return json.loads(response)
with open(proc_net_file, 'r') as f:
lines = f.readlines()
"""Two first lines are headers:
Inter-| Receive | Transmit
face |bytes packets errs drop fifo frame compressed multicast|bytes packets errs drop fifo colls carrier compressed
"""
for line in lines[2:]:
cols = line.split(':', 1)
interface_name = str(cols[0]).strip()
if interface_name != 'lo':
container_network_dimensions = container_dimensions.copy()
container_network_dimensions['interface'] = interface_name
network_values = cols[1].split()
self._report_rate_gauge_metric("container.net.in_bytes", long(network_values[0]),
container_network_dimensions)
self._report_rate_gauge_metric("container.net.out_bytes", long(network_values[8]),
container_network_dimensions)
break
except Exception as e:
self.log.error("Failed to report network metrics from file {0}. Exception: {1}".format(proc_net_file, e))
@staticmethod
def _find_cgroup(hierarchy):
"""Finds the mount point for a specified cgroup hierarchy.
# Docker API
def _get_containers(self, docker_client):
"""Gets the list of running containers in Docker."""
return docker_client.containers()
Works with old style and new style mounts.
def _find_cgroup_filename_pattern(self, container_id):
# We try with different cgroups so that it works even if only one is properly working
for mountpoint in self._mount_points.itervalues():
stat_file_path_lxc = os.path.join(mountpoint, "lxc")
stat_file_path_docker = os.path.join(mountpoint, "docker")
stat_file_path_coreos = os.path.join(mountpoint, "system.slice")
stat_file_path_kubernetes = os.path.join(mountpoint, container_id)
stat_file_path_kubernetes_docker = os.path.join(mountpoint, "system", "docker", container_id)
stat_file_path_docker_daemon = os.path.join(mountpoint, "docker-daemon", "docker", container_id)
if os.path.exists(stat_file_path_lxc):
return '%(mountpoint)s/lxc/%(id)s/%(file)s'
elif os.path.exists(stat_file_path_docker):
return '%(mountpoint)s/docker/%(id)s/%(file)s'
elif os.path.exists(stat_file_path_coreos):
return '%(mountpoint)s/system.slice/docker-%(id)s.scope/%(file)s'
elif os.path.exists(stat_file_path_kubernetes):
return '%(mountpoint)s/%(id)s/%(file)s'
elif os.path.exists(stat_file_path_kubernetes_docker):
return '%(mountpoint)s/system/docker/%(id)s/%(file)s'
elif os.path.exists(stat_file_path_docker_daemon):
return '%(mountpoint)s/docker-daemon/docker/%(id)s/%(file)s'
raise Exception("Cannot find Docker cgroup directory. Be sure your system is supported.")
def _get_cgroup_file(self, cgroup, container_id, filename):
# This can't be initialized at startup because cgroups may not be mounted yet
if not self._cgroup_filename_pattern:
self._cgroup_filename_pattern = self._find_cgroup_filename_pattern(container_id)
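# Illustrative example (an assumption, not from this commit): with a typical
# /sys/fs/cgroup mount and the plain "docker" layout, a call such as
# _get_cgroup_file('memory', container_id, 'memory.stat') would resolve to
# /sys/fs/cgroup/memory/docker/<container_id>/memory.stat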
return self._cgroup_filename_pattern % (dict(
mountpoint=self._mount_points[cgroup],
id=container_id,
file=filename,
))
def _get_total_memory(self):
with open(os.path.join(self.docker_root, '/proc/meminfo')) as f:
for line in f.readlines():
tokens = line.split()
if tokens[0] == 'MemTotal:':
return int(tokens[1]) * 1024
raise Exception('Invalid formatting in /proc/meminfo: unable to '
'determine MemTotal')
def _get_system_cpu_ns(self):
# see also: getSystemCPUUsage of docker's stats_collector_unix.go
total_jiffies = None
cpu_count = 0
with open(os.path.join(self.docker_root, '/proc/stat'), 'r') as f:
for line in f.readlines():
tokens = line.split()
if tokens[0] == 'cpu':
if len(tokens) < 8:
raise Exception("Invalid formatting in /proc/stat")
total_jiffies = sum(map(lambda t: int(t), tokens[1:8]))
elif tokens[0].startswith('cpu'):
# startswith but does not equal implies /cpu\d+/ or so
# we don't need full per-cpu usage to calculate %,
# so just count cores
cpu_count += 1
if not total_jiffies:
raise Exception("Unable to find CPU usage in /proc/stat")
cpu_time_ns = (total_jiffies / JIFFY_HZ) * 1e9
return cpu_time_ns, cpu_count
def _find_cgroup(self, hierarchy):
"""Finds the mount point for a specified cgroup hierarchy. Works with
old style and new style mounts.
"""
try:
fp = open("/proc/mounts")
mounts = map(lambda x: x.split(), fp.read().splitlines())
finally:
fp.close()
with open(os.path.join(self.docker_root, "/proc/mounts"), 'r') as f:
mounts = map(lambda x: x.split(), f.read().splitlines())
cgroup_mounts = filter(lambda x: x[2] == "cgroup", mounts)
if len(cgroup_mounts) == 0:
raise Exception("Can't find mounted cgroups. If you run the Agent inside a container,"
" please refer to the documentation.")
# Old cgroup style
if len(cgroup_mounts) == 1:
return cgroup_mounts[0][1]
return os.path.join(self.docker_root, cgroup_mounts[0][1])
candidate = None
for _, mountpoint, _, opts, _, _ in cgroup_mounts:
if hierarchy in opts:
return mountpoint
if mountpoint.startswith("/host/"):
return os.path.join(self.docker_root, mountpoint)
candidate = mountpoint
if candidate is not None:
return os.path.join(self.docker_root, candidate)
raise Exception("Can't find mounted %s cgroups." % hierarchy)
def _parse_cgroup_file(self, file_):
"""Parses a cgroup pseudo file for key/values.
def _parse_cgroup_value(self, stat_file, convert=int):
"""Parse a cgroup info file containing a single value."""
with open(stat_file, 'r') as f:
return convert(f.read().strip())
"""
fp = None
try:
self.log.debug("Opening file: %s" % file_)
def _parse_cgroup_pairs(self, stat_file, convert=int):
"""Parse a cgroup file for key/values."""
with open(stat_file, 'r') as f:
split_lines = map(lambda x: x.split(' ', 1), f.readlines())
return {k: convert(v) for k, v in split_lines}
def _parse_cgroup_blkio_metrics(self, stat_file):
"""Parse the blkio metrics."""
with open(stat_file, 'r') as f:
stats = f.read().splitlines()
metrics = {
'io_read': 0,
'io_write': 0,
}
for line in stats:
if 'Read' in line:
metrics['io_read'] += int(line.split()[2])
if 'Write' in line:
metrics['io_write'] += int(line.split()[2])
return metrics
# checking if cgroup is a container cgroup
def _is_container_cgroup(self, line, selinux_policy):
if line[1] not in ('cpu,cpuacct', 'cpuacct,cpu', 'cpuacct') or line[2] == '/docker-daemon':
return False
if 'docker' in line[2]:
return True
if 'docker' in selinux_policy:
return True
if line[2].startswith('/') and re.match(CONTAINER_ID_RE, line[2][1:]): # kubernetes
return True
return False
def _set_container_pids(self, containers):
"""Find all proc paths for running containers."""
proc_path = os.path.join(self.docker_root, 'proc')
pid_dirs = [_dir for _dir in os.listdir(proc_path) if _dir.isdigit()]
for pid_dir in pid_dirs:
try:
fp = open(file_)
except IOError:
raise IOError(
"Can't open %s. If you are using Docker 0.9.0 or higher, the Datadog agent is not yet compatible with these versions. Please get in touch with Datadog Support for more information" %
file_)
return dict(map(lambda x: x.split(), fp.read().splitlines()))
path = os.path.join(proc_path, pid_dir, 'cgroup')
with open(path, 'r') as f:
content = [line.strip().split(':') for line in f.readlines()]
finally:
if fp is not None:
fp.close()
selinux_policy = ''
path = os.path.join(proc_path, pid_dir, 'attr', 'current')
if os.path.exists(path):
with open(path, 'r') as f:
selinux_policy = f.readlines()[0]
except IOError as e:
self.log.debug("Cannot read %s, "
"process likely raced to finish : %s" %
(path, str(e)))
continue
except Exception as e:
self.log.warning("Cannot read %s : %s" % (path, str(e)))
continue
try:
cpuacct = None
for line in content:
if self._is_container_cgroup(line, selinux_policy):
cpuacct = line[2]
break
matches = re.findall(CONTAINER_ID_RE, cpuacct) if cpuacct else None
if matches:
container_id = matches[-1]
if container_id not in containers:
self.log.debug("Container %s not in container_dict, it's likely excluded", container_id)
continue
containers[container_id]['_pid'] = pid_dir
containers[container_id]['_proc_root'] = os.path.join(proc_path, pid_dir)
except Exception as e:
self.log.warning("Cannot parse %s content: %s" % (path, str(e)))
continue