Wrapped calls to requests in marathon API.
This commit is contained in:
162
dcos/api/http.py
Normal file
162
dcos/api/http.py
Normal file
@@ -0,0 +1,162 @@
|
||||
import requests
|
||||
from dcos.api import util
|
||||
from dcos.api.errors import DefaultError
|
||||
|
||||
logger = util.get_logger(__name__)
|
||||
|
||||
|
||||
def _default_is_success(status_code):
|
||||
"""Returns true if the success status is between [200, 300).
|
||||
|
||||
:param response_status: the http response status
|
||||
:type response_status: int
|
||||
:returns: True for success status; False otherwise
|
||||
:rtype: bool
|
||||
"""
|
||||
|
||||
return status_code >= 200 and status_code < 300
|
||||
|
||||
|
||||
def _default_response_to_error(response):
|
||||
"""
|
||||
:param response: HTTP resonse object
|
||||
:type response: requests.Response
|
||||
:returns: the error embedded in the response JSON
|
||||
:rtype: Error
|
||||
"""
|
||||
|
||||
return DefaultError('{}: {}'.format(response.status_code, response.text))
|
||||
|
||||
|
||||
def request(method,
|
||||
url,
|
||||
is_success=_default_is_success,
|
||||
response_to_error=_default_response_to_error,
|
||||
**kwargs):
|
||||
"""Sends an HTTP request.
|
||||
|
||||
:param method: method for the new Request object
|
||||
:type method: str
|
||||
:param url: URL for the new Request object
|
||||
:type url: str
|
||||
:param is_success: Defines successful status codes for the request
|
||||
:type is_success: Function from int to bool
|
||||
:param response_to_error: Builds an Error from an unsuccessful response
|
||||
:type response_to_error: Function from requests.Response to Error
|
||||
:param kwargs: Additional arguments to requests.request
|
||||
(see http://docs.python-requests.org/en/latest/api/#requests.request)
|
||||
:type kwargs: dict
|
||||
:rtype: (Response, Error)
|
||||
"""
|
||||
|
||||
try:
|
||||
request = requests.Request(method=method, url=url, **kwargs)
|
||||
|
||||
logger.info('Sending HTTP [%r] to [%r]', request.method, request.url)
|
||||
|
||||
with requests.Session() as session:
|
||||
response = session.send(request.prepare())
|
||||
|
||||
logger.info('Received HTTP response [%r]: %r',
|
||||
response.status_code,
|
||||
response.text)
|
||||
|
||||
if is_success(response.status_code):
|
||||
return (response, None)
|
||||
else:
|
||||
return (None, response_to_error(response))
|
||||
|
||||
except Exception as ex:
|
||||
return (None, DefaultError(str(ex)))
|
||||
|
||||
|
||||
def head(url, **kwargs):
|
||||
"""Sends a HEAD request.
|
||||
|
||||
:param url: URL for the new Request object
|
||||
:type url: str
|
||||
:param kwargs: Additional arguments to requests.request
|
||||
(see py:func:`request`)
|
||||
:type kwargs: dict
|
||||
:rtype: (Response, Error)
|
||||
"""
|
||||
|
||||
return request('head', url, **kwargs)
|
||||
|
||||
|
||||
def get(url, **kwargs):
|
||||
"""Sends a GET request.
|
||||
|
||||
:param url: URL for the new Request object
|
||||
:type url: str
|
||||
:param kwargs: Additional arguments to requests.request
|
||||
(see py:func:`request`)
|
||||
:type kwargs: dict
|
||||
:rtype: (Response, Error)
|
||||
"""
|
||||
|
||||
return request('get', url, **kwargs)
|
||||
|
||||
|
||||
def post(url, data=None, json=None, **kwargs):
|
||||
"""Sends a POST request.
|
||||
|
||||
:param url: URL for the new Request object
|
||||
:type url: str
|
||||
:param data: Request body
|
||||
:type data: dict, bytes, or file-like object
|
||||
:param json: JSON request body
|
||||
:type data: dict
|
||||
:param kwargs: Additional arguments to requests.request
|
||||
(see py:func:`request`)
|
||||
:type kwargs: dict
|
||||
:rtype: (Response, Error)
|
||||
"""
|
||||
|
||||
return request('post', url, data=data, json=json, **kwargs)
|
||||
|
||||
|
||||
def put(url, data=None, **kwargs):
|
||||
"""Sends a PUT request.
|
||||
|
||||
:param url: URL for the new Request object
|
||||
:type url: str
|
||||
:param data: Request body
|
||||
:type data: dict, bytes, or file-like object
|
||||
:param kwargs: Additional arguments to requests.request
|
||||
(see py:func:`request`)
|
||||
:type kwargs: dict
|
||||
:rtype: (Response, Error)
|
||||
"""
|
||||
|
||||
return request('put', url, data=data, **kwargs)
|
||||
|
||||
|
||||
def patch(url, data=None, **kwargs):
|
||||
"""Sends a PATCH request.
|
||||
|
||||
:param url: URL for the new Request object
|
||||
:type url: str
|
||||
:param data: Request body
|
||||
:type data: dict, bytes, or file-like object
|
||||
:param kwargs: Additional arguments to requests.request
|
||||
(see py:func:`request`)
|
||||
:type kwargs: dict
|
||||
:rtype: (Response, Error)
|
||||
"""
|
||||
|
||||
return request('patch', url, data=data, **kwargs)
|
||||
|
||||
|
||||
def delete(url, **kwargs):
|
||||
"""Sends a DELETE request.
|
||||
|
||||
:param url: URL for the new Request object
|
||||
:type url: str
|
||||
:param kwargs: Additional arguments to requests.request
|
||||
(see py:func:`request`)
|
||||
:type kwargs: dict
|
||||
:rtype: (Response, Error)
|
||||
"""
|
||||
|
||||
return request('delete', url, **kwargs)
|
||||
@@ -1,12 +1,12 @@
|
||||
import json
|
||||
|
||||
import requests
|
||||
from dcos.api import errors, util
|
||||
from dcos.api import http, util
|
||||
from dcos.api.errors import DefaultError
|
||||
|
||||
try:
|
||||
from urllib import urlencode, quote
|
||||
from urllib import quote
|
||||
except ImportError:
|
||||
from urllib.parse import urlencode, quote
|
||||
from urllib.parse import quote
|
||||
|
||||
logger = util.get_logger(__name__)
|
||||
|
||||
@@ -22,6 +22,29 @@ def create_client(config):
|
||||
return Client(config['marathon.host'], config['marathon.port'])
|
||||
|
||||
|
||||
def _response_to_error(response):
|
||||
"""
|
||||
:param response: HTTP resonse object
|
||||
:type response: requests.Response
|
||||
:returns: the error embedded in the response JSON
|
||||
:rtype: Error
|
||||
"""
|
||||
|
||||
message = response.json().get('message')
|
||||
if message is None:
|
||||
errs = response.json().get('errors')
|
||||
if errs is None:
|
||||
logger.error(
|
||||
'Marathon server did not return a message: %s',
|
||||
response.json())
|
||||
return DefaultError('Unknown error from Marathon')
|
||||
|
||||
msg = '\n'.join(error['error'] for error in errs)
|
||||
return DefaultError('Error(s): {}'.format(msg))
|
||||
|
||||
return DefaultError('Error: {}'.format(response.json()['message']))
|
||||
|
||||
|
||||
class Client(object):
|
||||
"""Class for talking to the Marathon server.
|
||||
|
||||
@@ -36,50 +59,20 @@ class Client(object):
|
||||
self._host = host
|
||||
self._port = port
|
||||
|
||||
def _create_url(self, path, query_params=None):
|
||||
def _create_url(self, path):
|
||||
"""Creates the url from the provided path.
|
||||
|
||||
:param path: url path
|
||||
:type path: str
|
||||
:param query_params: query string parameters
|
||||
:type query_params: dict
|
||||
:returns: constructed url
|
||||
:rtype: str
|
||||
"""
|
||||
|
||||
url = self._url_pattern.format(
|
||||
return self._url_pattern.format(
|
||||
host=self._host,
|
||||
port=self._port,
|
||||
path=path)
|
||||
|
||||
if query_params is not None:
|
||||
query_string = urlencode(query_params)
|
||||
url = (url + '?{}').format(query_string)
|
||||
|
||||
return url
|
||||
|
||||
def _response_to_error(self, response):
|
||||
"""
|
||||
:param response: HTTP resonse object
|
||||
:type response: requests.Response
|
||||
:returns: the error embedded in the response JSON
|
||||
:rtype: Error
|
||||
"""
|
||||
|
||||
message = response.json().get('message')
|
||||
if message is None:
|
||||
errors = response.json().get('errors')
|
||||
if errors is None:
|
||||
logger.error(
|
||||
'Marathon server did not return a message: %s',
|
||||
response.json())
|
||||
return Error('Unknown error from Marathon')
|
||||
|
||||
msg = '\n'.join(error['error'] for error in errors)
|
||||
return Error('Error(s): {}'.format(msg))
|
||||
|
||||
return Error('Error: {}'.format(response.json()['message']))
|
||||
|
||||
def get_app(self, app_id, version=None):
|
||||
"""Returns a representation of the requested application version. If
|
||||
version is None the return the latest version.
|
||||
@@ -89,7 +82,7 @@ class Client(object):
|
||||
:param version: application version as a ISO8601 datetime
|
||||
:type version: str
|
||||
:returns: the requested Marathon application
|
||||
:rtype: (dict, Error)
|
||||
:rtype: (dict, errors.Error)
|
||||
"""
|
||||
|
||||
app_id = normalize_app_id(app_id)
|
||||
@@ -99,18 +92,16 @@ class Client(object):
|
||||
url = self._create_url(
|
||||
'v2/apps{}/versions/{}'.format(app_id, version))
|
||||
|
||||
logger.info('Getting %r', url)
|
||||
response = requests.get(url)
|
||||
logger.info('Got (%r): %r', response.status_code, response.text)
|
||||
response, error = http.get(url, response_to_error=_response_to_error)
|
||||
|
||||
if _success(response.status_code):
|
||||
# Looks like Marathon return different JSON for versions
|
||||
if version is None:
|
||||
return (response.json()['app'], None)
|
||||
else:
|
||||
return (response.json(), None)
|
||||
if error is not None:
|
||||
return (None, error)
|
||||
|
||||
# Looks like Marathon return different JSON for versions
|
||||
if version is None:
|
||||
return (response.json()['app'], None)
|
||||
else:
|
||||
return (None, self._response_to_error(response))
|
||||
return (response.json(), None)
|
||||
|
||||
def get_app_versions(self, app_id, max_count=None):
|
||||
"""Asks Marathon for all the versions of the Application up to a
|
||||
@@ -121,13 +112,13 @@ class Client(object):
|
||||
:param max_count: the maximum number of version to fetch
|
||||
:type max_count: int
|
||||
:returns: a list of all the version of the application
|
||||
:rtype: (list of str, Error)
|
||||
:rtype: (list of str, errors.Error)
|
||||
"""
|
||||
|
||||
if max_count is not None and max_count <= 0:
|
||||
return (
|
||||
None,
|
||||
Error(
|
||||
DefaultError(
|
||||
'Maximum count must be a positive number: {}'.format(
|
||||
max_count))
|
||||
)
|
||||
@@ -136,36 +127,32 @@ class Client(object):
|
||||
|
||||
url = self._create_url('v2/apps{}/versions'.format(app_id))
|
||||
|
||||
logger.info('Getting %r', url)
|
||||
response = requests.get(url)
|
||||
logger.info('Got (%r): %r', response.status_code, response.text)
|
||||
response, error = http.get(url, response_to_error=_response_to_error)
|
||||
|
||||
if _success(response.status_code):
|
||||
if max_count is None:
|
||||
return (response.json()['versions'], None)
|
||||
else:
|
||||
return (response.json()['versions'][:max_count], None)
|
||||
if error is not None:
|
||||
return (None, error)
|
||||
|
||||
if max_count is None:
|
||||
return (response.json()['versions'], None)
|
||||
else:
|
||||
return (None, self._response_to_error(response))
|
||||
return (response.json()['versions'][:max_count], None)
|
||||
|
||||
def get_apps(self):
|
||||
"""Get a list of known applications.
|
||||
|
||||
:returns: list of known applications
|
||||
:rtype: (list of dict, Error)
|
||||
:rtype: (list of dict, errors.Error)
|
||||
"""
|
||||
|
||||
url = self._create_url('v2/apps')
|
||||
|
||||
logger.info('Getting %r', url)
|
||||
response = requests.get(url)
|
||||
logger.info('Got (%r): %r', response.status_code, response.text)
|
||||
response, error = http.get(url, response_to_error=_response_to_error)
|
||||
|
||||
if _success(response.status_code):
|
||||
apps = response.json()['apps']
|
||||
return (apps, None)
|
||||
else:
|
||||
return (None, self._response_to_error(response))
|
||||
if error is not None:
|
||||
return (None, error)
|
||||
|
||||
apps = response.json()['apps']
|
||||
return (apps, None)
|
||||
|
||||
def add_app(self, app_resource):
|
||||
"""Add a new application.
|
||||
@@ -173,7 +160,7 @@ class Client(object):
|
||||
:param app_resource: application resource
|
||||
:type app_resource: dict, bytes or file
|
||||
:returns: the application description
|
||||
:rtype: (dict, Error)
|
||||
:rtype: (dict, errors.Error)
|
||||
"""
|
||||
|
||||
url = self._create_url('v2/apps')
|
||||
@@ -184,14 +171,14 @@ class Client(object):
|
||||
else:
|
||||
app_json = app_resource
|
||||
|
||||
logger.info('Posting %r to %r', app_json, url)
|
||||
response = requests.post(url, json=app_json)
|
||||
logger.info('Got (%r): %r', response.status_code, response.text)
|
||||
response, error = http.post(url,
|
||||
json=app_json,
|
||||
response_to_error=_response_to_error)
|
||||
|
||||
if _success(response.status_code):
|
||||
return (response.json(), None)
|
||||
else:
|
||||
return (None, self._response_to_error(response))
|
||||
if error is not None:
|
||||
return (None, error)
|
||||
|
||||
return (response.json(), None)
|
||||
|
||||
def update_app(self, app_id, payload, force=None):
|
||||
"""Update an application.
|
||||
@@ -203,7 +190,7 @@ class Client(object):
|
||||
:param force: whether to override running deployments
|
||||
:type force: bool
|
||||
:returns: the resulting deployment ID
|
||||
:rtype: (str, Error)
|
||||
:rtype: (str, errors.Error)
|
||||
"""
|
||||
|
||||
app_id = normalize_app_id(app_id)
|
||||
@@ -213,16 +200,17 @@ class Client(object):
|
||||
else:
|
||||
params = {'force': 'true'}
|
||||
|
||||
url = self._create_url('v2/apps{}'.format(app_id), params)
|
||||
url = self._create_url('v2/apps{}'.format(app_id))
|
||||
|
||||
logger.info('Putting %r to %r', payload, url)
|
||||
response = requests.put(url, json=payload)
|
||||
logger.info('Got (%r): %r', response.status_code, response.text)
|
||||
response, error = http.put(url,
|
||||
params=params,
|
||||
json=payload,
|
||||
response_to_error=_response_to_error)
|
||||
|
||||
if _success(response.status_code):
|
||||
return (response.json().get('deploymentId'), None)
|
||||
else:
|
||||
return (None, self._response_to_error(response))
|
||||
if error is not None:
|
||||
return (None, error)
|
||||
|
||||
return (response.json().get('deploymentId'), None)
|
||||
|
||||
def scale_app(self, app_id, instances, force=None):
|
||||
"""Scales an application to the requested number of instances.
|
||||
@@ -234,7 +222,7 @@ class Client(object):
|
||||
:param force: whether to override running deployments
|
||||
:type force: bool
|
||||
:returns: the resulting deployment ID
|
||||
:rtype: (bool, Error)
|
||||
:rtype: (bool, errors.Error)
|
||||
"""
|
||||
|
||||
app_id = normalize_app_id(app_id)
|
||||
@@ -244,17 +232,18 @@ class Client(object):
|
||||
else:
|
||||
params = {'force': 'true'}
|
||||
|
||||
url = self._create_url('v2/apps{}'.format(app_id), params)
|
||||
url = self._create_url('v2/apps{}'.format(app_id))
|
||||
|
||||
logger.info('Putting to %r', url)
|
||||
response = requests.put(url, json={'instances': int(instances)})
|
||||
logger.info('Got (%r): %r', response.status_code, response.text)
|
||||
response, error = http.put(url,
|
||||
params=params,
|
||||
json={'instances': int(instances)},
|
||||
response_to_error=_response_to_error)
|
||||
|
||||
if _success(response.status_code):
|
||||
deployment = response.json()['deploymentId']
|
||||
return (deployment, None)
|
||||
else:
|
||||
return (None, self._response_to_error(response))
|
||||
if error is not None:
|
||||
return (None, error)
|
||||
|
||||
deployment = response.json()['deploymentId']
|
||||
return (deployment, None)
|
||||
|
||||
def stop_app(self, app_id, force=None):
|
||||
"""Scales an application to zero instances.
|
||||
@@ -264,7 +253,7 @@ class Client(object):
|
||||
:param force: whether to override running deployments
|
||||
:type force: bool
|
||||
:returns: the resulting deployment ID
|
||||
:rtype: (bool, Error)
|
||||
:rtype: (bool, errors.Error)
|
||||
"""
|
||||
|
||||
return self.scale_app(app_id, 0, force)
|
||||
@@ -277,7 +266,7 @@ class Client(object):
|
||||
:param force: whether to override running deployments
|
||||
:type force: bool
|
||||
:returns: Error if it failed to remove the app; None otherwise
|
||||
:rtype: Error
|
||||
:rtype: errors.Error
|
||||
"""
|
||||
|
||||
app_id = normalize_app_id(app_id)
|
||||
@@ -287,16 +276,16 @@ class Client(object):
|
||||
else:
|
||||
params = {'force': 'true'}
|
||||
|
||||
url = self._create_url('v2/apps{}'.format(app_id), params)
|
||||
url = self._create_url('v2/apps{}'.format(app_id))
|
||||
|
||||
logger.info('Deleting %r', url)
|
||||
response = requests.delete(url)
|
||||
logger.info('Got (%r)', response.status_code)
|
||||
response, error = http.delete(url,
|
||||
params=params,
|
||||
response_to_error=_response_to_error)
|
||||
|
||||
if _success(response.status_code):
|
||||
return None
|
||||
else:
|
||||
return self._response_to_error(response)
|
||||
if error is not None:
|
||||
return error
|
||||
|
||||
return None
|
||||
|
||||
def restart_app(self, app_id, force=None):
|
||||
"""Performs a rolling restart of all of the tasks.
|
||||
@@ -306,7 +295,7 @@ class Client(object):
|
||||
:param force: whether to override running deployments
|
||||
:type force: bool
|
||||
:returns: the deployment id and version; Error otherwise
|
||||
:rtype: (dict, Error)
|
||||
:rtype: (dict, errors.Error)
|
||||
"""
|
||||
|
||||
app_id = normalize_app_id(app_id)
|
||||
@@ -316,16 +305,16 @@ class Client(object):
|
||||
else:
|
||||
params = {'force': 'true'}
|
||||
|
||||
url = self._create_url('v2/apps{}/restart'.format(app_id), params)
|
||||
url = self._create_url('v2/apps{}/restart'.format(app_id))
|
||||
|
||||
logger.info('Posting %r', url)
|
||||
response = requests.post(url)
|
||||
logger.info('Got (%r): %r', response.status_code, response.text)
|
||||
response, error = http.post(url,
|
||||
params=params,
|
||||
response_to_error=_response_to_error)
|
||||
|
||||
if _success(response.status_code):
|
||||
return (response.json(), None)
|
||||
else:
|
||||
return (None, self._response_to_error(response))
|
||||
if error is not None:
|
||||
return (None, error)
|
||||
|
||||
return (response.json(), None)
|
||||
|
||||
def get_deployment(self, deployment_id):
|
||||
"""Returns a deployment.
|
||||
@@ -338,19 +327,18 @@ class Client(object):
|
||||
|
||||
url = self._create_url('v2/deployments')
|
||||
|
||||
logger.info('Getting %r', url)
|
||||
response = requests.get(url)
|
||||
logger.info('Got (%r): %r', response.status_code, response.text)
|
||||
response, error = http.get(url,
|
||||
response_to_error=_response_to_error)
|
||||
|
||||
if _success(response.status_code):
|
||||
deployment = next(
|
||||
(deployment for deployment in response.json()
|
||||
if deployment_id == deployment['id']),
|
||||
None)
|
||||
if error is not None:
|
||||
return (None, error)
|
||||
|
||||
return (deployment, None)
|
||||
else:
|
||||
return (None, self._response_to_error(response))
|
||||
deployment = next(
|
||||
(deployment for deployment in response.json()
|
||||
if deployment_id == deployment['id']),
|
||||
None)
|
||||
|
||||
return (deployment, None)
|
||||
|
||||
def get_deployments(self, app_id=None):
|
||||
"""Returns a list of deployments, optionally limited to an app.
|
||||
@@ -363,23 +351,21 @@ class Client(object):
|
||||
|
||||
url = self._create_url('v2/deployments')
|
||||
|
||||
logger.info('Getting %r', url)
|
||||
response = requests.get(url)
|
||||
logger.info('Got (%r): %r', response.status_code, response.text)
|
||||
response, error = http.get(url, response_to_error=_response_to_error)
|
||||
|
||||
if _success(response.status_code):
|
||||
if app_id is not None:
|
||||
app_id = normalize_app_id(app_id)
|
||||
deployments = [
|
||||
deployment for deployment in response.json()
|
||||
if app_id in deployment['affectedApps']
|
||||
]
|
||||
else:
|
||||
deployments = response.json()
|
||||
if error is not None:
|
||||
return (None, error)
|
||||
|
||||
return (deployments, None)
|
||||
if app_id is not None:
|
||||
app_id = normalize_app_id(app_id)
|
||||
deployments = [
|
||||
deployment for deployment in response.json()
|
||||
if app_id in deployment['affectedApps']
|
||||
]
|
||||
else:
|
||||
return (None, self._response_to_error(response))
|
||||
deployments = response.json()
|
||||
|
||||
return (deployments, None)
|
||||
|
||||
def _cancel_deployment(self, deployment_id, force):
|
||||
"""Cancels an application deployment.
|
||||
@@ -400,18 +386,13 @@ class Client(object):
|
||||
else:
|
||||
params = {'force': 'true'}
|
||||
|
||||
url = self._create_url(
|
||||
'v2/deployments/{}'.format(deployment_id),
|
||||
params)
|
||||
url = self._create_url('v2/deployments/{}'.format(deployment_id))
|
||||
|
||||
logger.info('Deleting %r', url)
|
||||
response = requests.delete(url)
|
||||
logger.info('Got (%r): %r', response.status_code, response.text)
|
||||
_, error = http.delete(url,
|
||||
params=params,
|
||||
response_to_error=_response_to_error)
|
||||
|
||||
if _success(response.status_code):
|
||||
return None
|
||||
else:
|
||||
return self._response_to_error(response)
|
||||
return error
|
||||
|
||||
def rollback_deployment(self, deployment_id):
|
||||
"""Rolls back an application deployment.
|
||||
@@ -436,26 +417,6 @@ class Client(object):
|
||||
return self._cancel_deployment(deployment_id, True)
|
||||
|
||||
|
||||
class Error(errors.Error):
|
||||
""" Class for describing errors while talking to the Marathon server.
|
||||
|
||||
:param message: Error message
|
||||
:type message: str
|
||||
"""
|
||||
|
||||
def __init__(self, message):
|
||||
self._message = message
|
||||
|
||||
def error(self):
|
||||
"""Return error message
|
||||
|
||||
:returns: The error message
|
||||
:rtype: str
|
||||
"""
|
||||
|
||||
return self._message
|
||||
|
||||
|
||||
def normalize_app_id(app_id):
|
||||
"""Normalizes the application id.
|
||||
|
||||
@@ -466,15 +427,3 @@ def normalize_app_id(app_id):
|
||||
"""
|
||||
|
||||
return quote('/' + app_id.strip('/'))
|
||||
|
||||
|
||||
def _success(status_code):
|
||||
"""Returns true if the success status is between [200, 300).
|
||||
|
||||
:param response_status: the http response status
|
||||
:type response_status: int
|
||||
:returns: True for success status; False otherwise
|
||||
:rtype: bool
|
||||
"""
|
||||
|
||||
return status_code >= 200 and status_code < 300
|
||||
|
||||
Reference in New Issue
Block a user