dcos tasks

This commit is contained in:
Michael Gummelt
2015-05-07 17:47:38 -07:00
parent bc3cb12864
commit d37a4c2979
21 changed files with 772 additions and 41 deletions

View File

@@ -27,6 +27,12 @@
"type": "string",
"title": "Your OAuth access token",
"description": "Your OAuth access token"
},
"mesos_master_url": {
"type": "string",
"title": "Mesos Master URL",
"description":
"Mesos Master URL. Must be of the format: \"http://host:port\""
}
},
"additionalProperties": false

View File

142
cli/dcoscli/tasks/main.py Normal file
View File

@@ -0,0 +1,142 @@
"""Get the status of mesos tasks
Usage:
dcos tasks --info
dcos tasks [--inactive --json <task>]
Options:
-h, --help Show this screen
--info Show a short description of this subcommand
--json Print json-formatted task data
--inactive Show inactive tasks as well
--version Show version
Positional Arguments:
<task> Only match tasks whose ID matches <task>. <task> may be
a substring of the ID, or a unix glob pattern.
"""
from collections import OrderedDict
import blessings
import dcoscli
import docopt
import prettytable
from dcos import cmds, emitting, mesos, util
from dcos.errors import DCOSException
logger = util.get_logger(__name__)
emitter = emitting.FlatEmitter()
def main():
try:
return _main()
except DCOSException as e:
emitter.publish(e)
return 1
def _main():
util.configure_logger_from_environ()
args = docopt.docopt(
__doc__,
version="dcos-tasks version {}".format(dcoscli.version))
return cmds.execute(_cmds(), args)
def _cmds():
"""
:returns: All of the supported commands
:rtype: [Command]
"""
return [
cmds.Command(
hierarchy=['tasks', '--info'],
arg_keys=[],
function=_info),
cmds.Command(
hierarchy=['tasks'],
arg_keys=['<task>', '--inactive', '--json'],
function=_tasks),
]
def _info():
"""Print tasks cli information.
:returns: process return code
:rtype: int
"""
emitter.publish(__doc__.split('\n')[0])
return 0
def _task_table(tasks):
"""Returns a PrettyTable representation of the provided tasks.
:param tasks: tasks to render
:type tasks: [Task]
:rtype: TaskTable
"""
term = blessings.Terminal()
table_generator = OrderedDict([
("name", lambda t: t["name"]),
("user", lambda t: t.user()),
("state", lambda t: t["state"].split("_")[-1][0]),
("id", lambda t: t["id"]),
])
tb = prettytable.PrettyTable(
[k.upper() for k in table_generator.keys()],
border=False,
max_table_width=term.width,
hrules=prettytable.NONE,
vrules=prettytable.NONE,
left_padding_width=0,
right_padding_width=1
)
for task in tasks:
row = [fn(task) for fn in table_generator.values()]
tb.add_row(row)
return tb
def _tasks(fltr, inactive, is_json):
""" List mesos tasks
:param fltr: task id filter
:type fltr: str
:param inactive: If True, include inactive tasks
:type inactive: bool
:param is_json: If true, output json.
Otherwise, output a human readable table.
:type is_json: bool
:returns: process return code
"""
if fltr is None:
fltr = ""
master = mesos.get_master()
tasks = sorted(master.tasks(active_only=(not inactive), fltr=fltr),
key=lambda task: task['name'])
if is_json:
emitter.publish([task.dict() for task in tasks])
else:
table = _task_table(tasks)
output = str(table)
if output:
emitter.publish(output)

View File

@@ -72,7 +72,9 @@ setup(
'virtualenv>=12.1, <13.0',
'rollbar>=0.9, <1.0',
'futures>=3.0, <4.0',
'oauth2client>=1.4, <2.0'
'oauth2client>=1.4, <2.0',
'blessings>=1.6, <2.0',
'prettytable>=0.7, <1.0',
],
# If there are data files included in your packages that need to be
@@ -95,6 +97,7 @@ setup(
'dcos-config=dcoscli.config.main:main',
'dcos-marathon=dcoscli.marathon.main:main',
'dcos-package=dcoscli.package.main:main',
'dcos-tasks=dcoscli.tasks.main:main',
],
},

View File

@@ -1,4 +1,5 @@
[core]
mesos_master_url = "http://localhost:5050"
reporting = false
email = "test@mail.com"
[marathon]

View File

@@ -0,0 +1,11 @@
{
"id": "test-app",
"cmd": "sleep 1000",
"cpus": 0.1,
"mem": 16,
"instances": 1,
"labels": {
"PACKAGE_ID": "test-app",
"PACKAGE_VERSION": "1.2.3"
}
}

