Added tests for disk_types, disks and snapshots management

Added default volume size for new empty disk creation.

Change-Id: Ia881951bf7e3fc9189c59dcd89270ffdef7a4d44
This commit is contained in:
alexey-mr 2015-11-10 19:22:14 +03:00
parent 93aabba2ff
commit 350e260b28
17 changed files with 579 additions and 51 deletions

View File

@ -173,6 +173,10 @@ function configure_gceapi {
iniset $GCEAPI_CONF_FILE DEFAULT region $REGION_NAME
iniset $GCEAPI_CONF_FILE DEFAULT keystone_url "$OS_AUTH_URL"
# set default new empty disk size to 1GB, default production is 500
# that corresponds to default Google pd-standard disk-type
iniset $GCEAPI_CONF_FILE DEFAULT default_volume_size_gb 1
iniset $GCEAPI_CONF_FILE keystone_authtoken admin_tenant_name $SERVICE_TENANT_NAME
iniset $GCEAPI_CONF_FILE keystone_authtoken admin_user $GCEAPI_ADMIN_USER
iniset $GCEAPI_CONF_FILE keystone_authtoken admin_password $SERVICE_PASSWORD

View File

@ -52,6 +52,10 @@ gce_opts = [
cfg.StrOpt('region',
default='RegionOne',
help='Region of this service'),
cfg.IntOpt('default_volume_size_gb',
default=500,
help='Default new volume size if sizeGb, sourceSnapshot and '
'sourceImage are not provided'),
]
CONF = cfg.CONF

View File

@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from gceapi.api import base_api
from gceapi.api import clients
from gceapi.api import image_api
@ -22,6 +24,7 @@ from gceapi.api import utils
from gceapi import exception
CONF = cfg.CONF
GB = 1024 ** 3
@ -131,6 +134,9 @@ class API(base_api.API):
image_size_in_gb = (int(image['size']) + GB - 1) / GB
if not sizeGb or sizeGb < image_size_in_gb:
sizeGb = image_size_in_gb
else:
if not sizeGb:
sizeGb = CONF.default_volume_size_gb
operation_util.start_operation(context, self._get_add_item_progress)
volume = client.volumes.create(

View File

@ -32,9 +32,11 @@ class Controller(gce_common.Controller):
"creationTimestamp": self._format_date(volume["created_at"]),
"status": volume["status"],
"name": volume["display_name"],
"description": volume["display_description"],
"sizeGb": volume["size"],
"sizeGb": u"{}".format(volume["size"]),
}
description = volume["display_description"]
if description is not None:
result_dict["description"] = description
snapshot = volume["snapshot"]
if snapshot:
result_dict["sourceSnapshot"] = self._qualify(request,
@ -62,7 +64,13 @@ class Controller(gce_common.Controller):
context = self._get_context(req)
operation_util.init_operation(context, "createSnapshot",
self._type_name, id, scope)
snapshot_api.API().add_item(context, body, scope)
snapshot = snapshot_api.API().add_item(context, body, scope)
# TODO(alexey-mr): workaround: have to set item id here
# snapshot_api.API().add_item set_item_id has no effect because
# of different type_name disk vs. snapshot
# but snapshot type_name can't be used in init_operation because
# targetLink and targetId should point to disk object
operation_util.set_item_id(context, snapshot['id'], self._type_name)
def create_resource():

View File

@ -80,7 +80,7 @@ class API(base_api.API):
operation_util.start_operation(context, self._get_add_item_progress)
snapshot = client.volume_snapshots.create(
volumes[0].id, True, name, body["description"])
volumes[0].id, True, name, body.get("description"))
operation_util.set_item_id(context, snapshot.id, self.KIND)
return self._prepare_item(client, utils.to_dict(snapshot))

View File

@ -29,10 +29,12 @@ class Controller(gce_common.Controller):
result_dict = {
"creationTimestamp": self._format_date(snapshot["created_at"]),
"status": snapshot["status"],
"diskSizeGb": snapshot["size"],
"diskSizeGb": u"{}".format(snapshot["size"]),
"name": snapshot["name"],
"description": snapshot["display_description"],
}
}
description = snapshot["display_description"]
if description:
result_dict["description"] = description
disk = snapshot.get("disk")
if disk is not None:
result_dict["sourceDisk"] = self._qualify(

View File

@ -127,7 +127,7 @@ networking=${networking}
region=${REGION:-'region-one'}
# convert flavor name: becase GCE dowsn't allows '.' and converts '-' into '.'
machine_type=${flavor_name//\./-}
image=${os_image_name}
image=${OS_PROJECT_NAME}/global/images/${os_image_name}
EOF"
fi

View File

@ -36,7 +36,6 @@ class TestAddressesBase(test_base.GCETestCase):
return res
def _create_address(self, options):
self._add_cleanup(self._delete_address, options['name'])
cfg = self.cfg
project_id = cfg.project_id
region = cfg.region
@ -46,6 +45,7 @@ class TestAddressesBase(test_base.GCETestCase):
project=project_id,
region=region,
body=config)
self._add_cleanup(self._delete_address, options['name'])
self._execute_async_request(request, project_id, region=region)
def _delete_address(self, name):
@ -58,8 +58,8 @@ class TestAddressesBase(test_base.GCETestCase):
project=project_id,
region=region,
address=name)
self._remove_cleanup(self._delete_address, name)
self._execute_async_request(request, project_id, region=region)
self._remove_cleanup(self._delete_address, name)
def _list_addresses(self):
cfg = self.cfg

