Scenario: test with concurrent clients

Test load balancer with multiple client threads sending requests
concurrently.

Rename _check_connection() to _wait_for_http_service() to better
communicate what this method does.

Extend _send_requests() to accept an optional path argument.

Use screen instead of nohup to background test httpd in CirrOS
instances.

Remove duplicate initialization of server_ips from BaseTestCase.

Change-Id: Ia3fa2ee1977c630e70a13e069e383edfd81ecde8
This commit is contained in:
Dustin Lundquist 2016-10-11 21:08:23 -07:00
parent 451faa2534
commit 2a7c6f4b17

View File

@ -23,6 +23,7 @@ import shutil
import socket import socket
import subprocess import subprocess
import tempfile import tempfile
import threading
import time import time
from oslo_log import log as logging from oslo_log import log as logging
@ -67,7 +68,6 @@ class BaseTestCase(manager.NetworkScenarioTest):
self.port1 = 80 self.port1 = 80
self.port2 = 88 self.port2 = 88
self.num = 50 self.num = 50
self.server_ips = {}
self.server_fixed_ips = {} self.server_fixed_ips = {}
self._create_security_group_for_test() self._create_security_group_for_test()
@ -264,9 +264,8 @@ class BaseTestCase(manager.NetworkScenarioTest):
username, key.name) username, key.name)
# Start httpd # Start httpd
start_server = ('sudo sh -c "ulimit -n 100000; nohup ' start_server = ('sudo sh -c "ulimit -n 100000; screen -d -m '
'/dev/shm/httpd -id %(id)s ' '/dev/shm/httpd -id %(id)s -port %(port)s"')
'-port %(port)s &"')
cmd = start_server % {'id': server_name[-1], cmd = start_server % {'id': server_name[-1],
'port': self.port1} 'port': self.port1}
ssh_client.exec_command(cmd) ssh_client.exec_command(cmd)
@ -586,16 +585,16 @@ class BaseTestCase(manager.NetworkScenarioTest):
2. Check that the requests are shared between the two servers 2. Check that the requests are shared between the two servers
""" """
LOG.info(_('Checking load balancing...')) LOG.info(_('Checking load balancing...'))
self._check_connection(self.vip_ip) self._wait_for_http_service(self.vip_ip)
LOG.info(_('Connection to {vip} is valid').format(vip=self.vip_ip)) LOG.info(_('Connection to {vip} is valid').format(vip=self.vip_ip))
counters = self._send_requests(self.vip_ip, counters = self._send_concurrent_requests(self.vip_ip,
["server1", "server2"]) ["server1", "server2"])
for member, counter in six.iteritems(counters): for member, counter in six.iteritems(counters):
self.assertGreater(counter, 0, self.assertGreater(counter, 0,
'Member %s never balanced' % member) 'Member %s never balanced' % member)
LOG.info(_('Done checking load balancing...')) LOG.info(_('Done checking load balancing...'))
def _check_connection(self, check_ip, port=80): def _wait_for_http_service(self, check_ip, port=80):
def try_connect(check_ip, port): def try_connect(check_ip, port):
try: try:
LOG.info(('checking connection to ip: {0} port: {1}'.format( LOG.info(('checking connection to ip: {0} port: {1}'.format(
@ -619,11 +618,11 @@ class BaseTestCase(manager.NetworkScenarioTest):
message = "Timed out trying to connect to %s" % check_ip message = "Timed out trying to connect to %s" % check_ip
raise exceptions.TimeoutException(message) raise exceptions.TimeoutException(message)
def _send_requests(self, vip_ip, servers): def _send_requests(self, vip_ip, servers, path=''):
counters = dict.fromkeys(servers, 0) counters = dict.fromkeys(servers, 0)
for i in range(self.num): for i in range(self.num):
try: try:
server = urllib2.urlopen("http://{0}/".format(vip_ip), server = urllib2.urlopen("http://{0}/{1}".format(vip_ip, path),
None, 2).read() None, 2).read()
counters[server] += 1 counters[server] += 1
# HTTP exception means fail of server, so don't increase counter # HTTP exception means fail of server, so don't increase counter
@ -638,6 +637,49 @@ class BaseTestCase(manager.NetworkScenarioTest):
self.assertGreater(counted, 1) self.assertGreater(counted, 1)
return counters return counters
def _send_concurrent_requests(self, vip_ip, servers, path='', clients=5,
timeout=None):
class ClientThread(threading.Thread):
def __init__(self, test_case, cid, vip_ip, servers, path=''):
super(ClientThread, self).__init__(
name='ClientThread-{0}'.format(cid))
self.vip_ip = vip_ip
self.servers = servers
self.path = path
self.test_case = test_case
self.counters = dict.fromkeys(servers, 0)
def run(self):
# NOTE(dlundquist): _send_requests() does not mutate
# BaseTestCase so concurrent uses of _send_requests does not
# require a mutex.
self.counters = self.test_case._send_requests(self.vip_ip,
self.servers,
path=self.path)
def join(self, timeout=None):
start = time.time()
super(ClientThread, self).join(timeout)
return time.time() - start
client_threads = [ClientThread(self, i, vip_ip, servers, path=path)
for i in range(clients)]
for ct in client_threads:
ct.start()
if timeout is None:
# timeout for all client threads defaults to 400ms per request
timeout = self.num * 0.4
total_counters = dict.fromkeys(servers, 0)
for ct in client_threads:
timeout -= ct.join(timeout)
if timeout <= 0:
LOG.error("Client thread {0} timed out".format(ct.name))
return dict.fromkeys(servers, 0)
for server in servers:
total_counters[server] += ct.counters[server]
return total_counters
def _traffic_validation_after_stopping_server(self): def _traffic_validation_after_stopping_server(self):
"""Check that the requests are sent to the only ACTIVE server.""" """Check that the requests are sent to the only ACTIVE server."""
@ -668,7 +710,7 @@ class BaseTestCase(manager.NetworkScenarioTest):
that handled it the first time. that handled it the first time.
""" """
# Check that backends are reachable # Check that backends are reachable
self._check_connection(self.vip_ip) self._wait_for_http_service(self.vip_ip)
resp = [] resp = []
for count in range(10): for count in range(10):