Files
deb-python-dcos/dcos/metronome.py

412 lines
14 KiB
Python

import json
from six.moves import urllib
from dcos import config, cosmos, http, packagemanager, rpcclient, util
from dcos.errors import DCOSException
# Module-level logger, named after this module.
logger = util.get_logger(__name__)

# Names for the ``embed`` query parameter of the jobs endpoints; presumably
# passed in the ``embed_with`` lists of ``Client.get_job`` and
# ``Client.get_jobs`` -- verify against callers.
EMBED_ACTIVE_RUNS = 'activeRuns'
EMBED_SCHEDULES = 'schedules'
EMBED_HISTORY = 'history'
EMBED_HISTORY_SUMMARY = 'historySummary'
def create_client(toml_config=None):
    """Creates a Metronome client with the supplied configuration.

    :param toml_config: configuration dictionary; the default configuration
                        is loaded when this is None
    :type toml_config: config.Toml
    :returns: Metronome client
    :rtype: dcos.metronome.Client
    """

    if toml_config is None:
        toml_config = config.get_config()

    metronome_url = _get_metronome_url(toml_config)

    # Read the timeout from the same configuration the URL was resolved
    # from, so that a caller-supplied toml_config is honoured consistently.
    timeout = (config.get_config_val('core.timeout', toml_config) or
               http.DEFAULT_TIMEOUT)
    rpc_client = rpcclient.create_client(metronome_url, timeout)

    logger.info('Creating metronome client with: %r', metronome_url)
    return Client(rpc_client)
def _get_embed_query_string(embed_list):
    """Builds the ``?embed=a&embed=b...`` query string for the given values.

    Each value is URL-escaped so that characters such as ``&``, ``=`` or
    spaces cannot corrupt the resulting query string.

    :param embed_list: values to send as repeated ``embed`` parameters
    :type embed_list: [str]
    :returns: the query string, including the leading ``?``
    :rtype: str
    """

    pairs = ('embed={}'.format(urllib.parse.quote(str(item), safe=''))
             for item in embed_list)
    return '?{}'.format('&'.join(pairs))
def _get_metronome_url(toml_config=None):
    """Resolves the base URL used to reach Metronome.

    An explicitly configured ``metronome.url`` is returned as is. Otherwise
    the URL is derived from ``core.dcos_url``, after checking that the
    cluster has the Metronome capability.

    :param toml_config: configuration dictionary
    :type toml_config: config.Toml
    :returns: metronome base url
    :rtype: str
    """

    cfg = config.get_config() if toml_config is None else toml_config

    configured_url = config.get_config_val('metronome.url', cfg)
    if configured_url is not None:
        return configured_url

    # dcos must be capable to use dcos_url
    _check_capability()

    base_url = config.get_config_val('core.dcos_url', cfg)
    if base_url is None:
        raise config.missing_config_exception(['core.dcos_url'])

    return urllib.parse.urljoin(base_url, 'service/metronome/')