View File

@ -0,0 +1,99 @@
# Copyright 2015 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from gceapi.tests.functional import test_base
TYPES = [
'local-ssd',
'pd-ssd',
'pd-standard'
]
class TestDiskTypes(test_base.GCETestCase):
@property
def disk_types(self):
res = self.api.compute.diskTypes()
self.assertIsNotNone(res,
'Null regions object, api is not built properly')
return res
def setUp(self):
if not self.is_real_gce:
self.skipTest('Not supported in Openstack GCE API')
return
super(TestDiskTypes, self).setUp()
def _get_disk_type(self, name):
project_id = self.cfg.project_id
zone = self.cfg.zone
self.trace('Get disk-type: project_id={} zone={} name={}'.format(
project_id, zone, name))
request = self.disk_types.get(project=project_id,
zone=zone,
diskType=name)
result = request.execute()
self.trace('Disk type: {}'.format(result))
self.api.validate_schema(value=result, schema_name='DiskType')
return result
def _check_disk_type_prperties(self, name, disk_type, zone=None):
self.assertEqual(name, disk_type['name'])
self.assertEqual(self.api.get_zone_url(zone=zone), disk_type['zone'])
self.assertIn('description', disk_type)
self.assertIn('validDiskSize', disk_type)
self.assertIn('defaultDiskSizeGb', disk_type)
self.assertIn('kind', disk_type)
self.assertIn('creationTimestamp', disk_type)
def test_get_disk_type(self):
for t in TYPES:
result = self._get_disk_type(t)
self._check_disk_type_prperties(t, result)
def test_list_disk_types(self):
cfg = self.cfg
project_id = cfg.project_id
zone = cfg.zone
self.trace('List disk types: project_id={} zone={}'.format(project_id,
zone))
result = self.disk_types.list(project=project_id, zone=zone).execute()
self.trace('Disk types list: {}'.format(result))
self.api.validate_schema(value=result, schema_name='DiskTypeList')
for t in TYPES:
dt = self.assertFind(t, result)
self._check_disk_type_prperties(t, dt)
def test_aggregated_list_disk_types(self):
cfg = self.cfg
project_id = cfg.project_id
self.trace(
'Aggregated list disk types: project_id={}'.format(project_id))
result = self.disk_types.aggregatedList(project=project_id).execute()
self.trace('Aggregated disk types list: {}'.format(result))
self.api.validate_schema(
value=result,
schema_name='DiskTypeAggregatedList')
self.assertIn('items', result)
items = result['items']
if len(items) == 0:
self.fail('Empty aggregated disk types list')
for zone_resources in items.items():
for t in TYPES:
dt = self.assertFind(t, zone_resources[1], 'diskTypes')
self._check_disk_type_prperties(t, dt, zone=zone_resources[0])