View File

@@ -0,0 +1,11 @@
{
"id": "test-app2",
"cmd": "sleep 1000",
"cpus": 0.1,
"mem": 16,
"instances": 1,
"labels": {
"PACKAGE_ID": "test-app",
"PACKAGE_VERSION": "1.2.3"
}
}

View File

@@ -0,0 +1 @@
{"port": 8046}

View File

@@ -0,0 +1 @@
{"port": 8047}

View File

@@ -1,3 +1,4 @@
import json
import subprocess
@@ -87,3 +88,38 @@ def mock_called_some_args(mock, *args, **kwargs):
return True
return False
def watch_deployment(deployment_id, count):
""" Wait for a deployment to complete.
:param deployment_id: deployment id
:type deployment_id: str
:param count: max number of seconds to wait
:type count: int
:rtype: None
"""
returncode, stdout, stderr = exec_command(
['dcos', 'marathon', 'deployment', 'watch',
'--max-count={}'.format(count), deployment_id])
assert returncode == 0
assert stderr == b''
def list_deployments(expected_count=None, app_id=None):
cmd = ['dcos', 'marathon', 'deployment', 'list']
if app_id is not None:
cmd.append(app_id)
returncode, stdout, stderr = exec_command(cmd)
result = json.loads(stdout.decode('utf-8'))
assert returncode == 0
if expected_count is not None:
assert len(result) == expected_count
assert stderr == b''
return result

View File

@@ -1,29 +0,0 @@
import json
from common import exec_command
def watch_deployment(deployment_id, count):
returncode, stdout, stderr = exec_command(
['dcos', 'marathon', 'deployment', 'watch',
'--max-count={}'.format(count), deployment_id])
assert returncode == 0
assert stderr == b''
def list_deployments(expected_count, app_id=None):
cmd = ['dcos', 'marathon', 'deployment', 'list']
if app_id is not None:
cmd.append(app_id)
returncode, stdout, stderr = exec_command(cmd)
result = json.loads(stdout.decode('utf-8'))
assert returncode == 0
if expected_count is not None:
assert len(result) == expected_count
assert stderr == b''
return result

View File

@@ -68,6 +68,7 @@ def test_version():
def test_list_property(env):
stdout = b"""core.email=test@mail.com
core.mesos_master_url=http://localhost:5050
core.reporting=False
marathon.uri=http://localhost:8080
package.cache=tmp/cache

View File

@@ -21,6 +21,7 @@ Available DCOS commands:
\thelp \tDisplay command line usage information
\tmarathon \tDeploy and manage applications on the DCOS
\tpackage \tInstall and manage DCOS software packages
\ttasks \tGet the status of mesos tasks
Get detailed command description with 'dcos <command> --help'.
""".encode('utf-8')

View File

