Unit test for keepalive timeout
Create a unit test to verify client timeout for multiple requests Change-Id: I974e01cd2cb18f4ea87c3966dbf4b06bff22ed39
This commit is contained in:
@@ -512,6 +512,17 @@ def readuntil2crlfs(fd):
|
|||||||
return rv
|
return rv
|
||||||
|
|
||||||
|
|
||||||
|
def readlength(fd, size, timeout=1.0):
|
||||||
|
buf = b''
|
||||||
|
with eventlet.Timeout(timeout):
|
||||||
|
while len(buf) < size:
|
||||||
|
chunk = fd.read(min(64, size - len(buf)))
|
||||||
|
buf += chunk
|
||||||
|
if len(buf) >= size:
|
||||||
|
break
|
||||||
|
return buf
|
||||||
|
|
||||||
|
|
||||||
def connect_tcp(hostport):
|
def connect_tcp(hostport):
|
||||||
rv = socket.socket()
|
rv = socket.socket()
|
||||||
rv.connect(hostport)
|
rv.connect(hostport)
|
||||||
|
@@ -96,6 +96,7 @@ def setup_servers(the_object_server=object_server, extra_conf=None):
|
|||||||
'allow_versions': 't', 'node_timeout': 20}
|
'allow_versions': 't', 'node_timeout': 20}
|
||||||
if extra_conf:
|
if extra_conf:
|
||||||
conf.update(extra_conf)
|
conf.update(extra_conf)
|
||||||
|
context['conf'] = conf
|
||||||
prolis = listen_zero()
|
prolis = listen_zero()
|
||||||
acc1lis = listen_zero()
|
acc1lis = listen_zero()
|
||||||
acc2lis = listen_zero()
|
acc2lis = listen_zero()
|
||||||
|
@@ -22,6 +22,7 @@ import math
|
|||||||
import os
|
import os
|
||||||
import posix
|
import posix
|
||||||
import socket
|
import socket
|
||||||
|
import errno
|
||||||
import sys
|
import sys
|
||||||
import traceback
|
import traceback
|
||||||
import unittest
|
import unittest
|
||||||
@@ -54,9 +55,10 @@ from six.moves.urllib.parse import quote, parse_qsl
|
|||||||
from test import listen_zero
|
from test import listen_zero
|
||||||
from test.debug_logger import debug_logger
|
from test.debug_logger import debug_logger
|
||||||
from test.unit import (
|
from test.unit import (
|
||||||
connect_tcp, readuntil2crlfs, fake_http_connect, FakeRing, FakeMemcache,
|
connect_tcp, readuntil2crlfs, fake_http_connect, FakeRing,
|
||||||
patch_policies, write_fake_ring, mocked_http_conn, DEFAULT_TEST_EC_TYPE,
|
FakeMemcache, patch_policies, write_fake_ring, mocked_http_conn,
|
||||||
make_timestamp_iter, skip_if_no_xattrs, FakeHTTPResponse)
|
DEFAULT_TEST_EC_TYPE, make_timestamp_iter, skip_if_no_xattrs,
|
||||||
|
FakeHTTPResponse)
|
||||||
from test.unit.helpers import setup_servers, teardown_servers
|
from test.unit.helpers import setup_servers, teardown_servers
|
||||||
from swift.proxy import server as proxy_server
|
from swift.proxy import server as proxy_server
|
||||||
from swift.proxy.controllers.obj import ReplicatedObjectController
|
from swift.proxy.controllers.obj import ReplicatedObjectController
|
||||||
@@ -2412,9 +2414,9 @@ class BaseTestObjectController(object):
|
|||||||
if condition():
|
if condition():
|
||||||
break
|
break
|
||||||
|
|
||||||
def put_container(self, policy_name, container_name):
|
def put_container(self, policy_name, container_name, prolis=None):
|
||||||
# Note: only works if called with unpatched policies
|
# Note: only works if called with unpatched policies
|
||||||
prolis = _test_sockets[0]
|
prolis = prolis or _test_sockets[0]
|
||||||
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
|
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
|
||||||
fd = sock.makefile('rwb')
|
fd = sock.makefile('rwb')
|
||||||
fd.write(('PUT /v1/a/%s HTTP/1.1\r\n'
|
fd.write(('PUT /v1/a/%s HTTP/1.1\r\n'
|
||||||
@@ -7321,6 +7323,22 @@ class TestReplicatedObjectController(
|
|||||||
])
|
])
|
||||||
|
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def in_process_proxy(prosrv, **extra_server_kwargs):
|
||||||
|
server_kwargs = {
|
||||||
|
'protocol': SwiftHttpProtocol,
|
||||||
|
'capitalize_response_headers': False,
|
||||||
|
}
|
||||||
|
server_kwargs.update(extra_server_kwargs)
|
||||||
|
prolis = listen_zero()
|
||||||
|
try:
|
||||||
|
proxy_thread = spawn(wsgi.server, prolis, prosrv,
|
||||||
|
prosrv.logger, **server_kwargs)
|
||||||
|
yield prolis
|
||||||
|
finally:
|
||||||
|
proxy_thread.kill()
|
||||||
|
|
||||||
|
|
||||||
class BaseTestECObjectController(BaseTestObjectController):
|
class BaseTestECObjectController(BaseTestObjectController):
|
||||||
def test_PUT_ec(self):
|
def test_PUT_ec(self):
|
||||||
self.put_container(self.ec_policy.name, self.ec_policy.name)
|
self.put_container(self.ec_policy.name, self.ec_policy.name)
|
||||||
@@ -8177,6 +8195,73 @@ class BaseTestECObjectController(BaseTestObjectController):
|
|||||||
os.rename(self.ec_policy.object_ring.serialized_path + '.bak',
|
os.rename(self.ec_policy.object_ring.serialized_path + '.bak',
|
||||||
self.ec_policy.object_ring.serialized_path)
|
self.ec_policy.object_ring.serialized_path)
|
||||||
|
|
||||||
|
def test_GET_ec_pipeline(self):
|
||||||
|
conf = _test_context['conf']
|
||||||
|
conf['client_timeout'] = 0.1
|
||||||
|
prosrv = proxy_server.Application(conf, logger=debug_logger('proxy'))
|
||||||
|
with in_process_proxy(
|
||||||
|
prosrv, socket_timeout=conf['client_timeout']) as prolis:
|
||||||
|
self.put_container(self.ec_policy.name, self.ec_policy.name,
|
||||||
|
prolis=prolis)
|
||||||
|
|
||||||
|
obj = b'0123456' * 11 * 17
|
||||||
|
|
||||||
|
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
|
||||||
|
fd = sock.makefile('rwb')
|
||||||
|
fd.write(('PUT /v1/a/%s/go-get-it HTTP/1.1\r\n'
|
||||||
|
'Host: localhost\r\n'
|
||||||
|
'Content-Length: %d\r\n'
|
||||||
|
'X-Storage-Token: t\r\n'
|
||||||
|
'X-Object-Meta-Color: chartreuse\r\n'
|
||||||
|
'Content-Type: application/octet-stream\r\n'
|
||||||
|
'\r\n' % (
|
||||||
|
self.ec_policy.name,
|
||||||
|
len(obj),
|
||||||
|
)).encode('ascii'))
|
||||||
|
fd.write(obj)
|
||||||
|
fd.flush()
|
||||||
|
headers = readuntil2crlfs(fd)
|
||||||
|
exp = b'HTTP/1.1 201'
|
||||||
|
self.assertEqual(headers[:len(exp)], exp)
|
||||||
|
|
||||||
|
fd.write(('GET /v1/a/%s/go-get-it HTTP/1.1\r\n'
|
||||||
|
'Host: localhost\r\n'
|
||||||
|
'X-Storage-Token: t\r\n'
|
||||||
|
'\r\n' % self.ec_policy.name).encode('ascii'))
|
||||||
|
fd.flush()
|
||||||
|
headers = readuntil2crlfs(fd)
|
||||||
|
exp = b'HTTP/1.1 200'
|
||||||
|
self.assertEqual(headers[:len(exp)], exp)
|
||||||
|
for line in headers.splitlines():
|
||||||
|
if b'Content-Length' in line:
|
||||||
|
h, v = line.split()
|
||||||
|
content_length = int(v.strip())
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
self.fail("Didn't find content-length in %r" % (headers,))
|
||||||
|
|
||||||
|
gotten_obj = fd.read(content_length)
|
||||||
|
self.assertEqual(gotten_obj, obj)
|
||||||
|
|
||||||
|
sleep(0.3) # client_timeout should kick us off
|
||||||
|
|
||||||
|
fd.write(('GET /v1/a/%s/go-get-it HTTP/1.1\r\n'
|
||||||
|
'Host: localhost\r\n'
|
||||||
|
'X-Storage-Token: t\r\n'
|
||||||
|
'\r\n' % self.ec_policy.name).encode('ascii'))
|
||||||
|
fd.flush()
|
||||||
|
# makefile is a little weird, but this is disconnected
|
||||||
|
self.assertEqual(b'', fd.read())
|
||||||
|
# I expected this to raise a socket error
|
||||||
|
self.assertEqual(b'', sock.recv(1024))
|
||||||
|
# ... but we ARE disconnected
|
||||||
|
with self.assertRaises(socket.error) as caught:
|
||||||
|
sock.send(b'test')
|
||||||
|
self.assertEqual(caught.exception.errno, errno.EPIPE)
|
||||||
|
# and logging confirms we've timed out
|
||||||
|
last_debug_msg = prosrv.logger.get_lines_for_level('debug')[-1]
|
||||||
|
self.assertIn('timed out', last_debug_msg)
|
||||||
|
|
||||||
def test_ec_client_disconnect(self):
|
def test_ec_client_disconnect(self):
|
||||||
prolis = _test_sockets[0]
|
prolis = _test_sockets[0]
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user