View File

@ -0,0 +1,307 @@
# Copyright 2015 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
from gceapi.tests.functional import test_base
class TestSnapshotsBase(test_base.GCETestCase):
@property
def snapshots(self):
res = self.api.compute.snapshots()
self.assertIsNotNone(
res,
'Null snapshots object, api is not built properly')
return res
def _delete_snapshot(self, name):
project_id = self.cfg.project_id
self.trace('Delete snapshot: project_id={} name={}'.
format(project_id, name))
request = self.snapshots.delete(
project=project_id,
snapshot=name)
self._execute_async_request(request, project_id)
self._remove_cleanup(self._delete_snapshot, name)
def _list_snapshots(self, filter=None):
project_id = self.cfg.project_id
self.trace('List snapshots: project_id={}'.format(project_id))
request = self.snapshots.list(project=project_id, filter=filter)
result = request.execute()
self.trace('Snapshots: {}'.format(result))
self.api.validate_schema(value=result, schema_name='SnapshotList')
return result
def _get_snapshot(self, name):
project_id = self.cfg.project_id
self.trace('Get snapshot: project_id={} name={}'.
format(project_id, name))
request = self.snapshots.get(
project=project_id,
snapshot=name)
result = request.execute()
self.trace('Snapshot: {}'.format(result))
self.api.validate_schema(value=result, schema_name='Snapshot')
return result
def _get_expected_snapshot_fields(self, disk_name, options):
snapshot = copy.deepcopy(options)
# fill defaults if needed
if 'kind' not in snapshot:
snapshot['kind'] = 'compute#snapshot'
if 'selfLink' not in snapshot:
snapshot_url = 'global/snapshots/{}'.format(snapshot['name'])
snapshot['selfLink'] = self.api.get_project_url(snapshot_url)
if 'status' not in snapshot:
snapshot['status'] = 'READY'
if 'sourceDisk' not in options:
src_disk_url = 'disks/{}'.format(disk_name)
snapshot['sourceDisk'] = self.api.get_zone_url(src_disk_url)
return snapshot
def _ensure_snapshot_created(self, disk_name, options):
name = options['name']
snapshot = self._get_expected_snapshot_fields(disk_name, options)
# get object from server and check properties
result = self._get_snapshot(name)
self.assertObject(snapshot, result)
return result
class TestDiskBase(TestSnapshotsBase):
@property
def disks(self):
res = self.api.compute.disks()
self.assertIsNotNone(
res,
'Null disks object, api is not built properly')
return res
def _create_disk(self, options, source_image=None):
cfg = self.cfg
project_id = cfg.project_id
zone = cfg.zone
self.trace('Crete disk with options {} source_image={}'.
format(options, source_image))
request = self.disks.insert(
project=project_id,
zone=zone,
sourceImage=source_image,
body=options)
self._add_cleanup(self._delete_disk, options['name'])
self._execute_async_request(request, project_id, zone=zone)
def _delete_disk(self, name):
cfg = self.cfg
project_id = cfg.project_id
zone = cfg.zone
self.trace('Delete disk: project_id={} zone={} name={}'.
format(project_id, zone, name))
request = self.disks.delete(
project=project_id,
zone=zone,
disk=name)
self._execute_async_request(request, project_id, zone=zone)
self._remove_cleanup(self._delete_disk, name)
def _list_disks(self, filter=None):
cfg = self.cfg
project_id = cfg.project_id
zone = cfg.zone
self.trace('List disks: project_id={} zone={} filter={}'.
format(project_id, zone, filter))
request = self.disks.list(
project=project_id,
zone=zone,
filter=filter)
result = request.execute()
self.trace('Disks: {}'.format(result))
self.api.validate_schema(value=result, schema_name='DiskList')
return result
def _get_disk(self, name):
cfg = self.cfg
project_id = cfg.project_id
zone = cfg.zone
self.trace('Get disk: project_id={} zone={} name={}'.
format(project_id, zone, name))
request = self.disks.get(
project=project_id,
zone=zone,
disk=name)
result = request.execute()
self.trace('Disk: {}'.format(result))
self.api.validate_schema(value=result, schema_name='Disk')
return result
def _get_expected_disk_fields(self, options, source_image=None):
disk = copy.deepcopy(options)
# fill defaults if needed
if 'kind' not in disk:
disk['kind'] = 'compute#disk'
if 'sizeGb' not in disk:
disk['sizeGb'] = '500' if self.is_real_gce else '1'
if 'zone' not in disk:
disk['zone'] = self.api.get_zone_url()
if 'selfLink' not in disk:
disk_url = 'disks/{}'.format(options['name'])
disk['selfLink'] = self.api.get_zone_url(disk_url)
# TODO(alexey-mr): un-comment when disk-types be supported
# if 'type' not in disk:
# disk['type'] = self.api.get_zone_url('diskTypes/pd-standard')
if 'status' not in disk:
disk['status'] = 'READY'
if 'sourceSnapshot' in options:
snapshot_url = self.api.get_project_url(options['sourceSnapshot'])
disk['sourceSnapshot'] = snapshot_url
if source_image is not None:
disk['sourceImage'] = self.api.get_global_url(source_image)
return disk
def _ensure_disk_created(self, options, source_image=None):
name = options['name']
disk = self._get_expected_disk_fields(options, source_image)
# get object from server and check properties
result = self._get_disk(name)
self.assertObject(disk, result)
return result
def _create_snapshot(self, disk_name, options):
name = options['name']
cfg = self.cfg
project_id = cfg.project_id
zone = cfg.zone
self.trace('Create snapshot {} for disk={}'.format(name, disk_name))
request = self.disks.createSnapshot(
project=project_id,
zone=zone,
disk=disk_name,
body=options)
self._add_cleanup(self._delete_snapshot, name)
self._execute_async_request(request, project_id, zone=zone)
def _create_disk_and_snapshot(self):
# prepare disk for snapshot
disk_name = self._rand_name('testdisk')
disk_options = {
'name': disk_name,
'sizeGb': '1'
}
self._create_disk(disk_options)
disk = self._ensure_disk_created(disk_options)
# prepare snapshot
snapshot_name = self._rand_name('testsnapshot')
snapshot_options = {
'name': snapshot_name
}
self._create_snapshot(disk_name=disk_name, options=snapshot_options)
snapshot_options['diskSizeGb'] = disk_options['sizeGb']
snapshot = self._ensure_snapshot_created(disk_name=disk_name,
options=snapshot_options)
return {'disk': disk, 'snapshot': snapshot}
class TestDisks(TestDiskBase):
def test_create_delete_default_disk(self):
name = self._rand_name('testdisk')
options = {
'name': name
}
self._create_disk(options)
self._ensure_disk_created(options)
self._delete_disk(name)
def test_create_delete_disk_with_size(self):
name = self._rand_name('testdisk')
options = {
'name': name,
'sizeGb': None
}
for size in ['1', '2']:
options['sizeGb'] = size
self._create_disk(options)
self._ensure_disk_created(options)
self._delete_disk(name)
def test_create_disk_from_image(self):
name = self._rand_name('testdisk')
image = 'projects/{}'.format(self.cfg.image)
options = {
'name': name,
}
self._create_disk(options, source_image=image)
# TODO(alexey-mr): read size of image and add it for checking
# options['sizeGb'] = self._get_image_size(self.cfg.image)
self._ensure_disk_created(options, source_image=image)
self._delete_disk(name)
def test_create_disk_from_snapshot(self):
data = self._create_disk_and_snapshot()
snapshot = data['snapshot']
snapshot_name = snapshot['name']
# create testing disk by snapshot
disk_name = self._rand_name('testdisk')
snapshot_url = 'global/snapshots/{}'.format(snapshot_name)
options = {
'name': disk_name,
'sourceSnapshot': snapshot_url
}
self._create_disk(options)
options['sizeGb'] = snapshot['diskSizeGb']
self._ensure_disk_created(options)
# delete resources
self._delete_disk(disk_name)
self._delete_snapshot(snapshot_name)
self._delete_disk(data['disk']['name'])
def test_list_disks(self):
# create disks
name = self._rand_name('testdisk')
options = {
'name': name,
'sizeGb': '1'
}
self._create_disk(options)
disk = self._ensure_disk_created(options)
# list and find object from server and check properties
result = self._list_disks()
result = self.assertFind(name, result)
self.assertObject(disk, result)
self._delete_disk(name)
def test_list_disks_by_filter_name(self):
# prepare disks
names = list()
for i in range(0, 3):
names.append(self._rand_name('testdisk'))
disks = dict()
for name in names:
options = {
'name': name,
'sizeGb': '1'
}
self._create_disk(options)
disks[name] = self._ensure_disk_created(options)
# list disks with filter by name
for name in names:
result = self._list_disks(filter='name eq {}'.format(name))
self.assertEqual(1, len(result['items']))
self.assertObject(disks[name], result['items'][0])
# clean resources
for name in names:
self._delete_disk(name)