@@ -40,6 +40,7 @@ Available DCOS commands:
\thelp \tDisplay command line usage information
\tmarathon \tDeploy and manage applications on the DCOS
\tpackage \tInstall and manage DCOS software packages
\ttasks \tGet the status of mesos tasks
Get detailed command description with 'dcos <command> --help'.
""".encode('utf-8')

View File

@@ -4,8 +4,8 @@ import os
from dcos import constants
import pytest
from common import assert_command, exec_command
from marathon_common import list_deployments, watch_deployment
from common import (assert_command, exec_command, list_deployments,
watch_deployment)
def test_help():

View File

@@ -1,7 +1,7 @@
import json
from common import assert_command, exec_command
from marathon_common import list_deployments, watch_deployment
from common import (assert_command, exec_command, list_deployments,
watch_deployment)
def test_add_group():

View File

@@ -0,0 +1,155 @@
import collections
import json
import dcos.util as util
from dcos.mesos import Task
from dcos.util import create_schema
from dcoscli.tasks.main import _task_table
import mock
import pytest
from common import (assert_command, exec_command, list_deployments,
watch_deployment)
SLEEP1 = 'tests/data/marathon/apps/sleep.json'
SLEEP2 = 'tests/data/marathon/apps/sleep2.json'
@pytest.fixture
def task():
task = Task({
"executor_id": "",
"framework_id": "20150502-231327-16842879-5050-3889-0000",
"id": "test-app.d44dd7f2-f9b7-11e4-bb43-56847afe9799",
"labels": [],
"name": "test-app",
"resources": {
"cpus": 0.1,
"disk": 0,
"mem": 16,
"ports": "[31651-31651]"
},
"slave_id": "20150513-185808-177048842-5050-1220-S0",
"state": "TASK_RUNNING",
"statuses": [
{
"state": "TASK_RUNNING",
"timestamp": 1431552866.52692
}
]
}, None)
task.user = mock.Mock(return_value='root')
return task
def test_help():
stdout = b"""Get the status of mesos tasks
Usage:
dcos tasks --info
dcos tasks [--inactive --json <task>]
Options:
-h, --help Show this screen
--info Show a short description of this subcommand
--json Print json-formatted task data
--inactive Show inactive tasks as well
--version Show version
Positional Arguments:
<task> Only match tasks whose ID matches <task>. <task> may be
a substring of the ID, or a unix glob pattern.
"""
assert_command(['dcos', 'tasks', '--help'], stdout=stdout)
def test_info():
stdout = b"Get the status of mesos tasks\n"
assert_command(['dcos', 'tasks', '--info'], stdout=stdout)
def test_tasks(task):
_install_sleep_task()
# test `dcos tasks` output
returncode, stdout, stderr = exec_command(['dcos', 'tasks', '--json'])
assert returncode == 0
assert stderr == b''
tasks = json.loads(stdout.decode('utf-8'))
assert isinstance(tasks, collections.Sequence)
assert len(tasks) == 1
schema = create_schema(task.dict())
for task in tasks:
assert not util.validate_json(task, schema)
_uninstall_sleep()
def test_task_inactive():
_install_sleep_task()
_uninstall_sleep()
_install_sleep_task()
returncode, stdout, stderr = exec_command(
['dcos', 'tasks', '--inactive', '--json'])
assert returncode == 0
assert stderr == b''
assert len(json.loads(stdout.decode('utf-8'))) > 1
returncode, stdout, stderr = exec_command(
['dcos', 'tasks', '--json'])
assert returncode == 0
assert stderr == b''
assert len(json.loads(stdout.decode('utf-8'))) == 1
_uninstall_sleep()
def test_filter(task):
_install_sleep_task()
_install_sleep_task(SLEEP2, 'test-app2')
returncode, stdout, stderr = exec_command(
['dcos', 'tasks', 'test-app2', '--json'])
assert returncode == 0
assert stderr == b''
assert len(json.loads(stdout.decode('utf-8'))) == 1
_uninstall_sleep()
_uninstall_sleep('test-app2')
# not an integration test
def test_task_table(task):
table = _task_table([task])
stdout = b"""\
NAME USER STATE ID \n\
test-app root R test-app.d44dd7f2-f9b7-11e4-bb43-56847afe9799 """
assert str(table).encode('utf-8') == stdout
def _install_sleep_task(app_path=SLEEP1, app_name='test-app'):
# install helloworld app
args = ['dcos', 'marathon', 'app', 'add', app_path]
assert_command(args)
_wait_for_deployment()
def _wait_for_deployment():
deps = list_deployments()
if deps:
watch_deployment(deps[0]['id'], 60)
def _uninstall_helloworld(args=[]):
assert_command(['dcos', 'package', 'uninstall', 'helloworld'] + args)
def _uninstall_sleep(app_id='test-app'):
assert_command(['dcos', 'marathon', 'app', 'remove', app_id])

View File

@@ -33,6 +33,7 @@ def _default_to_error(response):
def request(method,
url,
timeout=3.0,
is_success=_default_is_success,
to_error=_default_to_error,
**kwargs):
@@ -69,7 +70,7 @@ def request(method,
request.headers)
with requests.Session() as session:
response = session.send(request.prepare(), timeout=3.0)
response = session.send(request.prepare(), timeout=timeout)
except Exception as ex:
raise DCOSException(to_error(DefaultError(str(ex))).error())

281
dcos/mesos.py Normal file
View File

@@ -0,0 +1,281 @@
import fnmatch
import itertools
import dcos.http
from dcos import util
from dcos.errors import DCOSException
from six.moves import urllib
logger = util.get_logger(__name__)
def get_master(config=None):
"""Create a MesosMaster object using the url stored in the
'core.master' property of the user's config.
:param config: config
:type config: Toml
:returns: MesosMaster object
:rtype: MesosMaster
"""
if config is None:
config = util.get_config()
mesos_master_url = util.get_config_vals(
config, ['core.mesos_master_url'])[0]
return MesosMaster(mesos_master_url)
MESOS_TIMEOUT = 3
class MesosMaster(object):
"""Mesos Master Model
:param url: master url (e.g. "http://localhost:5050")
:type url: str
"""
def __init__(self, url, state=None):
self._url = url
self._state = None
def state(self):
"""Returns master's /master/state.json. Fetches and saves it if we
haven't already.
:returns: state.json
:rtype: dict
"""
if not self._state:
self._state = self.fetch('/master/state.json').json()
return self._state
def slave(self, fltr):
"""Returns the slave that has `fltr` in its id. Raises a
DCOSException if there is not exactly one such slave.
:param fltr: filter string
:type fltr: str
:returns: the slave that has `fltr` in its id
:rtype: MesosSlave
"""
slaves = self.slaves(fltr)
if len(slaves) == 0:
raise DCOSException('Slave {} no longer exists'.format(fltr))
elif len(slaves) > 1:
matches = ['\t{0}'.format(slave.id) for slave in slaves]
raise DCOSException(
"There are multiple slaves with that id. " +
"Please choose one: {}".format('\n'.join(matches)))
else:
return slaves[0]
def slaves(self, fltr=""):
"""Returns those slaves that have `fltr` in their 'id'
:param fltr: filter string
:type fltr: str
:returns: Those slaves that have `fltr` in their 'id'
:rtype: [MesosSlave]
"""
return [MesosSlave(slave)
for slave in self.state()['slaves']
if fltr in slave['id']]
def task(self, fltr):
"""Returns the task with `fltr` in its id. Raises an exception if
there is not exactly one such task.
:param fltr: filter string
:type fltr: str
:returns: the task that has `fltr` in its id
:rtype: Task
"""
tasks = self.tasks(fltr)
if len(tasks) == 0:
raise DCOSException(
'Cannot find a task containing "{}"'.format(fltr))
elif len(tasks) > 1:
msg = ["There are multiple tasks with that id. Please choose one:"]
msg += ["\t{0}".format(t["id"]) for t in tasks]
raise DCOSException('\n'.join(msg))
else:
return tasks[0]
# TODO (thomas): need to filter on task state as well as id
def tasks(self, fltr="", active_only=False):
"""Returns tasks running under the master
:param fltr: May be a substring or regex. Only return tasks
whose 'id' matches `fltr`.
:type fltr: str
:param active_only: only include active tasks
:type active_only: bool
:returns: a list of tasks
:rtype: [Task]
"""
keys = ['tasks']
if not active_only:
keys = ['completed_tasks']
tasks = []
for framework in self._framework_dicts(active_only):
tasks += \
[Task(task, self)
for task in _merge(framework, *keys)
if fltr in task['id'] or fnmatch.fnmatch(task['id'], fltr)]
return tasks
def framework(self, framework_id):
"""Returns a framework by id
:param framework_id: the framework's id
:type framework_id: int
:returns: the framework
:rtype: Framework
"""
for f in self._framework_dicts(active_only=False):
if f['id'] == framework_id:
return Framework(f)
raise DCOSException('No Framework with id [{}]'.format(framework_id))
def frameworks(self, active_only=False):
"""Returns a list of all frameworks
:param active_only: only include active frameworks
:type active_only: bool
:returns: a list of frameworks
:rtype: [Framework]
"""
return [Framework(f) for f in self._framework_dicts(active_only)]
def _framework_dicts(self, active_only=False):
keys = ['frameworks']
if not active_only:
keys.append('completed_frameworks')
return _merge(self.state(), *keys)
@util.duration
def fetch(self, path, **kwargs):
"""GET the resource located at `path`
:param path: the URL path
:type path: str
:param **kwargs: requests.get kwargs
:type **kwargs: dict
:returns: the response object
:rtype: Response
"""
url = urllib.parse.urljoin(self._url, path)
return dcos.http.get(url,
timeout=MESOS_TIMEOUT,
**kwargs)
class MesosSlave(object):
"""Mesos Slave Model
:param slave: dictionary representing the slave.
retrieved from master/state.json
:type slave: dict
"""
def __init__(self, slave):
self._slave = slave
def __getitem__(self, name):
return self._slave[name]
class Framework(object):
""" Mesos Framework Model
:param framework: framework properties
:type framework: dict
"""
def __init__(self, framework):
self._framework = framework
def __getitem__(self, name):
return self._framework[name]
class Task(object):
"""Mesos Task Model. Created from the Task objects sent in master's
state.json, which is in turn created from mesos' Task protobuf
object.
:param task: task properties
:type task: dict
:param master: mesos master
:type master: MesosMaster
"""
def __init__(self, task, master):
self._task = task
self._master = master
def dict(self):
"""
:returns: dictionary representation of this Task
:rtype: dict
"""
return self._task
def framework(self):
"""Returns the task's framework
:returns" task's framework
:rtype: Framework
"""
return self._master.framework(self["framework_id"])
def user(self):
"""Task owner
:returns: task owner
:rtype: str
"""
return self.framework()['user']
def __getitem__(self, name):
return self._task[name]
def _merge(d, *keys):
""" Merge multiple lists from a dictionary into one iterator.
e.g. _merge({'a': [1, 2], 'b': [3]}, ['a', 'b']) ->
iter(1, 2, 3)
:param d: dictionary
:type d: dict
:param *keys: keys to merge
:type *keys: [hashable]
:returns: iterator
:rtype: iter
"""
return itertools.chain(*[d[k] for k in keys])

View File

@@ -356,7 +356,7 @@ def install_with_pip(
]
if _execute_command(cmd) != 0:
# We should remove the diretory that we just created
# We should remove the directory that we just created
if new_package_dir:
shutil.rmtree(env_directory)

View File

@@ -1,4 +1,6 @@
import collections
import contextlib
import functools
import json
import logging
import os
@@ -7,6 +9,7 @@ import re
import shutil
import sys
import tempfile
import time
import jsonschema
import pystache
@@ -27,9 +30,6 @@ def get_logger(name):
return logging.getLogger(name)
logger = get_logger(__name__)
@contextlib.contextmanager
def tempdir():
"""A context manager for temporary directories.
@@ -113,6 +113,29 @@ def get_config():
os.environ[constants.DCOS_CONFIG_ENV])
def get_config_vals(config, keys):
"""Gets config values for each of the keys. Raises a DCOSException if
any of the keys don't exist.
:param config: config
:type config: Toml
:param keys: keys in the config dict
:type keys: [str]
:returns: values for each of the keys
:rtype: [object]
"""
missing = [key for key in keys if key not in config]
if missing:
msg = '\n'.join(
'Missing required config parameter: "{0}".'.format(key) +
' Please run `dcos config set {0} <value>`.'.format(key)
for key in keys)
raise DCOSException(msg)
return [config[key] for key in keys]
def which(program):
"""Returns the path to the named executable program.
@@ -193,7 +216,6 @@ def load_json(reader):
try:
return json.load(reader)
except Exception as error:
logger = get_logger(__name__)
logger.error(
'Unhandled exception while loading JSON: %r',
error)
@@ -285,6 +307,47 @@ def _format_validation_error(error):
return message
def create_schema(obj):
""" Creates a basic json schema derived from `obj`.
:param obj: object for which to derive a schema
:type obj: str | int | float | dict | list
:returns: json schema
:rtype: dict
"""
if isinstance(obj, six.string_types):
return {'type': 'string'}
elif isinstance(obj, six.integer_types):
return {'type': 'integer'}
elif isinstance(obj, float):
return {'type': 'number'}
elif isinstance(obj, collections.Mapping):
schema = {'type': 'object',
'properties': {},
'additionalProperties': False,
'required': obj.keys()}
for key, val in obj.items():
schema['properties'][key] = create_schema(val)
return schema
elif isinstance(obj, collections.Sequence):
schema = {'type': 'array'}
if obj:
schema['items'] = create_schema(obj[0])
return schema
else:
raise ValueError(
'Cannot create schema with object {} of unrecognized type'
.format(str(obj)))
def list_to_err(errs):
"""convert list of error strings to a single string
@@ -310,7 +373,6 @@ def parse_int(string):
return int(string)
except:
error = sys.exc_info()[0]
logger = get_logger(__name__)
logger.error(
'Unhandled exception while parsing string as int: %r -- %r',
string,
@@ -363,6 +425,52 @@ class CustomJsonRenderer(pystache.Renderer):
:returns: a string containing a JSON representation of the value
:rtype: str
"""
return json.dumps(val)
def duration(fn):
""" Decorator to log the duration of a function.
:param fn: function to measure
:type fn: function
:returns: wrapper function
:rtype: function
"""
@functools.wraps(fn)
def timer(*args, **kwargs):
start = time.time()
try:
return fn(*args, **kwargs)
finally:
logger.debug("duration: {0}.{1}: {2:2.2f}s".format(
fn.__module__,
fn.__name__,
time.time() - start))
return timer
def humanize_bytes(b):
""" Return a human representation of a number of bytes.
:param b: number of bytes
:type b: number
:returns: human representation of a number of bytes
:rtype: str
"""
abbrevs = (
(1 << 30, 'GB'),
(1 << 20, 'MB'),
(1 << 10, 'kB'),
(1, 'B')
)
for factor, suffix in abbrevs:
if b >= factor:
break
return "{0:.2f} {1}".format(b/float(factor), suffix)
logger = get_logger(__name__)