neutron-tempest-plugin/neutron_tempest_plugin/scenario/test_multicast.py

306 lines
12 KiB
Python

# Copyright 2018 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
from neutron_lib import constants
from oslo_log import log
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from neutron_tempest_plugin.common import ssh
from neutron_tempest_plugin.common import utils
from neutron_tempest_plugin import config
from neutron_tempest_plugin import exceptions
from neutron_tempest_plugin.scenario import base
CONF = config.CONF
LOG = log.getLogger(__name__)
PYTHON3_BIN = "python3"
def get_receiver_script(group, port, hello_message, ack_message, result_file):
return """
import socket
import struct
import sys
multicast_group = '%(group)s'
server_address = ('', %(port)s)
# Create the socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
# Bind to the server address
sock.bind(server_address)
# Tell the operating system to add the socket to the multicast group
# on all interfaces.
group = socket.inet_aton(multicast_group)
mreq = struct.pack('4sL', group, socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
# Receive/respond loop
with open('%(result_file)s', 'w') as f:
f.write('%(hello_message)s')
f.flush()
data, address = sock.recvfrom(1024)
f.write('received ' + str(len(data)) + ' bytes from ' + str(address))
f.write(str(data))
sock.sendto(b'%(ack_message)s', address)
""" % {'group': group,
'port': port,
'hello_message': hello_message,
'ack_message': ack_message,
'result_file': result_file}
def get_sender_script(group, port, message, result_file):
return """
import socket
import sys
message = b'%(message)s'
multicast_group = ('%(group)s', %(port)s)
# Create the datagram socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
# Set the time-to-live for messages to 1 so they do not go past the
# local network segment.
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1)
# Set a timeout so the socket does not block indefinitely when trying
# to receive data.
sock.settimeout(1)
with open('%(result_file)s', 'w') as f:
try:
# Send data to the multicast group
sent = sock.sendto(message, multicast_group)
# Look for responses from all recipients
while True:
try:
data, server = sock.recvfrom(1024)
except socket.timeout:
f.write('timed out, no more responses')
break
else:
f.write('received reply ' + str(data) + ' from ' + str(server))
finally:
sys.stdout.write('closing socket')
sock.close()
""" % {'group': group,
'port': port,
'message': message,
'result_file': result_file}
class BaseMulticastTest(object):
credentials = ['primary']
force_tenant_isolation = False
# Import configuration options
available_type_drivers = (
CONF.neutron_plugin_options.available_type_drivers)
hello_message = "I am waiting..."
multicast_port = 5007
multicast_message = "Big Bang"
receiver_output_file = "/tmp/receiver_mcast_out"
sender_output_file = "/tmp/sender_mcast_out"
@classmethod
def skip_checks(cls):
super(BaseMulticastTest, cls).skip_checks()
advanced_image_available = (
CONF.neutron_plugin_options.advanced_image_ref or
CONF.neutron_plugin_options.default_image_is_advanced)
if not advanced_image_available:
skip_reason = "This test require advanced tools for this test"
raise cls.skipException(skip_reason)
@classmethod
def resource_setup(cls):
super(BaseMulticastTest, cls).resource_setup()
if CONF.neutron_plugin_options.default_image_is_advanced:
cls.flavor_ref = CONF.compute.flavor_ref
cls.image_ref = CONF.compute.image_ref
cls.username = CONF.validation.image_ssh_user
else:
cls.flavor_ref = (
CONF.neutron_plugin_options.advanced_image_flavor_ref)
cls.image_ref = CONF.neutron_plugin_options.advanced_image_ref
cls.username = CONF.neutron_plugin_options.advanced_image_ssh_user
# setup basic topology for servers we can log into it
cls.network = cls.create_network()
cls.subnet = cls.create_subnet(cls.network)
cls.router = cls.create_router_by_client()
cls.create_router_interface(cls.router['id'], cls.subnet['id'])
cls.keypair = cls.create_keypair()
cls.secgroup = cls.os_primary.network_client.create_security_group(
name='secgroup_mtu')
cls.security_groups.append(cls.secgroup['security_group'])
cls.create_loginable_secgroup_rule(
secgroup_id=cls.secgroup['security_group']['id'])
cls.create_pingable_secgroup_rule(
secgroup_id=cls.secgroup['security_group']['id'])
# Create security group rule for UDP (multicast traffic)
cls.create_secgroup_rules(
rule_list=[dict(protocol=constants.PROTO_NAME_UDP,
direction=constants.INGRESS_DIRECTION,
remote_ip_prefix=cls.any_addresses,
ethertype=cls.ethertype)],
secgroup_id=cls.secgroup['security_group']['id'])
# Multicast IP range to be used for multicast group IP asignement
if '-' in cls.multicast_group_range:
multicast_group_range = netaddr.IPRange(
*cls.multicast_group_range.split('-'))
else:
multicast_group_range = netaddr.IPNetwork(
cls.multicast_group_range)
cls.multicast_group_iter = iter(multicast_group_range)
def _create_server(self):
name = data_utils.rand_name("multicast-server")
server = self.create_server(
flavor_ref=self.flavor_ref,
image_ref=self.image_ref,
key_name=self.keypair['name'], name=name,
networks=[{'uuid': self.network['id']}],
security_groups=[{'name': self.secgroup['security_group']['name']}]
)['server']
self.wait_for_server_active(server)
port = self.client.list_ports(
network_id=self.network['id'], device_id=server['id'])['ports'][0]
server['fip'] = self.create_floatingip(port=port)
server['ssh_client'] = ssh.Client(server['fip']['floating_ip_address'],
self.username,
pkey=self.keypair['private_key'])
self._check_python_installed_on_server(server['ssh_client'],
server['id'])
return server
def _check_python_installed_on_server(self, ssh_client, server_id):
try:
ssh_client.execute_script('which %s' % PYTHON3_BIN)
except exceptions.SSHScriptFailed:
raise self.skipException(
"%s is not available on server %s" % (PYTHON3_BIN, server_id))
def _prepare_sender(self, server, mcast_address):
check_script = get_sender_script(
group=mcast_address, port=self.multicast_port,
message=self.multicast_message,
result_file=self.sender_output_file)
server['ssh_client'].execute_script(
'echo "%s" > ~/multicast_traffic_sender.py' % check_script)
def _prepare_receiver(self, server, mcast_address):
check_script = get_receiver_script(
group=mcast_address, port=self.multicast_port,
hello_message=self.hello_message, ack_message=server['id'],
result_file=self.receiver_output_file)
ssh_client = ssh.Client(
server['fip']['floating_ip_address'],
self.username,
pkey=self.keypair['private_key'])
self._check_python_installed_on_server(ssh_client, server['id'])
server['ssh_client'].execute_script(
'echo "%s" > ~/multicast_traffic_receiver.py' % check_script)
@utils.unstable_test("bug 1850288")
@decorators.idempotent_id('113486fc-24c9-4be4-8361-03b1c9892867')
def test_multicast_between_vms_on_same_network(self):
"""Test multicast messaging between two servers on the same network
[Sender server] -> (Multicast network) -> [Receiver server]
"""
sender = self._create_server()
receivers = [self._create_server() for _ in range(1)]
# Sender can be also receiver of multicast traffic
receivers.append(sender)
self._check_multicast_conectivity(sender=sender, receivers=receivers)
def _check_multicast_conectivity(self, sender, receivers):
"""Test multi-cast messaging between two servers
[Sender server] -> ... some network topology ... -> [Receiver server]
"""
mcast_address = next(self.multicast_group_iter)
LOG.debug("Multicast group address: %s", mcast_address)
def _message_received(client, msg, file_path):
result = client.execute_script(
"cat {path} || echo '{path} not exists yet'".format(
path=file_path))
return msg in result
self._prepare_sender(sender, mcast_address)
receiver_ids = []
for receiver in receivers:
self._prepare_receiver(receiver, mcast_address)
receiver['ssh_client'].execute_script(
"%s ~/multicast_traffic_receiver.py &" % PYTHON3_BIN,
shell="bash")
utils.wait_until_true(
lambda: _message_received(
receiver['ssh_client'], self.hello_message,
self.receiver_output_file),
exception=RuntimeError(
"Receiver script didn't start properly on server "
"{!r}.".format(receiver['id'])))
receiver_ids.append(receiver['id'])
# Now lets run scripts on sender
sender['ssh_client'].execute_script(
"%s ~/multicast_traffic_sender.py" % PYTHON3_BIN)
# And check if message was received
for receiver in receivers:
utils.wait_until_true(
lambda: _message_received(
receiver['ssh_client'], self.multicast_message,
self.receiver_output_file),
exception=RuntimeError(
"Receiver {!r} didn't get multicast message".format(
receiver['id'])))
# TODO(slaweq): add validation of answears on sended server
replies_result = sender['ssh_client'].execute_script(
"cat {path} || echo '{path} not exists yet'".format(
path=self.sender_output_file))
for receiver_id in receiver_ids:
self.assertIn(receiver_id, replies_result)
class MulticastTestIPv4(BaseMulticastTest, base.BaseTempestTestCase):
# Import configuration options
multicast_group_range = CONF.neutron_plugin_options.multicast_group_range
# IP version specific parameters
_ip_version = constants.IP_VERSION_4
any_addresses = constants.IPv4_ANY