View File

@ -76,7 +76,6 @@ class TestInstancesBase(test_base.GCETestCase):
return res
def _create_instance(self, options):
self._add_cleanup(self._delete_instance, options['name'])
cfg = self.cfg
project_id = cfg.project_id
zone = cfg.zone
@ -86,6 +85,7 @@ class TestInstancesBase(test_base.GCETestCase):
project=project_id,
zone=zone,
body=config)
self._add_cleanup(self._delete_instance, options['name'])
self._execute_async_request(request, project_id, zone=zone)
def _delete_instance(self, name):
@ -98,8 +98,8 @@ class TestInstancesBase(test_base.GCETestCase):
project=project_id,
zone=zone,
instance=name)
self._remove_cleanup(self._delete_instance, name)
self._execute_async_request(request, project_id, zone=zone)
self._remove_cleanup(self._delete_instance, name)
def _list_instances(self):
project_id = self.cfg.project_id

View File

@ -39,13 +39,13 @@ class TestNetworksBase(test_base.GCETestCase):
return res
def _create_network(self, options):
self._add_cleanup(self._delete_network, options['name'])
project_id = self.cfg.project_id
config = _prepare_network_create_parameters(**options)
self.trace('Crete network with options {}'.format(config))
request = self.networks.insert(
project=project_id,
body=config)
self._add_cleanup(self._delete_network, options['name'])
self._execute_async_request(request, project_id)
def _delete_network(self, name):
@ -56,8 +56,8 @@ class TestNetworksBase(test_base.GCETestCase):
request = self.networks.delete(
project=project_id,
network=name)
self._remove_cleanup(self._delete_network, name)
self._execute_async_request(request, project_id)
self._remove_cleanup(self._delete_network, name)
def _list_networks(self):
project_id = self.cfg.project_id

