Fix [H302] errors in heat/tests

Change-Id: Ia7527c6704e61d854be8e847e2401f33fb48d4df
This commit is contained in:
Peter Razumovsky 2014-11-18 18:46:59 +03:00
parent 03b3cf777e
commit 0ef5ce3377
26 changed files with 265 additions and 240 deletions

View File

@ -22,7 +22,7 @@ import heat.api.cfn.v1.stacks as stacks
from heat.common import exception as heat_exception
from heat.common import identifier
from heat.common import policy
from heat.common.wsgi import Request
from heat.common import wsgi
from heat.rpc import api as rpc_api
from heat.rpc import client as rpc_client
from heat.tests import common
@ -66,7 +66,7 @@ class CfnStackControllerTest(common.HeatTestCase):
params = params or {}
qs = "&".join(["=".join([k, str(params[k])]) for k in params])
environ = {'REQUEST_METHOD': 'GET', 'QUERY_STRING': qs}
req = Request(environ)
req = wsgi.Request(environ)
req.context = utils.dummy_context()
return req

View File

@ -18,7 +18,7 @@ from oslo.config import cfg
from heat.api.aws import exception
import heat.api.cloudwatch.watch as watches
from heat.common import policy
from heat.common.wsgi import Request
from heat.common import wsgi
from heat.rpc import api as engine_api
from heat.rpc import client as rpc_client
from heat.tests import common
@ -36,7 +36,7 @@ class WatchControllerTest(common.HeatTestCase):
params = params or {}
qs = "&".join(["=".join([k, str(params[k])]) for k in params])
environ = {'REQUEST_METHOD': 'GET', 'QUERY_STRING': qs}
req = Request(environ)
req = wsgi.Request(environ)
req.context = utils.dummy_context()
return req

View File

@ -21,7 +21,7 @@ import requests
from heat.api.aws import ec2token
from heat.api.aws import exception
from heat.common.wsgi import Request
from heat.common import wsgi
from heat.tests import common
@ -40,7 +40,7 @@ class Ec2TokenTest(common.HeatTestCase):
environ = environ or {}
qs = "&".join(["=".join([k, str(params[k])]) for k in params])
environ.update({'REQUEST_METHOD': 'GET', 'QUERY_STRING': qs})
req = Request(environ)
req = wsgi.Request(environ)
return req
def test_conf_get_paste(self):

View File

@ -32,7 +32,7 @@ from heat.common import exception as heat_exc
from heat.common import identifier
from heat.common import policy
from heat.common import urlfetch
from heat.common.wsgi import Request
from heat.common import wsgi
from heat.rpc import api as rpc_api
from heat.rpc import client as rpc_client
from heat.tests import common
@ -253,7 +253,7 @@ class ControllerTest(object):
qs = "&".join(["=".join([k, str(params[k])]) for k in params])
environ['QUERY_STRING'] = qs
req = Request(environ)
req = wsgi.Request(environ)
req.context = utils.dummy_context('api_test_user', self.tenant)
self.context = req.context
return req
@ -272,7 +272,7 @@ class ControllerTest(object):
environ = self._environ(path)
environ['REQUEST_METHOD'] = method
req = Request(environ)
req = wsgi.Request(environ)
req.context = utils.dummy_context('api_test_user', self.tenant)
self.context = req.context
req.body = data

View File

@ -17,14 +17,14 @@ from webob import exc
from heat.api.openstack.v1 import util
from heat.common import context
from heat.common import policy
from heat.common.wsgi import Request
from heat.common import wsgi
from heat.tests import common
class TestGetAllowedParams(common.HeatTestCase):
def setUp(self):
super(TestGetAllowedParams, self).setUp()
req = Request({})
req = wsgi.Request({})
self.params = req.params.copy()
self.params.add('foo', 'foo value')
self.whitelist = {'foo': 'single'}
@ -86,7 +86,7 @@ class TestGetAllowedParams(common.HeatTestCase):
class TestPolicyEnforce(common.HeatTestCase):
def setUp(self):
super(TestPolicyEnforce, self).setUp()
self.req = Request({})
self.req = wsgi.Request({})
self.req.context = context.RequestContext(tenant_id='foo',
is_admin=False)

View File

