marathon: add dcos marathon task kill command (#971)
				
					
				
			This commit is contained in:
		
				
					committed by
					
						
						tamarrow
					
				
			
			
				
	
			
			
			
						parent
						
							9f20ab9a78
						
					
				
				
					commit
					fcd4cec620
				
			@@ -39,6 +39,7 @@ Usage:
 | 
			
		||||
    dcos marathon debug details <app-id> [--json]
 | 
			
		||||
    dcos marathon task list [--json <app-id>]
 | 
			
		||||
    dcos marathon task stop [--wipe] <task-id>
 | 
			
		||||
    dcos marathon task kill [--scale] [--wipe] [--json] [<task-ids>...]
 | 
			
		||||
    dcos marathon task show <task-id>
 | 
			
		||||
 | 
			
		||||
Commands:
 | 
			
		||||
@@ -109,6 +110,8 @@ Commands:
 | 
			
		||||
        List all tasks.
 | 
			
		||||
    task stop
 | 
			
		||||
        Stop a task.
 | 
			
		||||
    task kill
 | 
			
		||||
        Kill one or more tasks.
 | 
			
		||||
    task show
 | 
			
		||||
        List a specific task.
 | 
			
		||||
 | 
			
		||||
@@ -178,5 +181,7 @@ Positional Arguments:
 | 
			
		||||
        If omitted, properties are read from a JSON object provided on stdin.
 | 
			
		||||
    <task-id>
 | 
			
		||||
        The task ID.
 | 
			
		||||
    <task_ids>
 | 
			
		||||
        List of task IDs.
 | 
			
		||||
    <scale-factor>
 | 
			
		||||
        The factor to scale an application group by.
 | 
			
		||||
 
 | 
			
		||||
@@ -80,6 +80,11 @@ def _cmds():
 | 
			
		||||
            arg_keys=['<task-id>', '--wipe'],
 | 
			
		||||
            function=subcommand.task_stop),
 | 
			
		||||
 | 
			
		||||
        cmds.Command(
 | 
			
		||||
            hierarchy=['marathon', 'task', 'kill'],
 | 
			
		||||
            arg_keys=['<task-ids>', '--scale', '--wipe', '--json'],
 | 
			
		||||
            function=subcommand.task_kill),
 | 
			
		||||
 | 
			
		||||
        cmds.Command(
 | 
			
		||||
            hierarchy=['marathon', 'task', 'show'],
 | 
			
		||||
            arg_keys=['<task-id>'],
 | 
			
		||||
@@ -860,6 +865,49 @@ class MarathonSubcommand(object):
 | 
			
		||||
        emitter.publish(task)
 | 
			
		||||
        return 0
 | 
			
		||||
 | 
			
		||||
    def task_kill(self, task_ids, scale, wipe, json_):
 | 
			
		||||
        """Kill one or multiple Marathon tasks
 | 
			
		||||
 | 
			
		||||
        :param task_ids: the id of the task
 | 
			
		||||
        :type task_ids: [str]
 | 
			
		||||
        :param scale: Scale the app down after killing the specified tasks
 | 
			
		||||
        :type scale: bool
 | 
			
		||||
        :param wipe: whether remove reservations and persistent volumes.
 | 
			
		||||
        :type wipe: bool
 | 
			
		||||
        :param json_: output JSON if true
 | 
			
		||||
        :type json_: bool
 | 
			
		||||
        :returns: process return code
 | 
			
		||||
        :rtype: int
 | 
			
		||||
        """
 | 
			
		||||
 | 
			
		||||
        client = self._create_marathon_client()
 | 
			
		||||
        payload = client.kill_and_scale_tasks(task_ids, scale, wipe)
 | 
			
		||||
 | 
			
		||||
        def print_deployment(deployment, json_):
 | 
			
		||||
            if json_:
 | 
			
		||||
                emitter.publish(deployment)
 | 
			
		||||
            else:
 | 
			
		||||
                emitter.publish('Created deployment: {}'.format(
 | 
			
		||||
                    deployment['deploymentId']))
 | 
			
		||||
 | 
			
		||||
        def print_killed_tasks(payload, json_):
 | 
			
		||||
            if json_:
 | 
			
		||||
                emitter.publish(payload)
 | 
			
		||||
            else:
 | 
			
		||||
                emitter.publish('Killed tasks: {}'.format(
 | 
			
		||||
                    [task['id'] for task in payload['tasks']]))
 | 
			
		||||
 | 
			
		||||
        if scale:
 | 
			
		||||
            print_deployment(payload, json_)
 | 
			
		||||
        else:
 | 
			
		||||
            print_killed_tasks(payload, json_)
 | 
			
		||||
 | 
			
		||||
            if len(payload['tasks']) == 0:
 | 
			
		||||
                raise DCOSException(
 | 
			
		||||
                    'Failed to kill tasks. task-ids seems to be unknown')
 | 
			
		||||
 | 
			
		||||
        return 0
 | 
			
		||||
 | 
			
		||||
    def task_show(self, task_id):
 | 
			
		||||
        """
 | 
			
		||||
        :param task_id: the task id
 | 
			
		||||
 
 | 
			
		||||
@@ -639,6 +639,57 @@ def test_stop_task_wipe():
 | 
			
		||||
        _stop_task(task_id, '--wipe')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_kill_one_task():
 | 
			
		||||
    with _zero_instance_app():
 | 
			
		||||
        start_app('zero-instance-app', 1)
 | 
			
		||||
        watch_all_deployments()
 | 
			
		||||
        task_list = _list_tasks(1, 'zero-instance-app')
 | 
			
		||||
        task_id = [task_list[0]['id']]
 | 
			
		||||
 | 
			
		||||
        _kill_task(task_id)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_kill_two_tasks():
 | 
			
		||||
    with _zero_instance_app():
 | 
			
		||||
        start_app('zero-instance-app', 2)
 | 
			
		||||
        watch_all_deployments()
 | 
			
		||||
        task_list = _list_tasks(2, 'zero-instance-app')
 | 
			
		||||
        task_ids = [task['id'] for task in task_list]
 | 
			
		||||
 | 
			
		||||
        _kill_task(task_ids)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_kill_and_scale_task():
 | 
			
		||||
    with _zero_instance_app():
 | 
			
		||||
        start_app('zero-instance-app', 2)
 | 
			
		||||
        watch_all_deployments()
 | 
			
		||||
        task_list = _list_tasks(2, 'zero-instance-app')
 | 
			
		||||
        task_id = [task_list[0]['id']]
 | 
			
		||||
 | 
			
		||||
        _kill_task(task_id, scale=True)
 | 
			
		||||
 | 
			
		||||
        task_list = _list_tasks(1, 'zero-instance-app')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_kill_unknown_task():
 | 
			
		||||
    with _zero_instance_app():
 | 
			
		||||
        start_app('zero-instance-app')
 | 
			
		||||
        watch_all_deployments()
 | 
			
		||||
        task_id = ['unknown-task-id']
 | 
			
		||||
 | 
			
		||||
        _kill_task(task_id, expect_success=False)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_kill_task_wipe():
 | 
			
		||||
    with _zero_instance_app():
 | 
			
		||||
        start_app('zero-instance-app', 1)
 | 
			
		||||
        watch_all_deployments()
 | 
			
		||||
        task_list = _list_tasks(1, 'zero-instance-app')
 | 
			
		||||
        task_id = [task_list[0]['id']]
 | 
			
		||||
 | 
			
		||||
        _kill_task(task_id, wipe=True)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_stop_unknown_task():
 | 
			
		||||
    with _zero_instance_app():
 | 
			
		||||
        start_app('zero-instance-app')
 | 
			
		||||
@@ -761,6 +812,28 @@ def _stop_task(task_id, wipe=None, expect_success=True):
 | 
			
		||||
        assert returncode == 1
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _kill_task(task_ids, scale=None, wipe=None, expect_success=True):
 | 
			
		||||
    cmd = ['dcos', 'marathon', 'task', 'kill', '--json'] + task_ids
 | 
			
		||||
    if scale:
 | 
			
		||||
        cmd.append('--scale')
 | 
			
		||||
    if wipe:
 | 
			
		||||
        cmd.append('--wipe')
 | 
			
		||||
    returncode, stdout, stderr = exec_command(cmd)
 | 
			
		||||
 | 
			
		||||
    if expect_success:
 | 
			
		||||
        assert returncode == 0
 | 
			
		||||
        assert stderr == b''
 | 
			
		||||
        result = json.loads(stdout.decode('utf-8'))
 | 
			
		||||
        if scale:
 | 
			
		||||
            assert 'deploymentId' in result
 | 
			
		||||
        else:
 | 
			
		||||
            assert sorted(
 | 
			
		||||
                [task['id'] for task in result['tasks']]) == sorted(task_ids)
 | 
			
		||||
 | 
			
		||||
    else:
 | 
			
		||||
        assert returncode == 1
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@contextlib.contextmanager
 | 
			
		||||
def _zero_instance_app():
 | 
			
		||||
    with app('tests/data/marathon/apps/zero_instance_sleep.json',
 | 
			
		||||
 
 | 
			
		||||
@@ -387,6 +387,37 @@ class Client(object):
 | 
			
		||||
        response = self._rpc.http_req(http.delete, path, params=params)
 | 
			
		||||
        return response.json()
 | 
			
		||||
 | 
			
		||||
    def kill_and_scale_tasks(self, task_ids, scale=None, wipe=None):
 | 
			
		||||
        """Kills the tasks for a given application,
 | 
			
		||||
        and can target a given agent, with a future target scale
 | 
			
		||||
 | 
			
		||||
        :param task_ids: a list of task ids to kill
 | 
			
		||||
        :type task_ids: list
 | 
			
		||||
        :param scale: Scale the app down after killing the specified tasks
 | 
			
		||||
        :type scale: bool
 | 
			
		||||
        :param wipe: whether remove reservations and persistent volumes.
 | 
			
		||||
        :type wipe: bool
 | 
			
		||||
        :returns: If scale=false, all tasks that were killed are returned.
 | 
			
		||||
                  If scale=true, than a deployment is triggered and the
 | 
			
		||||
                  deployment id and version returned.
 | 
			
		||||
        :rtype: list | dict
 | 
			
		||||
        """
 | 
			
		||||
 | 
			
		||||
        params = {}
 | 
			
		||||
        path = 'v2/tasks/delete'
 | 
			
		||||
 | 
			
		||||
        if scale:
 | 
			
		||||
            params['scale'] = scale
 | 
			
		||||
        if wipe:
 | 
			
		||||
            params['wipe'] = wipe
 | 
			
		||||
 | 
			
		||||
        response = self._rpc.http_req(http.post,
 | 
			
		||||
                                      path,
 | 
			
		||||
                                      params=params,
 | 
			
		||||
                                      json={'ids': task_ids})
 | 
			
		||||
 | 
			
		||||
        return response.json()
 | 
			
		||||
 | 
			
		||||
    def restart_app(self, app_id, force=False):
 | 
			
		||||
        """Performs a rolling restart of all of the tasks.
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user