View File

@ -0,0 +1,49 @@
# Copyright 2015 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from gceapi.tests.functional.api import test_disks
class TestSnapshots(test_disks.TestDiskBase):
def test_list_snapshots(self):
# prepare object for listing
data = self._create_disk_and_snapshot()
disk_name = data['disk']['name']
snapshot = data['snapshot']
snapshot_name = snapshot['name']
# list and find object from server and check properties
result = self._list_snapshots()
result = self.assertFind(snapshot_name, result)
self.assertObject(snapshot, result)
self._delete_snapshot(snapshot_name)
self._delete_disk(disk_name)
def test_list_snapshots_by_filter_name(self):
# prepare objects for listings
objects = list()
for i in range(0, 3):
objects.append(self._create_disk_and_snapshot())
# list snapshots with filter by name
for item in objects:
snapshot = item['snapshot']
snapshot_filter = 'name eq {}'.format(snapshot['name'])
result = self._list_snapshots(filter=snapshot_filter)
self.assertEqual(1, len(result['items']))
self.assertObject(snapshot, result['items'][0])
# clean resources
for item in objects:
self._delete_snapshot(item['snapshot']['name'])
self._delete_disk(item['disk']['name'])

View File

@ -51,6 +51,10 @@ class CredentialsProvider(object):
def keystone_client(self):
return self._create_keystone_client()
@property
def is_google_auth(self):
return self.cfg.cred_type == 'gcloud_auth'
@property
def credentials(self):
cred_type = self.cfg.cred_type

