From 33c44b4a9f356a9fa9fbddd59e338175494c6d48 Mon Sep 17 00:00:00 2001 From: Maksym Naboka Date: Tue, 3 Jan 2017 15:00:15 -0800 Subject: [PATCH] switch to `dcos-log` logging backend (#817) with DC/OS 1.9 release all system and container logs will go to sd journal on each host. This PR adds dcos-log support to CLI. Before using newest API , the CLI will try to get the cosmos capability "LOGGING". If the cluster supports dcos-log, it will use it. If the requests fails or dcos-log is missing the LOGGING capability, we will default to mesos files API. --- cli/dcoscli/data/help/node.txt | 9 ++ cli/dcoscli/log.py | 92 ++++++++++- cli/dcoscli/node/main.py | 208 ++++++++++++++++++++++++- cli/dcoscli/service/main.py | 22 ++- cli/dcoscli/task/main.py | 171 +++++++++++++++++++- cli/setup.py | 3 +- cli/tests/data/help/node.txt | 9 ++ cli/tests/integrations/common.py | 10 +- cli/tests/integrations/test_node.py | 34 ++-- cli/tests/integrations/test_service.py | 10 +- cli/tests/integrations/test_task.py | 16 +- cli/tests/unit/test_node.py | 105 +++++++++++++ cli/tests/unit/test_task.py | 88 ++++++++++- dcos/sse.py | 19 +++ setup.py | 1 + 15 files changed, 752 insertions(+), 45 deletions(-) create mode 100644 dcos/sse.py diff --git a/cli/dcoscli/data/help/node.txt b/cli/dcoscli/data/help/node.txt index f0cfe39..6bc270b 100644 --- a/cli/dcoscli/data/help/node.txt +++ b/cli/dcoscli/data/help/node.txt @@ -6,6 +6,8 @@ Usage: dcos node --info dcos node [--json] dcos node log [--follow --lines=N --leader --master --mesos-id= --slave=] + [--component= --filter=...] + dcos node list-components [--leader --mesos-id= --json] dcos node ssh [--option SSHOPT=VAL ...] [--config-file=] [--user=] @@ -72,6 +74,13 @@ Options: If not set, default to present working directory. --version Print version information. + --list-components + Print a list of available DC/OS components on specified node. + --component= + Show DC/OS component logs. + --filter= + Filter logs by field and value. Filter must be a string separated by colon. + For example: --filter _PID:0 --filter _UID:1 Positional Arguments: diff --git a/cli/dcoscli/log.py b/cli/dcoscli/log.py index bc9d11b..c950790 100644 --- a/cli/dcoscli/log.py +++ b/cli/dcoscli/log.py @@ -1,12 +1,17 @@ +import contextlib +import datetime import functools +import json import sys import time import six -from dcos import emitting, util +from dcos import cosmospackage, emitting, http, sse, util from dcos.errors import DCOSException, DefaultError +from dcoscli.package.main import get_cosmos_url + logger = util.get_logger(__name__) emitter = emitting.FlatEmitter() @@ -201,3 +206,88 @@ def _strip_trailing_newline(s): return s else: return s[:-1] if s[-1] == '\n' else s + + +def dcos_log_enabled(): + """ functions checks the cosmos capability LOGGING + to know if `dcos-log` is enabled on the cluster. + + :return: does cosmos have LOGGING capability. + :rtype: bool + """ + return cosmospackage.Cosmos(get_cosmos_url()).has_capability('LOGGING') + + +def follow_logs(url): + """ Function will use dcos.sse.get to subscribe to server sent events + and follow the real time logs. The log entry has the following format: + `date _HOSTNAME SYSLOG_IDENTIFIER[_PID]: MESSAGE`, where + _HOSTNAME, SYSLOG_IDENTIFIER and _PID are optional fields. + MESSAGE is also optional, however we should skip the entire log entry + if MESSAGE is not found. + + :param url: `dcos-log` streaming endpoint + :type url: str + """ + for entry in sse.get(url): + # the sse library does not handle sse comments properly + # making entry.data empty. As a workaround we can check if `entry.data` + # is not empty. + if not entry.data: + continue + + try: + entry_json = json.loads(entry.data) + except ValueError: + raise DCOSException( + 'Could not deserialize log entry to json: {}'.format(entry)) + + if 'fields' not in entry_json: + raise DCOSException( + 'Missing `fields` in log entry: {}'.format(entry)) + + # `MESSAGE` is optional field. Skip the log entry if it's missing. + if 'MESSAGE' not in entry_json['fields']: + continue + + if 'realtime_timestamp' not in entry_json: + raise DCOSException( + 'Missing `realtime_timestamp` in log entry: {}'.format(entry)) + + # text format: `date _HOSTNAME SYSLOG_IDENTIFIER[_PID]: MESSAGE` + # entry.RealtimeTimestamp returns a unix time in microseconds + # https://www.freedesktop.org/software/systemd/man/sd_journal_get_realtime_usec.html + t = int(entry_json['realtime_timestamp'] / 1000000) + l = [datetime.datetime.fromtimestamp(t).ctime()] + + optional_fields = ['_HOSTNAME', 'SYSLOG_IDENTIFIER'] + for optional_field in optional_fields: + if optional_field in entry_json['fields']: + l.append(entry_json['fields'][optional_field]) + if '_PID' in entry_json['fields']: + l.append('[' + entry_json['fields']['_PID'] + ']') + line = ' '.join(l) + line += ': {}'.format(entry_json['fields']['MESSAGE']) + emitter.publish(line) + + +def print_logs_range(url): + """ Make a get request to `dcos-log` range endpoint. + the function will print out logs to stdout and exit. + + :param url: `dcos-log` endpoint + :type url: str + """ + with contextlib.closing( + http.get(url, headers={'Accept': 'text/plain'})) as r: + + if r.status_code == 204: + raise DCOSException('No logs found') + + if r.status_code != 200: + raise DCOSException( + 'Error getting logs. Url: {};' + 'response code: {}'.format(url, r.status_code)) + + for line in r.iter_lines(): + emitter.publish(line.decode('utf-8', 'ignore')) diff --git a/cli/dcoscli/node/main.py b/cli/dcoscli/node/main.py index 6cc68fa..206e1d0 100644 --- a/cli/dcoscli/node/main.py +++ b/cli/dcoscli/node/main.py @@ -8,7 +8,9 @@ from six.moves import urllib import dcoscli from dcos import (cmds, config, cosmospackage, emitting, errors, http, mesos, subprocess, util) -from dcos.errors import DCOSException, DefaultError +from dcos.errors import (DCOSAuthenticationException, + DCOSAuthorizationException, + DCOSException, DefaultError) from dcoscli import log, tables from dcoscli.package.main import confirm, get_cosmos_url from dcoscli.subcommand import default_command_info, default_doc @@ -20,6 +22,7 @@ emitter = emitting.FlatEmitter() DIAGNOSTICS_BASE_URL = '/system/health/v1/report/diagnostics/' + # if a bundle size if more then 100Mb then warn user. BUNDLE_WARN_SIZE = 1000000 @@ -65,9 +68,15 @@ def _cmds(): cmds.Command( hierarchy=['node', 'log'], - arg_keys=['--follow', '--lines', '--leader', '--mesos-id'], + arg_keys=['--follow', '--lines', '--leader', '--mesos-id', + '--component', '--filter'], function=_log), + cmds.Command( + hierarchy=['node', 'list-components'], + arg_keys=['--leader', '--mesos-id', '--json'], + function=_list_components), + cmds.Command( hierarchy=['node', 'ssh'], arg_keys=['--leader', '--mesos-id', '--option', '--config-file', @@ -462,7 +471,7 @@ def _list(json_): emitter.publish(errors.DefaultError('No slaves found.')) -def _log(follow, lines, leader, slave): +def _log(follow, lines, leader, slave, component, filters): """ Prints the contents of leader and slave logs. :param follow: same as unix tail's -f @@ -473,24 +482,209 @@ def _log(follow, lines, leader, slave): :type leader: bool :param slave: the slave ID to print :type slave: str | None + :param component: DC/OS component name + :type component: string + :param filters: a list of filters ["key:value", ...] + :type filters: list :returns: process return code :rtype: int """ - if not (leader or slave): - raise DCOSException('You must choose one of --leader or --mesos-id.') + if not (leader or slave) or (leader and slave): + raise DCOSException( + 'You must choose one of --leader or --mesos-id.') if lines is None: lines = 10 + lines = util.parse_int(lines) + try: + _dcos_log(follow, lines, leader, slave, component, filters) + return 0 + except (DCOSAuthenticationException, + DCOSAuthorizationException): + raise + except DCOSException as e: + emitter.publish(DefaultError(e)) + emitter.publish(DefaultError('Falling back to files API...')) + + if component or filters: + raise DCOSException('--component or --filter is not ' + 'supported by files API') + + # fail back to mesos files API. mesos_files = _mesos_files(leader, slave) - log.log_files(mesos_files, follow, lines) - return 0 +def _get_slave_ip(slave): + """ Get an agent IP address based on mesos id. + If slave parameter is empty, the function will return + + :param slave: mesos node id + :type slave: str + :return: node ip address + :rtype: str + """ + if not slave: + return + + summary = mesos.DCOSClient().get_state_summary() + if 'slaves' not in summary: + raise DCOSException( + 'Invalid summary report. ' + 'Missing field `slaves`. {}'.format(summary)) + + for s in summary['slaves']: + if 'hostname' not in s or 'id' not in s: + raise DCOSException( + 'Invalid summary report. Missing field `id` ' + 'or `hostname`. {}'.format(summary)) + + if s['id'] == slave: + return s['hostname'] + + raise DCOSException('Agent `{}` not found'.format(slave)) + + +def _list_components(leader, slave, use_json): + """ List components for a leader or slave_ip node + + :param leader: use leader ip flag + :type leader: bool + :param slave_ip: agent ip address + :type slave_ip: str + :param use_json: print components in json format + :type use_json: bool + """ + if not (leader or slave): + raise DCOSException('--leader or --mesos-id must be provided') + + if leader and slave: + raise DCOSException( + 'Unable to use leader and mesos id at the same time') + + slave_ip = _get_slave_ip(slave) + if slave_ip: + print_components(slave_ip, use_json) + return + + leaders = mesos.MesosDNSClient().hosts('leader.mesos') + if len(leaders) != 1: + raise DCOSException('Expecting one leader. Got {}'.format(leaders)) + + if 'ip' not in leaders[0]: + raise DCOSException( + 'Invalid leader response, missing field `ip`. ' + 'Got {}'.format(leaders[0])) + + print_components(leaders[0]['ip'], use_json) + + +def print_components(ip, use_json): + """ Print components for a given node ip. + The data is taked from 3dt endpoint: + /system/health/v1/nodes//units + + :param ip: DC/OS node ip address + :type ip: str + :param use_json: print components in json format + :type use_json: bool + """ + dcos_url = config.get_config_val('core.dcos_url').rstrip("/") + if not dcos_url: + raise config.missing_config_exception(['core.dcos_url']) + + url = dcos_url + '/system/health/v1/nodes/{}/units'.format(ip) + response = http.get(url).json() + if 'units' not in response: + raise DCOSException( + 'Invalid response. Missing field `units`. {}'.format(response)) + + if use_json: + emitter.publish(response['units']) + else: + for component in response['units']: + emitter.publish(component['id']) + + +def _get_unit_type(unit_name): + """ Get the full unit name including the type postfix + or default to service. + + :param unit_name: unit name with or without type + :type unit_name: str + :return: unit name with type + :rtype: str + """ + if not unit_name: + raise DCOSException('Empty systemd unit parameter') + + # https://www.freedesktop.org/software/systemd/man/systemd.unit.html + unit_types = ['service', 'socket', 'device', 'mount', 'automount', + 'swap', 'target', 'path', 'timer', 'slice', 'scope'] + + for unit_type in unit_types: + if unit_name.endswith('.{}'.format(unit_type)): + return unit_name + + return '{}.service'.format(unit_name) + + +def _dcos_log(follow, lines, leader, slave, component, filters): + """ Print logs from dcos-log backend. + + :param follow: same as unix tail's -f + :type follow: bool + :param lines: number of lines to print + :type lines: int + :param leader: whether to print the leading master's log + :type leader: bool + :param slave: the slave ID to print + :type slave: str | None + :param component: DC/OS component name + :type component: string + :param filters: a list of filters ["key:value", ...] + :type filters: list + """ + if not log.dcos_log_enabled(): + raise DCOSException('dcos-log is not supported') + + filter_query = '' + if component: + filters.append('_SYSTEMD_UNIT:{}'.format(_get_unit_type(component))) + + for f in filters: + key_value = f.split(':') + if len(key_value) != 2: + raise SystemExit('Invalid filter parameter {}. ' + 'Must be --filter=key:value'.format(f)) + filter_query += '&filter={}'.format(f) + + endpoint = '/system/v1' + if leader: + endpoint += '/logs/v1/' + if slave: + endpoint += '/agent/{}/logs/v1/'.format(slave) + + endpoint_type = 'range' + if follow: + endpoint_type = 'stream' + + dcos_url = config.get_config_val('core.dcos_url').rstrip("/") + if not dcos_url: + raise config.missing_config_exception(['core.dcos_url']) + + url = (dcos_url + endpoint + endpoint_type + + '/?skip_prev={}'.format(lines) + filter_query) + + if follow: + return log.follow_logs(url) + return log.print_logs_range(url) + + def _mesos_files(leader, slave_id): """Returns the MesosFile objects to log diff --git a/cli/dcoscli/service/main.py b/cli/dcoscli/service/main.py index 7b14386..76a795e 100644 --- a/cli/dcoscli/service/main.py +++ b/cli/dcoscli/service/main.py @@ -3,11 +3,16 @@ import six import dcoscli from dcos import cmds, emitting, marathon, mesos, subprocess, util -from dcos.errors import DCOSException, DefaultError +from dcos.errors import (DCOSAuthenticationException, + DCOSAuthorizationException, + DCOSException, + DefaultError) from dcoscli import log, tables from dcoscli.subcommand import default_command_info, default_doc from dcoscli.util import decorate_docopt_usage +from ..task import main as task_main + logger = util.get_logger(__name__) emitter = emitting.FlatEmitter() @@ -169,6 +174,21 @@ def _log_service(follow, lines, service, file_): if file_ is None: file_ = 'stdout' + task = _get_service_task(service) + try: + if 'id' not in task: + raise DCOSException('Missing `id` in task. {}'.format(task)) + + task_id = task['id'] + task_main._log(follow, False, lines, task_id, file_) + return 0 + except (DCOSAuthenticationException, + DCOSAuthorizationException): + raise + except DCOSException as e: + emitter.publish(DefaultError(e)) + emitter.publish(DefaultError('Falling back to files API...')) + task = _get_service_task(service) return _log_task(task['id'], follow, lines, file_) diff --git a/cli/dcoscli/task/main.py b/cli/dcoscli/task/main.py index cc346d8..2a3c005 100644 --- a/cli/dcoscli/task/main.py +++ b/cli/dcoscli/task/main.py @@ -6,12 +6,15 @@ import docopt import six import dcoscli -from dcos import cmds, emitting, mesos, util -from dcos.errors import DCOSException, DCOSHTTPException, DefaultError +from dcos import cmds, config, emitting, mesos, util +from dcos.errors import (DCOSAuthenticationException, + DCOSAuthorizationException, + DCOSException, DCOSHTTPException, DefaultError) from dcoscli import log, tables from dcoscli.subcommand import default_command_info, default_doc from dcoscli.util import decorate_docopt_usage + logger = util.get_logger(__name__) emitter = emitting.FlatEmitter() @@ -229,6 +232,17 @@ def _log(follow, completed, lines, task, file_): raise DCOSException(msg) raise DCOSException('No matching tasks. Exiting.') + if file_ in ('stdout', 'stderr') and log.dcos_log_enabled(): + try: + _dcos_log(follow, tasks, lines, file_, completed) + return 0 + except (DCOSAuthenticationException, + DCOSAuthorizationException): + raise + except DCOSException as e: + emitter.publish(DefaultError(e)) + emitter.publish(DefaultError('Falling back to files API...')) + mesos_files = _mesos_files(tasks, file_, client) if not mesos_files: if fltr is None: @@ -242,6 +256,159 @@ def _log(follow, completed, lines, task, file_): return 0 +def get_nested_container_id(task): + """ Get the nested container id from mesos state. + + :param task: task definition + :type task: dict + :return: comma separated string of nested containers + :rtype: string + """ + + # get current task state + task_state = task.get('state') + if not task_state: + logger.debug('Full task state: {}'.format(task)) + raise DCOSException('Invalid executor info. ' + 'Missing field `state`') + + container_ids = [] + statuses = task.get('statuses') + if not statuses: + logger.debug('Full task state: {}'.format(task)) + raise DCOSException('Invalid executor info. Missing field `statuses`') + + for status in statuses: + if 'state' not in status: + logger.debug('Full task state: {}'.format(task)) + raise DCOSException('Invalid executor info. Missing field `state`') + + if status['state'] != task_state: + continue + + container_status = status.get('container_status') + if not container_status: + logger.debug('Full task state: {}'.format(task)) + raise DCOSException('Invalid executor info. ' + 'Missing field `container_status`') + + container_id = container_status.get('container_id') + if not container_id: + logger.debug('Full task state: {}'.format(task)) + raise DCOSException('Invalid executor info. ' + 'Missing field `container_id`') + + # traverse nested container_id field + while True: + value = container_id.get('value') + if not value: + logger.debug('Full task state: {}'.format(task)) + raise DCOSException('Invalid executor info. Missing field' + '`value` for nested container ids') + + container_ids.append(value) + + if 'parent' not in container_id: + break + + container_id = container_id['parent'] + + return '.'.join(reversed(container_ids)) + + +def _dcos_log(follow, tasks, lines, file_, completed): + """ a client to dcos-log + + :param follow: same as unix tail's -f + :type follow: bool + :param task: task pattern to match + :type task: str + :param lines: number of lines to print + :type lines: int + :param file_: file path to read + :type file_: str + :param completed: whether to include completed tasks + :type completed: bool + """ + + # only stdout and stderr is supported + if file_ not in ('stdout', 'stderr'): + raise DCOSException('Expect file stdout or stderr. ' + 'Got {}'.format(file_)) + # state json may container tasks and completed_tasks fields. Based on + # user request we should traverse the appropriate field. + tasks_field = 'tasks' + if completed: + tasks_field = 'completed_tasks' + + for task in tasks: + executor_info = task.executor() + if not executor_info: + continue + if (tasks_field not in executor_info and + not isinstance(executor_info[tasks_field], list)): + logger.debug('Executor info: {}'.format(executor_info)) + raise DCOSException('Invalid executor info. ' + 'Missing field {}'.format(tasks_field)) + + for t in executor_info[tasks_field]: + container_id = get_nested_container_id(t) + if not container_id: + logger.debug('Executor info: {}'.format(executor_info)) + raise DCOSException( + 'Invalid executor info. Missing container id') + + # get slave_id field + slave_id = t.get('slave_id') + if not slave_id: + logger.debug('Executor info: {}'.format(executor_info)) + raise DCOSException( + 'Invalid executor info. Missing field `slave_id`') + + framework_id = t.get('framework_id') + if not framework_id: + logger.debug('Executor info: {}'.format(executor_info)) + raise DCOSException( + 'Invalid executor info. Missing field `framework_id`') + + # try `executor_id` first. + executor_id = t.get('executor_id') + if not executor_id: + # if `executor_id` is an empty string, default to `id`. + executor_id = t.get('id') + if not executor_id: + logger.debug('Executor info: {}'.format(executor_info)) + raise DCOSException( + 'Invalid executor info. Missing executor id') + + dcos_url = config.get_config_val('core.dcos_url').rstrip('/') + if not dcos_url: + raise config.missing_config_exception(['core.dcos_url']) + + # dcos-log provides 2 base endpoints /range/ and /stream/ + # for range and streaming requests. + endpoint_type = 'range' + if follow: + endpoint_type = 'stream' + + endpoint = ('/system/v1/agent/{}/logs/v1/{}/framework/{}' + '/executor/{}/container/{}'.format(slave_id, + endpoint_type, + framework_id, + executor_id, + container_id)) + # append request parameters. + # `skip_prev` will move the cursor to -n lines. + # `filter=STREAM:{STDOUT,STDERR}` will filter logs by label. + url = (dcos_url + endpoint + + '?skip_prev={}&filter=STREAM:{}'.format(lines, + file_.upper())) + + if follow: + return log.follow_logs(url) + return log.print_logs_range(url) + + def _ls(task, path, long_, completed): """ List files in a task's sandbox. diff --git a/cli/setup.py b/cli/setup.py index f8815ac..be6c0ad 100644 --- a/cli/setup.py +++ b/cli/setup.py @@ -70,7 +70,8 @@ setup( 'pkginfo==1.2.1', 'toml>=0.9, <1.0', 'virtualenv>=13.0, <16.0', - 'cryptography==1.6' + 'cryptography==1.6', + 'sseclient==0.0.14' ], # If there are data files included in your packages that need to be diff --git a/cli/tests/data/help/node.txt b/cli/tests/data/help/node.txt index f0cfe39..6bc270b 100644 --- a/cli/tests/data/help/node.txt +++ b/cli/tests/data/help/node.txt @@ -6,6 +6,8 @@ Usage: dcos node --info dcos node [--json] dcos node log [--follow --lines=N --leader --master --mesos-id= --slave=] + [--component= --filter=...] + dcos node list-components [--leader --mesos-id= --json] dcos node ssh [--option SSHOPT=VAL ...] [--config-file=] [--user=] @@ -72,6 +74,13 @@ Options: If not set, default to present working directory. --version Print version information. + --list-components + Print a list of available DC/OS components on specified node. + --component= + Show DC/OS component logs. + --filter= + Filter logs by field and value. Filter must be a string separated by colon. + For example: --filter _PID:0 --filter _UID:1 Positional Arguments: diff --git a/cli/tests/integrations/common.py b/cli/tests/integrations/common.py index 5002073..f5bd030 100644 --- a/cli/tests/integrations/common.py +++ b/cli/tests/integrations/common.py @@ -464,13 +464,15 @@ def delete_zk_node(znode): http.delete(znode_url) -def assert_lines(cmd, num_lines): +def assert_lines(cmd, num_lines, great_then=False): """ Assert stdout contains the expected number of lines :param cmd: program and arguments :type cmd: [str] :param num_lines: expected number of lines for stdout :type num_lines: int + :param great_then: if True assume there may be at least num_lines or more + :type great_then: bool :rtype: None """ @@ -478,7 +480,11 @@ def assert_lines(cmd, num_lines): assert returncode == 0 assert stderr == b'' - assert len(stdout.decode('utf-8').split('\n')) - 1 == num_lines + lines = len(stdout.decode('utf-8').split('\n')) - 1 + if great_then: + assert lines >= num_lines + return + assert lines == num_lines def file_json_ast(path): diff --git a/cli/tests/integrations/test_node.py b/cli/tests/integrations/test_node.py index 47d73af..7f15d29 100644 --- a/cli/tests/integrations/test_node.py +++ b/cli/tests/integrations/test_node.py @@ -1,6 +1,5 @@ import json import os -import re import sys import pytest @@ -51,12 +50,15 @@ def test_node_log_empty(): def test_node_log_leader(): - assert_lines(['dcos', 'node', 'log', '--leader'], 10) + assert_lines(['dcos', 'node', 'log', '--leader'], 10, great_then=True) def test_node_log_slave(): slave_id = _node()[0]['id'] - assert_lines(['dcos', 'node', 'log', '--mesos-id={}'.format(slave_id)], 10) + assert_lines( + ['dcos', 'node', 'log', '--mesos-id={}'.format(slave_id)], + 10, + great_then=True) def test_node_log_missing_slave(): @@ -65,26 +67,18 @@ def test_node_log_missing_slave(): assert returncode == 1 assert stdout == b'' - assert stderr == b'No slave found with ID "bogus".\n' - - -def test_node_log_leader_slave(): - slave_id = _node()[0]['id'] - - returncode, stdout, stderr = exec_command( - ['dcos', 'node', 'log', '--leader', '--mesos-id={}'.format(slave_id)]) - - assert returncode == 0 - assert stderr == b'' - - lines = stdout.decode('utf-8').split('\n') - assert len(lines) == 23 - assert re.match('===>.*<===', lines[0]) - assert re.match('===>.*<===', lines[11]) + stderr_str = str(stderr) + assert 'HTTP 404' in stderr_str + assert 'No slave found with ID "bogus".' in stderr_str def test_node_log_lines(): - assert_lines(['dcos', 'node', 'log', '--leader', '--lines=4'], 4) + # since we are getting system logs, it's not guaranteed to get back + # exactly 4 log entries. It must be >= 4 + assert_lines( + ['dcos', 'node', 'log', '--leader', '--lines=4'], + 4, + great_then=True) def test_node_log_invalid_lines(): diff --git a/cli/tests/integrations/test_service.py b/cli/tests/integrations/test_service.py index eb0e76d..3e45ee5 100644 --- a/cli/tests/integrations/test_service.py +++ b/cli/tests/integrations/test_service.py @@ -170,6 +170,8 @@ def test_log_config(): returncode=1) +@pytest.mark.skipif(os.environ.get('DCOS_ENABLE_LOG_TEST') != 1, + reason='disable python buffering') def test_log_follow(): package_install('chronos', deploy=True) @@ -182,13 +184,19 @@ def test_log_follow(): stdout=subprocess.PIPE, stderr=subprocess.PIPE) else: + # disable stdout/stderr buffering: + # https://docs.python.org/3/using/cmdline.html#cmdoption-u + my_env = os.environ.copy() + my_env['PYTHONUNBUFFERED'] = 'x' + # os.setsid is only available for Unix: # https://docs.python.org/2/library/os.html#os.setsid proc = subprocess.Popen( args, preexec_fn=os.setsid, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) + stderr=subprocess.PIPE, + env=my_env) time.sleep(10) diff --git a/cli/tests/integrations/test_task.py b/cli/tests/integrations/test_task.py index 87f72d7..7494578 100644 --- a/cli/tests/integrations/test_task.py +++ b/cli/tests/integrations/test_task.py @@ -133,7 +133,7 @@ def test_log_pod_task(): # logs shouldn't be seen and this pod shouldn't have any logging # to stderr assert returncode == 0 - assert stderr == b'No logs for this task\n' + assert 'No logs for this task' in str(stderr) assert stdout == b'\n' @@ -188,9 +188,7 @@ def test_log_two_tasks(): assert stderr == b'' lines = stdout.decode('utf-8').split('\n') - assert len(lines) == 17 - assert re.match('===>.*<===', lines[0]) - assert re.match('===>.*<===', lines[8]) + assert len(lines) == 11 @pytest.mark.skipif(sys.platform == 'win32', @@ -244,7 +242,7 @@ def test_ls_no_params(): assert returncode == 0 assert stderr == b'' - ls_line = 'stderr stderr.logrotate.conf stdout stdout.logrotate.conf' + ls_line = 'stderr stdout' lines = stdout.decode('utf-8').split('\n') assert len(lines) == 7 assert re.match('===>.*<===', lines[0]) @@ -256,13 +254,13 @@ def test_ls_no_params(): def test_ls(): - stdout = b'stderr stderr.logrotate.conf stdout stdout.logrotate.conf\n' + stdout = b'stderr stdout\n' assert_command(['dcos', 'task', 'ls', 'test-app1'], stdout=stdout) def test_ls_multiple_tasks(): - ls_line = 'stderr stderr.logrotate.conf stdout stdout.logrotate.conf' + ls_line = 'stderr stdout' returncode, stdout, stderr = exec_command( ['dcos', 'task', 'ls', 'test-app']) lines = stdout.decode('utf-8').split('\n') @@ -277,7 +275,7 @@ def test_ls_multiple_tasks(): def test_ls_long(): - assert_lines(['dcos', 'task', 'ls', '--long', 'test-app1'], 4) + assert_lines(['dcos', 'task', 'ls', '--long', 'test-app1'], 2) def test_ls_path(): @@ -310,7 +308,7 @@ def test_ls_completed(): returncode, stdout, stderr = exec_command( ['dcos', 'task', 'ls', '--completed', task_id_completed]) - out = b'stderr stderr.logrotate.conf stdout stdout.logrotate.conf\n' + out = b'stderr stdout\n' assert returncode == 0 assert stdout == out assert stderr == b'' diff --git a/cli/tests/unit/test_node.py b/cli/tests/unit/test_node.py index 8f8bb1c..5544c28 100644 --- a/cli/tests/unit/test_node.py +++ b/cli/tests/unit/test_node.py @@ -189,3 +189,108 @@ def test_node_diagnostics_download(mock_get_diagnostics_list, mock_do_request, mock_do_request.assert_called_with( '/system/health/v1/report/diagnostics/serve/bundle.zip', 'GET', stream=True) + + +@mock.patch('dcos.config.get_config_val') +@mock.patch('dcos.http.get') +@mock.patch('dcoscli.log.dcos_log_enabled') +def test_dcos_log(mocked_dcos_log_enabked, mocked_http_get, + mocked_get_config_val): + mocked_dcos_log_enabked.return_value = True + + m = mock.MagicMock() + m.status_code = 200 + mocked_http_get.return_value = m + + mocked_get_config_val.return_value = 'http://127.0.0.1' + + main._dcos_log(False, 10, True, '', None, []) + mocked_http_get.assert_called_with( + 'http://127.0.0.1/system/v1/logs/v1/range/?skip_prev=10', + headers={'Accept': 'text/plain'}) + + +@mock.patch('dcoscli.log.follow_logs') +@mock.patch('dcos.config.get_config_val') +@mock.patch('dcos.http.get') +@mock.patch('dcoscli.log.dcos_log_enabled') +def test_dcos_log_stream(mocked_dcos_log_enabked, mocked_http_get, + mocked_get_config_val, mocked_follow_logs): + mocked_dcos_log_enabked.return_value = True + + m = mock.MagicMock() + m.status_code = 200 + mocked_http_get.return_value = m + + mocked_get_config_val.return_value = 'http://127.0.0.1' + + main._dcos_log(True, 20, False, 'mesos-id', None, []) + mocked_follow_logs.assert_called_with( + 'http://127.0.0.1/system/v1/agent/mesos-id/logs' + '/v1/stream/?skip_prev=20') + + +@mock.patch('dcoscli.log.follow_logs') +@mock.patch('dcos.config.get_config_val') +@mock.patch('dcos.http.get') +@mock.patch('dcoscli.log.dcos_log_enabled') +def test_dcos_log_filters(mocked_dcos_log_enabked, mocked_http_get, + mocked_get_config_val, mocked_follow_logs): + mocked_dcos_log_enabked.return_value = True + + m = mock.MagicMock() + m.status_code = 200 + mocked_http_get.return_value = m + + mocked_get_config_val.return_value = 'http://127.0.0.1' + + main._dcos_log(True, 20, False, 'mesos-id', 'dcos-mesos-master', + ['key1:value1', 'key2:value2']) + + mocked_follow_logs.assert_called_with( + 'http://127.0.0.1/system/v1/agent/mesos-id/logs/v1/stream/' + '?skip_prev=20&filter=key1:value1&filter=key2:value2&' + 'filter=_SYSTEMD_UNIT:dcos-mesos-master.service') + + +@mock.patch('dcos.config.get_config_val') +@mock.patch('dcoscli.node.main._get_slave_ip') +@mock.patch('dcos.http.get') +def test_list_components(mocked_get, mocked_get_slave_ip, + mocked_get_config_val): + m = mock.MagicMock() + m.json.return_value = { + 'units': [ + { + 'id': 'dcos-component.service', + } + ] + } + mocked_get.return_value = m + mocked_get_slave_ip.return_value = '127.0.0.1' + mocked_get_config_val.return_value = 'http://10.10.10.10' + main._list_components(None, 'slave-id', False) + mocked_get.assert_called_with( + 'http://10.10.10.10/system/health/v1/nodes/127.0.0.1/units') + + +@mock.patch('dcos.config.get_config_val') +@mock.patch('dcos.mesos.MesosDNSClient') +@mock.patch('dcos.http.get') +def test_list_components_leader(mocked_get, mocked_dns, + mocked_get_config_val): + m = mock.MagicMock() + m.json.return_value = { + 'units': [ + { + 'id': 'dcos-component.service', + } + ] + } + mocked_dns().hosts.return_value = [{'ip': '10.10.0.1'}] + mocked_get_config_val.return_value = 'http://10.10.10.10' + + mocked_get.return_value = m + main._list_components(True, False, False) + mocked_get.assert_called_with( + 'http://10.10.10.10/system/health/v1/nodes/10.10.0.1/units') diff --git a/cli/tests/unit/test_task.py b/cli/tests/unit/test_task.py index 1a13af4..0ca03b1 100644 --- a/cli/tests/unit/test_task.py +++ b/cli/tests/unit/test_task.py @@ -4,7 +4,7 @@ from mock import MagicMock, patch from dcos import mesos from dcos.errors import DCOSException from dcoscli.log import log_files -from dcoscli.task.main import main +from dcoscli.task.main import _dcos_log, main from .common import assert_mock @@ -56,3 +56,89 @@ def test_log_file_unavailable(): def _mock_exception(contents='exception'): return MagicMock(side_effect=DCOSException(contents)) + + +@patch('dcos.http.get') +@patch('dcos.config.get_config_val') +def test_dcos_log(mocked_get_config_val, mocked_http_get): + mocked_get_config_val.return_value = 'http://127.0.0.1' + + m = MagicMock() + m.status_code = 200 + mocked_http_get.return_value = m + + executor_info = { + 'tasks': [ + { + 'container': 'container1', + 'state': 'TASK_RUNNING', + 'statuses': [ + { + 'state': 'TASK_RUNNING', + 'container_status': { + 'container_id': { + 'value': 'child-123', + 'parent': { + 'value': 'parent-456' + } + } + } + } + ], + 'slave_id': 'slave-123', + 'framework_id': 'framework-123', + 'id': 'id-123' + } + ] + } + + task = MagicMock + task.executor = lambda: executor_info + _dcos_log(False, [task], 10, 'stdout', False) + mocked_http_get.assert_called_with( + 'http://127.0.0.1/system/v1/agent/slave-123/logs/v1/range/framework/' + 'framework-123/executor/id-123/container/parent-456.child-123' + '?skip_prev=10&filter=STREAM:STDOUT', + headers={'Accept': 'text/plain'}) + + +@patch('dcoscli.log.follow_logs') +@patch('dcos.http.get') +@patch('dcos.config.get_config_val') +def test_dcos_log_stream(mocked_get_config_val, mocked_http_get, + mocked_follow_logs): + mocked_get_config_val.return_value = 'http://127.0.0.1' + + m = MagicMock() + m.status_code = 200 + mocked_http_get.return_value = m + + executor_info = { + 'tasks': [ + { + 'container': 'container1', + 'state': 'TASK_RUNNING', + 'statuses': [ + { + 'state': 'TASK_RUNNING', + 'container_status': { + 'container_id': { + 'value': 'child-123', + } + } + } + ], + 'slave_id': 'slave-123', + 'framework_id': 'framework-123', + 'id': 'id-123' + } + ] + } + + task = MagicMock + task.executor = lambda: executor_info + _dcos_log(True, [task], 10, 'stderr', False) + mocked_follow_logs.assert_called_with( + 'http://127.0.0.1/system/v1/agent/slave-123/logs/v1/' + 'stream/framework/framework-123/executor/id-123/container/' + 'child-123?skip_prev=10&filter=STREAM:STDERR') diff --git a/dcos/sse.py b/dcos/sse.py new file mode 100644 index 0000000..45a5d90 --- /dev/null +++ b/dcos/sse.py @@ -0,0 +1,19 @@ +from sseclient import SSEClient + +from . import http + + +def get(url, **kwargs): + """ Make a get request to streaming endpoint which + implements SSE (Server sent events). The parameter session=http + will ensure we are using `dcos.http` module with all required auth + headers. + + :param url: server sent events streaming URL + :type url: str + :param kwargs: arbitrary params for requests + :type kwargs: dict + :return: instance of sseclient.SSEClient + :rtype: sseclient.SSEClient + """ + return SSEClient(url, session=http, **kwargs) diff --git a/setup.py b/setup.py index e32206b..03b7191 100755 --- a/setup.py +++ b/setup.py @@ -67,6 +67,7 @@ setup( 'requests>=2.6, <3.0', 'six>=1.9, <2.0', 'toml>=0.9, <1.0', + 'sseclient==0.0.14', ], extras_require={