class Client(object):
    """Class for talking to the Metronome server.

    All request paths are relative (no leading slash); the RPC client is
    responsible for combining them with the Metronome base URL.

    :param rpc_client: provides a method for making HTTP requests
    :type rpc_client: _RpcClient
    """

    def __init__(self, rpc_client):
        self._rpc = rpc_client

    def get_about(self):
        """Returns info about Metronome instance

        :returns: Metronome information
        :rtype: dict
        """

        response = self._rpc.http_req(http.get, 'v1/info')
        return response.json()

    def get_job(self, job_id, embed_with=None):
        """Returns a representation of the requested job.

        :param job_id: the ID of the job
        :type job_id: str
        :param embed_with: list of strings to ?embed=str&embed=str2...
        :type embed_with: [str]
        :returns: the requested Metronome job
        :rtype: dict
        """

        # TODO: rename the util helper; it isn't marathon specific.
        # The 'v1/jobs{}' templates assume it returns the ID with a single
        # leading slash.
        job_id = util.normalize_marathon_id_path(job_id)
        embeds = _get_embed_query_string(embed_with) if embed_with else ''
        url = 'v1/jobs{}{}'.format(job_id, embeds)
        response = self._rpc.http_req(http.get, url)
        return response.json()

    def get_jobs(self, embed_with=None):
        """Get a list of known jobs.

        :param embed_with: list of strings to ?embed=str&embed=str2...
        :type embed_with: [str]
        :returns: list of known jobs
        :rtype: [dict]
        """

        embeds = _get_embed_query_string(embed_with) if embed_with else ''
        url = 'v1/jobs{}'.format(embeds)
        response = self._rpc.http_req(http.get, url)
        return response.json()

    def add_job(self, job_resource):
        """Add a new job.

        :param job_resource: job resource
        :type job_resource: dict, bytes or file
        :returns: the job description
        :rtype: dict
        """

        job_json = self._load_json_resource(job_resource)
        response = self._rpc.http_req(http.post, 'v1/jobs', json=job_json)
        return response.json()

    def _update_req(
            self, resource_type, resource_id, resource_json, force=False):
        """Send an HTTP PUT request to update a resource.

        :param resource_type: the resource collection under v1/, e.g. 'jobs'
        :type resource_type: str
        :param resource_id: the ID of the resource to update
        :type resource_id: str
        :param resource_json: the json payload
        :type resource_json: {}
        :param force: whether to stop the current job runs
        :type force: bool
        :returns: the response from Metronome
        :rtype: requests.Response
        """

        # Doubled braces leave a single '{}' placeholder for the ID.
        path_template = 'v1/{}/{{}}'.format(resource_type)
        path = self._job_id_path_format(path_template, resource_id)
        params = self._force_params(force)
        return self._rpc.http_req(
            http.put, path, params=params, json=resource_json)

    def _update(self, resource_type, resource_id, resource_json, force=False):
        """Update a resource and extract the deployment ID from the response.

        :param resource_type: the resource collection under v1/, e.g. 'jobs'
        :type resource_type: str
        :param resource_id: the ID of the resource to update
        :type resource_id: str
        :param resource_json: the json payload
        :type resource_json: {}
        :param force: whether to stop the current job runs
        :type force: bool
        :returns: the "deploymentId" field of the response, or None when the
                  response does not contain one
        :rtype: str | None
        :raises DCOSException: if the response is not JSON, or is not a
                               JSON object
        """

        response = self._update_req(
            resource_type, resource_id, resource_json, force)
        body_json = self._parse_json(response)
        return self._extract_deployment_id(body_json)

    def update_job(self, job_id, payload, force=False):
        """Update a job.

        :param job_id: the job id
        :type job_id: str
        :param payload: the json payload
        :type payload: dict
        :param force: whether to stop the current job runs
        :type force: bool
        :returns: the "deploymentId" field of the response, or None when the
                  response does not contain one
        :rtype: str | None
        """

        return self._update('jobs', job_id, payload, force)

    def remove_job(self, job_id, force=False):
        """Completely removes the requested job.

        :param job_id: the ID of the job to remove
        :type job_id: str
        :param force: whether to stop the current job runs
        :type force: bool
        :rtype: None
        """

        job_id = util.normalize_marathon_id_path(job_id)
        params = self._force_params(force)
        path = 'v1/jobs{}'.format(job_id)
        self._rpc.http_req(http.delete, path, params=params)

    def get_schedules(self, job_id):
        """Gets the schedules for a given job

        :param job_id: the ID of the job
        :type job_id: str
        :returns: the parsed JSON response
        :rtype: json of schedules
        """

        job_id = util.normalize_marathon_id_path(job_id)
        path = 'v1/jobs{}/schedules'.format(job_id)
        response = self._rpc.http_req(http.get, path)
        return response.json()

    def get_schedule(self, job_id, schedule_id):
        """Gets a single schedule of a given job

        :param job_id: the ID of the job
        :type job_id: str
        :param schedule_id: the ID of the schedule
        :type schedule_id: str
        :returns: the parsed JSON response
        :rtype: json of the schedule
        """

        job_id = util.normalize_marathon_id_path(job_id)
        schedule_id = util.normalize_marathon_id_path(schedule_id)
        # The normalized ID carries its own leading slash (as in the runs
        # paths), so no extra '/' is added after 'schedules'.
        path = 'v1/jobs{}/schedules{}'.format(job_id, schedule_id)
        response = self._rpc.http_req(http.get, path)
        return response.json()

    def add_schedule(self, job_id, schedule_resource):
        """Adds a schedule to a given job

        :param job_id: the ID of the job
        :type job_id: str
        :param schedule_resource: schedule resource
        :type schedule_resource: dict, bytes or file
        :returns: the parsed JSON response
        :rtype: json of the schedule
        """

        job_id = util.normalize_marathon_id_path(job_id)
        schedule_json = self._load_json_resource(schedule_resource)
        path = 'v1/jobs{}/schedules'.format(job_id)
        response = self._rpc.http_req(http.post, path, json=schedule_json)
        return response.json()

    def update_schedule(self, job_id, schedule_id, schedule_resource):
        """Updates a schedule of a given job

        :param job_id: the ID of the job
        :type job_id: str
        :param schedule_id: the ID of the schedule to update
        :type schedule_id: str
        :param schedule_resource: schedule resource
        :type schedule_resource: dict, bytes or file
        :returns: the parsed JSON response
        :rtype: json of the schedule
        """

        job_id = util.normalize_marathon_id_path(job_id)
        # Normalize the schedule ID exactly like get_schedule and
        # remove_schedule do, so it is escaped and slash-safe.
        schedule_id = util.normalize_marathon_id_path(schedule_id)
        schedule_json = self._load_json_resource(schedule_resource)
        path = 'v1/jobs{}/schedules{}'.format(job_id, schedule_id)
        response = self._rpc.http_req(http.put, path, json=schedule_json)
        return response.json()

    def remove_schedule(self, job_id, schedule_id):
        """Completely removes the requested schedule.

        :param job_id: the ID of the job
        :type job_id: str
        :param schedule_id: the ID of the schedule to remove
        :type schedule_id: str
        :rtype: None
        """

        job_id = util.normalize_marathon_id_path(job_id)
        schedule_id = util.normalize_marathon_id_path(schedule_id)
        path = 'v1/jobs{}/schedules{}'.format(job_id, schedule_id)
        self._rpc.http_req(http.delete, path)

    def run_job(self, job_id):
        """Starts a new run of a job.

        :param job_id: the ID of the job to run
        :type job_id: str
        :returns: the parsed JSON response
        :rtype: json of the run
        """

        job_id = util.normalize_marathon_id_path(job_id)
        path = 'v1/jobs{}/runs'.format(job_id)
        response = self._rpc.http_req(http.post, path)
        return response.json()

    def get_runs(self, job_id):
        """Gets the runs for a given job

        :param job_id: the ID of the job
        :type job_id: str
        :returns: the parsed JSON response
        :rtype: json of runs
        """

        job_id = util.normalize_marathon_id_path(job_id)
        path = 'v1/jobs{}/runs'.format(job_id)
        response = self._rpc.http_req(http.get, path)
        return response.json()

    def get_run(self, job_id, run_id):
        """Gets a single run of a given job

        :param job_id: the ID of the job
        :type job_id: str
        :param run_id: the ID of the job run
        :type run_id: str
        :returns: the parsed JSON response
        :rtype: json of the run
        """

        job_id = util.normalize_marathon_id_path(job_id)
        run_id = util.normalize_marathon_id_path(run_id)
        path = 'v1/jobs{}/runs{}'.format(job_id, run_id)
        response = self._rpc.http_req(http.get, path)
        return response.json()

    def kill_run(self, job_id, run_id):
        """Stops a run of a given job

        :param job_id: the ID of the job
        :type job_id: str
        :param run_id: the ID of the job run to stop
        :type run_id: str
        :rtype: None
        """

        job_id = util.normalize_marathon_id_path(job_id)
        run_id = util.normalize_marathon_id_path(run_id)
        path = 'v1/jobs{}/runs{}/actions/stop'.format(job_id, run_id)
        self._rpc.http_req(http.post, path)

    @staticmethod
    def _load_json_resource(resource):
        """Returns the JSON payload for a resource given as data or a file.

        :param resource: already-parsed data, or a file-like object whose
                         content is JSON
        :type resource: dict, bytes or file
        :returns: the parsed file content if `resource` has a `read`
                  attribute, otherwise `resource` itself
        :rtype: {} | [] | str | int | float | bool | None
        """

        # The file type exists only in Python 2, preventing type(...) is
        # file; duck-type on `read` instead.
        if hasattr(resource, 'read'):
            return json.load(resource)
        return resource

    @staticmethod
    def _extract_deployment_id(body_json):
        """Returns the "deploymentId" field of a parsed JSON response.

        :param body_json: the parsed JSON body of an update response
        :type body_json: {} | [] | str | int | float | bool | None
        :returns: the deployment ID, or None if the field is absent
        :rtype: str | None
        :raises DCOSException: if the body is not a JSON object
        """

        # dict.get never raises KeyError, so a missing field yields None.
        # Only a non-object body has no way to carry the field at all.
        if not isinstance(body_json, dict):
            template = ('Error: missing "deploymentId" field in the following '
                        'JSON response from Job:\n{}')
            rendered_json = json.dumps(body_json, indent=2, sort_keys=True)
            raise DCOSException(template.format(rendered_json))
        return body_json.get('deploymentId')

    @staticmethod
    def _job_id_path_format(url_path_template, id_path):
        """Substitutes a Metronome "ID path" into a URL path format string,
        ensuring the result is well-formed.

        All leading and trailing slashes in the ID will be removed, and the ID
        will have all URL-unsafe characters escaped, as if by
        urllib.parse.quote().

        :param url_path_template: format string for the path portion of a URL,
                                  with a single format specifier (i.e. {})
                                  where the "ID path" should be inserted
        :type url_path_template: str
        :param id_path: a Job "ID path"
        :type id_path: str
        :returns: the url path template with the ID inserted
        :rtype: str
        """

        normalized_id_path = urllib.parse.quote(id_path.strip('/'))
        return url_path_template.format(normalized_id_path)

    @staticmethod
    def _force_params(force):
        """Returns the query parameters that signify the provided force value.

        :param force: whether to stop the current job runs
        :type force: bool
        :returns: {'stopCurrentJobRuns': 'true'} when force is truthy,
                  otherwise None
        :rtype: {} | None
        """

        return {'stopCurrentJobRuns': 'true'} if force else None

    @staticmethod
    def _parse_json(response):
        """Attempts to parse the body of the given response as JSON.

        Raises DCOSException if parsing fails.

        :param response: the response containing the body to parse
        :type response: requests.Response
        :return: the parsed JSON
        :rtype: {} | [] | str | int | float | bool | None
        :raises DCOSException: if the body is not valid JSON
        """

        try:
            return response.json()
        except ValueError:
            # JSON decoding errors are ValueError subclasses; a bare except
            # would also swallow KeyboardInterrupt and SystemExit.
            template = ('Error: Response from Metronome was not in expected '
                        'JSON format:\n{}')
            raise DCOSException(template.format(response.text))
def _check_capability():
    """Verifies that the cluster reports the 'METRONOME' capability.

    The capability is queried through a package manager created for the
    Cosmos URL.

    :raises: DCOSException if cluster does not have metronome capability
    """

    cosmos_url = cosmos.get_cosmos_url()
    if packagemanager.PackageManager(cosmos_url).has_capability('METRONOME'):
        return

    raise DCOSException(
        'DC/OS backend does not support metronome capabilities in this '
        'version. Must be DC/OS >= 1.8')