Add Task Monitor support for async operations

Redfish services that support asynchronous operations implement the
Task service, Task resource, and Task Monitor. When performing long
running update operations, the service returns a status of 202
Accepted. When returning a status code of 202, the response includes
a Location header containing the URL of the Task Monitor. The response
may also include the Retry-After header to specify the amount of time
the client should wait before querying the operation status.

The Task Monitor is an opaque URL that the client can query via a GET
request. While the async operation is still in progress, the service
returns a 202 Accepted status. The body of the response may be a Task
resource or it may be empty.

Once the operation completes, a GET on the Task Monitor will return a
status that indicates the result of the operation (e.g., 200, 201,
4XX).

A new TaskMonitor class is added to represent the Task Monitor and
give the sushy user the ability to monitor the status of an async
operation.

The Connector class is updated allow an option to make HTTP requests
blocking. This change utilizes the newly added TaskMonitor.

Change-Id: Ice6700806710f37b590f6b879ec38656326b39c6
Story: 2003514
Task: 30719
This commit is contained in:
Bill Dodd
2019-07-11 13:17:08 -05:00
committed by Julia Kreger
parent c944ab44d1
commit 1f60044bd8
6 changed files with 361 additions and 22 deletions

View File

@@ -0,0 +1,5 @@
---
features:
- |
Add support for a Task Monitor resource to be able to monitor the state
of asynchronous operations.

View File

