Add timeout for requests

Bandit emits errors for request methods without the timeout parameter.
It's better to follow the instruction to avoid hanging.

Added timeout parameters and config options to set timeout.

[1] https://bandit.readthedocs.io/en/1.7.5/plugins/b113_request_without_timeout.html

Change-Id: I0c022c3cc57f30530ebdef6e434753ece2bdf912
(cherry picked from commit 92cdf8a0a5)
This commit is contained in:
Hiromu Asahina 2023-03-18 00:35:47 +09:00 committed by Grzegorz Grasza
parent 404b44320f
commit b34fcefd3c
3 changed files with 31 additions and 16 deletions

View File

@ -44,6 +44,8 @@ keystone_ec2_opts = [
'CAs.'),
cfg.BoolOpt('insecure', default=False,
help='Disable SSL certificate verification.'),
cfg.IntOpt('timeout', default=60,
help='Timeout to obtain token.'),
]
CONF = cfg.CONF
@ -172,9 +174,10 @@ class EC2Token(object):
elif CONF.keystone_ec2_token.certfile:
cert = CONF.keystone_ec2_token.certfile
response = requests.request('POST', CONF.keystone_ec2_token.url,
data=creds_json, headers=headers,
verify=verify, cert=cert)
response = requests.post(CONF.keystone_ec2_token.url,
data=creds_json, headers=headers,
verify=verify, cert=cert,
timeout=CONF.keystone_ec2_token.timeout)
# NOTE(vish): We could save a call to keystone by
# having keystone return token, tenant,

View File

@ -33,12 +33,21 @@ This WSGI component:
import webob
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
from oslo_utils import strutils
import requests
import six
s3_opts = [
cfg.IntOpt('timeout', default=60,
help='Timeout to obtain token.'),
]
CONF = cfg.CONF
CONF.register_opts(s3_opts, group='s3_token')
PROTOCOL_NAME = 'S3 Token Authentication'
@ -113,7 +122,8 @@ class S3Token(object):
try:
response = requests.post('%s/v2.0/s3tokens' % self._request_uri,
headers=headers, data=creds_json,
verify=self._verify)
verify=self._verify,
timeout=CONF.s3_token.timeout)
except requests.exceptions.RequestException as e:
self._logger.info('HTTP connection exception: %s', e)
resp = self._deny_request('InvalidURI')

View File

@ -73,7 +73,7 @@ class EC2TokenMiddlewareTestBase(utils.TestCase):
class EC2TokenMiddlewareTestGood(EC2TokenMiddlewareTestBase):
@mock.patch.object(
requests, 'request',
requests, 'post',
return_value=FakeResponse(EMPTY_RESPONSE, status_code=200))
def test_protocol_old_versions(self, mock_request):
req = webob.Request.blank('/test')
@ -85,9 +85,9 @@ class EC2TokenMiddlewareTestGood(EC2TokenMiddlewareTestBase):
self.assertEqual(TOKEN_ID, req.headers['X-Auth-Token'])
mock_request.assert_called_with(
'POST', 'http://localhost:5000/v3/ec2tokens',
'http://localhost:5000/v3/ec2tokens',
data=mock.ANY, headers={'Content-Type': 'application/json'},
verify=True, cert=None)
verify=True, cert=None, timeout=mock.ANY)
data = jsonutils.loads(mock_request.call_args[1]['data'])
expected_data = {
@ -104,7 +104,7 @@ class EC2TokenMiddlewareTestGood(EC2TokenMiddlewareTestBase):
self.assertDictEqual(expected_data, data)
@mock.patch.object(
requests, 'request',
requests, 'post',
return_value=FakeResponse(EMPTY_RESPONSE, status_code=200))
def test_protocol_v4(self, mock_request):
req = webob.Request.blank('/test')
@ -120,9 +120,9 @@ class EC2TokenMiddlewareTestGood(EC2TokenMiddlewareTestBase):
self.assertEqual(TOKEN_ID, req.headers['X-Auth-Token'])
mock_request.assert_called_with(
'POST', 'http://localhost:5000/v3/ec2tokens',
'http://localhost:5000/v3/ec2tokens',
data=mock.ANY, headers={'Content-Type': 'application/json'},
verify=True, cert=None)
verify=True, cert=None, timeout=mock.ANY)
data = jsonutils.loads(mock_request.call_args[1]['data'])
expected_data = {
@ -155,7 +155,7 @@ class EC2TokenMiddlewareTestBad(EC2TokenMiddlewareTestBase):
self._validate_ec2_error(resp, 400, 'AuthFailure')
@mock.patch.object(requests,
'request',
'post',
return_value=FakeResponse(EMPTY_RESPONSE))
def test_communication_failure(self, mock_request):
req = webob.Request.blank('/test')
@ -163,12 +163,13 @@ class EC2TokenMiddlewareTestBad(EC2TokenMiddlewareTestBase):
req.GET['AWSAccessKeyId'] = 'test-key-id'
resp = req.get_response(self.middleware)
self._validate_ec2_error(resp, 400, 'AuthFailure')
mock_request.assert_called_with('POST', mock.ANY,
mock_request.assert_called_with(mock.ANY,
data=mock.ANY, headers=mock.ANY,
verify=mock.ANY, cert=mock.ANY)
verify=mock.ANY, cert=mock.ANY,
timeout=mock.ANY)
@mock.patch.object(requests,
'request',
'post',
return_value=FakeResponse(EMPTY_RESPONSE))
def test_no_result_data(self, mock_request):
req = webob.Request.blank('/test')
@ -176,6 +177,7 @@ class EC2TokenMiddlewareTestBad(EC2TokenMiddlewareTestBase):
req.GET['AWSAccessKeyId'] = 'test-key-id'
resp = req.get_response(self.middleware)
self._validate_ec2_error(resp, 400, 'AuthFailure')
mock_request.assert_called_with('POST', mock.ANY,
mock_request.assert_called_with(mock.ANY,
data=mock.ANY, headers=mock.ANY,
verify=mock.ANY, cert=mock.ANY)
verify=mock.ANY, cert=mock.ANY,
timeout=mock.ANY)