Add test_security_group_without_rules

Also fixes some errors in security group + police tests

Change-Id: I5b97bd54cd62d788798becabafff29acf1c2a29f
This commit is contained in:
Georgy Dyuldin 2017-02-03 14:14:37 +03:00
parent 012a1515a7
commit 5b7985568c
6 changed files with 119 additions and 18 deletions

View File

@ -14,11 +14,14 @@ from hamcrest import is_
from stepler.third_party import ping
from stepler.third_party import waiter
CONNECTED = 'CONNECTED'
def check_connection_status(ip,
remote,
port=22,
udp=False,
answer=CONNECTED,
must_available=True,
timeout=0):
"""Check TCP/UDP port connection availability (or not).
@ -47,15 +50,22 @@ def check_connection_status(ip,
def _predicate():
result = remote.execute(
'echo "QUIT" | nc {proto} -w1 {ip} {port}'.format(
ip=ip, port=port, proto=proto_flag))
return waiter.expect_that(result.exit_code == 0, is_(must_available))
'echo "" | nc {proto} -w1 {ip} {port} | grep {answer}'.format(
ip=ip, port=port, proto=proto_flag, answer=answer))
msg = ("Expected that {proto} connection for {port}"
" will be {status}").format(
proto='UDP' if udp else 'TCP',
port=port,
status='available' if must_available else 'unavailable')
return waiter.expect_that(result.exit_code == 0,
is_(must_available), msg)
return waiter.wait(_predicate, timeout_seconds=timeout, sleep_seconds=2)
def check_icmp_connection_status(ip, remote, must_available=True, timeout=0):
"""Check that icmp connection to ip is `must_available`."""
def predicate():
ping_result = ping.Pinger(ip, remote=remote).ping(count=3)
if must_available:
@ -65,3 +75,18 @@ def check_icmp_connection_status(ip, remote, must_available=True, timeout=0):
return waiter.expect_that(value, is_(0))
return waiter.wait(predicate, timeout_seconds=timeout)
def start_port_listener(server_ssh, port, udp=False, answer=CONNECTED):
"""Start background netcat listener on remote server.
Note:
Netcat call syntax is valid only for cirros.
"""
proto = '-u' if udp else ''
listener_cmd = 'nc {proto} -l -p {port} -e echo "{answer}"'.format(
proto=proto, port=port, answer=answer)
loop_cmd = "while true; do {}; done".format(listener_cmd)
server_ssh.background_call(loop_cmd)

View File

@ -185,7 +185,7 @@ def test_outbound_traffic_without_egress_rule(
server_ssh,
must_available=False,
timeout=settings.SECURITY_GROUP_APPLY_TIMEOUT)
# Add ingress rule
# Add egress rule
client_sg_entries.add_policy_rule(policy.POLICY_RULE_ALLOW_EGRESS_ICMP)
client_sg.security_group_entries = client_sg_entries
contrail_api_client.security_group_update(client_sg)
@ -193,3 +193,84 @@ def test_outbound_traffic_without_egress_rule(
fixed_ip,
server_ssh,
timeout=settings.SECURITY_GROUP_APPLY_TIMEOUT)
def test_security_group_without_rules(connectivity_test_resources,
server_steps, floating_ip_steps,
contrail_api_client, nova_client):
"""Verify that security group without rules deny any traffic.
Steps:
#. Create 2 security groups
#. Create network with subnet
#. Boot 2 nova instances (client and server) in network
#. Add Floating IP to client
#. Add security group to client with allow all rules
#. Add security group to server without rules
#. Check that there are no success pings from client to server
#. Add egress ICMP rule to client's security group
#. Check that there are success pings from client to server
"""
TCP_PORT = 7000
UDP_PORT = 7001
client, server = connectivity_test_resources.servers
client_sg, server_sg = connectivity_test_resources.security_groups
(client_floating_ip,
server_floating_ip) = connectivity_test_resources.floating_ips
client_sg_entries = client_sg.security_group_entries
server_sg_entries = server_sg.security_group_entries
# Start server listeners
with server_steps.get_server_ssh(
server, server_floating_ip['floating_ip_address']) as server_ssh:
connectivity.start_port_listener(server_ssh, TCP_PORT)
connectivity.start_port_listener(server_ssh, UDP_PORT, udp=True)
# Remove server floating ip
floating_ip_steps.detach_floating_ip(server_floating_ip)
# Setup client and server security groups
server_sg.security_group_entries = None
contrail_api_client.security_group_update(server_sg)
client_sg_entries.add_policy_rule(policy.POLICY_RULE_ALLOW_EGRESS_ALL)
client_sg_entries.add_policy_rule(policy.POLICY_RULE_ALLOW_INGRESS_ALL)
client_sg.security_group_entries = client_sg_entries
contrail_api_client.security_group_update(client_sg)
# Add security group to client
nova_client.servers.add_security_group(client, client_sg.name)
# Remove all and add clear security group server
for security_group in server.security_groups:
nova_client.servers.remove_security_group(server,
security_group['name'])
nova_client.servers.add_security_group(server, server_sg.name)
fixed_ip = server_steps.get_fixed_ip(server)
with server_steps.get_server_ssh(
client,
ip=client_floating_ip['floating_ip_address']) as server_ssh:
# Check no icmp traffic
connectivity.check_icmp_connection_status(
fixed_ip, server_ssh, must_available=False)
# Check no tcp traffic
connectivity.check_connection_status(
fixed_ip, server_ssh, port=TCP_PORT, must_available=False)
# Check no udp traffic
connectivity.check_connection_status(
fixed_ip,
server_ssh,
port=UDP_PORT,
udp=True,
must_available=False)
# Add ingress rule
server_sg_entries.add_policy_rule(
policy.POLICY_RULE_ALLOW_INGRESS_ICMP)
server_sg.security_group_entries = server_sg_entries
contrail_api_client.security_group_update(server_sg)
connectivity.check_icmp_connection_status(
fixed_ip,
server_ssh,
timeout=settings.SECURITY_GROUP_APPLY_TIMEOUT)

View File

@ -13,7 +13,6 @@
import functools
import attrdict
from hamcrest import assert_that, equal_to # noqa H301
import pytest
from stepler import config as stepler_config
from stepler.third_party import utils
@ -56,9 +55,9 @@ UDP_PORT = 7001
check_tcp = functools.partial(
connectivity.check_connection_status, port=TCP_PORT)
check_tcp_ssh = functools.partial(
connectivity.check_connection_status, port=TCP_SSH_PORT)
connectivity.check_connection_status, port=TCP_SSH_PORT, answer='SSH')
check_udp = functools.partial(
connectivity.check_connection_status, port=UDP_PORT)
connectivity.check_connection_status, port=UDP_PORT, udp=True)
check_icmp = connectivity.check_icmp_connection_status
tcp_all_policy = policy.make_policy_entry(
@ -129,13 +128,11 @@ def connectivity_test_resources(
floating_ips.append(floating_ip)
server, client = servers
# Start listeners
tcp_server_cmd = 'nc -v -l -p {0} -e echo Reply'.format(TCP_PORT)
udp_server_cmd = 'nc -v -u -l -p {0} -e echo Reply'.format(UDP_PORT)
# Start listeners
with server_steps.get_server_ssh(server) as server_ssh:
server_ssh.background_call(tcp_server_cmd)
server_ssh.background_call(udp_server_cmd)
connectivity.start_port_listener(server_ssh, TCP_PORT)
connectivity.start_port_listener(server_ssh, UDP_PORT, udp=True)
# Detach 1st server floating IP
floating_ip_steps.detach_floating_ip(floating_ips[0])
@ -159,7 +156,7 @@ def connectivity_test_resources(
(SG_RULES['tcp_all'] + SG_RULES['udp_all'], {
check_tcp: True,
check_tcp_ssh: True,
check_udp: False,
check_udp: True,
check_icmp: False
}),
],

View File

@ -76,9 +76,7 @@ def test_vrouter_uve_xmpp_connections(session, client_contrail_analytics,
assert actual_connections_count == expected_connection_count
def test_peer_count_in_bgp_router_uve(client_contrail_analytics,
contrail_services_http_introspect_ports,
nodes_ips):
def test_peer_count_in_bgp_router_uve(client_contrail_analytics, nodes_ips):
# count of xmpp peer and bgp peer verification in bgp-router uve
contrail_computes_fqdns = settings.CONTRAIL_ROLES_DISTRIBUTION[
settings.ROLE_CONTRAIL_COMPUTE]

View File

@ -17,7 +17,7 @@ from stepler.third_party import utils
def test_subnet_creating_with_custom_ipam(
contrail_api_client, contrail_ipam, contrail_network,
contrail_ipam, contrail_network,
contrail_create_subnet, cirros_image, flavor, security_group,
server_steps, port_steps, create_floating_ip, public_network):
"""Check creating subnet with custom IPAM.

View File

@ -165,7 +165,7 @@ def test_create_and_terminate_networks(contrail_api_client, network_steps):
# Create new network
net_name, = utils.generate_ids()
new_network = network_steps.create(name)
new_network = network_steps.create(net_name)
# Check that created network is present in Contrail
contrail_networks = contrail_api_client.virtual_networks_list()