fixed ips connectivity, multiple, and negative test cases added

Change-Id: I2d7fc36247db5e6edb21d23c07046d20a876a804
This commit is contained in:
Basavaraj Lamani 2018-09-25 18:27:43 +05:30
parent e8b03ca925
commit 1e1793559e
3 changed files with 609 additions and 0 deletions

View File

@ -0,0 +1,246 @@
"""
Copyright 2018 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import re
from cafe.drivers.unittest.decorators import tags
from cafe.engine.clients.ping import PingClient
from cloudcafe.auth.provider import AuthProvider
from cloudcafe.compute.config import ComputeAdminEndpointConfig, \
ComputeAdminUserConfig, ComputeAdminAuthConfig
from cloudcafe.compute.servers_api.client import ServersClient
from cloudcafe.networking.networks.personas import ServerPersona
from cloudcafe.networking.networks.common.tools.connectivity import \
Connectivity
from cloudroast.networking.networks.fixtures import NetworkingComputeFixture
from cloudroast.networking.networks.scenario.common import ScenarioMixin
class TestFixedIPsConnectivity(NetworkingComputeFixture, ScenarioMixin):
"""
Testing connectivity between servers by adding fixed ips to existing
servers.
"""
NAMES_PREFIX = 'fixed_ips_connectivity'
PRIVATE_KEY_PATH = '/root/pkey'
MAX_RETRIES = 5
admin_user = ComputeAdminUserConfig()
compute_admin_endpoint = ComputeAdminEndpointConfig()
auth_endpoint_config = ComputeAdminAuthConfig()
access_data = AuthProvider.get_access_data(
auth_endpoint_config, admin_user)
compute_service = access_data.get_service(
compute_admin_endpoint.compute_endpoint_name)
url = compute_service.get_endpoint(
compute_admin_endpoint.region).public_url
servers_client = ServersClient(
url, access_data.token.id_, 'json', 'json')
SSH_COMMAND = ('ssh -o UserKnownHostsFile=/dev/null '
'-o StrictHostKeyChecking=no -o ConnectTimeout=60 '
'-i {private_key_path} {user}@{ip_address}')
ssh_msg = ('Failed remote ssh connection from '
'server {0} to server {1}')
@classmethod
def setUpClass(cls):
super(TestFixedIPsConnectivity, cls).setUpClass()
network_name = 'network_{0}'.format(cls.NAMES_PREFIX)
cls.network = cls.create_server_network(name=network_name, ipv4=True)
cls.delete_networks.append(cls.network.id)
keypair_name = 'key_{0}'.format(cls.NAMES_PREFIX)
cls.keypair = cls.create_keypair(name=keypair_name)
cls.delete_keypairs.append(cls.keypair.name)
svr_name_1 = 'svr_1_{0}'.format(cls.NAMES_PREFIX)
svr_name_2 = 'svr_2_{0}'.format(cls.NAMES_PREFIX)
network_ids = [cls.public_network_id, cls.service_network_id,
cls.network.id]
cls.server1 = cls.create_test_server(
name=svr_name_1, key_name=cls.keypair.name,
network_ids=network_ids, active_server=False)
cls.server2 = cls.create_test_server(
name=svr_name_2, key_name=cls.keypair.name,
network_ids=network_ids, active_server=False)
cls.servers = [cls.server1, cls.server2]
cls.FIXED_IPS_TO_ADD = cls.net.config.fixed_ips_to_add
cls.PNET_FIX_IPv4_COUNT = cls.FIXED_IPS_TO_ADD + 1
cls.SNET_FIX_IPv4_COUNT = cls.FIXED_IPS_TO_ADD + 1
cls.INET_FIX_IPv4_COUNT = cls.FIXED_IPS_TO_ADD + 1
cls.TOTAL_INITIAL_IPS_SERVER = 3
cls.TOTAL_NETWORKS_ATTACHED_TO_SERVER = 3
cls.TOTAL_IPS_SERVER = cls.TOTAL_INITIAL_IPS_SERVER + \
(cls.FIXED_IPS_TO_ADD * cls.TOTAL_NETWORKS_ATTACHED_TO_SERVER)
# Add fixed IPs to servers
for server in cls.servers:
cls.add_fixed_ips_network(server, cls.public_network_id,
number_fixed_ips=cls.FIXED_IPS_TO_ADD)
cls.add_fixed_ips_network(server, cls.service_network_id,
number_fixed_ips=cls.FIXED_IPS_TO_ADD)
cls.add_fixed_ips_network(server, cls.network.id,
number_fixed_ips=cls.FIXED_IPS_TO_ADD)
cls.server_persona1 = ServerPersona(
server=cls.server1, pnet=True, snet=True, inet=True,
pnet_fix_ipv4_count=cls.PNET_FIX_IPv4_COUNT,
snet_fix_ipv4_count=cls.SNET_FIX_IPv4_COUNT,
inet_fix_ipv4_count=cls.INET_FIX_IPv4_COUNT,
network=cls.network, keypair=cls.keypair, ssh_username='root')
cls.server_persona2 = ServerPersona(
server=cls.server2, pnet=True, snet=True, inet=True,
pnet_fix_ipv4_count=cls.PNET_FIX_IPv4_COUNT,
snet_fix_ipv4_count=cls.SNET_FIX_IPv4_COUNT,
inet_fix_ipv4_count=cls.INET_FIX_IPv4_COUNT,
network=cls.network, keypair=cls.keypair,
ssh_username='root')
server_ids = [cls.server_persona1.server.id,
cls.server_persona2.server.id]
cls.delete_servers.extend(server_ids)
cls._transfer_private_key_to_vm(
cls.server_persona1.remote_client.ssh_client,
cls.keypair.private_key, cls.PRIVATE_KEY_PATH)
cls._transfer_private_key_to_vm(
cls.server_persona2.remote_client.ssh_client,
cls.keypair.private_key, cls.PRIVATE_KEY_PATH)
@tags('admin', 'positive')
def test_server_ifconfig(self):
"""Testing ifconfig on servers"""
servers = [self.server_persona1, self.server_persona2]
for server in servers:
ips = []
ips.extend(server.pnet_fix_ipv4)
ips.extend(server.snet_fix_ipv4)
ips.extend(server.inet_fix_ipv4)
rm_client = server.remote_client
ifconfig_ips = []
stdout = None
retry_count = 0
while stdout is None or len(ifconfig_ips) != self.TOTAL_IPS_SERVER:
del ifconfig_ips[:]
if retry_count < self.MAX_RETRIES:
ifconfig_output = rm_client.ssh_client.\
execute_shell_command("hostname -I")
stdout = ifconfig_output.stdout
pattern = re.compile(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}')
matches = pattern.finditer(stdout)
for match in matches:
ifconfig_ips.append(match.group())
if len(ifconfig_ips) == self.TOTAL_IPS_SERVER or \
retry_count == self.MAX_RETRIES:
break
retry_count += 1
server_ip_not_found = False
for ip in ips:
if ip not in ifconfig_ips:
server_ip_not_found = True
break
self.assertFalse(server_ip_not_found,
msg="server {} ip {} not found in output of "
"ifconfig {}".
format(server, ip, ifconfig_ips))
@tags('admin', 'positive')
def test_public_ping(self):
"""Testing ping on servers with public network"""
msg_err = 'Public ping to IP address {0} - FAILED'
msg_ok = 'Public ping to IP address {0} - OK'
pub_ipv4_addr = []
pub_ipv4_addr.extend(self.server_persona1.pnet_fix_ipv4)
pub_ipv4_addr.extend(self.server_persona2.pnet_fix_ipv4)
all_pub_ips_ping_result = []
failure_flag = False
for ip_addr in pub_ipv4_addr:
ip_addr_reachable = PingClient.ping(ip_addr, 4)
if ip_addr_reachable:
all_pub_ips_ping_result.append(msg_ok.format(ip_addr))
else:
all_pub_ips_ping_result.append(msg_err.format(ip_addr))
failure_flag = True
msg = 'Got connectivity failures. Ping Results: {0}'
# Fail the test if any ping failure is found
self.assertFalse(failure_flag, msg.format(all_pub_ips_ping_result))
@tags('admin', 'positive')
def test_remote_public_ping(self):
"""Testing public network remote ping on servers"""
self._test_remote_ping(port_type='pnet')
@tags('admin', 'positive')
def test_remote_private_ping(self):
"""Testing private network remote ping on servers"""
self._test_remote_ping(port_type='snet')
@tags('admin', 'positive')
def test_remote_isolated_ping(self):
"""Testing isolated network remote ping on servers"""
self._test_remote_ping(port_type='inet')
def _test_remote_ping(self, port_type):
"""Testing remote ping on servers"""
conn = Connectivity(self.server_persona2, self.server_persona1)
icmp_basic = dict(port_type=port_type, protocol='icmp', ip_version=4)
rp = conn.verify_personas_conn(**icmp_basic)
result = rp[0]
ping_result = result['connection']
self.assertTrue(ping_result, rp)
@tags('admin', 'positive')
def test_remote_public_ssh(self):
"""Testing Public remote ssh on servers"""
self._test_remote_ssh(self.server_persona1.pnet_fix_ipv4[0])
@tags('admin', 'positive')
def test_remote_private_ssh(self):
"""Testing ServiceNet remote ssh on servers"""
self._test_remote_ssh(self.server_persona1.snet_fix_ipv4[0])
@tags('admin', 'positive')
def test_remote_isolated_ssh(self):
"""Testing isolated network A remote ssh on servers"""
self._test_remote_ssh(self.server_persona1.inet_fix_ipv4[0])
def _test_remote_ssh(self, target_ip_addr):
"""Testing remote ssh on servers"""
rc2 = self.server_persona2.remote_client
ssh_cmd = self.SSH_COMMAND.format(
private_key_path=self.PRIVATE_KEY_PATH,
user=self.server_persona1.ssh_username, ip_address=target_ip_addr)
stdout = None
ssh_connection_established = False
retry_count = 0
while stdout is None or not stdout.endswith('# '):
if retry_count < self.MAX_RETRIES:
output = rc2.ssh_client.execute_shell_command(ssh_cmd)
stdout = output.stdout
retry_count += 1
if retry_count == self.MAX_RETRIES:
break
if stdout.endswith('# '):
ssh_connection_established = True
self.assertTrue(ssh_connection_established, self.ssh_msg.format(
self.server_persona2.pnet_fix_ipv4[0], target_ip_addr))
@classmethod
def add_fixed_ips_network(cls, server, network, number_fixed_ips):
# Add fixed IP's to server
for _ in range(number_fixed_ips):
cls.servers_client.add_fixed_ip(server.id, network)

View File

@ -0,0 +1,200 @@
"""
Copyright 2018 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from cafe.drivers.unittest.decorators import tags
from cloudcafe.auth.provider import AuthProvider
from cloudcafe.compute.config import ComputeAdminEndpointConfig, \
ComputeAdminUserConfig, ComputeAdminAuthConfig
from cloudcafe.compute.servers_api.client import ServersClient
from cloudcafe.networking.networks.common.constants \
import NeutronResponseCodes
from cloudcafe.networking.networks.personas import ServerPersona
from cloudroast.networking.networks.fixtures import NetworkingComputeFixture
class TestFixedIPsMultiple(NetworkingComputeFixture):
"""
Tests add and remove multiple public, private and isolated server IPs
default value is 4, config parameter is multiple_fixed_ips_to_add
Requires the IPy Python package and the following data in the config file,
[admin]
admin_auth_url=<auth_url>
admin_username=<admin_username>
admin_password=<admin_password>
"""
NAMES_PREFIX = 'fixed_ips_multiple'
@classmethod
def setUpClass(cls):
super(TestFixedIPsMultiple, cls).setUpClass()
network_name = 'network_{0}'.format(cls.NAMES_PREFIX)
cls.network = cls.create_server_network(name=network_name, ipv4=True)
cls.delete_networks.append(cls.network.id)
keypair_name = 'key_{0}'.format(cls.NAMES_PREFIX)
cls.keypair = cls.create_keypair(name=keypair_name)
cls.delete_keypairs.append(cls.keypair.name)
svr_name = 'svr_{0}'.format(cls.NAMES_PREFIX)
network_ids = [cls.public_network_id, cls.service_network_id,
cls.network.id]
cls.server = cls.create_test_server(
name=svr_name, key_name=cls.keypair.name,
network_ids=network_ids, active_server=False)
cls.server_persona = ServerPersona(
server=cls.server, pnet=True, snet=True, inet=True,
pnet_fix_ipv4_count=1, snet_fix_ipv4_count=1,
inet_fix_ipv4_count=1,
network=cls.network, keypair=cls.keypair, ssh_username='root')
server_ids = [cls.server_persona.server.id]
cls.delete_servers.extend(server_ids)
cls.pub_net_id = cls.public_network_id
cls.pri_net_id = cls.service_network_id
cls.iso_net_id = cls.network.id
cls.initial_pub_ip = cls.server_persona.pnet_fix_ipv4
cls.initial_pri_ip = cls.server_persona.snet_fix_ipv4
cls.initial_iso_ip = cls.server_persona.inet_fix_ipv4
# Initial IPv4 counts, update as needed if using a specific server
cls.ini_ips_count = 1
# Multiple fixed IPs to add
cls.FIXED_IPS_TO_ADD = cls.net.config.multiple_fixed_ips_to_add
admin_user = ComputeAdminUserConfig()
compute_admin_endpoint = ComputeAdminEndpointConfig()
auth_endpoint_config = ComputeAdminAuthConfig()
access_data = AuthProvider.get_access_data(
auth_endpoint_config, admin_user)
compute_service = access_data.get_service(
compute_admin_endpoint.compute_endpoint_name)
url = compute_service.get_endpoint(
compute_admin_endpoint.region).public_url
cls.servers_client = ServersClient(
url, access_data.token.id_, 'json', 'json')
cls.add_msg = ('Unable to add a {0} network fixed IP to server {1} '
'Response: {2}')
cls.rem_msg = ('Unable to remove a {0} network fixed IP of server {1} '
'Response: {2}')
@tags('admin', 'limits')
def test_multiple_add_remove_fixed_ips_public(self):
"""
Testing adding and removing multiple public fixed IP's
"""
self._test_add_remove_fixed_ip(port_type=self.pub_net_id,
number_fixed_ips=self.FIXED_IPS_TO_ADD,
ini_ips_count=self.ini_ips_count)
@tags('admin', 'limits')
def test_multiple_add_remove_fixed_ips_private(self):
"""
Testing adding and removing multiple private fixed IP's
"""
self._test_add_remove_fixed_ip(port_type=self.pri_net_id,
number_fixed_ips=self.FIXED_IPS_TO_ADD,
ini_ips_count=self.ini_ips_count)
@tags('admin', 'limits')
def test_multiple_add_remove_fixed_ips_isolated(self):
"""
Testing adding and removing multiple isolated fixed IP's
"""
self._test_add_remove_fixed_ip(port_type=self.iso_net_id,
number_fixed_ips=self.FIXED_IPS_TO_ADD,
ini_ips_count=self.ini_ips_count)
def _test_add_remove_fixed_ip(self, port_type, number_fixed_ips,
ini_ips_count):
# For initial IPs, assert the expected counts for IPv4
self.assertServerPersonaFixedIps(self.server_persona)
ip_count = ini_ips_count
added_ips = []
for _ in range(number_fixed_ips):
# Add an IP and assert the new counts for IPv4
add_ip_response = self.servers_client.add_fixed_ip(
self.server.id, port_type)
self.assertEqual(add_ip_response.status_code,
NeutronResponseCodes.ADD_FIXED_IP,
msg=self.add_msg.format(
port_type, self.server.id, add_ip_response))
if port_type == self.pub_net_id:
ip_count += 1
server_persona = ServerPersona(
server=self.server, pnet=True, snet=True, inet=True,
pnet_fix_ipv4_count=ip_count,
snet_fix_ipv4_count=ini_ips_count,
inet_fix_ipv4_count=ini_ips_count, network=self.network,
keypair=self.keypair, ssh_username='root')
# Get the added public IP address and saving the added
# ips for removing
public_ips = server_persona.pnet_fix_ipv4
for ip in public_ips:
if ip != self.initial_pub_ip[0] and ip not in added_ips:
added_ips.append(ip)
elif port_type == self.pri_net_id:
ip_count += 1
server_persona = ServerPersona(
server=self.server, pnet=True, snet=True, inet=True,
pnet_fix_ipv4_count=ini_ips_count,
snet_fix_ipv4_count=ip_count,
inet_fix_ipv4_count=ini_ips_count, network=self.network,
keypair=self.keypair, ssh_username='root')
# Get the added private IP address and saving the added
# ips for removing
private_ips = server_persona.snet_fix_ipv4
for ip in private_ips:
if ip != self.initial_pri_ip[0] and ip not in added_ips:
added_ips.append(ip)
elif port_type == self.iso_net_id:
ip_count += 1
server_persona = ServerPersona(
server=self.server, pnet=True, snet=True, inet=True,
pnet_fix_ipv4_count=ini_ips_count,
snet_fix_ipv4_count=ini_ips_count,
inet_fix_ipv4_count=ip_count, network=self.network,
keypair=self.keypair, ssh_username='root')
# Get the added isolated IP address and saving the added
# ips for removing
isolated_ips = server_persona.inet_fix_ipv4
for ip in isolated_ips:
if ip != self.initial_iso_ip[0] and ip not in added_ips:
added_ips.append(ip)
self.assertServerPersonaFixedIps(server_persona)
persona_args = {"server": self.server, "keypair": self.keypair,
"pnet": True, "snet": True, "inet": True,
"pnet_fix_ipv4_count": ini_ips_count,
"snet_fix_ipv4_count": ini_ips_count,
"inet_fix_ipv4_count": ini_ips_count,
"network": self.network, "ssh_username": 'root'}
for ip_to_remove in added_ips:
# Remove the added IP and assert the updated counts for IPv4
removed_ip_response = self.servers_client.remove_fixed_ip(
self.server.id, ip_to_remove)
self.assertEqual(removed_ip_response.status_code,
NeutronResponseCodes.REMOVE_FIXED_IP,
msg=self.rem_msg.format(
port_type, self.server.id,
removed_ip_response))
if port_type == self.pub_net_id:
ip_count -= 1
persona_args["pnet_fix_ipv4_count"] = ip_count
elif port_type == self.pri_net_id:
ip_count -= 1
persona_args["snet_fix_ipv4_count"] = ip_count
elif port_type == self.iso_net_id:
ip_count -= 1
persona_args["inet_fix_ipv4_count"] = ip_count
server_persona = ServerPersona(**persona_args)
self.assertServerPersonaFixedIps(server_persona)

