This commit:8971ccdb08Inadvertently undid changes originally introduced in this commit:f33a45ee94Unfortunately, the integration tests didn't catch this because all `task exec` integration tests were hidden behind an environment variable: @pytest.mark.skipif('DCOS_DEBUGGING_ENABLED' not in os.environ, reason="Requires Agent Debugging APIs") This commit reintroduces the changes that were inadvertently undone and enables the `task exec` integration tests by default so that something like this doesn't occur in the future.
1523 lines
48 KiB
Python
1523 lines
48 KiB
Python
import base64
|
|
import fnmatch
|
|
import itertools
|
|
import json
|
|
import os
|
|
import signal
|
|
import sys
|
|
import threading
|
|
import time
|
|
import uuid
|
|
|
|
from functools import partial
|
|
from queue import Queue
|
|
|
|
from six.moves import urllib
|
|
|
|
from dcos import config, http, recordio, util
|
|
|
|
from dcos.errors import DCOSException, DCOSHTTPException
|
|
|
|
if not util.is_windows_platform():
|
|
import termios
|
|
import tty
|
|
|
|
logger = util.get_logger(__name__)
|
|
|
|
|
|
COMPLETED_TASK_STATES = [
|
|
"TASK_FINISHED", "TASK_KILLED", "TASK_FAILED", "TASK_LOST", "TASK_ERROR",
|
|
"TASK_GONE", "TASK_GONE_BY_OPERATOR", "TASK_DROPPED", "TASK_UNREACHABLE",
|
|
"TASK_UNKNOWN"
|
|
]
|
|
|
|
|
|
def get_master(dcos_client=None):
|
|
"""Create a Master object using the url stored in the
|
|
'core.mesos_master_url' property if it exists. Otherwise, we use
|
|
the `core.dcos_url` property
|
|
|
|
:param dcos_client: DCOSClient
|
|
:type dcos_client: DCOSClient | None
|
|
:returns: master state object
|
|
:rtype: Master
|
|
"""
|
|
|
|
dcos_client = dcos_client or DCOSClient()
|
|
return Master(dcos_client.get_master_state())
|
|
|
|
|
|
class DCOSClient(object):
|
|
"""Client for communicating with DC/OS"""
|
|
|
|
def __init__(self):
|
|
toml_config = config.get_config()
|
|
|
|
self._dcos_url = config.get_config_val("core.dcos_url", toml_config)
|
|
if self._dcos_url is None:
|
|
raise config.missing_config_exception(['core.dcos_url'])
|
|
self._mesos_master_url = config.get_config_val(
|
|
'core.mesos_master_url', toml_config)
|
|
|
|
self._timeout = config.get_config_val('core.timeout', toml_config)
|
|
|
|
def get_dcos_url(self, path):
|
|
""" Create a DC/OS URL
|
|
|
|
:param path: the path suffix of the URL
|
|
:type path: str
|
|
:returns: DC/OS URL
|
|
:rtype: str
|
|
"""
|
|
|
|
return urllib.parse.urljoin(self._dcos_url, path)
|
|
|
|
def master_url(self, path):
|
|
""" Create a master URL
|
|
|
|
:param path: the path suffix of the desired URL
|
|
:type path: str
|
|
:returns: URL that hits the master
|
|
:rtype: str
|
|
"""
|
|
|
|
base_url = (self._mesos_master_url or
|
|
urllib.parse.urljoin(self._dcos_url, 'mesos/'))
|
|
return urllib.parse.urljoin(base_url, path)
|
|
|
|
def slave_url(self, slave_id, private_url, path):
|
|
"""Create a slave URL
|
|
|
|
:param slave_id: slave ID
|
|
:type slave_id: str
|
|
:param private_url: The slave's private URL derived from its
|
|
pid. Used when we're accessing mesos
|
|
directly, rather than through DC/OS.
|
|
:type private_url: str
|
|
:param path: the path suffix of the desired URL
|
|
:type path: str
|
|
:returns: URL that hits the master
|
|
:rtype: str
|
|
|
|
"""
|
|
|
|
if self._mesos_master_url:
|
|
return urllib.parse.urljoin(private_url, path)
|
|
else:
|
|
return urllib.parse.urljoin(self._dcos_url,
|
|
'slave/{}/{}'.format(slave_id, path))
|
|
|
|
def get_master_state(self):
|
|
"""Get the Mesos master state json object
|
|
|
|
:returns: Mesos' master state json object
|
|
:rtype: dict
|
|
"""
|
|
|
|
url = self.master_url('master/state.json')
|
|
return http.get(url, timeout=self._timeout).json()
|
|
|
|
def get_slave_state(self, slave_id, private_url):
|
|
"""Get the Mesos slave state json object
|
|
|
|
:param slave_id: slave ID
|
|
:type slave_id: str
|
|
:param private_url: The slave's private URL derived from its
|
|
pid. Used when we're accessing mesos
|
|
directly, rather than through DC/OS.
|
|
:type private_url: str
|
|
:returns: Mesos' master state json object
|
|
:rtype: dict
|
|
|
|
"""
|
|
|
|
url = self.slave_url(slave_id, private_url, 'state.json')
|
|
return http.get(url, timeout=self._timeout).json()
|
|
|
|
def get_state_summary(self):
|
|
"""Get the Mesos master state summary json object
|
|
|
|
:returns: Mesos' master state summary json object
|
|
:rtype: dict
|
|
"""
|
|
|
|
url = self.master_url('master/state-summary')
|
|
return http.get(url, timeout=self._timeout).json()
|
|
|
|
def slave_file_read(self, slave_id, private_url, path, offset, length):
|
|
"""See the master_file_read() docs
|
|
|
|
:param slave_id: slave ID
|
|
:type slave_id: str
|
|
:param path: absolute path to read
|
|
:type path: str
|
|
:param private_url: The slave's private URL derived from its
|
|
pid. Used when we're accessing mesos
|
|
directly, rather than through DC/OS.
|
|
:type private_url: str
|
|
:param offset: start byte location, or -1. -1 means read no data, and
|
|
is used to fetch the size of the file in the response's
|
|
'offset' parameter.
|
|
:type offset: int
|
|
:param length: number of bytes to read, or -1. -1 means read the whole
|
|
file
|
|
:type length: int
|
|
:returns: files/read.json response
|
|
:rtype: dict
|
|
|
|
"""
|
|
|
|
url = self.slave_url(slave_id,
|
|
private_url,
|
|
'files/read.json')
|
|
params = {'path': path,
|
|
'length': length,
|
|
'offset': offset}
|
|
return http.get(url, params=params, timeout=self._timeout).json()
|
|
|
|
def master_file_read(self, path, length, offset):
|
|
"""This endpoint isn't well documented anywhere, so here is the spec
|
|
derived from the mesos source code:
|
|
|
|
request format:
|
|
{
|
|
path: absolute path to read
|
|
offset: start byte location, or -1. -1 means read no data, and
|
|
is used to fetch the size of the file in the response's
|
|
'offset' parameter.
|
|
length: number of bytes to read, or -1. -1 means read the whole
|
|
file.
|
|
}
|
|
|
|
response format:
|
|
{
|
|
data: file data. Empty if a request.offset=-1. Could be
|
|
smaller than request.length if EOF was reached, or if (I
|
|
believe) request.length is larger than the length
|
|
supported by the server (16 pages I believe).
|
|
|
|
offset: the offset value from the request, or the size of the
|
|
file if the request offset was -1 or >= the file size.
|
|
}
|
|
|
|
:param path: absolute path to read
|
|
:type path: str
|
|
:param offset: start byte location, or -1. -1 means read no data, and
|
|
is used to fetch the size of the file in the response's
|
|
'offset' parameter.
|
|
:type offset: int
|
|
:param length: number of bytes to read, or -1. -1 means read the whole
|
|
file
|
|
:type length: int
|
|
:returns: files/read.json response
|
|
:rtype: dict
|
|
"""
|
|
|
|
url = self.master_url('files/read.json')
|
|
params = {'path': path,
|
|
'length': length,
|
|
'offset': offset}
|
|
return http.get(url, params=params, timeout=self._timeout).json()
|
|
|
|
def shutdown_framework(self, framework_id):
|
|
"""Shuts down a Mesos framework
|
|
|
|
:param framework_id: ID of the framework to shutdown
|
|
:type framework_id: str
|
|
:returns: None
|
|
"""
|
|
|
|
logger.info('Shutting down framework {}'.format(framework_id))
|
|
|
|
data = 'frameworkId={}'.format(framework_id)
|
|
|
|
url = self.master_url('master/teardown')
|
|
|
|
# In Mesos 0.24, /shutdown was removed.
|
|
# If /teardown doesn't exist, we try /shutdown.
|
|
try:
|
|
http.post(url, data=data, timeout=self._timeout)
|
|
except DCOSHTTPException as e:
|
|
if e.response.status_code == 404:
|
|
url = self.master_url('master/shutdown')
|
|
http.post(url, data=data, timeout=self._timeout)
|
|
else:
|
|
raise
|
|
|
|
def metadata(self):
|
|
""" GET /metadata
|
|
|
|
:returns: /metadata content
|
|
:rtype: dict
|
|
"""
|
|
url = self.get_dcos_url('metadata')
|
|
return http.get(url, timeout=self._timeout).json()
|
|
|
|
def browse(self, slave, path):
|
|
""" GET /files/browse.json
|
|
|
|
Request
|
|
path:... # path to run ls on
|
|
|
|
Response
|
|
[
|
|
{
|
|
path: # full path to file
|
|
nlink:
|
|
size:
|
|
mtime:
|
|
mode:
|
|
uid:
|
|
gid:
|
|
}
|
|
]
|
|
|
|
:param slave: slave to issue the request on
|
|
:type slave: Slave
|
|
:returns: /files/browse.json response
|
|
:rtype: dict
|
|
"""
|
|
|
|
url = self.slave_url(slave['id'],
|
|
slave.http_url(),
|
|
'files/browse.json')
|
|
return http.get(url, params={'path': path}).json()
|
|
|
|
|
|
class MesosDNSClient(object):
|
|
""" Mesos-DNS client
|
|
|
|
:param url: mesos-dns URL
|
|
:type url: str
|
|
"""
|
|
def __init__(self, url=None):
|
|
self.url = url or urllib.parse.urljoin(
|
|
config.get_config_val('core.dcos_url'), '/mesos_dns/')
|
|
|
|
def _path(self, path):
|
|
""" Construct a full path
|
|
|
|
:param path: path suffix
|
|
:type path: str
|
|
:returns: full path
|
|
:rtype: str
|
|
"""
|
|
return urllib.parse.urljoin(self.url, path)
|
|
|
|
def hosts(self, host):
|
|
""" GET v1/hosts/<host>
|
|
|
|
:param host: host
|
|
:type host: str
|
|
:returns: {'ip', 'host'} dictionary
|
|
:rtype: dict(str, str)
|
|
"""
|
|
url = self._path('v1/hosts/{}'.format(host))
|
|
return http.get(url, headers={}).json()
|
|
|
|
|
|
class Master(object):
|
|
"""Mesos Master Model
|
|
|
|
:param state: Mesos master's state.json
|
|
:type state: dict
|
|
"""
|
|
|
|
def __init__(self, state):
|
|
self._state = state
|
|
self._frameworks = {}
|
|
self._slaves = {}
|
|
|
|
def state(self):
|
|
"""Returns master's master/state.json.
|
|
|
|
:returns: state.json
|
|
:rtype: dict
|
|
"""
|
|
|
|
return self._state
|
|
|
|
def slave(self, fltr):
|
|
"""Returns the slave that has `fltr` in its ID. If any slaves
|
|
are an exact match, returns that task, id not raises a
|
|
DCOSException if there is not exactly one such slave.
|
|
|
|
:param fltr: filter string
|
|
:type fltr: str
|
|
:returns: the slave that has `fltr` in its ID
|
|
:rtype: Slave
|
|
"""
|
|
|
|
slaves = self.slaves(fltr)
|
|
|
|
if len(slaves) == 0:
|
|
raise DCOSException('No agent found with ID "{}".'.format(fltr))
|
|
|
|
elif len(slaves) > 1:
|
|
|
|
exact_matches = [s for s in slaves if s['id'] == fltr]
|
|
if len(exact_matches) == 1:
|
|
return exact_matches[0]
|
|
|
|
else:
|
|
matches = ['\t{0}'.format(s['id']) for s in slaves]
|
|
raise DCOSException(
|
|
"There are multiple agents with that ID. " +
|
|
"Please choose one:\n{}".format('\n'.join(matches)))
|
|
|
|
else:
|
|
return slaves[0]
|
|
|
|
def task(self, fltr, completed=False):
|
|
"""Returns the task with `fltr` in its ID. Raises a DCOSException if
|
|
there is not exactly one such task.
|
|
|
|
:param fltr: filter string
|
|
:type fltr: str
|
|
:returns: the task that has `fltr` in its ID
|
|
:param completed: also include completed tasks
|
|
:type completed: bool
|
|
:rtype: Task
|
|
"""
|
|
|
|
tasks = self.tasks(fltr, completed)
|
|
|
|
if len(tasks) == 0:
|
|
raise DCOSException(
|
|
'Cannot find a task with ID containing "{}"'.format(fltr))
|
|
|
|
elif len(tasks) > 1:
|
|
msg = [("There are multiple tasks with ID matching [{}]. " +
|
|
"Please choose one:").format(fltr)]
|
|
msg += ["\t{0}".format(t["id"]) for t in tasks]
|
|
raise DCOSException('\n'.join(msg))
|
|
|
|
else:
|
|
return tasks[0]
|
|
|
|
def framework(self, framework_id):
|
|
"""Returns a framework by ID
|
|
|
|
:param framework_id: the framework's ID
|
|
:type framework_id: str
|
|
:returns: the framework
|
|
:rtype: Framework
|
|
"""
|
|
|
|
for f in self._framework_dicts(True, True):
|
|
if f['id'] == framework_id:
|
|
return self._framework_obj(f)
|
|
return None
|
|
|
|
def slaves(self, fltr=""):
|
|
"""Returns those slaves that have `fltr` in their 'id'
|
|
|
|
:param fltr: filter string
|
|
:type fltr: str
|
|
:returns: Those slaves that have `fltr` in their 'id'
|
|
:rtype: [Slave]
|
|
"""
|
|
|
|
return [self._slave_obj(slave)
|
|
for slave in self.state()['slaves']
|
|
if fltr in slave['id']]
|
|
|
|
def tasks(self, fltr=None, completed=False, all_=False):
|
|
"""Returns tasks running under the master
|
|
|
|
:param fltr: May be None, a substring or regex. None returns all tasks,
|
|
else return tasks whose 'id' matches `fltr`.
|
|
:type fltr: str | None
|
|
:param completed: completed tasks only
|
|
:type completed: bool
|
|
:param all_: If True, include all tasks
|
|
:type all_: bool
|
|
:returns: a list of tasks
|
|
:rtype: [Task]
|
|
"""
|
|
|
|
keys = ['tasks']
|
|
show_completed = completed or all_
|
|
if show_completed:
|
|
keys.extend(['completed_tasks'])
|
|
|
|
tasks = []
|
|
# get all frameworks
|
|
for framework in self._framework_dicts(True, True, True):
|
|
for task in _merge(framework, keys):
|
|
state = task.get("state")
|
|
if completed and state not in COMPLETED_TASK_STATES:
|
|
continue
|
|
|
|
if fltr is None or \
|
|
fltr in task['id'] or \
|
|
fnmatch.fnmatchcase(task['id'], fltr):
|
|
task = self._framework_obj(framework).task(task['id'])
|
|
tasks.append(task)
|
|
|
|
return tasks
|
|
|
|
def get_container_id(self, task_id):
|
|
"""Returns the container ID for a task ID matching `task_id`
|
|
|
|
:param task_id: The task ID which will be mapped to container ID
|
|
:type task_id: str
|
|
:returns: The container ID associated with 'task_id'
|
|
:rtype: str
|
|
"""
|
|
|
|
def _get_task(task_id):
|
|
candidates = []
|
|
if 'frameworks' in self.state():
|
|
for framework in self.state()['frameworks']:
|
|
if 'tasks' in framework:
|
|
for task in framework['tasks']:
|
|
if 'id' in task:
|
|
if task['id'].startswith(task_id):
|
|
candidates.append(task)
|
|
|
|
if len(candidates) == 1:
|
|
return candidates[0]
|
|
|
|
raise DCOSException(
|
|
"More than one task matching '{}' found: {}"
|
|
.format(task_id, candidates))
|
|
|
|
def _get_container_status(task):
|
|
if 'statuses' in task:
|
|
if len(task['statuses']) > 0:
|
|
if 'container_status' in task['statuses'][0]:
|
|
return task['statuses'][0]['container_status']
|
|
|
|
raise DCOSException(
|
|
"Unable to obtain container status for task '{}'"
|
|
.format(task['id']))
|
|
|
|
def _get_container_id(container_status):
|
|
if 'container_id' in container_status:
|
|
if 'value' in container_status['container_id']:
|
|
return container_status['container_id']
|
|
|
|
raise DCOSException(
|
|
"No container found for the specified task."
|
|
" It might still be spinning up."
|
|
" Please try again.")
|
|
|
|
if not task_id:
|
|
raise DCOSException("Invalid task ID")
|
|
|
|
task = _get_task(task_id)
|
|
container_status = _get_container_status(task)
|
|
return _get_container_id(container_status)
|
|
|
|
def frameworks(self, inactive=False, completed=False):
|
|
"""Returns a list of all frameworks
|
|
|
|
:param inactive: also include inactive frameworks
|
|
:type inactive: bool
|
|
:param completed: also include completed frameworks
|
|
:type completed: bool
|
|
:returns: a list of frameworks
|
|
:rtype: [Framework]
|
|
"""
|
|
|
|
return [self._framework_obj(framework)
|
|
for framework in self._framework_dicts(inactive, completed)]
|
|
|
|
@util.duration
|
|
def fetch(self, path, **kwargs):
|
|
"""GET the resource located at `path`
|
|
|
|
:param path: the URL path
|
|
:type path: str
|
|
:param **kwargs: http.get kwargs
|
|
:type **kwargs: dict
|
|
:returns: the response object
|
|
:rtype: Response
|
|
"""
|
|
|
|
url = urllib.parse.urljoin(self._base_url(), path)
|
|
return http.get(url, **kwargs)
|
|
|
|
def _slave_obj(self, slave):
|
|
"""Returns the Slave object corresponding to the provided `slave`
|
|
dict. Creates it if it doesn't exist already.
|
|
|
|
:param slave: slave
|
|
:type slave: dict
|
|
:returns: Slave
|
|
:rtype: Slave
|
|
"""
|
|
|
|
if slave['id'] not in self._slaves:
|
|
self._slaves[slave['id']] = Slave(slave, None, self)
|
|
return self._slaves[slave['id']]
|
|
|
|
def _framework_obj(self, framework):
|
|
"""Returns the Framework object corresponding to the provided `framework`
|
|
dict. Creates it if it doesn't exist already.
|
|
|
|
:param framework: framework
|
|
:type framework: dict
|
|
:returns: Framework
|
|
:rtype: Framework
|
|
"""
|
|
|
|
if framework['id'] not in self._frameworks:
|
|
self._frameworks[framework['id']] = Framework(framework, self)
|
|
return self._frameworks[framework['id']]
|
|
|
|
def _framework_dicts(self, inactive=False, completed=False, active=True):
|
|
"""Returns a list of all frameworks as their raw dictionaries
|
|
|
|
:param inactive: include inactive frameworks
|
|
:type inactive: bool
|
|
:param completed: include completed frameworks
|
|
:type completed: bool
|
|
:param active: include active frameworks
|
|
:type active: bool
|
|
:returns: a list of frameworks
|
|
"""
|
|
|
|
if completed:
|
|
for framework in self.state()['completed_frameworks']:
|
|
yield framework
|
|
|
|
for framework in self.state()['frameworks']:
|
|
active_state = framework['active']
|
|
if (active_state and active) or (not active_state and inactive):
|
|
yield framework
|
|
|
|
|
|
class Slave(object):
|
|
"""Mesos Slave Model
|
|
|
|
:param short_state: slave's entry from the master's state.json
|
|
:type short_state: dict
|
|
:param state: slave's state.json
|
|
:type state: dict | None
|
|
:param master: slave's master
|
|
:type master: Master
|
|
"""
|
|
|
|
def __init__(self, short_state, state, master):
|
|
self._short_state = short_state
|
|
self._state = state
|
|
self._master = master
|
|
|
|
def state(self):
|
|
"""Get the slave's state.json object. Fetch it if it's not already
|
|
an instance variable.
|
|
|
|
:returns: This slave's state.json object
|
|
:rtype: dict
|
|
"""
|
|
|
|
if not self._state:
|
|
self._state = DCOSClient().get_slave_state(self['id'],
|
|
self.http_url())
|
|
return self._state
|
|
|
|
def http_url(self):
|
|
"""
|
|
:returns: The private HTTP URL of the slave. Derived from the
|
|
`pid` property.
|
|
:rtype: str
|
|
"""
|
|
|
|
parsed_pid = parse_pid(self['pid'])
|
|
return 'http://{}:{}'.format(parsed_pid[1], parsed_pid[2])
|
|
|
|
def _framework_dicts(self):
|
|
"""Returns the framework dictionaries from the state.json dict
|
|
|
|
:returns: frameworks
|
|
:rtype: [dict]
|
|
"""
|
|
|
|
return _merge(self.state(), ['frameworks', 'completed_frameworks'])
|
|
|
|
def executor_dicts(self):
|
|
"""Returns the executor dictionaries from the state.json
|
|
|
|
:returns: executors
|
|
:rtype: [dict]
|
|
"""
|
|
|
|
iters = [_merge(framework, ['executors', 'completed_executors'])
|
|
for framework in self._framework_dicts()]
|
|
return itertools.chain(*iters)
|
|
|
|
def __getitem__(self, name):
|
|
"""Support the slave[attr] syntax
|
|
|
|
:param name: attribute to get
|
|
:type name: str
|
|
:returns: the value for this attribute in the underlying
|
|
slave dictionary
|
|
:rtype: object
|
|
"""
|
|
|
|
return self._short_state[name]
|
|
|
|
|
|
class Framework(object):
|
|
""" Mesos Framework Model
|
|
|
|
:param framework: framework properties
|
|
:type framework: dict
|
|
:param master: framework's master
|
|
:type master: Master
|
|
"""
|
|
|
|
def __init__(self, framework, master):
|
|
self._framework = framework
|
|
self._master = master
|
|
self._tasks = {} # id->Task map
|
|
|
|
def task(self, task_id):
|
|
"""Returns a task by id
|
|
|
|
:param task_id: the task's id
|
|
:type task_id: str
|
|
:returns: the task
|
|
:rtype: Task
|
|
"""
|
|
|
|
for task in _merge(self._framework, ['tasks', 'completed_tasks']):
|
|
if task['id'] == task_id:
|
|
return self._task_obj(task)
|
|
return None
|
|
|
|
def _task_obj(self, task):
|
|
"""Returns the Task object corresponding to the provided `task`
|
|
dict. Creates it if it doesn't exist already.
|
|
|
|
:param task: task
|
|
:type task: dict
|
|
:returns: Task
|
|
:rtype: Task
|
|
"""
|
|
|
|
if task['id'] not in self._tasks:
|
|
self._tasks[task['id']] = Task(task, self._master)
|
|
return self._tasks[task['id']]
|
|
|
|
def dict(self):
|
|
return self._framework
|
|
|
|
def __getitem__(self, name):
|
|
"""Support the framework[attr] syntax
|
|
|
|
:param name: attribute to get
|
|
:type name: str
|
|
:returns: the value for this attribute in the underlying
|
|
framework dictionary
|
|
:rtype: object
|
|
"""
|
|
|
|
return self._framework[name]
|
|
|
|
|
|
class Task(object):
|
|
"""Mesos Task Model.
|
|
|
|
:param task: task properties
|
|
:type task: dict
|
|
:param master: mesos master
|
|
:type master: Master
|
|
"""
|
|
|
|
def __init__(self, task, master):
|
|
self._task = task
|
|
self._master = master
|
|
|
|
def dict(self):
|
|
"""
|
|
:returns: dictionary representation of this Task
|
|
:rtype: dict
|
|
"""
|
|
|
|
return self._task
|
|
|
|
def framework(self):
|
|
"""Returns this task's framework
|
|
|
|
:returns: task's framework
|
|
:rtype: Framework
|
|
"""
|
|
|
|
return self._master.framework(self["framework_id"])
|
|
|
|
def slave(self):
|
|
"""Returns the task's slave
|
|
|
|
:returns: task's slave
|
|
:rtype: Slave
|
|
"""
|
|
|
|
return self._master.slave(self["slave_id"])
|
|
|
|
def user(self):
|
|
"""Task owner
|
|
|
|
:returns: task owner
|
|
:rtype: str
|
|
"""
|
|
|
|
return self.framework()['user']
|
|
|
|
def executor(self):
|
|
""" Returns this tasks' executor
|
|
|
|
:returns: task's executor
|
|
:rtype: dict
|
|
"""
|
|
for executor in self.slave().executor_dicts():
|
|
tasks = _merge(executor,
|
|
['completed_tasks',
|
|
'tasks',
|
|
'queued_tasks'])
|
|
if any(task['id'] == self['id'] for task in tasks):
|
|
return executor
|
|
return None
|
|
|
|
def directory(self):
|
|
""" Sandbox directory for this task
|
|
|
|
:returns: path to task's sandbox
|
|
:rtype: str
|
|
"""
|
|
|
|
return self.executor()['directory']
|
|
|
|
def __getitem__(self, name):
|
|
"""Support the task[attr] syntax
|
|
|
|
:param name: attribute to get
|
|
:type name: str
|
|
:returns: the value for this attribute in the underlying
|
|
task dictionary
|
|
:rtype: object
|
|
"""
|
|
|
|
return self._task[name]
|
|
|
|
def __contains__(self, name):
|
|
"""Supprt the `attr in task` syntax
|
|
|
|
:param name: attribute to test
|
|
:type name: str
|
|
:returns: True if attribute is present in the underlying dict
|
|
:rtype: bool
|
|
"""
|
|
|
|
return name in self._task
|
|
|
|
|
|
class MesosFile(object):
|
|
"""File-like object that is backed by a remote slave or master file.
|
|
Uses the files/read.json endpoint.
|
|
|
|
If `task` is provided, the file host is `task.slave()`. If
|
|
`slave` is provided, the file host is `slave`. It is invalid to
|
|
provide both. If neither is provided, the file host is the
|
|
leading master.
|
|
|
|
:param path: file's path, relative to the sandbox if `task` is given
|
|
:type path: str
|
|
:param task: file's task
|
|
:type task: Task | None
|
|
:param slave: slave where the file lives
|
|
:type slave: Slave | None
|
|
:param dcos_client: client to use for network requests
|
|
:type dcos_client: DCOSClient | None
|
|
|
|
"""
|
|
|
|
def __init__(self, path, task=None, slave=None, dcos_client=None):
|
|
if task and slave:
|
|
raise ValueError(
|
|
"You cannot provide both `task` and `slave` " +
|
|
"arguments. `slave` is understood to be `task.slave()`")
|
|
|
|
if slave:
|
|
self._slave = slave
|
|
elif task:
|
|
self._slave = task.slave()
|
|
else:
|
|
self._slave = None
|
|
|
|
self._task = task
|
|
self._path = path
|
|
self._dcos_client = dcos_client or DCOSClient()
|
|
self._cursor = 0
|
|
|
|
def size(self):
|
|
"""Size of the file
|
|
|
|
:returns: size of the file
|
|
:rtype: int
|
|
"""
|
|
|
|
params = self._params(0, offset=-1)
|
|
return self._fetch(params)["offset"]
|
|
|
|
def seek(self, offset, whence=os.SEEK_SET):
|
|
"""Seek to the provided location in the file.
|
|
|
|
:param offset: location to seek to
|
|
:type offset: int
|
|
:param whence: determines whether `offset` represents a
|
|
location that is absolute, relative to the
|
|
beginning of the file, or relative to the end
|
|
of the file
|
|
:type whence: os.SEEK_SET | os.SEEK_CUR | os.SEEK_END
|
|
:returns: None
|
|
:rtype: None
|
|
"""
|
|
|
|
if whence == os.SEEK_SET:
|
|
self._cursor = 0 + offset
|
|
elif whence == os.SEEK_CUR:
|
|
self._cursor += offset
|
|
elif whence == os.SEEK_END:
|
|
self._cursor = self.size() + offset
|
|
else:
|
|
raise ValueError(
|
|
"Unexpected value for `whence`: {}".format(whence))
|
|
|
|
def tell(self):
|
|
""" The current cursor position.
|
|
|
|
:returns: the current cursor position
|
|
:rtype: int
|
|
"""
|
|
|
|
return self._cursor
|
|
|
|
def read(self, length=None):
|
|
"""Reads up to `length` bytes, or the entire file if `length` is None.
|
|
|
|
:param length: number of bytes to read
|
|
:type length: int | None
|
|
:returns: data read
|
|
:rtype: str
|
|
"""
|
|
|
|
data = ''
|
|
while length is None or length - len(data) > 0:
|
|
chunk_length = -1 if length is None else length - len(data)
|
|
chunk = self._fetch_chunk(chunk_length)
|
|
if chunk == '':
|
|
break
|
|
data += chunk
|
|
|
|
return data
|
|
|
|
def _host_path(self):
|
|
""" The absolute path to the file on slave.
|
|
|
|
:returns: the absolute path to the file on slave
|
|
:rtype: str
|
|
"""
|
|
|
|
if self._task:
|
|
directory = self._task.directory().rstrip('/')
|
|
executor = self._task.executor()
|
|
# executor.type is currently used only by pods. All tasks in a pod
|
|
# share an executor, so if this is a pod, get the task logs instead
|
|
# of the executor logs
|
|
if executor.get('type') == "DEFAULT":
|
|
task_id = self._task.dict().get('id')
|
|
return directory + '/tasks/{}/'.format(task_id) + self._path
|
|
else:
|
|
return directory + '/' + self._path
|
|
else:
|
|
return self._path
|
|
|
|
def _params(self, length, offset=None):
|
|
"""GET parameters to send to files/read.json. See the MesosFile
|
|
docstring for full information.
|
|
|
|
:param length: number of bytes to read
|
|
:type length: int
|
|
:param offset: start location. if None, will use the location
|
|
of the current file cursor
|
|
:type offset: int
|
|
:returns: GET parameters
|
|
:rtype: dict
|
|
"""
|
|
|
|
if offset is None:
|
|
offset = self._cursor
|
|
|
|
return {
|
|
'path': self._host_path(),
|
|
'offset': offset,
|
|
'length': length
|
|
}
|
|
|
|
def _fetch_chunk(self, length, offset=None):
|
|
"""Fetch data from files/read.json
|
|
|
|
:param length: number of bytes to fetch
|
|
:type length: int
|
|
:param offset: start location. If not None, this file's
|
|
cursor is set to `offset`
|
|
:type offset: int
|
|
:returns: data read
|
|
:rtype: str
|
|
"""
|
|
|
|
if offset is not None:
|
|
self.seek(offset, os.SEEK_SET)
|
|
|
|
params = self._params(length)
|
|
data = self._fetch(params)["data"]
|
|
self.seek(len(data), os.SEEK_CUR)
|
|
return data
|
|
|
|
def _fetch(self, params):
|
|
"""Fetch data from files/read.json
|
|
|
|
:param params: GET parameters
|
|
:type params: dict
|
|
:returns: response dict
|
|
:rtype: dict
|
|
"""
|
|
|
|
if self._slave:
|
|
return self._dcos_client.slave_file_read(self._slave['id'],
|
|
self._slave.http_url(),
|
|
**params)
|
|
else:
|
|
return self._dcos_client.master_file_read(**params)
|
|
|
|
def __str__(self):
|
|
"""String representation of the file: <task_id:file_path>
|
|
|
|
:returns: string representation of the file
|
|
:rtype: str
|
|
"""
|
|
|
|
if self._task:
|
|
return "task:{0}:{1}".format(self._task['id'], self._path)
|
|
elif self._slave:
|
|
return "slave:{0}:{1}".format(self._slave['id'], self._path)
|
|
else:
|
|
return "master:{0}".format(self._path)
|
|
|
|
|
|
class TaskIO(object):
|
|
"""Object used to stream I/O between a
|
|
running Mesos task and the local terminal.
|
|
|
|
:param task: task ID
|
|
:type task: str
|
|
:param cmd: a command to launch inside the task's container
|
|
:type cmd: str
|
|
:param args: Additional arguments for the command
|
|
:type args: str
|
|
:param interactive: whether to attach STDIN of the current
|
|
terminal to the new command being launched
|
|
:type interactive: bool
|
|
:param tty: whether to allocate a tty for this command and attach
|
|
the local terminal to it
|
|
:type tty: bool
|
|
"""
|
|
|
|
# The interval to send heartbeat messages to
|
|
# keep persistent connections alive.
|
|
HEARTBEAT_INTERVAL = 30
|
|
HEARTBEAT_INTERVAL_NANOSECONDS = HEARTBEAT_INTERVAL * 1000000000
|
|
|
|
def __init__(self, task_id, cmd=None, args=None,
|
|
interactive=False, tty=False):
|
|
# Store relevant parameters of the call for later.
|
|
self.cmd = cmd
|
|
self.interactive = interactive
|
|
self.tty = tty
|
|
self.args = args
|
|
|
|
# Create a client and grab a reference to the DC/OS master.
|
|
client = DCOSClient()
|
|
master = get_master(client)
|
|
|
|
# Get the task and make sure its container was launched by the UCR.
|
|
# Since task's containers are launched by the UCR by default, we want
|
|
# to allow most tasks to pass through unchecked. The only exception is
|
|
# when a task has an explicit container specified and it is not of type
|
|
# "MESOS". Having a type of "MESOS" implies that it was launched by the
|
|
# UCR -- all other types imply it was not.
|
|
task_obj = master.task(task_id)
|
|
if "container" in task_obj.dict():
|
|
if "type" in task_obj.dict()["container"]:
|
|
if task_obj.dict()["container"]["type"] != "MESOS":
|
|
raise DCOSException(
|
|
"This command is only supported for tasks"
|
|
" launched by the Universal Container Runtime (UCR).")
|
|
|
|
# Get the URL to the agent running the task.
|
|
if client._mesos_master_url:
|
|
self.agent_url = client.slave_url(
|
|
slave_id="",
|
|
private_url=task_obj.slave().http_url(),
|
|
path="api/v1")
|
|
else:
|
|
self.agent_url = client.slave_url(
|
|
slave_id=task_obj.slave()['id'],
|
|
private_url="",
|
|
path="api/v1")
|
|
|
|
# Grab a reference to the container ID for the task.
|
|
self.parent_id = master.get_container_id(task_id)
|
|
|
|
# Generate a new UUID for the nested container
|
|
# used to run commands passed to `task exec`.
|
|
self.container_id = str(uuid.uuid4())
|
|
|
|
# Set up a recordio encoder and decoder
|
|
# for any incoming and outgoing messages.
|
|
self.encoder = recordio.Encoder(
|
|
lambda s: bytes(json.dumps(s, ensure_ascii=False), "UTF-8"))
|
|
self.decoder = recordio.Decoder(
|
|
lambda s: json.loads(s.decode("UTF-8")))
|
|
|
|
# Set up queues to send messages between threads used for
|
|
# reading/writing to STDIN/STDOUT/STDERR and threads
|
|
# sending/receiving data over the network.
|
|
self.input_queue = Queue()
|
|
self.output_queue = Queue()
|
|
|
|
# Set up an event to block attaching
|
|
# input until attaching output is complete.
|
|
self.attach_input_event = threading.Event()
|
|
self.attach_input_event.clear()
|
|
|
|
# Set up an event to block printing the output
|
|
# until an attach input event has successfully
|
|
# been established.
|
|
self.print_output_event = threading.Event()
|
|
self.print_output_event.clear()
|
|
|
|
# Set up an event to block the main thread
|
|
# from exiting until signaled to do so.
|
|
self.exit_event = threading.Event()
|
|
self.exit_event.clear()
|
|
|
|
# Use a class variable to store exceptions thrown on
|
|
# other threads and raise them on the main thread before
|
|
# exiting.
|
|
self.exception = None
|
|
|
|
def run(self):
|
|
"""Run the helper threads in this class which enable streaming
|
|
of STDIN/STDOUT/STDERR between the CLI and the Mesos Agent API.
|
|
|
|
If a tty is requested, we take over the current terminal and
|
|
put it into raw mode. We make sure to reset the terminal back
|
|
to its original settings before exiting.
|
|
"""
|
|
|
|
# Without a TTY.
|
|
if not self.tty:
|
|
try:
|
|
self._start_threads()
|
|
self.exit_event.wait()
|
|
except Exception as e:
|
|
self.exception = e
|
|
|
|
if self.exception:
|
|
raise self.exception
|
|
return
|
|
|
|
# With a TTY.
|
|
if util.is_windows_platform():
|
|
raise DCOSException(
|
|
"Running with the '--tty' flag is not supported on windows.")
|
|
|
|
if not sys.stdin.isatty():
|
|
raise DCOSException(
|
|
"Must be running in a tty to pass the '--tty flag'.")
|
|
|
|
fd = sys.stdin.fileno()
|
|
oldtermios = termios.tcgetattr(fd)
|
|
|
|
try:
|
|
if self.interactive:
|
|
tty.setraw(fd, when=termios.TCSANOW)
|
|
self._window_resize(signal.SIGWINCH, None)
|
|
signal.signal(signal.SIGWINCH, self._window_resize)
|
|
|
|
self._start_threads()
|
|
self.exit_event.wait()
|
|
except Exception as e:
|
|
self.exception = e
|
|
|
|
termios.tcsetattr(
|
|
sys.stdin.fileno(),
|
|
termios.TCSAFLUSH,
|
|
oldtermios)
|
|
|
|
if self.exception:
|
|
raise self.exception
|
|
|
|
def _thread_wrapper(self, func):
|
|
"""A wrapper around all threads used in this class
|
|
|
|
If a thread throws an exception, it will unblock the main
|
|
thread and save the exception in a class variable. The main
|
|
thread will then rethrow the exception before exiting.
|
|
|
|
:param func: The start function for the thread
|
|
:type func: function
|
|
"""
|
|
try:
|
|
func()
|
|
except Exception as e:
|
|
self.exception = e
|
|
self.exit_event.set()
|
|
|
|
def _start_threads(self):
|
|
"""Start all threads associated with this class
|
|
"""
|
|
if self.interactive:
|
|
# Collects input from STDIN and puts
|
|
# it in the input_queue as data messages.
|
|
thread = threading.Thread(
|
|
target=self._thread_wrapper,
|
|
args=(self._input_thread,))
|
|
thread.daemon = True
|
|
thread.start()
|
|
|
|
# Prepares heartbeat control messages and
|
|
# puts them in the input queueaat a specific
|
|
# heartbeat interval.
|
|
thread = threading.Thread(
|
|
target=self._thread_wrapper,
|
|
args=(self._heartbeat_thread,))
|
|
thread.daemon = True
|
|
thread.start()
|
|
|
|
# Opens a persistent connection with the mesos agent and
|
|
# feeds it both control and data messages from the input
|
|
# queue via ATTACH_CONTAINER_INPUT messages.
|
|
thread = threading.Thread(
|
|
target=self._thread_wrapper,
|
|
args=(self._attach_container_input,))
|
|
thread.daemon = True
|
|
thread.start()
|
|
|
|
# Opens a persistent connection with a mesos agent, reads
|
|
# data messages from it and feeds them to an output_queue.
|
|
thread = threading.Thread(
|
|
target=self._thread_wrapper,
|
|
args=(self._launch_nested_container_session,))
|
|
thread.daemon = True
|
|
thread.start()
|
|
|
|
# Collects data messages from the output queue and writes
|
|
# their content to STDOUT and STDERR.
|
|
thread = threading.Thread(
|
|
target=self._thread_wrapper,
|
|
args=(self._output_thread,))
|
|
thread.daemon = True
|
|
thread.start()
|
|
|
|
def _launch_nested_container_session(self):
|
|
"""Sends a request to the Mesos Agent to launch a new
|
|
nested container and attach to its output stream.
|
|
The output stream is then sent back in the response.
|
|
"""
|
|
|
|
message = {
|
|
'type': "LAUNCH_NESTED_CONTAINER_SESSION",
|
|
'launch_nested_container_session': {
|
|
'container_id': {
|
|
'parent': self.parent_id,
|
|
'value': self.container_id
|
|
},
|
|
'command': {
|
|
'value': self.cmd,
|
|
'arguments': [self.cmd] + self.args,
|
|
'shell': False}}}
|
|
|
|
if self.tty:
|
|
message[
|
|
'launch_nested_container_session'][
|
|
'container'] = {
|
|
'type': 'MESOS',
|
|
'tty_info': {}}
|
|
|
|
req_extra_args = {
|
|
'stream': True,
|
|
'headers': {
|
|
'Content-Type': 'application/json',
|
|
'Accept': 'application/recordio',
|
|
'Message-Accept': 'application/json'}}
|
|
|
|
response = http.post(
|
|
self.agent_url,
|
|
data=json.dumps(message),
|
|
**req_extra_args)
|
|
|
|
self._process_output_stream(response)
|
|
|
|
def _process_output_stream(self, response):
|
|
"""Gets data streamed over the given response and places the
|
|
returned messages into our output_queue. Only expects to
|
|
receive data messages.
|
|
|
|
:param response: Response from an http post
|
|
:type response: requests.models.Response
|
|
"""
|
|
|
|
# Now that we are ready to process the output stream (meaning
|
|
# our output connection has been established), allow the input
|
|
# stream to be attached by setting an event.
|
|
self.attach_input_event.set()
|
|
|
|
# If we are running in interactive mode, wait to make sure that
|
|
# our input connection succeeds before pushing any output to the
|
|
# output queue.
|
|
if self.interactive:
|
|
self.print_output_event.wait()
|
|
|
|
try:
|
|
for chunk in response.iter_content(chunk_size=None):
|
|
records = self.decoder.decode(chunk)
|
|
|
|
for r in records:
|
|
if r.get('type') and r['type'] == 'DATA':
|
|
self.output_queue.put(r['data'])
|
|
except Exception as e:
|
|
raise DCOSException(
|
|
"Error parsing output stream: {error}".format(error=e))
|
|
|
|
self.output_queue.join()
|
|
self.exit_event.set()
|
|
|
|
def _attach_container_input(self):
|
|
"""Streams all input data (e.g. STDIN) from the client to the agent
|
|
"""
|
|
|
|
def _initial_input_streamer():
|
|
"""Generator function yielding the initial ATTACH_CONTAINER_INPUT
|
|
message for streaming. We have a separate generator for this so
|
|
that we can attempt the connection once before committing to a
|
|
persistent connection where we stream the rest of the input.
|
|
|
|
:returns: A RecordIO encoded message
|
|
"""
|
|
|
|
message = {
|
|
'type': 'ATTACH_CONTAINER_INPUT',
|
|
'attach_container_input': {
|
|
'type': 'CONTAINER_ID',
|
|
'container_id': {
|
|
'parent': self.parent_id,
|
|
'value': self.container_id}}}
|
|
|
|
yield self.encoder.encode(message)
|
|
|
|
def _input_streamer():
|
|
"""Generator function yielding ATTACH_CONTAINER_INPUT
|
|
messages for streaming. It yields the _intitial_input_streamer()
|
|
message, followed by messages from the input_queue on each
|
|
subsequent call.
|
|
|
|
:returns: A RecordIO encoded message
|
|
"""
|
|
|
|
yield next(_initial_input_streamer())
|
|
|
|
while True:
|
|
record = self.input_queue.get()
|
|
if not record:
|
|
break
|
|
yield record
|
|
|
|
req_extra_args = {
|
|
'headers': {
|
|
'Content-Type': 'application/recordio',
|
|
'Message-Content-Type': 'application/json',
|
|
'Accept': 'application/json',
|
|
'Connection': 'close',
|
|
'Transfer-Encoding': 'chunked'
|
|
}
|
|
}
|
|
|
|
# Ensure we don't try to attach our input to a container that isn't
|
|
# fully up and running by waiting until the
|
|
# `_process_output_stream` function signals us that it's ready.
|
|
self.attach_input_event.wait()
|
|
|
|
# Send an intial "Test" message to ensure that we are able to
|
|
# establish a connection with the agent. If we aren't we will throw
|
|
# an exception and break out of this thread. However, in cases where
|
|
# we receive a 500 response from the agent, we actually want to
|
|
# continue without throwing an exception. A 500 error indicates that
|
|
# we can't connect to the container because it has already finished
|
|
# running. In that case we continue running to allow the output queue
|
|
# to be flushed.
|
|
try:
|
|
http.post(
|
|
self.agent_url,
|
|
data=_initial_input_streamer(),
|
|
**req_extra_args)
|
|
except DCOSHTTPException as e:
|
|
if not e.response.status_code == 500:
|
|
raise e
|
|
|
|
# If we succeeded with that connection, unblock process_output_stream()
|
|
# from sending output data to the output thread.
|
|
self.print_output_event.set()
|
|
|
|
# Begin streaming the input.
|
|
http.post(
|
|
self.agent_url,
|
|
data=_input_streamer(),
|
|
**req_extra_args)
|
|
|
|
def _input_thread(self):
|
|
"""Reads from STDIN and places a message
|
|
with that data onto the input_queue.
|
|
"""
|
|
|
|
message = {
|
|
'type': 'ATTACH_CONTAINER_INPUT',
|
|
'attach_container_input': {
|
|
'type': 'PROCESS_IO',
|
|
'process_io': {
|
|
'type': 'DATA',
|
|
'data': {
|
|
'type': 'STDIN',
|
|
'data': ''}}}}
|
|
|
|
for chunk in iter(partial(os.read, sys.stdin.fileno(), 1024), b''):
|
|
message[
|
|
'attach_container_input'][
|
|
'process_io'][
|
|
'data'][
|
|
'data'] = base64.b64encode(chunk).decode('utf-8')
|
|
|
|
self.input_queue.put(self.encoder.encode(message))
|
|
|
|
# Push an empty string to indicate EOF to the server and push
|
|
# 'None' to signal that we are done processing input.
|
|
message['attach_container_input']['process_io']['data']['data'] = ''
|
|
self.input_queue.put(self.encoder.encode(message))
|
|
self.input_queue.put(None)
|
|
|
|
def _output_thread(self):
|
|
"""Reads from the output_queue and writes the data
|
|
to the appropriate STDOUT or STDERR.
|
|
"""
|
|
|
|
while True:
|
|
# Get a message from the output queue and decode it.
|
|
# Then write the data to the appropriate stdout or stderr.
|
|
output = self.output_queue.get()
|
|
if not output.get('data'):
|
|
raise DCOSException("Error no 'data' field in output message")
|
|
|
|
data = output['data']
|
|
data = base64.b64decode(data.encode('utf-8'))
|
|
|
|
if output.get('type') and output['type'] == 'STDOUT':
|
|
sys.stdout.buffer.write(data)
|
|
sys.stdout.flush()
|
|
elif output.get('type') and output['type'] == 'STDERR':
|
|
sys.stderr.buffer.write(data)
|
|
sys.stderr.flush()
|
|
else:
|
|
raise DCOSException("Unsupported data type in output stream")
|
|
|
|
self.output_queue.task_done()
|
|
|
|
def _heartbeat_thread(self):
|
|
"""Generates a heartbeat message to send over the
|
|
ATTACH_CONTAINER_INPUT stream every `interval` seconds and
|
|
inserts it in the input queue.
|
|
"""
|
|
|
|
interval = self.HEARTBEAT_INTERVAL
|
|
nanoseconds = self.HEARTBEAT_INTERVAL_NANOSECONDS
|
|
|
|
message = {
|
|
'type': 'ATTACH_CONTAINER_INPUT',
|
|
'attach_container_input': {
|
|
'type': 'PROCESS_IO',
|
|
'process_io': {
|
|
'type': 'CONTROL',
|
|
'control': {
|
|
'type': 'HEARTBEAT',
|
|
'heartbeat': {
|
|
'interval': {
|
|
'nanoseconds': nanoseconds}}}}}}
|
|
|
|
while True:
|
|
self.input_queue.put(self.encoder.encode(message))
|
|
time.sleep(interval)
|
|
|
|
def _window_resize(self, signum, frame):
|
|
"""Signal handler for SIGWINCH.
|
|
|
|
Generates a message with the current demensions of the
|
|
terminal and puts it in the input_queue.
|
|
|
|
:param signum: the signal number being handled
|
|
:type signum: int
|
|
:param frame: current stack frame
|
|
:type frame: frame
|
|
"""
|
|
|
|
# Determine the size of our terminal, and create the message to be sent
|
|
rows, columns = os.popen('stty size', 'r').read().split()
|
|
|
|
message = {
|
|
'type': 'ATTACH_CONTAINER_INPUT',
|
|
'attach_container_input': {
|
|
'type': 'PROCESS_IO',
|
|
'process_io': {
|
|
'type': 'CONTROL',
|
|
'control': {
|
|
'type': 'TTY_INFO',
|
|
'tty_info': {
|
|
'window_size': {
|
|
'rows': int(rows),
|
|
'columns': int(columns)}}}}}}
|
|
|
|
self.input_queue.put(self.encoder.encode(message))
|
|
|
|
|
|
def parse_pid(pid):
|
|
""" Parse the mesos pid string,
|
|
|
|
:param pid: pid of the form "id@ip:port"
|
|
:type pid: str
|
|
:returns: (id, ip, port)
|
|
:rtype: (str, str, str)
|
|
"""
|
|
|
|
id_, second = pid.split('@')
|
|
ip, port = second.split(':')
|
|
return id_, ip, port
|
|
|
|
|
|
def _merge(d, keys):
|
|
""" Merge multiple lists from a dictionary into one iterator.
|
|
e.g. _merge({'a': [1, 2], 'b': [3]}, ['a', 'b']) ->
|
|
iter(1, 2, 3)
|
|
|
|
:param d: dictionary
|
|
:type d: dict
|
|
:param keys: keys to merge
|
|
:type keys: [hashable]
|
|
:returns: iterator
|
|
:rtype: iter
|
|
"""
|
|
|
|
return itertools.chain(*[d[k] for k in keys])
|