Files
gce-api/gceapi/tests/functional/api/test_firewalls.py
Andrey Pavlov 9d31912f36 update config & fix test against google
Change-Id: I713724a866a9fc8727bf0ad150c596bc4b4b6d12
2015-12-23 21:56:13 +03:00

192 lines
6.8 KiB
Python

# Copyright 2015 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
from gceapi.tests.functional.api import test_networks
class TestFirewallBase(test_networks.TestNetworksBase):
@property
def firewalls(self):
res = self.api.compute.firewalls()
self.assertIsNotNone(
res,
'Null firewalls object, api is not built properly')
return res
def _create_firewall(self, options):
project_id = self.cfg.project_id
self.trace('Create firewall with options {}'.format(options))
request = self.firewalls.insert(
project=project_id,
body=options)
self._add_cleanup(self._delete_firewall, options['name'])
self._execute_async_request(request, project_id)
def _delete_firewall(self, name):
cfg = self.cfg
project_id = cfg.project_id
self.trace('Delete firewall: project_id={} firewall={}'.
format(project_id, name))
request = self.firewalls.delete(
project=project_id,
firewall=name)
self._execute_async_request(request, project_id)
self._remove_cleanup(self._delete_firewall, name)
def _list_firewalls(self, filter=None):
project_id = self.cfg.project_id
self.trace('List firewalls: project_id={}'.format(project_id))
request = self.firewalls.list(project=project_id, filter=filter)
self.trace_request(request)
result = request.execute()
self.trace('Firewalls: {}'.format(result))
self.api.validate_schema(value=result, schema_name='FirewallList')
return result
def _get_firewall(self, name):
project_id = self.cfg.project_id
self.trace('Get firewall: project_id={} firewall={}'.
format(project_id, name))
request = self.firewalls.get(
project=project_id,
firewall=name)
result = request.execute()
self.trace('Firewall: {}'.format(result))
self.api.validate_schema(value=result, schema_name='Firewall')
return result
def _get_expected_firewall(self, options):
firewall = copy.deepcopy(options)
firewall.setdefault('kind', u'compute#firewall')
self_link = 'global/firewalls/{}'.format(firewall['name'])
firewall.setdefault('selfLink', self.api.get_project_url(self_link))
# just to check on exist
firewall.setdefault('allowed', [])
# TODO(alexey-mr): OS GCE default firewall doesn't provide network
# firewall.setdefault('network', '.*')
return firewall
def _ensure_firewall_created(self, options):
result = self._get_firewall(options['name'])
expected = self._get_expected_firewall(options)
self.assertObject(expected, result)
return result
def _create_firewall_and_validate(self, options):
self._create_firewall(options)
result = self._get_firewall(options['name'])
expected = self._get_expected_firewall(options)
self.assertObject(expected, result)
return expected
class TestFirewalls(TestFirewallBase):
def test_list_default_firewalls(self):
result = self._list_firewalls()
for firewall in result['items']:
options = {
'name': firewall['name']
}
expected = self._get_expected_firewall(options)
self.assertObject(expected, firewall)
def test_create_delete_firewall_ip_range_tcp_port(self):
name = self._rand_name('testfirewall')
options = {
'name': name,
'allowed': [
{
'IPProtocol': 'tcp',
'ports': ['44444']
}
],
'sourceRanges': ['10.240.0.0/16']
}
self._create_firewall_and_validate(options)
self._delete_firewall(name)
def test_create_delete_firewall_source_tag_tcp_port_range(self):
if not self.full_compatibility:
self.skipTest('Skip because of OS GCE does not support tags')
return
name = self._rand_name('testfirewall')
options = {
'name': name,
'allowed': [
{
'IPProtocol': 'tcp',
'ports': ['50000-55000']
}
],
'sourceTags': ['no-ip']
}
self._create_firewall_and_validate(options)
self._delete_firewall(name)
def test_create_delete_firewall_target_tag_tcp_empty_ports(self):
if not self.full_compatibility:
self.skipTest('Skip because of OS GCE does not support tags')
return
name = self._rand_name('testfirewall')
options = {
'name': name,
'allowed': [
{
'IPProtocol': 'tcp'
}
],
'sourceTags': ['src-no-ip'],
'targetTags': ['trg-no-ip']
}
self._create_firewall_and_validate(options)
self._delete_firewall(name)
def _prepare_network(self):
name = self._rand_name('testnetwork')
options = {
'name': name,
'IPv4Range': '10.241.0.0/16',
}
self._create_network(options)
options['gatewayIPv4'] = '10.241.0.1'
return self._ensure_network_created(options)
def test_create_delete_firewall_custom_network(self):
if self.is_nova_network:
self.skipTest('Skip because of nova-network cannot create network')
return
network = self._prepare_network()
name = self._rand_name('testfirewall')
options = {
'name': name,
'allowed': [
{
'IPProtocol': 'udp',
'ports': ['30000', '40000', '50000-51000']
},
{
'IPProtocol': 'icmp'
}
],
'sourceRanges': [network['IPv4Range']],
'network': network['selfLink']
}
self._create_firewall_and_validate(options)
self._delete_firewall(name)
self._delete_network(network['name'])