Merge "Remove usage of six"

This commit is contained in:
Zuul 2022-06-15 10:43:44 +00:00 committed by Gerrit Code Review
commit e4fbbaae08
4 changed files with 25 additions and 29 deletions

View File

@ -15,7 +15,6 @@
from oslo_concurrency import lockutils from oslo_concurrency import lockutils
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
import six
from neutron.agent import securitygroups_rpc from neutron.agent import securitygroups_rpc
from neutron import manager from neutron import manager
@ -426,7 +425,7 @@ class PortFirewallGroupMap(object):
# information. Need to consider map initialization in __init__() # information. Need to consider map initialization in __init__()
def port_id(self, port): def port_id(self, port):
return (port if isinstance(port, six.string_types) return (port if isinstance(port, str)
else port.get('port_id', port.get('id'))) else port.get('port_id', port.get('id')))
def get_fwg(self, fwg_id): def get_fwg(self, fwg_id):

View File

@ -14,7 +14,6 @@
# under the License. # under the License.
import mock import mock
import six
import testtools import testtools
import webob.exc import webob.exc
@ -54,7 +53,7 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase):
with self.firewall_policy(name=name, shared=self.SHARED, with self.firewall_policy(name=name, shared=self.SHARED,
firewall_rules=None, audited=self.AUDITED firewall_rules=None, audited=self.AUDITED
) as firewall_policy: ) as firewall_policy:
for k, v in six.iteritems(attrs): for k, v in attrs.items():
self.assertEqual(v, firewall_policy['firewall_policy'][k]) self.assertEqual(v, firewall_policy['firewall_policy'][k])
def test_create_firewall_policy_with_rules(self): def test_create_firewall_policy_with_rules(self):
@ -70,7 +69,7 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase):
with self.firewall_policy(name=name, shared=self.SHARED, with self.firewall_policy(name=name, shared=self.SHARED,
firewall_rules=fw_rule_ids, firewall_rules=fw_rule_ids,
audited=self.AUDITED) as fwp: audited=self.AUDITED) as fwp:
for k, v in six.iteritems(attrs): for k, v in attrs.items():
self.assertEqual(v, fwp['firewall_policy'][k]) self.assertEqual(v, fwp['firewall_policy'][k])
def test_create_firewall_policy_with_previously_associated_rule(self): def test_create_firewall_policy_with_previously_associated_rule(self):
@ -92,7 +91,7 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase):
audited=self.AUDITED) as fwp: audited=self.AUDITED) as fwp:
res = self._show_req('firewall_policies', res = self._show_req('firewall_policies',
fwp['firewall_policy']['id']) fwp['firewall_policy']['id'])
for k, v in six.iteritems(attrs): for k, v in attrs.items():
self.assertEqual(v, res['firewall_policy'][k]) self.assertEqual(v, res['firewall_policy'][k])
def test_list_firewall_policies(self): def test_list_firewall_policies(self):
@ -114,7 +113,7 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase):
req = self.new_update_request('firewall_policies', data, req = self.new_update_request('firewall_policies', data,
fwp['firewall_policy']['id']) fwp['firewall_policy']['id'])
res = self.deserialize(self.fmt, req.get_response(self.ext_api)) res = self.deserialize(self.fmt, req.get_response(self.ext_api))
for k, v in six.iteritems(attrs): for k, v in attrs.items():
self.assertEqual(v, res['firewall_policy'][k]) self.assertEqual(v, res['firewall_policy'][k])
def _test_update_firewall_policy(self, with_audited): def _test_update_firewall_policy(self, with_audited):
@ -131,7 +130,7 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase):
res = self.deserialize(self.fmt, res = self.deserialize(self.fmt,
req.get_response(self.ext_api)) req.get_response(self.ext_api))
attrs['description'] = 'fw_p1' attrs['description'] = 'fw_p1'
for k, v in six.iteritems(attrs): for k, v in attrs.items():
self.assertEqual(v, res['firewall_policy'][k]) self.assertEqual(v, res['firewall_policy'][k])
def test_update_firewall_policy_set_audited_false(self): def test_update_firewall_policy_set_audited_false(self):
@ -161,7 +160,7 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase):
# TODO(sridar): set it so that the ordering is maintained # TODO(sridar): set it so that the ordering is maintained
res['firewall_policy']['firewall_rules'] = sorted( res['firewall_policy']['firewall_rules'] = sorted(
res['firewall_policy']['firewall_rules']) res['firewall_policy']['firewall_rules'])
for k, v in six.iteritems(attrs): for k, v in attrs.items():
self.assertEqual(v, res['firewall_policy'][k]) self.assertEqual(v, res['firewall_policy'][k])
def test_update_firewall_policy_replace_rules(self): def test_update_firewall_policy_replace_rules(self):
@ -191,7 +190,7 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase):
res = self.deserialize(self.fmt, res = self.deserialize(self.fmt,
req.get_response(self.ext_api)) req.get_response(self.ext_api))
attrs['audited'] = False attrs['audited'] = False
for k, v in six.iteritems(attrs): for k, v in attrs.items():
self.assertEqual(v, res['firewall_policy'][k]) self.assertEqual(v, res['firewall_policy'][k])
@testtools.skip('bug/1614673') @testtools.skip('bug/1614673')
@ -256,7 +255,7 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase):
# check if none of the rules got added to the policy # check if none of the rules got added to the policy
res = self._show_req('firewall_policies', res = self._show_req('firewall_policies',
fwp['firewall_policy']['id']) fwp['firewall_policy']['id'])
for k, v in six.iteritems(attrs): for k, v in attrs.items():
self.assertEqual(v, res['firewall_policy'][k]) self.assertEqual(v, res['firewall_policy'][k])
def test_update_shared_firewall_policy_with_nonshared_rule(self): def test_update_shared_firewall_policy_with_nonshared_rule(self):
@ -434,28 +433,28 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase):
attrs = self._get_test_firewall_rule_attrs() attrs = self._get_test_firewall_rule_attrs()
with self.firewall_rule() as firewall_rule: with self.firewall_rule() as firewall_rule:
for k, v in six.iteritems(attrs): for k, v in attrs.items():
self.assertEqual(v, firewall_rule['firewall_rule'][k]) self.assertEqual(v, firewall_rule['firewall_rule'][k])
attrs['source_port'] = None attrs['source_port'] = None
attrs['destination_port'] = None attrs['destination_port'] = None
with self.firewall_rule(source_port=None, with self.firewall_rule(source_port=None,
destination_port=None) as firewall_rule: destination_port=None) as firewall_rule:
for k, v in six.iteritems(attrs): for k, v in attrs.items():
self.assertEqual(v, firewall_rule['firewall_rule'][k]) self.assertEqual(v, firewall_rule['firewall_rule'][k])
attrs['source_port'] = '10000' attrs['source_port'] = '10000'
attrs['destination_port'] = '80' attrs['destination_port'] = '80'
with self.firewall_rule(source_port=10000, with self.firewall_rule(source_port=10000,
destination_port=80) as firewall_rule: destination_port=80) as firewall_rule:
for k, v in six.iteritems(attrs): for k, v in attrs.items():
self.assertEqual(v, firewall_rule['firewall_rule'][k]) self.assertEqual(v, firewall_rule['firewall_rule'][k])
attrs['source_port'] = '10000' attrs['source_port'] = '10000'
attrs['destination_port'] = '80' attrs['destination_port'] = '80'
with self.firewall_rule(source_port='10000', with self.firewall_rule(source_port='10000',
destination_port='80') as firewall_rule: destination_port='80') as firewall_rule:
for k, v in six.iteritems(attrs): for k, v in attrs.items():
self.assertEqual(v, firewall_rule['firewall_rule'][k]) self.assertEqual(v, firewall_rule['firewall_rule'][k])
def test_create_firewall_src_port_illegal_range(self): def test_create_firewall_src_port_illegal_range(self):
@ -485,7 +484,7 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase):
with self.firewall_rule(source_port=None, with self.firewall_rule(source_port=None,
destination_port=None, destination_port=None,
protocol='icmp') as firewall_rule: protocol='icmp') as firewall_rule:
for k, v in six.iteritems(attrs): for k, v in attrs.items():
self.assertEqual(v, firewall_rule['firewall_rule'][k]) self.assertEqual(v, firewall_rule['firewall_rule'][k])
def test_create_firewall_without_source(self): def test_create_firewall_without_source(self):
@ -519,7 +518,7 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase):
with self.firewall_rule() as fw_rule: with self.firewall_rule() as fw_rule:
res = self._show_req('firewall_rules', res = self._show_req('firewall_rules',
fw_rule['firewall_rule']['id']) fw_rule['firewall_rule']['id'])
for k, v in six.iteritems(attrs): for k, v in attrs.items():
self.assertEqual(v, res['firewall_rule'][k]) self.assertEqual(v, res['firewall_rule'][k])
@testtools.skip('bug/1614673') @testtools.skip('bug/1614673')
@ -537,7 +536,7 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase):
req.get_response(self.ext_api) req.get_response(self.ext_api)
res = self._show_req('firewall_rules', res = self._show_req('firewall_rules',
fw_rule['firewall_rule']['id']) fw_rule['firewall_rule']['id'])
for k, v in six.iteritems(attrs): for k, v in attrs.items():
self.assertEqual(v, res['firewall_rule'][k]) self.assertEqual(v, res['firewall_rule'][k])
def test_create_firewall_rule_with_ipv6_addrs_and_wrong_ip_version(self): def test_create_firewall_rule_with_ipv6_addrs_and_wrong_ip_version(self):
@ -586,7 +585,7 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase):
fwr['firewall_rule']['id']) fwr['firewall_rule']['id'])
res = self.deserialize(self.fmt, res = self.deserialize(self.fmt,
req.get_response(self.ext_api)) req.get_response(self.ext_api))
for k, v in six.iteritems(attrs): for k, v in attrs.items():
self.assertEqual(v, res['firewall_rule'][k]) self.assertEqual(v, res['firewall_rule'][k])
attrs['source_port'] = '10000' attrs['source_port'] = '10000'
@ -600,7 +599,7 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase):
fwr['firewall_rule']['id']) fwr['firewall_rule']['id'])
res = self.deserialize(self.fmt, res = self.deserialize(self.fmt,
req.get_response(self.ext_api)) req.get_response(self.ext_api))
for k, v in six.iteritems(attrs): for k, v in attrs.items():
self.assertEqual(v, res['firewall_rule'][k]) self.assertEqual(v, res['firewall_rule'][k])
attrs['source_port'] = '10000' attrs['source_port'] = '10000'
@ -614,7 +613,7 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase):
fwr['firewall_rule']['id']) fwr['firewall_rule']['id'])
res = self.deserialize(self.fmt, res = self.deserialize(self.fmt,
req.get_response(self.ext_api)) req.get_response(self.ext_api))
for k, v in six.iteritems(attrs): for k, v in attrs.items():
self.assertEqual(v, res['firewall_rule'][k]) self.assertEqual(v, res['firewall_rule'][k])
attrs['source_port'] = None attrs['source_port'] = None
@ -627,7 +626,7 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase):
fwr['firewall_rule']['id']) fwr['firewall_rule']['id'])
res = self.deserialize(self.fmt, res = self.deserialize(self.fmt,
req.get_response(self.ext_api)) req.get_response(self.ext_api))
for k, v in six.iteritems(attrs): for k, v in attrs.items():
self.assertEqual(v, res['firewall_rule'][k]) self.assertEqual(v, res['firewall_rule'][k])
def test_update_firewall_rule_with_port_and_no_proto(self): def test_update_firewall_rule_with_port_and_no_proto(self):
@ -729,7 +728,7 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase):
fwr['firewall_rule']['id']) fwr['firewall_rule']['id'])
res = self.deserialize(self.fmt, res = self.deserialize(self.fmt,
req.get_response(self.ext_api)) req.get_response(self.ext_api))
for k, v in six.iteritems(attrs): for k, v in attrs.items():
self.assertEqual(v, res['firewall_rule'][k]) self.assertEqual(v, res['firewall_rule'][k])
res = self._show_req('firewall_policies', res = self._show_req('firewall_policies',
fwp['firewall_policy']['id']) fwp['firewall_policy']['id'])
@ -1038,7 +1037,7 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase):
admin_state_up=self.ADMIN_STATE_UP) as firewall_group: admin_state_up=self.ADMIN_STATE_UP) as firewall_group:
res = self._show_req('firewall_groups', res = self._show_req('firewall_groups',
firewall_group['firewall_group']['id']) firewall_group['firewall_group']['id'])
for k, v in six.iteritems(attrs): for k, v in attrs.items():
self.assertEqual(v, res['firewall_group'][k]) self.assertEqual(v, res['firewall_group'][k])
def test_show_firewall_group(self): def test_show_firewall_group(self):
@ -1089,7 +1088,7 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase):
firewall['firewall_group']['id']) firewall['firewall_group']['id'])
res = self.deserialize(self.fmt, res = self.deserialize(self.fmt,
req.get_response(self.ext_api)) req.get_response(self.ext_api))
for k, v in six.iteritems(attrs): for k, v in attrs.items():
self.assertEqual(v, res['firewall_group'][k]) self.assertEqual(v, res['firewall_group'][k])
def test_existing_default_create_default_firewall_group(self): def test_existing_default_create_default_firewall_group(self):

