deb-heat/heat/tests/test_neutron_vpnservice.py
Ethan Lynn 10a00d3ee4 Replace str() with six.text_type() where exception is used
When switching to oslo.i18n, tests where used str(exc) will failed
with unicodeerror, so before switching to oslo.i18n, need to
replace str with six.text_type where exception is used. And it's
recommanded to use six.text_type instead of str in openstack.

Change-Id: Ie37a6ef74a12b2580ce72afc75c99bcd19d4cf97
2014-07-29 10:34:26 +08:00

740 lines
28 KiB
Python

#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import mox
from neutronclient.v2_0 import client as neutronclient
import six
from heat.common import exception
from heat.common import template_format
from heat.engine.resources.neutron import neutron_utils
from heat.engine.resources.neutron import vpnservice
from heat.engine import scheduler
from heat.tests.common import HeatTestCase
from heat.tests import utils
vpnservice_template_deprecated = '''
{
"AWSTemplateFormatVersion" : "2010-09-09",
"Description" : "Template to test VPN service resource",
"Parameters" : {},
"Resources" : {
"VPNService" : {
"Type" : "OS::Neutron::VPNService",
"Properties" : {
"name" : "VPNService",
"description" : "My new VPN service",
"admin_state_up" : true,
"router_id" : "rou123",
"subnet_id" : "sub123"
}
}
}
}
'''
vpnservice_template = '''
{
"AWSTemplateFormatVersion" : "2010-09-09",
"Description" : "Template to test VPN service resource",
"Parameters" : {},
"Resources" : {
"VPNService" : {
"Type" : "OS::Neutron::VPNService",
"Properties" : {
"name" : "VPNService",
"description" : "My new VPN service",
"admin_state_up" : true,
"router_id" : "rou123",
"subnet" : "sub123"
}
}
}
}
'''
ipsec_site_connection_template = '''
{
"AWSTemplateFormatVersion" : "2010-09-09",
"Description" : "Template to test IPsec policy resource",
"Parameters" : {},
"Resources" : {
"IPsecSiteConnection" : {
"Type" : "OS::Neutron::IPsecSiteConnection",
"Properties" : {
"name" : "IPsecSiteConnection",
"description" : "My new VPN connection",
"peer_address" : "172.24.4.233",
"peer_id" : "172.24.4.233",
"peer_cidrs" : [ "10.2.0.0/24" ],
"mtu" : 1500,
"dpd" : {
"actions" : "hold",
"interval" : 30,
"timeout" : 120
},
"psk" : "secret",
"initiator" : "bi-directional",
"admin_state_up" : true,
"ikepolicy_id" : "ike123",
"ipsecpolicy_id" : "ips123",
"vpnservice_id" : "vpn123"
}
}
}
}
'''
ikepolicy_template = '''
{
"AWSTemplateFormatVersion" : "2010-09-09",
"Description" : "Template to test IKE policy resource",
"Parameters" : {},
"Resources" : {
"IKEPolicy" : {
"Type" : "OS::Neutron::IKEPolicy",
"Properties" : {
"name" : "IKEPolicy",
"description" : "My new IKE policy",
"auth_algorithm" : "sha1",
"encryption_algorithm" : "3des",
"phase1_negotiation_mode" : "main",
"lifetime" : {
"units" : "seconds",
"value" : 3600
},
"pfs" : "group5",
"ike_version" : "v1"
}
}
}
}
'''
ipsecpolicy_template = '''
{
"AWSTemplateFormatVersion" : "2010-09-09",
"Description" : "Template to test IPsec policy resource",
"Parameters" : {},
"Resources" : {
"IPsecPolicy" : {
"Type" : "OS::Neutron::IPsecPolicy",
"Properties" : {
"name" : "IPsecPolicy",
"description" : "My new IPsec policy",
"transform_protocol": "esp",
"encapsulation_mode" : "tunnel",
"auth_algorithm" : "sha1",
"encryption_algorithm" : "3des",
"lifetime" : {
"units" : "seconds",
"value" : 3600
},
"pfs" : "group5"
}
}
}
}
'''
class VPNServiceTest(HeatTestCase):
VPN_SERVICE_CONF = {
'vpnservice': {
'name': 'VPNService',
'description': 'My new VPN service',
'admin_state_up': True,
'router_id': 'rou123',
'subnet_id': 'sub123'
}
}
def setUp(self):
super(VPNServiceTest, self).setUp()
self.m.StubOutWithMock(neutronclient.Client, 'create_vpnservice')
self.m.StubOutWithMock(neutronclient.Client, 'delete_vpnservice')
self.m.StubOutWithMock(neutronclient.Client, 'show_vpnservice')
self.m.StubOutWithMock(neutronclient.Client, 'update_vpnservice')
self.m.StubOutWithMock(neutron_utils.neutronV20,
'find_resourceid_by_name_or_id')
self.stub_keystoneclient()
def create_vpnservice(self, resolve_neutron=True):
if resolve_neutron:
neutron_utils.neutronV20.find_resourceid_by_name_or_id(
mox.IsA(neutronclient.Client),
'subnet',
'sub123'
).AndReturn('sub123')
snippet = template_format.parse(vpnservice_template)
else:
snippet = template_format.parse(vpnservice_template_deprecated)
neutronclient.Client.create_vpnservice(
self.VPN_SERVICE_CONF).AndReturn({'vpnservice': {'id': 'vpn123'}})
self.stack = utils.parse_stack(snippet)
resource_defns = self.stack.t.resource_definitions(self.stack)
return vpnservice.VPNService('vpnservice',
resource_defns['VPNService'],
self.stack)
def test_create_deprecated(self):
self._test_create(resolve_neutron=False)
def test_create(self):
self._test_create()
def _test_create(self, resolve_neutron=True):
rsrc = self.create_vpnservice(resolve_neutron)
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
self.assertEqual((rsrc.CREATE, rsrc.COMPLETE), rsrc.state)
self.m.VerifyAll()
def test_create_failed(self):
neutron_utils.neutronV20.find_resourceid_by_name_or_id(
mox.IsA(neutronclient.Client),
'subnet',
'sub123'
).AndReturn('sub123')
neutronclient.Client.create_vpnservice(self.VPN_SERVICE_CONF).AndRaise(
vpnservice.NeutronClientException())
self.m.ReplayAll()
snippet = template_format.parse(vpnservice_template)
self.stack = utils.parse_stack(snippet)
resource_defns = self.stack.t.resource_definitions(self.stack)
rsrc = vpnservice.VPNService('vpnservice',
resource_defns['VPNService'],
self.stack)
error = self.assertRaises(exception.ResourceFailure,
scheduler.TaskRunner(rsrc.create))
self.assertEqual(
'NeutronClientException: An unknown exception occurred.',
six.text_type(error))
self.assertEqual((rsrc.CREATE, rsrc.FAILED), rsrc.state)
self.m.VerifyAll()
def test_delete(self):
neutronclient.Client.delete_vpnservice('vpn123')
neutronclient.Client.show_vpnservice('vpn123').AndRaise(
vpnservice.NeutronClientException(status_code=404))
rsrc = self.create_vpnservice()
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
scheduler.TaskRunner(rsrc.delete)()
self.assertEqual((rsrc.DELETE, rsrc.COMPLETE), rsrc.state)
self.m.VerifyAll()
def test_delete_already_gone(self):
neutronclient.Client.delete_vpnservice('vpn123').AndRaise(
vpnservice.NeutronClientException(status_code=404))
rsrc = self.create_vpnservice()
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
scheduler.TaskRunner(rsrc.delete)()
self.assertEqual((rsrc.DELETE, rsrc.COMPLETE), rsrc.state)
self.m.VerifyAll()
def test_delete_failed(self):
neutronclient.Client.delete_vpnservice('vpn123').AndRaise(
vpnservice.NeutronClientException(status_code=400))
rsrc = self.create_vpnservice()
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
error = self.assertRaises(exception.ResourceFailure,
scheduler.TaskRunner(rsrc.delete))
self.assertEqual(
'NeutronClientException: An unknown exception occurred.',
six.text_type(error))
self.assertEqual((rsrc.DELETE, rsrc.FAILED), rsrc.state)
self.m.VerifyAll()
def test_attribute(self):
rsrc = self.create_vpnservice()
neutronclient.Client.show_vpnservice('vpn123').MultipleTimes(
).AndReturn(self.VPN_SERVICE_CONF)
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
self.assertEqual('VPNService', rsrc.FnGetAtt('name'))
self.assertEqual('My new VPN service', rsrc.FnGetAtt('description'))
self.assertIs(True, rsrc.FnGetAtt('admin_state_up'))
self.assertEqual('rou123', rsrc.FnGetAtt('router_id'))
self.assertEqual('sub123', rsrc.FnGetAtt('subnet_id'))
self.m.VerifyAll()
def test_attribute_failed(self):
rsrc = self.create_vpnservice()
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
error = self.assertRaises(exception.InvalidTemplateAttribute,
rsrc.FnGetAtt, 'non-existent_property')
self.assertEqual(
'The Referenced Attribute (vpnservice non-existent_property) is '
'incorrect.',
six.text_type(error))
self.m.VerifyAll()
def test_update(self):
rsrc = self.create_vpnservice()
neutronclient.Client.update_vpnservice(
'vpn123', {'vpnservice': {'admin_state_up': False}})
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
update_template = copy.deepcopy(rsrc.t)
update_template['Properties']['admin_state_up'] = False
scheduler.TaskRunner(rsrc.update, update_template)()
self.m.VerifyAll()
class IPsecSiteConnectionTest(HeatTestCase):
IPSEC_SITE_CONNECTION_CONF = {
'ipsec_site_connection': {
'name': 'IPsecSiteConnection',
'description': 'My new VPN connection',
'peer_address': '172.24.4.233',
'peer_id': '172.24.4.233',
'peer_cidrs': ['10.2.0.0/24'],
'mtu': 1500,
'dpd': {
'actions': 'hold',
'interval': 30,
'timeout': 120
},
'psk': 'secret',
'initiator': 'bi-directional',
'admin_state_up': True,
'ikepolicy_id': 'ike123',
'ipsecpolicy_id': 'ips123',
'vpnservice_id': 'vpn123'
}
}
def setUp(self):
super(IPsecSiteConnectionTest, self).setUp()
self.m.StubOutWithMock(neutronclient.Client,
'create_ipsec_site_connection')
self.m.StubOutWithMock(neutronclient.Client,
'delete_ipsec_site_connection')
self.m.StubOutWithMock(neutronclient.Client,
'show_ipsec_site_connection')
self.m.StubOutWithMock(neutronclient.Client,
'update_ipsec_site_connection')
self.stub_keystoneclient()
def create_ipsec_site_connection(self):
neutronclient.Client.create_ipsec_site_connection(
self.IPSEC_SITE_CONNECTION_CONF).AndReturn(
{'ipsec_site_connection': {'id': 'con123'}})
snippet = template_format.parse(ipsec_site_connection_template)
self.stack = utils.parse_stack(snippet)
resource_defns = self.stack.t.resource_definitions(self.stack)
return vpnservice.IPsecSiteConnection(
'ipsec_site_connection',
resource_defns['IPsecSiteConnection'],
self.stack)
def test_create(self):
rsrc = self.create_ipsec_site_connection()
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
self.assertEqual((rsrc.CREATE, rsrc.COMPLETE), rsrc.state)
self.m.VerifyAll()
def test_create_failed(self):
neutronclient.Client.create_ipsec_site_connection(
self.IPSEC_SITE_CONNECTION_CONF).AndRaise(
vpnservice.NeutronClientException())
self.m.ReplayAll()
snippet = template_format.parse(ipsec_site_connection_template)
self.stack = utils.parse_stack(snippet)
resource_defns = self.stack.t.resource_definitions(self.stack)
rsrc = vpnservice.IPsecSiteConnection(
'ipsec_site_connection',
resource_defns['IPsecSiteConnection'],
self.stack)
error = self.assertRaises(exception.ResourceFailure,
scheduler.TaskRunner(rsrc.create))
self.assertEqual(
'NeutronClientException: An unknown exception occurred.',
six.text_type(error))
self.assertEqual((rsrc.CREATE, rsrc.FAILED), rsrc.state)
self.m.VerifyAll()
def test_delete(self):
neutronclient.Client.delete_ipsec_site_connection('con123')
neutronclient.Client.show_ipsec_site_connection('con123').AndRaise(
vpnservice.NeutronClientException(status_code=404))
rsrc = self.create_ipsec_site_connection()
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
scheduler.TaskRunner(rsrc.delete)()
self.assertEqual((rsrc.DELETE, rsrc.COMPLETE), rsrc.state)
self.m.VerifyAll()
def test_delete_already_gone(self):
neutronclient.Client.delete_ipsec_site_connection('con123').AndRaise(
vpnservice.NeutronClientException(status_code=404))
rsrc = self.create_ipsec_site_connection()
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
scheduler.TaskRunner(rsrc.delete)()
self.assertEqual((rsrc.DELETE, rsrc.COMPLETE), rsrc.state)
self.m.VerifyAll()
def test_delete_failed(self):
neutronclient.Client.delete_ipsec_site_connection('con123').AndRaise(
vpnservice.NeutronClientException(status_code=400))
rsrc = self.create_ipsec_site_connection()
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
error = self.assertRaises(exception.ResourceFailure,
scheduler.TaskRunner(rsrc.delete))
self.assertEqual(
'NeutronClientException: An unknown exception occurred.',
six.text_type(error))
self.assertEqual((rsrc.DELETE, rsrc.FAILED), rsrc.state)
self.m.VerifyAll()
def test_attribute(self):
rsrc = self.create_ipsec_site_connection()
neutronclient.Client.show_ipsec_site_connection(
'con123').MultipleTimes().AndReturn(
self.IPSEC_SITE_CONNECTION_CONF)
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
self.assertEqual('IPsecSiteConnection', rsrc.FnGetAtt('name'))
self.assertEqual('My new VPN connection', rsrc.FnGetAtt('description'))
self.assertEqual('172.24.4.233', rsrc.FnGetAtt('peer_address'))
self.assertEqual('172.24.4.233', rsrc.FnGetAtt('peer_id'))
self.assertEqual(['10.2.0.0/24'], rsrc.FnGetAtt('peer_cidrs'))
self.assertEqual('hold', rsrc.FnGetAtt('dpd')['actions'])
self.assertEqual(30, rsrc.FnGetAtt('dpd')['interval'])
self.assertEqual(120, rsrc.FnGetAtt('dpd')['timeout'])
self.assertEqual('secret', rsrc.FnGetAtt('psk'))
self.assertEqual('bi-directional', rsrc.FnGetAtt('initiator'))
self.assertIs(True, rsrc.FnGetAtt('admin_state_up'))
self.assertEqual('ike123', rsrc.FnGetAtt('ikepolicy_id'))
self.assertEqual('ips123', rsrc.FnGetAtt('ipsecpolicy_id'))
self.assertEqual('vpn123', rsrc.FnGetAtt('vpnservice_id'))
self.m.VerifyAll()
def test_attribute_failed(self):
rsrc = self.create_ipsec_site_connection()
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
error = self.assertRaises(exception.InvalidTemplateAttribute,
rsrc.FnGetAtt, 'non-existent_property')
self.assertEqual(
'The Referenced Attribute (ipsec_site_connection '
'non-existent_property) is incorrect.',
six.text_type(error))
self.m.VerifyAll()
def test_update(self):
rsrc = self.create_ipsec_site_connection()
neutronclient.Client.update_ipsec_site_connection(
'con123', {'ipsec_site_connection': {'admin_state_up': False}})
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
update_template = copy.deepcopy(rsrc.t)
update_template['Properties']['admin_state_up'] = False
scheduler.TaskRunner(rsrc.update, update_template)()
self.m.VerifyAll()
class IKEPolicyTest(HeatTestCase):
IKE_POLICY_CONF = {
'ikepolicy': {
'name': 'IKEPolicy',
'description': 'My new IKE policy',
'auth_algorithm': 'sha1',
'encryption_algorithm': '3des',
'phase1_negotiation_mode': 'main',
'lifetime': {
'units': 'seconds',
'value': 3600
},
'pfs': 'group5',
'ike_version': 'v1'
}
}
def setUp(self):
super(IKEPolicyTest, self).setUp()
self.m.StubOutWithMock(neutronclient.Client, 'create_ikepolicy')
self.m.StubOutWithMock(neutronclient.Client, 'delete_ikepolicy')
self.m.StubOutWithMock(neutronclient.Client, 'show_ikepolicy')
self.m.StubOutWithMock(neutronclient.Client, 'update_ikepolicy')
self.stub_keystoneclient()
def create_ikepolicy(self):
neutronclient.Client.create_ikepolicy(
self.IKE_POLICY_CONF).AndReturn(
{'ikepolicy': {'id': 'ike123'}})
snippet = template_format.parse(ikepolicy_template)
self.stack = utils.parse_stack(snippet)
resource_defns = self.stack.t.resource_definitions(self.stack)
return vpnservice.IKEPolicy('ikepolicy',
resource_defns['IKEPolicy'],
self.stack)
def test_create(self):
rsrc = self.create_ikepolicy()
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
self.assertEqual((rsrc.CREATE, rsrc.COMPLETE), rsrc.state)
self.m.VerifyAll()
def test_create_failed(self):
neutronclient.Client.create_ikepolicy(
self.IKE_POLICY_CONF).AndRaise(
vpnservice.NeutronClientException())
self.m.ReplayAll()
snippet = template_format.parse(ikepolicy_template)
self.stack = utils.parse_stack(snippet)
resource_defns = self.stack.t.resource_definitions(self.stack)
rsrc = vpnservice.IKEPolicy(
'ikepolicy',
resource_defns['IKEPolicy'],
self.stack)
error = self.assertRaises(exception.ResourceFailure,
scheduler.TaskRunner(rsrc.create))
self.assertEqual(
'NeutronClientException: An unknown exception occurred.',
six.text_type(error))
self.assertEqual((rsrc.CREATE, rsrc.FAILED), rsrc.state)
self.m.VerifyAll()
def test_delete(self):
neutronclient.Client.delete_ikepolicy('ike123')
neutronclient.Client.show_ikepolicy('ike123').AndRaise(
vpnservice.NeutronClientException(status_code=404))
rsrc = self.create_ikepolicy()
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
scheduler.TaskRunner(rsrc.delete)()
self.assertEqual((rsrc.DELETE, rsrc.COMPLETE), rsrc.state)
self.m.VerifyAll()
def test_delete_already_gone(self):
neutronclient.Client.delete_ikepolicy('ike123').AndRaise(
vpnservice.NeutronClientException(status_code=404))
rsrc = self.create_ikepolicy()
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
scheduler.TaskRunner(rsrc.delete)()
self.assertEqual((rsrc.DELETE, rsrc.COMPLETE), rsrc.state)
self.m.VerifyAll()
def test_delete_failed(self):
neutronclient.Client.delete_ikepolicy('ike123').AndRaise(
vpnservice.NeutronClientException(status_code=400))
rsrc = self.create_ikepolicy()
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
error = self.assertRaises(exception.ResourceFailure,
scheduler.TaskRunner(rsrc.delete))
self.assertEqual(
'NeutronClientException: An unknown exception occurred.',
six.text_type(error))
self.assertEqual((rsrc.DELETE, rsrc.FAILED), rsrc.state)
self.m.VerifyAll()
def test_attribute(self):
rsrc = self.create_ikepolicy()
neutronclient.Client.show_ikepolicy(
'ike123').MultipleTimes().AndReturn(self.IKE_POLICY_CONF)
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
self.assertEqual('IKEPolicy', rsrc.FnGetAtt('name'))
self.assertEqual('My new IKE policy', rsrc.FnGetAtt('description'))
self.assertEqual('sha1', rsrc.FnGetAtt('auth_algorithm'))
self.assertEqual('3des', rsrc.FnGetAtt('encryption_algorithm'))
self.assertEqual('main', rsrc.FnGetAtt('phase1_negotiation_mode'))
self.assertEqual('seconds', rsrc.FnGetAtt('lifetime')['units'])
self.assertEqual(3600, rsrc.FnGetAtt('lifetime')['value'])
self.assertEqual('group5', rsrc.FnGetAtt('pfs'))
self.assertEqual('v1', rsrc.FnGetAtt('ike_version'))
self.m.VerifyAll()
def test_attribute_failed(self):
rsrc = self.create_ikepolicy()
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
error = self.assertRaises(exception.InvalidTemplateAttribute,
rsrc.FnGetAtt, 'non-existent_property')
self.assertEqual(
'The Referenced Attribute (ikepolicy non-existent_property) is '
'incorrect.',
six.text_type(error))
self.m.VerifyAll()
def test_update(self):
rsrc = self.create_ikepolicy()
neutronclient.Client.update_ikepolicy('ike123',
{'ikepolicy': {
'name': 'New IKEPolicy'}})
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
update_template = copy.deepcopy(rsrc.t)
update_template['Properties']['name'] = 'New IKEPolicy'
scheduler.TaskRunner(rsrc.update, update_template)()
self.m.VerifyAll()
class IPsecPolicyTest(HeatTestCase):
IPSEC_POLICY_CONF = {
'ipsecpolicy': {
'name': 'IPsecPolicy',
'description': 'My new IPsec policy',
'transform_protocol': 'esp',
'encapsulation_mode': 'tunnel',
'auth_algorithm': 'sha1',
'encryption_algorithm': '3des',
'lifetime': {
'units': 'seconds',
'value': 3600
},
'pfs': 'group5'
}
}
def setUp(self):
super(IPsecPolicyTest, self).setUp()
self.m.StubOutWithMock(neutronclient.Client, 'create_ipsecpolicy')
self.m.StubOutWithMock(neutronclient.Client, 'delete_ipsecpolicy')
self.m.StubOutWithMock(neutronclient.Client, 'show_ipsecpolicy')
self.m.StubOutWithMock(neutronclient.Client, 'update_ipsecpolicy')
self.stub_keystoneclient()
def create_ipsecpolicy(self):
neutronclient.Client.create_ipsecpolicy(
self.IPSEC_POLICY_CONF).AndReturn(
{'ipsecpolicy': {'id': 'ips123'}})
snippet = template_format.parse(ipsecpolicy_template)
self.stack = utils.parse_stack(snippet)
resource_defns = self.stack.t.resource_definitions(self.stack)
return vpnservice.IPsecPolicy('ipsecpolicy',
resource_defns['IPsecPolicy'],
self.stack)
def test_create(self):
rsrc = self.create_ipsecpolicy()
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
self.assertEqual((rsrc.CREATE, rsrc.COMPLETE), rsrc.state)
self.m.VerifyAll()
def test_create_failed(self):
neutronclient.Client.create_ipsecpolicy(
self.IPSEC_POLICY_CONF).AndRaise(
vpnservice.NeutronClientException())
self.m.ReplayAll()
snippet = template_format.parse(ipsecpolicy_template)
self.stack = utils.parse_stack(snippet)
resource_defns = self.stack.t.resource_definitions(self.stack)
rsrc = vpnservice.IPsecPolicy(
'ipsecpolicy',
resource_defns['IPsecPolicy'],
self.stack)
error = self.assertRaises(exception.ResourceFailure,
scheduler.TaskRunner(rsrc.create))
self.assertEqual(
'NeutronClientException: An unknown exception occurred.',
six.text_type(error))
self.assertEqual((rsrc.CREATE, rsrc.FAILED), rsrc.state)
self.m.VerifyAll()
def test_delete(self):
neutronclient.Client.delete_ipsecpolicy('ips123')
neutronclient.Client.show_ipsecpolicy('ips123').AndRaise(
vpnservice.NeutronClientException(status_code=404))
rsrc = self.create_ipsecpolicy()
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
scheduler.TaskRunner(rsrc.delete)()
self.assertEqual((rsrc.DELETE, rsrc.COMPLETE), rsrc.state)
self.m.VerifyAll()
def test_delete_already_gone(self):
neutronclient.Client.delete_ipsecpolicy('ips123').AndRaise(
vpnservice.NeutronClientException(status_code=404))
rsrc = self.create_ipsecpolicy()
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
scheduler.TaskRunner(rsrc.delete)()
self.assertEqual((rsrc.DELETE, rsrc.COMPLETE), rsrc.state)
self.m.VerifyAll()
def test_delete_failed(self):
neutronclient.Client.delete_ipsecpolicy('ips123').AndRaise(
vpnservice.NeutronClientException(status_code=400))
rsrc = self.create_ipsecpolicy()
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
error = self.assertRaises(exception.ResourceFailure,
scheduler.TaskRunner(rsrc.delete))
self.assertEqual(
'NeutronClientException: An unknown exception occurred.',
six.text_type(error))
self.assertEqual((rsrc.DELETE, rsrc.FAILED), rsrc.state)
self.m.VerifyAll()
def test_attribute(self):
rsrc = self.create_ipsecpolicy()
neutronclient.Client.show_ipsecpolicy(
'ips123').MultipleTimes().AndReturn(self.IPSEC_POLICY_CONF)
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
self.assertEqual('IPsecPolicy', rsrc.FnGetAtt('name'))
self.assertEqual('My new IPsec policy', rsrc.FnGetAtt('description'))
self.assertEqual('esp', rsrc.FnGetAtt('transform_protocol'))
self.assertEqual('tunnel', rsrc.FnGetAtt('encapsulation_mode'))
self.assertEqual('sha1', rsrc.FnGetAtt('auth_algorithm'))
self.assertEqual('3des', rsrc.FnGetAtt('encryption_algorithm'))
self.assertEqual('seconds', rsrc.FnGetAtt('lifetime')['units'])
self.assertEqual(3600, rsrc.FnGetAtt('lifetime')['value'])
self.assertEqual('group5', rsrc.FnGetAtt('pfs'))
self.m.VerifyAll()
def test_attribute_failed(self):
rsrc = self.create_ipsecpolicy()
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
error = self.assertRaises(exception.InvalidTemplateAttribute,
rsrc.FnGetAtt, 'non-existent_property')
self.assertEqual(
'The Referenced Attribute (ipsecpolicy non-existent_property) is '
'incorrect.',
six.text_type(error))
self.m.VerifyAll()
def test_update(self):
rsrc = self.create_ipsecpolicy()
neutronclient.Client.update_ipsecpolicy(
'ips123',
{'ipsecpolicy': {'name': 'New IPsecPolicy'}})
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
update_template = copy.deepcopy(rsrc.t)
update_template['Properties']['name'] = 'New IPsecPolicy'
scheduler.TaskRunner(rsrc.update, update_template)()
self.m.VerifyAll()