import json from six.moves import urllib from dcos import config, cosmos, http, packagemanager, rpcclient, util from dcos.errors import DCOSException logger = util.get_logger(__name__) EMBED_ACTIVE_RUNS = 'activeRuns' EMBED_SCHEDULES = 'schedules' EMBED_HISTORY = 'history' EMBED_HISTORY_SUMMARY = 'historySummary' def create_client(toml_config=None): """Creates a Metronome client with the supplied configuration. :param toml_config: configuration dictionary :type toml_config: config.Toml :returns: Metronome client :rtype: dcos.metronome.Client """ if toml_config is None: toml_config = config.get_config() metronome_url = _get_metronome_url(toml_config) timeout = config.get_config_val('core.timeout') or http.DEFAULT_TIMEOUT rpc_client = rpcclient.create_client(metronome_url, timeout) logger.info('Creating metronome client with: %r', metronome_url) return Client(rpc_client) def _get_embed_query_string(embed_list): return '?{}'.format('&'.join('embed=%s' % (item) for item in embed_list)) def _get_metronome_url(toml_config=None): """ :param toml_config: configuration dictionary :type toml_config: config.Toml :returns: metronome base url :rtype: str """ if toml_config is None: toml_config = config.get_config() metronome_url = config.get_config_val('metronome.url', toml_config) if metronome_url is None: # dcos must be capable to use dcos_url _check_capability() dcos_url = config.get_config_val('core.dcos_url', toml_config) if dcos_url is None: raise config.missing_config_exception(['core.dcos_url']) metronome_url = urllib.parse.urljoin(dcos_url, 'service/metronome/') return metronome_url class Client(object): """Class for talking to the Metronome server. :param rpc_client: provides a method for making HTTP requests :type rpc_client: _RpcClient """ def __init__(self, rpc_client): self._rpc = rpc_client def get_about(self): """Returns info about Metronome instance :returns Metronome information :rtype: dict """ response = self._rpc.http_req(http.get, 'v1/info') return response.json() def get_job(self, job_id, embed_with=None): """Returns a representation of the requested job. :param job_id: the ID of the application :type job_id: str :param embed_with: list of strings to ?embed=str&embed=str2... :type embed_with: [str] :returns: the requested Metronome job :rtype: dict """ # refactor util name it isn't marathon specific job_id = util.normalize_marathon_id_path(job_id) embeds = _get_embed_query_string(embed_with) if embed_with else None url = ('v1/jobs{}{}'.format(job_id, embeds) if embeds else 'v1/jobs{}'.format(job_id)) response = self._rpc.http_req(http.get, url) return response.json() def get_jobs(self, embed_with=None): """Get a list of known jobs. :param embed_with: list of strings to ?embed=str&embed=str2... :type embed_with: [str] :returns: list of known jobs :rtype: [dict] """ embeds = _get_embed_query_string(embed_with) if embed_with else None url = 'v1/jobs{}'.format(embeds) if embeds else 'v1/jobs' response = self._rpc.http_req(http.get, url) return response.json() def add_job(self, job_resource): """Add a new job. :param job_resource: job resource :type job_resource: dict, bytes or file :returns: the job description :rtype: dict """ # The file type exists only in Python 2, preventing type(...) is file. if hasattr(job_resource, 'read'): job_json = json.load(job_resource) else: job_json = job_resource response = self._rpc.http_req(http.post, 'v1/jobs', json=job_json) # need deployment ID return response.json() def _update_req( self, resource_type, resource_id, resource_json, force=False): """Send an HTTP request to update an application, group, or pod. :param resource_type: one of 'apps', 'groups', or 'pods' :type resource_type: str :param resource_id: the app, group, or pod ID :type resource_id: str :param resource_json: the json payload :type resource_json: {} :param force: whether to override running deployments :type force: bool :returns: the response from Metronome :rtype: requests.Response """ path_template = 'v1/{}/{{}}'.format(resource_type) path = self._job_id_path_format(path_template, resource_id) params = self._force_params(force) return self._rpc.http_req( http.put, path, params=params, json=resource_json) def _update(self, resource_type, resource_id, resource_json, force=False): """Update an application or group. The HTTP response is handled differently for pods; see `update_pod`. :param resource_type: either 'apps' or 'groups' :type resource_type: str :param resource_id: the app or group ID :type resource_id: str :param resource_json: the json payload :type resource_json: {} :param force: whether to override running deployments :type force: bool :returns: the resulting deployment ID :rtype: str """ response = self._update_req( resource_type, resource_id, resource_json, force) body_json = self._parse_json(response) try: return body_json.get('deploymentId') except KeyError: template = ('Error: missing "deploymentId" field in the following ' 'JSON response from Job:\n{}') rendered_json = json.dumps(body_json, indent=2, sort_keys=True) raise DCOSException(template.format(rendered_json)) def update_job(self, job_id, payload, force=False): """Update a job. :param job_id: the job id :type job_id: str :param payload: the json payload :type payload: dict :param force: whether to override running deployments :type force: bool :returns: the resulting deployment ID :rtype: str """ return self._update('jobs', job_id, payload, force) def remove_job(self, job_id, force=False): """Completely removes the requested application. :param job_id: the ID of the job to remove :type job_id: str :param force: whether to override running deployments :type force: bool :rtype: None """ job_id = util.normalize_marathon_id_path(job_id) params = self._force_params(force) path = 'v1/jobs{}'.format(job_id) self._rpc.http_req(http.delete, path, params=params) def get_schedules(self, job_id): """Gets the schedules for a given job :param job_id: the ID of the job to remove :type job_id: str :rtype: json of schedules """ job_id = util.normalize_marathon_id_path(job_id) path = 'v1/jobs{}/schedules'.format(job_id) response = self._rpc.http_req(http.get, path) return response.json() def get_schedule(self, job_id, schedule_id): """Gets the schedules for a given job :param job_id: the ID of the job to remove :type job_id: str :rtype: json of schedules """ job_id = util.normalize_marathon_id_path(job_id) schedule_id = util.normalize_marathon_id_path(schedule_id) path = 'v1/jobs{}/schedules/{}'.format(job_id, schedule_id) response = self._rpc.http_req(http.get, path) return response.json() def add_schedule(self, job_id, schedule_resource): """Gets the schedules for a given job :param job_id: the ID of the job to remove :type job_id: str :rtype: json of schedules """ job_id = util.normalize_marathon_id_path(job_id) if hasattr(schedule_resource, 'read'): schedule_json = json.load(schedule_resource) else: schedule_json = schedule_resource path = 'v1/jobs{}/schedules'.format(job_id) response = self._rpc.http_req(http.post, path, json=schedule_json) return response.json() def update_schedule(self, job_id, schedule_id, schedule_resource): """Gets the schedules for a given job :param job_id: the ID of the job to remove :type job_id: str :rtype: json of schedules """ job_id = util.normalize_marathon_id_path(job_id) if hasattr(schedule_resource, 'read'): schedule_json = json.load(schedule_resource) else: schedule_json = schedule_resource path = 'v1/jobs{}/schedules/{}'.format(job_id, schedule_id) response = self._rpc.http_req(http.put, path, json=schedule_json) return response.json() def remove_schedule(self, job_id, schedule_id): """Completely removes the requested application. :param job_id: the ID of the job to remove :type job_id: str :param force: whether to override running deployments :type force: bool :rtype: None """ job_id = util.normalize_marathon_id_path(job_id) schedule_id = util.normalize_marathon_id_path(schedule_id) path = 'v1/jobs{}/schedules/{}'.format(job_id, schedule_id) self._rpc.http_req(http.delete, path) def run_job(self, job_id): """Add a new job. :param job_id: the ID of the job to remove :type job_id: str :rtype: None """ job_id = util.normalize_marathon_id_path(job_id) path = '/v1/jobs{}/runs'.format(job_id) response = self._rpc.http_req(http.post, path) return response.json() def get_runs(self, job_id): """Gets the schedules for a given job :param job_id: the ID of the job to remove :type job_id: str :rtype: json of schedules """ job_id = util.normalize_marathon_id_path(job_id) path = '/v1/jobs{}/runs'.format(job_id) response = self._rpc.http_req(http.get, path) return response.json() def get_run(self, job_id, run_id): """Add a new job. :param job_id: the ID of the job :type job_id: str :param run_id: the ID of the job run :type run_id: str :rtype: None """ job_id = util.normalize_marathon_id_path(job_id) run_id = util.normalize_marathon_id_path(run_id) path = '/v1/jobs{}/runs{}'.format(job_id, run_id) response = self._rpc.http_req(http.get, path) return response.json() def kill_run(self, job_id, run_id): """Add a new job. :param job_id: the ID of the job :type job_id: str :param run_id: the ID of the job run to remove :type run_id: str :rtype: None """ job_id = util.normalize_marathon_id_path(job_id) run_id = util.normalize_marathon_id_path(run_id) path = '/v1/jobs{}/runs{}/actions/stop'.format(job_id, run_id) self._rpc.http_req(http.post, path) @staticmethod def _job_id_path_format(url_path_template, id_path): """Substitutes a Metronome "ID path" into a URL path format string, ensuring the result is well-formed. All leading and trailing slashes in the ID will be removed, and the ID will have all URL-unsafe characters escaped, as if by urllib.parse.quote(). :param url_path_template: format string for the path portion of a URL, with a single format specifier (i.e. {}) where the "ID path" should be inserted :type url_path_template: str :param id_path: a Job "ID path", e.g. app, group, or pod ID :type id_path: str :returns: the url path template with the ID inserted :rtype: str """ normalized_id_path = urllib.parse.quote(id_path.strip('/')) return url_path_template.format(normalized_id_path) @staticmethod def _force_params(force): """Returns the query parameters that signify the provided force value. :param force: whether to override running deployments :type force: bool :rtype: {} | None """ return {'stopCurrentJobRuns': 'true'} if force else None @staticmethod def _parse_json(response): """Attempts to parse the body of the given response as JSON. Raises DCOSException if parsing fails. :param response: the response containing the body to parse :type response: requests.Response :return: the parsed JSON :rtype: {} | [] | str | int | float | bool | None """ try: return response.json() except: template = ('Error: Response from Metronome was not in expected ' 'JSON format:\n{}') raise DCOSException(template.format(response.text)) def _check_capability(): """ The function checks if cluster has metronome capability. :raises: DCOSException if cluster does not have metronome capability """ manager = packagemanager.PackageManager(cosmos.get_cosmos_url()) if not manager.has_capability('METRONOME'): raise DCOSException( 'DC/OS backend does not support metronome capabilities in this ' 'version. Must be DC/OS >= 1.8')