Merge "mypy: Enable checks for openstack.tests.functional"

This commit is contained in:
Zuul 2024-08-02 20:39:18 +00:00 committed by Gerrit Code Review
commit fc0d683a1c
20 changed files with 110 additions and 105 deletions

View File

@ -10,12 +10,14 @@
# License for the specific language governing permissions and limitations
# under the License.
import typing as ty
from openstack.tests.functional import base
class BaseBaremetalTest(base.BaseFunctionalTest):
min_microversion = None
node_id = None
min_microversion: ty.Optional[str] = None
node_id: str
def setUp(self):
super().setUp()

View File

@ -41,6 +41,10 @@ def _disable_keep_alive(conn):
class BaseFunctionalTest(base.TestCase):
user_cloud: connection.Connection
user_cloud_alt: connection.Connection
operator_cloud: connection.Connection
_wait_for_timeout_key = ''
def setUp(self):
@ -49,21 +53,32 @@ class BaseFunctionalTest(base.TestCase):
_disable_keep_alive(self.conn)
self._demo_name = os.environ.get('OPENSTACKSDK_DEMO_CLOUD', 'devstack')
if not self._demo_name:
raise self.failureException(
"OPENSTACKSDK_OPERATOR_CLOUD must be set to a non-empty value"
)
self._demo_name_alt = os.environ.get(
'OPENSTACKSDK_DEMO_CLOUD_ALT',
'devstack-alt',
)
if not self._demo_name_alt:
raise self.failureException(
"OPENSTACKSDK_OPERATOR_CLOUD must be set to a non-empty value"
)
self._op_name = os.environ.get(
'OPENSTACKSDK_OPERATOR_CLOUD',
'devstack-admin',
)
if not self._op_name:
raise self.failureException(
"OPENSTACKSDK_OPERATOR_CLOUD must be set to a non-empty value"
)
self.config = openstack.config.OpenStackConfig()
self._set_user_cloud()
if self._op_name:
self._set_operator_cloud()
else:
self.operator_cloud = None
self.identity_version = self.user_cloud.config.get_api_version(
'identity'
@ -86,16 +101,11 @@ class BaseFunctionalTest(base.TestCase):
self.user_cloud = connection.Connection(config=user_config)
_disable_keep_alive(self.user_cloud)
# This cloud is used by the project_cleanup test, so you can't rely on
# it
if self._demo_name_alt:
user_config_alt = self.config.get_one(
cloud=self._demo_name_alt, **kwargs
)
self.user_cloud_alt = connection.Connection(config=user_config_alt)
_disable_keep_alive(self.user_cloud_alt)
else:
self.user_cloud_alt = None
def _set_operator_cloud(self, **kwargs):
operator_config = self.config.get_one(cloud=self._op_name, **kwargs)

View File

@ -180,7 +180,7 @@ class TestFloatingIP(base.BaseFunctionalTest):
else:
# Find network names for nova-net
data = proxy._json_response(
self.user_cloud._conn.compute.get('/os-tenant-networks')
self.user_cloud.compute.get('/os-tenant-networks')
)
nets = meta.get_and_munchify('networks', data)
self.addDetail(

View File

@ -23,7 +23,6 @@ import tempfile
from testtools import content
from openstack import exceptions
from openstack.tests.functional import base
@ -93,18 +92,9 @@ class TestObject(base.BaseFunctionalTest):
'testk'
],
)
try:
self.assertIsNotNone(
self.user_cloud.get_object(container_name, name)
)
except exceptions.SDKException as e:
self.addDetail(
'failed_response',
content.text_content(str(e.response.headers)),
)
self.addDetail(
'failed_response', content.text_content(e.response.text)
)
self.assertEqual(
name, self.user_cloud.list_objects(container_name)[0]['name']
)
@ -134,7 +124,7 @@ class TestObject(base.BaseFunctionalTest):
(64 * 1024, 5), # 64MB, 5 segments
)
for size, nseg in sizes:
fake_content = ''
fake_content = b''
segment_size = int(round(size / nseg))
with tempfile.NamedTemporaryFile() as fake_file:
fake_content = ''.join(
@ -179,22 +169,14 @@ class TestObject(base.BaseFunctionalTest):
'testk'
],
)
try:
with tempfile.NamedTemporaryFile() as fake_file:
self.user_cloud.get_object(
container_name, name, outfile=fake_file.name
)
downloaded_content = open(fake_file.name, 'rb').read()
self.assertEqual(fake_content, downloaded_content)
except exceptions.SDKException as e:
self.addDetail(
'failed_response',
content.text_content(str(e.response.headers)),
)
self.addDetail(
'failed_response', content.text_content(e.response.text)
)
raise
self.assertEqual(
name, self.user_cloud.list_objects(container_name)[0]['name']
)

