192 lines
6.8 KiB
Python
192 lines
6.8 KiB
Python
# Copyright 2015 United States Government as represented by the
|
|
# Administrator of the National Aeronautics and Space Administration.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import copy
|
|
|
|
from gceapi.tests.functional.api import test_networks
|
|
|
|
|
|
class TestFirewallBase(test_networks.TestNetworksBase):
|
|
@property
|
|
def firewalls(self):
|
|
res = self.api.compute.firewalls()
|
|
self.assertIsNotNone(
|
|
res,
|
|
'Null firewalls object, api is not built properly')
|
|
return res
|
|
|
|
def _create_firewall(self, options):
|
|
project_id = self.cfg.project_id
|
|
self.trace('Create firewall with options {}'.format(options))
|
|
request = self.firewalls.insert(
|
|
project=project_id,
|
|
body=options)
|
|
self._add_cleanup(self._delete_firewall, options['name'])
|
|
self._execute_async_request(request, project_id)
|
|
|
|
def _delete_firewall(self, name):
|
|
cfg = self.cfg
|
|
project_id = cfg.project_id
|
|
self.trace('Delete firewall: project_id={} firewall={}'.
|
|
format(project_id, name))
|
|
request = self.firewalls.delete(
|
|
project=project_id,
|
|
firewall=name)
|
|
self._execute_async_request(request, project_id)
|
|
self._remove_cleanup(self._delete_firewall, name)
|
|
|
|
def _list_firewalls(self, filter=None):
|
|
project_id = self.cfg.project_id
|
|
self.trace('List firewalls: project_id={}'.format(project_id))
|
|
request = self.firewalls.list(project=project_id, filter=filter)
|
|
self.trace_request(request)
|
|
result = request.execute()
|
|
self.trace('Firewalls: {}'.format(result))
|
|
self.api.validate_schema(value=result, schema_name='FirewallList')
|
|
return result
|
|
|
|
def _get_firewall(self, name):
|
|
project_id = self.cfg.project_id
|
|
self.trace('Get firewall: project_id={} firewall={}'.
|
|
format(project_id, name))
|
|
request = self.firewalls.get(
|
|
project=project_id,
|
|
firewall=name)
|
|
result = request.execute()
|
|
self.trace('Firewall: {}'.format(result))
|
|
self.api.validate_schema(value=result, schema_name='Firewall')
|
|
return result
|
|
|
|
def _get_expected_firewall(self, options):
|
|
firewall = copy.deepcopy(options)
|
|
firewall.setdefault('kind', u'compute#firewall')
|
|
self_link = 'global/firewalls/{}'.format(firewall['name'])
|
|
firewall.setdefault('selfLink', self.api.get_project_url(self_link))
|
|
# just to check on exist
|
|
firewall.setdefault('allowed', [])
|
|
# TODO(alexey-mr): OS GCE default firewall doesn't provide network
|
|
# firewall.setdefault('network', '.*')
|
|
return firewall
|
|
|
|
def _ensure_firewall_created(self, options):
|
|
result = self._get_firewall(options['name'])
|
|
expected = self._get_expected_firewall(options)
|
|
self.assertObject(expected, result)
|
|
return result
|
|
|
|
def _create_firewall_and_validate(self, options):
|
|
self._create_firewall(options)
|
|
result = self._get_firewall(options['name'])
|
|
expected = self._get_expected_firewall(options)
|
|
self.assertObject(expected, result)
|
|
return expected
|
|
|
|
|
|
class TestFirewalls(TestFirewallBase):
|
|
def test_list_default_firewalls(self):
|
|
result = self._list_firewalls()
|
|
for firewall in result['items']:
|
|
options = {
|
|
'name': firewall['name']
|
|
}
|
|
expected = self._get_expected_firewall(options)
|
|
self.assertObject(expected, firewall)
|
|
|
|
def test_create_delete_firewall_ip_range_tcp_port(self):
|
|
name = self._rand_name('testfirewall')
|
|
options = {
|
|
'name': name,
|
|
'allowed': [
|
|
{
|
|
'IPProtocol': 'tcp',
|
|
'ports': ['44444']
|
|
}
|
|
],
|
|
'sourceRanges': ['10.240.0.0/16']
|
|
}
|
|
self._create_firewall_and_validate(options)
|
|
self._delete_firewall(name)
|
|
|
|
def test_create_delete_firewall_source_tag_tcp_port_range(self):
|
|
if not self.full_compatibility:
|
|
self.skipTest('Skip because of OS GCE does not support tags')
|
|
return
|
|
name = self._rand_name('testfirewall')
|
|
options = {
|
|
'name': name,
|
|
'allowed': [
|
|
{
|
|
'IPProtocol': 'tcp',
|
|
'ports': ['50000-55000']
|
|
}
|
|
],
|
|
'sourceTags': ['no-ip']
|
|
}
|
|
self._create_firewall_and_validate(options)
|
|
self._delete_firewall(name)
|
|
|
|
def test_create_delete_firewall_target_tag_tcp_empty_ports(self):
|
|
if not self.full_compatibility:
|
|
self.skipTest('Skip because of OS GCE does not support tags')
|
|
return
|
|
name = self._rand_name('testfirewall')
|
|
options = {
|
|
'name': name,
|
|
'allowed': [
|
|
{
|
|
'IPProtocol': 'tcp'
|
|
}
|
|
],
|
|
'sourceTags': ['src-no-ip'],
|
|
'targetTags': ['trg-no-ip']
|
|
}
|
|
self._create_firewall_and_validate(options)
|
|
self._delete_firewall(name)
|
|
|
|
def _prepare_network(self):
|
|
name = self._rand_name('testnetwork')
|
|
options = {
|
|
'name': name,
|
|
'IPv4Range': '10.241.0.0/16',
|
|
}
|
|
self._create_network(options)
|
|
options['gatewayIPv4'] = '10.241.0.1'
|
|
return self._ensure_network_created(options)
|
|
|
|
def test_create_delete_firewall_custom_network(self):
|
|
if self.is_nova_network:
|
|
self.skipTest('Skip because of nova-network cannot create network')
|
|
return
|
|
network = self._prepare_network()
|
|
name = self._rand_name('testfirewall')
|
|
options = {
|
|
'name': name,
|
|
'allowed': [
|
|
{
|
|
'IPProtocol': 'udp',
|
|
'ports': ['30000', '40000', '50000-51000']
|
|
},
|
|
{
|
|
'IPProtocol': 'icmp'
|
|
}
|
|
],
|
|
'sourceRanges': [network['IPv4Range']],
|
|
'network': network['selfLink']
|
|
}
|
|
self._create_firewall_and_validate(options)
|
|
self._delete_firewall(name)
|
|
self._delete_network(network['name'])
|