View File

@ -0,0 +1,163 @@
"""
Copyright 2018 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from cafe.drivers.unittest.decorators import tags
from cloudcafe.auth.provider import AuthProvider
from cloudcafe.compute.config import ComputeAdminEndpointConfig, \
ComputeAdminUserConfig, ComputeAdminAuthConfig
from cloudcafe.compute.servers_api.client import ServersClient
from cloudcafe.networking.networks.common.constants \
import NeutronResponseCodes
from cloudcafe.networking.networks.personas import ServerPersona
from cloudroast.networking.networks.fixtures import NetworkingComputeFixture
class TestFixedIPsMultiple(NetworkingComputeFixture):
"""
Tests try to remove all public, private and isolated server IPs and
should NOT be able to do so, the last IP of each network shall remain
Requires the IPy Python package and the following data in the config file,
[admin]
admin_auth_url=<auth_url>
admin_username=<admin_username>
admin_password=<admin_password>
"""
NAMES_PREFIX = 'fixed_ips_neg'
@classmethod
def setUpClass(cls):
super(TestFixedIPsMultiple, cls).setUpClass()
network_name = 'network_{0}'.format(cls.NAMES_PREFIX)
cls.network = cls.create_server_network(name=network_name, ipv4=True)
cls.delete_networks.append(cls.network.id)
keypair_name = 'key_{0}'.format(cls.NAMES_PREFIX)
cls.keypair = cls.create_keypair(name=keypair_name)
cls.delete_keypairs.append(cls.keypair.name)
svr_name = 'svr_{0}'.format(cls.NAMES_PREFIX)
network_ids = [cls.public_network_id, cls.service_network_id,
cls.network.id]
cls.server = cls.create_test_server(
name=svr_name, key_name=cls.keypair.name,
network_ids=network_ids, active_server=False)
cls.server_persona = ServerPersona(
server=cls.server, pnet=True, snet=True, inet=True,
network=cls.network, keypair=cls.keypair, ssh_username='root')
server_ids = [cls.server_persona.server.id]
cls.delete_servers.extend(server_ids)
cls.pub_net_id = cls.public_network_id
cls.pri_net_id = cls.service_network_id
cls.iso_net_id = cls.network.id
# Initial IPv4 counts, update as needed if using a specific server
cls.ini_ips_count = 1
# Multiple fixed IPs to add
cls.FIXED_IPS_TO_ADD = cls.net.config.multiple_fixed_ips_to_add
admin_user = ComputeAdminUserConfig()
compute_admin_endpoint = ComputeAdminEndpointConfig()
auth_endpoint_config = ComputeAdminAuthConfig()
access_data = AuthProvider.get_access_data(
auth_endpoint_config, admin_user)
compute_service = access_data.get_service(
compute_admin_endpoint.compute_endpoint_name)
url = compute_service.get_endpoint(
compute_admin_endpoint.region).public_url
cls.servers_client = ServersClient(
url, access_data.token.id_, 'json', 'json')
cls.rem_msg = ('Unable to remove a {0} network fixed IP of server {1} '
'Response: {2}')
@tags('admin', 'negative')
def test_remove_all_fixed_ips_public(self):
"""
Testing removing all public fixed IPs
"""
self._add_fixed_ips_network(self.server, self.pub_net_id,
number_fixed_ips=self.FIXED_IPS_TO_ADD)
self._remove_all_ips(self.server, self.pub_net_id,
self.ini_ips_count)
@tags('admin', 'negative')
def test_remove_all_fixed_ips_private(self):
"""
Testing removing all private fixed IPs
"""
self._add_fixed_ips_network(self.server, self.pri_net_id,
number_fixed_ips=self.FIXED_IPS_TO_ADD)
self._remove_all_ips(self.server, self.pri_net_id,
self.ini_ips_count)
@tags('admin', 'negative')
def test_remove_all_fixed_ips_isolated(self):
"""
Testing removing all isolated fixed IPs
"""
self._add_fixed_ips_network(self.server, self.iso_net_id,
number_fixed_ips=self.FIXED_IPS_TO_ADD)
self._remove_all_ips(self.server, self.iso_net_id,
self.ini_ips_count)
def _add_fixed_ips_network(self, server, network, number_fixed_ips):
# Add fixed IP's to server
for _ in range(number_fixed_ips):
self.servers_client.add_fixed_ip(server.id, network)
def _remove_all_ips(self, server, port_type, ini_ips_count):
"""
Tries to remove all network IPs from a server and verifies
that the last IP of a network can NOT be removed
"""
persona_args = {"server": self.server, "keypair": self.keypair,
"pnet": True, "snet": True, "inet": True,
"pnet_fix_ipv4_count": ini_ips_count,
"snet_fix_ipv4_count": ini_ips_count,
"inet_fix_ipv4_count": ini_ips_count,
"network": self.network, "ssh_username": 'root'}
if port_type == self.pub_net_id:
persona_args["pnet_fix_ipv4_count"] = self.FIXED_IPS_TO_ADD+1
server_persona = ServerPersona(**persona_args)
ips = server_persona.pnet_fix_ipv4
ip_count = server_persona.pnet_fix_ipv4_count
if port_type == self.pri_net_id:
persona_args["snet_fix_ipv4_count"] = self.FIXED_IPS_TO_ADD+1
server_persona = ServerPersona(**persona_args)
ips = server_persona.snet_fix_ipv4
ip_count = server_persona.snet_fix_ipv4_count
if port_type == self.iso_net_id:
persona_args["inet_fix_ipv4_count"] = self.FIXED_IPS_TO_ADD + 1
server_persona = ServerPersona(**persona_args)
ips = server_persona.inet_fix_ipv4
ip_count = server_persona.inet_fix_ipv4_count
# Try to remove all IPv4 IPs
for ip_to_remove in ips:
removed_ip_response = self.servers_client.\
remove_fixed_ip(self.server.id, ip_to_remove)
ip_count -= 1
if ip_count >= 1:
self.assertEqual(removed_ip_response.status_code,
NeutronResponseCodes.REMOVE_FIXED_IP,
msg=self.rem_msg.format(
port_type, self.server.id,
removed_ip_response))
else:
msg = ('Tried to remove last IP of {0} network in server {1} '
'Unexpected Response: {2}'.
format(port_type, server.id,
removed_ip_response.status_code))
self.assertEqual(removed_ip_response.status_code,
NeutronResponseCodes.REMOVE_FIXED_IP, msg)
self.assertServerPersonaFixedIps(self.server_persona)