View File

@ -14,7 +14,6 @@
# under the License. # under the License.
import mock import mock
import six
from neutron import extensions as neutron_extensions from neutron import extensions as neutron_extensions
from neutron.tests.unit.extensions import test_l3 from neutron.tests.unit.extensions import test_l3
@ -526,7 +525,7 @@ class TestAgentDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase,
fwr['firewall_rule']['id']) fwr['firewall_rule']['id'])
res = self.deserialize(self.fmt, res = self.deserialize(self.fmt,
req.get_response(self.ext_api)) req.get_response(self.ext_api))
for k, v in six.iteritems(attrs): for k, v in attrs.items():
self.assertEqual(v, res['firewall_rule'][k]) self.assertEqual(v, res['firewall_rule'][k])
def test_update_firewall_rule_on_pending_create_fwg(self): def test_update_firewall_rule_on_pending_create_fwg(self):

View File

@ -16,7 +16,6 @@
import contextlib import contextlib
import mock import mock
import six
import webob.exc import webob.exc
from neutron.api import extensions as api_ext from neutron.api import extensions as api_ext
@ -424,7 +423,7 @@ class FirewallPluginV2TestCase(test_db_plugin.NeutronDbPluginV2TestCase):
admin_state_up=self.ADMIN_STATE_UP, admin_state_up=self.ADMIN_STATE_UP,
ports=attrs['ports'] if 'ports' in attrs else None, ports=attrs['ports'] if 'ports' in attrs else None,
) as firewall_group: ) as firewall_group:
for k, v in six.iteritems(attrs): for k, v in attrs.items():
self.assertEqual(v, firewall_group['firewall_group'][k]) self.assertEqual(v, firewall_group['firewall_group'][k])