load neutron objects using neutron-lib
To access neutron objects we're moving to a dynamic plugin approach as shown in the depends on and related patches. This patch removes the imports of neutron.objects and uses neutron-lib to load them instead. The remaining base object import will be addressed later. Depends-On: https://review.openstack.org/#/c/570060/ Change-Id: If1b140d95c039833ed4c144fb5f22328345dcd19
This commit is contained in:
parent
c4420c7a17
commit
2f73bec358
@ -24,7 +24,6 @@ from neutron.db import _resource_extend as resource_extend
|
|||||||
from neutron.db import api as db_api
|
from neutron.db import api as db_api
|
||||||
from neutron.db.models import securitygroup as securitygroups_db
|
from neutron.db.models import securitygroup as securitygroups_db
|
||||||
from neutron.extensions import securitygroup as ext_sg
|
from neutron.extensions import securitygroup as ext_sg
|
||||||
from neutron.objects import securitygroup as sg_obj
|
|
||||||
from neutron_lib.api.definitions import port as port_def
|
from neutron_lib.api.definitions import port as port_def
|
||||||
from neutron_lib.api import validators
|
from neutron_lib.api import validators
|
||||||
from neutron_lib.callbacks import events
|
from neutron_lib.callbacks import events
|
||||||
@ -32,6 +31,7 @@ from neutron_lib.callbacks import registry
|
|||||||
from neutron_lib.callbacks import resources
|
from neutron_lib.callbacks import resources
|
||||||
from neutron_lib import constants as n_constants
|
from neutron_lib import constants as n_constants
|
||||||
from neutron_lib.db import model_base
|
from neutron_lib.db import model_base
|
||||||
|
from neutron_lib.objects import registry as obj_reg
|
||||||
from neutron_lib.utils import helpers
|
from neutron_lib.utils import helpers
|
||||||
from neutron_lib.utils import net as n_utils
|
from neutron_lib.utils import net as n_utils
|
||||||
|
|
||||||
@ -98,8 +98,9 @@ class ExtendedSecurityGroupPropertiesMixin(object):
|
|||||||
self._ensure_default_security_group(context, tenant_id)
|
self._ensure_default_security_group(context, tenant_id)
|
||||||
|
|
||||||
with db_api.context_manager.writer.using(context):
|
with db_api.context_manager.writer.using(context):
|
||||||
sg = sg_obj.SecurityGroup(
|
sg = obj_reg.new_instance(
|
||||||
context, id=s.get('id') or uuidutils.generate_uuid(),
|
'SecurityGroup', context,
|
||||||
|
id=s.get('id') or uuidutils.generate_uuid(),
|
||||||
description=s.get('description', ''), project_id=tenant_id,
|
description=s.get('description', ''), project_id=tenant_id,
|
||||||
name=s.get('name', ''), is_default=default_sg)
|
name=s.get('name', ''), is_default=default_sg)
|
||||||
# Note(asarfaty): for unknown reason, removing the 'is_default'
|
# Note(asarfaty): for unknown reason, removing the 'is_default'
|
||||||
|
@ -21,12 +21,11 @@ from neutron_lib.callbacks import registry
|
|||||||
from neutron_lib.callbacks import resources
|
from neutron_lib.callbacks import resources
|
||||||
from neutron_lib import context as n_context
|
from neutron_lib import context as n_context
|
||||||
from neutron_lib.exceptions import dns as dns_exc
|
from neutron_lib.exceptions import dns as dns_exc
|
||||||
|
from neutron_lib.objects import registry as obj_reg
|
||||||
from neutron_lib.plugins import directory
|
from neutron_lib.plugins import directory
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
|
|
||||||
from neutron.objects import network as net_obj
|
|
||||||
from neutron.objects import ports as port_obj
|
|
||||||
from neutron.services.externaldns import driver
|
from neutron.services.externaldns import driver
|
||||||
|
|
||||||
from vmware_nsx.common import driver_api
|
from vmware_nsx.common import driver_api
|
||||||
@ -56,9 +55,9 @@ class DNSExtensionDriver(driver_api.ExtensionDriver):
|
|||||||
return
|
return
|
||||||
|
|
||||||
if dns_domain:
|
if dns_domain:
|
||||||
net_obj.NetworkDNSDomain(plugin_context,
|
obj_reg.new_instance('NetworkDNSDomain', plugin_context,
|
||||||
network_id=db_data['id'],
|
network_id=db_data['id'],
|
||||||
dns_domain=dns_domain).create()
|
dns_domain=dns_domain).create()
|
||||||
db_data[dns.DNSDOMAIN] = dns_domain
|
db_data[dns.DNSDOMAIN] = dns_domain
|
||||||
|
|
||||||
def process_update_network(self, plugin_context, request_data, db_data):
|
def process_update_network(self, plugin_context, request_data, db_data):
|
||||||
@ -72,7 +71,7 @@ class DNSExtensionDriver(driver_api.ExtensionDriver):
|
|||||||
|
|
||||||
net_id = db_data['id']
|
net_id = db_data['id']
|
||||||
if current_dns_domain:
|
if current_dns_domain:
|
||||||
net_dns_domain = net_obj.NetworkDNSDomain.get_object(
|
net_dns_domain = obj_reg.load_class('NetworkDNSDomain').get_object(
|
||||||
plugin_context,
|
plugin_context,
|
||||||
network_id=net_id)
|
network_id=net_id)
|
||||||
if new_value:
|
if new_value:
|
||||||
@ -83,9 +82,9 @@ class DNSExtensionDriver(driver_api.ExtensionDriver):
|
|||||||
net_dns_domain.delete()
|
net_dns_domain.delete()
|
||||||
db_data[dns.DNSDOMAIN] = ''
|
db_data[dns.DNSDOMAIN] = ''
|
||||||
elif new_value:
|
elif new_value:
|
||||||
net_obj.NetworkDNSDomain(plugin_context,
|
obj_reg.new_instance('NetworkDNSDomain', plugin_context,
|
||||||
network_id=net_id,
|
network_id=net_id,
|
||||||
dns_domain=new_value).create()
|
dns_domain=new_value).create()
|
||||||
db_data[dns.DNSDOMAIN] = new_value
|
db_data[dns.DNSDOMAIN] = new_value
|
||||||
|
|
||||||
def process_create_port(self, plugin_context, request_data, db_data):
|
def process_create_port(self, plugin_context, request_data, db_data):
|
||||||
@ -109,7 +108,8 @@ class DNSExtensionDriver(driver_api.ExtensionDriver):
|
|||||||
dns_name, external_dns_domain,
|
dns_name, external_dns_domain,
|
||||||
self.external_dns_not_needed(plugin_context, network)))
|
self.external_dns_not_needed(plugin_context, network)))
|
||||||
|
|
||||||
dns_data_obj = port_obj.PortDNS(
|
dns_data_obj = obj_reg.new_instance(
|
||||||
|
'PortDNS',
|
||||||
plugin_context,
|
plugin_context,
|
||||||
port_id=db_data['id'],
|
port_id=db_data['id'],
|
||||||
current_dns_name=current_dns_name,
|
current_dns_name=current_dns_name,
|
||||||
@ -141,7 +141,7 @@ class DNSExtensionDriver(driver_api.ExtensionDriver):
|
|||||||
|
|
||||||
def _update_dns_db(self, dns_name, dns_domain, db_data,
|
def _update_dns_db(self, dns_name, dns_domain, db_data,
|
||||||
plugin_context, has_fixed_ips):
|
plugin_context, has_fixed_ips):
|
||||||
dns_data_db = port_obj.PortDNS.get_object(
|
dns_data_db = obj_reg.load_class('PortDNS').get_object(
|
||||||
plugin_context,
|
plugin_context,
|
||||||
port_id=db_data['id'])
|
port_id=db_data['id'])
|
||||||
if dns_data_db:
|
if dns_data_db:
|
||||||
@ -165,13 +165,11 @@ class DNSExtensionDriver(driver_api.ExtensionDriver):
|
|||||||
dns_data_db.update()
|
dns_data_db.update()
|
||||||
return dns_data_db
|
return dns_data_db
|
||||||
if dns_name:
|
if dns_name:
|
||||||
dns_data_db = port_obj.PortDNS(plugin_context,
|
dns_data_db = obj_reg.new_instance(
|
||||||
port_id=db_data['id'],
|
'PortDNS', plugin_context, port_id=db_data['id'],
|
||||||
current_dns_name=dns_name,
|
current_dns_name=dns_name, current_dns_domain=dns_domain,
|
||||||
current_dns_domain=dns_domain,
|
previous_dns_name='', previous_dns_domain='',
|
||||||
previous_dns_name='',
|
dns_name=dns_name)
|
||||||
previous_dns_domain='',
|
|
||||||
dns_name=dns_name)
|
|
||||||
dns_data_db.create()
|
dns_data_db.create()
|
||||||
return dns_data_db
|
return dns_data_db
|
||||||
|
|
||||||
@ -202,7 +200,7 @@ class DNSExtensionDriver(driver_api.ExtensionDriver):
|
|||||||
self._extend_port_dict(db_data, db_data, dns_data_db, plugin_context)
|
self._extend_port_dict(db_data, db_data, dns_data_db, plugin_context)
|
||||||
|
|
||||||
def _process_only_dns_name_update(self, plugin_context, db_data, dns_name):
|
def _process_only_dns_name_update(self, plugin_context, db_data, dns_name):
|
||||||
dns_data_db = port_obj.PortDNS.get_object(
|
dns_data_db = obj_reg.load_class('PortDNS').get_object(
|
||||||
plugin_context,
|
plugin_context,
|
||||||
port_id=db_data['id'])
|
port_id=db_data['id'])
|
||||||
if dns_data_db:
|
if dns_data_db:
|
||||||
@ -210,13 +208,11 @@ class DNSExtensionDriver(driver_api.ExtensionDriver):
|
|||||||
dns_data_db.update()
|
dns_data_db.update()
|
||||||
return dns_data_db
|
return dns_data_db
|
||||||
if dns_name:
|
if dns_name:
|
||||||
dns_data_db = port_obj.PortDNS(plugin_context,
|
dns_data_db = obj_reg.new_instance(
|
||||||
port_id=db_data['id'],
|
'PortDNS', plugin_context, port_id=db_data['id'],
|
||||||
current_dns_name='',
|
current_dns_name='', current_dns_domain='',
|
||||||
current_dns_domain='',
|
previous_dns_name='', previous_dns_domain='',
|
||||||
previous_dns_name='',
|
dns_name=dns_name)
|
||||||
previous_dns_domain='',
|
|
||||||
dns_name=dns_name)
|
|
||||||
dns_data_db.create()
|
dns_data_db.create()
|
||||||
return dns_data_db
|
return dns_data_db
|
||||||
|
|
||||||
@ -436,7 +432,7 @@ def _create_port_in_external_dns_service(resource, event, trigger, **kwargs):
|
|||||||
return
|
return
|
||||||
context = kwargs['context']
|
context = kwargs['context']
|
||||||
port = kwargs['port']
|
port = kwargs['port']
|
||||||
dns_data_db = port_obj.PortDNS.get_object(
|
dns_data_db = obj_reg.load_class('PortDNS').get_object(
|
||||||
context, port_id=port['id'])
|
context, port_id=port['id'])
|
||||||
if not (dns_data_db and dns_data_db['current_dns_name']):
|
if not (dns_data_db and dns_data_db['current_dns_name']):
|
||||||
return
|
return
|
||||||
@ -466,7 +462,7 @@ def _update_port_in_external_dns_service(resource, event, trigger, **kwargs):
|
|||||||
ips_changed = set(original_ips) != set(updated_ips)
|
ips_changed = set(original_ips) != set(updated_ips)
|
||||||
if not any((is_dns_name_changed, is_dns_domain_changed, ips_changed)):
|
if not any((is_dns_name_changed, is_dns_domain_changed, ips_changed)):
|
||||||
return
|
return
|
||||||
dns_data_db = port_obj.PortDNS.get_object(
|
dns_data_db = obj_reg.load_class('PortDNS').get_object(
|
||||||
context, port_id=updated_port['id'])
|
context, port_id=updated_port['id'])
|
||||||
if not (dns_data_db and
|
if not (dns_data_db and
|
||||||
(dns_data_db['previous_dns_name'] or
|
(dns_data_db['previous_dns_name'] or
|
||||||
@ -489,13 +485,13 @@ def _delete_port_in_external_dns_service(resource, event, trigger, **kwargs):
|
|||||||
return
|
return
|
||||||
context = kwargs['context']
|
context = kwargs['context']
|
||||||
port_id = kwargs['port_id']
|
port_id = kwargs['port_id']
|
||||||
dns_data_db = port_obj.PortDNS.get_object(
|
dns_data_db = obj_reg.load_class('PortDNS').get_object(
|
||||||
context, port_id=port_id)
|
context, port_id=port_id)
|
||||||
if not dns_data_db:
|
if not dns_data_db:
|
||||||
return
|
return
|
||||||
if dns_data_db['current_dns_name']:
|
if dns_data_db['current_dns_name']:
|
||||||
ip_allocations = port_obj.IPAllocation.get_objects(context,
|
ip_allocations = obj_reg.load_class('IPAllocation').get_objects(
|
||||||
port_id=port_id)
|
context, port_id=port_id)
|
||||||
records = [str(alloc['ip_address']) for alloc in ip_allocations]
|
records = [str(alloc['ip_address']) for alloc in ip_allocations]
|
||||||
_remove_data_from_external_dns_service(
|
_remove_data_from_external_dns_service(
|
||||||
context, dns_driver, dns_data_db['current_dns_domain'],
|
context, dns_driver, dns_data_db['current_dns_domain'],
|
||||||
|
@ -45,6 +45,7 @@ from neutron_lib.exceptions import flavors as flav_exc
|
|||||||
from neutron_lib.exceptions import l3 as l3_exc
|
from neutron_lib.exceptions import l3 as l3_exc
|
||||||
from neutron_lib.exceptions import multiprovidernet as mpnet_exc
|
from neutron_lib.exceptions import multiprovidernet as mpnet_exc
|
||||||
from neutron_lib.exceptions import port_security as psec_exc
|
from neutron_lib.exceptions import port_security as psec_exc
|
||||||
|
from neutron_lib.objects import registry as obj_reg
|
||||||
from neutron_lib.plugins import constants as plugin_const
|
from neutron_lib.plugins import constants as plugin_const
|
||||||
from neutron_lib.plugins import directory
|
from neutron_lib.plugins import directory
|
||||||
from neutron_lib.plugins import utils
|
from neutron_lib.plugins import utils
|
||||||
@ -84,7 +85,6 @@ from neutron.db import securitygroups_db
|
|||||||
from neutron.db import vlantransparent_db
|
from neutron.db import vlantransparent_db
|
||||||
from neutron.extensions import providernet
|
from neutron.extensions import providernet
|
||||||
from neutron.extensions import securitygroup as ext_sg
|
from neutron.extensions import securitygroup as ext_sg
|
||||||
from neutron.objects import securitygroup
|
|
||||||
from neutron.quota import resource_registry
|
from neutron.quota import resource_registry
|
||||||
from neutron.services.flavors import flavors_plugin
|
from neutron.services.flavors import flavors_plugin
|
||||||
from vmware_nsx.dvs import dvs
|
from vmware_nsx.dvs import dvs
|
||||||
@ -4543,7 +4543,8 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
|
|||||||
"nsx-rule %(nsx_rule_id)s doesn't exist.",
|
"nsx-rule %(nsx_rule_id)s doesn't exist.",
|
||||||
{'id': id, 'nsx_rule_id': nsx_rule_id})
|
{'id': id, 'nsx_rule_id': nsx_rule_id})
|
||||||
if delete_base:
|
if delete_base:
|
||||||
securitygroup.SecurityGroupRule.delete_objects(context, id=id)
|
obj_reg.load_class('SecurityGroupRule').delete_objects(
|
||||||
|
context, id=id)
|
||||||
|
|
||||||
def _remove_vnic_from_spoofguard_policy(self, session, net_id, vnic_id):
|
def _remove_vnic_from_spoofguard_policy(self, session, net_id, vnic_id):
|
||||||
policy_id = nsxv_db.get_spoofguard_policy_id(session, net_id)
|
policy_id = nsxv_db.get_spoofguard_policy_id(session, net_id)
|
||||||
|
@ -14,13 +14,13 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
from neutron.objects.qos import policy as qos_policy
|
from neutron_lib.objects import registry as obj_reg
|
||||||
from neutron_lib.services.qos import constants as qos_consts
|
from neutron_lib.services.qos import constants as qos_consts
|
||||||
|
|
||||||
|
|
||||||
def update_network_policy_binding(context, net_id, new_policy_id):
|
def update_network_policy_binding(context, net_id, new_policy_id):
|
||||||
# detach the old policy (if exists) from the network
|
# detach the old policy (if exists) from the network
|
||||||
old_policy = qos_policy.QosPolicy.get_network_policy(
|
old_policy = obj_reg.load_class('QosPolicy').get_network_policy(
|
||||||
context, net_id)
|
context, net_id)
|
||||||
if old_policy:
|
if old_policy:
|
||||||
if old_policy.id == new_policy_id:
|
if old_policy.id == new_policy_id:
|
||||||
@ -29,7 +29,7 @@ def update_network_policy_binding(context, net_id, new_policy_id):
|
|||||||
|
|
||||||
# attach the new policy (if exists) to the network
|
# attach the new policy (if exists) to the network
|
||||||
if new_policy_id is not None:
|
if new_policy_id is not None:
|
||||||
new_policy = qos_policy.QosPolicy.get_object(
|
new_policy = obj_reg.load_class('QosPolicy').get_object(
|
||||||
context, id=new_policy_id)
|
context, id=new_policy_id)
|
||||||
if new_policy:
|
if new_policy:
|
||||||
new_policy.attach_network(net_id)
|
new_policy.attach_network(net_id)
|
||||||
@ -37,7 +37,7 @@ def update_network_policy_binding(context, net_id, new_policy_id):
|
|||||||
|
|
||||||
def update_port_policy_binding(context, port_id, new_policy_id):
|
def update_port_policy_binding(context, port_id, new_policy_id):
|
||||||
# detach the old policy (if exists) from the port
|
# detach the old policy (if exists) from the port
|
||||||
old_policy = qos_policy.QosPolicy.get_port_policy(
|
old_policy = obj_reg.load_class('QosPolicy').get_port_policy(
|
||||||
context, port_id)
|
context, port_id)
|
||||||
if old_policy:
|
if old_policy:
|
||||||
if old_policy.id == new_policy_id:
|
if old_policy.id == new_policy_id:
|
||||||
@ -46,21 +46,21 @@ def update_port_policy_binding(context, port_id, new_policy_id):
|
|||||||
|
|
||||||
# attach the new policy (if exists) to the port
|
# attach the new policy (if exists) to the port
|
||||||
if new_policy_id is not None:
|
if new_policy_id is not None:
|
||||||
new_policy = qos_policy.QosPolicy.get_object(
|
new_policy = obj_reg.load_class('QosPolicy').get_object(
|
||||||
context, id=new_policy_id)
|
context, id=new_policy_id)
|
||||||
if new_policy:
|
if new_policy:
|
||||||
new_policy.attach_port(port_id)
|
new_policy.attach_port(port_id)
|
||||||
|
|
||||||
|
|
||||||
def get_port_policy_id(context, port_id):
|
def get_port_policy_id(context, port_id):
|
||||||
policy = qos_policy.QosPolicy.get_port_policy(
|
policy = obj_reg.load_class('QosPolicy').get_port_policy(
|
||||||
context, port_id)
|
context, port_id)
|
||||||
if policy:
|
if policy:
|
||||||
return policy.id
|
return policy.id
|
||||||
|
|
||||||
|
|
||||||
def get_network_policy_id(context, net_id):
|
def get_network_policy_id(context, net_id):
|
||||||
policy = qos_policy.QosPolicy.get_network_policy(
|
policy = obj_reg.load_class('QosPolicy').get_network_policy(
|
||||||
context, net_id)
|
context, net_id)
|
||||||
if policy:
|
if policy:
|
||||||
return policy.id
|
return policy.id
|
||||||
@ -74,7 +74,7 @@ def set_qos_policy_on_new_net(context, net_data, created_net):
|
|||||||
qos_policy_id = net_data.get(qos_consts.QOS_POLICY_ID)
|
qos_policy_id = net_data.get(qos_consts.QOS_POLICY_ID)
|
||||||
if not qos_policy_id:
|
if not qos_policy_id:
|
||||||
# try and get the default one
|
# try and get the default one
|
||||||
qos_obj = qos_policy.QosPolicyDefault.get_object(
|
qos_obj = obj_reg.load_class('QosPolicyDefault').get_object(
|
||||||
context, project_id=created_net['project_id'])
|
context, project_id=created_net['project_id'])
|
||||||
if qos_obj:
|
if qos_obj:
|
||||||
qos_policy_id = qos_obj.qos_policy_id
|
qos_policy_id = qos_obj.qos_policy_id
|
||||||
|
@ -15,13 +15,12 @@
|
|||||||
import mock
|
import mock
|
||||||
|
|
||||||
from neutron_lib import context
|
from neutron_lib import context
|
||||||
|
from neutron_lib.objects import registry as obj_reg
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_utils import uuidutils
|
from oslo_utils import uuidutils
|
||||||
|
|
||||||
from neutron.common import exceptions
|
from neutron.common import exceptions
|
||||||
from neutron.objects import base as base_object
|
from neutron.objects import base as base_object
|
||||||
from neutron.objects.qos import policy as policy_object
|
|
||||||
from neutron.objects.qos import rule as rule_object
|
|
||||||
from neutron.services.qos import qos_plugin
|
from neutron.services.qos import qos_plugin
|
||||||
from neutron.tests.unit.services.qos import base
|
from neutron.tests.unit.services.qos import base
|
||||||
|
|
||||||
@ -32,6 +31,7 @@ from vmware_nsx.services.qos.nsx_v3 import utils as qos_utils
|
|||||||
from vmware_nsx.tests.unit.nsx_v3 import test_plugin
|
from vmware_nsx.tests.unit.nsx_v3 import test_plugin
|
||||||
|
|
||||||
PLUGIN_NAME = 'vmware_nsx.plugins.nsx_v3.plugin.NsxV3Plugin'
|
PLUGIN_NAME = 'vmware_nsx.plugins.nsx_v3.plugin.NsxV3Plugin'
|
||||||
|
QoSPolicyObject = obj_reg.load_class('QosPolicy')
|
||||||
|
|
||||||
|
|
||||||
class TestQosNsxV3Notification(base.BaseQosTestCase,
|
class TestQosNsxV3Notification(base.BaseQosTestCase,
|
||||||
@ -66,17 +66,20 @@ class TestQosNsxV3Notification(base.BaseQosTestCase,
|
|||||||
'dscp_marking_rule': {'id': uuidutils.generate_uuid(),
|
'dscp_marking_rule': {'id': uuidutils.generate_uuid(),
|
||||||
'dscp_mark': 22}}
|
'dscp_mark': 22}}
|
||||||
|
|
||||||
self.policy = policy_object.QosPolicy(
|
self.policy = QoSPolicyObject(
|
||||||
self.ctxt, **self.policy_data['policy'])
|
self.ctxt, **self.policy_data['policy'])
|
||||||
|
|
||||||
# egress BW limit rule
|
# egress BW limit rule
|
||||||
self.rule = rule_object.QosBandwidthLimitRule(
|
self.rule = obj_reg.new_instance(
|
||||||
self.ctxt, **self.rule_data['bandwidth_limit_rule'])
|
'QosBandwidthLimitRule', self.ctxt,
|
||||||
|
**self.rule_data['bandwidth_limit_rule'])
|
||||||
# ingress bw limit rule
|
# ingress bw limit rule
|
||||||
self.ingress_rule = rule_object.QosBandwidthLimitRule(
|
self.ingress_rule = obj_reg.new_instance(
|
||||||
self.ctxt, **self.ingress_rule_data['bandwidth_limit_rule'])
|
'QosBandwidthLimitRule', self.ctxt,
|
||||||
self.dscp_rule = rule_object.QosDscpMarkingRule(
|
**self.ingress_rule_data['bandwidth_limit_rule'])
|
||||||
self.ctxt, **self.dscp_rule_data['dscp_marking_rule'])
|
self.dscp_rule = obj_reg.new_instance(
|
||||||
|
'QosDscpMarkingRule', self.ctxt,
|
||||||
|
**self.dscp_rule_data['dscp_marking_rule'])
|
||||||
|
|
||||||
self.fake_profile_id = 'fake_profile'
|
self.fake_profile_id = 'fake_profile'
|
||||||
self.fake_profile = {'id': self.fake_profile_id}
|
self.fake_profile = {'id': self.fake_profile_id}
|
||||||
@ -124,7 +127,7 @@ class TestQosNsxV3Notification(base.BaseQosTestCase,
|
|||||||
def __test_policy_update_profile(self, *mocks):
|
def __test_policy_update_profile(self, *mocks):
|
||||||
# test the switch profile update when a QoS policy is updated
|
# test the switch profile update when a QoS policy is updated
|
||||||
fields = base_object.get_updatable_fields(
|
fields = base_object.get_updatable_fields(
|
||||||
policy_object.QosPolicy, self.policy_data['policy'])
|
QoSPolicyObject, self.policy_data['policy'])
|
||||||
with mock.patch(
|
with mock.patch(
|
||||||
'vmware_nsxlib.v3.core_resources.NsxLibQosSwitchingProfile.update'
|
'vmware_nsxlib.v3.core_resources.NsxLibQosSwitchingProfile.update'
|
||||||
) as update_profile:
|
) as update_profile:
|
||||||
@ -147,10 +150,10 @@ class TestQosNsxV3Notification(base.BaseQosTestCase,
|
|||||||
tags=expected_tags
|
tags=expected_tags
|
||||||
)
|
)
|
||||||
|
|
||||||
@mock.patch.object(policy_object.QosPolicy, '_reload_rules')
|
@mock.patch.object(QoSPolicyObject, '_reload_rules')
|
||||||
def test_bw_rule_create_profile(self, *mocks):
|
def test_bw_rule_create_profile(self, *mocks):
|
||||||
# test the switch profile update when a egress QoS BW rule is created
|
# test the switch profile update when a egress QoS BW rule is created
|
||||||
_policy = policy_object.QosPolicy(
|
_policy = QoSPolicyObject(
|
||||||
self.ctxt, **self.policy_data['policy'])
|
self.ctxt, **self.policy_data['policy'])
|
||||||
# add a rule to the policy
|
# add a rule to the policy
|
||||||
setattr(_policy, "rules", [self.rule])
|
setattr(_policy, "rules", [self.rule])
|
||||||
@ -186,10 +189,10 @@ class TestQosNsxV3Notification(base.BaseQosTestCase,
|
|||||||
qos_marking='trusted'
|
qos_marking='trusted'
|
||||||
)
|
)
|
||||||
|
|
||||||
@mock.patch.object(policy_object.QosPolicy, '_reload_rules')
|
@mock.patch.object(QoSPolicyObject, '_reload_rules')
|
||||||
def test_ingress_bw_rule_create_profile(self, *mocks):
|
def test_ingress_bw_rule_create_profile(self, *mocks):
|
||||||
# test the switch profile update when a ingress QoS BW rule is created
|
# test the switch profile update when a ingress QoS BW rule is created
|
||||||
_policy = policy_object.QosPolicy(
|
_policy = QoSPolicyObject(
|
||||||
self.ctxt, **self.policy_data['policy'])
|
self.ctxt, **self.policy_data['policy'])
|
||||||
# add a rule to the policy
|
# add a rule to the policy
|
||||||
setattr(_policy, "rules", [self.ingress_rule])
|
setattr(_policy, "rules", [self.ingress_rule])
|
||||||
@ -226,7 +229,7 @@ class TestQosNsxV3Notification(base.BaseQosTestCase,
|
|||||||
qos_marking='trusted'
|
qos_marking='trusted'
|
||||||
)
|
)
|
||||||
|
|
||||||
@mock.patch.object(policy_object.QosPolicy, '_reload_rules')
|
@mock.patch.object(QoSPolicyObject, '_reload_rules')
|
||||||
def test_bw_rule_create_profile_minimal_val(self, *mocks):
|
def test_bw_rule_create_profile_minimal_val(self, *mocks):
|
||||||
# test driver precommit with an invalid limit value
|
# test driver precommit with an invalid limit value
|
||||||
bad_limit = qos_utils.MAX_KBPS_MIN_VALUE - 1
|
bad_limit = qos_utils.MAX_KBPS_MIN_VALUE - 1
|
||||||
@ -235,10 +238,11 @@ class TestQosNsxV3Notification(base.BaseQosTestCase,
|
|||||||
'max_kbps': bad_limit,
|
'max_kbps': bad_limit,
|
||||||
'max_burst_kbps': 150}}
|
'max_burst_kbps': 150}}
|
||||||
|
|
||||||
rule = rule_object.QosBandwidthLimitRule(
|
rule = obj_reg.new_instance(
|
||||||
self.ctxt, **rule_data['bandwidth_limit_rule'])
|
'QosBandwidthLimitRule', self.ctxt,
|
||||||
|
**rule_data['bandwidth_limit_rule'])
|
||||||
|
|
||||||
_policy = policy_object.QosPolicy(
|
_policy = QoSPolicyObject(
|
||||||
self.ctxt, **self.policy_data['policy'])
|
self.ctxt, **self.policy_data['policy'])
|
||||||
# add a rule to the policy
|
# add a rule to the policy
|
||||||
setattr(_policy, "rules", [rule])
|
setattr(_policy, "rules", [rule])
|
||||||
@ -251,7 +255,7 @@ class TestQosNsxV3Notification(base.BaseQosTestCase,
|
|||||||
self.qos_plugin.update_policy_bandwidth_limit_rule,
|
self.qos_plugin.update_policy_bandwidth_limit_rule,
|
||||||
self.ctxt, rule.id, _policy.id, rule_data)
|
self.ctxt, rule.id, _policy.id, rule_data)
|
||||||
|
|
||||||
@mock.patch.object(policy_object.QosPolicy, '_reload_rules')
|
@mock.patch.object(QoSPolicyObject, '_reload_rules')
|
||||||
def test_bw_rule_create_profile_maximal_val(self, *mocks):
|
def test_bw_rule_create_profile_maximal_val(self, *mocks):
|
||||||
# test driver precommit with an invalid burst value
|
# test driver precommit with an invalid burst value
|
||||||
bad_burst = qos_utils.MAX_BURST_MAX_VALUE + 1
|
bad_burst = qos_utils.MAX_BURST_MAX_VALUE + 1
|
||||||
@ -260,10 +264,11 @@ class TestQosNsxV3Notification(base.BaseQosTestCase,
|
|||||||
'max_kbps': 1025,
|
'max_kbps': 1025,
|
||||||
'max_burst_kbps': bad_burst}}
|
'max_burst_kbps': bad_burst}}
|
||||||
|
|
||||||
rule = rule_object.QosBandwidthLimitRule(
|
rule = obj_reg.new_instance(
|
||||||
self.ctxt, **rule_data['bandwidth_limit_rule'])
|
'QosBandwidthLimitRule', self.ctxt,
|
||||||
|
**rule_data['bandwidth_limit_rule'])
|
||||||
|
|
||||||
_policy = policy_object.QosPolicy(
|
_policy = QoSPolicyObject(
|
||||||
self.ctxt, **self.policy_data['policy'])
|
self.ctxt, **self.policy_data['policy'])
|
||||||
# add a rule to the policy
|
# add a rule to the policy
|
||||||
setattr(_policy, "rules", [rule])
|
setattr(_policy, "rules", [rule])
|
||||||
@ -276,10 +281,10 @@ class TestQosNsxV3Notification(base.BaseQosTestCase,
|
|||||||
self.qos_plugin.update_policy_bandwidth_limit_rule,
|
self.qos_plugin.update_policy_bandwidth_limit_rule,
|
||||||
self.ctxt, rule.id, _policy.id, rule_data)
|
self.ctxt, rule.id, _policy.id, rule_data)
|
||||||
|
|
||||||
@mock.patch.object(policy_object.QosPolicy, '_reload_rules')
|
@mock.patch.object(QoSPolicyObject, '_reload_rules')
|
||||||
def test_dscp_rule_create_profile(self, *mocks):
|
def test_dscp_rule_create_profile(self, *mocks):
|
||||||
# test the switch profile update when a QoS DSCP rule is created
|
# test the switch profile update when a QoS DSCP rule is created
|
||||||
_policy = policy_object.QosPolicy(
|
_policy = QoSPolicyObject(
|
||||||
self.ctxt, **self.policy_data['policy'])
|
self.ctxt, **self.policy_data['policy'])
|
||||||
# add a rule to the policy
|
# add a rule to the policy
|
||||||
setattr(_policy, "rules", [self.dscp_rule])
|
setattr(_policy, "rules", [self.dscp_rule])
|
||||||
@ -314,7 +319,7 @@ class TestQosNsxV3Notification(base.BaseQosTestCase,
|
|||||||
|
|
||||||
def test_rule_delete_profile(self):
|
def test_rule_delete_profile(self):
|
||||||
# test the switch profile update when a QoS rule is deleted
|
# test the switch profile update when a QoS rule is deleted
|
||||||
_policy = policy_object.QosPolicy(
|
_policy = QoSPolicyObject(
|
||||||
self.ctxt, **self.policy_data['policy'])
|
self.ctxt, **self.policy_data['policy'])
|
||||||
# The mock will return the policy without the rule,
|
# The mock will return the policy without the rule,
|
||||||
# as if it was deleted
|
# as if it was deleted
|
||||||
|
Loading…
x
Reference in New Issue
Block a user