EC: don't 503 on marginally-successful PUT
On EC PUT in an M+K scheme, we require M+1 fragment archives to durably land on disk. If we get that, then we go ahead and ask the object servers to "commit" the object by writing out .durable files. We only require 2 of those. When we got exactly M+1 fragment archives on disk, and then one connection timed out while writing .durable files, we should still be okay (provided M is at least 3). However, we'd take our M > 2 remaining successful responses and pass that off to best_response() with a quorum size of M+1, thus getting a 503 even though everything worked well enough. Now we pass 2 to best_response() to avoid that false negative. There was also a spot where we were getting the quorum size wrong. If we wrote out 3 fragment archives for a 2+1 policy, we were only requiring 2 successful backend PUTs. That's wrong; the right number is 3, which is what the policy's .quorum() method says. There was a spot where the right number wasn't getting plumbed through, but it is now. Change-Id: Ic658a199e952558db329268f4d7b4009f47c6d03 Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com> Closes-Bug: 1452468
This commit is contained in:
parent
55dd705a86
commit
666bf06c26
@ -1197,16 +1197,18 @@ class Controller(object):
|
||||
"""
|
||||
return quorum_size(n)
|
||||
|
||||
def have_quorum(self, statuses, node_count):
|
||||
def have_quorum(self, statuses, node_count, quorum=None):
|
||||
"""
|
||||
Given a list of statuses from several requests, determine if
|
||||
a quorum response can already be decided.
|
||||
|
||||
:param statuses: list of statuses returned
|
||||
:param node_count: number of nodes being queried (basically ring count)
|
||||
:param quorum: number of statuses required for quorum
|
||||
:returns: True or False, depending on if quorum is established
|
||||
"""
|
||||
quorum = self._quorum_size(node_count)
|
||||
if quorum is None:
|
||||
quorum = self._quorum_size(node_count)
|
||||
if len(statuses) >= quorum:
|
||||
for hundred in (HTTP_CONTINUE, HTTP_OK, HTTP_MULTIPLE_CHOICES,
|
||||
HTTP_BAD_REQUEST):
|
||||
|
@ -2171,7 +2171,7 @@ class ECObjectController(BaseObjectController):
|
||||
else:
|
||||
# intermediate response phase - set return value to true only
|
||||
# if there are enough 100-continue acknowledgements
|
||||
if self.have_quorum(statuses, num_nodes):
|
||||
if self.have_quorum(statuses, num_nodes, quorum=min_responses):
|
||||
quorum = True
|
||||
|
||||
return statuses, reasons, bodies, etags, quorum
|
||||
@ -2203,12 +2203,17 @@ class ECObjectController(BaseObjectController):
|
||||
nodes, min_conns, etag_hasher)
|
||||
final_phase = True
|
||||
need_quorum = False
|
||||
min_resp = 2
|
||||
# The .durable file will propagate in a replicated fashion; if
|
||||
# one exists, the reconstructor will spread it around. Thus, we
|
||||
# don't require as many .durable files to be successfully
|
||||
# written as we do fragment archives in order to call the PUT a
|
||||
# success.
|
||||
min_conns = 2
|
||||
putters = [p for p in putters if not p.failed]
|
||||
# ignore response etags, and quorum boolean
|
||||
statuses, reasons, bodies, _etags, _quorum = \
|
||||
self._get_put_responses(req, putters, len(nodes),
|
||||
final_phase, min_resp,
|
||||
final_phase, min_conns,
|
||||
need_quorum=need_quorum)
|
||||
except HTTPException as resp:
|
||||
return resp
|
||||
|
@ -704,6 +704,74 @@ def mock(update):
|
||||
delattr(module, attr)
|
||||
|
||||
|
||||
class FakeStatus(object):
|
||||
"""
|
||||
This will work with our fake_http_connect, if you hand in one of these
|
||||
instead of a status int or status int tuple to the "codes" iter you can
|
||||
add some eventlet sleep to the expect and response stages of the
|
||||
connection.
|
||||
"""
|
||||
|
||||
def __init__(self, status, expect_sleep=None, response_sleep=None):
|
||||
"""
|
||||
:param status: the response status int, or a tuple of
|
||||
([expect_status, ...], response_status)
|
||||
:param expect_sleep: float, time to eventlet sleep during expect, can
|
||||
be a iter of floats
|
||||
:param response_sleep: float, time to eventlet sleep during response
|
||||
"""
|
||||
# connect exception
|
||||
if isinstance(status, (Exception, eventlet.Timeout)):
|
||||
raise status
|
||||
if isinstance(status, tuple):
|
||||
self.expect_status = list(status[:-1])
|
||||
self.status = status[-1]
|
||||
self.explicit_expect_list = True
|
||||
else:
|
||||
self.expect_status, self.status = ([], status)
|
||||
self.explicit_expect_list = False
|
||||
if not self.expect_status:
|
||||
# when a swift backend service returns a status before reading
|
||||
# from the body (mostly an error response) eventlet.wsgi will
|
||||
# respond with that status line immediately instead of 100
|
||||
# Continue, even if the client sent the Expect 100 header.
|
||||
# BufferedHttp and the proxy both see these error statuses
|
||||
# when they call getexpect, so our FakeConn tries to act like
|
||||
# our backend services and return certain types of responses
|
||||
# as expect statuses just like a real backend server would do.
|
||||
if self.status in (507, 412, 409):
|
||||
self.expect_status = [status]
|
||||
else:
|
||||
self.expect_status = [100, 100]
|
||||
|
||||
# setup sleep attributes
|
||||
if not isinstance(expect_sleep, (list, tuple)):
|
||||
expect_sleep = [expect_sleep] * len(self.expect_status)
|
||||
self.expect_sleep_list = list(expect_sleep)
|
||||
while len(self.expect_sleep_list) < len(self.expect_status):
|
||||
self.expect_sleep_list.append(None)
|
||||
self.response_sleep = response_sleep
|
||||
|
||||
def get_response_status(self):
|
||||
if self.response_sleep is not None:
|
||||
eventlet.sleep(self.response_sleep)
|
||||
if self.expect_status and self.explicit_expect_list:
|
||||
raise Exception('Test did not consume all fake '
|
||||
'expect status: %r' % (self.expect_status,))
|
||||
if isinstance(self.status, (Exception, eventlet.Timeout)):
|
||||
raise self.status
|
||||
return self.status
|
||||
|
||||
def get_expect_status(self):
|
||||
expect_sleep = self.expect_sleep_list.pop(0)
|
||||
if expect_sleep is not None:
|
||||
eventlet.sleep(expect_sleep)
|
||||
expect_status = self.expect_status.pop(0)
|
||||
if isinstance(expect_status, (Exception, eventlet.Timeout)):
|
||||
raise expect_status
|
||||
return expect_status
|
||||
|
||||
|
||||
class SlowBody(object):
|
||||
"""
|
||||
This will work with our fake_http_connect, if you hand in these
|
||||
@ -741,29 +809,9 @@ def fake_http_connect(*code_iter, **kwargs):
|
||||
def __init__(self, status, etag=None, body='', timestamp='1',
|
||||
headers=None, expect_headers=None, connection_id=None,
|
||||
give_send=None):
|
||||
# connect exception
|
||||
if isinstance(status, (Exception, eventlet.Timeout)):
|
||||
raise status
|
||||
if isinstance(status, tuple):
|
||||
self.expect_status = list(status[:-1])
|
||||
self.status = status[-1]
|
||||
self.explicit_expect_list = True
|
||||
else:
|
||||
self.expect_status, self.status = ([], status)
|
||||
self.explicit_expect_list = False
|
||||
if not self.expect_status:
|
||||
# when a swift backend service returns a status before reading
|
||||
# from the body (mostly an error response) eventlet.wsgi will
|
||||
# respond with that status line immediately instead of 100
|
||||
# Continue, even if the client sent the Expect 100 header.
|
||||
# BufferedHttp and the proxy both see these error statuses
|
||||
# when they call getexpect, so our FakeConn tries to act like
|
||||
# our backend services and return certain types of responses
|
||||
# as expect statuses just like a real backend server would do.
|
||||
if self.status in (507, 412, 409):
|
||||
self.expect_status = [status]
|
||||
else:
|
||||
self.expect_status = [100, 100]
|
||||
if not isinstance(status, FakeStatus):
|
||||
status = FakeStatus(status)
|
||||
self._status = status
|
||||
self.reason = 'Fake'
|
||||
self.host = '1.2.3.4'
|
||||
self.port = '1234'
|
||||
@ -785,11 +833,6 @@ def fake_http_connect(*code_iter, **kwargs):
|
||||
eventlet.sleep()
|
||||
|
||||
def getresponse(self):
|
||||
if self.expect_status and self.explicit_expect_list:
|
||||
raise Exception('Test did not consume all fake '
|
||||
'expect status: %r' % (self.expect_status,))
|
||||
if isinstance(self.status, (Exception, eventlet.Timeout)):
|
||||
raise self.status
|
||||
exc = kwargs.get('raise_exc')
|
||||
if exc:
|
||||
if isinstance(exc, (Exception, eventlet.Timeout)):
|
||||
@ -797,16 +840,17 @@ def fake_http_connect(*code_iter, **kwargs):
|
||||
raise Exception('test')
|
||||
if kwargs.get('raise_timeout_exc'):
|
||||
raise eventlet.Timeout()
|
||||
self.status = self._status.get_response_status()
|
||||
return self
|
||||
|
||||
def getexpect(self):
|
||||
expect_status = self.expect_status.pop(0)
|
||||
if isinstance(self.expect_status, (Exception, eventlet.Timeout)):
|
||||
raise self.expect_status
|
||||
expect_status = self._status.get_expect_status()
|
||||
headers = dict(self.expect_headers)
|
||||
if expect_status == 409:
|
||||
headers['X-Backend-Timestamp'] = self.timestamp
|
||||
return FakeConn(expect_status, headers=headers)
|
||||
response = FakeConn(expect_status, headers=headers)
|
||||
response.status = expect_status
|
||||
return response
|
||||
|
||||
def getheaders(self):
|
||||
etag = self.etag
|
||||
|
@ -35,7 +35,7 @@ from swift.proxy.controllers.base import get_info as _real_get_info
|
||||
from swift.common.storage_policy import POLICIES, ECDriverError
|
||||
|
||||
from test.unit import FakeRing, FakeMemcache, fake_http_connect, \
|
||||
debug_logger, patch_policies, SlowBody
|
||||
debug_logger, patch_policies, SlowBody, FakeStatus
|
||||
from test.unit.proxy.test_server import node_error_count
|
||||
|
||||
|
||||
@ -1406,6 +1406,35 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase):
|
||||
self.assertEqual(1, len(error_lines))
|
||||
self.assertTrue('retrying' in error_lines[0])
|
||||
|
||||
def test_PUT_with_slow_commits(self):
|
||||
# It's important that this timeout be much less than the delay in
|
||||
# the slow commit responses so that the slow commits are not waited
|
||||
# for.
|
||||
self.app.post_quorum_timeout = 0.01
|
||||
req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT',
|
||||
body='')
|
||||
# plenty of slow commits
|
||||
response_sleep = 5.0
|
||||
codes = [FakeStatus(201, response_sleep=response_sleep)
|
||||
for i in range(self.replicas())]
|
||||
# swap out some with regular fast responses
|
||||
number_of_fast_responses_needed_to_be_quick_enough = 2
|
||||
fast_indexes = random.sample(
|
||||
xrange(self.replicas()),
|
||||
number_of_fast_responses_needed_to_be_quick_enough)
|
||||
for i in fast_indexes:
|
||||
codes[i] = 201
|
||||
expect_headers = {
|
||||
'X-Obj-Metadata-Footer': 'yes',
|
||||
'X-Obj-Multiphase-Commit': 'yes'
|
||||
}
|
||||
with set_http_connect(*codes, expect_headers=expect_headers):
|
||||
start = time.time()
|
||||
resp = req.get_response(self.app)
|
||||
response_time = time.time() - start
|
||||
self.assertEquals(resp.status_int, 201)
|
||||
self.assertTrue(response_time < response_sleep)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
@ -6217,7 +6217,8 @@ class TestECMismatchedFA(unittest.TestCase):
|
||||
# pyeclib has checks for unequal-length; we don't want to trip those
|
||||
self.assertEqual(len(obj1), len(obj2))
|
||||
|
||||
# Servers obj1 and obj2 will have the first version of the object
|
||||
# Server obj1 will have the first version of the object (obj2 also
|
||||
# gets it, but that gets stepped on later)
|
||||
prosrv._error_limiting = {}
|
||||
with nested(
|
||||
mock.patch.object(obj3srv, 'PUT', bad_disk),
|
||||
@ -6227,18 +6228,13 @@ class TestECMismatchedFA(unittest.TestCase):
|
||||
resp = put_req1.get_response(prosrv)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
# Server obj3 (and, in real life, some handoffs) will have the
|
||||
# second version of the object.
|
||||
# Servers obj2 and obj3 will have the second version of the object.
|
||||
prosrv._error_limiting = {}
|
||||
with nested(
|
||||
mock.patch.object(obj1srv, 'PUT', bad_disk),
|
||||
mock.patch.object(obj2srv, 'PUT', bad_disk),
|
||||
mock.patch(
|
||||
'swift.common.storage_policy.ECStoragePolicy.quorum'),
|
||||
mock.patch(
|
||||
'swift.proxy.controllers.base.Controller._quorum_size',
|
||||
lambda *a, **kw: 1)):
|
||||
type(ec_policy).quorum = mock.PropertyMock(return_value=1)
|
||||
'swift.common.storage_policy.ECStoragePolicy.quorum')):
|
||||
type(ec_policy).quorum = mock.PropertyMock(return_value=2)
|
||||
resp = put_req2.get_response(prosrv)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
@ -6258,10 +6254,10 @@ class TestECMismatchedFA(unittest.TestCase):
|
||||
environ={"REQUEST_METHOD": "GET"},
|
||||
headers={"X-Auth-Token": "t"})
|
||||
prosrv._error_limiting = {}
|
||||
with mock.patch.object(obj3srv, 'GET', bad_disk):
|
||||
with mock.patch.object(obj1srv, 'GET', bad_disk):
|
||||
resp = get_req.get_response(prosrv)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual(resp.body, obj1)
|
||||
self.assertEqual(resp.body, obj2)
|
||||
|
||||
# A GET that sees 2 mismatching FAs will fail
|
||||
get_req = Request.blank("/v1/a/ec-crazytown/obj",
|
||||
|
Loading…
Reference in New Issue
Block a user