Merge "Add Task Monitor support for async operations"

Authored by Zuul on 2020-01-27 15:25:50 +00:00; committed by Gerrit Code Review
commit 544f000fe6
6 changed files with 361 additions and 22 deletions


@ -0,0 +1,5 @@
---
features:
- |
Adds support for a Task Monitor resource to monitor the state of
asynchronous operations.

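As a quick illustration of what the release note describes, here is a minimal caller-side sketch of the new blocking behaviour; `conn` stands in for an already-constructed sushy Connector, and the target path and payload are purely illustrative:

# Hypothetical usage sketch: `conn` is an existing sushy Connector; the
# path and payload below are illustrative and not part of this change.
response = conn.post('/redfish/v1/Systems/1/Actions/ComputerSystem.Reset',
                     data={'ResetType': 'ForceRestart'},
                     blocking=True,  # poll the task monitor until the task finishes
                     timeout=120)    # raise ConnectionError if still running after 120 s
print(response.status_code)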

@ -17,8 +17,10 @@ import logging
from urllib import parse as urlparse
import requests
import time
from sushy import exceptions
from sushy.resources.task_monitor import TaskMonitor
LOG = logging.getLogger(__name__)
@ -61,15 +63,17 @@ class Connector(object):
"""Close this connector and the associated HTTP session."""
self._session.close()
def _op(self, method, path='', data=None, headers=None, blocking=False,
timeout=60, **extra_session_req_kwargs):
"""Generic RESTful request handler.
:param method: The HTTP method to be used, e.g. GET, POST,
PUT, PATCH, etc.
:param path: The sub-URI or absolute URL path to the resource.
:param data: Optional JSON data.
:param headers: Optional dictionary of headers.
:param blocking: Whether to block for asynchronous operations.
:param timeout: Max time in seconds to wait for blocking async call.
:param extra_session_req_kwargs: Optional keyword arguments passed
through to the underlying requests session object.
@ -77,16 +81,19 @@ class Connector(object):
:raises: ConnectionError
:raises: HTTPError
"""
url = path if urlparse.urlparse(path).netloc else urlparse.urljoin(
self._url, path)
headers = headers or {}
if not any(k.lower() == 'odata-version' for k in headers):
headers['OData-Version'] = '4.0'
# TODO(lucasagomes): We should mask the data to remove sensitive
# information
LOG.debug('HTTP request: %(method)s %(url)s; headers: %(headers)s; '
'body: %(data)s; blocking: %(blocking)s; timeout: '
'%(timeout)s; session arguments: %(session)s;',
{'method': method, 'url': url, 'headers': headers,
'data': data, 'blocking': blocking, 'timeout': timeout,
'session': extra_session_req_kwargs})
try:
response = self._session.request(method, url, json=data,
headers=headers,
@ -110,6 +117,29 @@ class Connector(object):
else:
raise
if blocking and response.status_code == 202:
if not response.headers.get('location'):
m = ('HTTP response for %(method)s request to %(url)s '
'returned status 202, but no Location header'
% {'method': method, 'url': url})
raise exceptions.ConnectionError(url=url, error=m)
timeout_at = time.time() + timeout
mon = (TaskMonitor(self, response.headers.get('location'))
.set_retry_after(response.headers.get('retry-after')))
while mon.in_progress:
LOG.debug('Blocking for in-progress %(method)s call to '
'%(url)s; sleeping for %(sleep)s seconds',
{'method': method, 'url': url,
'sleep': mon.sleep_for})
time.sleep(mon.sleep_for)
if time.time() >= timeout_at and mon.in_progress:
m = ('Timeout waiting for blocking %(method)s '
'request to %(url)s (timeout = %(timeout)s)'
% {'method': method, 'url': url,
'timeout': timeout})
raise exceptions.ConnectionError(url=url, error=m)
response = mon.response
LOG.debug('HTTP response for %(method)s %(url)s: '
'status code: %(code)s',
{'method': method, 'url': url,
@ -117,13 +147,15 @@ class Connector(object):
return response
def get(self, path='', data=None, headers=None, blocking=False,
timeout=60, **extra_session_req_kwargs):
"""HTTP GET method.
:param path: Optional sub-URI path to the resource.
:param data: Optional JSON data.
:param headers: Optional dictionary of headers.
:param blocking: Whether to block for asynchronous operations.
:param timeout: Max time in seconds to wait for blocking async call.
:param extra_session_req_kwargs: Optional keyword arguments passed
through to the underlying requests session object.
@ -132,15 +164,18 @@ class Connector(object):
:raises: HTTPError
"""
return self._op('GET', path, data=data, headers=headers,
blocking=blocking, timeout=timeout,
**extra_session_req_kwargs)
def post(self, path='', data=None, headers=None, blocking=False,
timeout=60, **extra_session_req_kwargs):
"""HTTP POST method.
:param path: Optional sub-URI path to the resource.
:param data: Optional JSON data.
:param headers: Optional dictionary of headers.
:param blocking: Whether to block for asynchronous operations.
:param timeout: Max time in seconds to wait for blocking async call.
:param extra_session_req_kwargs: Optional keyword arguments passed
through to the underlying requests session object.
@ -149,15 +184,18 @@ class Connector(object):
:raises: HTTPError
"""
return self._op('POST', path, data=data, headers=headers,
blocking=blocking, timeout=timeout,
**extra_session_req_kwargs)
def patch(self, path='', data=None, headers=None, blocking=False,
timeout=60, **extra_session_req_kwargs):
"""HTTP PATCH method.
:param path: Optional sub-URI path to the resource.
:param data: Optional JSON data.
:param headers: Optional dictionary of headers.
:param blocking: Whether to block for asynchronous operations.
:param timeout: Max time in seconds to wait for blocking async call.
:param extra_session_req_kwargs: Optional keyword arguments passed
through to the underlying requests session object.
@ -166,15 +204,18 @@ class Connector(object):
:raises: HTTPError
"""
return self._op('PATCH', path, data=data, headers=headers,
blocking=blocking, timeout=timeout,
**extra_session_req_kwargs)
def put(self, path='', data=None, headers=None, blocking=False,
timeout=60, **extra_session_req_kwargs):
"""HTTP PUT method.
:param path: Optional sub-URI path to the resource.
:param data: Optional JSON data.
:param headers: Optional dictionary of headers.
:param blocking: Whether to block for asynchronous operations.
:param timeout: Max time in seconds to wait for blocking async call.
:param extra_session_req_kwargs: Optional keyword arguments passed
through to the underlying requests session object.
@ -183,15 +224,18 @@ class Connector(object):
:raises: HTTPError
"""
return self._op('PUT', path, data=data, headers=headers,
blocking=blocking, timeout=timeout,
**extra_session_req_kwargs)
def delete(self, path='', data=None, headers=None, blocking=False,
timeout=60, **extra_session_req_kwargs):
"""HTTP DELETE method.
:param path: Optional sub-URI path to the resource.
:param data: Optional JSON data.
:param headers: Optional dictionary of headers.
:param blocking: Whether to block for asynchronous operations.
:param timeout: Max time in seconds to wait for blocking async call.
:param extra_session_req_kwargs: Optional keyword arguments passed
through to the underlying requests session object.
@ -200,6 +244,7 @@ class Connector(object):
:raises: HTTPError
"""
return self._op('DELETE', path, data=data, headers=headers,
blocking=blocking, timeout=timeout,
**extra_session_req_kwargs)
def __enter__(self):

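The blocking path above either returns the final response fetched through the task monitor or raises `exceptions.ConnectionError`, both when a 202 reply carries no Location header and when the task outlives the timeout. A hedged sketch of handling that, again assuming an existing `conn` Connector and an illustrative request:

import logging

from sushy import exceptions

LOG = logging.getLogger(__name__)

try:
    # Illustrative blocking PATCH; the path and payload are made up.
    response = conn.patch('/redfish/v1/Systems/1',
                          data={'AssetTag': 'example-tag'},
                          blocking=True, timeout=60)
except exceptions.ConnectionError as error:
    # Raised when the 202 response has no Location header, or when the
    # asynchronous operation is still in progress after `timeout` seconds.
    LOG.error('Asynchronous operation did not complete: %s', error)
    raise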

@ -338,7 +338,8 @@ class JsonDataReader(AbstractJsonReader):
def get_json(self):
"""Gets JSON file from URI directly"""
data = self._conn.get(path=self._path)
return data.json() if data.content else {}
class JsonPublicFileReader(AbstractJsonReader):


@ -0,0 +1,114 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# This is described in Redfish specification section "Asynchronous operations"
# www.dmtf.org/sites/default/files/standards/documents/DSP0266_1.7.0.pdf
from datetime import datetime
from datetime import timedelta
import logging
from dateutil import parser
from sushy.resources import base
LOG = logging.getLogger(__name__)
class TaskMonitor(base.ResourceBase):
def __init__(self,
connector,
path='',
redfish_version=None):
"""A class representing a Redfish Task Monitor
:param connector: A Connector instance
:param path: sub-URI path to the resource.
:param redfish_version: The version of Redfish. Used to construct
the object according to schema of the given version.
"""
super(TaskMonitor, self).__init__(connector, path, redfish_version)
self._retry_after = None
self._location_header = None
self._in_progress = True
self._response = None
@staticmethod
def _to_datetime(retry_after):
if isinstance(retry_after, int) or retry_after.isdigit():
# Retry-After: 120
return datetime.now() + timedelta(seconds=int(retry_after))
else:
# Retry-After: Fri, 31 Dec 1999 23:59:59 GMT
return parser.parse(retry_after)
def set_retry_after(self, value):
"""Set the time the client should wait before querying the task status
:param value: The value of the Retry-After header, which can be the
number of seconds to wait or an `HTTP-date` string as
defined by RFC 7231
:return: The TaskMonitor object
"""
self._retry_after = self._to_datetime(value or 1)
return self
@property
def retry_after(self):
"""Time the client should wait before querying the task status
:return: The Retry-After time in `datetime` format
"""
return self._retry_after
@property
def sleep_for(self):
"""Seconds the client should wait before querying the operation status
:return: The number of seconds to wait
"""
return max(0, (self._retry_after - datetime.now()).total_seconds())
@property
def location_header(self):
"""The Location header returned from the GET on the Task Monitor
:return: The Location header (an absolute URL)
"""
return self._location_header
@property
def in_progress(self):
"""Checks the status of the async task
:return: True if the async task is still in progress, False otherwise
"""
if not self._in_progress:
return False
r = self._conn.get(self._path)
self._response = r
self._location_header = r.headers.get('location')
if r.status_code == 202:
self.set_retry_after(r.headers.get('retry-after'))
else:
self._in_progress = False
return self._in_progress
@property
def response(self):
"""The response from the last TaskMonitor in_progress check
:return: The `requests` response object or None
"""
return self._response

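Callers that prefer not to block inside the Connector can drive the new class directly, mirroring what `_op` does internally. A minimal sketch under the same assumptions (an existing `conn` Connector, illustrative path and payload):

import time

from sushy.resources.task_monitor import TaskMonitor

# Illustrative asynchronous POST; a 202 reply carries the task monitor URI
# in the Location header and, optionally, a Retry-After hint.
response = conn.post('/redfish/v1/UpdateService/Actions/UpdateService.SimpleUpdate',
                     data={'ImageURI': 'http://example.com/firmware.bin'})
if response.status_code == 202:
    monitor = (TaskMonitor(conn, response.headers.get('location'))
               .set_retry_after(response.headers.get('retry-after')))
    while monitor.in_progress:
        # sleep_for honours the Retry-After value from the latest poll.
        time.sleep(monitor.sleep_for)
    response = monitor.response  # final response for the completed operation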

@ -0,0 +1,122 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from datetime import datetime
from datetime import timedelta
from dateutil import parser
import mock
from sushy.resources.task_monitor import TaskMonitor
from sushy.tests.unit import base
class TaskMonitorTestCase(base.TestCase):
def setUp(self):
super(TaskMonitorTestCase, self).setUp()
self.conn = mock.Mock()
self.data = {'fake': 'data'}
self.http_date = 'Fri, 31 Dec 1999 23:59:59 GMT'
self.seconds = 120
self.datetime = parser.parse(self.http_date)
self.req_headers = {'X-Fake': 'header'}
self.res_headers1 = {'location': 'https://sample.com/foo/bar',
'retry-after': self.http_date}
self.res_headers2 = {'location': 'https://sample.com/foo/bar',
'retry-after': str(self.seconds)}
def test_task_in_progress(self):
self.conn.post.return_value.status_code = 202
self.conn.post.return_value.headers = self.res_headers1.copy()
self.conn.get.return_value.status_code = 202
self.conn.get.return_value.headers = self.res_headers1.copy()
self.conn.get.return_value.json.return_value = {}
res = self.conn.post(path='fake/path', data=self.data.copy(),
headers=self.req_headers.copy())
tm = TaskMonitor(self.conn, res.headers.get('location'))\
.set_retry_after(res.headers.get('retry-after'))
self.assertIsNotNone(tm)
self.assertTrue(tm.in_progress)
def test_task_not_in_progress(self):
self.conn.post.return_value.status_code = 202
self.conn.post.return_value.headers = self.res_headers1.copy()
self.conn.get.return_value.status_code = 201
self.conn.get.return_value.json.return_value = self.data.copy()
res = self.conn.post(path='fake/path', data=self.data.copy(),
headers=self.req_headers.copy())
tm = TaskMonitor(self.conn, res.headers.get('location'))\
.set_retry_after(res.headers.get('retry-after'))
self.assertIsNotNone(tm)
self.assertFalse(tm.in_progress)
def test_retry_after_http_date(self):
self.conn.post.return_value.status_code = 202
self.conn.post.return_value.headers = self.res_headers1.copy()
self.conn.get.return_value.status_code = 202
self.conn.get.return_value.headers = self.res_headers1.copy()
self.conn.get.return_value.json.return_value = {}
res = self.conn.post(path='fake/path', data=self.data.copy(),
headers=self.req_headers.copy())
tm = TaskMonitor(self.conn, res.headers.get('location')) \
.set_retry_after(res.headers.get('retry-after'))
self.assertIsNotNone(tm)
self.assertEqual(self.datetime, tm.retry_after)
def test_retry_after_seconds(self):
self.conn.post.return_value.status_code = 202
self.conn.post.return_value.headers = self.res_headers2.copy()
self.conn.get.return_value.status_code = 202
self.conn.get.return_value.headers = self.res_headers2.copy()
self.conn.get.return_value.json.return_value = {}
start = datetime.now() + timedelta(seconds=self.seconds)
res = self.conn.post(path='fake/path', data=self.data.copy(),
headers=self.req_headers.copy())
tm = TaskMonitor(self.conn, res.headers.get('location')) \
.set_retry_after(res.headers.get('retry-after'))
end = datetime.now() + timedelta(seconds=self.seconds)
self.assertIsNotNone(tm)
self.assertTrue(start <= tm.retry_after <= end)
def test_sleep_for(self):
self.conn.post.return_value.status_code = 202
self.conn.post.return_value.headers = self.res_headers2.copy()
self.conn.get.return_value.status_code = 202
self.conn.get.return_value.headers = self.res_headers2.copy()
self.conn.get.return_value.json.return_value = {}
start = datetime.now()
res = self.conn.post(path='fake/path', data=self.data.copy(),
headers=self.req_headers.copy())
tm = TaskMonitor(self.conn, res.headers.get('location')) \
.set_retry_after(res.headers.get('retry-after'))
self.assertIsNotNone(tm)
sleep_for = tm.sleep_for
elapsed = (datetime.now() - start).total_seconds()
self.assertTrue(self.seconds - elapsed <= sleep_for <= self.seconds)
def test_response(self):
self.conn.post.return_value.status_code = 202
self.conn.post.return_value.headers = self.res_headers1.copy()
self.conn.get.return_value.status_code = 201
self.conn.get.return_value.json.return_value = self.data.copy()
res = self.conn.post(path='fake/path', data=self.data.copy(),
headers=self.req_headers.copy())
tm = TaskMonitor(self.conn, res.headers.get('location')) \
.set_retry_after(res.headers.get('retry-after'))
self.assertIsNotNone(tm)
self.assertFalse(tm.in_progress)
response = tm.response
self.assertEqual(201, response.status_code)
self.assertEqual(self.data.copy(), response.json())


@ -48,35 +48,80 @@ class ConnectorMethodsTestCase(base.TestCase):
self.conn.get(path='fake/path', data=self.data.copy(),
headers=self.headers.copy())
mock__op.assert_called_once_with(mock.ANY, 'GET', 'fake/path',
data=self.data, headers=self.headers,
blocking=False, timeout=60)
@mock.patch.object(connector.Connector, '_op', autospec=True)
def test_get_blocking(self, mock__op):
self.conn.get(path='fake/path', data=self.data.copy(),
headers=self.headers.copy(), blocking=True)
mock__op.assert_called_once_with(mock.ANY, 'GET', 'fake/path',
data=self.data, headers=self.headers,
blocking=True, timeout=60)
@mock.patch.object(connector.Connector, '_op', autospec=True)
def test_post(self, mock__op):
self.conn.post(path='fake/path', data=self.data.copy(),
headers=self.headers.copy())
mock__op.assert_called_once_with(mock.ANY, 'POST', 'fake/path',
data=self.data, headers=self.headers,
blocking=False, timeout=60)
@mock.patch.object(connector.Connector, '_op', autospec=True)
def test_post_blocking(self, mock__op):
self.conn.post(path='fake/path', data=self.data.copy(),
headers=self.headers.copy(), blocking=True, timeout=120)
mock__op.assert_called_once_with(mock.ANY, 'POST', 'fake/path',
data=self.data, headers=self.headers,
blocking=True, timeout=120)
@mock.patch.object(connector.Connector, '_op', autospec=True)
def test_patch(self, mock__op):
self.conn.patch(path='fake/path', data=self.data.copy(),
headers=self.headers.copy())
mock__op.assert_called_once_with(mock.ANY, 'PATCH', 'fake/path',
data=self.data, headers=self.headers,
blocking=False, timeout=60)
@mock.patch.object(connector.Connector, '_op', autospec=True)
def test_patch_blocking(self, mock__op):
self.conn.patch(path='fake/path', data=self.data.copy(),
headers=self.headers.copy(), blocking=True)
mock__op.assert_called_once_with(mock.ANY, 'PATCH', 'fake/path',
data=self.data, headers=self.headers,
blocking=True, timeout=60)
@mock.patch.object(connector.Connector, '_op', autospec=True)
def test_put(self, mock__op):
self.conn.put(path='fake/path', data=self.data.copy(),
headers=self.headers.copy())
mock__op.assert_called_once_with(mock.ANY, 'PUT', 'fake/path',
data=self.data, headers=self.headers,
blocking=False, timeout=60)
@mock.patch.object(connector.Connector, '_op', autospec=True)
def test_put_blocking(self, mock__op):
self.conn.put(path='fake/path', data=self.data.copy(),
headers=self.headers.copy(), blocking=True)
mock__op.assert_called_once_with(mock.ANY, 'PUT', 'fake/path',
data=self.data, headers=self.headers,
blocking=True, timeout=60)
@mock.patch.object(connector.Connector, '_op', autospec=True)
def test_delete(self, mock__op):
self.conn.delete(path='fake/path', data=self.data.copy(),
headers=self.headers.copy())
mock__op.assert_called_once_with(mock.ANY, 'DELETE', 'fake/path',
data=self.data, headers=self.headers,
blocking=False, timeout=60)
@mock.patch.object(connector.Connector, '_op', autospec=True)
def test_delete_blocking(self, mock__op):
self.conn.delete(path='fake/path', data=self.data.copy(),
headers=self.headers.copy(), blocking=True)
mock__op.assert_called_once_with(mock.ANY, 'DELETE', 'fake/path',
data=self.data, headers=self.headers,
blocking=True, timeout=60)
def test_set_auth(self):
mock_auth = mock.MagicMock()
@ -286,3 +331,10 @@ class ConnectorOpTestCase(base.TestCase):
self.conn._op('GET', 'http://foo.bar')
exc = cm.exception
self.assertEqual(http_client.FORBIDDEN, exc.status_code)
def test_blocking_no_location_header(self):
self.request.return_value.status_code = http_client.ACCEPTED
self.request.return_value.headers = {'retry-after': 5}
with self.assertRaisesRegex(exceptions.ConnectionError,
'status 202, but no Location header'):
self.conn._op('POST', 'http://foo.bar', blocking=True)