From 24e7bc97f153d98f4f8e34e0f720b39c1476b9b4 Mon Sep 17 00:00:00 2001 From: Andrey Pavlov Date: Mon, 20 Feb 2017 12:21:07 +0300 Subject: [PATCH] remove obsolete tests Change-Id: I4e530476cedb8fd415cc8a117f1d292fed619180 --- gceapi/tests/obsolete/base.py | 289 ---------- gceapi/tests/obsolete/test_gce_scenario.py | 590 --------------------- 2 files changed, 879 deletions(-) delete mode 100644 gceapi/tests/obsolete/base.py delete mode 100644 gceapi/tests/obsolete/test_gce_scenario.py diff --git a/gceapi/tests/obsolete/base.py b/gceapi/tests/obsolete/base.py deleted file mode 100644 index f4333f3..0000000 --- a/gceapi/tests/obsolete/base.py +++ /dev/null @@ -1,289 +0,0 @@ -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import json -import subprocess -import unittest -import urlparse - -from oslo_log import log as logging -from tempest.lib.common import rest_client -import testtools - -from tempest.common.utils.linux import remote_client -from tempest import config -from tempest import exceptions -from tempest.lib.common.utils import test_utils -from tempest import manager - -CONF = config.CONF -LOG = logging.getLogger("tempest.thirdparty.gce") -REGION_NAME = 'region-one' - - -class GCEConnection(rest_client.RestClient): - - def __init__(self, auth_provider): - super(GCEConnection, self).__init__(auth_provider, - "gceapi", REGION_NAME) - self.service = CONF.gceapi.catalog_type - - def set_zone(self, zone): - self.zone = zone - - def set_region(self, region): - self.region = region - - def auth_request(self, uri, **kwargs): - if 'headers' not in kwargs: - kwargs['headers'] = {} - kwargs['headers']['AUTHORIZATION'] = "a " + self.token - return self.http_obj.request(uri, **kwargs) - - def _combine_uri(self, *path): - if not len(path): - return None - - parts = [] - gce_cfg = CONF.gceapi - if not urlparse.urlsplit(path[0]).netloc: - parts = [self.base_url, gce_cfg.api_path, self.tenant_name] - parts.extend(path) - uri = "/".join(x.strip("/") for x in parts if x) - return uri - - def _add_params(self, uri, params): - if not params: - return uri - param_list = ["%s=%s" % (param, value) - for (param, value) in params.iteritems()] - return "%s?%s" % (uri, ";".join(param_list)) - - def _convert_response(self, response): - header = response[0] - body = (json.loads(response[1]) - if header.get("content-type") == "application/json" - else None) - return (header.status, body) - - def request(self, *path, **kwargs): - if (self.token is None) or (self.base_url is None): - self._set_auth() - - uri = self._combine_uri(*path) - params = kwargs.pop("params", None) - uri = self._add_params(uri, params) - response = self.auth_request(uri, **kwargs) - return self._convert_response(response) - - def _add_zone_path(self, *path): - return ('zones', self.zone) + path - - def _add_region_path(self, *path): - return ('regions', self.region) + path - - def get(self, *path): - return self.request(*path) - - def zone_get(self, *path): - return self.get(*self._add_zone_path(*path)) - - def region_get(self, *path): - return self.get(*self._add_region_path(*path)) - - def post(self, *path, **kwargs): - req_args = {"method": "POST"} - if "body" in kwargs: - req_args["body"] = json.dumps(kwargs["body"]) - req_args["headers"] = {"Content-Type": "application/json"} - if "params" in kwargs: - req_args["params"] = kwargs["params"] - return self.request(*path, **req_args) - - def zone_post(self, *path, **kwargs): - return self.post(*self._add_zone_path(*path), **kwargs) - - def region_post(self, *path, **kwargs): - return self.post(*self._add_region_path(*path), **kwargs) - - def delete(self, *path): - return self.request(*path, method="DELETE") - - def zone_delete(self, *path): - return self.delete(*self._add_zone_path(*path)) - - -class GCESmokeTestCase(testtools.TestCase): - failed = False - - @classmethod - def setUpClass(cls): - super(GCESmokeTestCase, cls).setUpClass() - cls.gce = GCEConnection(manager.Manager().auth_provider) - cls._trash_bin = [] - - @classmethod - def tearDownClass(cls): - targetLink = None - opLink = None - - def check(): - (status, body) = cls.gce.get(opLink) - if status == 200 and body.get("progress") == 100: - return True - return False - - timeout = CONF.gceapi.operation_timeout - idle = CONF.gceapi.operation_interval - while cls._trash_bin: - targetLink = cls._trash_bin.pop() - (status, body) = cls.gce.delete(targetLink) - if status != 200: - msg = ("Delete operation fails for resource %s" - % targetLink) - LOG.error(msg) - continue - - try: - opLink = body["selfLink"] - result = test_utils.call_until_true(check, timeout, idle) - if not result: - msg = ("Timed out waiting for deletion resource %s" - % targetLink) - LOG.error(msg) - except Exception as exc: - LOG.exception(exc) - - @classmethod - def add_resource_cleanup(cls, resource_link): - if resource_link not in cls._trash_bin: - cls._trash_bin.append(resource_link) - - @classmethod - def cancel_resource_cleanup(cls, resource_link): - if resource_link in cls._trash_bin: - cls._trash_bin.remove(resource_link) - - @staticmethod - def incremental(meth): - def decorator(*args, **kwargs): - try: - meth(*args, **kwargs) - except unittest.SkipTest: - raise - except Exception: - GCESmokeTestCase.failed = True - raise - decorator.__test__ = True - return decorator - - def setUp(self): - if GCESmokeTestCase.failed: - raise unittest.SkipTest("Skipped by previous exception") - super(GCESmokeTestCase, self).setUp() - self.skipTest('Not to run in gating. It is just an old example and ' - 'will be remove removed in future') - - def wait_for_operation(self, body, operation, status): - self.assertEqual("compute#operation", body["kind"]) - self.assertEqual(operation, body["operationType"]) - targetLink = body["targetLink"] - opLink = body["selfLink"] - - def check(): - (http_status, op_body) = self.gce.get(opLink) - self.assertEqual(http_status, 200) - self.assertEqual(targetLink, op_body["targetLink"]) - self.assertEqual(opLink, op_body["selfLink"]) - - error = op_body.get("error") - if error: - msg = "Operation(resource %s) error %s (%s)" % (targetLink, - error["errors"], op_body.get("httpErrorMessage")) - self.fail(msg) - - if op_body["progress"] != 100: - return False - self.assertEqual("DONE", op_body["status"]) - return True - - timeout = CONF.gceapi.operation_timeout - idle = CONF.gceapi.operation_interval - result = test_utils.call_until_true(check, timeout, idle) - if not result: - message = "Timed out waiting for uri %s" % targetLink - raise exceptions.TimeoutException(message) - - (http_status, body) = self.gce.get(targetLink) - # NOTE(apavlov): only for delete* we wait 404. for other 200 - target_status = 404 if "delete" in operation else 200 - self.assertEqual(http_status, target_status) - - if status: - self.assertEqual(status, body["status"]) - - def verify_resource_uri(self, uri, resource_path=None, resource_name=None): - (scheme, netloc, path, query, fragment) = urlparse.urlsplit(str(uri)) - self.assertEqual(self.gce.base_url.strip("/"), - scheme + "://" + netloc) - resource_parts = [CONF.gceapi.api_path.strip("/"), - self.gce.tenant_name] - if resource_path is not None: - resource_parts.append(resource_path.strip("/")) - if resource_name is not None: - resource_parts.append(resource_name) - - self.assertEqual("/" + "/".join(resource_parts), path) - self.assertFalse(query) - self.assertFalse(fragment) - - def verify_region_resource_uri(self, uri, resource_path, resource_name): - resource_path = "/".join(["regions", self.ctx.region["name"], - resource_path.strip("/")]) - self.verify_resource_uri(uri, resource_path, resource_name) - - def verify_zone_resource_uri(self, uri, resource_path, resource_name): - resource_path = "/".join(["zones", self.ctx.zone["name"], - resource_path.strip("/")]) - self.verify_resource_uri(uri, resource_path, resource_name) - - def _get_ip_address(self, network_interface): - if CONF.gceapi.use_floatingip: - ipaddr = network_interface["accessConfigs"][0]["natIP"] - else: - ipaddr = network_interface["networkIP"] - return ipaddr - - def _ping_ip_address(self, ip_address): - cmd = ['ping', '-c1', '-w1', ip_address] - - def ping(): - proc = subprocess.Popen(cmd, - stdout=subprocess.PIPE, stderr=subprocess.PIPE) - proc.wait() - return True if proc.returncode == 0 else False - - result = test_utils.call_until_true( - ping, CONF.validation.ping_timeout, 1) - if result: - return - - message = "Timed out waiting for ping %s" % (ip_address) - raise exceptions.TimeoutException(message) - - def _check_ssh_connectivity(self, ip_address, username, pkey): - ssh = remote_client.RemoteClient(ip_address, username, pkey=pkey) - ssh.validate_authentication() diff --git a/gceapi/tests/obsolete/test_gce_scenario.py b/gceapi/tests/obsolete/test_gce_scenario.py deleted file mode 100644 index 7169f62..0000000 --- a/gceapi/tests/obsolete/test_gce_scenario.py +++ /dev/null @@ -1,590 +0,0 @@ -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import itertools -import netaddr -import testtools - -from oslo_log import log as logging -from tempest.common.utils import data_utils -from tempest import config - -from gceapi.tests.functional import base as base_gce - - -CONF = config.CONF -LOG = logging.getLogger("tempest.thirdparty.gce") - -PUBLIC_KEY = ("ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCiU5kpbgCLrKxP1LYH9" - "dumtf8d6Rb+CX/6irKYyJNbsNYSX1skM9jur17TiFlXQFCjorNYXZ/A1e" - "EKbiDcZUKrINhibQfQlAJZpYP1isLUwJlUhJtGFFBW38wTuyG0MFBO+TF" - "RtAG8GQRRfGDxIXvwUxuDR8sClNuTc0MURTbLCJGPFaK2S99NElNYP7R0" - "QpzQyTHkfl492NKD9Zr7kjvnssqihuQ8dZ0dh5xE2RuF9VChdmmPmsfQG" - "qtRXS6xf1Dy0rPHilEcJpGevcUs/JcqEnUd455uugfdueHLqhOvUt3WJU" - "6mThQ28kTAe7nN17Pj3yKRyurF42bigVKNBudD GCE API testing") - -PRIVATE_KEY = ( - "-----BEGIN RSA PRIVATE KEY-----\n" - "MIIEogIBAAKCAQEAolOZKW4Ai6ysT9S2B/XbprX/HekW/gl/+oqymMiTW7DWEl9b\n" - "JDPY7q9e04hZV0BQo6KzWF2fwNXhCm4g3GVCqyDYYm0H0JQCWaWD9YrC1MCZVISb\n" - "RhRQVt/ME7shtDBQTvkxUbQBvBkEUXxg8SF78FMbg0fLApTbk3NDFEU2ywiRjxWi\n" - "tkvfTRJTWD+0dEKc0Mkx5H5ePdjSg/Wa+5I757LKoobkPHWdHYecRNkbhfVQoXZp\n" - "j5rH0BqrUV0usX9Q8tKzx4pRHCaRnr3FLPyXKhJ1HeOebroH3bnhy6oTr1Ld1iVO\n" - "pk4UNvJEwHu5zdez498ikcrqxeNm4oFSjQbnQwIDAQABAoIBAG0MkjlF3/H1V3Dt\n" - "6jfgz+XoH/H9E+gng6VRpfeDz5LqcnW3P6hLeHGouKCM2dAGseWsOKWlh9vpExyJ\n" - "rWPCVw5Vq2g77OMPe6Cz07mRtZ9tn9QqnZFvtiUWhae/sD23s0vKlnpX3k550+/W\n" - "Cd4T64ogmrwP7+7VB8m/xhGJCe1My2j3bziloNo/3hmmZQPjgSAVn8sDLCmRGt84\n" - "TYO/f4yY9ftGsWZEa0GhtixBGs9YviyuHz1ANyTGJg6VJ/GIwaK/sefD22MGKWTN\n" - "AMuVdPThDwGftcL6Apd44yiiIbm5ufD7w2ZS2l9/dG+0RXV+iSTXQZlNn6MHo3zw\n" - "ebc+m6ECgYEAzqImO6hyKAEnWFgXc/NgLk4xxdumEbn3FqVqeTKlJTuUGqiAVzfD\n" - "+UDswIvRKwxGnJXlNTMwkT6w9LeFC5W7xvnr4a2YFj68oDNY4Gi6CFcv3kG7/6MR\n" - "u9bLtxDh3Q4JHwhLO5cWejIdZ4P+9aG7GzUTqofPMmXEEaiiam7NonMCgYEAyRuc\n" - "J1TOm3B/zy29rYgY8BLgdEsdQN07v1JA1xcNG6A24mxBbPrkOuDrZ0kLtnFewVFG\n" - "4fLEO5MQBhsRKHK0pw3VE5azO8jHyFzccEjuObUeuYXSLxZFmK8jdcIkRirh6O7D\n" - "qCm/cEePnkxIFEIjMrctWXxa/jYEZYheRCXH4/ECgYB0PfvMK+KsZpm/tS7cZ9l/\n" - "szWE3R/7cOZzsvLG45rL60xSAuDQL+rrWX7WgtFUqj8+74RV/UohK2dZA7Sw47cT\n" - "JJ1yA7o/KWPrq3cgJ0ogTwv6uHgOQ6pCRX+sqK6nMLIo5v2LtF9Mtsyb40GW5Tjh\n" - "AWbi1CvXajB2zqsvvM2pyQKBgG6dSBt+ExH+I96BqzWaiRTrXRe6BQIbbXSDOnTU\n" - "Efqi+e06XBYkPYqBEhnCXLXhz5uHJ/S5geO+tO6Wzq4vwVutSQi4OCdm/TQgl4MP\n" - "KjEFhTvH9l694lPj6R4pRahuh8mGIooJRGnugnkwPekeo5uOk1wIAUiXz31FL4xO\n" - "N48RAoGAK7+20dPiStPo8dnFrYjQ2j5xuMO2/0+BaLFhDWTiHHjALCWOHkXJ1JtN\n" - "9LM2cIlCC79p4+7KQwUXvMBcnAx6qwTMHisGg3WfxlD8f7MuDDR+or1fd/c0byti\n" - "z1r/I9Ya6/bAXQOjruxpHkECZl5DEdVvT0E2qL/pQx0rfqnkdfE=\n" - "-----END RSA PRIVATE KEY-----\n") - - -class GCEScenarioContext(object): - zone = None - region = None - machine_type = None - image_name = None - image = None - boot_disk_name = None - boot_disk = None - empty_disk_name = None - empty_disk = None - network_name = None - network_cidr = None - gateway_ip = None - network = None - firewall_name = None - instance_name = None - instance = None - address_name = None - address = None - access_config_name = None - - -class TestGCEScenario(base_gce.GCESmokeTestCase): - @classmethod - def setUpClass(cls): - super(TestGCEScenario, cls).setUpClass() - cls.ctx = GCEScenarioContext() - - @classmethod - def tearDownClass(cls): - super(TestGCEScenario, cls).tearDownClass() - - def setUp(self): - super(TestGCEScenario, self).setUp() - - @base_gce.GCESmokeTestCase.incremental - def test_000_get_zone(self): - (status, body) = self.gce.get("/zones") - self.assertEqual(200, status, message=body.get("message")) - self.ctx.zone = next(itertools.ifilter(lambda x: x["status"] == "UP", - body["items"]), - None) - self.assertIsNotNone(self.ctx.zone) - self.gce.set_zone(self.ctx.zone["name"]) - - @base_gce.GCESmokeTestCase.incremental - def test_001_get_region(self): - (status, body) = self.gce.get("/regions") - self.assertEqual(200, status, message=body.get("message")) - self.ctx.region = next(itertools.ifilter(lambda x: x["status"] == "UP", - body["items"]), - None) - self.assertIsNotNone(self.ctx.region) - self.gce.set_region(self.ctx.region["name"]) - - @base_gce.GCESmokeTestCase.incremental - def test_002_get_machine_type(self): - (status, body) = self.gce.zone_get("/machineTypes") - self.assertEqual(200, status, message=body.get("message")) - machine_types = body.get("items") - self.assertTrue(machine_types) - if CONF.gceapi.machine_type: - self.ctx.machine_type = next( - t for t in machine_types - if t["name"] == CONF.gceapi.machine_type) - else: - self.ctx.machine_type = min(machine_types, - key=lambda x: x["memoryMb"]) - - @base_gce.GCESmokeTestCase.incremental - def test_003_get_project(self): - (status, body) = self.gce.get("") - self.assertEqual(200, status, message=body.get("message")) - - @base_gce.GCESmokeTestCase.incremental - @testtools.skipIf( - not CONF.gceapi["existing_image"], - "Skipped by config settings") - def test_010_get_image(self): - (status, body) = self.gce.get("/global/images", - CONF.gceapi.existing_image) - self.assertEqual(200, status, message=body.get("message")) - self.assertIsNotNone(body) - self.ctx.image = body - - @base_gce.GCESmokeTestCase.incremental - @testtools.skipIf( - CONF.gceapi["existing_image"], - "Skipped by config settings") - def test_011_create_image(self): - self.ctx.image_name = data_utils.rand_name("image-") - (status, body) = self.gce.post( - "/global/images", - body={ - "name": self.ctx.image_name, - "rawDisk": { - "source": CONF.gceapi.http_raw_image, - }, - "sourceType": "RAW", - }) - self.assertEqual(200, status, message=body.get("message")) - self.add_resource_cleanup(body["targetLink"]) - self.verify_resource_uri(body["targetLink"], "/global/images", - self.ctx.image_name) - self.wait_for_operation(body, "insert", "READY") - - @base_gce.GCESmokeTestCase.incremental - @testtools.skipIf( - CONF.gceapi["existing_image"], - "Skipped by config settings") - def test_012_check_image_info(self): - (status, body) = self.gce.get("/global/images", self.ctx.image_name) - self.assertEqual(200, status, message=body.get("message")) - self.assertEqual(self.ctx.image_name, body["name"]) - self.assertEqual("READY", body["status"]) - self.assertEqual("RAW", body["sourceType"]) - self.verify_resource_uri(body["selfLink"], "/global/images", - self.ctx.image_name) - self.ctx.image = body - - @base_gce.GCESmokeTestCase.incremental - @testtools.skipIf( - CONF.gceapi["skip_bootable_volume"], - "Skipped by config settings") - def test_020_create_bootable_disk(self): - self.ctx.boot_disk_name = data_utils.rand_name("boot-disk-") - (status, body) = self.gce.zone_post( - "/disks", - params={ - "sourceImage": self.ctx.image["selfLink"], - }, - body={ - "name": self.ctx.boot_disk_name, - }) - self.assertEqual(200, status, message=body.get("message")) - self.add_resource_cleanup(body["targetLink"]) - self.verify_zone_resource_uri(body["targetLink"], "/disks", - self.ctx.boot_disk_name) - self.wait_for_operation(body, "insert", "READY") - - @base_gce.GCESmokeTestCase.incremental - @testtools.skipIf( - CONF.gceapi["skip_bootable_volume"], - "Skipped by config settings") - def test_021_check_bootable_disk_info(self): - (status, body) = self.gce.zone_get("/disks/", self.ctx.boot_disk_name) - self.assertEqual(200, status, message=body.get("message")) - self.assertEqual(self.ctx.boot_disk_name, body["name"]) - self.assertIsNone(body["description"]) - self.assertEqual("READY", body["status"]) - self.verify_zone_resource_uri(body["selfLink"], "/disks", - self.ctx.boot_disk_name) - self.ctx.boot_disk = body - - @base_gce.GCESmokeTestCase.incremental - @testtools.skipIf( - CONF.gceapi["skip_empty_volume"], - "Skipped by config settings") - def test_022_create_empty_disk(self): - self.ctx.empty_disk_name = data_utils.rand_name("empty-disk-") - (status, body) = self.gce.zone_post( - "/disks", - body={ - "name": self.ctx.empty_disk_name, - "sizeGb": 1, - "description": "test empty volume", - }) - self.assertEqual(200, status, message=body.get("message")) - self.add_resource_cleanup(body["targetLink"]) - self.verify_zone_resource_uri(body["targetLink"], "/disks", - self.ctx.empty_disk_name) - self.wait_for_operation(body, "insert", "READY") - - @base_gce.GCESmokeTestCase.incremental - @testtools.skipIf( - CONF.gceapi["skip_empty_volume"], - "Skipped by config settings") - def test_023_check_empty_disk_info(self): - (status, body) = self.gce.zone_get("/disks/", self.ctx.empty_disk_name) - self.assertEqual(200, status, message=body.get("message")) - self.assertEqual(self.ctx.empty_disk_name, body["name"]) - self.assertEqual("test empty volume", body["description"]) - self.assertEqual("READY", body["status"]) - self.assertEqual(1, body["sizeGb"]) - self.verify_zone_resource_uri(body["selfLink"], "/disks", - self.ctx.empty_disk_name) - self.ctx.empty_disk = body - - @base_gce.GCESmokeTestCase.incremental - def test_030_create_network(self): - self.ctx.network_name = data_utils.rand_name("network-") - cfg = CONF.network - network_cidr = netaddr.IPNetwork(cfg.tenant_network_cidr) - subnet_cidr = next(s for s in network_cidr. - subnet(cfg.tenant_network_mask_bits)) - gateway_ip = netaddr.IPAddress(subnet_cidr.last - 1) - self.ctx.network_cidr = str(subnet_cidr) - self.ctx.gateway_ip = str(gateway_ip) - (status, body) = self.gce.post( - "/global/networks", - body={ - "name": self.ctx.network_name, - "IPv4Range": self.ctx.network_cidr, - "gatewayIPv4": self.ctx.gateway_ip, - }) - self.assertEqual(200, status, message=body.get("message")) - self.add_resource_cleanup(body["targetLink"]) - self.verify_resource_uri(body["targetLink"], "/global/networks", - self.ctx.network_name) - self.wait_for_operation(body, "insert", None) - - @base_gce.GCESmokeTestCase.incremental - def test_031_check_network_info(self): - (status, body) = self.gce.get("/global/networks", - self.ctx.network_name) - self.assertEqual(200, status, message=body.get("message")) - self.assertEqual(self.ctx.network_name, body["name"]) - self.assertEqual(self.ctx.network_cidr, body["IPv4Range"]) - self.assertEqual(self.ctx.gateway_ip, body["gatewayIPv4"]) - self.verify_resource_uri(body["selfLink"], "/global/networks", - self.ctx.network_name) - self.ctx.network = body - - @base_gce.GCESmokeTestCase.incremental - def test_040_create_firewall(self): - self.ctx.firewall_name = data_utils.rand_name("firewall-") - (status, body) = self.gce.post( - "/global/firewalls", - body={ - "name": self.ctx.firewall_name, - "description": "test firewall", - "network": self.ctx.network["selfLink"], - "sourceRanges": ["0.0.0.0/0"], - "allowed": [ - {"IPProtocol": "icmp"}, - { - "IPProtocol": "tcp", - "ports": ["22", "80", "8080-8089"], - }, - ], - }) - self.assertEqual(200, status, message=body.get("message")) - self.add_resource_cleanup(body["targetLink"]) - self.verify_resource_uri(body["targetLink"], "/global/firewalls", - self.ctx.firewall_name) - self.wait_for_operation(body, "insert", None) - - @base_gce.GCESmokeTestCase.incremental - def test_041_check_firewall_info(self): - (status, body) = self.gce.get("/global/firewalls", - self.ctx.firewall_name) - self.assertEqual(200, status, message=body.get("message")) - self.assertEqual(self.ctx.firewall_name, body["name"]) - self.assertTrue(body["description"].startswith("test firewall")) - self.assertEqual(body["network"], self.ctx.network["selfLink"]) - self.assertEqual(body["sourceRanges"], ["0.0.0.0/0"]) - self.assertEqual(2, len(body["allowed"])) - self.assertIn({"IPProtocol": "icmp"}, body["allowed"]) - tcp_range = next((x for x in body["allowed"] - if x["IPProtocol"] == "tcp"), None) - self.assertIsNotNone(tcp_range) - self.assertItemsEqual(["22", "80", "8080-8089"], tcp_range["ports"]) - self.verify_resource_uri(body["selfLink"], "/global/firewalls", - self.ctx.firewall_name) - - @base_gce.GCESmokeTestCase.incremental - def test_050_run_instance(self): - self.ctx.instance_name = data_utils.rand_name("instance-") - disks = [] - if not CONF.gceapi.skip_bootable_volume: - disks.append({ - "kind": self.ctx.boot_disk["kind"], - "type": "PERSISTENT", - "mode": "READ_WRITE", - "source": self.ctx.boot_disk["selfLink"], - "deviceName": "vda", - "boot": True, - }) - if not CONF.gceapi.skip_empty_volume: - disks.append({ - "kind": self.ctx.empty_disk["kind"], - "type": "PERSISTENT", - "mode": "READ_WRITE", - "source": self.ctx.empty_disk["selfLink"], - "deviceName": "vdb", - "boot": False, - }) - body = { - "name": self.ctx.instance_name, - "description": "test instance", - "machineType": self.ctx.machine_type["selfLink"], - "disks": disks, - "metadata": { - "items": [ - { - "key": "sshKeys", - "value": ":".join([data_utils.rand_name("keypair-"), - PUBLIC_KEY]) - }, - ], - }, - "networkInterfaces": [{ - "network": self.ctx.network["selfLink"], - }], - "tags": { - "items": [], - }, - } - if CONF.gceapi.skip_bootable_volume: - body["image"] = self.ctx.image["selfLink"] - (status, body) = self.gce.zone_post("/instances", body=body) - self.assertEqual(200, status, message=body.get("message")) - self.add_resource_cleanup(body["targetLink"]) - self.verify_zone_resource_uri(body["targetLink"], "/instances", - self.ctx.instance_name) - self.wait_for_operation(body, "insert", "RUNNING") - - @base_gce.GCESmokeTestCase.incremental - def test_051_check_instance_info(self): - (status, body) = self.gce.zone_get("/instances/", - self.ctx.instance_name) - self.assertEqual(200, status, message=body.get("message")) - self.assertEqual(self.ctx.instance_name, body["name"]) - self.assertEqual("test instance", body["description"]) - self.assertEqual("RUNNING", body["status"]) - self.assertEqual("ACTIVE", body["statusMessage"]) - self.assertEqual(self.ctx.machine_type["selfLink"], - body["machineType"]) - nwifs = body.get("networkInterfaces") - self.assertEqual(1, len(nwifs)) - nwif = nwifs[0] - try: - self.assertEqual(self.ctx.network_name, nwif["name"]) - except Exception: - LOG.exception("Is nova network here?") - # NOTE(apavlov): change network name for future usage in this case - self.ctx.network_name = nwif["name"] - - cidrs = netaddr.all_matching_cidrs(nwif["networkIP"], - [self.ctx.network_cidr]) - try: - self.assertEqual(1, len(cidrs)) - except Exception: - LOG.exception("Is nova network here?") - - try: - self.assertEqual(self.ctx.network_cidr, str(cidrs[0])) - except Exception: - LOG.exception("Is nova network here?") - - self.verify_zone_resource_uri(body["selfLink"], "/instances", - self.ctx.instance_name) - self.ctx.instance = body - - @testtools.skipIf( - not CONF.gceapi["use_floatingip"], - "Skipped by config settings") - @base_gce.GCESmokeTestCase.incremental - def test_060_reserve_address(self): - self.ctx.address_name = data_utils.rand_name("address-") - (status, body) = self.gce.region_post( - "/addresses", - body={ - "name": self.ctx.address_name, - "description": "test address", - }) - self.assertEqual(200, status, message=body.get("message")) - self.add_resource_cleanup(body["targetLink"]) - self.verify_region_resource_uri(body["targetLink"], "/addresses", - self.ctx.address_name) - self.wait_for_operation(body, "insert", "RESERVED") - - @testtools.skipIf( - not CONF.gceapi["use_floatingip"], - "Skipped by config settings") - @base_gce.GCESmokeTestCase.incremental - def test_061_check_address_info(self): - (status, body) = self.gce.region_get("/addresses/", - self.ctx.address_name) - self.assertEqual(200, status, message=body.get("message")) - self.assertEqual(self.ctx.address_name, body["name"]) - self.assertEqual("test address", body["description"]) - self.assertEqual("RESERVED", body["status"]) - self.assertIsNotNone(body["address"]) - - self.verify_region_resource_uri(body["selfLink"], "/addresses", - self.ctx.address_name) - self.ctx.address = body - - @testtools.skipIf( - not CONF.gceapi["use_floatingip"], - "Skipped by config settings") - @base_gce.GCESmokeTestCase.incremental - def test_062_associate_floating_ip(self): - self.ctx.access_config_name = data_utils.rand_name("accessConfig-") - (status, body) = self.gce.zone_post( - "/instances", - self.ctx.instance_name, - "addAccessConfig", - params={ - "networkInterface": self.ctx.network_name, - }, - body={ - "type": "ONE_TO_ONE_NAT", - "name": self.ctx.access_config_name, - "natIP": self.ctx.address["address"] - }) - self.assertEqual(200, status, message=body.get("message")) - self.wait_for_operation(body, "addAccessConfig", "RUNNING") - - @testtools.skipIf( - not CONF.gceapi["use_floatingip"], - "Skipped by config settings") - @base_gce.GCESmokeTestCase.incremental - def test_063_get_floating_ip_info(self): - (status, body) = self.gce.zone_get("/instances/", - self.ctx.instance_name) - self.assertEqual(200, status, message=body.get("message")) - nwifs = body.get("networkInterfaces") - self.assertEqual(1, len(nwifs)) - nwif = nwifs[0] - fp_infs = nwif.get("accessConfigs") - self.assertEqual(1, len(fp_infs)) - fp_inf = fp_infs[0] - self.assertEqual("ONE_TO_ONE_NAT", fp_inf["type"]) - self.assertEqual(self.ctx.access_config_name, fp_inf["name"]) - self.assertEqual(self.ctx.address["address"], fp_inf["natIP"]) - self.ctx.instance = body - - @base_gce.GCESmokeTestCase.incremental - def test_064_ping_instance(self): - for network_interface in self.ctx.instance["networkInterfaces"]: - ip = self._get_ip_address(network_interface) - self._ping_ip_address(ip) - - @base_gce.GCESmokeTestCase.incremental - def test_065_ssh_instance(self): - for network_interface in self.ctx.instance["networkInterfaces"]: - ip = self._get_ip_address(network_interface) - self._check_ssh_connectivity(ip, - CONF.gceapi.image_username, - PRIVATE_KEY) - - @base_gce.GCESmokeTestCase.incremental - def test_100_reset_instance(self): - (status, body) = self.gce.zone_post("/instances", - self.ctx.instance_name, - "reset") - self.assertEqual(200, status, message=body.get("message")) - self.wait_for_operation(body, "reset", "RUNNING") - - @base_gce.GCESmokeTestCase.incremental - def test_101_ping_reseted_instance(self): - for network_interface in self.ctx.instance["networkInterfaces"]: - ip = self._get_ip_address(network_interface) - self._ping_ip_address(ip) - - @base_gce.GCESmokeTestCase.incremental - def test_900_stop_instance(self): - instance_link = self.ctx.instance["selfLink"] - self.cancel_resource_cleanup(instance_link) - (status, body) = self.gce.delete(instance_link) - self.assertEqual(200, status, message=body.get("message")) - self.wait_for_operation(body, "delete", None) - self.ctx.instance = None - - @testtools.skipIf( - not CONF.gceapi["use_floatingip"], - "Skipped by config settings") - @base_gce.GCESmokeTestCase.incremental - def test_901_release_address(self): - address_link = self.ctx.address["selfLink"] - self.cancel_resource_cleanup(address_link) - (status, body) = self.gce.delete(address_link) - self.assertEqual(200, status, message=body.get("message")) - self.wait_for_operation(body, "delete", None) - self.ctx.address = None - - @base_gce.GCESmokeTestCase.incremental - def test_910_delete_network(self): - network_link = self.ctx.network["selfLink"] - self.cancel_resource_cleanup(network_link) - (status, body) = self.gce.delete(network_link) - self.assertEqual(200, status, message=body.get("message")) - self.wait_for_operation(body, "delete", None) - self.ctx.network = None - - @base_gce.GCESmokeTestCase.incremental - @testtools.skipIf( - CONF.gceapi["skip_empty_volume"], - "Skipped by config settings") - def test_920_delete_empty_disk(self): - disk_link = self.ctx.empty_disk["selfLink"] - self.cancel_resource_cleanup(disk_link) - (status, body) = self.gce.delete(disk_link) - self.assertEqual(200, status, message=body.get("message")) - self.wait_for_operation(body, "delete", None) - self.ctx.empty_disk = None - - @base_gce.GCESmokeTestCase.incremental - @testtools.skipIf( - CONF.gceapi["skip_bootable_volume"], - "Skipped by config settings") - def test_921_delete_bootable_disk(self): - disk_link = self.ctx.boot_disk["selfLink"] - self.cancel_resource_cleanup(disk_link) - (status, body) = self.gce.delete(disk_link) - self.assertEqual(200, status, message=body.get("message")) - self.wait_for_operation(body, "delete", None) - self.ctx.boot_disk = None - - @base_gce.GCESmokeTestCase.incremental - @testtools.skipIf( - CONF.gceapi["existing_image"], - "Skipped by config settings") - def test_922_delete_image(self): - image_link = self.ctx.image["selfLink"] - self.cancel_resource_cleanup(image_link) - (status, body) = self.gce.delete(image_link) - self.assertEqual(200, status, message=body.get("message")) - self.wait_for_operation(body, "delete", None) - self.ctx.image = None