View File

@ -18,6 +18,7 @@ import json
import string
import time
import traceback
import urlparse
from googleapiclient import discovery
from googleapiclient import schema
@ -32,6 +33,8 @@ from gceapi.tests.functional import credentials
CONF = config.CONF.gce
LOG = logging.getLogger("gceapi")
API_NAME = 'compute'
API_VER = 'v1'
def trace(msg):
@ -93,44 +96,79 @@ class GCEApi(object):
url = self._discovery_url
trace('Build Google compute api with discovery url {}'.format(url))
self._compute = discovery.build(
'compute', 'v1',
API_NAME,
API_VER,
credentials=credentials,
discoveryServiceUrl=url
)
@property
def _discovery_url(self):
cfg = CONF
return '{}://{}:{}{}'.format(
cfg.protocol,
cfg.host,
cfg.port,
cfg.discovery_url
)
@property
def compute(self):
assert(self._compute is not None)
return self._compute
@property
def base_url(self):
cfg = CONF
return '{}://{}:{}'.format(
cfg.protocol,
cfg.host,
cfg.port
)
def validate_schema(self, value, schema_name):
schema = self._schema.get(schema_name)
jsonschema.validate(value, schema, resolver=self._scheme_ref_resolver)
@staticmethod
def _is_absolute_url(url):
return bool(urlparse.urlparse(url).netloc)
@staticmethod
def _is_standard_port(protocol, port):
_map = {'http': 80, 'https': 443}
return _map[protocol] == port
@property
def _host_url(self):
cfg = CONF
if self._is_standard_port(cfg.protocol, cfg.port):
return '{}://{}'.format(cfg.protocol, cfg.host)
return '{}://{}:{}'.format(cfg.protocol, cfg.host, cfg.port)
@property
def _discovery_url(self):
t = '{}{}' if CONF.discovery_url.startswith('/') else '{}/{}'
return t.format(self._host_url, CONF.discovery_url)
@property
def _api_url(self):
return '{}/{}/{}'.format(self._host_url, API_NAME, API_VER)
@property
def project_url(self):
return '{}/projects/{}'.format(self._api_url, CONF.project_id)
def get_zone_url(self, resource=None, zone=None):
z = zone
if z is None:
z = CONF.zone
if not self._is_absolute_url(z):
t = '{}/{}' if z.startswith('zones/') else '{}/zones/{}'
z = t.format(self.project_url, z)
if resource is None:
return z
return '{}/{}'.format(z, resource)
def get_global_url(self, resource):
if self._is_absolute_url(resource):
return resource
t = '{}/{}' if resource.startswith('projects/') else '{}/projects/{}'
return t.format(self._api_url, resource)
def get_project_url(self, resource):
if self._is_absolute_url(resource):
return resource
t = '{}/{}'
return t.format(self.project_url, resource)
class GCETestCase(base.BaseTestCase):
@property
def api(self):
assert(self._api is not None)
if self._api is None:
self.fail('Api object is None - test is not initialized properly')
return self._api
@property
@ -147,22 +185,30 @@ class GCETestCase(base.BaseTestCase):
@classmethod
def setUpClass(cls):
cp = credentials.CredentialsProvider(CONF)
cls._api = GCEApi(cp)
cls._credentials_provider = credentials.CredentialsProvider(CONF)
cls._api = GCEApi(cls._credentials_provider)
cls._api.init()
super(GCETestCase, cls).setUpClass()
def assertFind(self, item, items_list):
key = 'items'
def assertFind(self, item, items_list, key='items'):
items = []
if key in items_list:
items = items_list[key]
for i in items:
if i['name'] == item:
return
return i
self.fail(
'There is no required item {} in the list {}'.format(item, items))
def assertObject(self, expected, observed):
self.trace('Validate object: \n\texpected: {}\n\tobserved: {}'.
format(expected, observed))
self.assertDictContainsSubset(expected, observed)
@property
def is_real_gce(self):
return self._credentials_provider.is_google_auth
def _get_operations_request(self, name, project, zone, region):
if zone is not None:
return self.api.compute.zoneOperations().get(
@ -197,6 +243,7 @@ class GCETestCase(base.BaseTestCase):
self.trace('Waiting for operation {} to finish...'.format(name))
begin = time.time()
timeout = self.cfg.build_timeout
interval = self.cfg.build_interval
result = None
while time.time() - begin < timeout:
result = self._get_operations_request(
@ -209,8 +256,7 @@ class GCETestCase(base.BaseTestCase):
else:
self.trace("Request {} done successfully".format(name))
return
time.sleep(1)
time.sleep(interval)
self.fail('Request {} failed with timeout {},'
' latest operation status {}'.format(name, timeout, result))

View File

@ -23,7 +23,7 @@ EXPECTED_DISK_1 = {
"fake_project/global/snapshots/fake-snapshot",
"kind": "compute#disk",
"name": "fake-disk-1",
"sizeGb": 2,
"sizeGb": '2',
"sourceSnapshotId": "991cda9c-28bd-420f-8432-f5159def85d6",
"zone": "http://localhost/compute/v1beta15/projects/"
"fake_project/zones/nova",
@ -35,7 +35,7 @@ EXPECTED_DISK_1 = {
}
EXPECTED_DISK_2 = {
"status": "READY",
"sizeGb": 1,
"sizeGb": '1',
"kind": "compute#disk",
"name": "fake-disk-2",
"zone": "http://localhost/compute/v1beta15/projects/"
@ -51,7 +51,7 @@ EXPECTED_DISK_2 = {
}
EXPECTED_DISK_3 = {
"status": "READY",
"sizeGb": 3,
"sizeGb": '3',
"kind": "compute#disk",
"name": "fake-disk-3",
"zone": "http://localhost/compute/v1beta15/projects/"
@ -64,7 +64,7 @@ EXPECTED_DISK_3 = {
}
NEW_DISK = {
"status": "READY",
"sizeGb": 15,
"sizeGb": '15',
"kind": "compute#disk",
"name": "new-disk",
"zone": "http://localhost/compute/v1beta15/projects/"
@ -73,13 +73,12 @@ NEW_DISK = {
"id": "5151144363316117590",
"selfLink": "http://localhost/compute/v1beta15/projects/"
"fake_project/zones/nova/disks/new-disk",
"description": None,
}
NEW_IMAGE_DISK = {
"status": "READY",
"kind": "compute#disk",
"name": "new-image-disk",
"sizeGb": 1,
"sizeGb": '1',
"sourceImage": "http://localhost/compute/v1beta15/projects/"
"fake_project/global/images/fake-image-2",
"sourceImageId": "5721131091780319468",
@ -97,7 +96,7 @@ NEW_SN_DISK = {
"fake_project/global/snapshots/fake-snapshot",
"kind": "compute#disk",
"name": "new-sn-disk",
"sizeGb": 25,
"sizeGb": '25',
"sourceSnapshotId": "991cda9c-28bd-420f-8432-f5159def85d6",
"zone": "http://localhost/compute/v1beta15/projects/"
"fake_project/zones/nova",

View File

@ -24,7 +24,7 @@ EXPECTED_SNAPSHOTS = [{
"id": "8386122516930476063",
"creationTimestamp": "2013-08-14T12:32:28Z",
"status": "READY",
"diskSizeGb": 2,
"diskSizeGb": "2",
"sourceDisk": "http://localhost/compute/v1beta15/projects/"
"fake_project/zones/nova/disks/fake-disk-3",
"name": "fake-snapshot",
@ -111,7 +111,7 @@ class SnapshotsTest(common.GCEControllerTest):
"operationType": "createSnapshot",
"targetId": "9202387718698825406",
"targetLink": "http://localhost/compute/v1beta15/projects/"
"fake_project/zones/nova/disks/fake-disk-3",
"fake_project/zones/nova/disks/fake-disk-3",
}
expected.update(common.COMMON_ZONE_PENDING_OPERATION)
self.assertDictEqual(expected, response.json_body)