@ -14,11 +14,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from keystoneclient.exceptions import Unauthorized
from keystoneclient import exceptions as keystone_exc
from keystoneclient.v2_0 import client as keystone_client
import webob
from heat.common.auth_password import KeystonePasswordAuthProtocol
from heat.common import auth_password
from heat.tests import common
EXPECTED_V2_DEFAULT_ENV_RESPONSE = {
@ -78,7 +78,8 @@ class KeystonePasswordAuthProtocolTest(common.HeatTestCase):
self.config = {'auth_uri': 'http://keystone.test.com:5000'}
self.app = FakeApp(
expected_env={'HTTP_X_AUTH_URL': self.config['auth_uri']})
self.middleware = KeystonePasswordAuthProtocol(self.app, self.config)
self.middleware = auth_password.KeystonePasswordAuthProtocol(
self.app, self.config)
def _start_fake_response(self, status, headers):
self.response_status = int(status.split(' ', 1)[0])
@ -106,7 +107,7 @@ class KeystonePasswordAuthProtocolTest(common.HeatTestCase):
mock_client = keystone_client.Client(
username='user_name1', password='badpassword',
tenant_id='tenant_id1', auth_url=self.config['auth_uri'])
mock_client.AndRaise(Unauthorized(401))
mock_client.AndRaise(keystone_exc.Unauthorized(401))
self.m.ReplayAll()
req = webob.Request.blank('/tenant_id1/')
req.headers['X_AUTH_USER'] = 'user_name1'

View File

@ -17,7 +17,7 @@ import six
import mox
from oslo.config import cfg
from testtools.matchers import MatchesRegex
from testtools import matchers
from heat.common import exception
from heat.common import template_format
@ -30,7 +30,7 @@ from heat.engine.resources import loadbalancer as lb
from heat.engine.resources import wait_condition as wc
from heat.tests import common
from heat.tests import utils
from heat.tests.v1_1 import fakes as fakes11
from heat.tests.v1_1 import fakes as fakes_v1_1
asg_tmpl_without_updt_policy = '''
@ -205,7 +205,7 @@ class AutoScalingGroupTest(common.HeatTestCase):
def setUp(self):
super(AutoScalingGroupTest, self).setUp()
self.fc = fakes11.FakeClient()
self.fc = fakes_v1_1.FakeClient()
self.stub_keystoneclient(username='test_stack.CfnLBUser')
cfg.CONF.set_default('heat_waitcondition_server_url',
'http://127.0.0.1:8000/v1/waitcondition')
@ -481,7 +481,8 @@ class AutoScalingGroupTest(common.HeatTestCase):
# test that physical resource name of launch configuration is used
conf = stack['LaunchConfig']
conf_name_pattern = '%s-LaunchConfig-[a-zA-Z0-9]+$' % stack.name
self.assertThat(conf.FnGetRefId(), MatchesRegex(conf_name_pattern))
self.assertThat(conf.FnGetRefId(),
matchers.MatchesRegex(conf_name_pattern))
# get launch conf name here to compare result after update
conf_name = self.get_launch_conf_name(stack, 'WebServerGroup')
@ -746,7 +747,8 @@ class AutoScalingGroupTest(common.HeatTestCase):
# test that physical resource name of launch configuration is used
conf = stack['LaunchConfig']
conf_name_pattern = '%s-LaunchConfig-[a-zA-Z0-9]+$' % stack.name
self.assertThat(conf.FnGetRefId(), MatchesRegex(conf_name_pattern))
self.assertThat(conf.FnGetRefId(),
matchers.MatchesRegex(conf_name_pattern))
# test the number of instances created
nested = stack['WebServerGroup'].nested()

View File

@ -23,7 +23,7 @@ from heat.common import exception
from heat.common import template_format
from heat.engine.clients.os import ceilometer
from heat.engine import parser
from heat.engine.properties import schemata
from heat.engine import properties as props
from heat.engine import resource
from heat.engine.resources.ceilometer import alarm
from heat.engine import rsrc_defn
@ -204,7 +204,7 @@ class CeilometerAlarmTest(common.HeatTestCase):
self.stack = self.create_stack(template=json.dumps(t))
self.m.StubOutWithMock(self.fa.alarms, 'update')
schema = schemata(alarm.CeilometerAlarm.properties_schema)
schema = props.schemata(alarm.CeilometerAlarm.properties_schema)
exns = ['period', 'evaluation_periods', 'threshold',
'statistic', 'comparison_operator', 'meter_name',
'matching_metadata', 'query']
@ -230,8 +230,8 @@ class CeilometerAlarmTest(common.HeatTestCase):
self.stack.create()
rsrc = self.stack['MEMAlarmHigh']
props = copy.copy(rsrc.properties.data)
props.update({
properties = copy.copy(rsrc.properties.data)
properties.update({
'comparison_operator': 'lt',
'description': 'fruity',
'evaluation_periods': '2',
@ -248,7 +248,7 @@ class CeilometerAlarmTest(common.HeatTestCase):
})
snippet = rsrc_defn.ResourceDefinition(rsrc.name,
rsrc.type(),
props)
properties)
scheduler.TaskRunner(rsrc.update, snippet)()
@ -270,11 +270,11 @@ class CeilometerAlarmTest(common.HeatTestCase):
self.stack.create()
rsrc = self.stack['MEMAlarmHigh']
props = copy.copy(rsrc.properties.data)
props['meter_name'] = 'temp'
properties = copy.copy(rsrc.properties.data)
properties['meter_name'] = 'temp'
snippet = rsrc_defn.ResourceDefinition(rsrc.name,
rsrc.type(),
props)
properties)
updater = scheduler.TaskRunner(rsrc.update, snippet)
self.assertRaises(resource.UpdateReplace, updater)

View File

@ -20,12 +20,12 @@ from keystoneclient import exceptions as keystone_exc
from neutronclient.common import exceptions as neutron_exc
from saharaclient.api import base as sahara_base
from swiftclient import exceptions as swift_exc
from troveclient.client import exceptions as trove_exc
from troveclient import client as troveclient
from heatclient import client as heatclient
import mock
from oslo.config import cfg
from testtools.testcase import skip
from testtools import testcase
from heat.engine import clients
from heat.engine.clients import client_plugin
@ -206,7 +206,7 @@ class ClientPluginTest(common.HeatTestCase):
class TestClientPluginsInitialise(common.HeatTestCase):
@skip('skipped until keystone can read context auth_ref')
@testcase.skip('skipped until keystone can read context auth_ref')
def test_create_all_clients(self):
con = mock.Mock()
con.auth_url = "http://auth.example.com:5000/v2.0"
@ -554,7 +554,7 @@ class TestIsNotFound(common.HeatTestCase):
is_client_exception=True,
is_conflict=False,
plugin='trove',
exception=lambda: trove_exc.NotFound(message='gone'),
exception=lambda: troveclient.exceptions.NotFound(message='gone'),
)),
('trove_exception', dict(
is_not_found=False,
@ -570,7 +570,7 @@ class TestIsNotFound(common.HeatTestCase):
is_client_exception=True,
is_conflict=False,
plugin='trove',
exception=lambda: trove_exc.RequestEntityTooLarge(
exception=lambda: troveclient.exceptions.RequestEntityTooLarge(
message='over'),
)),
('trove_conflict', dict(
@ -579,7 +579,7 @@ class TestIsNotFound(common.HeatTestCase):
is_client_exception=True,
is_conflict=True,
plugin='trove',
exception=lambda: trove_exc.Conflict(
exception=lambda: troveclient.exceptions.Conflict(
message='Conflict'),
)),
('sahara_not_found', dict(

View File

@ -14,8 +14,7 @@
import testtools
from heat.engine.dependencies import CircularDependencyException
from heat.engine.dependencies import Dependencies
from heat.engine import dependencies
class dependenciesTest(testtools.TestCase):
@ -23,7 +22,7 @@ class dependenciesTest(testtools.TestCase):
def _dep_test(self, func, checkorder, deps):
nodes = set.union(*[set(e) for e in deps])
d = Dependencies(deps)
d = dependencies.Dependencies(deps)
order = list(func(d))
for n in nodes:
@ -49,22 +48,22 @@ class dependenciesTest(testtools.TestCase):
def test_edges(self):
input_edges = [('1', None), ('2', '3'), ('2', '4')]
dp = Dependencies(input_edges)
dp = dependencies.Dependencies(input_edges)
self.assertEqual(set(input_edges), set(dp.graph().edges()))
def test_repr(self):
dp = Dependencies([('1', None), ('2', '3'), ('2', '4')])
dp = dependencies.Dependencies([('1', None), ('2', '3'), ('2', '4')])
s = "Dependencies([('1', None), ('2', '3'), ('2', '4')])"
self.assertEqual(s, repr(dp))
def test_single_node(self):
d = Dependencies([('only', None)])
d = dependencies.Dependencies([('only', None)])
l = list(iter(d))
self.assertEqual(1, len(l))
self.assertEqual('only', l[0])
def test_disjoint(self):
d = Dependencies([('1', None), ('2', None)])
d = dependencies.Dependencies([('1', None), ('2', None)])
l = list(iter(d))
self.assertEqual(2, len(l))
self.assertIn('1', l)
@ -123,51 +122,60 @@ class dependenciesTest(testtools.TestCase):
('b1', 'first'), ('b2', 'first'))
def test_circular_fwd(self):
d = Dependencies([('first', 'second'),
('second', 'third'),
('third', 'first')])
self.assertRaises(CircularDependencyException, list, iter(d))
d = dependencies.Dependencies([('first', 'second'),
('second', 'third'),
('third', 'first')])
self.assertRaises(dependencies.CircularDependencyException,
list,
iter(d))
def test_circular_rev(self):
d = Dependencies([('first', 'second'),
('second', 'third'),
('third', 'first')])
self.assertRaises(CircularDependencyException, list, reversed(d))
d = dependencies.Dependencies([('first', 'second'),
('second', 'third'),
('third', 'first')])
self.assertRaises(dependencies.CircularDependencyException,
list,
reversed(d))
def test_self_ref(self):
d = Dependencies([('node', 'node')])
self.assertRaises(CircularDependencyException, list, iter(d))
d = dependencies.Dependencies([('node', 'node')])
self.assertRaises(dependencies.CircularDependencyException,
list,
iter(d))
def test_complex_circular_fwd(self):
d = Dependencies([('last', 'e1'), ('last', 'mid1'), ('last', 'mid2'),
('mid1', 'e2'), ('mid1', 'mid3'),
('mid2', 'mid3'),
('mid3', 'e3'),
('e3', 'mid1')])
self.assertRaises(CircularDependencyException, list, iter(d))
d = dependencies.Dependencies([('last', 'e1'), ('last', 'mid1'),
('last', 'mid2'), ('mid1', 'e2'),
('mid1', 'mid3'), ('mid2', 'mid3'),
('mid3', 'e3'), ('e3', 'mid1')])
self.assertRaises(dependencies.CircularDependencyException,
list,
iter(d))
def test_complex_circular_rev(self):
d = Dependencies([('last', 'e1'), ('last', 'mid1'), ('last', 'mid2'),
('mid1', 'e2'), ('mid1', 'mid3'),
('mid2', 'mid3'),
('mid3', 'e3'),
('e3', 'mid1')])
self.assertRaises(CircularDependencyException, list, reversed(d))
d = dependencies.Dependencies([('last', 'e1'), ('last', 'mid1'),
('last', 'mid2'), ('mid1', 'e2'),
('mid1', 'mid3'), ('mid2', 'mid3'),
('mid3', 'e3'), ('e3', 'mid1')])
self.assertRaises(dependencies.CircularDependencyException,
list,
reversed(d))
def test_noexist_partial(self):
d = Dependencies([('foo', 'bar')])
d = dependencies.Dependencies([('foo', 'bar')])
get = lambda i: d[i]
self.assertRaises(KeyError, get, 'baz')
def test_single_partial(self):
d = Dependencies([('last', 'first')])
d = dependencies.Dependencies([('last', 'first')])
p = d['last']
l = list(iter(p))
self.assertEqual(1, len(l))
self.assertEqual('last', l[0])
def test_simple_partial(self):
d = Dependencies([('last', 'middle'), ('middle', 'first')])
d = dependencies.Dependencies([('last', 'middle'),
('middle', 'first')])
p = d['middle']
order = list(iter(p))
self.assertEqual(2, len(order))
@ -177,9 +185,9 @@ class dependenciesTest(testtools.TestCase):
self.assertTrue(order.index('last') > order.index('middle'))
def test_simple_multilevel_partial(self):
d = Dependencies([('last', 'middle'),
('middle', 'target'),
('target', 'first')])
d = dependencies.Dependencies([('last', 'middle'),
('middle', 'target'),
('target', 'first')])
p = d['target']
order = list(iter(p))
self.assertEqual(3, len(order))
@ -188,10 +196,10 @@ class dependenciesTest(testtools.TestCase):
"'%s' not found in dependency order" % n)
def test_complex_partial(self):
d = Dependencies([('last', 'e1'), ('last', 'mid1'), ('last', 'mid2'),
('mid1', 'e2'), ('mid1', 'mid3'),
('mid2', 'mid3'),
('mid3', 'e3')])
d = dependencies.Dependencies([('last', 'e1'), ('last', 'mid1'),
('last', 'mid2'), ('mid1', 'e2'),
('mid1', 'mid3'), ('mid2', 'mid3'),
('mid3', 'e3')])
p = d['mid3']
order = list(iter(p))
self.assertEqual(4, len(order))
@ -200,10 +208,10 @@ class dependenciesTest(testtools.TestCase):
"'%s' not found in dependency order" % n)
def test_required_by(self):
d = Dependencies([('last', 'e1'), ('last', 'mid1'), ('last', 'mid2'),
('mid1', 'e2'), ('mid1', 'mid3'),
('mid2', 'mid3'),
('mid3', 'e3')])
d = dependencies.Dependencies([('last', 'e1'), ('last', 'mid1'),
('last', 'mid2'), ('mid1', 'e2'),
('mid1', 'mid3'), ('mid2', 'mid3'),
('mid3', 'e3')])
self.assertEqual(0, len(list(d.required_by('last'))))

View File

@ -11,17 +11,17 @@
# License for the specific language governing permissions and limitations
# under the License.
from datetime import datetime
import datetime as dt
import uuid
import json
import mock
import six
from heat.common.identifier import EventIdentifier
from heat.common import identifier
from heat.common import template_format
from heat.engine import api
from heat.engine.event import Event
from heat.engine import event
from heat.engine import parameters
from heat.engine import parser
from heat.engine import resource
@ -30,6 +30,8 @@ from heat.tests import common
from heat.tests import generic_resource as generic_rsrc
from heat.tests import utils
datetime = dt.datetime
class FormatTest(common.HeatTestCase):
def setUp(self):
@ -53,11 +55,12 @@ class FormatTest(common.HeatTestCase):
def _dummy_event(self, event_id):
resource = self.stack['generic1']
return Event(utils.dummy_context(), self.stack, 'CREATE', 'COMPLETE',
'state changed', 'z3455xyc-9f88-404d-a85b-5315293e67de',
resource.properties, resource.name, resource.type(),
uuid='abc123yc-9f88-404d-a85b-531529456xyz',
id=event_id)
return event.Event(utils.dummy_context(), self.stack, 'CREATE',
'COMPLETE', 'state changed',
'z3455xyc-9f88-404d-a85b-5315293e67de',
resource.properties, resource.name, resource.type(),
uuid='abc123yc-9f88-404d-a85b-531529456xyz',
id=event_id)
def test_format_stack_resource(self):
res = self.stack['generic1']
@ -194,10 +197,11 @@ class FormatTest(common.HeatTestCase):
self.assertEqual(event_keys, set(formatted.keys()))
event_id_formatted = formatted[rpc_api.EVENT_ID]
event_identifier = EventIdentifier(event_id_formatted['tenant'],
event_id_formatted['stack_name'],
event_id_formatted['stack_id'],
event_id_formatted['path'])
event_identifier = identifier.EventIdentifier(
event_id_formatted['tenant'],
event_id_formatted['stack_name'],
event_id_formatted['stack_id'],
event_id_formatted['path'])
self.assertEqual(event_id, event_identifier.event_id)
@mock.patch.object(api, 'format_stack_resource')

View File

@ -15,7 +15,7 @@ import copy
import json
import mox
from testtools.matchers import MatchesRegex
from testtools import matchers
from heat.common import exception
from heat.common import template_format
@ -393,7 +393,8 @@ class InstanceGroupTest(common.HeatTestCase):
# test that physical resource name of launch configuration is used
conf = stack['JobServerConfig']
conf_name_pattern = '%s-JobServerConfig-[a-zA-Z0-9]+$' % stack.name
self.assertThat(conf.FnGetRefId(), MatchesRegex(conf_name_pattern))
self.assertThat(conf.FnGetRefId(),
matchers.MatchesRegex(conf_name_pattern))
# get launch conf name here to compare result after update
conf_name = self.get_launch_conf_name(stack, 'JobServerGroup')
@ -628,7 +629,8 @@ class InstanceGroupTest(common.HeatTestCase):
# test that physical resource name of launch configuration is used
conf = stack['JobServerConfig']
conf_name_pattern = '%s-JobServerConfig-[a-zA-Z0-9]+$' % stack.name
self.assertThat(conf.FnGetRefId(), MatchesRegex(conf_name_pattern))
self.assertThat(conf.FnGetRefId(),
matchers.MatchesRegex(conf_name_pattern))
# test the number of instances created
nested = stack['JobServerGroup'].nested()

View File

@ -27,7 +27,7 @@ from heat.engine.clients.os import neutron
from heat.engine import properties
from heat.engine import resource
from heat.engine.resources.neutron import net
from heat.engine.resources.neutron.neutron import NeutronResource as qr
from heat.engine.resources.neutron import neutron as nr
from heat.engine.resources.neutron import provider_net
from heat.engine.resources.neutron import router
from heat.engine.resources.neutron import subnet
@ -507,68 +507,68 @@ class NeutronTest(common.HeatTestCase):
data = {'admin_state_up': False,
'value_specs': vs}
p = properties.Properties(net.Net.properties_schema, data)
self.assertIsNone(qr.validate_properties(p))
self.assertIsNone(nr.NeutronResource.validate_properties(p))
vs['shared'] = True
self.assertEqual('shared not allowed in value_specs',
qr.validate_properties(p))
nr.NeutronResource.validate_properties(p))
vs.pop('shared')
vs['name'] = 'foo'
self.assertEqual('name not allowed in value_specs',
qr.validate_properties(p))
nr.NeutronResource.validate_properties(p))
vs.pop('name')
vs['tenant_id'] = '1234'
self.assertEqual('tenant_id not allowed in value_specs',
qr.validate_properties(p))
nr.NeutronResource.validate_properties(p))
vs.pop('tenant_id')
vs['foo'] = '1234'
self.assertIsNone(qr.validate_properties(p))
self.assertIsNone(nr.NeutronResource.validate_properties(p))
def test_validate_depr_properties_required(self):
data = {'network_id': '1234',
'network': 'abc'}
p = properties.Properties(subnet.Subnet.properties_schema, data)
self.assertRaises(exception.ResourcePropertyConflict,
qr._validate_depr_property_required,
nr.NeutronResource._validate_depr_property_required,
p, 'network', 'network_id')
data = {}
p = properties.Properties(subnet.Subnet.properties_schema, data)
self.assertRaises(exception.StackValidationFailed,
qr._validate_depr_property_required,
nr.NeutronResource._validate_depr_property_required,
p, 'network', 'network_id')
def test_prepare_properties(self):
data = {'admin_state_up': False,
'value_specs': {'router:external': True}}
p = properties.Properties(net.Net.properties_schema, data)
props = qr.prepare_properties(p, 'resource_name')
props = nr.NeutronResource.prepare_properties(p, 'resource_name')
self.assertEqual({'name': 'resource_name',
'router:external': True,
'admin_state_up': False,
'shared': False}, props)
def test_is_built(self):
self.assertTrue(qr.is_built({'status': 'ACTIVE'}))
self.assertTrue(qr.is_built({'status': 'DOWN'}))
self.assertFalse(qr.is_built({'status': 'BUILD'}))
e = self.assertRaises(resource.ResourceInError, qr.is_built, {
'status': 'ERROR'
})
self.assertTrue(nr.NeutronResource.is_built({'status': 'ACTIVE'}))
self.assertTrue(nr.NeutronResource.is_built({'status': 'DOWN'}))
self.assertFalse(nr.NeutronResource.is_built({'status': 'BUILD'}))
e = self.assertRaises(
resource.ResourceInError,
nr.NeutronResource.is_built, {'status': 'ERROR'})
self.assertEqual(
'Went to status ERROR due to "Unknown"',
six.text_type(e))
e = self.assertRaises(resource.ResourceUnknownStatus, qr.is_built, {
'status': 'FROBULATING'
})
e = self.assertRaises(
resource.ResourceUnknownStatus,
nr.NeutronResource.is_built, {'status': 'FROBULATING'})
self.assertEqual(
'Resource is not built - Unknown status FROBULATING',
six.text_type(e))
def test_resolve_attribute(self):
class SomeNeutronResource(qr):
class SomeNeutronResource(nr.NeutronResource):
properties_schema = {}
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
@ -593,7 +593,9 @@ class NeutronTest(common.HeatTestCase):
security_groups = ['b62c3079-6946-44f5-a67b-6b9091884d4f',
'9887157c-d092-40f5-b547-6361915fce7d']
self.assertEqual(security_groups,
qr.get_secgroup_uuids(security_groups, None, None))
nr.NeutronResource.get_secgroup_uuids(security_groups,
None,
None))
# test get_secgroup_uuids with name
secgroups = ['security_group_1']
expected_groups = ['0389f747-7785-4757-b7bb-2ab07e4b09c3']
@ -616,8 +618,9 @@ class NeutronTest(common.HeatTestCase):
fake_groups_list)
self.m.ReplayAll()
self.assertEqual(expected_groups,
qr.get_secgroup_uuids(secgroups, nclient,
ctx.tenant_id))
nr.NeutronResource.get_secgroup_uuids(secgroups,
nclient,
ctx.tenant_id))
self.m.VerifyAll()
self.m.UnsetStubs()
# test there are two securityGroups with same name, but there is
@ -645,8 +648,9 @@ class NeutronTest(common.HeatTestCase):
fake_groups_list)
self.m.ReplayAll()
self.assertEqual(expected_groups,
qr.get_secgroup_uuids(secgroups, nclient,
ctx.tenant_id))
nr.NeutronResource.get_secgroup_uuids(secgroups,
nclient,
ctx.tenant_id))
self.m.VerifyAll()
self.m.UnsetStubs()
# test there are two securityGroups with same name, and the two
@ -673,7 +677,7 @@ class NeutronTest(common.HeatTestCase):
neutronclient.Client.list_security_groups().AndReturn(fake_groups_list)
self.m.ReplayAll()
self.assertRaises(exception.PhysicalResourceNameAmbiguity,
qr.get_secgroup_uuids,
nr.NeutronResource.get_secgroup_uuids,
secgroups, nclient, ctx.tenant_id)
self.m.VerifyAll()
self.m.UnsetStubs()

View File

@ -15,7 +15,6 @@
# limitations under the License.
import mox
from mox import IgnoreArg
from neutronclient.common import exceptions as qe
from neutronclient.neutron import v2_0 as neutronV20
from neutronclient.v2_0 import client as neutronclient
@ -441,7 +440,8 @@ class NeutronNetworkGatewayTest(common.HeatTestCase):
'segmentation_id': 10}]
})
prop_diff = {'name': u'NetworkGatewayUpdate'}
self.assertIsNone(rsrc.handle_update(snippet_for_update, IgnoreArg(),
self.assertIsNone(rsrc.handle_update(snippet_for_update,
mox.IgnoreArg(),
prop_diff))
# update connections
@ -464,11 +464,13 @@ class NeutronNetworkGatewayTest(common.HeatTestCase):
'segmentation_type': u'flat',
'segmentation_id': 0}]
}
self.assertIsNone(rsrc.handle_update(snippet_for_update, IgnoreArg(),
self.assertIsNone(rsrc.handle_update(snippet_for_update,
mox.IgnoreArg(),
prop_diff))
# update connections once more
self.assertIsNone(rsrc.handle_update(snippet_for_update, IgnoreArg(),
self.assertIsNone(rsrc.handle_update(snippet_for_update,
mox.IgnoreArg(),
prop_diff))
# update devices
@ -490,7 +492,8 @@ class NeutronNetworkGatewayTest(common.HeatTestCase):
'id': u'e52148ca-7db9-4ec3-abe6-2c7c0ff316eb',
'interface_name': u'breth2'}]
}
self.assertIsNone(rsrc.handle_update(snippet_for_update, IgnoreArg(),
self.assertIsNone(rsrc.handle_update(snippet_for_update,
mox.IgnoreArg(),
prop_diff))
self.m.VerifyAll()

View File

@ -11,7 +11,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutronclient.common.exceptions import NeutronClientException
from neutronclient.common import exceptions as neutron_exc
from neutronclient.v2_0 import client as neutronclient
from novaclient.v1_1 import security_group_rules as nova_sgr
from novaclient.v1_1 import security_groups as nova_sg
@ -569,7 +569,7 @@ Resources:
'security_group_id': 'aaaa'
}
}).AndRaise(
NeutronClientException(status_code=409))
neutron_exc.NeutronClientException(status_code=409))
neutronclient.Client.create_security_group_rule({
'security_group_rule': {
'direction': 'ingress',
@ -582,7 +582,7 @@ Resources:
'security_group_id': 'aaaa'
}
}).AndRaise(
NeutronClientException(status_code=409))
neutron_exc.NeutronClientException(status_code=409))
neutronclient.Client.create_security_group_rule({
'security_group_rule': {
'direction': 'ingress',
@ -595,7 +595,7 @@ Resources:
'security_group_id': 'aaaa'
}
}).AndRaise(
NeutronClientException(status_code=409))
neutron_exc.NeutronClientException(status_code=409))
neutronclient.Client.show_security_group('aaaa').AndReturn({
'security_group': {
'tenant_id': 'f18ca530cc05425e8bac0a5ff92f7e88',
@ -617,7 +617,7 @@ Resources:
'security_group_id': 'aaaa'
}
}).AndRaise(
NeutronClientException(status_code=409))
neutron_exc.NeutronClientException(status_code=409))
neutronclient.Client.create_security_group_rule({
'security_group_rule': {
'direction': 'egress',
@ -630,7 +630,7 @@ Resources:
'security_group_id': 'aaaa'
}
}).AndRaise(
NeutronClientException(status_code=409))
neutron_exc.NeutronClientException(status_code=409))
neutronclient.Client.create_security_group_rule({
'security_group_rule': {
'direction': 'egress',
@ -643,7 +643,7 @@ Resources:
'security_group_id': 'aaaa'
}
}).AndRaise(
NeutronClientException(status_code=409))
neutron_exc.NeutronClientException(status_code=409))
# delete script
neutronclient.Client.show_security_group('aaaa').AndReturn({
@ -720,22 +720,22 @@ Resources:
}],
'id': 'aaaa'}})
neutronclient.Client.delete_security_group_rule('bbbb').AndRaise(
NeutronClientException(status_code=404))
neutron_exc.NeutronClientException(status_code=404))
neutronclient.Client.delete_security_group_rule('cccc').AndRaise(
NeutronClientException(status_code=404))
neutron_exc.NeutronClientException(status_code=404))
neutronclient.Client.delete_security_group_rule('dddd').AndRaise(
NeutronClientException(status_code=404))
neutron_exc.NeutronClientException(status_code=404))
neutronclient.Client.delete_security_group_rule('eeee').AndRaise(
NeutronClientException(status_code=404))
neutron_exc.NeutronClientException(status_code=404))
neutronclient.Client.delete_security_group_rule('ffff').AndRaise(
NeutronClientException(status_code=404))
neutron_exc.NeutronClientException(status_code=404))
neutronclient.Client.delete_security_group_rule('gggg').AndRaise(
NeutronClientException(status_code=404))
neutron_exc.NeutronClientException(status_code=404))
neutronclient.Client.delete_security_group('aaaa').AndRaise(
NeutronClientException(status_code=404))
neutron_exc.NeutronClientException(status_code=404))
neutronclient.Client.show_security_group('aaaa').AndRaise(
NeutronClientException(status_code=404))
neutron_exc.NeutronClientException(status_code=404))
self.m.ReplayAll()
stack = self.create_stack(self.test_template)

View File

@ -32,7 +32,7 @@ from heat.engine import stack_resource
from heat.tests import common
from heat.tests import generic_resource
# reuse the same template than autoscaling tests
from heat.tests.test_autoscaling import as_template
from heat.tests import test_autoscaling as test_as
from heat.tests import utils
@ -150,7 +150,7 @@ class ScaleNotificationTest(common.HeatTestCase):
env = environment.Environment()
env.load({u'parameters':
{u'KeyName': 'foo', 'ImageId': 'cloudimage'}})
t = template_format.parse(as_template)
t = template_format.parse(test_as.as_template)
template = parser.Template(t)
self.stack_name = utils.random_name()
stack = parser.Stack(self.ctx, self.stack_name, template,

View File

@ -18,8 +18,7 @@ import six
from heat.common import exception as heat_ex
from heat.common import template_format
from heat.engine.clients.os import nova
from heat.engine.resources.nova_floatingip import NovaFloatingIp
from heat.engine.resources.nova_floatingip import NovaFloatingIpAssociation
from heat.engine.resources import nova_floatingip
from heat.engine import rsrc_defn
from heat.engine import scheduler
from heat.tests import common
@ -92,7 +91,9 @@ class NovaFloatingIPTest(common.HeatTestCase):
stack = utils.parse_stack(template)
floating_ip = stack.t.resource_definitions(stack)['MyFloatingIP']
return NovaFloatingIp('MyFloatingIP', floating_ip, stack)
return nova_floatingip.NovaFloatingIp('MyFloatingIP',
floating_ip,
stack)
def prepare_floating_ip_assoc(self):
nova.NovaClientPlugin._create().AndReturn(
@ -111,8 +112,8 @@ class NovaFloatingIPTest(common.HeatTestCase):
resource_defns = stack.t.resource_definitions(stack)
floating_ip_assoc = resource_defns['MyFloatingIPAssociation']
return NovaFloatingIpAssociation('MyFloatingIPAssociation',
floating_ip_assoc, stack)
return nova_floatingip.NovaFloatingIpAssociation(
'MyFloatingIPAssociation', floating_ip_assoc, stack)
def test_floating_ip_create(self):
rsrc = self.prepare_floating_ip()

View File

@ -18,7 +18,7 @@ import time
from keystoneclient import exceptions as kc_exceptions
import mock
from mox import IgnoreArg
import mox
from oslo.config import cfg
import six
import warnings
@ -29,7 +29,6 @@ from heat.common import heat_keystoneclient as hkc
from heat.common import template_format
from heat.common import urlfetch
import heat.db.api as db_api
import heat.engine.cfn.functions
from heat.engine.cfn import functions as cfn_funcs
from heat.engine.cfn import template as cfn_t
from heat.engine.clients.os import keystone
@ -44,10 +43,10 @@ from heat.engine import rsrc_defn
from heat.engine import scheduler
from heat.engine import template
from heat.tests import common
from heat.tests.fakes import FakeKeystoneClient
from heat.tests import fakes
from heat.tests import generic_resource as generic_rsrc
from heat.tests import utils
from heat.tests.v1_1 import fakes
from heat.tests.v1_1 import fakes as fakes_v1_1
def join(raw):
@ -381,7 +380,7 @@ Mappings:
p_snippet = {"Ref": "baz"}
parsed = tmpl.parse(stack, p_snippet)
self.assertTrue(isinstance(parsed, heat.engine.cfn.functions.ParamRef))
self.assertTrue(isinstance(parsed, cfn_funcs.ParamRef))
def test_select_from_list(self):
tmpl = parser.Template(empty_template)
@ -471,7 +470,7 @@ Mappings:
stack = parser.Stack(self.ctx, 'test_stack',
parser.Template(empty_template))
self.m.StubOutWithMock(nova.NovaClientPlugin, '_create')
fc = fakes.FakeClient()
fc = fakes_v1_1.FakeClient()
nova.NovaClientPlugin._create().AndReturn(fc)
self.m.ReplayAll()
self.assertEqual(["nova1"], self.resolve(snippet, tmpl, stack))
@ -1140,12 +1139,12 @@ class StackTest(common.HeatTestCase):
stack.timeout, True, stack.disable_rollback,
'parent', owner_id=None,
stack_user_project_id=None,
created_time=IgnoreArg(),
created_time=mox.IgnoreArg(),
updated_time=None,
user_creds_id=stack.user_creds_id,
tenant_id='test_tenant_id',
use_stored_context=False,
username=IgnoreArg())
username=mox.IgnoreArg())
self.m.ReplayAll()
parser.Stack.load(self.ctx, stack_id=self.stack.id,
@ -1452,7 +1451,7 @@ class StackTest(common.HeatTestCase):
trustor_ctx = utils.dummy_context(user_id='thetrustor')
self.m.StubOutWithMock(hkc, 'KeystoneClient')
hkc.KeystoneClient(trustor_ctx).AndReturn(
FakeKeystoneClient(user_id='thetrustor'))
fakes.FakeKeystoneClient(user_id='thetrustor'))
self.m.ReplayAll()
self.stack = parser.Stack(
@ -1485,11 +1484,11 @@ class StackTest(common.HeatTestCase):
self.m.StubOutWithMock(hkc, 'KeystoneClient')
hkc.KeystoneClient(trustor_ctx).AndReturn(
FakeKeystoneClient(user_id='thetrustor'))
fakes.FakeKeystoneClient(user_id='thetrustor'))
self.m.StubOutWithMock(parser.Stack, 'stored_context')
parser.Stack.stored_context().AndReturn(stored_ctx)
hkc.KeystoneClient(stored_ctx).AndReturn(
FakeKeystoneClient(user_id='nottrustor'))
fakes.FakeKeystoneClient(user_id='nottrustor'))
self.m.ReplayAll()
self.stack = parser.Stack(
@ -1515,7 +1514,7 @@ class StackTest(common.HeatTestCase):
def test_delete_trust_backup(self):
cfg.CONF.set_override('deferred_auth_method', 'trusts')
class FakeKeystoneClientFail(FakeKeystoneClient):
class FakeKeystoneClientFail(fakes.FakeKeystoneClient):
def delete_trust(self, trust_id):
raise Exception("Shouldn't delete")
@ -1541,7 +1540,7 @@ class StackTest(common.HeatTestCase):
def test_delete_trust_nested(self):
cfg.CONF.set_override('deferred_auth_method', 'trusts')
class FakeKeystoneClientFail(FakeKeystoneClient):
class FakeKeystoneClientFail(fakes.FakeKeystoneClient):
def delete_trust(self, trust_id):
raise Exception("Shouldn't delete")
@ -1571,7 +1570,7 @@ class StackTest(common.HeatTestCase):
def test_delete_trust_fail(self):
cfg.CONF.set_override('deferred_auth_method', 'trusts')
class FakeKeystoneClientFail(FakeKeystoneClient):
class FakeKeystoneClientFail(fakes.FakeKeystoneClient):
def delete_trust(self, trust_id):
raise kc_exceptions.Forbidden("Denied!")
@ -1596,7 +1595,7 @@ class StackTest(common.HeatTestCase):
self.assertIn('Error deleting trust', self.stack.status_reason)
def test_delete_deletes_project(self):
fkc = FakeKeystoneClient()
fkc = fakes.FakeKeystoneClient()
fkc.delete_stack_domain_project = mock.Mock()
self.m.StubOutWithMock(keystone.KeystoneClientPlugin, '_create')
@ -1622,7 +1621,7 @@ class StackTest(common.HeatTestCase):
project_id='aproject456')
def test_abandon_nodelete_project(self):
fkc = FakeKeystoneClient()
fkc = fakes.FakeKeystoneClient()
fkc.delete_stack_domain_project = mock.Mock()
self.m.StubOutWithMock(keystone.KeystoneClientPlugin, '_create')
@ -2205,10 +2204,12 @@ class StackTest(common.HeatTestCase):
self.m.StubOutWithMock(generic_rsrc.ResWithComplexPropsAndAttrs,
'handle_update')
generic_rsrc.ResWithComplexPropsAndAttrs.handle_update(
IgnoreArg(), IgnoreArg(), prop_diff1)
generic_rsrc.ResWithComplexPropsAndAttrs.handle_update(
IgnoreArg(), IgnoreArg(), prop_diff2)
generic_rsrc.ResWithComplexPropsAndAttrs.handle_update(mox.IgnoreArg(),
mox.IgnoreArg(),
prop_diff1)
generic_rsrc.ResWithComplexPropsAndAttrs.handle_update(mox.IgnoreArg(),
mox.IgnoreArg(),
prop_diff2)
self.m.ReplayAll()
@ -3850,7 +3851,7 @@ class StackTest(common.HeatTestCase):
self.m.StubOutWithMock(keystone.KeystoneClientPlugin, '_create')
keystone.KeystoneClientPlugin._create().AndReturn(
FakeKeystoneClient(user_id='auser123'))
fakes.FakeKeystoneClient(user_id='auser123'))
self.m.ReplayAll()
self.stack = parser.Stack(
@ -4039,7 +4040,7 @@ class StackTest(common.HeatTestCase):
def test_stack_user_project_id_delete_fail(self):
class FakeKeystoneClientFail(FakeKeystoneClient):
class FakeKeystoneClientFail(fakes.FakeKeystoneClient):
def delete_stack_domain_project(self, project_id):
raise kc_exceptions.Forbidden("Denied!")

View File

@ -14,13 +14,12 @@
import re
import six
from testtools.matchers import HasLength
from testtools.matchers import MatchesRegex
from testtools import matchers
from heat.common import exception
from heat.common import template_format
from heat.engine import parser
from heat.engine.resources.random_string import RandomString
from heat.engine.resources import random_string as rs
from heat.engine import template
from heat.tests import common
from heat.tests import utils
@ -251,9 +250,10 @@ class TestGenerateRandomString(common.HeatTestCase):
# run each test multiple times to confirm random generator
# doesn't generate a matching pattern by chance
for i in range(1, 32):
sequence = RandomString._sequences[self.seq]
r = RandomString._deprecated_random_string(sequence, self.length)
sequence = rs.RandomString._sequences[self.seq]
r = rs.RandomString._deprecated_random_string(sequence,
self.length)
self.assertThat(r, HasLength(self.length))
self.assertThat(r, matchers.HasLength(self.length))
regex = '%s{%s}' % (self.pattern, self.length)
self.assertThat(r, MatchesRegex(regex))
self.assertThat(r, matchers.MatchesRegex(regex))

View File

@ -15,7 +15,7 @@ import collections
import copy
from keystoneclient import exceptions as keystone_exc
from neutronclient.common.exceptions import NeutronClientException
from neutronclient.common import exceptions as neutron_exc
from neutronclient.v2_0 import client as neutronclient
from novaclient.v1_1 import security_group_rules as nova_sgr
from novaclient.v1_1 import security_groups as nova_sg
@ -747,7 +747,7 @@ Resources:
'security_group_id': 'aaaa'
}
}).AndRaise(
NeutronClientException(status_code=409))
neutron_exc.NeutronClientException(status_code=409))
neutronclient.Client.create_security_group_rule({
'security_group_rule': {
'direction': 'ingress',
@ -760,7 +760,7 @@ Resources:
'security_group_id': 'aaaa'
}
}).AndRaise(
NeutronClientException(status_code=409))
neutron_exc.NeutronClientException(status_code=409))
neutronclient.Client.create_security_group_rule({
'security_group_rule': {
'direction': 'ingress',
@ -773,7 +773,7 @@ Resources:
'security_group_id': 'aaaa'
}
}).AndRaise(
NeutronClientException(status_code=409))
neutron_exc.NeutronClientException(status_code=409))
neutronclient.Client.create_security_group_rule({
'security_group_rule': {
'direction': 'egress',
@ -786,7 +786,7 @@ Resources:
'security_group_id': 'aaaa'
}
}).AndRaise(
NeutronClientException(status_code=409))
neutron_exc.NeutronClientException(status_code=409))
neutronclient.Client.create_security_group_rule({
'security_group_rule': {
'direction': 'egress',
@ -799,7 +799,7 @@ Resources:
'security_group_id': 'aaaa'
}
}).AndRaise(
NeutronClientException(status_code=409))
neutron_exc.NeutronClientException(status_code=409))
# delete script
neutronclient.Client.show_security_group('aaaa').AndReturn({
@ -865,20 +865,20 @@ Resources:
}],
'id': 'aaaa'}})
neutronclient.Client.delete_security_group_rule('bbbb').AndRaise(
NeutronClientException(status_code=404))
neutron_exc.NeutronClientException(status_code=404))
neutronclient.Client.delete_security_group_rule('cccc').AndRaise(
NeutronClientException(status_code=404))
neutron_exc.NeutronClientException(status_code=404))
neutronclient.Client.delete_security_group_rule('dddd').AndRaise(
NeutronClientException(status_code=404))
neutron_exc.NeutronClientException(status_code=404))
neutronclient.Client.delete_security_group_rule('eeee').AndRaise(
NeutronClientException(status_code=404))
neutron_exc.NeutronClientException(status_code=404))
neutronclient.Client.delete_security_group_rule('ffff').AndRaise(
NeutronClientException(status_code=404))
neutron_exc.NeutronClientException(status_code=404))
neutronclient.Client.delete_security_group('aaaa').AndRaise(
NeutronClientException(status_code=404))
neutron_exc.NeutronClientException(status_code=404))
neutronclient.Client.show_security_group('aaaa').AndRaise(
NeutronClientException(status_code=404))
neutron_exc.NeutronClientException(status_code=404))
self.m.ReplayAll()
stack = self.create_stack(self.test_template_neutron)

View File

@ -12,8 +12,7 @@
# under the License.
import datetime
from json import dumps
from json import loads
import json
import six
import uuid
@ -29,7 +28,7 @@ from heat.engine.clients.os import glance
from heat.engine.clients.os import nova
from heat.engine import environment
from heat.engine import parser
from heat.engine.resource import Resource
from heat.engine import resource as rsrc
from heat.engine.resources import instance as instances
from heat.engine import scheduler
from heat.tests import common
@ -66,7 +65,7 @@ UUIDs = (UUID1, UUID2, UUID3) = sorted([str(uuid.uuid4())
for x in range(3)])
class MyResource(Resource):
class MyResource(rsrc.Resource):
properties_schema = {
'ServerName': {'Type': 'String', 'Required': True},
'Flavor': {'Type': 'String', 'Required': True},
@ -311,12 +310,13 @@ class SqlAlchemyTest(common.HeatTestCase):
self._mock_create(self.m)
self.m.ReplayAll()
stack.create()
rsrc = stack['WebServer']
rsrc.data_set('test', 'test_data')
self.assertEqual('test_data', db_api.resource_data_get(rsrc, 'test'))
db_api.resource_data_delete(rsrc, 'test')
resource = stack['WebServer']
resource.data_set('test', 'test_data')
self.assertEqual('test_data', db_api.resource_data_get(resource,
'test'))
db_api.resource_data_delete(resource, 'test')
self.assertRaises(exception.NotFound,
db_api.resource_data_get, rsrc, 'test')
db_api.resource_data_get, resource, 'test')
def test_stack_get_by_name(self):
stack = self._setup_test_stack('stack', UUID1,
@ -1119,7 +1119,7 @@ def create_resource(ctx, stack, **kwargs):
'action': 'create',
'status': 'complete',
'status_reason': 'create_complete',
'rsrc_metadata': loads('{"foo": "123"}'),
'rsrc_metadata': json.loads('{"foo": "123"}'),
'stack_id': stack.id
}
values.update(kwargs)
@ -1153,7 +1153,7 @@ def create_event(ctx, **kwargs):
def create_watch_rule(ctx, stack, **kwargs):
values = {
'name': 'test_rule',
'rule': loads('{"foo": "123"}'),
'rule': json.loads('{"foo": "123"}'),
'state': 'normal',
'last_evaluated': timeutils.utcnow(),
'stack_id': stack.id,
@ -1164,7 +1164,7 @@ def create_watch_rule(ctx, stack, **kwargs):
def create_watch_data(ctx, watch_rule, **kwargs):
values = {
'data': loads('{"foo": "bar"}'),
'data': json.loads('{"foo": "bar"}'),
'watch_rule_id': watch_rule.id
}
values.update(kwargs)
@ -1551,7 +1551,7 @@ class DBAPIResourceTest(common.HeatTestCase):
self.assertEqual('create', ret_res.action)
self.assertEqual('complete', ret_res.status)
self.assertEqual('create_complete', ret_res.status_reason)
self.assertEqual('{"foo": "123"}', dumps(ret_res.rsrc_metadata))
self.assertEqual('{"foo": "123"}', json.dumps(ret_res.rsrc_metadata))
self.assertEqual(self.stack.id, ret_res.stack_id)
def test_resource_get(self):
@ -1869,7 +1869,7 @@ class DBAPIWatchRuleTest(common.HeatTestCase):
ret_wr = db_api.watch_rule_get(self.ctx, watch_rule.id)
self.assertIsNotNone(ret_wr)
self.assertEqual('test_rule', ret_wr.name)
self.assertEqual('{"foo": "123"}', dumps(ret_wr.rule))
self.assertEqual('{"foo": "123"}', json.dumps(ret_wr.rule))
self.assertEqual('normal', ret_wr.state)
self.assertEqual(self.stack.id, ret_wr.stack_id)
@ -1912,13 +1912,13 @@ class DBAPIWatchRuleTest(common.HeatTestCase):
watch_rule = create_watch_rule(self.ctx, self.stack)
values = {
'name': 'test_rule_1',
'rule': loads('{"foo": "bar"}'),
'rule': json.loads('{"foo": "bar"}'),
'state': 'nodata',
}
db_api.watch_rule_update(self.ctx, watch_rule.id, values)
watch_rule = db_api.watch_rule_get(self.ctx, watch_rule.id)
self.assertEqual('test_rule_1', watch_rule.name)
self.assertEqual('{"foo": "bar"}', dumps(watch_rule.rule))
self.assertEqual('{"foo": "bar"}', json.dumps(watch_rule.rule))
self.assertEqual('nodata', watch_rule.state)
self.assertRaises(exception.NotFound, db_api.watch_rule_update,
@ -1950,14 +1950,14 @@ class DBAPIWatchDataTest(common.HeatTestCase):
ret_data = db_api.watch_data_get_all(self.ctx)
self.assertEqual(1, len(ret_data))
self.assertEqual('{"foo": "bar"}', dumps(ret_data[0].data))
self.assertEqual('{"foo": "bar"}', json.dumps(ret_data[0].data))
self.assertEqual(self.watch_rule.id, ret_data[0].watch_rule_id)
def test_watch_data_get_all(self):
values = [
{'data': loads('{"foo": "d1"}')},
{'data': loads('{"foo": "d2"}')},
{'data': loads('{"foo": "d3"}')}
{'data': json.loads('{"foo": "d1"}')},
{'data': json.loads('{"foo": "d2"}')},
{'data': json.loads('{"foo": "d3"}')}
]
[create_watch_data(self.ctx, self.watch_rule, **val) for val in values]
watch_data = db_api.watch_data_get_all(self.ctx)

View File

@ -12,7 +12,7 @@
# under the License.
import mock
from testtools.matchers import MatchesRegex
from testtools import matchers
from heat.engine.clients.os import swift
from heat.tests import common
@ -76,7 +76,7 @@ class SwiftUtilsTests(SwiftClientPluginTestCase):
"/%s\?temp_url_sig=[0-9a-f]{40}&"
"temp_url_expires=[0-9]{10}" %
(container_name, obj_name))
self.assertThat(url, MatchesRegex(regexp))
self.assertThat(url, matchers.MatchesRegex(regexp))
timeout = int(url.split('=')[-1])
self.assertTrue(timeout < swift.MAX_EPOCH)
@ -113,4 +113,4 @@ class SwiftUtilsTests(SwiftClientPluginTestCase):
"/%s\?temp_url_sig=[0-9a-f]{40}&"
"temp_url_expires=[0-9]{10}" %
(container_name, obj_name))
self.assertThat(url, MatchesRegex(regexp))
self.assertThat(url, matchers.MatchesRegex(regexp))

View File

@ -15,8 +15,6 @@ from oslo.config import cfg
import requests
from requests import exceptions
import six
from six.moves import cStringIO
from six.moves import urllib
from heat.common import urlfetch
from heat.tests import common
@ -50,8 +48,9 @@ class UrlFetchTest(common.HeatTestCase):
data = '{ "foo": "bar" }'
url = 'file:///etc/profile'
self.m.StubOutWithMock(urllib.request, 'urlopen')
urllib.request.urlopen(url).AndReturn(cStringIO(data))
self.m.StubOutWithMock(six.moves.urllib.request, 'urlopen')
six.moves.urllib.request.urlopen(url).AndReturn(
six.moves.cStringIO(data))
self.m.ReplayAll()
self.assertEqual(data, urlfetch.get(url, allowed_schemes=['file']))
@ -60,8 +59,9 @@ class UrlFetchTest(common.HeatTestCase):
def test_file_scheme_failure(self):
url = 'file:///etc/profile'
self.m.StubOutWithMock(urllib.request, 'urlopen')
urllib.request.urlopen(url).AndRaise(urllib.error.URLError('oops'))
self.m.StubOutWithMock(six.moves.urllib.request, 'urlopen')
six.moves.urllib.request.urlopen(url).AndRaise(
six.moves.urllib.error.URLError('oops'))
self.m.ReplayAll()
self.assertRaises(urlfetch.URLFetchError,

View File

@ -20,7 +20,7 @@ from heat.common import template_format
from heat.engine.clients.os import glance
from heat.engine.clients.os import nova
from heat.engine import environment
from heat.engine.hot.template import HOTemplate20130523
from heat.engine.hot import template as tmpl
from heat.engine import parser
from heat.engine import resources
from heat.engine import service
@ -1383,7 +1383,7 @@ class validateTest(common.HeatTestCase):
def test_validate_duplicate_parameters_in_group(self):
t = template_format.parse(test_template_duplicate_parameters)
template = HOTemplate20130523(t)
template = tmpl.HOTemplate20130523(t)
stack = parser.Stack(self.ctx, 'test_stack', template,
environment.Environment({
'KeyName': 'test',
@ -1398,7 +1398,7 @@ class validateTest(common.HeatTestCase):
def test_validate_invalid_parameter_in_group(self):
t = template_format.parse(test_template_invalid_parameter_name)
template = HOTemplate20130523(t)
template = tmpl.HOTemplate20130523(t)
stack = parser.Stack(self.ctx, 'test_stack', template,
environment.Environment({
'KeyName': 'test',
@ -1414,7 +1414,7 @@ class validateTest(common.HeatTestCase):
def test_validate_no_parameters_in_group(self):
t = template_format.parse(test_template_no_parameters)
template = HOTemplate20130523(t)
template = tmpl.HOTemplate20130523(t)
stack = parser.Stack(self.ctx, 'test_stack', template)
exc = self.assertRaises(exception.StackValidationFailed,
stack.validate)

View File

@ -13,7 +13,7 @@
import webob
from heat.api.middleware.version_negotiation import VersionNegotiationFilter
from heat.api.middleware import version_negotiation as vn
from heat.tests import common
@ -26,7 +26,7 @@ class VersionNegotiationMiddlewareTest(common.HeatTestCase):
return VersionController()
def test_match_version_string(self):
version_negotiation = VersionNegotiationFilter(
version_negotiation = vn.VersionNegotiationFilter(
self._version_controller_factory, None, None)
request = webob.Request({})
major_version = 1
@ -39,7 +39,7 @@ class VersionNegotiationMiddlewareTest(common.HeatTestCase):
self.assertEqual(minor_version, request.environ['api.minor_version'])
def test_not_match_version_string(self):
version_negotiation = VersionNegotiationFilter(
version_negotiation = vn.VersionNegotiationFilter(
self._version_controller_factory, None, None)
request = webob.Request({})
@ -47,7 +47,7 @@ class VersionNegotiationMiddlewareTest(common.HeatTestCase):
self.assertFalse(match)
def test_return_version_controller_when_request_path_is_version(self):
version_negotiation = VersionNegotiationFilter(
version_negotiation = vn.VersionNegotiationFilter(
self._version_controller_factory, None, None)
request = webob.Request({'PATH_INFO': 'versions'})
@ -56,7 +56,7 @@ class VersionNegotiationMiddlewareTest(common.HeatTestCase):
self.assertIsInstance(response, VersionController)
def test_return_version_controller_when_request_path_is_empty(self):
version_negotiation = VersionNegotiationFilter(
version_negotiation = vn.VersionNegotiationFilter(
self._version_controller_factory, None, None)
request = webob.Request({'PATH_INFO': '/'})
@ -65,7 +65,7 @@ class VersionNegotiationMiddlewareTest(common.HeatTestCase):
self.assertIsInstance(response, VersionController)
def test_request_path_contains_valid_version(self):
version_negotiation = VersionNegotiationFilter(
version_negotiation = vn.VersionNegotiationFilter(
self._version_controller_factory, None, None)
major_version = 1
minor_version = 0
@ -80,7 +80,7 @@ class VersionNegotiationMiddlewareTest(common.HeatTestCase):
self.assertEqual(minor_version, request.environ['api.minor_version'])
def test_removes_version_from_request_path(self):
version_negotiation = VersionNegotiationFilter(
version_negotiation = vn.VersionNegotiationFilter(
self._version_controller_factory, None, None)
expected_path = 'resource'
request = webob.Request({'PATH_INFO': 'v1.0/{0}'.format(expected_path)
@ -92,7 +92,7 @@ class VersionNegotiationMiddlewareTest(common.HeatTestCase):
self.assertEqual(expected_path, request.path_info_peek())
def test_request_path_contains_unknown_version(self):
version_negotiation = VersionNegotiationFilter(
version_negotiation = vn.VersionNegotiationFilter(
self._version_controller_factory, None, None)
request = webob.Request({'PATH_INFO': 'v2.0/resource'})
@ -101,7 +101,7 @@ class VersionNegotiationMiddlewareTest(common.HeatTestCase):
self.assertIsInstance(response, VersionController)
def test_accept_header_contains_valid_version(self):
version_negotiation = VersionNegotiationFilter(
version_negotiation = vn.VersionNegotiationFilter(
self._version_controller_factory, None, None)
major_version = 1
minor_version = 0
@ -116,7 +116,7 @@ class VersionNegotiationMiddlewareTest(common.HeatTestCase):
self.assertEqual(minor_version, request.environ['api.minor_version'])
def test_accept_header_contains_unknown_version(self):
version_negotiation = VersionNegotiationFilter(
version_negotiation = vn.VersionNegotiationFilter(
self._version_controller_factory, None, None)
request = webob.Request({'PATH_INFO': 'resource'})
request.headers['Accept'] = 'application/vnd.openstack.' \
@ -127,7 +127,7 @@ class VersionNegotiationMiddlewareTest(common.HeatTestCase):
self.assertIsInstance(response, VersionController)
def test_no_URI_version_accept_header_contains_invalid_MIME_type(self):
version_negotiation = VersionNegotiationFilter(
version_negotiation = vn.VersionNegotiationFilter(
self._version_controller_factory, None, None)
request = webob.Request({'PATH_INFO': 'resource'})
request.headers['Accept'] = 'application/invalidMIMEType'

View File

@ -23,7 +23,7 @@ from heat.tests import common
from heat.tests import utils
try:
from neutronclient.common.exceptions import NeutronClientException
from neutronclient.common import exceptions as neutron_exc
from neutronclient.v2_0 import client as neutronclient
except ImportError:
neutronclient = None
@ -235,11 +235,11 @@ class VPCTestBase(common.HeatTestCase):
'id': '0389f747-7785-4757-b7bb-2ab07e4b09c3'}})
elif group == 'INVALID-NO-REF':
neutronclient.Client.show_security_group(group).AndRaise(
NeutronClientException(status_code=404))
neutron_exc.NeutronClientException(status_code=404))
elif group == 'RaiseException':
neutronclient.Client.show_security_group(
'0389f747-7785-4757-b7bb-2ab07e4b09c3').AndRaise(
NeutronClientException(status_code=403))
neutron_exc.NeutronClientException(status_code=403))
def mock_delete_security_group(self):
self.mock_show_security_group()
@ -354,7 +354,7 @@ Resources:
neutronclient.Client.create_network(
{
'network': {'name': self.vpc_name}
}).AndRaise(NeutronClientException())
}).AndRaise(neutron_exc.NeutronClientException())
def test_vpc(self):
self.mock_create_network()
@ -408,9 +408,9 @@ Resources:
neutronclient.Client.remove_interface_router(
u'bbbb',
{'subnet_id': 'cccc'}).AndRaise(
NeutronClientException(status_code=404))
neutron_exc.NeutronClientException(status_code=404))
neutronclient.Client.delete_subnet('cccc').AndRaise(
NeutronClientException(status_code=404))
neutron_exc.NeutronClientException(status_code=404))
self.m.ReplayAll()
stack = self.create_stack(self.test_template)
@ -446,9 +446,8 @@ Resources:
'tenant_id': 'c1210485b2424d48804aad5d39c61b8f',
'id': 'cccc'}})
neutronclient.Client.show_network(
'aaaa'
).MultipleTimes().AndRaise(NeutronClientException(status_code=404))
neutronclient.Client.show_network('aaaa').MultipleTimes().AndRaise(
neutron_exc.NeutronClientException(status_code=404))
def test_create_failed_delete_success(self):
self._mock_create_subnet_failed()