Update service functions to support UDP

This patch extends 'create_setup_for_service_test' and
'assert_backend_amount' functions to support both UDP
and TCP services.

In addition this patch also verifies that all members
appears in K8S-Endpoints are responsive.

The tests for UDP services will be covered in a follow-up
patches.

Change-Id: I1969ecc4fda361c890ff70e12fa1be0232ded1d4
This commit is contained in:
Yossi Boaron 2018-10-21 13:43:47 +03:00
parent 9032555f79
commit 6e23790eec
2 changed files with 122 additions and 24 deletions

View File

@ -13,8 +13,10 @@
# limitations under the License.
import six.moves
from functools import partial
import json
from multiprocessing import pool
import socket
import time
from oslo_log import log as logging
@ -348,25 +350,84 @@ class BaseKuryrScenarioTest(manager.NetworkScenarioTest):
raise lib_exc.ServerFault()
@classmethod
def wait_service_status(cls, service_ip, timeout_period):
session = requests.Session()
def _verify_connectivity(cls, dest_ip, timeout_period, protocol, port):
udp_client_sock = None
def verify_tcp(dest_ip, port, session):
try:
session.get("http://{0}:{1}".format(dest_ip, port),
timeout=2)
except Exception:
return False
return True
def verify_udp(dest_ip, port, udp_client_sock):
udp_client_sock.sendto("Hi Server, howRU?", (dest_ip, port))
try:
udp_client_sock.recvfrom(512)
except socket.timeout:
return False
return True
if protocol == "TCP":
session = requests.Session()
iter_func = partial(verify_tcp, session=session)
elif protocol == "UDP":
udp_client_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp_client_sock.settimeout(2.0)
iter_func = partial(verify_udp, udp_client_sock=udp_client_sock)
else:
LOG.warning("Unsupported protocol %s, returning", protocol)
return False
start = time.time()
while time.time() - start < timeout_period:
try:
time.sleep(5)
session.get("http://{0}".format(service_ip), timeout=2)
time.sleep(5)
if iter_func(dest_ip, port):
return True
LOG.warning('No initial traffic is passing through.')
LOG.error("Can't connect to %s:%d", dest_ip, port)
return False
return
except Exception:
LOG.warning('No initial traffic is passing through.')
time.sleep(5)
LOG.error(
"Traffic didn't pass within the period of %s" % timeout_period)
raise lib_exc.ServerFault()
@classmethod
def wait_service_status(cls, service_ip, timeout_period,
protocol="TCP", port=80):
if cls._verify_connectivity(service_ip, timeout_period,
protocol, port):
LOG.info('Service responding...')
else:
LOG.error("Can't connect service's IP %s", service_ip)
raise lib_exc.ServerFault()
@classmethod
def wait_ep_members_status(cls, ep_name, namespace, timeout_period):
num_of_be = 0
ep = cls.k8s_client.CoreV1Api().read_namespaced_endpoints(
ep_name, namespace)
try:
subset = ep.subsets[0]
subset_ports = subset.ports[0]
for subset_address in subset.addresses:
num_of_be += 1
LOG.info('Verifying connectivity for EP backend: %s:%d; '
'prot=%s', subset_address.ip, subset_ports.port,
subset_ports.protocol)
if cls._verify_connectivity(subset_address.ip, timeout_period,
subset_ports.protocol,
subset_ports.port):
LOG.info('EP member %s responding...', subset_address.ip)
else:
LOG.error("Can't connect to EP member %s",
subset_address.ip)
raise lib_exc.ServerFault()
except Exception:
return 0
return num_of_be
@classmethod
def create_setup_for_service_test(cls, pod_num=2, spec_type="ClusterIP",
protocol="TCP", label=None,
protocol="TCP", port=80,
target_port=8080, label=None,
namespace="default", get_ip=True,
service_name=None):
@ -379,14 +440,21 @@ class BaseKuryrScenarioTest(manager.NetworkScenarioTest):
cls.pod_num = pod_num
service_name, service_obj = cls.create_service(
pod_label=pod.metadata.labels, spec_type=spec_type,
protocol=protocol, namespace=namespace, service_name=service_name)
protocol=protocol, port=port, target_port=target_port,
namespace=namespace, service_name=service_name)
if get_ip:
cls.service_ip = cls.get_service_ip(
service_name, spec_type=spec_type, namespace=namespace)
cls.verify_lbaas_endpoints_configured(service_name, pod_num)
cls.service_name = service_name
cls.wait_service_status(
cls.service_ip, CONF.kuryr_kubernetes.lb_build_timeout)
cls.wait_service_status(cls.service_ip,
CONF.kuryr_kubernetes.lb_build_timeout,
protocol, port)
if pod_num != cls.wait_ep_members_status(
cls.service_name, namespace,
CONF.kuryr_kubernetes.lb_build_timeout):
LOG.error("Actual EP backend num != pod_num")
raise lib_exc.ServerFault()
cls.addClassResourceCleanup(cls.delete_service, service_name,
namespace=namespace)
@ -462,20 +530,52 @@ class BaseKuryrScenarioTest(manager.NetworkScenarioTest):
'Number of exclusive responses is incorrect. '
'Got %s.' % cmd_outputs)
def assert_backend_amount(self, url, amount, headers=None,
repetitions=100, threads=8, request_timeout=5):
def req():
def assert_backend_amount(self, server_ip, amount, server_port=None,
protocol="TCP", headers=None, repetitions=100,
threads=8, request_timeout=5):
def req_tcp():
resp = requests.get(url, headers=headers)
self.assertEqual(requests.codes.OK, resp.status_code,
'Non-successful request to {}'.format(url))
return resp
def req_udp():
# FIXME(yboaron): Current Octavia implementation doesn't
# round-robin UDP pool as expected, to work-around that
# a new socket (new local UDP port) is allocated per request.
udp_client_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp_client_sock.settimeout(3.0)
udp_client_sock.sendto("Hi Server, howRU?",
(server_ip, server_port))
try:
data, addr = udp_client_sock.recvfrom(1024)
except Exception:
# NOTE(yboaron): for UDP (unlike TCP) not getting reply from
# the server is a valid use case.
return None
return data
def pred(tester, responses):
unique_resps = set(resp.content for resp in responses)
if protocol == 'TCP':
unique_resps = set(resp.content for resp in responses)
else:
unique_resps = set(resp for resp in responses if resp
is not None)
tester.assertEqual(amount, len(unique_resps),
'Incorrect amount of unique backends. '
'Got {}'.format(unique_resps))
if protocol == 'TCP':
url = 'http://{}'.format(server_ip)
req = req_tcp
elif protocol == "UDP":
self.assertIsNotNone(server_port, "server_port must be "
"provided for UDP protocol")
req = req_udp
else:
LOG.info("Unsupported protocol %s, returning", protocol)
return
self._run_threaded_and_assert(req, pred, repetitions=repetitions,
threads=threads,
fn_timeout=request_timeout)

View File

@ -39,8 +39,7 @@ class TestServiceScenario(base.BaseKuryrScenarioTest):
@decorators.idempotent_id('bddf5441-1244-449d-a125-b5fdcfc1a1a9')
def test_service_curl(self):
LOG.info("Trying to curl the service IP %s" % self.service_ip)
self.assert_backend_amount('http://{}'.format(self.service_ip),
self.pod_num)
self.assert_backend_amount(self.service_ip, self.pod_num)
@decorators.idempotent_id('bddf5441-1244-449d-a125-b5fdcfa1a7a9')
def test_pod_service_curl(self):
@ -71,8 +70,7 @@ class TestLoadBalancerServiceScenario(base.BaseKuryrScenarioTest):
def test_lb_service_http(self):
LOG.info("Trying to curl the service IP %s" % self.service_ip)
self.assert_backend_amount('http://{}'.format(self.service_ip),
self.pod_num)
self.assert_backend_amount(self.service_ip, self.pod_num)
# TODO(yboaron): Use multi threads for 'test_vm_service_http' test
@decorators.idempotent_id('bddf5441-1244-449d-a125-b5fdcfa1b5a9')