dcos tail

This commit is contained in:
Michael Gummelt
2015-05-07 17:47:38 -07:00
parent a059e75e4e
commit 56d21c11ef
20 changed files with 1469 additions and 523 deletions

View File

@@ -1,4 +1,4 @@
"""Get the status of DCOS services
"""Manage DCOS services
Usage:
dcos service --info
@@ -121,5 +121,5 @@ def _shutdown(service_id):
:rtype: int
"""
mesos.get_master_client().shutdown_framework(service_id)
mesos.MesosClient().shutdown_framework(service_id)
return 0

View File

@@ -1,26 +1,36 @@
"""Get the status of DCOS tasks
"""Manage DCOS tasks
Usage:
dcos task --info
dcos task [--completed --json <task>]
dcos task log [--completed --follow --lines=N] <task> [<file>]
Options:
-h, --help Show this screen
--info Show a short description of this subcommand
--completed Include completed tasks as well
--follow Output data as the file grows
--json Print json-formatted tasks
--completed Show completed tasks as well
--lines=N Output the last N lines [default: 10]
--version Show version
Positional Arguments:
<task> Only match tasks whose ID matches <task>. <task> may be
a substring of the ID, or a unix glob pattern.
<file> Output this file. [default: stdout]
"""
import functools
import sys
import time
import concurrent.futures
import dcoscli
import docopt
from dcos import cmds, emitting, mesos, util
from dcos.errors import DCOSException
from dcos.errors import DCOSException, DefaultError
from dcoscli import tables
logger = util.get_logger(__name__)
@@ -57,6 +67,12 @@ def _cmds():
arg_keys=[],
function=_info),
cmds.Command(
hierarchy=['task', 'log'],
arg_keys=['--follow', '--completed', '--lines', '<task>',
'<file>'],
function=_log),
cmds.Command(
hierarchy=['task'],
arg_keys=['<task>', '--completed', '--json'],
@@ -86,7 +102,6 @@ def _task(fltr, completed, json_):
readable table.
:type json_: bool
:returns: process return code
"""
if fltr is None:
@@ -104,3 +119,293 @@ def _task(fltr, completed, json_):
emitter.publish(output)
return 0
NO_FILE_EXCEPTION = DCOSException('No files exist. Exiting.')
def _log(follow, completed, lines, task, path):
""" Tail a file in the task's sandbox.
:param follow: same as unix tail's -f
:type follow: bool
:param completed: whether to include completed tasks
:type completed: bool
:param lines: number of lines to print
:type lines: int
:param task: task pattern to match
:type task: str
:param path: file path to read
:type path: str
:returns: process return code
:rtype: int
"""
if task is None:
fltr = ""
else:
fltr = task
if path is None:
path = 'stdout'
lines = int(lines)
mesos_files = _mesos_files(completed, fltr, path)
if not mesos_files:
raise DCOSException('No matching tasks. Exiting.')
fn = functools.partial(_read_last_lines, lines)
curr_header, mesos_files = _stream_files(None, fn, mesos_files)
if not mesos_files:
raise NO_FILE_EXCEPTION
while follow:
# This flush is needed only for testing, since stdout is fully
# buffered (as opposed to line-buffered) when redirected to a
# pipe. So if we don't flush, our --follow tests, which use a
# pipe, never see the data
sys.stdout.flush()
curr_header, mesos_files = _stream_files(curr_header,
_read_rest,
mesos_files)
if not mesos_files:
raise NO_FILE_EXCEPTION
time.sleep(1)
return 0
def _mesos_files(completed, fltr, path):
"""Return MesosFile objects for the specified files. Only include
files that satisfy all of the following:
a) belong to an available slave
b) have an executor entry on the slave
:param completed: whether to include completed tasks
:type completed: bool
:param fltr: task pattern to match
:type fltr: str
:param path: file path to read
:type path: str
:returns: MesosFile objects
:rtype: [MesosFile]
"""
# get tasks
client = mesos.MesosClient()
master = mesos.Master(client.get_master_state())
tasks = master.tasks(completed=completed, fltr=fltr)
# load slave state in parallel
slaves = _load_slaves_state([task.slave() for task in tasks])
# some completed tasks may have entries on the master, but none on
# the slave. since we need the slave entry to get the executor
# sandbox, we only include files with an executor entry.
available_tasks = [task for task in tasks
if task.slave() in slaves and task.executor()]
# create files.
return [mesos.MesosFile(task, path, client)
for task in available_tasks]
def _load_slaves_state(slaves):
"""Fetch each slave's state.json in parallel, and return the reachable
slaves.
:param slaves: slaves to fetch
:type slaves: [MesosSlave]
:returns: MesosSlave objects that were successfully reached
:rtype: [MesosSlave]
"""
reachable_slaves = []
for job, slave in _stream(lambda slave: slave.state(), slaves):
try:
job.result()
reachable_slaves.append(slave)
except DCOSException as e:
emitter.publish(
DefaultError('Error accessing slave: {0}'.format(e)))
return reachable_slaves
def _stream_files(curr_header, fn, mesos_files):
"""Apply `fn` in parallel to each file in `mesos_files`. `fn` must
return a list of strings, and these strings are then printed
serially as separate lines.
`curr_header` is the most recently printed header. It's used to
group lines. Each line has an associated header (e.g. a string
representation of the MesosFile it was read from), and we only
print the header before printing a line with a different header
than the previous line. This effectively groups lines together
when the have the same header.
:param curr_header: Most recently printed header
:type curr_header: str
:param fn: function that reads a sequence of lines from a MesosFile
:type fn: MesosFile -> [str]
:param mesos_files: files to read
:type mesos_files: [MesosFile]
:returns: Returns the most recently printed header, and a list of
files that are still reachable. Once we detect a file is
unreachable, we stop trying to read from it.
:rtype: (str, [MesosFile])
"""
reachable_files = list(mesos_files)
# TODO switch to map
for job, mesos_file in _stream(fn, mesos_files):
try:
lines = job.result()
except DCOSException as e:
# The read function might throw an exception if read.json
# is unavailable, or if the file doesn't exist in the
# sandbox. In any case, we silently remove the file and
# continue.
logger.warning("Error reading file: {}".format(e))
reachable_files.remove(mesos_file)
continue
curr_header = _output(curr_header,
len(reachable_files) > 1,
str(mesos_file),
lines)
return curr_header, reachable_files
def _output(curr_header, output_header, header, lines):
"""Prints a sequence of lines. If `header` is different than
`curr_header`, first print the header.
:param curr_header: most recently printed header
:type curr_header: str
:param output_header: whether or not to output the header
:type output_header: bool
:param header: header for `lines`
:type header: str
:param lines: lines to print
:type lines: [str]
:returns: `header`
:rtype: str
"""
if lines:
if output_header and header != curr_header:
emitter.publish('===> {} <==='.format(header))
for line in lines:
emitter.publish(line)
return header
STREAM_CONCURRENCY = 20
def _stream(fn, objs):
"""Apply `fn` to `objs` in parallel, yielding the (Future, obj) for
each as it completes.
:param fn: function
:type fn: function
:param objs: objs
:type objs: objs
:returns: iterator over (Future, typeof(obj))
:rtype: iterator over (Future, typeof(obj))
"""
with concurrent.futures.ThreadPoolExecutor(STREAM_CONCURRENCY) as pool:
jobs = {pool.submit(fn, obj): obj for obj in objs}
for job in concurrent.futures.as_completed(jobs):
yield job, jobs[job]
# A liberal estimate of a line size. Used to estimate how much data
# we need to fetch from a file when we want to read N lines.
LINE_SIZE = 200
def _read_last_lines(num_lines, mesos_file):
"""Returns the last `num_lines` of a file, or less if the file is
smaller. Seeks to EOF.
:param num_lines: number of lines to read
:type num_lines: int
:param mesos_file: file to read
:type mesos_file: MesosFile
:returns: lines read
:rtype: [str]
"""
file_size = mesos_file.size()
# estimate how much data we need to fetch to read `num_lines`.
fetch_size = LINE_SIZE * num_lines
end = file_size
start = max(file_size - fetch_size, 0)
data = ''
while True:
# fetch data
mesos_file.seek(start)
data = mesos_file.read(end - start) + data
# break if we have enough lines
data_tmp = _strip_trailing_newline(data)
lines = data_tmp.split('\n')
if len(lines) > num_lines:
ret = lines[-num_lines:]
break
elif start == 0:
ret = lines
break
# otherwise shift our read window and repeat
end = start
start = max(file_size - fetch_size, 0)
mesos_file.seek(file_size)
return ret
def _read_rest(mesos_file):
""" Reads the rest of the file, and returns the lines.
:param mesos_file: file to read
:type mesos_file: MesosFile
:returns: lines read
:rtype: [str]
"""
data = mesos_file.read()
if data == '':
return []
else:
data_tmp = _strip_trailing_newline(data)
return data_tmp.split('\n')
def _strip_trailing_newline(s):
"""Returns a modified version of the string with the last character
truncated if it's a newline.
:param s: string to trim
:type s: str
:returns: modified string
:rtype: str
"""
if s == "":
return s
else:
return s[:-1] if s[-1] == '\n' else s

View File

@@ -95,8 +95,8 @@ setup(
'dcos-config=dcoscli.config.main:main',
'dcos-marathon=dcoscli.marathon.main:main',
'dcos-package=dcoscli.package.main:main',
'dcos-task=dcoscli.task.main:main',
'dcos-service=dcoscli.service.main:main',
'dcos-task=dcoscli.task.main:main',
],
},

View File

@@ -0,0 +1,7 @@
{
"id": "follow",
"cmd": "sleep 5 && echo \"follow_this\" && sleep 1000",
"cpus": 0.1,
"mem": 16,
"instances": 1
}

View File

@@ -0,0 +1,7 @@
{
"id": "two-tasks",
"cmd": "sleep 1000",
"cpus": 0.1,
"mem": 16,
"instances": 2
}

View File

@@ -0,0 +1,7 @@
{
"id": "two-tasks-follow",
"cmd": "sleep 5 && echo \"follow_this\" && sleep 1000",
"cpus": 0.1,
"mem": 16,
"instances": 2
}

View File

@@ -0,0 +1,11 @@
{
"id": "test-app2",
"cmd": "sleep 1000",
"cpus": 0.1,
"mem": 16,
"instances": 1,
"labels": {
"PACKAGE_ID": "test-app",
"PACKAGE_VERSION": "1.2.3"
}
}

View File

@@ -43,4 +43,4 @@ def framework_fixture():
},
"user": "root",
"webui_url": "http://mesos:8080"
})
}, None)

View File

@@ -32,6 +32,6 @@ def task_fixture():
}, None)
task.user = mock.Mock(return_value='root')
slave = Slave({"hostname": "mock-hostname"})
slave = Slave({"hostname": "mock-hostname"}, None, None)
task.slave = mock.Mock(return_value=slave)
return task

View File

@@ -1,11 +1,15 @@
import collections
import contextlib
import json
import os
import subprocess
import sys
import requests
import six
from dcos import util
import mock
from six.moves import urllib
@@ -80,6 +84,58 @@ def assert_command(
assert stderr_ == stderr
def exec_mock(main, args):
"""Call a main function with sys.args mocked, and capture
stdout/stderr
:param main: main function to call
:type main: function
:param args: sys.args to mock, excluding the initial 'dcos'
:type args: [str]
:returns: (returncode, stdout, stderr)
:rtype: (int, bytes, bytes)
"""
print('MOCK ARGS: {}'.format(' '.join(args)))
with mock_args(args) as (stdout, stderr):
returncode = main()
stdout_val = six.b(stdout.getvalue())
stderr_val = six.b(stderr.getvalue())
print('STDOUT: {}'.format(stdout_val))
print('STDERR: {}'.format(stderr_val))
return (returncode, stdout_val, stderr_val)
def assert_mock(main,
args,
returncode=0,
stdout=b'',
stderr=b''):
"""Mock and call a main function, and assert expected behavior.
:param main: main function to call
:type main: function
:param args: sys.args to mock, excluding the initial 'dcos'
:type args: [str]
:type returncode: int
:param stdout: Expected stdout
:type stdout: str
:param stderr: Expected stderr
:type stderr: str
:rtype: None
"""
returncode_, stdout_, stderr_ = exec_mock(main, args)
assert returncode_ == returncode
assert stdout_ == stdout
assert stderr_ == stderr
def mock_called_some_args(mock, *args, **kwargs):
"""Convience method for some mock assertions. Returns True if the
arguments to one of the calls of `mock` contains `args` and
@@ -138,6 +194,57 @@ def watch_all_deployments(count=300):
watch_deployment(dep['id'], count)
def add_app(app_path, deploy=False):
""" Add an app, and wait for it to deploy
:param app_path: path to app's json definition
:type app_path: str
:rtype: None
"""
assert_command(['dcos', 'marathon', 'app', 'add', app_path])
if deploy:
watch_all_deployments()
def remove_app(app_id):
""" Remove an app
:param app_id: id of app to remove
:type app_id: str
:rtype: None
"""
assert_command(['dcos', 'marathon', 'app', 'remove', app_id])
def get_services(expected_count=None, args=[]):
"""Get services
:param expected_count: assert exactly this number of services are
running
:type expected_count: int | None
:param args: cli arguments
:type args: [str]
:returns: services
:rtype: [dict]
"""
returncode, stdout, stderr = exec_command(
['dcos', 'service', '--json'] + args)
assert returncode == 0
assert stderr == b''
services = json.loads(stdout.decode('utf-8'))
assert isinstance(services, collections.Sequence)
if expected_count is not None:
assert len(services) == expected_count
return services
def list_deployments(expected_count=None, app_id=None):
"""Get all active deployments.
@@ -166,29 +273,6 @@ def list_deployments(expected_count=None, app_id=None):
return result
def get_services(expected_count=None, args=[]):
"""Get services
:param expected_count: assert exactly this number of services are
running
:type expected_count: int | None
:param args: cli arguments
:type args: [str]
:returns: services
:rtype: [dict]
"""
returncode, stdout, stderr = exec_command(
['dcos', 'service', '--json'] + args)
services = json.loads(stdout.decode('utf-8'))
assert isinstance(services, collections.Sequence)
if expected_count is not None:
assert len(services) == expected_count
return services
def show_app(app_id, version=None):
"""Show details of a Marathon application.
@@ -274,3 +358,39 @@ def file_bytes(path):
with open(path) as f:
return six.b(f.read())
@contextlib.contextmanager
def app(path, app_id, deploy=False):
"""Context manager that deploys an app on entrance, and removes it on
exit.
:param path: path to app's json definition:
:type path: str
:param app_id: app id
:type app_id: str
:rtype: None
"""
add_app(path, deploy)
try:
yield
finally:
remove_app(app_id)
@contextlib.contextmanager
def mock_args(args):
""" Context manager that mocks sys.args and captures stdout/stderr
:param args: sys.args values to mock
:type args: [str]
:rtype: None
"""
with mock.patch('sys.argv', [util.which('dcos')] + args):
stdout, stderr = sys.stdout, sys.stderr
sys.stdout, sys.stderr = six.StringIO(), six.StringIO()
try:
yield sys.stdout, sys.stderr
finally:
sys.stdout, sys.stderr = stdout, stderr