View File

@ -16,8 +16,11 @@ test_project_cleanup
Functional tests for project cleanup methods.
"""
import queue
from openstack.network.v2 import network as _network
from openstack import resource
from openstack.tests.functional import base
@ -52,7 +55,7 @@ class TestProjectCleanup(base.BaseFunctionalTest):
def test_cleanup(self):
self._create_network_resources()
status_queue = queue.Queue()
status_queue: queue.Queue[resource.Resource] = queue.Queue()
# First round - check no resources are old enough
self.conn.project_cleanup(
@ -127,7 +130,7 @@ class TestProjectCleanup(base.BaseFunctionalTest):
if not self.user_cloud.has_service('object-store'):
self.skipTest('Object service is requred, but not available')
status_queue = queue.Queue()
status_queue: queue.Queue[resource.Resource] = queue.Queue()
vol = self.conn.block_storage.create_volume(name='vol1', size='1')
self.conn.block_storage.wait_for_status(vol)
@ -211,7 +214,8 @@ class TestProjectCleanup(base.BaseFunctionalTest):
if not self.user_cloud.has_service('object-store'):
self.skipTest('Object service is requred, but not available')
status_queue = queue.Queue()
status_queue: queue.Queue[resource.Resource] = queue.Queue()
self.conn.object_store.create_container('test_cleanup')
for i in range(1, 10):
self.conn.object_store.create_object(
@ -261,15 +265,14 @@ class TestProjectCleanup(base.BaseFunctionalTest):
if not list(self.conn.network.service_providers(service_type="VPN")):
self.skipTest("VPNaaS plugin is requred, but not available")
status_queue = queue.Queue()
status_queue: queue.Queue[resource.Resource] = queue.Queue()
# Find available external networks and use one
external_network = None
for network in self.conn.network.networks():
if network.is_router_external:
external_network = network
external_network: _network.Network = network
break
if not external_network:
else:
self.skipTest("External network is required, but not available")
# Create left network resources

View File

@ -148,10 +148,10 @@ class TestVolume(base.BaseFunctionalTest):
volumes.append(v)
self.addCleanup(self.cleanup, volumes)
result = []
for i in self.user_cloud.list_volumes():
if i['name'] and i['name'].startswith(self.id()):
result.append(i['id'])
self.assertEqual(sorted([i['id'] for i in volumes]), sorted(result))
for v in self.user_cloud.list_volumes():
if v['name'] and v['name'].startswith(self.id()):
result.append(v['id'])
self.assertEqual(sorted([v['id'] for v in volumes]), sorted(result))
def test_update_volume(self):
name, desc = self.getUniqueString('name'), self.getUniqueString('desc')

View File

@ -115,7 +115,7 @@ class TestVolume(base.BaseFunctionalTest):
self.assertEqual(2, len(backups))
backups = self.user_cloud.list_volume_backups(
search_opts={"name": backup_name_1}
filters={"name": backup_name_1}
)
self.assertEqual(1, len(backups))
self.assertEqual(backup_name_1, backups[0]['name'])

View File

@ -68,7 +68,6 @@ class TestServer(ft_base.BaseComputeTest):
def setUp(self):
super().setUp()
self.NAME = self.getUniqueString()
self.server = None
self.network = None
self.subnet = None
self.cidr = '10.99.99.0/16'

View File

@ -12,12 +12,14 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# import unittest
import typing as ty
from openstack.compute.v2 import hypervisor
from openstack import connection
from openstack.tests.functional import base
HYPERVISORS = []
HYPERVISORS: ty.List[hypervisor.Hypervisor] = []
def hypervisors():

View File

@ -17,7 +17,7 @@ from openstack.tests.functional import base
class TestAgent(base.BaseFunctionalTest):
AGENT = None
AGENT: agent.Agent
DESC = "test description"
def validate_uuid(self, s):

View File

@ -10,15 +10,15 @@
# License for the specific language governing permissions and limitations
# under the License.
from openstack.network.v2 import agent
from openstack.network.v2 import network
from openstack.tests.functional import base
class TestAgentNetworks(base.BaseFunctionalTest):
NETWORK_ID = None
AGENT = None
AGENT_ID = None
NETWORK_ID: str
AGENT: agent.Agent
AGENT_ID: str
def setUp(self):
super().setUp()

View File

@ -10,14 +10,14 @@
# License for the specific language governing permissions and limitations
# under the License.
from openstack.network.v2 import agent
from openstack.network.v2 import router
from openstack.tests.functional import base
class TestAgentRouters(base.BaseFunctionalTest):
ROUTER = None
AGENT = None
ROUTER: router.Router
AGENT: agent.Agent
def setUp(self):
super().setUp()

View File

@ -23,13 +23,13 @@ class TestFloatingIP(base.BaseFunctionalTest):
IPV4 = 4
EXT_CIDR = "10.100.0.0/24"
INT_CIDR = "10.101.0.0/24"
EXT_NET_ID = None
INT_NET_ID = None
EXT_SUB_ID = None
INT_SUB_ID = None
ROT_ID = None
PORT_ID = None
FIP = None
EXT_NET_ID: str
INT_NET_ID: str
EXT_SUB_ID: str
INT_SUB_ID: str
ROT_ID: str
PORT_ID: str
FIP: floating_ip.FloatingIP
DNS_DOMAIN = "example.org."
DNS_NAME = "fip1"
@ -43,8 +43,6 @@ class TestFloatingIP(base.BaseFunctionalTest):
self.ROT_NAME = self.getUniqueString()
self.INT_NET_NAME = self.getUniqueString()
self.INT_SUB_NAME = self.getUniqueString()
self.EXT_NET_ID = None
self.EXT_SUB_ID = None
self.is_dns_supported = False
# Find External Network
@ -58,8 +56,9 @@ class TestFloatingIP(base.BaseFunctionalTest):
# credentials available
# WARNING: this external net is not dropped
# Create External Network
args = {"router:external": True}
net = self._create_network(self.EXT_NET_NAME, **args)
net = self._create_network(
self.EXT_NET_NAME, **{"router:external": True}
)
self.EXT_NET_ID = net.id
sub = self._create_subnet(
self.EXT_SUB_NAME, self.EXT_NET_ID, self.EXT_CIDR
@ -74,8 +73,10 @@ class TestFloatingIP(base.BaseFunctionalTest):
)
self.INT_SUB_ID = sub.id
# Create Router
args = {"external_gateway_info": {"network_id": self.EXT_NET_ID}}
sot = self.user_cloud.network.create_router(name=self.ROT_NAME, **args)
sot = self.user_cloud.network.create_router(
name=self.ROT_NAME,
**{"external_gateway_info": {"network_id": self.EXT_NET_ID}}
)
assert isinstance(sot, router.Router)
self.assertEqual(self.ROT_NAME, sot.name)
self.ROT_ID = sot.id

View File

@ -52,8 +52,9 @@ class TestNDPProxy(base.BaseFunctionalTest):
# credentials available
# WARNING: this external net is not dropped
# Create External Network
args = {"router:external": True}
net = self._create_network(self.EXT_NET_NAME, **args)
net = self._create_network(
self.EXT_NET_NAME, **{"router:external": True}
)
self.EXT_NET_ID = net.id
sub = self._create_subnet(
self.EXT_SUB_NAME, self.EXT_NET_ID, self.EXT_CIDR
@ -61,11 +62,13 @@ class TestNDPProxy(base.BaseFunctionalTest):
self.EXT_SUB_ID = sub.id
# Create Router
args = {
sot = self.user_cloud.network.create_router(
name=self.ROT_NAME,
**{
"external_gateway_info": {"network_id": self.EXT_NET_ID},
"enable_ndp_proxy": True,
}
sot = self.user_cloud.network.create_router(name=self.ROT_NAME, **args)
},
)
assert isinstance(sot, router.Router)
self.assertEqual(self.ROT_NAME, sot.name)
self.ROT_ID = sot.id

View File

@ -67,8 +67,9 @@ class TestPortForwarding(base.BaseFunctionalTest):
# credentials available
# WARNING: this external net is not dropped
# Create External Network
args = {"router:external": True}
net = self._create_network(self.EXT_NET_NAME, **args)
net = self._create_network(
self.EXT_NET_NAME, **{"router:external": True}
)
self.EXT_NET_ID = net.id
sub = self._create_subnet(
self.EXT_SUB_NAME, self.EXT_NET_ID, self.EXT_CIDR
@ -83,8 +84,10 @@ class TestPortForwarding(base.BaseFunctionalTest):
)
self.INT_SUB_ID = sub.id
# Create Router
args = {"external_gateway_info": {"network_id": self.EXT_NET_ID}}
sot = self.user_cloud.network.create_router(name=self.ROT_NAME, **args)
sot = self.user_cloud.network.create_router(
name=self.ROT_NAME,
**{"external_gateway_info": {"network_id": self.EXT_NET_ID}}
)
assert isinstance(sot, router.Router)
self.assertEqual(self.ROT_NAME, sot.name)
self.ROT_ID = sot.id

View File

@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import typing as ty
from openstack.network.v2 import qos_policy as _qos_policy
from openstack.tests.functional import base
@ -19,7 +20,7 @@ class TestQoSPolicy(base.BaseFunctionalTest):
QOS_POLICY_ID = None
IS_SHARED = False
IS_DEFAULT = False
RULES = []
RULES: ty.List[str] = []
QOS_POLICY_DESCRIPTION = "QoS policy description"
def setUp(self):

View File

@ -20,10 +20,10 @@ from openstack.tests.functional import base
class TestRouterInterface(base.BaseFunctionalTest):
CIDR = "10.100.0.0/16"
IPV4 = 4
ROUTER_ID = None
NET_ID = None
SUB_ID = None
ROT = None
ROUTER_ID: str
NET_ID: str
SUB_ID: str
ROT: router.Router
def setUp(self):
super().setUp()

View File

@ -10,12 +10,14 @@
# License for the specific language governing permissions and limitations
# under the License.
import typing as ty
from openstack import resource
from openstack.tests.functional import base
class BaseSharedFileSystemTest(base.BaseFunctionalTest):
min_microversion = None
min_microversion: ty.Optional[str] = None
def setUp(self):
super().setUp()

View File

@ -10,6 +10,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import typing as ty
from openstack.shared_file_system.v2 import share as _share
from openstack.tests.functional.shared_file_system import base
@ -80,7 +82,7 @@ class ShareMetadataTest(base.BaseSharedFileSystemTest):
new_meta = {"newFoo": "newBar"}
full_meta = {"foo": "bar", "newFoo": "newBar"}
empty_meta = {}
empty_meta: ty.Dict[str, str] = {}
updated_share = (
self.user_cloud.shared_file_system.update_share_metadata(

View File

@ -40,16 +40,11 @@ incremental = true
check_untyped_defs = true
warn_unused_ignores = false
# keep this in-sync with 'mypy.exclude' in '.pre-commit-config.yaml'
# TODO(stephenfin) Eventually we should remove everything here except the
# unit tests module
exclude = (?x)(
doc
| examples
| releasenotes
)
[mypy-openstack.tests.functional.*]
ignore_errors = true
[mypy-openstack.tests.unit.*]
ignore_errors = true