From 56d21c11effbb487cdc84c7f89676572bae86204 Mon Sep 17 00:00:00 2001 From: Michael Gummelt Date: Thu, 7 May 2015 17:47:38 -0700 Subject: [PATCH] dcos tail --- cli/dcoscli/service/main.py | 4 +- cli/dcoscli/task/main.py | 313 +++++++++++- cli/setup.py | 2 +- cli/tests/data/file/follow.json | 7 + cli/tests/data/file/two_tasks.json | 7 + cli/tests/data/file/two_tasks_follow.json | 7 + cli/tests/data/marathon/sleep2.json | 11 + cli/tests/fixtures/service.py | 2 +- cli/tests/fixtures/task.py | 2 +- cli/tests/integrations/common.py | 166 ++++++- cli/tests/integrations/test_dcos.py | 4 +- cli/tests/integrations/test_help.py | 4 +- cli/tests/integrations/test_marathon.py | 523 +++++++++----------- cli/tests/integrations/test_service.py | 4 +- cli/tests/integrations/test_task.py | 196 +++++++- dcos/http.py | 64 ++- dcos/marathon.py | 88 ++-- dcos/mesos.py | 573 ++++++++++++++++++---- dcos/package.py | 14 +- dcos/util.py | 1 - 20 files changed, 1469 insertions(+), 523 deletions(-) create mode 100644 cli/tests/data/file/follow.json create mode 100644 cli/tests/data/file/two_tasks.json create mode 100644 cli/tests/data/file/two_tasks_follow.json create mode 100644 cli/tests/data/marathon/sleep2.json diff --git a/cli/dcoscli/service/main.py b/cli/dcoscli/service/main.py index d7d5c89..b247894 100644 --- a/cli/dcoscli/service/main.py +++ b/cli/dcoscli/service/main.py @@ -1,4 +1,4 @@ -"""Get the status of DCOS services +"""Manage DCOS services Usage: dcos service --info @@ -121,5 +121,5 @@ def _shutdown(service_id): :rtype: int """ - mesos.get_master_client().shutdown_framework(service_id) + mesos.MesosClient().shutdown_framework(service_id) return 0 diff --git a/cli/dcoscli/task/main.py b/cli/dcoscli/task/main.py index 391e070..1ac0dcd 100644 --- a/cli/dcoscli/task/main.py +++ b/cli/dcoscli/task/main.py @@ -1,26 +1,36 @@ -"""Get the status of DCOS tasks +"""Manage DCOS tasks Usage: dcos task --info dcos task [--completed --json ] + dcos task log [--completed --follow --lines=N] [] Options: -h, --help Show this screen --info Show a short description of this subcommand + --completed Include completed tasks as well + --follow Output data as the file grows --json Print json-formatted tasks - --completed Show completed tasks as well + --lines=N Output the last N lines [default: 10] --version Show version Positional Arguments: Only match tasks whose ID matches . may be a substring of the ID, or a unix glob pattern. + + Output this file. [default: stdout] """ +import functools +import sys +import time + +import concurrent.futures import dcoscli import docopt from dcos import cmds, emitting, mesos, util -from dcos.errors import DCOSException +from dcos.errors import DCOSException, DefaultError from dcoscli import tables logger = util.get_logger(__name__) @@ -57,6 +67,12 @@ def _cmds(): arg_keys=[], function=_info), + cmds.Command( + hierarchy=['task', 'log'], + arg_keys=['--follow', '--completed', '--lines', '', + ''], + function=_log), + cmds.Command( hierarchy=['task'], arg_keys=['', '--completed', '--json'], @@ -86,7 +102,6 @@ def _task(fltr, completed, json_): readable table. :type json_: bool :returns: process return code - """ if fltr is None: @@ -104,3 +119,293 @@ def _task(fltr, completed, json_): emitter.publish(output) return 0 + + +NO_FILE_EXCEPTION = DCOSException('No files exist. Exiting.') + + +def _log(follow, completed, lines, task, path): + """ Tail a file in the task's sandbox. + + :param follow: same as unix tail's -f + :type follow: bool + :param completed: whether to include completed tasks + :type completed: bool + :param lines: number of lines to print + :type lines: int + :param task: task pattern to match + :type task: str + :param path: file path to read + :type path: str + :returns: process return code + :rtype: int + """ + + if task is None: + fltr = "" + else: + fltr = task + + if path is None: + path = 'stdout' + + lines = int(lines) + + mesos_files = _mesos_files(completed, fltr, path) + if not mesos_files: + raise DCOSException('No matching tasks. Exiting.') + + fn = functools.partial(_read_last_lines, lines) + curr_header, mesos_files = _stream_files(None, fn, mesos_files) + if not mesos_files: + raise NO_FILE_EXCEPTION + + while follow: + # This flush is needed only for testing, since stdout is fully + # buffered (as opposed to line-buffered) when redirected to a + # pipe. So if we don't flush, our --follow tests, which use a + # pipe, never see the data + sys.stdout.flush() + + curr_header, mesos_files = _stream_files(curr_header, + _read_rest, + mesos_files) + if not mesos_files: + raise NO_FILE_EXCEPTION + time.sleep(1) + + return 0 + + +def _mesos_files(completed, fltr, path): + """Return MesosFile objects for the specified files. Only include + files that satisfy all of the following: + + a) belong to an available slave + b) have an executor entry on the slave + + :param completed: whether to include completed tasks + :type completed: bool + :param fltr: task pattern to match + :type fltr: str + :param path: file path to read + :type path: str + :returns: MesosFile objects + :rtype: [MesosFile] + + """ + + # get tasks + client = mesos.MesosClient() + master = mesos.Master(client.get_master_state()) + tasks = master.tasks(completed=completed, fltr=fltr) + + # load slave state in parallel + slaves = _load_slaves_state([task.slave() for task in tasks]) + + # some completed tasks may have entries on the master, but none on + # the slave. since we need the slave entry to get the executor + # sandbox, we only include files with an executor entry. + available_tasks = [task for task in tasks + if task.slave() in slaves and task.executor()] + + # create files. + return [mesos.MesosFile(task, path, client) + for task in available_tasks] + + +def _load_slaves_state(slaves): + """Fetch each slave's state.json in parallel, and return the reachable + slaves. + + :param slaves: slaves to fetch + :type slaves: [MesosSlave] + :returns: MesosSlave objects that were successfully reached + :rtype: [MesosSlave] + """ + + reachable_slaves = [] + + for job, slave in _stream(lambda slave: slave.state(), slaves): + try: + job.result() + reachable_slaves.append(slave) + except DCOSException as e: + emitter.publish( + DefaultError('Error accessing slave: {0}'.format(e))) + + return reachable_slaves + + +def _stream_files(curr_header, fn, mesos_files): + """Apply `fn` in parallel to each file in `mesos_files`. `fn` must + return a list of strings, and these strings are then printed + serially as separate lines. + + `curr_header` is the most recently printed header. It's used to + group lines. Each line has an associated header (e.g. a string + representation of the MesosFile it was read from), and we only + print the header before printing a line with a different header + than the previous line. This effectively groups lines together + when the have the same header. + + :param curr_header: Most recently printed header + :type curr_header: str + :param fn: function that reads a sequence of lines from a MesosFile + :type fn: MesosFile -> [str] + :param mesos_files: files to read + :type mesos_files: [MesosFile] + :returns: Returns the most recently printed header, and a list of + files that are still reachable. Once we detect a file is + unreachable, we stop trying to read from it. + :rtype: (str, [MesosFile]) + """ + + reachable_files = list(mesos_files) + + # TODO switch to map + for job, mesos_file in _stream(fn, mesos_files): + try: + lines = job.result() + except DCOSException as e: + # The read function might throw an exception if read.json + # is unavailable, or if the file doesn't exist in the + # sandbox. In any case, we silently remove the file and + # continue. + logger.warning("Error reading file: {}".format(e)) + + reachable_files.remove(mesos_file) + continue + + curr_header = _output(curr_header, + len(reachable_files) > 1, + str(mesos_file), + lines) + + return curr_header, reachable_files + + +def _output(curr_header, output_header, header, lines): + """Prints a sequence of lines. If `header` is different than + `curr_header`, first print the header. + + :param curr_header: most recently printed header + :type curr_header: str + :param output_header: whether or not to output the header + :type output_header: bool + :param header: header for `lines` + :type header: str + :param lines: lines to print + :type lines: [str] + :returns: `header` + :rtype: str + """ + + if lines: + if output_header and header != curr_header: + emitter.publish('===> {} <==='.format(header)) + for line in lines: + emitter.publish(line) + return header + + +STREAM_CONCURRENCY = 20 + + +def _stream(fn, objs): + """Apply `fn` to `objs` in parallel, yielding the (Future, obj) for + each as it completes. + + :param fn: function + :type fn: function + :param objs: objs + :type objs: objs + :returns: iterator over (Future, typeof(obj)) + :rtype: iterator over (Future, typeof(obj)) + + """ + + with concurrent.futures.ThreadPoolExecutor(STREAM_CONCURRENCY) as pool: + jobs = {pool.submit(fn, obj): obj for obj in objs} + for job in concurrent.futures.as_completed(jobs): + yield job, jobs[job] + + +# A liberal estimate of a line size. Used to estimate how much data +# we need to fetch from a file when we want to read N lines. +LINE_SIZE = 200 + + +def _read_last_lines(num_lines, mesos_file): + """Returns the last `num_lines` of a file, or less if the file is + smaller. Seeks to EOF. + + :param num_lines: number of lines to read + :type num_lines: int + :param mesos_file: file to read + :type mesos_file: MesosFile + :returns: lines read + :rtype: [str] + """ + + file_size = mesos_file.size() + + # estimate how much data we need to fetch to read `num_lines`. + fetch_size = LINE_SIZE * num_lines + + end = file_size + start = max(file_size - fetch_size, 0) + data = '' + while True: + # fetch data + mesos_file.seek(start) + data = mesos_file.read(end - start) + data + + # break if we have enough lines + data_tmp = _strip_trailing_newline(data) + lines = data_tmp.split('\n') + if len(lines) > num_lines: + ret = lines[-num_lines:] + break + elif start == 0: + ret = lines + break + + # otherwise shift our read window and repeat + end = start + start = max(file_size - fetch_size, 0) + + mesos_file.seek(file_size) + return ret + + +def _read_rest(mesos_file): + """ Reads the rest of the file, and returns the lines. + + :param mesos_file: file to read + :type mesos_file: MesosFile + :returns: lines read + :rtype: [str] + """ + data = mesos_file.read() + if data == '': + return [] + else: + data_tmp = _strip_trailing_newline(data) + return data_tmp.split('\n') + + +def _strip_trailing_newline(s): + """Returns a modified version of the string with the last character + truncated if it's a newline. + + :param s: string to trim + :type s: str + :returns: modified string + :rtype: str + """ + + if s == "": + return s + else: + return s[:-1] if s[-1] == '\n' else s diff --git a/cli/setup.py b/cli/setup.py index ec6f28a..1657296 100644 --- a/cli/setup.py +++ b/cli/setup.py @@ -95,8 +95,8 @@ setup( 'dcos-config=dcoscli.config.main:main', 'dcos-marathon=dcoscli.marathon.main:main', 'dcos-package=dcoscli.package.main:main', - 'dcos-task=dcoscli.task.main:main', 'dcos-service=dcoscli.service.main:main', + 'dcos-task=dcoscli.task.main:main', ], }, diff --git a/cli/tests/data/file/follow.json b/cli/tests/data/file/follow.json new file mode 100644 index 0000000..8d52bdf --- /dev/null +++ b/cli/tests/data/file/follow.json @@ -0,0 +1,7 @@ +{ + "id": "follow", + "cmd": "sleep 5 && echo \"follow_this\" && sleep 1000", + "cpus": 0.1, + "mem": 16, + "instances": 1 +} diff --git a/cli/tests/data/file/two_tasks.json b/cli/tests/data/file/two_tasks.json new file mode 100644 index 0000000..d2de425 --- /dev/null +++ b/cli/tests/data/file/two_tasks.json @@ -0,0 +1,7 @@ +{ + "id": "two-tasks", + "cmd": "sleep 1000", + "cpus": 0.1, + "mem": 16, + "instances": 2 +} diff --git a/cli/tests/data/file/two_tasks_follow.json b/cli/tests/data/file/two_tasks_follow.json new file mode 100644 index 0000000..1134e74 --- /dev/null +++ b/cli/tests/data/file/two_tasks_follow.json @@ -0,0 +1,7 @@ +{ + "id": "two-tasks-follow", + "cmd": "sleep 5 && echo \"follow_this\" && sleep 1000", + "cpus": 0.1, + "mem": 16, + "instances": 2 +} diff --git a/cli/tests/data/marathon/sleep2.json b/cli/tests/data/marathon/sleep2.json new file mode 100644 index 0000000..e01bbd9 --- /dev/null +++ b/cli/tests/data/marathon/sleep2.json @@ -0,0 +1,11 @@ +{ + "id": "test-app2", + "cmd": "sleep 1000", + "cpus": 0.1, + "mem": 16, + "instances": 1, + "labels": { + "PACKAGE_ID": "test-app", + "PACKAGE_VERSION": "1.2.3" + } +} diff --git a/cli/tests/fixtures/service.py b/cli/tests/fixtures/service.py index f5b2dc5..d752df4 100644 --- a/cli/tests/fixtures/service.py +++ b/cli/tests/fixtures/service.py @@ -43,4 +43,4 @@ def framework_fixture(): }, "user": "root", "webui_url": "http://mesos:8080" - }) + }, None) diff --git a/cli/tests/fixtures/task.py b/cli/tests/fixtures/task.py index ed53f91..09b82c3 100644 --- a/cli/tests/fixtures/task.py +++ b/cli/tests/fixtures/task.py @@ -32,6 +32,6 @@ def task_fixture(): }, None) task.user = mock.Mock(return_value='root') - slave = Slave({"hostname": "mock-hostname"}) + slave = Slave({"hostname": "mock-hostname"}, None, None) task.slave = mock.Mock(return_value=slave) return task diff --git a/cli/tests/integrations/common.py b/cli/tests/integrations/common.py index f680e02..fdb8015 100644 --- a/cli/tests/integrations/common.py +++ b/cli/tests/integrations/common.py @@ -1,11 +1,15 @@ import collections +import contextlib import json import os import subprocess +import sys import requests import six +from dcos import util +import mock from six.moves import urllib @@ -80,6 +84,58 @@ def assert_command( assert stderr_ == stderr +def exec_mock(main, args): + """Call a main function with sys.args mocked, and capture + stdout/stderr + + :param main: main function to call + :type main: function + :param args: sys.args to mock, excluding the initial 'dcos' + :type args: [str] + :returns: (returncode, stdout, stderr) + :rtype: (int, bytes, bytes) + """ + + print('MOCK ARGS: {}'.format(' '.join(args))) + + with mock_args(args) as (stdout, stderr): + returncode = main() + + stdout_val = six.b(stdout.getvalue()) + stderr_val = six.b(stderr.getvalue()) + + print('STDOUT: {}'.format(stdout_val)) + print('STDERR: {}'.format(stderr_val)) + + return (returncode, stdout_val, stderr_val) + + +def assert_mock(main, + args, + returncode=0, + stdout=b'', + stderr=b''): + """Mock and call a main function, and assert expected behavior. + + :param main: main function to call + :type main: function + :param args: sys.args to mock, excluding the initial 'dcos' + :type args: [str] + :type returncode: int + :param stdout: Expected stdout + :type stdout: str + :param stderr: Expected stderr + :type stderr: str + :rtype: None + """ + + returncode_, stdout_, stderr_ = exec_mock(main, args) + + assert returncode_ == returncode + assert stdout_ == stdout + assert stderr_ == stderr + + def mock_called_some_args(mock, *args, **kwargs): """Convience method for some mock assertions. Returns True if the arguments to one of the calls of `mock` contains `args` and @@ -138,6 +194,57 @@ def watch_all_deployments(count=300): watch_deployment(dep['id'], count) +def add_app(app_path, deploy=False): + """ Add an app, and wait for it to deploy + + :param app_path: path to app's json definition + :type app_path: str + :rtype: None + """ + + assert_command(['dcos', 'marathon', 'app', 'add', app_path]) + + if deploy: + watch_all_deployments() + + +def remove_app(app_id): + """ Remove an app + + :param app_id: id of app to remove + :type app_id: str + :rtype: None + """ + + assert_command(['dcos', 'marathon', 'app', 'remove', app_id]) + + +def get_services(expected_count=None, args=[]): + """Get services + + :param expected_count: assert exactly this number of services are + running + :type expected_count: int | None + :param args: cli arguments + :type args: [str] + :returns: services + :rtype: [dict] + """ + + returncode, stdout, stderr = exec_command( + ['dcos', 'service', '--json'] + args) + + assert returncode == 0 + assert stderr == b'' + + services = json.loads(stdout.decode('utf-8')) + assert isinstance(services, collections.Sequence) + if expected_count is not None: + assert len(services) == expected_count + + return services + + def list_deployments(expected_count=None, app_id=None): """Get all active deployments. @@ -166,29 +273,6 @@ def list_deployments(expected_count=None, app_id=None): return result -def get_services(expected_count=None, args=[]): - """Get services - - :param expected_count: assert exactly this number of services are - running - :type expected_count: int | None - :param args: cli arguments - :type args: [str] - :returns: services - :rtype: [dict] - """ - - returncode, stdout, stderr = exec_command( - ['dcos', 'service', '--json'] + args) - - services = json.loads(stdout.decode('utf-8')) - assert isinstance(services, collections.Sequence) - if expected_count is not None: - assert len(services) == expected_count - - return services - - def show_app(app_id, version=None): """Show details of a Marathon application. @@ -274,3 +358,39 @@ def file_bytes(path): with open(path) as f: return six.b(f.read()) + + +@contextlib.contextmanager +def app(path, app_id, deploy=False): + """Context manager that deploys an app on entrance, and removes it on + exit. + + :param path: path to app's json definition: + :type path: str + :param app_id: app id + :type app_id: str + :rtype: None + """ + + add_app(path, deploy) + try: + yield + finally: + remove_app(app_id) + + +@contextlib.contextmanager +def mock_args(args): + """ Context manager that mocks sys.args and captures stdout/stderr + + :param args: sys.args values to mock + :type args: [str] + :rtype: None + """ + with mock.patch('sys.argv', [util.which('dcos')] + args): + stdout, stderr = sys.stdout, sys.stderr + sys.stdout, sys.stderr = six.StringIO(), six.StringIO() + try: + yield sys.stdout, sys.stderr + finally: + sys.stdout, sys.stderr = stdout, stderr diff --git a/cli/tests/integrations/test_dcos.py b/cli/tests/integrations/test_dcos.py index 8830781..2fa0168 100644 --- a/cli/tests/integrations/test_dcos.py +++ b/cli/tests/integrations/test_dcos.py @@ -21,8 +21,8 @@ Available DCOS commands: \thelp \tDisplay command line usage information \tmarathon \tDeploy and manage applications on the DCOS \tpackage \tInstall and manage DCOS software packages -\tservice \tGet the status of DCOS services -\ttask \tGet the status of DCOS tasks +\tservice \tManage DCOS services +\ttask \tManage DCOS tasks Get detailed command description with 'dcos --help'. """.encode('utf-8') diff --git a/cli/tests/integrations/test_help.py b/cli/tests/integrations/test_help.py index 58d86d9..dcd5d37 100644 --- a/cli/tests/integrations/test_help.py +++ b/cli/tests/integrations/test_help.py @@ -40,8 +40,8 @@ Available DCOS commands: \thelp \tDisplay command line usage information \tmarathon \tDeploy and manage applications on the DCOS \tpackage \tInstall and manage DCOS software packages -\tservice \tGet the status of DCOS services -\ttask \tGet the status of DCOS tasks +\tservice \tManage DCOS services +\ttask \tManage DCOS tasks Get detailed command description with 'dcos --help'. """.encode('utf-8') diff --git a/cli/tests/integrations/test_marathon.py b/cli/tests/integrations/test_marathon.py index 85c0548..788f537 100644 --- a/cli/tests/integrations/test_marathon.py +++ b/cli/tests/integrations/test_marathon.py @@ -1,3 +1,4 @@ +import contextlib import json import os @@ -5,7 +6,7 @@ from dcos import constants import pytest -from .common import (assert_command, assert_lines, exec_command, +from .common import (app, assert_command, assert_lines, exec_command, list_deployments, show_app, watch_all_deployments, watch_deployment) @@ -157,22 +158,18 @@ def test_empty_list(): def test_add_app(): - _add_app('tests/data/marathon/apps/zero_instance_sleep.json') - _list_apps('zero-instance-app') - _remove_app('zero-instance-app') + with _zero_instance_app(): + _list_apps('zero-instance-app') def test_add_app_with_filename(): - assert_command(['dcos', 'marathon', 'app', 'add', - 'tests/data/marathon/apps/zero_instance_sleep.json']) - - _list_apps('zero-instance-app') - _remove_app('zero-instance-app') + with _zero_instance_app(): + _list_apps('zero-instance-app') def test_remove_app(): - _add_app('tests/data/marathon/apps/zero_instance_sleep.json') - _remove_app('zero-instance-app') + with _zero_instance_app(): + pass _list_apps() @@ -188,108 +185,94 @@ def test_add_bad_json_app(): def test_add_existing_app(): - _add_app('tests/data/marathon/apps/zero_instance_sleep.json') - - with open('tests/data/marathon/apps/zero_instance_sleep_v2.json') as fd: - stderr = b"Application '/zero-instance-app' already exists\n" - assert_command(['dcos', 'marathon', 'app', 'add'], - returncode=1, - stderr=stderr, - stdin=fd) - - _remove_app('zero-instance-app') + with _zero_instance_app(): + app_path = 'tests/data/marathon/apps/zero_instance_sleep_v2.json' + with open(app_path) as fd: + stderr = b"Application '/zero-instance-app' already exists\n" + assert_command(['dcos', 'marathon', 'app', 'add'], + returncode=1, + stderr=stderr, + stdin=fd) def test_show_app(): - _add_app('tests/data/marathon/apps/zero_instance_sleep.json') - show_app('zero-instance-app') - _remove_app('zero-instance-app') + with _zero_instance_app(): + show_app('zero-instance-app') def test_show_absolute_app_version(): - _add_app('tests/data/marathon/apps/zero_instance_sleep.json') - _update_app( - 'zero-instance-app', - 'tests/data/marathon/apps/update_zero_instance_sleep.json') + with _zero_instance_app(): + _update_app( + 'zero-instance-app', + 'tests/data/marathon/apps/update_zero_instance_sleep.json') - result = show_app('zero-instance-app') - show_app('zero-instance-app', result['version']) - - _remove_app('zero-instance-app') + result = show_app('zero-instance-app') + show_app('zero-instance-app', result['version']) def test_show_relative_app_version(): - _add_app('tests/data/marathon/apps/zero_instance_sleep.json') - _update_app( - 'zero-instance-app', - 'tests/data/marathon/apps/update_zero_instance_sleep.json') - show_app('zero-instance-app', "-1") - _remove_app('zero-instance-app') + with _zero_instance_app(): + _update_app( + 'zero-instance-app', + 'tests/data/marathon/apps/update_zero_instance_sleep.json') + show_app('zero-instance-app', "-1") def test_show_missing_relative_app_version(): - _add_app('tests/data/marathon/apps/zero_instance_sleep.json') - _update_app( - 'zero-instance-app', - 'tests/data/marathon/apps/update_zero_instance_sleep.json') + with _zero_instance_app(): + _update_app( + 'zero-instance-app', + 'tests/data/marathon/apps/update_zero_instance_sleep.json') - stderr = b"Application 'zero-instance-app' only has 2 version(s).\n" - assert_command(['dcos', 'marathon', 'app', 'show', - '--app-version=-2', 'zero-instance-app'], - returncode=1, - stderr=stderr) - - _remove_app('zero-instance-app') + stderr = b"Application 'zero-instance-app' only has 2 version(s).\n" + assert_command(['dcos', 'marathon', 'app', 'show', + '--app-version=-2', 'zero-instance-app'], + returncode=1, + stderr=stderr) def test_show_missing_absolute_app_version(): - _add_app('tests/data/marathon/apps/zero_instance_sleep.json') - _update_app( - 'zero-instance-app', - 'tests/data/marathon/apps/update_zero_instance_sleep.json') + with _zero_instance_app(): + _update_app( + 'zero-instance-app', + 'tests/data/marathon/apps/update_zero_instance_sleep.json') - returncode, stdout, stderr = exec_command( - ['dcos', 'marathon', 'app', 'show', - '--app-version=2000-02-11T20:39:32.972Z', 'zero-instance-app']) + returncode, stdout, stderr = exec_command( + ['dcos', 'marathon', 'app', 'show', + '--app-version=2000-02-11T20:39:32.972Z', 'zero-instance-app']) - assert returncode == 1 - assert stdout == b'' - assert stderr.decode('utf-8').startswith( - "Error: App '/zero-instance-app' does not exist") - - _remove_app('zero-instance-app') + assert returncode == 1 + assert stdout == b'' + assert stderr.decode('utf-8').startswith( + "Error: App '/zero-instance-app' does not exist") def test_show_bad_app_version(): - _add_app('tests/data/marathon/apps/zero_instance_sleep.json') - _update_app( - 'zero-instance-app', - 'tests/data/marathon/apps/update_zero_instance_sleep.json') + with _zero_instance_app(): + _update_app( + 'zero-instance-app', + 'tests/data/marathon/apps/update_zero_instance_sleep.json') - stderr = (b'Error: Invalid format: "20:39:32.972Z" is malformed at ' - b'":39:32.972Z"\n') - assert_command( - ['dcos', 'marathon', 'app', 'show', '--app-version=20:39:32.972Z', - 'zero-instance-app'], - returncode=1, - stderr=stderr) - - _remove_app('zero-instance-app') + stderr = (b'Error: Invalid format: "20:39:32.972Z" is malformed at ' + b'":39:32.972Z"\n') + assert_command( + ['dcos', 'marathon', 'app', 'show', '--app-version=20:39:32.972Z', + 'zero-instance-app'], + returncode=1, + stderr=stderr) def test_show_bad_relative_app_version(): - _add_app('tests/data/marathon/apps/zero_instance_sleep.json') - _update_app( - 'zero-instance-app', - 'tests/data/marathon/apps/update_zero_instance_sleep.json') + with _zero_instance_app(): + _update_app( + 'zero-instance-app', + 'tests/data/marathon/apps/update_zero_instance_sleep.json') - assert_command( - ['dcos', 'marathon', 'app', 'show', - '--app-version=2', 'zero-instance-app'], - returncode=1, - stderr=b"Relative versions must be negative: 2\n") - - _remove_app('zero-instance-app') + assert_command( + ['dcos', 'marathon', 'app', 'show', + '--app-version=2', 'zero-instance-app'], + returncode=1, + stderr=b"Relative versions must be negative: 2\n") def test_start_missing_app(): @@ -300,21 +283,20 @@ def test_start_missing_app(): def test_start_app(): - _add_app('tests/data/marathon/apps/zero_instance_sleep.json') - _start_app('zero-instance-app') - _remove_app('zero-instance-app') + with _zero_instance_app(): + _start_app('zero-instance-app') def test_start_already_started_app(): - _add_app('tests/data/marathon/apps/zero_instance_sleep.json') - _start_app('zero-instance-app') + with _zero_instance_app(): + _start_app('zero-instance-app') - stdout = b"Application 'zero-instance-app' already started: 1 instances.\n" - assert_command(['dcos', 'marathon', 'app', 'start', 'zero-instance-app'], - returncode=1, - stdout=stdout) - - _remove_app('zero-instance-app') + stdout = (b"Application 'zero-instance-app' already " + b"started: 1 instances.\n") + assert_command( + ['dcos', 'marathon', 'app', 'start', 'zero-instance-app'], + returncode=1, + stdout=stdout) def test_stop_missing_app(): @@ -324,30 +306,27 @@ def test_stop_missing_app(): def test_stop_app(): - _add_app('tests/data/marathon/apps/zero_instance_sleep.json') - _start_app('zero-instance-app', 3) - result = list_deployments(1, 'zero-instance-app') - watch_deployment(result[0]['id'], 60) + with _zero_instance_app(): + _start_app('zero-instance-app', 3) + result = list_deployments(1, 'zero-instance-app') + watch_deployment(result[0]['id'], 60) - returncode, stdout, stderr = exec_command( - ['dcos', 'marathon', 'app', 'stop', 'zero-instance-app']) + returncode, stdout, stderr = exec_command( + ['dcos', 'marathon', 'app', 'stop', 'zero-instance-app']) - assert returncode == 0 - assert stdout.decode().startswith('Created deployment ') - assert stderr == b'' - - _remove_app('zero-instance-app') + assert returncode == 0 + assert stdout.decode().startswith('Created deployment ') + assert stderr == b'' def test_stop_already_stopped_app(): - _add_app('tests/data/marathon/apps/zero_instance_sleep.json') - - stdout = b"Application 'zero-instance-app' already stopped: 0 instances.\n" - assert_command(['dcos', 'marathon', 'app', 'stop', 'zero-instance-app'], - returncode=1, - stdout=stdout) - - _remove_app('zero-instance-app') + with _zero_instance_app(): + stdout = (b"Application 'zero-instance-app' already " + b"stopped: 0 instances.\n") + assert_command( + ['dcos', 'marathon', 'app', 'stop', 'zero-instance-app'], + returncode=1, + stdout=stdout) def test_update_missing_app(): @@ -357,70 +336,57 @@ def test_update_missing_app(): def test_update_missing_field(): - _add_app('tests/data/marathon/apps/zero_instance_sleep.json') + with _zero_instance_app(): + returncode, stdout, stderr = exec_command( + ['dcos', 'marathon', 'app', 'update', + 'zero-instance-app', 'missing="a string"']) - returncode, stdout, stderr = exec_command( - ['dcos', 'marathon', 'app', 'update', - 'zero-instance-app', 'missing="a string"']) - - assert returncode == 1 - assert stdout == b'' - assert stderr.decode('utf-8').startswith( - "The property 'missing' does not conform to the expected format. " - "Possible values are: ") - - _remove_app('zero-instance-app') + assert returncode == 1 + assert stdout == b'' + assert stderr.decode('utf-8').startswith( + "The property 'missing' does not conform to the expected format. " + "Possible values are: ") def test_update_bad_type(): - _add_app('tests/data/marathon/apps/zero_instance_sleep.json') + with _zero_instance_app(): + returncode, stdout, stderr = exec_command( + ['dcos', 'marathon', 'app', 'update', + 'zero-instance-app', 'cpus="a string"']) - returncode, stdout, stderr = exec_command( - ['dcos', 'marathon', 'app', 'update', - 'zero-instance-app', 'cpus="a string"']) - - assert returncode == 1 - assert stderr.decode('utf-8').startswith( - "Unable to parse 'a string' as a float: could not convert string to " - "float: ") - assert stdout == b'' - - _remove_app('zero-instance-app') + assert returncode == 1 + assert stderr.decode('utf-8').startswith( + "Unable to parse 'a string' as a float: could not convert string " + "to float: ") + assert stdout == b'' def test_update_app(): - _add_app('tests/data/marathon/apps/zero_instance_sleep.json') + with _zero_instance_app(): + returncode, stdout, stderr = exec_command( + ['dcos', 'marathon', 'app', 'update', 'zero-instance-app', + 'cpus=1', 'mem=20', "cmd='sleep 100'"]) - returncode, stdout, stderr = exec_command( - ['dcos', 'marathon', 'app', 'update', 'zero-instance-app', - 'cpus=1', 'mem=20', "cmd='sleep 100'"]) - - assert returncode == 0 - assert stdout.decode().startswith('Created deployment ') - assert stderr == b'' - - _remove_app('zero-instance-app') + assert returncode == 0 + assert stdout.decode().startswith('Created deployment ') + assert stderr == b'' def test_update_app_from_stdin(): - _add_app('tests/data/marathon/apps/zero_instance_sleep.json') - _update_app( - 'zero-instance-app', - 'tests/data/marathon/apps/update_zero_instance_sleep.json') - - _remove_app('zero-instance-app') + with _zero_instance_app(): + _update_app( + 'zero-instance-app', + 'tests/data/marathon/apps/update_zero_instance_sleep.json') def test_restarting_stopped_app(): - _add_app('tests/data/marathon/apps/zero_instance_sleep.json') - - stdout = (b"Unable to perform rolling restart of application '" - b"/zero-instance-app' because it has no running tasks\n") - assert_command(['dcos', 'marathon', 'app', 'restart', 'zero-instance-app'], - returncode=1, - stdout=stdout) - - _remove_app('zero-instance-app') + with _zero_instance_app(): + stdout = (b"Unable to perform rolling restart of application '" + b"/zero-instance-app' because it has no running tasks\n") + assert_command( + ['dcos', 'marathon', 'app', 'restart', 'zero-instance-app'], + returncode=1, + stdout=stdout) def test_restarting_missing_app(): @@ -430,19 +396,17 @@ def test_restarting_missing_app(): def test_restarting_app(): - _add_app('tests/data/marathon/apps/zero_instance_sleep.json') - _start_app('zero-instance-app', 3) - result = list_deployments(1, 'zero-instance-app') - watch_deployment(result[0]['id'], 60) + with _zero_instance_app(): + _start_app('zero-instance-app', 3) + result = list_deployments(1, 'zero-instance-app') + watch_deployment(result[0]['id'], 60) - returncode, stdout, stderr = exec_command( - ['dcos', 'marathon', 'app', 'restart', 'zero-instance-app']) + returncode, stdout, stderr = exec_command( + ['dcos', 'marathon', 'app', 'restart', 'zero-instance-app']) - assert returncode == 0 - assert stdout.decode().startswith('Created deployment ') - assert stderr == b'' - - _remove_app('zero-instance-app') + assert returncode == 0 + assert stdout.decode().startswith('Created deployment ') + assert stderr == b'' def test_list_version_missing_app(): @@ -460,28 +424,24 @@ def test_list_version_negative_max_count(): def test_list_version_app(): - _add_app('tests/data/marathon/apps/zero_instance_sleep.json') - _list_versions('zero-instance-app', 1) + with _zero_instance_app(): + _list_versions('zero-instance-app', 1) - _update_app( - 'zero-instance-app', - 'tests/data/marathon/apps/update_zero_instance_sleep.json') - _list_versions('zero-instance-app', 2) - - _remove_app('zero-instance-app') + _update_app( + 'zero-instance-app', + 'tests/data/marathon/apps/update_zero_instance_sleep.json') + _list_versions('zero-instance-app', 2) def test_list_version_max_count(): - _add_app('tests/data/marathon/apps/zero_instance_sleep.json') - _update_app( - 'zero-instance-app', - 'tests/data/marathon/apps/update_zero_instance_sleep.json') + with _zero_instance_app(): + _update_app( + 'zero-instance-app', + 'tests/data/marathon/apps/update_zero_instance_sleep.json') - _list_versions('zero-instance-app', 1, 1) - _list_versions('zero-instance-app', 2, 2) - _list_versions('zero-instance-app', 2, 3) - - _remove_app('zero-instance-app') + _list_versions('zero-instance-app', 1, 1) + _list_versions('zero-instance-app', 2, 2) + _list_versions('zero-instance-app', 2, 3) def test_list_empty_deployment(): @@ -489,10 +449,9 @@ def test_list_empty_deployment(): def test_list_deployment(): - _add_app('tests/data/marathon/apps/zero_instance_sleep.json') - _start_app('zero-instance-app', 3) - list_deployments(1) - _remove_app('zero-instance-app') + with _zero_instance_app(): + _start_app('zero-instance-app', 3) + list_deployments(1) def test_list_deployment_table(): @@ -500,24 +459,22 @@ def test_list_deployment_table(): The more specific testing is done in unit tests. """ - _add_app('tests/data/marathon/apps/zero_instance_sleep.json') - _start_app('zero-instance-app', 3) - assert_lines(['dcos', 'marathon', 'deployment', 'list'], 2) - _remove_app('zero-instance-app') + + with _zero_instance_app(): + _start_app('zero-instance-app', 3) + assert_lines(['dcos', 'marathon', 'deployment', 'list'], 2) def test_list_deployment_missing_app(): - _add_app('tests/data/marathon/apps/zero_instance_sleep.json') - _start_app('zero-instance-app') - list_deployments(0, 'missing-id') - _remove_app('zero-instance-app') + with _zero_instance_app(): + _start_app('zero-instance-app') + list_deployments(0, 'missing-id') def test_list_deployment_app(): - _add_app('tests/data/marathon/apps/zero_instance_sleep.json') - _start_app('zero-instance-app', 3) - list_deployments(1, 'zero-instance-app') - _remove_app('zero-instance-app') + with _zero_instance_app(): + _start_app('zero-instance-app', 3) + list_deployments(1, 'zero-instance-app') def test_rollback_missing_deployment(): @@ -528,35 +485,32 @@ def test_rollback_missing_deployment(): def test_rollback_deployment(): - _add_app('tests/data/marathon/apps/zero_instance_sleep.json') - _start_app('zero-instance-app', 3) - result = list_deployments(1, 'zero-instance-app') + with _zero_instance_app(): + _start_app('zero-instance-app', 3) + result = list_deployments(1, 'zero-instance-app') - returncode, stdout, stderr = exec_command( - ['dcos', 'marathon', 'deployment', 'rollback', result[0]['id']]) + returncode, stdout, stderr = exec_command( + ['dcos', 'marathon', 'deployment', 'rollback', result[0]['id']]) - result = json.loads(stdout.decode('utf-8')) + result = json.loads(stdout.decode('utf-8')) - assert returncode == 0 - assert 'deploymentId' in result - assert 'version' in result - assert stderr == b'' + assert returncode == 0 + assert 'deploymentId' in result + assert 'version' in result + assert stderr == b'' - list_deployments(0) - - _remove_app('zero-instance-app') + list_deployments(0) def test_stop_deployment(): - _add_app('tests/data/marathon/apps/zero_instance_sleep.json') - _start_app('zero-instance-app', 3) - result = list_deployments(1, 'zero-instance-app') + with _zero_instance_app(): + _start_app('zero-instance-app', 3) + result = list_deployments(1, 'zero-instance-app') - assert_command(['dcos', 'marathon', 'deployment', 'stop', result[0]['id']]) + assert_command( + ['dcos', 'marathon', 'deployment', 'stop', result[0]['id']]) - list_deployments(0) - - _remove_app('zero-instance-app') + list_deployments(0) def test_watching_missing_deployment(): @@ -564,12 +518,11 @@ def test_watching_missing_deployment(): def test_watching_deployment(): - _add_app('tests/data/marathon/apps/zero_instance_sleep.json') - _start_app('zero-instance-app', 3) - result = list_deployments(1, 'zero-instance-app') - watch_deployment(result[0]['id'], 60) - list_deployments(0, 'zero-instance-app') - _remove_app('zero-instance-app') + with _zero_instance_app(): + _start_app('zero-instance-app', 3) + result = list_deployments(1, 'zero-instance-app') + watch_deployment(result[0]['id'], 60) + list_deployments(0, 'zero-instance-app') def test_list_empty_task(): @@ -577,44 +530,39 @@ def test_list_empty_task(): def test_list_empty_task_not_running_app(): - _add_app('tests/data/marathon/apps/zero_instance_sleep.json') - _list_tasks(0) - _remove_app('zero-instance-app') + with _zero_instance_app(): + _list_tasks(0) def test_list_tasks(): - _add_app('tests/data/marathon/apps/zero_instance_sleep.json') - _start_app('zero-instance-app', 3) - result = list_deployments(1, 'zero-instance-app') - watch_deployment(result[0]['id'], 60) - _list_tasks(3) - _remove_app('zero-instance-app') + with _zero_instance_app(): + _start_app('zero-instance-app', 3) + result = list_deployments(1, 'zero-instance-app') + watch_deployment(result[0]['id'], 60) + _list_tasks(3) def test_list_tasks_table(): - _add_app('tests/data/marathon/apps/zero_instance_sleep.json') - _start_app('zero-instance-app', 3) - watch_all_deployments() - assert_lines(['dcos', 'marathon', 'task', 'list'], 4) - _remove_app('zero-instance-app') + with _zero_instance_app(): + _start_app('zero-instance-app', 3) + watch_all_deployments() + assert_lines(['dcos', 'marathon', 'task', 'list'], 4) def test_list_app_tasks(): - _add_app('tests/data/marathon/apps/zero_instance_sleep.json') - _start_app('zero-instance-app', 3) - result = list_deployments(1, 'zero-instance-app') - watch_deployment(result[0]['id'], 60) - _list_tasks(3, 'zero-instance-app') - _remove_app('zero-instance-app') + with _zero_instance_app(): + _start_app('zero-instance-app', 3) + result = list_deployments(1, 'zero-instance-app') + watch_deployment(result[0]['id'], 60) + _list_tasks(3, 'zero-instance-app') def test_list_missing_app_tasks(): - _add_app('tests/data/marathon/apps/zero_instance_sleep.json') - _start_app('zero-instance-app', 3) - result = list_deployments(1, 'zero-instance-app') - watch_deployment(result[0]['id'], 60) - _list_tasks(0, 'missing-id') - _remove_app('zero-instance-app') + with _zero_instance_app(): + _start_app('zero-instance-app', 3) + result = list_deployments(1, 'zero-instance-app') + watch_deployment(result[0]['id'], 60) + _list_tasks(0, 'missing-id') def test_show_missing_task(): @@ -630,22 +578,20 @@ def test_show_missing_task(): def test_show_task(): - _add_app('tests/data/marathon/apps/zero_instance_sleep.json') - _start_app('zero-instance-app', 3) - result = list_deployments(1, 'zero-instance-app') - watch_deployment(result[0]['id'], 60) - result = _list_tasks(3, 'zero-instance-app') + with _zero_instance_app(): + _start_app('zero-instance-app', 3) + result = list_deployments(1, 'zero-instance-app') + watch_deployment(result[0]['id'], 60) + result = _list_tasks(3, 'zero-instance-app') - returncode, stdout, stderr = exec_command( - ['dcos', 'marathon', 'task', 'show', result[0]['id']]) + returncode, stdout, stderr = exec_command( + ['dcos', 'marathon', 'task', 'show', result[0]['id']]) - result = json.loads(stdout.decode('utf-8')) + result = json.loads(stdout.decode('utf-8')) - assert returncode == 0 - assert result['appId'] == '/zero-instance-app' - assert stderr == b'' - - _remove_app('zero-instance-app') + assert returncode == 0 + assert result['appId'] == '/zero-instance-app' + assert stderr == b'' def test_bad_configuration(): @@ -659,7 +605,7 @@ def test_bad_configuration(): assert stdout == b'' assert stderr.decode().startswith( "Marathon likely misconfigured. Please check your proxy or " - "Marathon URI settings. See dcos config --help. ") + "Marathon URL settings. See dcos config --help. ") assert_command(['dcos', 'config', 'unset', 'marathon.url']) @@ -682,26 +628,6 @@ def _list_apps(app_id=None): return result -def _remove_app(app_id): - assert_command(['dcos', 'marathon', 'app', 'remove', app_id]) - - # Let's make sure that we don't return until the deployment has finished - result = list_deployments(None, app_id) - if len(result) != 0: - watch_deployment(result[0]['id'], 60) - - -def _add_app(file_path): - with open(file_path) as fd: - returncode, stdout, stderr = exec_command( - ['dcos', 'marathon', 'app', 'add'], - stdin=fd) - - assert returncode == 0 - assert stdout == b'' - assert stderr == b'' - - def _start_app(app_id, instances=None): cmd = ['dcos', 'marathon', 'app', 'start', app_id] if instances is not None: @@ -754,3 +680,10 @@ def _list_tasks(expected_count, app_id=None): assert stderr == b'' return result + + +@contextlib.contextmanager +def _zero_instance_app(): + with app('tests/data/marathon/apps/zero_instance_sleep.json', + 'zero-instance-app'): + yield diff --git a/cli/tests/integrations/test_service.py b/cli/tests/integrations/test_service.py index 64a4af2..93512ab 100644 --- a/cli/tests/integrations/test_service.py +++ b/cli/tests/integrations/test_service.py @@ -18,7 +18,7 @@ def zk_znode(request): def test_help(): - stdout = b"""Get the status of DCOS services + stdout = b"""Manage DCOS services Usage: dcos service --info @@ -45,7 +45,7 @@ Positional Arguments: def test_info(): - stdout = b"Get the status of DCOS services\n" + stdout = b"Manage DCOS services\n" assert_command(['dcos', 'service', '--info'], stdout=stdout) diff --git a/cli/tests/integrations/test_task.py b/cli/tests/integrations/test_task.py index 72d425d..4d23bf8 100644 --- a/cli/tests/integrations/test_task.py +++ b/cli/tests/integrations/test_task.py @@ -1,41 +1,59 @@ import collections import json +import os +import re +import subprocess +import time import dcos.util as util +from dcos import mesos +from dcos.errors import DCOSException from dcos.util import create_schema +from dcoscli.task.main import _mesos_files, main + +import fcntl +from mock import MagicMock, patch from ..fixtures.task import task_fixture -from .common import (assert_command, assert_lines, exec_command, - watch_all_deployments) +from .common import (app, assert_command, assert_lines, assert_mock, + exec_command, watch_all_deployments) SLEEP1 = 'tests/data/marathon/apps/sleep.json' SLEEP2 = 'tests/data/marathon/apps/sleep2.json' +FOLLOW = 'tests/data/file/follow.json' +TWO_TASKS = 'tests/data/file/two_tasks.json' +TWO_TASKS_FOLLOW = 'tests/data/file/two_tasks_follow.json' def test_help(): - stdout = b"""Get the status of DCOS tasks + stdout = b"""Manage DCOS tasks Usage: dcos task --info dcos task [--completed --json ] + dcos task log [--completed --follow --lines=N] [] Options: -h, --help Show this screen --info Show a short description of this subcommand + --completed Include completed tasks as well + --follow Output data as the file grows --json Print json-formatted tasks - --completed Show completed tasks as well + --lines=N Output the last N lines [default: 10] --version Show version Positional Arguments: Only match tasks whose ID matches . may be a substring of the ID, or a unix glob pattern. + + Output this file. [default: stdout] """ assert_command(['dcos', 'task', '--help'], stdout=stdout) def test_info(): - stdout = b"Get the status of DCOS tasks\n" + stdout = b"Manage DCOS tasks\n" assert_command(['dcos', 'task', '--info'], stdout=stdout) @@ -105,6 +123,174 @@ def test_filter(): _uninstall_sleep('test-app2') +def test_log_no_files(): + """ Tail stdout on nonexistant task """ + assert_command(['dcos', 'task', 'log', 'asdf'], + returncode=1, + stderr=b'No matching tasks. Exiting.\n') + + +def test_log_single_file(): + """ Tail a single file on a single task """ + with app(SLEEP1, 'test-app', True): + returncode, stdout, stderr = exec_command( + ['dcos', 'task', 'log', 'test-app']) + + assert returncode == 0 + assert stderr == b'' + assert len(stdout.decode('utf-8').split('\n')) == 5 + + +def test_log_missing_file(): + """ Tail a single file on a single task """ + with app(SLEEP1, 'test-app', True): + returncode, stdout, stderr = exec_command( + ['dcos', 'task', 'log', 'test-app', 'asdf']) + + assert returncode == 1 + assert stdout == b'' + assert stderr == b'No files exist. Exiting.\n' + + +def test_log_lines(): + """ Test --lines """ + with app(SLEEP1, 'test-app', True): + returncode, stdout, stderr = exec_command( + ['dcos', 'task', 'log', 'test-app', '--lines=2']) + + assert returncode == 0 + assert stderr == b'' + assert len(stdout.decode('utf-8').split('\n')) == 3 + + +def test_log_follow(): + """ Test --follow """ + with app(FOLLOW, 'follow', True): + # verify output + proc = subprocess.Popen(['dcos', 'task', 'log', 'follow', '--follow'], + stdout=subprocess.PIPE) + + # mark stdout as non-blocking, so we can read all available data + # before EOF + _mark_non_blocking(proc.stdout) + + # wait for data to be output + time.sleep(1) + + # assert lines before and after sleep + assert len(proc.stdout.read().decode('utf-8').split('\n')) == 5 + time.sleep(8) + assert len(proc.stdout.read().decode('utf-8').split('\n')) == 2 + + proc.kill() + + +def test_log_two_tasks(): + """ Test tailing a single file on two separate tasks """ + with app(TWO_TASKS, 'two-tasks', True): + returncode, stdout, stderr = exec_command( + ['dcos', 'task', 'log', 'two-tasks']) + + assert returncode == 0 + assert stderr == b'' + + lines = stdout.decode('utf-8').split('\n') + assert len(lines) == 11 + assert re.match('===>.*<===', lines[0]) + assert re.match('===>.*<===', lines[5]) + + +def test_log_two_tasks_follow(): + """ Test tailing a single file on two separate tasks with --follow """ + with app(TWO_TASKS_FOLLOW, 'two-tasks-follow', True): + proc = subprocess.Popen( + ['dcos', 'task', 'log', 'two-tasks-follow', '--follow'], + stdout=subprocess.PIPE) + + # mark stdout as non-blocking, so we can read all available data + # before EOF + _mark_non_blocking(proc.stdout) + + # wait for data to be output + time.sleep(1) + + # get output before and after the task's sleep + first_lines = proc.stdout.read().decode('utf-8').split('\n') + time.sleep(8) + second_lines = proc.stdout.read().decode('utf-8').split('\n') + + # assert both tasks have printed the expected amount of output + assert len(first_lines) >= 11 + # assert there is some difference after sleeping + assert len(second_lines) > 0 + + proc.kill() + + +def test_log_completed(): + """ Test --completed """ + # create a completed task + # ensure that tail lists nothing + # ensure that tail --completed lists a completed task + with app(SLEEP1, 'test-app', True): + pass + + assert_command(['dcos', 'task', 'log', 'test-app'], + returncode=1, + stderr=b'No matching tasks. Exiting.\n', + stdout=b'') + + returncode, stdout, stderr = exec_command( + ['dcos', 'task', 'log', '--completed', 'test-app']) + assert returncode == 0 + assert stderr == b'' + assert len(stdout.decode('utf-8').split('\n')) > 4 + + +def test_log_master_unavailable(): + """ Test master's state.json being unavailable """ + client = mesos.MesosClient() + client.get_master_state = _mock_exception() + + with patch('dcos.mesos.MesosClient', return_value=client): + args = ['task', 'log', '_'] + assert_mock(main, args, returncode=1, stderr=(b"exception\n")) + + +def test_log_slave_unavailable(): + """ Test slave's state.json being unavailable """ + with app(SLEEP1, 'test-app', True): + client = mesos.MesosClient() + client.get_slave_state = _mock_exception() + + with patch('dcos.mesos.MesosClient', return_value=client): + args = ['task', 'log', 'test-app'] + stderr = (b"""Error accessing slave: exception\n""" + b"""No matching tasks. Exiting.\n""") + assert_mock(main, args, returncode=1, stderr=stderr) + + +def test_log_file_unavailable(): + """ Test a file's read.json being unavailable """ + with app(SLEEP1, 'test-app', True): + files = _mesos_files(False, "", "stdout") + assert len(files) == 1 + files[0].read = _mock_exception('exception') + + with patch('dcoscli.task.main._mesos_files', return_value=files): + args = ['task', 'log', 'test-app'] + stderr = b"No files exist. Exiting.\n" + assert_mock(main, args, returncode=1, stderr=stderr) + + +def _mock_exception(contents='exception'): + return MagicMock(side_effect=DCOSException(contents)) + + +def _mark_non_blocking(file_): + fcntl.fcntl(file_.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) + + def _install_sleep_task(app_path=SLEEP1, app_name='test-app'): # install helloworld app args = ['dcos', 'marathon', 'app', 'add', app_path] diff --git a/dcos/http.py b/dcos/http.py index 9bf7f9a..0a5a155 100644 --- a/dcos/http.py +++ b/dcos/http.py @@ -1,6 +1,6 @@ import requests from dcos import util -from dcos.errors import DCOSException, DefaultError, Error +from dcos.errors import DCOSException logger = util.get_logger(__name__) @@ -17,18 +17,30 @@ def _default_is_success(status_code): return status_code >= 200 and status_code < 300 -def _default_to_error(response): +def _default_to_exception(response): """ - :param response: HTTP response object or Error - :type response: requests.Response or Error - :returns: the error embedded in the response JSON - :rtype: Error + :param response: HTTP response object or Exception + :type response: requests.Response | Exception + :returns: exception + :rtype: Exception """ - if isinstance(response, Error): + if isinstance(response, Exception) and \ + not isinstance(response, requests.exceptions.RequestException): return response - return DefaultError('{}: {}'.format(response.status_code, response.text)) + url = response.request.url + if isinstance(response, requests.exceptions.ConnectionError): + return DCOSException('URL [{0}] is unreachable: {1}' + .format(url, response)) + elif isinstance(response, requests.exceptions.Timeout): + return DCOSException('Request to URL [{0}] timed out'.format(url)) + elif isinstance(response, requests.exceptions.RequestException): + return response + else: + return DCOSException( + 'Error while fetching [{0}]: HTTP {1}: {2}'.format( + url, response.status_code, response.reason)) @util.duration @@ -36,7 +48,7 @@ def request(method, url, timeout=3.0, is_success=_default_is_success, - to_error=_default_to_error, + to_exception=_default_to_exception, **kwargs): """Sends an HTTP request. @@ -46,8 +58,8 @@ def request(method, :type url: str :param is_success: Defines successful status codes for the request :type is_success: Function from int to bool - :param to_error: Builds an Error from an unsuccessful response or Error - :type to_error: Function from requests.Response or Error to Error + :param to_exception: Builds an Error from an unsuccessful response or Error + :type to_exception: (requests.Response | Error) -> Error :param kwargs: Additional arguments to requests.request (see http://docs.python-requests.org/en/latest/api/#requests.request) :type kwargs: dict @@ -73,7 +85,7 @@ def request(method, with requests.Session() as session: response = session.send(request.prepare(), timeout=timeout) except Exception as ex: - raise DCOSException(to_error(DefaultError(str(ex))).error()) + raise to_exception(ex) logger.info('Received HTTP response [%r]: %r', response.status_code, @@ -82,10 +94,10 @@ def request(method, if is_success(response.status_code): return response else: - raise DCOSException(to_error(response).error()) + raise to_exception(response) -def head(url, to_error=_default_to_error, **kwargs): +def head(url, to_exception=_default_to_exception, **kwargs): """Sends a HEAD request. :param url: URL for the new Request object @@ -99,7 +111,7 @@ def head(url, to_error=_default_to_error, **kwargs): return request('head', url, **kwargs) -def get(url, to_error=_default_to_error, **kwargs): +def get(url, to_exception=_default_to_exception, **kwargs): """Sends a GET request. :param url: URL for the new Request object @@ -110,10 +122,11 @@ def get(url, to_error=_default_to_error, **kwargs): :rtype: Response """ - return request('get', url, to_error=to_error, **kwargs) + return request('get', url, to_exception=to_exception, **kwargs) -def post(url, to_error=_default_to_error, data=None, json=None, **kwargs): +def post(url, to_exception=_default_to_exception, + data=None, json=None, **kwargs): """Sends a POST request. :param url: URL for the new Request object @@ -128,11 +141,11 @@ def post(url, to_error=_default_to_error, data=None, json=None, **kwargs): :rtype: Response """ - return request('post', - url, to_error=to_error, data=data, json=json, **kwargs) + return request('post', url, + to_exception=to_exception, data=data, json=json, **kwargs) -def put(url, to_error=_default_to_error, data=None, **kwargs): +def put(url, to_exception=_default_to_exception, data=None, **kwargs): """Sends a PUT request. :param url: URL for the new Request object @@ -145,10 +158,10 @@ def put(url, to_error=_default_to_error, data=None, **kwargs): :rtype: Response """ - return request('put', url, to_error=to_error, data=data, **kwargs) + return request('put', url, to_exception=to_exception, data=data, **kwargs) -def patch(url, to_error=_default_to_error, data=None, **kwargs): +def patch(url, to_exception=_default_to_exception, data=None, **kwargs): """Sends a PATCH request. :param url: URL for the new Request object @@ -161,10 +174,11 @@ def patch(url, to_error=_default_to_error, data=None, **kwargs): :rtype: Response """ - return request('patch', url, to_error=to_error, data=data, **kwargs) + return request('patch', url, + to_exception=to_exception, data=data, **kwargs) -def delete(url, to_error=_default_to_error, **kwargs): +def delete(url, to_exception=_default_to_exception, **kwargs): """Sends a DELETE request. :param url: URL for the new Request object @@ -175,7 +189,7 @@ def delete(url, to_error=_default_to_error, **kwargs): :rtype: Response """ - return request('delete', url, to_error=to_error, **kwargs) + return request('delete', url, to_exception=to_exception, **kwargs) def silence_requests_warnings(): diff --git a/dcos/marathon.py b/dcos/marathon.py index 4d8056c..6769de3 100644 --- a/dcos/marathon.py +++ b/dcos/marathon.py @@ -2,7 +2,7 @@ import json from distutils.version import LooseVersion from dcos import http, util -from dcos.errors import DCOSException, DefaultError, Error +from dcos.errors import DCOSException from six.moves import urllib @@ -21,38 +21,38 @@ def create_client(config=None): if config is None: config = util.get_config() - marathon_uri = _get_marathon_uri(config) + marathon_url = _get_marathon_url(config) - logger.info('Creating marathon client with: %r', marathon_uri) - return Client(marathon_uri) + logger.info('Creating marathon client with: %r', marathon_url) + return Client(marathon_url) -def _get_marathon_uri(config): +def _get_marathon_url(config): """ :param config: configuration dictionary :type config: config.Toml - :returns: marathon base uri + :returns: marathon base url :rtype: str """ - marathon_uri = config.get('marathon.url') - if marathon_uri is None: + marathon_url = config.get('marathon.url') + if marathon_url is None: dcos_url = util.get_config_vals(config, ['core.dcos_url'])[0] - marathon_uri = urllib.parse.urljoin(dcos_url, 'marathon/') + marathon_url = urllib.parse.urljoin(dcos_url, 'marathon/') - return marathon_uri + return marathon_url -def _to_error(response): +def _to_exception(response): """ - :param response: HTTP response object or Error - :type response: requests.Response | Error - :returns: the error embedded in the response JSON - :rtype: Error + :param response: HTTP response object or Exception + :type response: requests.Response | Exception + :returns: An exception with the message from the response JSON + :rtype: Exception """ - if isinstance(response, Error): - return DefaultError(_default_marathon_error(response.error())) + if isinstance(response, Exception): + return DCOSException(_default_marathon_error(str(response))) message = response.json().get('message') if message is None: @@ -61,23 +61,23 @@ def _to_error(response): logger.error( 'Marathon server did not return a message: %s', response.json()) - return DefaultError(_default_marathon_error()) + return DCOSException(_default_marathon_error()) msg = '\n'.join(error['error'] for error in errs) - return DefaultError(_default_marathon_error(msg)) + return DCOSException(_default_marathon_error(msg)) - return DefaultError('Error: {}'.format(response.json()['message'])) + return DCOSException('Error: {}'.format(response.json()['message'])) class Client(object): """Class for talking to the Marathon server. - :param marathon_uri: the base URI for the Marathon server - :type marathon_uri: str + :param marathon_url: the base URL for the Marathon server + :type marathon_url: str """ - def __init__(self, marathon_uri): - self._base_uri = marathon_uri + def __init__(self, marathon_url): + self._base_url = marathon_url min_version = "0.8.1" version = LooseVersion(self.get_about()["version"]) @@ -97,7 +97,7 @@ class Client(object): :rtype: str """ - return urllib.parse.urljoin(self._base_uri, path) + return urllib.parse.urljoin(self._base_url, path) def get_version(self): """Get marathon version @@ -116,7 +116,7 @@ class Client(object): url = self._create_url('v2/info') - response = http.get(url, to_error=_to_error) + response = http.get(url, to_exception=_to_exception) return response.json() @@ -139,7 +139,7 @@ class Client(object): url = self._create_url( 'v2/apps{}/versions/{}'.format(app_id, version)) - response = http.get(url, to_error=_to_error) + response = http.get(url, to_exception=_to_exception) # Looks like Marathon return different JSON for versions if version is None: @@ -156,7 +156,7 @@ class Client(object): url = self._create_url('v2/groups') - response = http.get(url, to_error=_to_error) + response = http.get(url, to_exception=_to_exception) return response.json()['groups'] @@ -179,7 +179,7 @@ class Client(object): url = self._create_url( 'v2/groups{}/versions/{}'.format(group_id, version)) - response = http.get(url, to_error=_to_error) + response = http.get(url, to_exception=_to_exception) return response.json() @@ -206,7 +206,7 @@ class Client(object): url = self._create_url('v2/apps{}/versions'.format(app_id)) - response = http.get(url, to_error=_to_error) + response = http.get(url, to_exception=_to_exception) if max_count is None: return response.json()['versions'] @@ -222,7 +222,7 @@ class Client(object): url = self._create_url('v2/apps') - response = http.get(url, to_error=_to_error) + response = http.get(url, to_exception=_to_exception) return response.json()['apps'] @@ -245,7 +245,7 @@ class Client(object): response = http.post(url, json=app_json, - to_error=_to_error) + to_exception=_to_exception) return response.json() @@ -276,7 +276,7 @@ class Client(object): response = http.put(url, params=params, json=payload, - to_error=_to_error) + to_exception=_to_exception) return response.json().get('deploymentId') @@ -335,7 +335,7 @@ class Client(object): response, error = http.put(url, params=params, json={'instances': int(instances)}, - to_error=_to_error) + to_exception=_to_exception) if error is not None: return (None, error) @@ -375,7 +375,7 @@ class Client(object): url = self._create_url('v2/apps{}'.format(app_id)) - http.delete(url, params=params, to_error=_to_error) + http.delete(url, params=params, to_exception=_to_exception) def remove_group(self, group_id, force=None): """Completely removes the requested application. @@ -396,7 +396,7 @@ class Client(object): url = self._create_url('v2/groups{}'.format(group_id)) - http.delete(url, params=params, to_error=_to_error) + http.delete(url, params=params, to_exception=_to_exception) def restart_app(self, app_id, force=None): """Performs a rolling restart of all of the tasks. @@ -420,7 +420,7 @@ class Client(object): response = http.post(url, params=params, - to_error=_to_error) + to_exception=_to_exception) return response.json() @@ -436,7 +436,7 @@ class Client(object): url = self._create_url('v2/deployments') response = http.get(url, - to_error=_to_error) + to_exception=_to_exception) deployment = next( (deployment for deployment in response.json() @@ -456,7 +456,7 @@ class Client(object): url = self._create_url('v2/deployments') - response = http.get(url, to_error=_to_error) + response = http.get(url, to_exception=_to_exception) if app_id is not None: app_id = self.normalize_app_id(app_id) @@ -493,7 +493,7 @@ class Client(object): response = http.delete( url, params=params, - to_error=_to_error) + to_exception=_to_exception) if force: return None @@ -532,7 +532,7 @@ class Client(object): url = self._create_url('v2/tasks') - response = http.get(url, to_error=_to_error) + response = http.get(url, to_exception=_to_exception) if app_id is not None: app_id = self.normalize_app_id(app_id) @@ -556,7 +556,7 @@ class Client(object): url = self._create_url('v2/tasks') - response = http.get(url, to_error=_to_error) + response = http.get(url, to_exception=_to_exception) task = next( (task for task in response.json()['tasks'] @@ -609,7 +609,7 @@ class Client(object): else: group_json = group_resource - response = http.post(url, json=group_json, to_error=_to_error) + response = http.post(url, json=group_json, to_exception=_to_exception) return response.json() @@ -623,5 +623,5 @@ def _default_marathon_error(message=""): """ return ("Marathon likely misconfigured. Please check your proxy or " - "Marathon URI settings. See dcos config --help. {}").format( + "Marathon URL settings. See dcos config --help. {}").format( message) diff --git a/dcos/mesos.py b/dcos/mesos.py index f6dfdcb..b4db8ea 100644 --- a/dcos/mesos.py +++ b/dcos/mesos.py @@ -1,5 +1,6 @@ import fnmatch import itertools +import os from dcos import http, util from dcos.errors import DCOSException @@ -8,93 +9,100 @@ from six.moves import urllib logger = util.get_logger(__name__) +MESOS_TIMEOUT = 5 -def get_master(config=None): - """Create a Master object using the URLs stored in the user's - configuration. - :param config: config +def get_master(): + """Create a Master object using the url stored in the + 'core.mesos_master_url' property if it exists. Otherwise, we use + the `core.dcos_url` property + + :param config: user config :type config: Toml :returns: master state object :rtype: Master """ - return Master(get_master_client(config).get_state()) + return Master(MesosClient().get_master_state()) -def get_master_client(config=None): - """Create a Mesos master client using the URLs stored in the user's - configuration. +class MesosClient: + """Client for communicating with the Mesos master""" - :param config: config - :type config: Toml - :returns: mesos master client - :rtype: MasterClient - """ - - if config is None: + def __init__(self): config = util.get_config() + self._dcos_url = None + self._mesos_master_url = None - mesos_url = _get_mesos_url(config) - return MasterClient(mesos_url) + mesos_master_url = config.get('core.mesos_master_url') + if mesos_master_url is None: + self._dcos_url = util.get_config_vals(config, ['core.dcos_url'])[0] + else: + self._mesos_master_url = mesos_master_url + def master_url(self, path): + """ Create a URL that hits the master -def _get_mesos_url(config): - """ - :param config: configuration - :type config: Toml - :returns: url for the Mesos master - :rtype: str - """ - - mesos_master_url = config.get('core.mesos_master_url') - if mesos_master_url is None: - dcos_url = util.get_config_vals(config, ['core.dcos_url'])[0] - return urllib.parse.urljoin(dcos_url, 'mesos/') - else: - return mesos_master_url - - -class MasterClient: - """Client for communicating with the Mesos master - - :param url: URL for the Mesos master - :type url: str - """ - - def __init__(self, url): - self._base_url = url - - def _create_url(self, path): - """Creates the url from the provided path. - - :param path: url path + :param path: the path suffix of the desired URL :type path: str - :returns: constructed url + :returns: URL that hits the master :rtype: str """ - return urllib.parse.urljoin(self._base_url, path) + base_url = (self._mesos_master_url or + urllib.parse.urljoin(self._dcos_url, 'mesos/')) + return urllib.parse.urljoin(base_url, path) - def get_state(self): + # TODO (mgummelt): this doesn't work with self._mesos_master_url + def slave_url(self, slave_id, path): + """ Create a URL that hits the slave + + :param slave_id: slave ID + :type slave_id: str + :param path: the path suffix of the desired URL + :type path: str + :returns: URL that hits the master + :rtype: str + """ + + return urllib.parse.urljoin(self._dcos_url, + 'slave/{}/{}'.format(slave_id, path)) + + def get_master_state(self): """Get the Mesos master state json object :returns: Mesos' master state json object :rtype: dict """ - return http.get(self._create_url('master/state.json')).json() + url = self.master_url('master/state.json') + return http.get(url).json() + + def get_slave_state(self, slave_id): + """Get the Mesos slave state json object + + :param slave_id: slave ID + :type slave_id: str + :returns: Mesos' master state json object + :rtype: dict + """ + + url = self.slave_url(slave_id, 'state.json') + return http.get(url).json() def shutdown_framework(self, framework_id): """Shuts down a Mesos framework + :param framework_id: ID of the framework to shutdown + :type framework_id: str :returns: None """ logger.info('Shutting down framework {}'.format(framework_id)) data = 'frameworkId={}'.format(framework_id) - http.post(self._create_url('master/shutdown'), data=data) + url = self.master_url('master/shutdown') + http.post(url, data=data) class Master(object): @@ -106,6 +114,8 @@ class Master(object): def __init__(self, state): self._state = state + self._frameworks = {} + self._slaves = {} def state(self): """Returns master's master/state.json. @@ -116,8 +126,22 @@ class Master(object): return self._state - def slave(self, fltr): + def slave_base_url(self, slave): + """Returns the base url of the provided slave object. + :param slave: slave to create a url for + :type slave: Slave + :returns: slave's base url + :rtype: str + """ + if self._mesos_master_url is not None: + slave_ip = slave['pid'].split('@')[1] + return 'http://{}'.format(slave_ip) + else: + return urllib.parse.urljoin(self._dcos_url, + 'slave/{}/'.format(slave['id'])) + + def slave(self, fltr): """Returns the slave that has `fltr` in its id. Raises a DCOSException if there is not exactly one such slave. @@ -141,21 +165,8 @@ class Master(object): else: return slaves[0] - def slaves(self, fltr=""): - """Returns those slaves that have `fltr` in their 'id' - - :param fltr: filter string - :type fltr: str - :returns: Those slaves that have `fltr` in their 'id' - :rtype: [Slave] - """ - - return [Slave(slave) - for slave in self.state()['slaves'] - if fltr in slave['id']] - def task(self, fltr): - """Returns the task with `fltr` in its id. Raises an exception if + """Returns the task with `fltr` in its id. Raises a DCOSException if there is not exactly one such task. :param fltr: filter string @@ -178,18 +189,43 @@ class Master(object): else: return tasks[0] - # TODO (thomas): need to filter on task state as well as id + def framework(self, framework_id): + """Returns a framework by id + + :param framework_id: the framework's id + :type framework_id: str + :returns: the framework + :rtype: Framework + """ + + for f in self._framework_dicts(True, True): + if f['id'] == framework_id: + return self._framework_obj(f) + return None + + def slaves(self, fltr=""): + """Returns those slaves that have `fltr` in their 'id' + + :param fltr: filter string + :type fltr: str + :returns: Those slaves that have `fltr` in their 'id' + :rtype: [Slave] + """ + + return [self._slave_obj(slave) + for slave in self.state()['slaves'] + if fltr in slave['id']] + def tasks(self, fltr="", completed=False): """Returns tasks running under the master - :param fltr: May be a substring or unix glob pattern. Only - return tasks whose 'id' matches `fltr`. + :param fltr: May be a substring or regex. Only return tasks + whose 'id' matches `fltr`. :type fltr: str :param completed: also include completed tasks :type completed: bool :returns: a list of tasks :rtype: [Task] - """ keys = ['tasks'] @@ -198,28 +234,13 @@ class Master(object): tasks = [] for framework in self._framework_dicts(completed, completed): - tasks += \ - [Task(task, self) - for task in _merge(framework, *keys) - if fltr in task['id'] or - fnmatch.fnmatchcase(task['id'], fltr)] + for task in _merge(framework, keys): + if fltr in task['id'] or fnmatch.fnmatchcase(task['id'], fltr): + task = self._framework_obj(framework).task(task['id']) + tasks.append(task) return tasks - def framework(self, framework_id): - """Returns a framework by id - - :param framework_id: the framework's id - :type framework_id: int - :returns: the framework - :rtype: Framework - """ - - for f in self._framework_dicts(inactive=True): - if f['id'] == framework_id: - return Framework(f) - raise DCOSException('No Framework with id [{}]'.format(framework_id)) - def frameworks(self, inactive=False, completed=False): """Returns a list of all frameworks @@ -231,8 +252,51 @@ class Master(object): :rtype: [Framework] """ - return [Framework(f) - for f in self._framework_dicts(inactive, completed)] + return [self._framework_obj(framework) + for framework in self._framework_dicts(inactive, completed)] + + @util.duration + def fetch(self, path, **kwargs): + """GET the resource located at `path` + + :param path: the URL path + :type path: str + :param **kwargs: http.get kwargs + :type **kwargs: dict + :returns: the response object + :rtype: Response + """ + + url = urllib.parse.urljoin(self._base_url(), path) + return http.get(url, timeout=MESOS_TIMEOUT, **kwargs) + + def _slave_obj(self, slave): + """Returns the Slave object corresponding to the provided `slave` + dict. Creates it if it doesn't exist already. + + :param slave: slave + :type slave: dict + :returns: Slave + :rtype: Slave + """ + + if slave['id'] not in self._slaves: + self._slaves[slave['id']] = Slave(slave, None, self) + return self._slaves[slave['id']] + + def _framework_obj(self, framework): + """Returns the Framework object corresponding to the provided `framework` + dict. Creates it if it doesn't exist already. + + :param framework: framework + :type framework: dict + :returns: Framework + :rtype: Framework + """ + + if framework['id'] not in self._frameworks: + self._frameworks[framework['id']] = Framework(framework, self) + return self._frameworks[framework['id']] def _framework_dicts(self, inactive=False, completed=False): """Returns a list of all frameworks as their raw dictionaries @@ -242,13 +306,12 @@ class Master(object): :param completed: also include completed frameworks :type completed: bool :returns: a list of frameworks - :rtype: [dict] """ keys = ['frameworks'] if completed: keys.append('completed_frameworks') - for framework in _merge(self.state(), *keys): + for framework in _merge(self.state(), keys): if inactive or framework['active']: yield framework @@ -256,16 +319,62 @@ class Master(object): class Slave(object): """Mesos Slave Model - :param slave: dictionary representing the slave. - retrieved from master/state.json - :type slave: dict + :param short_state: slave's entry from the master's state.json + :type short_state: dict + :param state: slave's state.json + :type state: dict + :param master: slave's master + :type master: Master """ - def __init__(self, slave): - self._slave = slave + def __init__(self, short_state, state, master): + self._short_state = short_state + self._state = state + self._master = master + + def state(self): + """Get the slave's state.json object. Fetch it if it's not already + an instance variable. + + :returns: This slave's state.json object + :rtype: dict + """ + + if not self._state: + self._state = MesosClient().get_slave_state(self['id']) + return self._state + + def _framework_dicts(self): + """Returns the framework dictionaries from the state.json dict + + :returns: frameworks + :rtype: [dict] + """ + + return _merge(self._state, ['frameworks', 'completed_frameworks']) + + def executor_dicts(self): + """Returns the executor dictionaries from the state.json + + :returns: executors + :rtype: [dict] + """ + + iters = [_merge(framework, ['executors', 'completed_executors']) + for framework in self._framework_dicts()] + return itertools.chain(*iters) def __getitem__(self, name): - return self._slave[name] + """Support the slave[attr] syntax + + :param name: attribute to get + :type name: str + :returns: the value for this attribute in the underlying + slave dictionary + :rtype: object + """ + + return self._short_state[name] class Framework(object): @@ -273,22 +382,61 @@ class Framework(object): :param framework: framework properties :type framework: dict + :param master: framework's master + :type master: Master """ - def __init__(self, framework): + def __init__(self, framework, master): self._framework = framework + self._master = master + self._tasks = {} # id->Task map + + def task(self, task_id): + """Returns a task by id + + :param task_id: the task's id + :type task_id: str + :returns: the task + :rtype: Task + """ + + for task in _merge(self._framework, ['tasks', 'completed_tasks']): + if task['id'] == task_id: + return self._task_obj(task) + return None + + def _task_obj(self, task): + """Returns the Task object corresponding to the provided `task` + dict. Creates it if it doesn't exist already. + + :param task: task + :type task: dict + :returns: Task + :rtype: Task + """ + + if task['id'] not in self._tasks: + self._tasks[task['id']] = Task(task, self._master) + return self._tasks[task['id']] def dict(self): return self._framework def __getitem__(self, name): + """Support the framework[attr] syntax + + :param name: attribute to get + :type name: str + :returns: the value for this attribute in the underlying + framework dictionary + :rtype: object + """ + return self._framework[name] class Task(object): - """Mesos Task Model. Created from the Task objects sent in master's - state.json, which is in turn created from mesos' Task protobuf - object. + """Mesos Task Model. :param task: task properties :type task: dict @@ -309,7 +457,7 @@ class Task(object): return self._task def framework(self): - """Returns the task's framework + """Returns this task's framework :returns: task's framework :rtype: Framework @@ -335,19 +483,228 @@ class Task(object): return self.framework()['user'] + def executor(self): + """ Returns this tasks' executor + + :returns: task's executor + :rtype: dict + """ + for executor in self.slave().executor_dicts(): + tasks = _merge(executor, + ['completed_tasks', + 'tasks', + 'queued_tasks']) + if any(task['id'] == self['id'] for task in tasks): + return executor + return None + + def directory(self): + """ Sandbox directory for this task + + :returns: path to task's sandbox + :rtype: str + """ + return self.executor()['directory'] + def __getitem__(self, name): + """Support the task[attr] syntax + + :param name: attribute to get + :type name: str + :returns: the value for this attribute in the underlying + task dictionary + :rtype: object + """ + return self._task[name] -def _merge(d, *keys): +class MesosFile(object): + """File-like object that is backed by a remote slave file. Uses the + files/read.json endpoint. This endpoint isn't well documented + anywhere, so here is the spec derived from the mesos source code: + + request format: + { + path: absolute path to read + offset: start byte location, or -1. -1 means read no data, and + is used to fetch the size of the file in the response's + 'offset' parameter. + length: number of bytes to read, or -1. -1 means read the whole file. + } + + response format: + { + data: file data. Empty if a request.offset=-1. Could be + smaller than request.length if EOF was reached, or if (I + believe) request.length is larger than the length + supported by the server (16 pages I believe). + + offset: the offset value from the request, or the size of the + file if the request offset was -1 or >= the file size. + } + + :param task: file's task + :type task: Task + :param path: file's path, relative to the sandbox + :type path: str + """ + + def __init__(self, task, path, mesos_client): + self._task = task + self._path = path + self._mesos_client = mesos_client + self._cursor = 0 + + def size(self): + """Size of the file + + :returns: size of the file + :rtype: int + """ + + params = self._params(0, offset=-1) + return self._fetch(params)["offset"] + + def seek(self, offset, whence=os.SEEK_SET): + """Seek to the provided location in the file. + + :param offset: location to seek to + :type offset: int + :param whence: determines whether `offset` represents a + location that is absolute, relative to the + beginning of the file, or relative to the end + of the file + :type whence: os.SEEK_SET | os.SEEK_CUR | os.SEEK_END + :returns: None + :rtype: None + """ + + if whence == os.SEEK_SET: + self._cursor = 0 + offset + elif whence == os.SEEK_CUR: + self._cursor += offset + elif whence == os.SEEK_END: + self._cursor = self.size() + offset + else: + raise ValueError( + "Unexpected value for `whence`: {}".format(whence)) + + def tell(self): + """ The current cursor position. + + :returns: the current cursor position + :rtype: int + """ + + return self._cursor + + def read(self, length=None): + """Reads up to `length` bytes, or the entire file if `length` is None. + + :param length: number of bytes to read + :type length: int | None + :returns: data read + :rtype: str + """ + + data = '' + while length is None or length - len(data) > 0: + chunk_length = -1 if length is None else length - len(data) + chunk = self._fetch_chunk(chunk_length) + if chunk == '': + break + data += chunk + + return data + + def _host_path(self): + """ The absolute path to the file on slave. + + :returns: the absolute path to the file on slave + :rtype: str + """ + + directory = self._task.directory() + if directory[-1] == '/': + return directory + self._path + else: + return directory + '/' + self._path + + def _params(self, length, offset=None): + """GET parameters to send to files/read.json. See the MesosFile + docstring for full information. + + :param length: number of bytes to read + :type length: int + :param offset: start location. if None, will use the location + of the current file cursor + :type offset: int + :returns: GET parameters + :rtype: dict + """ + + if offset is None: + offset = self._cursor + + return { + 'path': self._host_path(), + 'offset': offset, + 'length': length + } + + def _fetch_chunk(self, length, offset=None): + """Fetch data from files/read.json + + :param length: number of bytes to fetch + :type length: int + :param offset: start location. If not None, this file's + cursor is set to `offset` + :type offset: int + :returns: data read + :rtype: str + """ + + if offset is not None: + self.seek(offset, os.SEEK_SET) + + params = self._params(length) + data = self._fetch(params)["data"] + self.seek(len(data), os.SEEK_CUR) + return data + + def _fetch(self, params): + """Fetch data from files/read.json + + :param params: GET parameters + :type params: dict + :returns: response dict + :rtype: dict + """ + + read_url = self._mesos_client.slave_url(self._task.slave()['id'], + 'files/read.json') + return http.get(read_url, params=params).json() + + def __str__(self): + """String representation of the file: + + :returns: string representation of the file + :rtype: str + """ + + return "{0}:{1}".format(self._task['id'], self._path) + + +def _merge(d, keys): """ Merge multiple lists from a dictionary into one iterator. e.g. _merge({'a': [1, 2], 'b': [3]}, ['a', 'b']) -> iter(1, 2, 3) :param d: dictionary :type d: dict - :param *keys: keys to merge - :type *keys: [hashable] + :param keys: keys to merge + :type keys: [hashable] :returns: iterator :rtype: iter """ diff --git a/dcos/package.py b/dcos/package.py index c40595b..c8e3227 100644 --- a/dcos/package.py +++ b/dcos/package.py @@ -165,7 +165,7 @@ def uninstall(package_name, remove_all, app_id, cli, app): remove_all, app_id, marathon.create_client(), - mesos.get_master_client()) + mesos.MesosClient()) if num_apps > 0: uninstalled = True @@ -192,7 +192,7 @@ def uninstall_subcommand(distribution_name): return subcommand.uninstall(distribution_name) -def uninstall_app(app_name, remove_all, app_id, init_client, master_client): +def uninstall_app(app_name, remove_all, app_id, init_client, mesos_client): """Uninstalls an app. :param app_name: The app to uninstall @@ -203,8 +203,8 @@ def uninstall_app(app_name, remove_all, app_id, init_client, master_client): :type app_id: str :param init_client: The program to use to run the app :type init_client: object - :param master_client: the mesos master client - :type master_client: dcos.mesos.MasterClient + :param mesos_client: the mesos client + :type mesos_client: dcos.mesos.MesosClient :returns: number of apps uninstalled :rtype: int """ @@ -245,8 +245,8 @@ def uninstall_app(app_name, remove_all, app_id, init_client, master_client): if framework_name is not None: logger.info( 'Trying to shutdown framework {}'.format(framework_name)) - frameworks = mesos.Master(master_client.get_state()).frameworks( - inactive=True) + frameworks = mesos.Master(mesos_client.get_master_state()) \ + .frameworks(inactive=True) # Look up all the framework names framework_ids = [ @@ -259,7 +259,7 @@ def uninstall_app(app_name, remove_all, app_id, init_client, master_client): 'Found the following frameworks: {}'.format(framework_ids)) if len(framework_ids) == 1: - master_client.shutdown_framework(framework_ids[0]) + mesos_client.shutdown_framework(framework_ids[0]) elif len(framework_ids) > 1: raise DCOSException( "Unable to shutdown the framework for [{}] because there " diff --git a/dcos/util.py b/dcos/util.py index c2359d9..ee6bdf7 100644 --- a/dcos/util.py +++ b/dcos/util.py @@ -107,7 +107,6 @@ def get_config(): :rtype: Toml """ - # avoid circular import from dcos import config return config.load_from_path(