View File

@@ -21,8 +21,8 @@ Available DCOS commands:
\thelp \tDisplay command line usage information
\tmarathon \tDeploy and manage applications on the DCOS
\tpackage \tInstall and manage DCOS software packages
\tservice \tGet the status of DCOS services
\ttask \tGet the status of DCOS tasks
\tservice \tManage DCOS services
\ttask \tManage DCOS tasks
Get detailed command description with 'dcos <command> --help'.
""".encode('utf-8')

View File

@@ -40,8 +40,8 @@ Available DCOS commands:
\thelp \tDisplay command line usage information
\tmarathon \tDeploy and manage applications on the DCOS
\tpackage \tInstall and manage DCOS software packages
\tservice \tGet the status of DCOS services
\ttask \tGet the status of DCOS tasks
\tservice \tManage DCOS services
\ttask \tManage DCOS tasks
Get detailed command description with 'dcos <command> --help'.
""".encode('utf-8')

View File

@@ -1,3 +1,4 @@
import contextlib
import json
import os
@@ -5,7 +6,7 @@ from dcos import constants
import pytest
from .common import (assert_command, assert_lines, exec_command,
from .common import (app, assert_command, assert_lines, exec_command,
list_deployments, show_app, watch_all_deployments,
watch_deployment)
@@ -157,22 +158,18 @@ def test_empty_list():
def test_add_app():
_add_app('tests/data/marathon/apps/zero_instance_sleep.json')
_list_apps('zero-instance-app')
_remove_app('zero-instance-app')
with _zero_instance_app():
_list_apps('zero-instance-app')
def test_add_app_with_filename():
assert_command(['dcos', 'marathon', 'app', 'add',
'tests/data/marathon/apps/zero_instance_sleep.json'])
_list_apps('zero-instance-app')
_remove_app('zero-instance-app')
with _zero_instance_app():
_list_apps('zero-instance-app')
def test_remove_app():
_add_app('tests/data/marathon/apps/zero_instance_sleep.json')
_remove_app('zero-instance-app')
with _zero_instance_app():
pass
_list_apps()
@@ -188,108 +185,94 @@ def test_add_bad_json_app():
def test_add_existing_app():
_add_app('tests/data/marathon/apps/zero_instance_sleep.json')
with open('tests/data/marathon/apps/zero_instance_sleep_v2.json') as fd:
stderr = b"Application '/zero-instance-app' already exists\n"
assert_command(['dcos', 'marathon', 'app', 'add'],
returncode=1,
stderr=stderr,
stdin=fd)
_remove_app('zero-instance-app')
with _zero_instance_app():
app_path = 'tests/data/marathon/apps/zero_instance_sleep_v2.json'
with open(app_path) as fd:
stderr = b"Application '/zero-instance-app' already exists\n"
assert_command(['dcos', 'marathon', 'app', 'add'],
returncode=1,
stderr=stderr,
stdin=fd)
def test_show_app():
_add_app('tests/data/marathon/apps/zero_instance_sleep.json')
show_app('zero-instance-app')
_remove_app('zero-instance-app')
with _zero_instance_app():
show_app('zero-instance-app')
def test_show_absolute_app_version():
_add_app('tests/data/marathon/apps/zero_instance_sleep.json')
_update_app(
'zero-instance-app',
'tests/data/marathon/apps/update_zero_instance_sleep.json')
with _zero_instance_app():
_update_app(
'zero-instance-app',
'tests/data/marathon/apps/update_zero_instance_sleep.json')
result = show_app('zero-instance-app')
show_app('zero-instance-app', result['version'])
_remove_app('zero-instance-app')
result = show_app('zero-instance-app')
show_app('zero-instance-app', result['version'])
def test_show_relative_app_version():
_add_app('tests/data/marathon/apps/zero_instance_sleep.json')
_update_app(
'zero-instance-app',
'tests/data/marathon/apps/update_zero_instance_sleep.json')
show_app('zero-instance-app', "-1")
_remove_app('zero-instance-app')
with _zero_instance_app():
_update_app(
'zero-instance-app',
'tests/data/marathon/apps/update_zero_instance_sleep.json')
show_app('zero-instance-app', "-1")
def test_show_missing_relative_app_version():
_add_app('tests/data/marathon/apps/zero_instance_sleep.json')
_update_app(
'zero-instance-app',
'tests/data/marathon/apps/update_zero_instance_sleep.json')
with _zero_instance_app():
_update_app(
'zero-instance-app',
'tests/data/marathon/apps/update_zero_instance_sleep.json')
stderr = b"Application 'zero-instance-app' only has 2 version(s).\n"
assert_command(['dcos', 'marathon', 'app', 'show',
'--app-version=-2', 'zero-instance-app'],
returncode=1,
stderr=stderr)
_remove_app('zero-instance-app')
stderr = b"Application 'zero-instance-app' only has 2 version(s).\n"
assert_command(['dcos', 'marathon', 'app', 'show',
'--app-version=-2', 'zero-instance-app'],
returncode=1,
stderr=stderr)
def test_show_missing_absolute_app_version():
_add_app('tests/data/marathon/apps/zero_instance_sleep.json')
_update_app(
'zero-instance-app',
'tests/data/marathon/apps/update_zero_instance_sleep.json')
with _zero_instance_app():
_update_app(
'zero-instance-app',
'tests/data/marathon/apps/update_zero_instance_sleep.json')
returncode, stdout, stderr = exec_command(
['dcos', 'marathon', 'app', 'show',
'--app-version=2000-02-11T20:39:32.972Z', 'zero-instance-app'])
returncode, stdout, stderr = exec_command(
['dcos', 'marathon', 'app', 'show',
'--app-version=2000-02-11T20:39:32.972Z', 'zero-instance-app'])
assert returncode == 1
assert stdout == b''
assert stderr.decode('utf-8').startswith(
"Error: App '/zero-instance-app' does not exist")
_remove_app('zero-instance-app')
assert returncode == 1
assert stdout == b''
assert stderr.decode('utf-8').startswith(
"Error: App '/zero-instance-app' does not exist")
def test_show_bad_app_version():
_add_app('tests/data/marathon/apps/zero_instance_sleep.json')
_update_app(
'zero-instance-app',
'tests/data/marathon/apps/update_zero_instance_sleep.json')
with _zero_instance_app():
_update_app(
'zero-instance-app',
'tests/data/marathon/apps/update_zero_instance_sleep.json')
stderr = (b'Error: Invalid format: "20:39:32.972Z" is malformed at '
b'":39:32.972Z"\n')
assert_command(
['dcos', 'marathon', 'app', 'show', '--app-version=20:39:32.972Z',
'zero-instance-app'],
returncode=1,
stderr=stderr)
_remove_app('zero-instance-app')
stderr = (b'Error: Invalid format: "20:39:32.972Z" is malformed at '
b'":39:32.972Z"\n')
assert_command(
['dcos', 'marathon', 'app', 'show', '--app-version=20:39:32.972Z',
'zero-instance-app'],
returncode=1,
stderr=stderr)
def test_show_bad_relative_app_version():
_add_app('tests/data/marathon/apps/zero_instance_sleep.json')
_update_app(
'zero-instance-app',
'tests/data/marathon/apps/update_zero_instance_sleep.json')
with _zero_instance_app():
_update_app(
'zero-instance-app',
'tests/data/marathon/apps/update_zero_instance_sleep.json')
assert_command(
['dcos', 'marathon', 'app', 'show',
'--app-version=2', 'zero-instance-app'],
returncode=1,
stderr=b"Relative versions must be negative: 2\n")
_remove_app('zero-instance-app')
assert_command(
['dcos', 'marathon', 'app', 'show',
'--app-version=2', 'zero-instance-app'],
returncode=1,
stderr=b"Relative versions must be negative: 2\n")
def test_start_missing_app():
@@ -300,21 +283,20 @@ def test_start_missing_app():
def test_start_app():
_add_app('tests/data/marathon/apps/zero_instance_sleep.json')
_start_app('zero-instance-app')
_remove_app('zero-instance-app')
with _zero_instance_app():
_start_app('zero-instance-app')
def test_start_already_started_app():
_add_app('tests/data/marathon/apps/zero_instance_sleep.json')
_start_app('zero-instance-app')
with _zero_instance_app():
_start_app('zero-instance-app')
stdout = b"Application 'zero-instance-app' already started: 1 instances.\n"
assert_command(['dcos', 'marathon', 'app', 'start', 'zero-instance-app'],
returncode=1,
stdout=stdout)
_remove_app('zero-instance-app')
stdout = (b"Application 'zero-instance-app' already "
b"started: 1 instances.\n")
assert_command(
['dcos', 'marathon', 'app', 'start', 'zero-instance-app'],
returncode=1,
stdout=stdout)
def test_stop_missing_app():
@@ -324,30 +306,27 @@ def test_stop_missing_app():
def test_stop_app():
_add_app('tests/data/marathon/apps/zero_instance_sleep.json')
_start_app('zero-instance-app', 3)
result = list_deployments(1, 'zero-instance-app')
watch_deployment(result[0]['id'], 60)
with _zero_instance_app():
_start_app('zero-instance-app', 3)
result = list_deployments(1, 'zero-instance-app')
watch_deployment(result[0]['id'], 60)
returncode, stdout, stderr = exec_command(
['dcos', 'marathon', 'app', 'stop', 'zero-instance-app'])
returncode, stdout, stderr = exec_command(
['dcos', 'marathon', 'app', 'stop', 'zero-instance-app'])
assert returncode == 0
assert stdout.decode().startswith('Created deployment ')
assert stderr == b''
_remove_app('zero-instance-app')
assert returncode == 0
assert stdout.decode().startswith('Created deployment ')
assert stderr == b''
def test_stop_already_stopped_app():
_add_app('tests/data/marathon/apps/zero_instance_sleep.json')
stdout = b"Application 'zero-instance-app' already stopped: 0 instances.\n"
assert_command(['dcos', 'marathon', 'app', 'stop', 'zero-instance-app'],
returncode=1,
stdout=stdout)
_remove_app('zero-instance-app')
with _zero_instance_app():
stdout = (b"Application 'zero-instance-app' already "
b"stopped: 0 instances.\n")
assert_command(
['dcos', 'marathon', 'app', 'stop', 'zero-instance-app'],
returncode=1,
stdout=stdout)
def test_update_missing_app():
@@ -357,70 +336,57 @@ def test_update_missing_app():
def test_update_missing_field():
_add_app('tests/data/marathon/apps/zero_instance_sleep.json')
with _zero_instance_app():
returncode, stdout, stderr = exec_command(
['dcos', 'marathon', 'app', 'update',
'zero-instance-app', 'missing="a string"'])
returncode, stdout, stderr = exec_command(
['dcos', 'marathon', 'app', 'update',
'zero-instance-app', 'missing="a string"'])
assert returncode == 1
assert stdout == b''
assert stderr.decode('utf-8').startswith(
"The property 'missing' does not conform to the expected format. "
"Possible values are: ")
_remove_app('zero-instance-app')
assert returncode == 1
assert stdout == b''
assert stderr.decode('utf-8').startswith(
"The property 'missing' does not conform to the expected format. "
"Possible values are: ")
def test_update_bad_type():
_add_app('tests/data/marathon/apps/zero_instance_sleep.json')
with _zero_instance_app():
returncode, stdout, stderr = exec_command(
['dcos', 'marathon', 'app', 'update',
'zero-instance-app', 'cpus="a string"'])
returncode, stdout, stderr = exec_command(
['dcos', 'marathon', 'app', 'update',
'zero-instance-app', 'cpus="a string"'])
assert returncode == 1
assert stderr.decode('utf-8').startswith(
"Unable to parse 'a string' as a float: could not convert string to "
"float: ")
assert stdout == b''
_remove_app('zero-instance-app')
assert returncode == 1
assert stderr.decode('utf-8').startswith(
"Unable to parse 'a string' as a float: could not convert string "
"to float: ")
assert stdout == b''
def test_update_app():
_add_app('tests/data/marathon/apps/zero_instance_sleep.json')
with _zero_instance_app():
returncode, stdout, stderr = exec_command(
['dcos', 'marathon', 'app', 'update', 'zero-instance-app',
'cpus=1', 'mem=20', "cmd='sleep 100'"])
returncode, stdout, stderr = exec_command(
['dcos', 'marathon', 'app', 'update', 'zero-instance-app',
'cpus=1', 'mem=20', "cmd='sleep 100'"])
assert returncode == 0
assert stdout.decode().startswith('Created deployment ')
assert stderr == b''
_remove_app('zero-instance-app')
assert returncode == 0
assert stdout.decode().startswith('Created deployment ')
assert stderr == b''
def test_update_app_from_stdin():
_add_app('tests/data/marathon/apps/zero_instance_sleep.json')
_update_app(
'zero-instance-app',
'tests/data/marathon/apps/update_zero_instance_sleep.json')
_remove_app('zero-instance-app')
with _zero_instance_app():
_update_app(
'zero-instance-app',
'tests/data/marathon/apps/update_zero_instance_sleep.json')
def test_restarting_stopped_app():
_add_app('tests/data/marathon/apps/zero_instance_sleep.json')
stdout = (b"Unable to perform rolling restart of application '"
b"/zero-instance-app' because it has no running tasks\n")
assert_command(['dcos', 'marathon', 'app', 'restart', 'zero-instance-app'],
returncode=1,
stdout=stdout)
_remove_app('zero-instance-app')
with _zero_instance_app():
stdout = (b"Unable to perform rolling restart of application '"
b"/zero-instance-app' because it has no running tasks\n")
assert_command(
['dcos', 'marathon', 'app', 'restart', 'zero-instance-app'],
returncode=1,
stdout=stdout)
def test_restarting_missing_app():
@@ -430,19 +396,17 @@ def test_restarting_missing_app():
def test_restarting_app():
_add_app('tests/data/marathon/apps/zero_instance_sleep.json')
_start_app('zero-instance-app', 3)
result = list_deployments(1, 'zero-instance-app')
watch_deployment(result[0]['id'], 60)
with _zero_instance_app():
_start_app('zero-instance-app', 3)
result = list_deployments(1, 'zero-instance-app')
watch_deployment(result[0]['id'], 60)
returncode, stdout, stderr = exec_command(
['dcos', 'marathon', 'app', 'restart', 'zero-instance-app'])
returncode, stdout, stderr = exec_command(
['dcos', 'marathon', 'app', 'restart', 'zero-instance-app'])
assert returncode == 0
assert stdout.decode().startswith('Created deployment ')
assert stderr == b''
_remove_app('zero-instance-app')
assert returncode == 0
assert stdout.decode().startswith('Created deployment ')
assert stderr == b''
def test_list_version_missing_app():
@@ -460,28 +424,24 @@ def test_list_version_negative_max_count():
def test_list_version_app():
_add_app('tests/data/marathon/apps/zero_instance_sleep.json')
_list_versions('zero-instance-app', 1)
with _zero_instance_app():
_list_versions('zero-instance-app', 1)
_update_app(
'zero-instance-app',
'tests/data/marathon/apps/update_zero_instance_sleep.json')
_list_versions('zero-instance-app', 2)
_remove_app('zero-instance-app')
_update_app(
'zero-instance-app',
'tests/data/marathon/apps/update_zero_instance_sleep.json')
_list_versions('zero-instance-app', 2)
def test_list_version_max_count():
_add_app('tests/data/marathon/apps/zero_instance_sleep.json')
_update_app(
'zero-instance-app',
'tests/data/marathon/apps/update_zero_instance_sleep.json')
with _zero_instance_app():
_update_app(
'zero-instance-app',
'tests/data/marathon/apps/update_zero_instance_sleep.json')
_list_versions('zero-instance-app', 1, 1)
_list_versions('zero-instance-app', 2, 2)
_list_versions('zero-instance-app', 2, 3)
_remove_app('zero-instance-app')
_list_versions('zero-instance-app', 1, 1)
_list_versions('zero-instance-app', 2, 2)
_list_versions('zero-instance-app', 2, 3)
def test_list_empty_deployment():
@@ -489,10 +449,9 @@ def test_list_empty_deployment():
def test_list_deployment():
_add_app('tests/data/marathon/apps/zero_instance_sleep.json')
_start_app('zero-instance-app', 3)
list_deployments(1)
_remove_app('zero-instance-app')
with _zero_instance_app():
_start_app('zero-instance-app', 3)
list_deployments(1)
def test_list_deployment_table():
@@ -500,24 +459,22 @@ def test_list_deployment_table():
The more specific testing is done in unit tests.
"""
_add_app('tests/data/marathon/apps/zero_instance_sleep.json')
_start_app('zero-instance-app', 3)
assert_lines(['dcos', 'marathon', 'deployment', 'list'], 2)
_remove_app('zero-instance-app')
with _zero_instance_app():
_start_app('zero-instance-app', 3)
assert_lines(['dcos', 'marathon', 'deployment', 'list'], 2)
def test_list_deployment_missing_app():
_add_app('tests/data/marathon/apps/zero_instance_sleep.json')
_start_app('zero-instance-app')
list_deployments(0, 'missing-id')
_remove_app('zero-instance-app')
with _zero_instance_app():
_start_app('zero-instance-app')
list_deployments(0, 'missing-id')
def test_list_deployment_app():
_add_app('tests/data/marathon/apps/zero_instance_sleep.json')
_start_app('zero-instance-app', 3)
list_deployments(1, 'zero-instance-app')
_remove_app('zero-instance-app')
with _zero_instance_app():
_start_app('zero-instance-app', 3)
list_deployments(1, 'zero-instance-app')
def test_rollback_missing_deployment():
@@ -528,35 +485,32 @@ def test_rollback_missing_deployment():
def test_rollback_deployment():
_add_app('tests/data/marathon/apps/zero_instance_sleep.json')
_start_app('zero-instance-app', 3)
result = list_deployments(1, 'zero-instance-app')
with _zero_instance_app():
_start_app('zero-instance-app', 3)
result = list_deployments(1, 'zero-instance-app')
returncode, stdout, stderr = exec_command(
['dcos', 'marathon', 'deployment', 'rollback', result[0]['id']])
returncode, stdout, stderr = exec_command(
['dcos', 'marathon', 'deployment', 'rollback', result[0]['id']])
result = json.loads(stdout.decode('utf-8'))
result = json.loads(stdout.decode('utf-8'))
assert returncode == 0
assert 'deploymentId' in result
assert 'version' in result
assert stderr == b''
assert returncode == 0
assert 'deploymentId' in result
assert 'version' in result
assert stderr == b''
list_deployments(0)
_remove_app('zero-instance-app')
list_deployments(0)
def test_stop_deployment():
_add_app('tests/data/marathon/apps/zero_instance_sleep.json')
_start_app('zero-instance-app', 3)
result = list_deployments(1, 'zero-instance-app')
with _zero_instance_app():
_start_app('zero-instance-app', 3)
result = list_deployments(1, 'zero-instance-app')
assert_command(['dcos', 'marathon', 'deployment', 'stop', result[0]['id']])
assert_command(
['dcos', 'marathon', 'deployment', 'stop', result[0]['id']])
list_deployments(0)
_remove_app('zero-instance-app')
list_deployments(0)
def test_watching_missing_deployment():
@@ -564,12 +518,11 @@ def test_watching_missing_deployment():
def test_watching_deployment():
_add_app('tests/data/marathon/apps/zero_instance_sleep.json')
_start_app('zero-instance-app', 3)
result = list_deployments(1, 'zero-instance-app')
watch_deployment(result[0]['id'], 60)
list_deployments(0, 'zero-instance-app')
_remove_app('zero-instance-app')
with _zero_instance_app():
_start_app('zero-instance-app', 3)
result = list_deployments(1, 'zero-instance-app')
watch_deployment(result[0]['id'], 60)
list_deployments(0, 'zero-instance-app')
def test_list_empty_task():
@@ -577,44 +530,39 @@ def test_list_empty_task():
def test_list_empty_task_not_running_app():
_add_app('tests/data/marathon/apps/zero_instance_sleep.json')
_list_tasks(0)
_remove_app('zero-instance-app')
with _zero_instance_app():
_list_tasks(0)
def test_list_tasks():
_add_app('tests/data/marathon/apps/zero_instance_sleep.json')
_start_app('zero-instance-app', 3)
result = list_deployments(1, 'zero-instance-app')
watch_deployment(result[0]['id'], 60)
_list_tasks(3)
_remove_app('zero-instance-app')
with _zero_instance_app():
_start_app('zero-instance-app', 3)
result = list_deployments(1, 'zero-instance-app')
watch_deployment(result[0]['id'], 60)
_list_tasks(3)
def test_list_tasks_table():
_add_app('tests/data/marathon/apps/zero_instance_sleep.json')
_start_app('zero-instance-app', 3)
watch_all_deployments()
assert_lines(['dcos', 'marathon', 'task', 'list'], 4)
_remove_app('zero-instance-app')
with _zero_instance_app():
_start_app('zero-instance-app', 3)
watch_all_deployments()
assert_lines(['dcos', 'marathon', 'task', 'list'], 4)
def test_list_app_tasks():
_add_app('tests/data/marathon/apps/zero_instance_sleep.json')
_start_app('zero-instance-app', 3)
result = list_deployments(1, 'zero-instance-app')
watch_deployment(result[0]['id'], 60)
_list_tasks(3, 'zero-instance-app')
_remove_app('zero-instance-app')
with _zero_instance_app():
_start_app('zero-instance-app', 3)
result = list_deployments(1, 'zero-instance-app')
watch_deployment(result[0]['id'], 60)
_list_tasks(3, 'zero-instance-app')
def test_list_missing_app_tasks():
_add_app('tests/data/marathon/apps/zero_instance_sleep.json')
_start_app('zero-instance-app', 3)
result = list_deployments(1, 'zero-instance-app')
watch_deployment(result[0]['id'], 60)
_list_tasks(0, 'missing-id')
_remove_app('zero-instance-app')
with _zero_instance_app():
_start_app('zero-instance-app', 3)
result = list_deployments(1, 'zero-instance-app')
watch_deployment(result[0]['id'], 60)
_list_tasks(0, 'missing-id')
def test_show_missing_task():
@@ -630,22 +578,20 @@ def test_show_missing_task():
def test_show_task():
_add_app('tests/data/marathon/apps/zero_instance_sleep.json')
_start_app('zero-instance-app', 3)
result = list_deployments(1, 'zero-instance-app')
watch_deployment(result[0]['id'], 60)
result = _list_tasks(3, 'zero-instance-app')
with _zero_instance_app():
_start_app('zero-instance-app', 3)
result = list_deployments(1, 'zero-instance-app')
watch_deployment(result[0]['id'], 60)
result = _list_tasks(3, 'zero-instance-app')
returncode, stdout, stderr = exec_command(
['dcos', 'marathon', 'task', 'show', result[0]['id']])
returncode, stdout, stderr = exec_command(
['dcos', 'marathon', 'task', 'show', result[0]['id']])
result = json.loads(stdout.decode('utf-8'))
result = json.loads(stdout.decode('utf-8'))
assert returncode == 0
assert result['appId'] == '/zero-instance-app'
assert stderr == b''
_remove_app('zero-instance-app')
assert returncode == 0
assert result['appId'] == '/zero-instance-app'
assert stderr == b''
def test_bad_configuration():
@@ -659,7 +605,7 @@ def test_bad_configuration():
assert stdout == b''
assert stderr.decode().startswith(
"Marathon likely misconfigured. Please check your proxy or "
"Marathon URI settings. See dcos config --help. ")
"Marathon URL settings. See dcos config --help. ")
assert_command(['dcos', 'config', 'unset', 'marathon.url'])
@@ -682,26 +628,6 @@ def _list_apps(app_id=None):
return result
def _remove_app(app_id):
assert_command(['dcos', 'marathon', 'app', 'remove', app_id])
# Let's make sure that we don't return until the deployment has finished
result = list_deployments(None, app_id)
if len(result) != 0:
watch_deployment(result[0]['id'], 60)
def _add_app(file_path):
with open(file_path) as fd:
returncode, stdout, stderr = exec_command(
['dcos', 'marathon', 'app', 'add'],
stdin=fd)
assert returncode == 0
assert stdout == b''
assert stderr == b''
def _start_app(app_id, instances=None):
cmd = ['dcos', 'marathon', 'app', 'start', app_id]
if instances is not None:
@@ -754,3 +680,10 @@ def _list_tasks(expected_count, app_id=None):
assert stderr == b''
return result
@contextlib.contextmanager
def _zero_instance_app():
with app('tests/data/marathon/apps/zero_instance_sleep.json',
'zero-instance-app'):
yield

View File

@@ -18,7 +18,7 @@ def zk_znode(request):
def test_help():
stdout = b"""Get the status of DCOS services
stdout = b"""Manage DCOS services
Usage:
dcos service --info
@@ -45,7 +45,7 @@ Positional Arguments:
def test_info():
stdout = b"Get the status of DCOS services\n"
stdout = b"Manage DCOS services\n"
assert_command(['dcos', 'service', '--info'], stdout=stdout)

View File

@@ -1,41 +1,59 @@
import collections
import json
import os
import re
import subprocess
import time
import dcos.util as util
from dcos import mesos
from dcos.errors import DCOSException
from dcos.util import create_schema
from dcoscli.task.main import _mesos_files, main
import fcntl
from mock import MagicMock, patch
from ..fixtures.task import task_fixture
from .common import (assert_command, assert_lines, exec_command,
watch_all_deployments)
from .common import (app, assert_command, assert_lines, assert_mock,
exec_command, watch_all_deployments)
SLEEP1 = 'tests/data/marathon/apps/sleep.json'
SLEEP2 = 'tests/data/marathon/apps/sleep2.json'
FOLLOW = 'tests/data/file/follow.json'
TWO_TASKS = 'tests/data/file/two_tasks.json'
TWO_TASKS_FOLLOW = 'tests/data/file/two_tasks_follow.json'
def test_help():
stdout = b"""Get the status of DCOS tasks
stdout = b"""Manage DCOS tasks
Usage:
dcos task --info
dcos task [--completed --json <task>]
dcos task log [--completed --follow --lines=N] <task> [<file>]
Options:
-h, --help Show this screen
--info Show a short description of this subcommand
--completed Include completed tasks as well
--follow Output data as the file grows
--json Print json-formatted tasks
--completed Show completed tasks as well
--lines=N Output the last N lines [default: 10]
--version Show version
Positional Arguments:
<task> Only match tasks whose ID matches <task>. <task> may be
a substring of the ID, or a unix glob pattern.
<file> Output this file. [default: stdout]
"""
assert_command(['dcos', 'task', '--help'], stdout=stdout)
def test_info():
stdout = b"Get the status of DCOS tasks\n"
stdout = b"Manage DCOS tasks\n"
assert_command(['dcos', 'task', '--info'], stdout=stdout)
@@ -105,6 +123,174 @@ def test_filter():
_uninstall_sleep('test-app2')
def test_log_no_files():
""" Tail stdout on nonexistant task """
assert_command(['dcos', 'task', 'log', 'asdf'],
returncode=1,
stderr=b'No matching tasks. Exiting.\n')
def test_log_single_file():
""" Tail a single file on a single task """
with app(SLEEP1, 'test-app', True):
returncode, stdout, stderr = exec_command(
['dcos', 'task', 'log', 'test-app'])
assert returncode == 0
assert stderr == b''
assert len(stdout.decode('utf-8').split('\n')) == 5
def test_log_missing_file():
""" Tail a single file on a single task """
with app(SLEEP1, 'test-app', True):
returncode, stdout, stderr = exec_command(
['dcos', 'task', 'log', 'test-app', 'asdf'])
assert returncode == 1
assert stdout == b''
assert stderr == b'No files exist. Exiting.\n'
def test_log_lines():
""" Test --lines """
with app(SLEEP1, 'test-app', True):
returncode, stdout, stderr = exec_command(
['dcos', 'task', 'log', 'test-app', '--lines=2'])
assert returncode == 0
assert stderr == b''
assert len(stdout.decode('utf-8').split('\n')) == 3
def test_log_follow():
""" Test --follow """
with app(FOLLOW, 'follow', True):
# verify output
proc = subprocess.Popen(['dcos', 'task', 'log', 'follow', '--follow'],
stdout=subprocess.PIPE)
# mark stdout as non-blocking, so we can read all available data
# before EOF
_mark_non_blocking(proc.stdout)
# wait for data to be output
time.sleep(1)
# assert lines before and after sleep
assert len(proc.stdout.read().decode('utf-8').split('\n')) == 5
time.sleep(8)
assert len(proc.stdout.read().decode('utf-8').split('\n')) == 2
proc.kill()
def test_log_two_tasks():
""" Test tailing a single file on two separate tasks """
with app(TWO_TASKS, 'two-tasks', True):
returncode, stdout, stderr = exec_command(
['dcos', 'task', 'log', 'two-tasks'])
assert returncode == 0
assert stderr == b''
lines = stdout.decode('utf-8').split('\n')
assert len(lines) == 11
assert re.match('===>.*<===', lines[0])
assert re.match('===>.*<===', lines[5])
def test_log_two_tasks_follow():
""" Test tailing a single file on two separate tasks with --follow """
with app(TWO_TASKS_FOLLOW, 'two-tasks-follow', True):
proc = subprocess.Popen(
['dcos', 'task', 'log', 'two-tasks-follow', '--follow'],
stdout=subprocess.PIPE)
# mark stdout as non-blocking, so we can read all available data
# before EOF
_mark_non_blocking(proc.stdout)
# wait for data to be output
time.sleep(1)
# get output before and after the task's sleep
first_lines = proc.stdout.read().decode('utf-8').split('\n')
time.sleep(8)
second_lines = proc.stdout.read().decode('utf-8').split('\n')
# assert both tasks have printed the expected amount of output
assert len(first_lines) >= 11
# assert there is some difference after sleeping
assert len(second_lines) > 0
proc.kill()
def test_log_completed():
""" Test --completed """
# create a completed task
# ensure that tail lists nothing
# ensure that tail --completed lists a completed task
with app(SLEEP1, 'test-app', True):
pass
assert_command(['dcos', 'task', 'log', 'test-app'],
returncode=1,
stderr=b'No matching tasks. Exiting.\n',
stdout=b'')
returncode, stdout, stderr = exec_command(
['dcos', 'task', 'log', '--completed', 'test-app'])
assert returncode == 0
assert stderr == b''
assert len(stdout.decode('utf-8').split('\n')) > 4
def test_log_master_unavailable():
""" Test master's state.json being unavailable """
client = mesos.MesosClient()
client.get_master_state = _mock_exception()
with patch('dcos.mesos.MesosClient', return_value=client):
args = ['task', 'log', '_']
assert_mock(main, args, returncode=1, stderr=(b"exception\n"))
def test_log_slave_unavailable():
""" Test slave's state.json being unavailable """
with app(SLEEP1, 'test-app', True):
client = mesos.MesosClient()
client.get_slave_state = _mock_exception()
with patch('dcos.mesos.MesosClient', return_value=client):
args = ['task', 'log', 'test-app']
stderr = (b"""Error accessing slave: exception\n"""
b"""No matching tasks. Exiting.\n""")
assert_mock(main, args, returncode=1, stderr=stderr)
def test_log_file_unavailable():
""" Test a file's read.json being unavailable """
with app(SLEEP1, 'test-app', True):
files = _mesos_files(False, "", "stdout")
assert len(files) == 1
files[0].read = _mock_exception('exception')
with patch('dcoscli.task.main._mesos_files', return_value=files):
args = ['task', 'log', 'test-app']
stderr = b"No files exist. Exiting.\n"
assert_mock(main, args, returncode=1, stderr=stderr)
def _mock_exception(contents='exception'):
return MagicMock(side_effect=DCOSException(contents))
def _mark_non_blocking(file_):
fcntl.fcntl(file_.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
def _install_sleep_task(app_path=SLEEP1, app_name='test-app'):
# install helloworld app
args = ['dcos', 'marathon', 'app', 'add', app_path]

View File

@@ -1,6 +1,6 @@
import requests
from dcos import util
from dcos.errors import DCOSException, DefaultError, Error
from dcos.errors import DCOSException
logger = util.get_logger(__name__)
@@ -17,18 +17,30 @@ def _default_is_success(status_code):
return status_code >= 200 and status_code < 300
def _default_to_error(response):
def _default_to_exception(response):
"""
:param response: HTTP response object or Error
:type response: requests.Response or Error
:returns: the error embedded in the response JSON
:rtype: Error
:param response: HTTP response object or Exception
:type response: requests.Response | Exception
:returns: exception
:rtype: Exception
"""
if isinstance(response, Error):
if isinstance(response, Exception) and \
not isinstance(response, requests.exceptions.RequestException):
return response
return DefaultError('{}: {}'.format(response.status_code, response.text))
url = response.request.url
if isinstance(response, requests.exceptions.ConnectionError):
return DCOSException('URL [{0}] is unreachable: {1}'
.format(url, response))
elif isinstance(response, requests.exceptions.Timeout):
return DCOSException('Request to URL [{0}] timed out'.format(url))
elif isinstance(response, requests.exceptions.RequestException):
return response
else:
return DCOSException(
'Error while fetching [{0}]: HTTP {1}: {2}'.format(
url, response.status_code, response.reason))
@util.duration
@@ -36,7 +48,7 @@ def request(method,
url,
timeout=3.0,
is_success=_default_is_success,
to_error=_default_to_error,
to_exception=_default_to_exception,
**kwargs):
"""Sends an HTTP request.
@@ -46,8 +58,8 @@ def request(method,
:type url: str
:param is_success: Defines successful status codes for the request
:type is_success: Function from int to bool
:param to_error: Builds an Error from an unsuccessful response or Error
:type to_error: Function from requests.Response or Error to Error
:param to_exception: Builds an Error from an unsuccessful response or Error
:type to_exception: (requests.Response | Error) -> Error
:param kwargs: Additional arguments to requests.request
(see http://docs.python-requests.org/en/latest/api/#requests.request)
:type kwargs: dict
@@ -73,7 +85,7 @@ def request(method,
with requests.Session() as session:
response = session.send(request.prepare(), timeout=timeout)
except Exception as ex:
raise DCOSException(to_error(DefaultError(str(ex))).error())
raise to_exception(ex)
logger.info('Received HTTP response [%r]: %r',
response.status_code,
@@ -82,10 +94,10 @@ def request(method,
if is_success(response.status_code):
return response
else:
raise DCOSException(to_error(response).error())
raise to_exception(response)
def head(url, to_error=_default_to_error, **kwargs):
def head(url, to_exception=_default_to_exception, **kwargs):
"""Sends a HEAD request.
:param url: URL for the new Request object
@@ -99,7 +111,7 @@ def head(url, to_error=_default_to_error, **kwargs):
return request('head', url, **kwargs)
def get(url, to_error=_default_to_error, **kwargs):
def get(url, to_exception=_default_to_exception, **kwargs):
"""Sends a GET request.
:param url: URL for the new Request object
@@ -110,10 +122,11 @@ def get(url, to_error=_default_to_error, **kwargs):
:rtype: Response
"""
return request('get', url, to_error=to_error, **kwargs)
return request('get', url, to_exception=to_exception, **kwargs)
def post(url, to_error=_default_to_error, data=None, json=None, **kwargs):
def post(url, to_exception=_default_to_exception,
data=None, json=None, **kwargs):
"""Sends a POST request.
:param url: URL for the new Request object
@@ -128,11 +141,11 @@ def post(url, to_error=_default_to_error, data=None, json=None, **kwargs):
:rtype: Response
"""
return request('post',
url, to_error=to_error, data=data, json=json, **kwargs)
return request('post', url,
to_exception=to_exception, data=data, json=json, **kwargs)
def put(url, to_error=_default_to_error, data=None, **kwargs):
def put(url, to_exception=_default_to_exception, data=None, **kwargs):
"""Sends a PUT request.
:param url: URL for the new Request object
@@ -145,10 +158,10 @@ def put(url, to_error=_default_to_error, data=None, **kwargs):
:rtype: Response
"""
return request('put', url, to_error=to_error, data=data, **kwargs)
return request('put', url, to_exception=to_exception, data=data, **kwargs)
def patch(url, to_error=_default_to_error, data=None, **kwargs):
def patch(url, to_exception=_default_to_exception, data=None, **kwargs):
"""Sends a PATCH request.
:param url: URL for the new Request object
@@ -161,10 +174,11 @@ def patch(url, to_error=_default_to_error, data=None, **kwargs):
:rtype: Response
"""
return request('patch', url, to_error=to_error, data=data, **kwargs)
return request('patch', url,
to_exception=to_exception, data=data, **kwargs)
def delete(url, to_error=_default_to_error, **kwargs):
def delete(url, to_exception=_default_to_exception, **kwargs):
"""Sends a DELETE request.
:param url: URL for the new Request object
@@ -175,7 +189,7 @@ def delete(url, to_error=_default_to_error, **kwargs):
:rtype: Response
"""
return request('delete', url, to_error=to_error, **kwargs)
return request('delete', url, to_exception=to_exception, **kwargs)
def silence_requests_warnings():

View File

@@ -2,7 +2,7 @@ import json
from distutils.version import LooseVersion
from dcos import http, util
from dcos.errors import DCOSException, DefaultError, Error
from dcos.errors import DCOSException
from six.moves import urllib
@@ -21,38 +21,38 @@ def create_client(config=None):
if config is None:
config = util.get_config()
marathon_uri = _get_marathon_uri(config)
marathon_url = _get_marathon_url(config)
logger.info('Creating marathon client with: %r', marathon_uri)
return Client(marathon_uri)
logger.info('Creating marathon client with: %r', marathon_url)
return Client(marathon_url)
def _get_marathon_uri(config):
def _get_marathon_url(config):
"""
:param config: configuration dictionary
:type config: config.Toml
:returns: marathon base uri
:returns: marathon base url
:rtype: str
"""
marathon_uri = config.get('marathon.url')
if marathon_uri is None:
marathon_url = config.get('marathon.url')
if marathon_url is None:
dcos_url = util.get_config_vals(config, ['core.dcos_url'])[0]
marathon_uri = urllib.parse.urljoin(dcos_url, 'marathon/')
marathon_url = urllib.parse.urljoin(dcos_url, 'marathon/')
return marathon_uri
return marathon_url
def _to_error(response):
def _to_exception(response):
"""
:param response: HTTP response object or Error
:type response: requests.Response | Error
:returns: the error embedded in the response JSON
:rtype: Error
:param response: HTTP response object or Exception
:type response: requests.Response | Exception
:returns: An exception with the message from the response JSON
:rtype: Exception
"""
if isinstance(response, Error):
return DefaultError(_default_marathon_error(response.error()))
if isinstance(response, Exception):
return DCOSException(_default_marathon_error(str(response)))
message = response.json().get('message')
if message is None:
@@ -61,23 +61,23 @@ def _to_error(response):
logger.error(
'Marathon server did not return a message: %s',
response.json())
return DefaultError(_default_marathon_error())
return DCOSException(_default_marathon_error())
msg = '\n'.join(error['error'] for error in errs)
return DefaultError(_default_marathon_error(msg))
return DCOSException(_default_marathon_error(msg))
return DefaultError('Error: {}'.format(response.json()['message']))
return DCOSException('Error: {}'.format(response.json()['message']))
class Client(object):
"""Class for talking to the Marathon server.
:param marathon_uri: the base URI for the Marathon server
:type marathon_uri: str
:param marathon_url: the base URL for the Marathon server
:type marathon_url: str
"""
def __init__(self, marathon_uri):
self._base_uri = marathon_uri
def __init__(self, marathon_url):
self._base_url = marathon_url
min_version = "0.8.1"
version = LooseVersion(self.get_about()["version"])
@@ -97,7 +97,7 @@ class Client(object):
:rtype: str
"""
return urllib.parse.urljoin(self._base_uri, path)
return urllib.parse.urljoin(self._base_url, path)
def get_version(self):
"""Get marathon version
@@ -116,7 +116,7 @@ class Client(object):
url = self._create_url('v2/info')
response = http.get(url, to_error=_to_error)
response = http.get(url, to_exception=_to_exception)
return response.json()
@@ -139,7 +139,7 @@ class Client(object):
url = self._create_url(
'v2/apps{}/versions/{}'.format(app_id, version))
response = http.get(url, to_error=_to_error)
response = http.get(url, to_exception=_to_exception)
# Looks like Marathon return different JSON for versions
if version is None:
@@ -156,7 +156,7 @@ class Client(object):
url = self._create_url('v2/groups')
response = http.get(url, to_error=_to_error)
response = http.get(url, to_exception=_to_exception)
return response.json()['groups']
@@ -179,7 +179,7 @@ class Client(object):
url = self._create_url(
'v2/groups{}/versions/{}'.format(group_id, version))
response = http.get(url, to_error=_to_error)
response = http.get(url, to_exception=_to_exception)
return response.json()
@@ -206,7 +206,7 @@ class Client(object):
url = self._create_url('v2/apps{}/versions'.format(app_id))
response = http.get(url, to_error=_to_error)
response = http.get(url, to_exception=_to_exception)
if max_count is None:
return response.json()['versions']
@@ -222,7 +222,7 @@ class Client(object):
url = self._create_url('v2/apps')
response = http.get(url, to_error=_to_error)
response = http.get(url, to_exception=_to_exception)
return response.json()['apps']
@@ -245,7 +245,7 @@ class Client(object):
response = http.post(url,
json=app_json,
to_error=_to_error)
to_exception=_to_exception)
return response.json()
@@ -276,7 +276,7 @@ class Client(object):
response = http.put(url,
params=params,
json=payload,
to_error=_to_error)
to_exception=_to_exception)
return response.json().get('deploymentId')
@@ -335,7 +335,7 @@ class Client(object):
response, error = http.put(url,
params=params,
json={'instances': int(instances)},
to_error=_to_error)
to_exception=_to_exception)
if error is not None:
return (None, error)
@@ -375,7 +375,7 @@ class Client(object):
url = self._create_url('v2/apps{}'.format(app_id))
http.delete(url, params=params, to_error=_to_error)
http.delete(url, params=params, to_exception=_to_exception)
def remove_group(self, group_id, force=None):
"""Completely removes the requested application.
@@ -396,7 +396,7 @@ class Client(object):
url = self._create_url('v2/groups{}'.format(group_id))
http.delete(url, params=params, to_error=_to_error)
http.delete(url, params=params, to_exception=_to_exception)
def restart_app(self, app_id, force=None):
"""Performs a rolling restart of all of the tasks.
@@ -420,7 +420,7 @@ class Client(object):
response = http.post(url,
params=params,
to_error=_to_error)
to_exception=_to_exception)
return response.json()
@@ -436,7 +436,7 @@ class Client(object):
url = self._create_url('v2/deployments')
response = http.get(url,
to_error=_to_error)
to_exception=_to_exception)
deployment = next(
(deployment for deployment in response.json()
@@ -456,7 +456,7 @@ class Client(object):
url = self._create_url('v2/deployments')
response = http.get(url, to_error=_to_error)
response = http.get(url, to_exception=_to_exception)
if app_id is not None:
app_id = self.normalize_app_id(app_id)
@@ -493,7 +493,7 @@ class Client(object):
response = http.delete(
url,
params=params,
to_error=_to_error)
to_exception=_to_exception)
if force:
return None
@@ -532,7 +532,7 @@ class Client(object):
url = self._create_url('v2/tasks')
response = http.get(url, to_error=_to_error)
response = http.get(url, to_exception=_to_exception)
if app_id is not None:
app_id = self.normalize_app_id(app_id)
@@ -556,7 +556,7 @@ class Client(object):
url = self._create_url('v2/tasks')
response = http.get(url, to_error=_to_error)
response = http.get(url, to_exception=_to_exception)
task = next(
(task for task in response.json()['tasks']
@@ -609,7 +609,7 @@ class Client(object):
else:
group_json = group_resource
response = http.post(url, json=group_json, to_error=_to_error)
response = http.post(url, json=group_json, to_exception=_to_exception)
return response.json()
@@ -623,5 +623,5 @@ def _default_marathon_error(message=""):
"""
return ("Marathon likely misconfigured. Please check your proxy or "
"Marathon URI settings. See dcos config --help. {}").format(
"Marathon URL settings. See dcos config --help. {}").format(
message)

View File

@@ -1,5 +1,6 @@
import fnmatch
import itertools
import os
from dcos import http, util
from dcos.errors import DCOSException
@@ -8,93 +9,100 @@ from six.moves import urllib
logger = util.get_logger(__name__)
MESOS_TIMEOUT = 5
def get_master(config=None):
"""Create a Master object using the URLs stored in the user's
configuration.
:param config: config
def get_master():
"""Create a Master object using the url stored in the
'core.mesos_master_url' property if it exists. Otherwise, we use
the `core.dcos_url` property
:param config: user config
:type config: Toml
:returns: master state object
:rtype: Master
"""
return Master(get_master_client(config).get_state())
return Master(MesosClient().get_master_state())
def get_master_client(config=None):
"""Create a Mesos master client using the URLs stored in the user's
configuration.
class MesosClient:
"""Client for communicating with the Mesos master"""
:param config: config
:type config: Toml
:returns: mesos master client
:rtype: MasterClient
"""
if config is None:
def __init__(self):
config = util.get_config()
self._dcos_url = None
self._mesos_master_url = None
mesos_url = _get_mesos_url(config)
return MasterClient(mesos_url)
mesos_master_url = config.get('core.mesos_master_url')
if mesos_master_url is None:
self._dcos_url = util.get_config_vals(config, ['core.dcos_url'])[0]
else:
self._mesos_master_url = mesos_master_url
def master_url(self, path):
""" Create a URL that hits the master
def _get_mesos_url(config):
"""
:param config: configuration
:type config: Toml
:returns: url for the Mesos master
:rtype: str
"""
mesos_master_url = config.get('core.mesos_master_url')
if mesos_master_url is None:
dcos_url = util.get_config_vals(config, ['core.dcos_url'])[0]
return urllib.parse.urljoin(dcos_url, 'mesos/')
else:
return mesos_master_url
class MasterClient:
"""Client for communicating with the Mesos master
:param url: URL for the Mesos master
:type url: str
"""
def __init__(self, url):
self._base_url = url
def _create_url(self, path):
"""Creates the url from the provided path.
:param path: url path
:param path: the path suffix of the desired URL
:type path: str
:returns: constructed url
:returns: URL that hits the master
:rtype: str
"""
return urllib.parse.urljoin(self._base_url, path)
base_url = (self._mesos_master_url or
urllib.parse.urljoin(self._dcos_url, 'mesos/'))
return urllib.parse.urljoin(base_url, path)
def get_state(self):
# TODO (mgummelt): this doesn't work with self._mesos_master_url
def slave_url(self, slave_id, path):
""" Create a URL that hits the slave
:param slave_id: slave ID
:type slave_id: str
:param path: the path suffix of the desired URL
:type path: str
:returns: URL that hits the master
:rtype: str
"""
return urllib.parse.urljoin(self._dcos_url,
'slave/{}/{}'.format(slave_id, path))
def get_master_state(self):
"""Get the Mesos master state json object
:returns: Mesos' master state json object
:rtype: dict
"""
return http.get(self._create_url('master/state.json')).json()
url = self.master_url('master/state.json')
return http.get(url).json()
def get_slave_state(self, slave_id):
"""Get the Mesos slave state json object
:param slave_id: slave ID
:type slave_id: str
:returns: Mesos' master state json object
:rtype: dict
"""
url = self.slave_url(slave_id, 'state.json')
return http.get(url).json()
def shutdown_framework(self, framework_id):
"""Shuts down a Mesos framework
:param framework_id: ID of the framework to shutdown
:type framework_id: str
:returns: None
"""
logger.info('Shutting down framework {}'.format(framework_id))
data = 'frameworkId={}'.format(framework_id)
http.post(self._create_url('master/shutdown'), data=data)
url = self.master_url('master/shutdown')
http.post(url, data=data)
class Master(object):
@@ -106,6 +114,8 @@ class Master(object):
def __init__(self, state):
self._state = state
self._frameworks = {}
self._slaves = {}
def state(self):
"""Returns master's master/state.json.
@@ -116,8 +126,22 @@ class Master(object):
return self._state
def slave(self, fltr):
def slave_base_url(self, slave):
"""Returns the base url of the provided slave object.
:param slave: slave to create a url for
:type slave: Slave
:returns: slave's base url
:rtype: str
"""
if self._mesos_master_url is not None:
slave_ip = slave['pid'].split('@')[1]
return 'http://{}'.format(slave_ip)
else:
return urllib.parse.urljoin(self._dcos_url,
'slave/{}/'.format(slave['id']))
def slave(self, fltr):
"""Returns the slave that has `fltr` in its id. Raises a
DCOSException if there is not exactly one such slave.
@@ -141,21 +165,8 @@ class Master(object):
else:
return slaves[0]
def slaves(self, fltr=""):
"""Returns those slaves that have `fltr` in their 'id'
:param fltr: filter string
:type fltr: str
:returns: Those slaves that have `fltr` in their 'id'
:rtype: [Slave]
"""
return [Slave(slave)
for slave in self.state()['slaves']
if fltr in slave['id']]
def task(self, fltr):
"""Returns the task with `fltr` in its id. Raises an exception if
"""Returns the task with `fltr` in its id. Raises a DCOSException if
there is not exactly one such task.
:param fltr: filter string
@@ -178,18 +189,43 @@ class Master(object):
else:
return tasks[0]
# TODO (thomas): need to filter on task state as well as id
def framework(self, framework_id):
"""Returns a framework by id
:param framework_id: the framework's id
:type framework_id: str
:returns: the framework
:rtype: Framework
"""
for f in self._framework_dicts(True, True):
if f['id'] == framework_id:
return self._framework_obj(f)
return None
def slaves(self, fltr=""):
"""Returns those slaves that have `fltr` in their 'id'
:param fltr: filter string
:type fltr: str
:returns: Those slaves that have `fltr` in their 'id'
:rtype: [Slave]
"""
return [self._slave_obj(slave)
for slave in self.state()['slaves']
if fltr in slave['id']]
def tasks(self, fltr="", completed=False):
"""Returns tasks running under the master
:param fltr: May be a substring or unix glob pattern. Only
return tasks whose 'id' matches `fltr`.
:param fltr: May be a substring or regex. Only return tasks
whose 'id' matches `fltr`.
:type fltr: str
:param completed: also include completed tasks
:type completed: bool
:returns: a list of tasks
:rtype: [Task]
"""
keys = ['tasks']
@@ -198,28 +234,13 @@ class Master(object):
tasks = []
for framework in self._framework_dicts(completed, completed):
tasks += \
[Task(task, self)
for task in _merge(framework, *keys)
if fltr in task['id'] or
fnmatch.fnmatchcase(task['id'], fltr)]
for task in _merge(framework, keys):
if fltr in task['id'] or fnmatch.fnmatchcase(task['id'], fltr):
task = self._framework_obj(framework).task(task['id'])
tasks.append(task)
return tasks
def framework(self, framework_id):
"""Returns a framework by id
:param framework_id: the framework's id
:type framework_id: int
:returns: the framework
:rtype: Framework
"""
for f in self._framework_dicts(inactive=True):
if f['id'] == framework_id:
return Framework(f)
raise DCOSException('No Framework with id [{}]'.format(framework_id))
def frameworks(self, inactive=False, completed=False):
"""Returns a list of all frameworks
@@ -231,8 +252,51 @@ class Master(object):
:rtype: [Framework]
"""
return [Framework(f)
for f in self._framework_dicts(inactive, completed)]
return [self._framework_obj(framework)
for framework in self._framework_dicts(inactive, completed)]
@util.duration
def fetch(self, path, **kwargs):
"""GET the resource located at `path`
:param path: the URL path
:type path: str
:param **kwargs: http.get kwargs
:type **kwargs: dict
:returns: the response object
:rtype: Response
"""
url = urllib.parse.urljoin(self._base_url(), path)
return http.get(url, timeout=MESOS_TIMEOUT, **kwargs)
def _slave_obj(self, slave):
"""Returns the Slave object corresponding to the provided `slave`
dict. Creates it if it doesn't exist already.
:param slave: slave
:type slave: dict
:returns: Slave
:rtype: Slave
"""
if slave['id'] not in self._slaves:
self._slaves[slave['id']] = Slave(slave, None, self)
return self._slaves[slave['id']]
def _framework_obj(self, framework):
"""Returns the Framework object corresponding to the provided `framework`
dict. Creates it if it doesn't exist already.
:param framework: framework
:type framework: dict
:returns: Framework
:rtype: Framework
"""
if framework['id'] not in self._frameworks:
self._frameworks[framework['id']] = Framework(framework, self)
return self._frameworks[framework['id']]
def _framework_dicts(self, inactive=False, completed=False):
"""Returns a list of all frameworks as their raw dictionaries
@@ -242,13 +306,12 @@ class Master(object):
:param completed: also include completed frameworks
:type completed: bool
:returns: a list of frameworks
:rtype: [dict]
"""
keys = ['frameworks']
if completed:
keys.append('completed_frameworks')
for framework in _merge(self.state(), *keys):
for framework in _merge(self.state(), keys):
if inactive or framework['active']:
yield framework
@@ -256,16 +319,62 @@ class Master(object):
class Slave(object):
"""Mesos Slave Model
:param slave: dictionary representing the slave.
retrieved from master/state.json
:type slave: dict
:param short_state: slave's entry from the master's state.json
:type short_state: dict
:param state: slave's state.json
:type state: dict
:param master: slave's master
:type master: Master
"""
def __init__(self, slave):
self._slave = slave
def __init__(self, short_state, state, master):
self._short_state = short_state
self._state = state
self._master = master
def state(self):
"""Get the slave's state.json object. Fetch it if it's not already
an instance variable.
:returns: This slave's state.json object
:rtype: dict
"""
if not self._state:
self._state = MesosClient().get_slave_state(self['id'])
return self._state
def _framework_dicts(self):
"""Returns the framework dictionaries from the state.json dict
:returns: frameworks
:rtype: [dict]
"""
return _merge(self._state, ['frameworks', 'completed_frameworks'])
def executor_dicts(self):
"""Returns the executor dictionaries from the state.json
:returns: executors
:rtype: [dict]
"""
iters = [_merge(framework, ['executors', 'completed_executors'])
for framework in self._framework_dicts()]
return itertools.chain(*iters)
def __getitem__(self, name):
return self._slave[name]
"""Support the slave[attr] syntax
:param name: attribute to get
:type name: str
:returns: the value for this attribute in the underlying
slave dictionary
:rtype: object
"""
return self._short_state[name]
class Framework(object):
@@ -273,22 +382,61 @@ class Framework(object):
:param framework: framework properties
:type framework: dict
:param master: framework's master
:type master: Master
"""
def __init__(self, framework):
def __init__(self, framework, master):
self._framework = framework
self._master = master
self._tasks = {} # id->Task map
def task(self, task_id):
"""Returns a task by id
:param task_id: the task's id
:type task_id: str
:returns: the task
:rtype: Task
"""
for task in _merge(self._framework, ['tasks', 'completed_tasks']):
if task['id'] == task_id:
return self._task_obj(task)
return None
def _task_obj(self, task):
"""Returns the Task object corresponding to the provided `task`
dict. Creates it if it doesn't exist already.
:param task: task
:type task: dict
:returns: Task
:rtype: Task
"""
if task['id'] not in self._tasks:
self._tasks[task['id']] = Task(task, self._master)
return self._tasks[task['id']]
def dict(self):
return self._framework
def __getitem__(self, name):
"""Support the framework[attr] syntax
:param name: attribute to get
:type name: str
:returns: the value for this attribute in the underlying
framework dictionary
:rtype: object
"""
return self._framework[name]
class Task(object):
"""Mesos Task Model. Created from the Task objects sent in master's
state.json, which is in turn created from mesos' Task protobuf
object.
"""Mesos Task Model.
:param task: task properties
:type task: dict
@@ -309,7 +457,7 @@ class Task(object):
return self._task
def framework(self):
"""Returns the task's framework
"""Returns this task's framework
:returns: task's framework
:rtype: Framework
@@ -335,19 +483,228 @@ class Task(object):
return self.framework()['user']
def executor(self):
""" Returns this tasks' executor
:returns: task's executor
:rtype: dict
"""
for executor in self.slave().executor_dicts():
tasks = _merge(executor,
['completed_tasks',
'tasks',
'queued_tasks'])
if any(task['id'] == self['id'] for task in tasks):
return executor
return None
def directory(self):
""" Sandbox directory for this task
:returns: path to task's sandbox
:rtype: str
"""
return self.executor()['directory']
def __getitem__(self, name):
"""Support the task[attr] syntax
:param name: attribute to get
:type name: str
:returns: the value for this attribute in the underlying
task dictionary
:rtype: object
"""
return self._task[name]
def _merge(d, *keys):
class MesosFile(object):
"""File-like object that is backed by a remote slave file. Uses the
files/read.json endpoint. This endpoint isn't well documented
anywhere, so here is the spec derived from the mesos source code:
request format:
{
path: absolute path to read
offset: start byte location, or -1. -1 means read no data, and
is used to fetch the size of the file in the response's
'offset' parameter.
length: number of bytes to read, or -1. -1 means read the whole file.
}
response format:
{
data: file data. Empty if a request.offset=-1. Could be
smaller than request.length if EOF was reached, or if (I
believe) request.length is larger than the length
supported by the server (16 pages I believe).
offset: the offset value from the request, or the size of the
file if the request offset was -1 or >= the file size.
}
:param task: file's task
:type task: Task
:param path: file's path, relative to the sandbox
:type path: str
"""
def __init__(self, task, path, mesos_client):
self._task = task
self._path = path
self._mesos_client = mesos_client
self._cursor = 0
def size(self):
"""Size of the file
:returns: size of the file
:rtype: int
"""
params = self._params(0, offset=-1)
return self._fetch(params)["offset"]
def seek(self, offset, whence=os.SEEK_SET):
"""Seek to the provided location in the file.
:param offset: location to seek to
:type offset: int
:param whence: determines whether `offset` represents a
location that is absolute, relative to the
beginning of the file, or relative to the end
of the file
:type whence: os.SEEK_SET | os.SEEK_CUR | os.SEEK_END
:returns: None
:rtype: None
"""
if whence == os.SEEK_SET:
self._cursor = 0 + offset
elif whence == os.SEEK_CUR:
self._cursor += offset
elif whence == os.SEEK_END:
self._cursor = self.size() + offset
else:
raise ValueError(
"Unexpected value for `whence`: {}".format(whence))
def tell(self):
""" The current cursor position.
:returns: the current cursor position
:rtype: int
"""
return self._cursor
def read(self, length=None):
"""Reads up to `length` bytes, or the entire file if `length` is None.
:param length: number of bytes to read
:type length: int | None
:returns: data read
:rtype: str
"""
data = ''
while length is None or length - len(data) > 0:
chunk_length = -1 if length is None else length - len(data)
chunk = self._fetch_chunk(chunk_length)
if chunk == '':
break
data += chunk
return data
def _host_path(self):
""" The absolute path to the file on slave.
:returns: the absolute path to the file on slave
:rtype: str
"""
directory = self._task.directory()
if directory[-1] == '/':
return directory + self._path
else:
return directory + '/' + self._path
def _params(self, length, offset=None):
"""GET parameters to send to files/read.json. See the MesosFile
docstring for full information.
:param length: number of bytes to read
:type length: int
:param offset: start location. if None, will use the location
of the current file cursor
:type offset: int
:returns: GET parameters
:rtype: dict
"""
if offset is None:
offset = self._cursor
return {
'path': self._host_path(),
'offset': offset,
'length': length
}
def _fetch_chunk(self, length, offset=None):
"""Fetch data from files/read.json
:param length: number of bytes to fetch
:type length: int
:param offset: start location. If not None, this file's
cursor is set to `offset`
:type offset: int
:returns: data read
:rtype: str
"""
if offset is not None:
self.seek(offset, os.SEEK_SET)
params = self._params(length)
data = self._fetch(params)["data"]
self.seek(len(data), os.SEEK_CUR)
return data
def _fetch(self, params):
"""Fetch data from files/read.json
:param params: GET parameters
:type params: dict
:returns: response dict
:rtype: dict
"""
read_url = self._mesos_client.slave_url(self._task.slave()['id'],
'files/read.json')
return http.get(read_url, params=params).json()
def __str__(self):
"""String representation of the file: <task_id:file_path>
:returns: string representation of the file
:rtype: str
"""
return "{0}:{1}".format(self._task['id'], self._path)
def _merge(d, keys):
""" Merge multiple lists from a dictionary into one iterator.
e.g. _merge({'a': [1, 2], 'b': [3]}, ['a', 'b']) ->
iter(1, 2, 3)
:param d: dictionary
:type d: dict
:param *keys: keys to merge
:type *keys: [hashable]
:param keys: keys to merge
:type keys: [hashable]
:returns: iterator
:rtype: iter
"""

View File

@@ -165,7 +165,7 @@ def uninstall(package_name, remove_all, app_id, cli, app):
remove_all,
app_id,
marathon.create_client(),
mesos.get_master_client())
mesos.MesosClient())
if num_apps > 0:
uninstalled = True
@@ -192,7 +192,7 @@ def uninstall_subcommand(distribution_name):
return subcommand.uninstall(distribution_name)
def uninstall_app(app_name, remove_all, app_id, init_client, master_client):
def uninstall_app(app_name, remove_all, app_id, init_client, mesos_client):
"""Uninstalls an app.
:param app_name: The app to uninstall
@@ -203,8 +203,8 @@ def uninstall_app(app_name, remove_all, app_id, init_client, master_client):
:type app_id: str
:param init_client: The program to use to run the app
:type init_client: object
:param master_client: the mesos master client
:type master_client: dcos.mesos.MasterClient
:param mesos_client: the mesos client
:type mesos_client: dcos.mesos.MesosClient
:returns: number of apps uninstalled
:rtype: int
"""
@@ -245,8 +245,8 @@ def uninstall_app(app_name, remove_all, app_id, init_client, master_client):
if framework_name is not None:
logger.info(
'Trying to shutdown framework {}'.format(framework_name))
frameworks = mesos.Master(master_client.get_state()).frameworks(
inactive=True)
frameworks = mesos.Master(mesos_client.get_master_state()) \
.frameworks(inactive=True)
# Look up all the framework names
framework_ids = [
@@ -259,7 +259,7 @@ def uninstall_app(app_name, remove_all, app_id, init_client, master_client):
'Found the following frameworks: {}'.format(framework_ids))
if len(framework_ids) == 1:
master_client.shutdown_framework(framework_ids[0])
mesos_client.shutdown_framework(framework_ids[0])
elif len(framework_ids) > 1:
raise DCOSException(
"Unable to shutdown the framework for [{}] because there "

View File

@@ -107,7 +107,6 @@ def get_config():
:rtype: Toml
"""
# avoid circular import
from dcos import config
return config.load_from_path(