Add port base attributes update operation support

1. What is the problem
Tricircle does not support port update operation now.

2. What is the solution to the problem
Implement related functions

3. What the features need to be implemented to the Tricircle
Add name, description, admin_state_up, extra_dhcp_opts, device_owner,
device_id, mac_address, security group attribute updates supported,
where updating name only takes effect on the central pod.

Change-Id: Id0f1175f77f66721eaf739413edf81bfc9231957
This commit is contained in:
xiulin yin 2016-12-28 16:21:25 +08:00
parent 33e3a8d353
commit ac65bc3832
4 changed files with 460 additions and 23 deletions

View File

@ -0,0 +1,15 @@
---
features:
- |
Port
* Update port
* name, description, admin_state_up, extra_dhcp_opts, device_owner,
device_id, mac_address, security group attribute updates supported
issues:
- |
Update port may not lead to the expected result if an instance is being
booted at the same time. You can redo the update operation later to make
it execute correctly.

View File

@ -65,7 +65,7 @@ class NeutronResourceHandle(ResourceHandle):
support_resource = {
'network': LIST | CREATE | DELETE | GET | UPDATE,
'subnet': LIST | CREATE | DELETE | GET | UPDATE,
'port': LIST | CREATE | DELETE | GET,
'port': LIST | CREATE | DELETE | GET | UPDATE,
'router': LIST | CREATE | DELETE | ACTION | GET | UPDATE,
'security_group': LIST | CREATE | GET,
'security_group_rule': LIST | CREATE | DELETE,

View File

@ -36,6 +36,7 @@ from neutron.db import portbindings_db
from neutron.extensions import availability_zone as az_ext
from neutron.extensions import external_net
from neutron.extensions import l3
from neutron.extensions import portbindings
from neutron.extensions import providernet as provider
from neutron_lib.api import validators
from neutron_lib import constants
@ -456,18 +457,71 @@ class TricirclePlugin(db_base_plugin_v2.NeutronDbPluginV2,
self._process_port_create_security_group(context, result, sgids)
return result
def _check_mac_update_allowed(self, orig_port, port):
unplugged_types = (portbindings.VIF_TYPE_BINDING_FAILED,
portbindings.VIF_TYPE_UNBOUND)
new_mac = port.get('mac_address')
mac_change = (new_mac is not None and
orig_port['mac_address'] != new_mac)
if mac_change and (
orig_port[portbindings.VIF_TYPE] not in unplugged_types):
raise exceptions.PortBound(
port_id=orig_port['id'],
vif_type=orig_port[portbindings.VIF_TYPE],
old_mac=orig_port['mac_address'],
new_mac=port['mac_address'])
def _filter_unsupported_attrs(self, port_data):
unsupported_attrs = ['fixed_ips', 'qos_policy',
'allowed_address_pair']
remove_keys = [key for key in port_data.keys() if (
key in unsupported_attrs)]
for key in remove_keys:
port_data.pop(key)
def _log_update_port_sensitive_attrs(self, port_id, port):
sensitive_attrs = ['device_id', 'device_owner', portbindings.VNIC_TYPE,
portbindings.PROFILE, portbindings.HOST_ID]
request_body = port['port']
updated_sens_attrs = []
for key in request_body.keys():
if key in sensitive_attrs:
updated_sens_attrs.append('%s = %s' % (key, request_body[key]))
warning_attrs = ', '.join(updated_sens_attrs)
LOG.warning(_LW('update port: %(port_id)s , %(warning_attrs)s'),
{'port_id': port_id, 'warning_attrs': warning_attrs})
def _handle_bottom_security_group(self, t_ctx, top_sg, bottom_pod):
if top_sg:
b_region_name = bottom_pod['region_name']
for sg_id in top_sg:
b_client = self._get_client(region_name=b_region_name)
b_client.get_security_groups(t_ctx, sg_id)
if db_api.get_bottom_id_by_top_id_region_name(
t_ctx, sg_id, b_region_name, t_constants.RT_SG):
continue
db_api.create_resource_mapping(
t_ctx, sg_id, sg_id, bottom_pod['pod_id'], t_ctx.tenant,
t_constants.RT_SG)
def update_port(self, context, port_id, port):
# TODO(zhiyuan) handle bottom port update
t_ctx = t_context.get_context_from_neutron_context(context)
top_port = super(TricirclePlugin, self).get_port(context, port_id)
# be careful that l3_db will call update_port to update device_id of
# router interface, we cannot directly update bottom port in this case,
# otherwise we will fail when attaching bottom port to bottom router
# because its device_id is not empty
res = super(TricirclePlugin, self).update_port(context, port_id, port)
if t_constants.PROFILE_REGION in port['port'].get(
'binding:profile', {}):
res = super(TricirclePlugin, self).update_port(context, port_id,
port)
region_name = port['port']['binding:profile'][
t_constants.PROFILE_REGION]
t_ctx = t_context.get_context_from_neutron_context(context)
pod = db_api.get_pod_by_name(t_ctx, region_name)
entries = [(ip['subnet_id'],
t_constants.RT_SUBNET) for ip in res['fixed_ips']]
@ -503,6 +557,54 @@ class TricirclePlugin(db_base_plugin_v2.NeutronDbPluginV2,
self.xjob_handler.configure_security_group_rules(t_ctx,
res['tenant_id'])
# for vm port or port with empty device_owner, update top port and
# bottom port
elif top_port.get('device_owner') not in NON_VM_PORT_TYPES:
mappings = db_api.get_bottom_mappings_by_top_id(
t_ctx, port_id, t_constants.RT_PORT)
request_body = port[attributes.PORT]
if mappings:
with context.session.begin():
b_pod, b_port_id = mappings[0]
b_region_name = b_pod['region_name']
b_client = self._get_client(region_name=b_region_name)
b_port = b_client.get_ports(context, b_port_id)
self._check_mac_update_allowed(b_port, request_body)
self._filter_unsupported_attrs(request_body)
request_body = port[attributes.PORT]
if request_body.get('security_groups', None):
self._handle_bottom_security_group(
t_ctx, request_body['security_groups'], b_pod)
res = super(TricirclePlugin, self).update_port(
context, port_id, port)
# name is not allowed to be updated, because it is used by
# lock_handle to retrieve bottom/local resources that have
# been created but not registered in the resource routing
# table
request_body.pop('name', None)
try:
b_client.update_ports(t_ctx, b_port_id, port)
except q_cli_exceptions.NotFound:
LOG.error(
_LE('port: %(port_id)s not found, '
'region name: %(name)s'),
{'port_id': b_port_id, 'name': b_region_name})
if request_body.get('security_groups', None):
self.xjob_handler.configure_security_group_rules(
t_ctx, res['tenant_id'])
else:
self._filter_unsupported_attrs(request_body)
res = super(TricirclePlugin, self).update_port(
context, port_id, port)
else:
# for router interface, router gw, dhcp port, not directly
# update bottom port
res = super(TricirclePlugin, self).update_port(
context, port_id, port)
self._log_update_port_sensitive_attrs(port_id, port)
return res
def delete_port(self, context, port_id, l3_port_check=True):

View File

@ -34,6 +34,7 @@ from neutron_lib.plugins import directory
import neutron.api.v2.attributes as neutron_attributes
import neutron.conf.common as q_config
from neutron.db import _utils
from neutron.db import db_base_plugin_common
from neutron.db import db_base_plugin_v2
from neutron.db import ipam_pluggable_backend
@ -42,7 +43,10 @@ from neutron.db import models_v2
from neutron.db import rbac_db_models as rbac_db
from neutron.extensions import availability_zone as az_ext
from neutron.extensions import portbindings
from neutron.ipam import driver
from neutron.ipam import exceptions as ipam_exc
from neutron.ipam import requests
import neutron.ipam.utils as ipam_utils
from neutron import manager
@ -55,6 +59,9 @@ from oslo_utils import uuidutils
from tricircle.common import client
from tricircle.common import constants
from tricircle.common import context
from tricircle.common.i18n import _
import tricircle.db.api as db_api
from tricircle.db import core
from tricircle.db import models
@ -66,7 +73,6 @@ from tricircle.network import managers
from tricircle.tests.unit.network import test_security_groups
from tricircle.xjob import xmanager
TOP_NETS = []
TOP_SUBNETS = []
TOP_PORTS = []
@ -82,6 +88,8 @@ TOP_FLOATINGIPS = []
TOP_SGS = []
TOP_SG_RULES = []
TOP_NETWORK_RBAC = []
TOP_SUBNETROUTES = []
TOP_DNSNAMESERVERS = []
BOTTOM1_NETS = []
BOTTOM1_SUBNETS = []
BOTTOM1_PORTS = []
@ -116,8 +124,9 @@ RES_MAP = {'networks': TOP_NETS,
'floatingips': TOP_FLOATINGIPS,
'securitygroups': TOP_SGS,
'securitygrouprules': TOP_SG_RULES,
'networkrbacs': TOP_NETWORK_RBAC}
SUBNET_INFOS = {}
'networkrbacs': TOP_NETWORK_RBAC,
'subnetroutes': TOP_SUBNETROUTES,
'dnsnameservers': TOP_DNSNAMESERVERS}
TEST_TENANT_ID = 'test_tenant_id'
@ -189,7 +198,6 @@ class FakePool(driver.Pool):
'cidr': subnet_request.subnet_cidr,
'gateway': subnet_request.gateway_ip,
'pools': subnet_request.allocation_pools}
SUBNET_INFOS[subnet_info['id']] = subnet_info
return FakeIpamSubnet(subnet_info)
prefix = self._subnetpool.prefixes[0]
subnet = next(prefix.subnet(subnet_request.prefixlen))
@ -201,17 +209,30 @@ class FakePool(driver.Pool):
'cidr': subnet.cidr,
'gateway': gateway,
'pools': pools}
SUBNET_INFOS[subnet_info['id']] = subnet_info
return FakeIpamSubnet(subnet_info)
def get_subnet(self, subnet_id):
return FakeIpamSubnet(SUBNET_INFOS[subnet_id])
for subnet in TOP_SUBNETS:
if subnet['id'] == subnet_id:
return FakeIpamSubnet(subnet)
raise q_lib_exc.SubnetNotFound(subnet_id=id)
def get_allocator(self, subnet_ids):
return driver.SubnetGroup()
def update_subnet(self, subnet_request):
return FakeIpamSubnet()
pools = []
for subnet in TOP_SUBNETS:
if subnet['id'] == subnet_request.subnet_id:
for request_pool in subnet_request.allocation_pools:
pool = {'start': str(request_pool._start),
'end': str(request_pool._end)}
pools.append(pool)
subnet['allocation_pools'] = pools
return FakeIpamSubnet(subnet_request)
raise ipam_exc.InvalidSubnetRequest(
reason=_("updated subnet id no not found"))
def remove_subnet(self, subnet_id):
pass
@ -228,6 +249,9 @@ class DotDict(dict):
return []
return self.get(item)
def __copy__(self):
return DotDict(self)
class DotList(list):
def all(self):
@ -435,6 +459,19 @@ class FakeClient(object):
def delete_subnets(self, ctx, subnet_id):
self.delete_resources('subnet', ctx, subnet_id)
def update_ports(self, ctx, port_id, body):
subnet_data = body[neutron_attributes.PORT]
if self.region_name == 'pod_1':
ports = BOTTOM1_PORTS
else:
ports = BOTTOM2_PORTS
for port in ports:
if port['id'] == port_id:
for key in subnet_data:
port[key] = subnet_data[key]
return
def update_subnets(self, ctx, subnet_id, body):
pass
@ -572,6 +609,9 @@ class FakeClient(object):
ret_sg = copy.deepcopy(sg)
return ret_sg
def get_security_group(self, context, _id, fields=None, tenant_id=None):
pass
class FakeNeutronContext(object):
def __init__(self):
@ -879,6 +919,23 @@ class FakeSession(object):
'routerports', 'router_id',
'routers', 'id', 'attached_ports')
if model_obj.__tablename__ == 'subnetroutes':
for subnet in TOP_SUBNETS:
if subnet['id'] != model_dict['subnet_id']:
continue
host_route = {'nexthop': model_dict['nexthop'],
'destination': model_dict['destination']}
subnet['host_routes'].append(host_route)
break
if model_obj.__tablename__ == 'dnsnameservers':
for subnet in TOP_SUBNETS:
if subnet['id'] != model_dict['subnet_id']:
continue
dnsnameservers = model_dict['address']
subnet['dns_nameservers'].append(dnsnameservers)
break
RES_MAP[model_obj.__tablename__].append(model_dict)
def _cascade_delete(self, model_dict, foreign_key, table, key):
@ -900,7 +957,7 @@ class FakeSession(object):
delete_model(res_list, model_obj)
class FakeBaseManager(xmanager.XManager):
class FakeBaseXManager(xmanager.XManager):
def __init__(self, fake_plugin):
self.clients = {constants.TOP: client.Client()}
self.job_handles = {
@ -913,7 +970,7 @@ class FakeBaseManager(xmanager.XManager):
return FakeClient(region_name)
class FakeXManager(FakeBaseManager):
class FakeXManager(FakeBaseXManager):
def __init__(self, fake_plugin):
super(FakeXManager, self).__init__(fake_plugin)
self.xjob_handler = FakeBaseRPCAPI(fake_plugin)
@ -921,7 +978,7 @@ class FakeXManager(FakeBaseManager):
class FakeBaseRPCAPI(object):
def __init__(self, fake_plugin):
self.xmanager = FakeBaseManager(fake_plugin)
self.xmanager = FakeBaseXManager(fake_plugin)
def configure_extra_routes(self, ctxt, router_id):
pass
@ -944,6 +1001,9 @@ class FakeRPCAPI(FakeBaseRPCAPI):
def delete_server_port(self, ctxt, port_id, pod_id):
pass
def configure_security_group_rules(self, ctxt, project_id):
pass
class FakeExtension(object):
def __init__(self, ext_obj):
@ -1095,6 +1155,10 @@ def fake_get_plugin(alias=q_constants.CORE):
return FakePlugin()
def fake_filter_non_model_columns(data, model):
return data
class PluginTest(unittest.TestCase,
test_security_groups.TricircleSecurityGroupTestMixin):
def setUp(self):
@ -1501,7 +1565,128 @@ class PluginTest(unittest.TestCase,
self.assertEqual(bottom_entry_map['port']['bottom_id'], b_port_id)
@staticmethod
def _prepare_network_test(tenant_id, ctx, region_name, index):
def _prepare_sg_test(project_id, ctx, pod_name):
t_sg_id = uuidutils.generate_uuid()
t_rule_id = uuidutils.generate_uuid()
b_sg_id = uuidutils.generate_uuid()
b_rule_id = uuidutils.generate_uuid()
t_sg = {
'id': t_sg_id,
'name': 'default',
'description': '',
'tenant_id': project_id,
'security_group_rules': [
{'security_group_id': t_sg_id,
'id': t_rule_id,
'tenant_id': project_id,
'remote_group_id': t_sg_id,
'direction': 'ingress',
'remote_ip_prefix': '10.0.0.0/24',
'protocol': None,
'port_range_max': None,
'port_range_min': None,
'ethertype': 'IPv4'}
]
}
TOP_PORTS.append(DotDict(t_sg))
b_sg = {
'id': b_sg_id,
'name': 'default',
'description': '',
'tenant_id': project_id,
'security_group_rules': [
{'security_group_id': b_sg_id,
'id': b_rule_id,
'tenant_id': project_id,
'remote_group_id': b_sg_id,
'direction': 'ingress',
'remote_ip_prefix': '10.0.0.0/24',
'protocol': None,
'port_range_max': None,
'port_range_min': None,
'ethertype': 'IPv4'}
]
}
if pod_name == 'pod_1':
BOTTOM1_PORTS.append(DotDict(b_sg))
else:
BOTTOM2_PORTS.append(DotDict(b_sg))
pod_id = 'pod_id_1' if pod_name == 'pod_1' else 'pod_id_2'
core.create_resource(ctx, models.ResourceRouting,
{'top_id': t_sg_id,
'bottom_id': b_sg_id,
'pod_id': pod_id,
'project_id': project_id,
'resource_type': constants.RT_SG})
return t_sg_id, b_sg_id
@staticmethod
def _prepare_port_test(tenant_id, ctx, pod_name, index, t_net_id,
b_net_id, vif_type=portbindings.VIF_TYPE_UNBOUND,
device_onwer='compute:None'):
t_port_id = uuidutils.generate_uuid()
b_port_id = uuidutils.generate_uuid()
t_port = {
'id': t_port_id,
'name': 'top_port_%d' % index,
'description': 'old_top_description',
'extra_dhcp_opts': [],
'device_owner': device_onwer,
'security_groups': [],
'device_id': '68f46ee4-d66a-4c39-bb34-ac2e5eb85470',
'admin_state_up': True,
'network_id': t_net_id,
'tenant_id': tenant_id,
'mac_address': 'fa:16:3e:cd:76:40',
'binding:vif_type': vif_type,
'project_id': 'tenant_id',
'binding:host_id': 'zhiyuan-5',
'status': 'ACTIVE'
}
TOP_PORTS.append(DotDict(t_port))
b_port = {
'id': b_port_id,
'name': b_port_id,
'description': 'old_bottom_description',
'extra_dhcp_opts': [],
'device_owner': device_onwer,
'security_groups': [],
'device_id': '68f46ee4-d66a-4c39-bb34-ac2e5eb85470',
'admin_state_up': True,
'network_id': b_net_id,
'tenant_id': tenant_id,
'device_owner': 'compute:None',
'extra_dhcp_opts': [],
'mac_address': 'fa:16:3e:cd:76:40',
'binding:vif_type': vif_type,
'project_id': 'tenant_id',
'binding:host_id': 'zhiyuan-5',
'status': 'ACTIVE'
}
if pod_name == 'pod_1':
BOTTOM1_PORTS.append(DotDict(b_port))
else:
BOTTOM2_PORTS.append(DotDict(b_port))
pod_id = 'pod_id_1' if pod_name == 'pod_1' else 'pod_id_2'
core.create_resource(ctx, models.ResourceRouting,
{'top_id': t_port_id,
'bottom_id': b_port_id,
'pod_id': pod_id,
'project_id': tenant_id,
'resource_type': constants.RT_PORT})
return t_port_id, b_port_id
@staticmethod
def _prepare_network_test(tenant_id, ctx, region_name, index,
enable_dhcp=True):
t_net_id = b_net_id = uuidutils.generate_uuid()
t_subnet_id = b_subnet_id = uuidutils.generate_uuid()
@ -1522,7 +1707,7 @@ class PluginTest(unittest.TestCase,
'ip_version': 4,
'cidr': '10.0.%d.0/24' % index,
'allocation_pools': [],
'enable_dhcp': True,
'enable_dhcp': enable_dhcp,
'gateway_ip': '10.0.%d.1' % index,
'ipv6_address_mode': '',
'ipv6_ra_mode': '',
@ -1530,12 +1715,6 @@ class PluginTest(unittest.TestCase,
}
TOP_NETS.append(DotDict(t_net))
TOP_SUBNETS.append(DotDict(t_subnet))
subnet_info = {'id': t_subnet['id'],
'tenant_id': t_subnet['tenant_id'],
'cidr': t_subnet['cidr'],
'gateway': t_subnet['gateway_ip'],
'pools': t_subnet['allocation_pools']}
SUBNET_INFOS[subnet_info['id']] = subnet_info
b_net = {
'id': b_net_id,
@ -1552,7 +1731,7 @@ class PluginTest(unittest.TestCase,
'ip_version': 4,
'cidr': '10.0.%d.0/24' % index,
'allocation_pools': [],
'enable_dhcp': True,
'enable_dhcp': enable_dhcp,
'gateway_ip': '10.0.%d.1' % index,
'ipv6_address_mode': '',
'ipv6_ra_mode': '',
@ -1641,6 +1820,147 @@ class PluginTest(unittest.TestCase,
# check pre-created ports are all deleted
self.assertEqual(port_num - pre_created_port_num, len(TOP_PORTS))
@patch.object(directory, 'get_plugin', new=fake_get_plugin)
@patch.object(driver.Pool, 'get_instance', new=fake_get_instance)
@patch.object(_utils, 'filter_non_model_columns',
new=fake_filter_non_model_columns)
@patch.object(context, 'get_context_from_neutron_context')
def test_update_port(self, mock_context):
project_id = TEST_TENANT_ID
self._basic_pod_route_setup()
neutron_context = FakeNeutronContext()
t_ctx = context.get_db_context()
t_net_id, t_subnet_id, b_net_id, _ = self._prepare_network_test(
project_id, t_ctx, 'pod_1', 1)
t_port_id, b_port_id = self._prepare_port_test(
project_id, t_ctx, 'pod_1', 1, t_net_id, b_net_id)
t_sg_id, _ = self._prepare_sg_test(project_id, t_ctx, 'pod_1')
fake_plugin = FakePlugin()
fake_client = FakeClient('pod_1')
mock_context.return_value = t_ctx
update_body = {
'port': {
'description': 'new_description',
'extra_dhcp_opts': [
{"opt_value": "123.123.123.45",
"opt_name": "server-ip-address"},
{"opt_value": "123.123.123.123",
"opt_name": "tftp-server"}
],
'device_owner': 'compute:new',
'device_id': 'new_device_id',
'name': 'new_name',
'admin_state_up': False,
'mac_address': 'fa:16:3e:cd:76:bb',
'security_groups': [t_sg_id]
}
}
body_copy = copy.deepcopy(update_body)
top_port = fake_plugin.update_port(
neutron_context, t_port_id, update_body)
self.assertEqual(top_port['name'], body_copy['port']['name'])
self.assertEqual(top_port['description'],
body_copy['port']['description'])
self.assertEqual(top_port['extra_dhcp_opts'],
body_copy['port']['extra_dhcp_opts'])
self.assertEqual(top_port['device_owner'],
body_copy['port']['device_owner'])
self.assertEqual(top_port['device_id'],
body_copy['port']['device_id'])
self.assertEqual(top_port['admin_state_up'],
body_copy['port']['admin_state_up'])
self.assertEqual(top_port['mac_address'],
body_copy['port']['mac_address'])
self.assertEqual(top_port['security_groups'],
body_copy['port']['security_groups'])
bottom_port = fake_client.get_ports(t_ctx, b_port_id)
# name is set to bottom resource id, which is used by lock_handle to
# retrieve bottom/local resources that have been created but not
# registered in the resource routing table, so it's not allowed
# to be updated
self.assertEqual(bottom_port['name'], b_port_id)
self.assertEqual(bottom_port['description'],
body_copy['port']['description'])
self.assertEqual(bottom_port['extra_dhcp_opts'],
body_copy['port']['extra_dhcp_opts'])
self.assertEqual(bottom_port['device_owner'],
body_copy['port']['device_owner'])
self.assertEqual(bottom_port['device_id'],
body_copy['port']['device_id'])
self.assertEqual(bottom_port['admin_state_up'],
body_copy['port']['admin_state_up'])
self.assertEqual(bottom_port['mac_address'],
body_copy['port']['mac_address'])
self.assertEqual(bottom_port['security_groups'],
body_copy['port']['security_groups'])
@patch.object(directory, 'get_plugin', new=fake_get_plugin)
@patch.object(driver.Pool, 'get_instance', new=fake_get_instance)
@patch.object(_utils, 'filter_non_model_columns',
new=fake_filter_non_model_columns)
@patch.object(context, 'get_context_from_neutron_context')
def test_update_bound_port_mac(self, mock_context):
tenant_id = TEST_TENANT_ID
self._basic_pod_route_setup()
neutron_context = FakeNeutronContext()
t_ctx = context.get_db_context()
t_net_id, t_subnet_id, b_net_id, _ = self._prepare_network_test(
tenant_id, t_ctx, 'pod_1', 1)
(t_port_id, b_port_id) = self._prepare_port_test(
tenant_id, t_ctx, 'pod_1', 1, t_net_id, b_net_id, vif_type='ovs',
device_onwer='compute:None')
fake_plugin = FakePlugin()
mock_context.return_value = t_ctx
update_body = {
'port': {
'mac_address': 'fa:16:3e:cd:76:bb'
}
}
self.assertRaises(q_lib_exc.PortBound, fake_plugin.update_port,
neutron_context, t_port_id, update_body)
@patch.object(directory, 'get_plugin', new=fake_get_plugin)
@patch.object(driver.Pool, 'get_instance', new=fake_get_instance)
@patch.object(_utils, 'filter_non_model_columns',
new=fake_filter_non_model_columns)
@patch.object(context, 'get_context_from_neutron_context')
def test_update_non_vm_port(self, mock_context):
tenant_id = TEST_TENANT_ID
self._basic_pod_route_setup()
t_ctx = context.get_db_context()
neutron_context = FakeNeutronContext()
mock_context.return_value = t_ctx
t_net_id, t_subnet_id, b_net_id, _ = self._prepare_network_test(
tenant_id, t_ctx, 'pod_1', 1)
fake_plugin = FakePlugin()
fake_client = FakeClient('pod_1')
non_vm_port_types = [q_constants.DEVICE_OWNER_ROUTER_INTF,
q_constants.DEVICE_OWNER_ROUTER_GW,
q_constants.DEVICE_OWNER_DHCP]
for port_type in non_vm_port_types:
(t_port_id, b_port_id) = self._prepare_port_test(
tenant_id, t_ctx, 'pod_1', 1, t_net_id, b_net_id,
device_onwer=port_type)
update_body = {
'port': {'binding:host_id': 'zhiyuan-6'}
}
body_copy = copy.deepcopy(update_body)
top_port = fake_plugin.update_port(
neutron_context, t_port_id, update_body)
self.assertEqual(top_port['binding:host_id'],
body_copy['port']['binding:host_id'])
# for router interface, router gw, dhcp port, not directly
# update bottom, so bottom not changed
bottom_port = fake_client.get_ports(t_ctx, b_port_id)
self.assertEqual(bottom_port['binding:host_id'], 'zhiyuan-5')
@patch.object(directory, 'get_plugin', new=fake_get_plugin)
@patch.object(driver.Pool, 'get_instance', new=fake_get_instance)
@patch.object(ipam_pluggable_backend.IpamPluggableBackend,