@@ -17,8 +17,10 @@ import logging
from urllib import parse as urlparse
import requests
import time
from sushy import exceptions
from sushy.resources.task_monitor import TaskMonitor
LOG = logging.getLogger(__name__)
@@ -61,15 +63,17 @@ class Connector(object):
"""Close this connector and the associated HTTP session."""
self._session.close()
def _op(self, method, path='', data=None, headers=None,
**extra_session_req_kwargs):
def _op(self, method, path='', data=None, headers=None, blocking=False,
timeout=60, **extra_session_req_kwargs):
"""Generic RESTful request handler.
:param method: The HTTP method to be used, e.g: GET, POST,
PUT, PATCH, etc...
:param path: The sub-URI path to the resource.
:param path: The sub-URI or absolute URL path to the resource.
:param data: Optional JSON data.
:param headers: Optional dictionary of headers.
:param blocking: Whether to block for asynchronous operations.
:param timeout: Max time in seconds to wait for blocking async call.
:param extra_session_req_kwargs: Optional keyword argument to pass
requests library arguments which would pass on to requests session
object.
@@ -77,16 +81,19 @@ class Connector(object):
:raises: ConnectionError
:raises: HTTPError
"""
url = urlparse.urljoin(self._url, path)
url = path if urlparse.urlparse(path).netloc else urlparse.urljoin(
self._url, path)
headers = headers or {}
if not any(k.lower() == 'odata-version' for k in headers):
headers['OData-Version'] = '4.0'
# TODO(lucasagomes): We should mask the data to remove sensitive
# information
LOG.debug('HTTP request: %(method)s %(url)s; headers: %(headers)s; '
'body: %(data)s; session arguments: %(session)s;',
'body: %(data)s; blocking: %(blocking)s; timeout: '
'%(timeout)s; session arguments: %(session)s;',
{'method': method, 'url': url, 'headers': headers,
'data': data, 'session': extra_session_req_kwargs})
'data': data, 'blocking': blocking, 'timeout': timeout,
'session': extra_session_req_kwargs})
try:
response = self._session.request(method, url, json=data,
headers=headers,
@@ -110,6 +117,29 @@ class Connector(object):
else:
raise
if blocking and response.status_code == 202:
if not response.headers.get('location'):
m = ('HTTP response for %(method)s request to %(url)s '
'returned status 202, but no Location header'
% {'method': method, 'url': url})
raise exceptions.ConnectionError(url=url, error=m)
timeout_at = time.time() + timeout
mon = (TaskMonitor(self, response.headers.get('location'))
.set_retry_after(response.headers.get('retry-after')))
while mon.in_progress:
LOG.debug('Blocking for in-progress %(method)s call to '
'%(url)s; sleeping for %(sleep)s seconds',
{'method': method, 'url': url,
'sleep': mon.sleep_for})
time.sleep(mon.sleep_for)
if time.time() >= timeout_at and mon.in_progress:
m = ('Timeout waiting for blocking %(method)s '
'request to %(url)s (timeout = %(timeout)s)'
% {'method': method, 'url': url,
'timeout': timeout})
raise exceptions.ConnectionError(url=url, error=m)
response = mon.response
LOG.debug('HTTP response for %(method)s %(url)s: '
'status code: %(code)s',
{'method': method, 'url': url,
@@ -117,13 +147,15 @@ class Connector(object):
return response
def get(self, path='', data=None, headers=None,
**extra_session_req_kwargs):
def get(self, path='', data=None, headers=None, blocking=False,
timeout=60, **extra_session_req_kwargs):
"""HTTP GET method.
:param path: Optional sub-URI path to the resource.
:param data: Optional JSON data.
:param headers: Optional dictionary of headers.
:param blocking: Whether to block for asynchronous operations.
:param timeout: Max time in seconds to wait for blocking async call.
:param extra_session_req_kwargs: Optional keyword argument to pass
requests library arguments which would pass on to requests session
object.
@@ -132,15 +164,18 @@ class Connector(object):
:raises: HTTPError
"""
return self._op('GET', path, data=data, headers=headers,
blocking=blocking, timeout=timeout,
**extra_session_req_kwargs)
def post(self, path='', data=None, headers=None,
**extra_session_req_kwargs):
def post(self, path='', data=None, headers=None, blocking=False,
timeout=60, **extra_session_req_kwargs):
"""HTTP POST method.
:param path: Optional sub-URI path to the resource.
:param data: Optional JSON data.
:param headers: Optional dictionary of headers.
:param blocking: Whether to block for asynchronous operations.
:param timeout: Max time in seconds to wait for blocking async call.
:param extra_session_req_kwargs: Optional keyword argument to pass
requests library arguments which would pass on to requests session
object.
@@ -149,15 +184,18 @@ class Connector(object):
:raises: HTTPError
"""
return self._op('POST', path, data=data, headers=headers,
blocking=blocking, timeout=timeout,
**extra_session_req_kwargs)
def patch(self, path='', data=None, headers=None,
**extra_session_req_kwargs):
def patch(self, path='', data=None, headers=None, blocking=False,
timeout=60, **extra_session_req_kwargs):
"""HTTP PATCH method.
:param path: Optional sub-URI path to the resource.
:param data: Optional JSON data.
:param headers: Optional dictionary of headers.
:param blocking: Whether to block for asynchronous operations.
:param timeout: Max time in seconds to wait for blocking async call.
:param extra_session_req_kwargs: Optional keyword argument to pass
requests library arguments which would pass on to requests session
object.
@@ -166,15 +204,18 @@ class Connector(object):
:raises: HTTPError
"""
return self._op('PATCH', path, data=data, headers=headers,
blocking=blocking, timeout=timeout,
**extra_session_req_kwargs)
def put(self, path='', data=None, headers=None,
**extra_session_req_kwargs):
def put(self, path='', data=None, headers=None, blocking=False,
timeout=60, **extra_session_req_kwargs):
"""HTTP PUT method.
:param path: Optional sub-URI path to the resource.
:param data: Optional JSON data.
:param headers: Optional dictionary of headers.
:param blocking: Whether to block for asynchronous operations.
:param timeout: Max time in seconds to wait for blocking async call.
:param extra_session_req_kwargs: Optional keyword argument to pass
requests library arguments which would pass on to requests session
object.
@@ -183,15 +224,18 @@ class Connector(object):
:raises: HTTPError
"""
return self._op('PUT', path, data=data, headers=headers,
blocking=blocking, timeout=timeout,
**extra_session_req_kwargs)
def delete(self, path='', data=None, headers=None,
**extra_session_req_kwargs):
def delete(self, path='', data=None, headers=None, blocking=False,
timeout=60, **extra_session_req_kwargs):
"""HTTP DELETE method.
:param path: Optional sub-URI path to the resource.
:param data: Optional JSON data.
:param headers: Optional dictionary of headers.
:param blocking: Whether to block for asynchronous operations.
:param timeout: Max time in seconds to wait for blocking async call.
:param extra_session_req_kwargs: Optional keyword argument to pass
requests library arguments which would pass on to requests session
object.
@@ -200,6 +244,7 @@ class Connector(object):
:raises: HTTPError
"""
return self._op('DELETE', path, data=data, headers=headers,
blocking=blocking, timeout=timeout,
**extra_session_req_kwargs)
def __enter__(self):

View File

@@ -338,7 +338,8 @@ class JsonDataReader(AbstractJsonReader):
def get_json(self):
"""Gets JSON file from URI directly"""
return self._conn.get(path=self._path).json()
data = self._conn.get(path=self._path)
return data.json() if data.content else {}
class JsonPublicFileReader(AbstractJsonReader):

View File

@@ -0,0 +1,114 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# This is described in Redfish specification section "Asynchronous operations"
# www.dmtf.org/sites/default/files/standards/documents/DSP0266_1.7.0.pdf
from datetime import datetime
from datetime import timedelta
import logging
from dateutil import parser
from sushy.resources import base
LOG = logging.getLogger(__name__)
class TaskMonitor(base.ResourceBase):
def __init__(self,
connector,
path='',
redfish_version=None):
"""A class representing a Redfish Task Monitor
:param connector: A Connector instance
:param path: sub-URI path to the resource.
:param redfish_version: The version of Redfish. Used to construct
the object according to schema of the given version.
"""
super(TaskMonitor, self).__init__(connector, path, redfish_version)
self._retry_after = None
self._location_header = None
self._in_progress = True
self._response = None
@staticmethod
def _to_datetime(retry_after):
if isinstance(retry_after, int) or retry_after.isdigit():
# Retry-After: 120
return datetime.now() + timedelta(seconds=int(retry_after))
else:
# Retry-After: Fri, 31 Dec 1999 23:59:59 GMT
return parser.parse(retry_after)
def set_retry_after(self, value):
"""Set the time the client should wait before querying the task status
:param value: The value of the Retry-After header, which can be the
number of seconds to wait or an `HTTP-date` string as
defined by RFC 7231
:return: The TaskMonitor object
"""
self._retry_after = self._to_datetime(value or 1)
return self
@property
def retry_after(self):
"""Time the client should wait before querying the task status
:return: The Retry-After time in `datetime` format
"""
return self._retry_after
@property
def sleep_for(self):
"""Seconds the client should wait before querying the operation status
:return: The number of seconds to wait
"""
return max(0, (self._retry_after - datetime.now()).total_seconds())
@property
def location_header(self):
"""The Location header returned from the GET on the Task Monitor
:return: The Location header (an absolute URL)
"""
return self._location_header
@property
def in_progress(self):
"""Checks the status of the async task
:return: True if the async task is still in progress, False otherwise
"""
if not self._in_progress:
return False
r = self._conn.get(self._path)
self._response = r
self._location_header = r.headers.get('location')
if r.status_code == 202:
self.set_retry_after(r.headers.get('retry-after'))
else:
self._in_progress = False
return self._in_progress
@property
def response(self):
"""The response from the last TaskMonitor in_progress check
:return: The `requests` response object or None
"""
return self._response

View File

@@ -0,0 +1,122 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from datetime import datetime
from datetime import timedelta
from dateutil import parser
import mock
from sushy.resources.task_monitor import TaskMonitor
from sushy.tests.unit import base
class TaskMonitorTestCase(base.TestCase):
def setUp(self):
super(TaskMonitorTestCase, self).setUp()
self.conn = mock.Mock()
self.data = {'fake': 'data'}
self.http_date = 'Fri, 31 Dec 1999 23:59:59 GMT'
self.seconds = 120
self.datetime = parser.parse(self.http_date)
self.req_headers = {'X-Fake': 'header'}
self.res_headers1 = {'location': 'https://sample.com/foo/bar',
'retry-after': self.http_date}
self.res_headers2 = {'location': 'https://sample.com/foo/bar',
'retry-after': str(self.seconds)}
def test_task_in_progress(self):
self.conn.post.return_value.status_code = 202
self.conn.post.return_value.headers = self.res_headers1.copy()
self.conn.get.return_value.status_code = 202
self.conn.get.return_value.headers = self.res_headers1.copy()
self.conn.get.return_value.json.return_value = {}
res = self.conn.post(path='fake/path', data=self.data.copy(),
headers=self.req_headers.copy())
tm = TaskMonitor(self.conn, res.headers.get('location'))\
.set_retry_after(res.headers.get('retry-after'))
self.assertIsNotNone(tm)
self.assertTrue(tm.in_progress)
def test_task_not_in_progress(self):
self.conn.post.return_value.status_code = 202
self.conn.post.return_value.headers = self.res_headers1.copy()
self.conn.get.return_value.status_code = 201
self.conn.get.return_value.json.return_value = self.data.copy()
res = self.conn.post(path='fake/path', data=self.data.copy(),
headers=self.req_headers.copy())
tm = TaskMonitor(self.conn, res.headers.get('location'))\
.set_retry_after(res.headers.get('retry-after'))
self.assertIsNotNone(tm)
self.assertFalse(tm.in_progress)
def test_retry_after_http_date(self):
self.conn.post.return_value.status_code = 202
self.conn.post.return_value.headers = self.res_headers1.copy()
self.conn.get.return_value.status_code = 202
self.conn.get.return_value.headers = self.res_headers1.copy()
self.conn.get.return_value.json.return_value = {}
res = self.conn.post(path='fake/path', data=self.data.copy(),
headers=self.req_headers.copy())
tm = TaskMonitor(self.conn, res.headers.get('location')) \
.set_retry_after(res.headers.get('retry-after'))
self.assertIsNotNone(tm)
self.assertEqual(self.datetime, tm.retry_after)
def test_retry_after_seconds(self):
self.conn.post.return_value.status_code = 202
self.conn.post.return_value.headers = self.res_headers2.copy()
self.conn.get.return_value.status_code = 202
self.conn.get.return_value.headers = self.res_headers2.copy()
self.conn.get.return_value.json.return_value = {}
start = datetime.now() + timedelta(seconds=self.seconds)
res = self.conn.post(path='fake/path', data=self.data.copy(),
headers=self.req_headers.copy())
tm = TaskMonitor(self.conn, res.headers.get('location')) \
.set_retry_after(res.headers.get('retry-after'))
end = datetime.now() + timedelta(seconds=self.seconds)
self.assertIsNotNone(tm)
self.assertTrue(start <= tm.retry_after <= end)
def test_sleep_for(self):
self.conn.post.return_value.status_code = 202
self.conn.post.return_value.headers = self.res_headers2.copy()
self.conn.get.return_value.status_code = 202
self.conn.get.return_value.headers = self.res_headers2.copy()
self.conn.get.return_value.json.return_value = {}
start = datetime.now()
res = self.conn.post(path='fake/path', data=self.data.copy(),
headers=self.req_headers.copy())
tm = TaskMonitor(self.conn, res.headers.get('location')) \
.set_retry_after(res.headers.get('retry-after'))
self.assertIsNotNone(tm)
sleep_for = tm.sleep_for
elapsed = (datetime.now() - start).total_seconds()
self.assertTrue(self.seconds - elapsed <= sleep_for <= self.seconds)
def test_response(self):
self.conn.post.return_value.status_code = 202
self.conn.post.return_value.headers = self.res_headers1.copy()
self.conn.get.return_value.status_code = 201
self.conn.get.return_value.json.return_value = self.data.copy()
res = self.conn.post(path='fake/path', data=self.data.copy(),
headers=self.req_headers.copy())
tm = TaskMonitor(self.conn, res.headers.get('location')) \
.set_retry_after(res.headers.get('retry-after'))
self.assertIsNotNone(tm)
self.assertFalse(tm.in_progress)
response = tm.response
self.assertEqual(201, response.status_code)
self.assertEqual(self.data.copy(), response.json())

View File

@@ -48,35 +48,80 @@ class ConnectorMethodsTestCase(base.TestCase):
self.conn.get(path='fake/path', data=self.data.copy(),
headers=self.headers.copy())
mock__op.assert_called_once_with(mock.ANY, 'GET', 'fake/path',
data=self.data, headers=self.headers)
data=self.data, headers=self.headers,
blocking=False, timeout=60)
@mock.patch.object(connector.Connector, '_op', autospec=True)
def test_get_blocking(self, mock__op):
self.conn.get(path='fake/path', data=self.data.copy(),
headers=self.headers.copy(), blocking=True)
mock__op.assert_called_once_with(mock.ANY, 'GET', 'fake/path',
data=self.data, headers=self.headers,
blocking=True, timeout=60)
@mock.patch.object(connector.Connector, '_op', autospec=True)
def test_post(self, mock__op):
self.conn.post(path='fake/path', data=self.data.copy(),
headers=self.headers.copy())
mock__op.assert_called_once_with(mock.ANY, 'POST', 'fake/path',
data=self.data, headers=self.headers)
data=self.data, headers=self.headers,
blocking=False, timeout=60)
@mock.patch.object(connector.Connector, '_op', autospec=True)
def test_post_blocking(self, mock__op):
self.conn.post(path='fake/path', data=self.data.copy(),
headers=self.headers.copy(), blocking=True, timeout=120)
mock__op.assert_called_once_with(mock.ANY, 'POST', 'fake/path',
data=self.data, headers=self.headers,
blocking=True, timeout=120)
@mock.patch.object(connector.Connector, '_op', autospec=True)
def test_patch(self, mock__op):
self.conn.patch(path='fake/path', data=self.data.copy(),
headers=self.headers.copy())
mock__op.assert_called_once_with(mock.ANY, 'PATCH', 'fake/path',
data=self.data, headers=self.headers)
data=self.data, headers=self.headers,
blocking=False, timeout=60)
@mock.patch.object(connector.Connector, '_op', autospec=True)
def test_patch_blocking(self, mock__op):
self.conn.patch(path='fake/path', data=self.data.copy(),
headers=self.headers.copy(), blocking=True)
mock__op.assert_called_once_with(mock.ANY, 'PATCH', 'fake/path',
data=self.data, headers=self.headers,
blocking=True, timeout=60)
@mock.patch.object(connector.Connector, '_op', autospec=True)
def test_put(self, mock__op):
self.conn.put(path='fake/path', data=self.data.copy(),
headers=self.headers.copy())
mock__op.assert_called_once_with(mock.ANY, 'PUT', 'fake/path',
data=self.data, headers=self.headers)
data=self.data, headers=self.headers,
blocking=False, timeout=60)
@mock.patch.object(connector.Connector, '_op', autospec=True)
def test_put_blocking(self, mock__op):
self.conn.put(path='fake/path', data=self.data.copy(),
headers=self.headers.copy(), blocking=True)
mock__op.assert_called_once_with(mock.ANY, 'PUT', 'fake/path',
data=self.data, headers=self.headers,
blocking=True, timeout=60)
@mock.patch.object(connector.Connector, '_op', autospec=True)
def test_delete(self, mock__op):
self.conn.delete(path='fake/path', data=self.data.copy(),
headers=self.headers.copy())
mock__op.assert_called_once_with(mock.ANY, 'DELETE', 'fake/path',
data=self.data, headers=self.headers)
data=self.data, headers=self.headers,
blocking=False, timeout=60)
@mock.patch.object(connector.Connector, '_op', autospec=True)
def test_delete_blocking(self, mock__op):
self.conn.delete(path='fake/path', data=self.data.copy(),
headers=self.headers.copy(), blocking=True)
mock__op.assert_called_once_with(mock.ANY, 'DELETE', 'fake/path',
data=self.data, headers=self.headers,
blocking=True, timeout=60)
def test_set_auth(self):
mock_auth = mock.MagicMock()
@@ -286,3 +331,10 @@ class ConnectorOpTestCase(base.TestCase):
self.conn._op('GET', 'http://foo.bar')
exc = cm.exception
self.assertEqual(http_client.FORBIDDEN, exc.status_code)
def test_blocking_no_location_header(self):
self.request.return_value.status_code = http_client.ACCEPTED
self.request.return_value.headers = {'retry-after': 5}
with self.assertRaisesRegex(exceptions.ConnectionError,
'status 202, but no Location header'):
self.conn._op('POST', 'http://foo.bar', blocking=True)