Open-code eventlet.listen()
Recently out gate started blowing up intermittently with a strange case of ports mixed up. Sometimes a functional tests tries to authorize on a port that's clearly an object server port, and the like. As it turns out, eventlet developers added an unavoidable SO_REUSEPORT into listen(), which makes listen(("localhost",0) to reuse ports. There's an issue about it: https://github.com/eventlet/eventlet/issues/411 This patch is working around the problem while eventlet people consider the issue. Change-Id: I67522909f96495a6a30e1acdb79835dce2189549
This commit is contained in:
parent
cd712cd144
commit
5dfc3a75fb
@ -33,6 +33,8 @@ except ImportError:
|
||||
return result
|
||||
return result[:_MAX_LENGTH] + ' [truncated]...'
|
||||
|
||||
from eventlet.green import socket
|
||||
|
||||
# make unittests pass on all locale
|
||||
import swift
|
||||
setattr(swift, 'gettext_', lambda x: x)
|
||||
@ -71,3 +73,16 @@ def get_config(section_name=None, defaults=None):
|
||||
except ValueError as e:
|
||||
print(e)
|
||||
return config
|
||||
|
||||
|
||||
def listen_zero():
|
||||
"""
|
||||
The eventlet.listen() always sets SO_REUSEPORT, so when called with
|
||||
("localhost",0), instead of returning unique ports it can return the
|
||||
same port twice. That causes our tests to fail, so open-code it here
|
||||
without SO_REUSEPORT.
|
||||
"""
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.bind(("127.0.0.1", 0))
|
||||
sock.listen(50)
|
||||
return sock
|
||||
|
@ -40,7 +40,7 @@ from six.moves.http_client import HTTPException
|
||||
from swift.common.middleware.memcache import MemcacheMiddleware
|
||||
from swift.common.storage_policy import parse_storage_policies, PolicyError
|
||||
|
||||
from test import get_config
|
||||
from test import get_config, listen_zero
|
||||
from test.functional.swift_test_client import Account, Connection, Container, \
|
||||
ResponseError
|
||||
# This has the side effect of mocking out the xattr module so that unit tests
|
||||
@ -259,7 +259,7 @@ def _in_process_setup_ring(swift_conf, conf_src_dir, testdir):
|
||||
device = 'sd%c1' % chr(len(obj_sockets) + ord('a'))
|
||||
utils.mkdirs(os.path.join(_testdir, 'sda1'))
|
||||
utils.mkdirs(os.path.join(_testdir, 'sda1', 'tmp'))
|
||||
obj_socket = eventlet.listen(('localhost', 0))
|
||||
obj_socket = listen_zero()
|
||||
obj_sockets.append(obj_socket)
|
||||
dev['port'] = obj_socket.getsockname()[1]
|
||||
dev['ip'] = '127.0.0.1'
|
||||
@ -271,7 +271,7 @@ def _in_process_setup_ring(swift_conf, conf_src_dir, testdir):
|
||||
# make default test ring, 3 replicas, 4 partitions, 3 devices
|
||||
# which will work for a replication policy or a 2+1 EC policy
|
||||
_info('No source object ring file, creating 3rep/4part/3dev ring')
|
||||
obj_sockets = [eventlet.listen(('localhost', 0)) for _ in (0, 1, 2)]
|
||||
obj_sockets = [listen_zero() for _ in (0, 1, 2)]
|
||||
replica2part2dev_id = [[0, 1, 2, 0],
|
||||
[1, 2, 0, 1],
|
||||
[2, 0, 1, 2]]
|
||||
@ -462,7 +462,7 @@ def in_process_setup(the_object_server=object_server):
|
||||
# We create the proxy server listening socket to get its port number so
|
||||
# that we can add it as the "auth_port" value for the functional test
|
||||
# clients.
|
||||
prolis = eventlet.listen(('localhost', 0))
|
||||
prolis = listen_zero()
|
||||
_test_socks.append(prolis)
|
||||
|
||||
# The following set of configuration values is used both for the
|
||||
@ -519,10 +519,10 @@ def in_process_setup(the_object_server=object_server):
|
||||
config['object_post_as_copy'] = str(object_post_as_copy)
|
||||
_debug('Setting object_post_as_copy to %r' % object_post_as_copy)
|
||||
|
||||
acc1lis = eventlet.listen(('localhost', 0))
|
||||
acc2lis = eventlet.listen(('localhost', 0))
|
||||
con1lis = eventlet.listen(('localhost', 0))
|
||||
con2lis = eventlet.listen(('localhost', 0))
|
||||
acc1lis = listen_zero()
|
||||
acc2lis = listen_zero()
|
||||
con1lis = listen_zero()
|
||||
con2lis = listen_zero()
|
||||
_test_socks += [acc1lis, acc2lis, con1lis, con2lis] + obj_sockets
|
||||
|
||||
account_ring_path = os.path.join(_testdir, 'account.ring.gz')
|
||||
|
@ -19,10 +19,12 @@ import unittest
|
||||
|
||||
import socket
|
||||
|
||||
from eventlet import spawn, Timeout, listen
|
||||
from eventlet import spawn, Timeout
|
||||
|
||||
from swift.common import bufferedhttp
|
||||
|
||||
from test import listen_zero
|
||||
|
||||
|
||||
class MockHTTPSConnection(object):
|
||||
|
||||
@ -45,7 +47,7 @@ class MockHTTPSConnection(object):
|
||||
class TestBufferedHTTP(unittest.TestCase):
|
||||
|
||||
def test_http_connect(self):
|
||||
bindsock = listen(('127.0.0.1', 0))
|
||||
bindsock = listen_zero()
|
||||
|
||||
def accept(expected_par):
|
||||
try:
|
||||
|
@ -23,7 +23,6 @@ import os
|
||||
from textwrap import dedent
|
||||
from collections import defaultdict
|
||||
|
||||
from eventlet import listen
|
||||
import six
|
||||
from six import BytesIO
|
||||
from six import StringIO
|
||||
@ -45,6 +44,7 @@ from swift.common.swob import Request
|
||||
from swift.common import wsgi, utils
|
||||
from swift.common.storage_policy import POLICIES
|
||||
|
||||
from test import listen_zero
|
||||
from test.unit import (
|
||||
temptree, with_tempdir, write_fake_ring, patch_policies, FakeLogger)
|
||||
|
||||
@ -384,7 +384,7 @@ class TestWSGI(unittest.TestCase):
|
||||
with mock.patch('swift.common.wsgi.inspect'):
|
||||
conf = wsgi.appconfig(conf_file)
|
||||
logger = logging.getLogger('test')
|
||||
sock = listen(('localhost', 0))
|
||||
sock = listen_zero()
|
||||
wsgi.run_server(conf, logger, sock)
|
||||
self.assertEqual('HTTP/1.0',
|
||||
_wsgi.HttpProtocol.default_request_version)
|
||||
@ -434,7 +434,7 @@ class TestWSGI(unittest.TestCase):
|
||||
getargspec=argspec_stub):
|
||||
conf = wsgi.appconfig(conf_file)
|
||||
logger = logging.getLogger('test')
|
||||
sock = listen(('localhost', 0))
|
||||
sock = listen_zero()
|
||||
wsgi.run_server(conf, logger, sock)
|
||||
|
||||
self.assertTrue(_wsgi.server.called)
|
||||
@ -472,7 +472,7 @@ class TestWSGI(unittest.TestCase):
|
||||
with mock.patch('time.tzset') as mock_tzset:
|
||||
conf = wsgi.appconfig(conf_dir)
|
||||
logger = logging.getLogger('test')
|
||||
sock = listen(('localhost', 0))
|
||||
sock = listen_zero()
|
||||
wsgi.run_server(conf, logger, sock)
|
||||
self.assertEqual(os.environ['TZ'], 'UTC+0')
|
||||
self.assertEqual(mock_tzset.mock_calls,
|
||||
@ -529,7 +529,7 @@ class TestWSGI(unittest.TestCase):
|
||||
with mock.patch('swift.common.wsgi.eventlet') as _eventlet:
|
||||
conf = wsgi.appconfig(conf_file)
|
||||
logger = logging.getLogger('test')
|
||||
sock = listen(('localhost', 0))
|
||||
sock = listen_zero()
|
||||
wsgi.run_server(conf, logger, sock)
|
||||
self.assertEqual('HTTP/1.0',
|
||||
_wsgi.HttpProtocol.default_request_version)
|
||||
|
@ -27,7 +27,7 @@ from xml.dom import minidom
|
||||
import time
|
||||
import random
|
||||
|
||||
from eventlet import spawn, Timeout, listen
|
||||
from eventlet import spawn, Timeout
|
||||
import json
|
||||
import six
|
||||
from six import BytesIO
|
||||
@ -45,6 +45,7 @@ from test.unit import fake_http_connect, debug_logger
|
||||
from swift.common.storage_policy import (POLICIES, StoragePolicy)
|
||||
from swift.common.request_helpers import get_sys_meta_prefix
|
||||
|
||||
from test import listen_zero
|
||||
from test.unit import patch_policies
|
||||
|
||||
|
||||
@ -1023,7 +1024,7 @@ class TestContainerController(unittest.TestCase):
|
||||
self.assertEqual(resp.status_int, 400)
|
||||
|
||||
def test_account_update_account_override_deleted(self):
|
||||
bindsock = listen(('127.0.0.1', 0))
|
||||
bindsock = listen_zero()
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c',
|
||||
environ={'REQUEST_METHOD': 'PUT',
|
||||
@ -1041,7 +1042,7 @@ class TestContainerController(unittest.TestCase):
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
def test_PUT_account_update(self):
|
||||
bindsock = listen(('127.0.0.1', 0))
|
||||
bindsock = listen_zero()
|
||||
|
||||
def accept(return_code, expected_timestamp):
|
||||
try:
|
||||
@ -1901,7 +1902,7 @@ class TestContainerController(unittest.TestCase):
|
||||
self.assertEqual(obj['last_modified'], t9.isoformat)
|
||||
|
||||
def test_DELETE_account_update(self):
|
||||
bindsock = listen(('127.0.0.1', 0))
|
||||
bindsock = listen_zero()
|
||||
|
||||
def accept(return_code, expected_timestamp):
|
||||
try:
|
||||
|
@ -23,7 +23,7 @@ from shutil import rmtree
|
||||
from tempfile import mkdtemp
|
||||
from test.unit import FakeLogger
|
||||
|
||||
from eventlet import spawn, Timeout, listen
|
||||
from eventlet import spawn, Timeout
|
||||
|
||||
from swift.common import utils
|
||||
from swift.container import updater as container_updater
|
||||
@ -31,6 +31,8 @@ from swift.container.backend import ContainerBroker, DATADIR
|
||||
from swift.common.ring import RingData
|
||||
from swift.common.utils import normalize_timestamp
|
||||
|
||||
from test import listen_zero
|
||||
|
||||
|
||||
class TestContainerUpdater(unittest.TestCase):
|
||||
|
||||
@ -164,7 +166,7 @@ class TestContainerUpdater(unittest.TestCase):
|
||||
traceback.print_exc()
|
||||
return err
|
||||
return None
|
||||
bindsock = listen(('127.0.0.1', 0))
|
||||
bindsock = listen_zero()
|
||||
|
||||
def spawn_accepts():
|
||||
events = []
|
||||
@ -236,7 +238,7 @@ class TestContainerUpdater(unittest.TestCase):
|
||||
return err
|
||||
return None
|
||||
|
||||
bindsock = listen(('127.0.0.1', 0))
|
||||
bindsock = listen_zero()
|
||||
|
||||
def spawn_accepts():
|
||||
events = []
|
||||
|
@ -27,7 +27,7 @@ from tempfile import mkdtemp
|
||||
import time
|
||||
|
||||
|
||||
from eventlet import listen, spawn, wsgi
|
||||
from eventlet import spawn, wsgi
|
||||
import mock
|
||||
from shutil import rmtree
|
||||
import six.moves.cPickle as pickle
|
||||
@ -45,6 +45,7 @@ from swift.obj import server as object_server
|
||||
from swift.proxy import server as proxy_server
|
||||
import swift.proxy.controllers.obj
|
||||
|
||||
from test import listen_zero
|
||||
from test.unit import write_fake_ring, DEFAULT_TEST_EC_TYPE, debug_logger, \
|
||||
connect_tcp, readuntil2crlfs
|
||||
|
||||
@ -93,17 +94,17 @@ def setup_servers(the_object_server=object_server, extra_conf=None):
|
||||
'allow_versions': 't'}
|
||||
if extra_conf:
|
||||
conf.update(extra_conf)
|
||||
prolis = listen(('localhost', 0))
|
||||
acc1lis = listen(('localhost', 0))
|
||||
acc2lis = listen(('localhost', 0))
|
||||
con1lis = listen(('localhost', 0))
|
||||
con2lis = listen(('localhost', 0))
|
||||
obj1lis = listen(('localhost', 0))
|
||||
obj2lis = listen(('localhost', 0))
|
||||
obj3lis = listen(('localhost', 0))
|
||||
obj4lis = listen(('localhost', 0))
|
||||
obj5lis = listen(('localhost', 0))
|
||||
obj6lis = listen(('localhost', 0))
|
||||
prolis = listen_zero()
|
||||
acc1lis = listen_zero()
|
||||
acc2lis = listen_zero()
|
||||
con1lis = listen_zero()
|
||||
con2lis = listen_zero()
|
||||
obj1lis = listen_zero()
|
||||
obj2lis = listen_zero()
|
||||
obj3lis = listen_zero()
|
||||
obj4lis = listen_zero()
|
||||
obj5lis = listen_zero()
|
||||
obj6lis = listen_zero()
|
||||
objsocks = [obj1lis, obj2lis, obj3lis, obj4lis, obj5lis, obj6lis]
|
||||
context["test_sockets"] = \
|
||||
(prolis, acc1lis, acc2lis, con1lis, con2lis, obj1lis, obj2lis, obj3lis,
|
||||
|
@ -36,13 +36,14 @@ from collections import defaultdict
|
||||
from contextlib import contextmanager
|
||||
from textwrap import dedent
|
||||
|
||||
from eventlet import sleep, spawn, wsgi, listen, Timeout, tpool, greenthread
|
||||
from eventlet import sleep, spawn, wsgi, Timeout, tpool, greenthread
|
||||
from eventlet.green import httplib
|
||||
|
||||
from nose import SkipTest
|
||||
|
||||
from swift import __version__ as swift_version
|
||||
from swift.common.http import is_success
|
||||
from test import listen_zero
|
||||
from test.unit import FakeLogger, debug_logger, mocked_http_conn, \
|
||||
make_timestamp_iter, DEFAULT_TEST_EC_TYPE
|
||||
from test.unit import connect_tcp, readuntil2crlfs, patch_policies, \
|
||||
@ -4365,7 +4366,7 @@ class TestObjectController(unittest.TestCase):
|
||||
self.assertEqual(outbuf.getvalue()[:4], '405 ')
|
||||
|
||||
def test_chunked_put(self):
|
||||
listener = listen(('localhost', 0))
|
||||
listener = listen_zero()
|
||||
port = listener.getsockname()[1]
|
||||
killer = spawn(wsgi.server, listener, self.object_controller,
|
||||
NullLogger())
|
||||
@ -4394,7 +4395,7 @@ class TestObjectController(unittest.TestCase):
|
||||
killer.kill()
|
||||
|
||||
def test_chunked_content_length_mismatch_zero(self):
|
||||
listener = listen(('localhost', 0))
|
||||
listener = listen_zero()
|
||||
port = listener.getsockname()[1]
|
||||
killer = spawn(wsgi.server, listener, self.object_controller,
|
||||
NullLogger())
|
||||
@ -6877,7 +6878,7 @@ class TestObjectServer(unittest.TestCase):
|
||||
self.logger = debug_logger('test-object-server')
|
||||
self.app = object_server.ObjectController(
|
||||
self.conf, logger=self.logger)
|
||||
sock = listen(('127.0.0.1', 0))
|
||||
sock = listen_zero()
|
||||
self.server = spawn(wsgi.server, sock, self.app, utils.NullLogger())
|
||||
self.port = sock.getsockname()[1]
|
||||
|
||||
@ -7566,7 +7567,7 @@ class TestZeroCopy(unittest.TestCase):
|
||||
self.df_mgr = diskfile.DiskFileManager(
|
||||
conf, self.object_controller.logger)
|
||||
|
||||
listener = listen(('localhost', 0))
|
||||
listener = listen_zero()
|
||||
port = listener.getsockname()[1]
|
||||
self.wsgi_greenlet = spawn(
|
||||
wsgi.server, listener, self.object_controller, NullLogger())
|
||||
|
@ -33,6 +33,7 @@ from swift.obj.reconstructor import RebuildingECDiskFileStream, \
|
||||
ObjectReconstructor
|
||||
from swift.obj.replicator import ObjectReplicator
|
||||
|
||||
from test import listen_zero
|
||||
from test.unit import patch_policies, debug_logger, encode_frag_archive_bodies
|
||||
from test.unit.obj.common import BaseTest
|
||||
|
||||
@ -60,7 +61,7 @@ class TestBaseSsync(BaseTest):
|
||||
self.ts_iter = (Timestamp(t)
|
||||
for t in itertools.count(int(time.time())))
|
||||
self.rx_ip = '127.0.0.1'
|
||||
sock = eventlet.listen((self.rx_ip, 0))
|
||||
sock = listen_zero()
|
||||
self.rx_server = eventlet.spawn(
|
||||
eventlet.wsgi.server, sock, self.rx_controller, self.rx_logger)
|
||||
self.rx_port = sock.getsockname()[1]
|
||||
|
@ -33,7 +33,7 @@ from swift.obj import server
|
||||
from swift.obj import ssync_receiver, ssync_sender
|
||||
from swift.obj.reconstructor import ObjectReconstructor
|
||||
|
||||
from test import unit
|
||||
from test import listen_zero, unit
|
||||
from test.unit import debug_logger, patch_policies, make_timestamp_iter
|
||||
|
||||
|
||||
@ -1933,7 +1933,6 @@ class TestSsyncRxServer(unittest.TestCase):
|
||||
# server socket.
|
||||
|
||||
def setUp(self):
|
||||
self.rx_ip = '127.0.0.1'
|
||||
# dirs
|
||||
self.tmpdir = tempfile.mkdtemp()
|
||||
self.tempdir = os.path.join(self.tmpdir, 'tmp_test_obj_server')
|
||||
@ -1948,7 +1947,8 @@ class TestSsyncRxServer(unittest.TestCase):
|
||||
}
|
||||
self.rx_logger = debug_logger('test-object-server')
|
||||
rx_server = server.ObjectController(self.conf, logger=self.rx_logger)
|
||||
self.sock = eventlet.listen((self.rx_ip, 0))
|
||||
self.rx_ip = '127.0.0.1'
|
||||
self.sock = listen_zero()
|
||||
self.rx_server = eventlet.spawn(
|
||||
eventlet.wsgi.server, self.sock, rx_server, utils.NullLogger())
|
||||
self.rx_port = self.sock.getsockname()[1]
|
||||
|
@ -23,11 +23,13 @@ from contextlib import closing
|
||||
from gzip import GzipFile
|
||||
from tempfile import mkdtemp
|
||||
from shutil import rmtree
|
||||
from test import listen_zero
|
||||
from test.unit import FakeLogger, make_timestamp_iter
|
||||
from test.unit import debug_logger, patch_policies, mocked_http_conn
|
||||
from time import time
|
||||
from distutils.dir_util import mkpath
|
||||
|
||||
from eventlet import spawn, Timeout, listen
|
||||
from eventlet import spawn, Timeout
|
||||
|
||||
from swift.obj import updater as object_updater
|
||||
from swift.obj.diskfile import (ASYNCDIR_BASE, get_async_dir, DiskFileManager,
|
||||
@ -37,7 +39,6 @@ from swift.common import utils
|
||||
from swift.common.header_key_dict import HeaderKeyDict
|
||||
from swift.common.utils import hash_path, normalize_timestamp, mkdirs, \
|
||||
write_pickle
|
||||
from test.unit import debug_logger, patch_policies, mocked_http_conn
|
||||
from swift.common.storage_policy import StoragePolicy, POLICIES
|
||||
|
||||
|
||||
@ -303,7 +304,7 @@ class TestObjectUpdater(unittest.TestCase):
|
||||
{'failures': 1, 'unlinks': 1})
|
||||
self.assertIsNone(pickle.load(open(op_path)).get('successes'))
|
||||
|
||||
bindsock = listen(('127.0.0.1', 0))
|
||||
bindsock = listen_zero()
|
||||
|
||||
def accepter(sock, return_code):
|
||||
try:
|
||||
|
@ -42,21 +42,19 @@ from collections import defaultdict
|
||||
import uuid
|
||||
|
||||
import mock
|
||||
from eventlet import sleep, spawn, wsgi, listen, Timeout, debug
|
||||
from eventlet import sleep, spawn, wsgi, Timeout, debug
|
||||
from eventlet.green import httplib
|
||||
from six import BytesIO
|
||||
from six import StringIO
|
||||
from six.moves import range
|
||||
from six.moves.urllib.parse import quote
|
||||
|
||||
from swift.common.utils import hash_path, storage_directory, \
|
||||
parse_content_type, parse_mime_headers, \
|
||||
iter_multipart_mime_documents, public
|
||||
|
||||
from test import listen_zero
|
||||
from test.unit import (
|
||||
connect_tcp, readuntil2crlfs, FakeLogger, fake_http_connect, FakeRing,
|
||||
FakeMemcache, debug_logger, patch_policies, write_fake_ring,
|
||||
mocked_http_conn, DEFAULT_TEST_EC_TYPE, make_timestamp_iter)
|
||||
from test.unit.helpers import setup_servers, teardown_servers
|
||||
from swift.proxy import server as proxy_server
|
||||
from swift.proxy.controllers.obj import ReplicatedObjectController
|
||||
from swift.obj import server as object_server
|
||||
@ -66,7 +64,9 @@ from swift.common.middleware.acl import parse_acl, format_acl
|
||||
from swift.common.exceptions import ChunkReadTimeout, DiskFileNotExist, \
|
||||
APIVersionError, ChunkWriteTimeout
|
||||
from swift.common import utils, constraints
|
||||
from swift.common.utils import mkdirs, NullLogger
|
||||
from swift.common.utils import hash_path, storage_directory, \
|
||||
parse_content_type, parse_mime_headers, \
|
||||
iter_multipart_mime_documents, public, mkdirs, NullLogger
|
||||
from swift.common.wsgi import monkey_patch_mimetools, loadapp
|
||||
from swift.proxy.controllers import base as proxy_base
|
||||
from swift.proxy.controllers.base import get_cache_key, cors_validation, \
|
||||
@ -80,8 +80,6 @@ from swift.common.storage_policy import StoragePolicy, POLICIES
|
||||
import swift.common.request_helpers
|
||||
from swift.common.request_helpers import get_sys_meta_prefix
|
||||
|
||||
from test.unit.helpers import setup_servers, teardown_servers
|
||||
|
||||
# mocks
|
||||
logging.getLogger().addHandler(logging.StreamHandler(sys.stdout))
|
||||
|
||||
@ -8786,7 +8784,7 @@ class TestSocketObjectVersions(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
global _test_sockets
|
||||
self.prolis = prolis = listen(('localhost', 0))
|
||||
self.prolis = prolis = listen_zero()
|
||||
self._orig_prolis = _test_sockets[0]
|
||||
allowed_headers = ', '.join([
|
||||
'content-encoding',
|
||||
|
Loading…
Reference in New Issue
Block a user