Shared VxLAN (Part 2: async job)

1. What is the problem?
VLAN networks have restrictions that VxLAN networks don't have.
To enable more flexible networking deployments, we want to support
cross-pod VxLAN networks.

We are going to use the shadow agent/port mechanism to synchronize VTEP
information and make cross-pod VxLAN networking available, as discussed
in the specification document[1].

In part 1[2], we added the necessary logic to retrieve agent info
from local Neutron and save it in the shadow agent table in central
Neutron. Now we need to utilize this info to create shadow agents and
shadow ports.

2. What is the solution to the problem?
An asynchronous job, triggered when an instance port is updated to
active, is added. It calculates the needed shadow ports and then
creates them in the target pod.
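
As an illustration, a minimal sketch of the trigger condition (a toy
predicate, not the plugin code; 'vxlan', 'p2p' and 'l2gw' are the
assumed string values of the NT_VxLAN, NM_P2P and NM_L2GW constants
referenced in the diff below):

    NM_P2P, NM_L2GW = 'p2p', 'l2gw'

    def should_setup_shadow_ports(net_type, cross_pod_vxlan_mode):
        # only VxLAN networks need VTEP synchronization, and only when
        # cross-pod traffic runs peer-to-peer or through an L2 gateway
        return net_type == 'vxlan' and (
            cross_pod_vxlan_mode in (NM_P2P, NM_L2GW))

    assert should_setup_shadow_ports('vxlan', NM_P2P)
    assert not should_setup_shadow_ports('vlan', NM_P2P)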

3. What features need to be implemented in the Tricircle
to realize the solution?
This is the second patch for cross-pod VxLAN networking support, which
introduces the following changes:

(1) A new asynchronous job setup_shadow_ports is added. Each job only
handles the shadow port setup in one given pod for one given network.
If shadow ports in other pods also need to be updated, the job
registers a new job for each of those pods.
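
For reference, the job id convention (copied from the XJobAPI and
xmanager changes below) combines the target pod and network, so each
(pod, network) pair gets its own job:

    pod_id, net_id = 'pod_id_1', 'net_id_1'
    combine_id = '%s#%s' % (pod_id, net_id)          # built by XJobAPI
    target_pod_id, t_net_id = combine_id.split('#')  # parsed by the handler
    assert (target_pod_id, t_net_id) == (pod_id, net_id)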

[1] https://review.openstack.org/#/c/429155/
[2] https://review.openstack.org/#/c/425128/

Change-Id: I9481016b54feb57aacd03688de882b8912a78018
zhiyuan_cai 2016-12-28 17:30:13 +08:00
parent bb73104ca2
commit e87e080281
11 changed files with 559 additions and 136 deletions


@ -79,6 +79,7 @@ PROFILE_HOST = 'host'
PROFILE_AGENT_TYPE = 'type'
PROFILE_TUNNEL_IP = 'tunnel_ip'
PROFILE_FORCE_UP = 'force_up'
DEVICE_OWNER_SHADOW = 'compute:shadow'
# job type
JT_ROUTER = 'router'
@ -87,6 +88,8 @@ JT_PORT_DELETE = 'port_delete'
JT_SEG_RULE_SETUP = 'seg_rule_setup'
JT_NETWORK_UPDATE = 'update_network'
JT_SUBNET_UPDATE = 'subnet_update'
JT_SHADOW_PORT_SETUP = 'shadow_port_setup'
# network type
NT_LOCAL = 'local'
NT_VLAN = 'vlan'
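
Shadow ports are told apart from real instance ports purely by
device_owner. A self-contained sketch of the distinction, mirroring the
filtering done in xmanager.setup_shadow_ports later in this patch:

    DEVICE_OWNER_SHADOW = 'compute:shadow'

    def is_real_instance_port(port):
        # VM ports carry a 'compute:...' owner; the shadow marker is
        # excluded so shadow ports are never mistaken for real ports
        owner = port['device_owner']
        return owner.startswith('compute:') and owner != DEVICE_OWNER_SHADOW

    assert is_real_instance_port({'device_owner': 'compute:None'})
    assert not is_real_instance_port({'device_owner': DEVICE_OWNER_SHADOW})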


@ -112,3 +112,10 @@ class XJobAPI(object):
self.client.prepare(exchange='openstack').cast(
ctxt, 'update_subnet',
payload={constants.JT_SUBNET_UPDATE: combine_id})
def setup_shadow_ports(self, ctxt, pod_id, net_id):
combine_id = '%s#%s' % (pod_id, net_id)
db_api.new_job(ctxt, constants.JT_SHADOW_PORT_SETUP, combine_id)
self.client.prepare(exchange='openstack').cast(
ctxt, 'setup_shadow_ports',
payload={constants.JT_SHADOW_PORT_SETUP: combine_id})


@ -458,8 +458,7 @@ def ensure_agent_exists(context, pod_id, host, _type, tunnel_ip):
context.session.begin()
agents = core.query_resource(
context, models.ShadowAgent,
[{'key': 'pod_id', 'comparator': 'eq', 'value': pod_id},
{'key': 'host', 'comparator': 'eq', 'value': host},
[{'key': 'host', 'comparator': 'eq', 'value': host},
{'key': 'type', 'comparator': 'eq', 'value': _type}], [])
if agents:
return
@ -477,6 +476,14 @@ def ensure_agent_exists(context, pod_id, host, _type, tunnel_ip):
context.session.close()
def get_agent_by_host_type(context, host, _type):
agents = core.query_resource(
context, models.ShadowAgent,
[{'key': 'host', 'comparator': 'eq', 'value': host},
{'key': 'type', 'comparator': 'eq', 'value': _type}], [])
return agents[0] if agents else None
def _is_user_context(context):
"""Indicates if the request context is a normal user."""
if not context:
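
With pod_id dropped from the unique constraint (see the migration and
model changes below), a (host, type) pair alone identifies a shadow
agent. An in-memory analogue of the new lookup, with made-up data:

    # maps (host, agent_type) -> agent record, as the shadow_agents
    # table now does via its host0type unique constraint
    shadow_agents = {
        ('host1', 'Open vSwitch agent'): {'tunnel_ip': '192.168.1.101'},
    }

    def get_agent_by_host_type(host, agent_type):
        # mirrors db_api.get_agent_by_host_type: first match or None
        return shadow_agents.get((host, agent_type))

    assert get_agent_by_host_type(
        'host1', 'Open vSwitch agent')['tunnel_ip'] == '192.168.1.101'
    assert get_agent_by_host_type('host2', 'Open vSwitch agent') is None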


@ -30,8 +30,8 @@ def upgrade(migrate_engine):
sql.Column('type', sql.String(length=36), nullable=False),
sql.Column('tunnel_ip', sql.String(length=48), nullable=False),
migrate.UniqueConstraint(
'pod_id', 'host', 'type',
name='pod_id0host0type'),
'host', 'type',
name='host0type'),
mysql_engine='InnoDB',
mysql_charset='utf8')
shadow_agents.create()


@ -118,8 +118,8 @@ class ShadowAgent(core.ModelBase, core.DictBase):
__tablename__ = 'shadow_agents'
__table_args__ = (
schema.UniqueConstraint(
'pod_id', 'host', 'type',
name='pod_id0host0type'),
'host', 'type',
name='host0type'),
)
attributes = ['id', 'pod_id', 'host', 'type', 'tunnel_ip']


@ -617,7 +617,9 @@ class TricirclePlugin(db_base_plugin_v2.NeutronDbPluginV2,
pod = db_api.get_pod_by_name(t_ctx, region_name)
net = self.get_network(context, res['network_id'])
if net[provider_net.NETWORK_TYPE] == t_constants.NT_VxLAN:
is_vxlan_network = (
net[provider_net.NETWORK_TYPE] == t_constants.NT_VxLAN)
if is_vxlan_network:
# if a local type network happens to be a vxlan network, local
# plugin will still send agent info, so we double check here
self.helper.create_shadow_agent_if_needed(t_ctx,
@ -657,6 +659,11 @@ class TricirclePlugin(db_base_plugin_v2.NeutronDbPluginV2,
self.xjob_handler.configure_security_group_rules(t_ctx,
res['tenant_id'])
if is_vxlan_network and (
cfg.CONF.client.cross_pod_vxlan_mode in (
t_constants.NM_P2P, t_constants.NM_L2GW)):
self.xjob_handler.setup_shadow_ports(t_ctx, pod['pod_id'],
res['network_id'])
# for vm port or port with empty device_owner, update top port and
# bottom port
elif top_port.get('device_owner') not in NON_VM_PORT_TYPES:


@ -35,6 +35,7 @@ import tricircle.network.exceptions as t_network_exc
AZ_HINTS = 'availability_zone_hints'
EXTERNAL = 'router:external' # neutron.extensions.external_net.EXTERNAL
TYPE_VLAN = 'vlan' # neutron.plugins.common.constants.TYPE_VLAN
VIF_TYPE_OVS = 'ovs' # neutron.extensions.portbindings.VIF_TYPE_OVS
OVS_AGENT_DATA_TEMPLATE = {
'agent_type': None,
@ -60,6 +61,9 @@ OVS_AGENT_DATA_TEMPLATE = {
'enable_distributed_routing': False,
'bridge_mappings': {}}}
VIF_AGENT_TYPE_MAP = {
VIF_TYPE_OVS: constants.AGENT_TYPE_OVS}
AGENT_DATA_TEMPLATE_MAP = {
constants.AGENT_TYPE_OVS: OVS_AGENT_DATA_TEMPLATE}
@ -216,6 +220,9 @@ class NetworkHelper(object):
value = t_constants.shadow_port_name % ele_['id']
elif _type_ == t_constants.RT_NETWORK:
value = utils.get_bottom_network_name(ele_)
elif _type_ == t_constants.RT_SD_PORT:
_type_ = t_constants.RT_PORT
value = t_constants.shadow_port_name % ele_['id']
else:
value = ele_['id']
return client.list_resources(_type_, t_ctx_,
@ -228,6 +235,8 @@ class NetworkHelper(object):
elif _type_ == t_constants.RT_SD_PORT:
_type_ = t_constants.RT_PORT
client = self._get_client(pod_['region_name'])
if _type_ == t_constants.RT_SD_PORT:
_type_ = t_constants.RT_PORT
return client.create_resources(_type_, t_ctx_, body_)
return t_lock.get_or_create_element(
@ -701,6 +710,10 @@ class NetworkHelper(object):
router_az_hint = router_az_hints[0]
return bool(db_api.get_pod_by_name(t_ctx, router_az_hint))
@staticmethod
def get_agent_type_by_vif(vif_type):
return VIF_AGENT_TYPE_MAP.get(vif_type)
@staticmethod
def construct_agent_data(agent_type, host, tunnel_ip):
if agent_type not in AGENT_DATA_TEMPLATE_MAP:
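
A toy mirror of the new VIF-type lookup: only the OVS VIF type is wired
up in this patch, so any other VIF type yields None and the caller
skips the port ('Open vSwitch agent' is the neutron_lib AGENT_TYPE_OVS
value):

    VIF_TYPE_OVS = 'ovs'
    AGENT_TYPE_OVS = 'Open vSwitch agent'
    VIF_AGENT_TYPE_MAP = {VIF_TYPE_OVS: AGENT_TYPE_OVS}

    def get_agent_type_by_vif(vif_type):
        # returns None for unknown VIF types, exactly like the helper
        return VIF_AGENT_TYPE_MAP.get(vif_type)

    assert get_agent_type_by_vif('ovs') == AGENT_TYPE_OVS
    assert get_agent_type_by_vif('unbound') is None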


@ -20,6 +20,7 @@ from oslo_log import log
from neutron_lib.api.definitions import portbindings
from neutron_lib.api.definitions import provider_net
from neutron_lib.api import validators
import neutron_lib.constants as q_constants
import neutron_lib.exceptions as q_exceptions
@ -54,9 +55,6 @@ cfg.CONF.register_opts(tricircle_opts, group=tricircle_opt_group)
LOG = log.getLogger(__name__)
VIF_AGENT_TYPE_MAP = {
portbindings.VIF_TYPE_OVS: q_constants.AGENT_TYPE_OVS}
class TricirclePlugin(plugin.Ml2Plugin):
def __init__(self):
@ -461,7 +459,7 @@ class TricirclePlugin(plugin.Ml2Plugin):
# get_subnet will create bottom subnet if it doesn't exist
self.get_subnet(context, subnet_id)
for field in ('name', 'device_id', 'binding:host_id'):
for field in ('name', 'device_id', 'device_owner', 'binding:host_id'):
if port_body.get(field):
t_port[field] = port_body[field]
@ -488,6 +486,8 @@ class TricirclePlugin(plugin.Ml2Plugin):
if not utils.is_extension_supported(self.core_plugin, 'agent'):
return
profile_dict = port_body.get(portbindings.PROFILE, {})
if not validators.is_attr_set(profile_dict):
return
if t_constants.PROFILE_TUNNEL_IP not in profile_dict:
return
agent_type = profile_dict[t_constants.PROFILE_AGENT_TYPE]
@ -526,9 +526,9 @@ class TricirclePlugin(plugin.Ml2Plugin):
return
vif_type = port[portbindings.VIF_TYPE]
if vif_type not in VIF_AGENT_TYPE_MAP:
agent_type = helper.NetworkHelper.get_agent_type_by_vif(vif_type)
if not agent_type:
return
agent_type = VIF_AGENT_TYPE_MAP[vif_type]
agents = self.core_plugin.get_agents(
context, filters={'agent_type': [agent_type], 'host': [host]})
if not agents:


@ -489,8 +489,13 @@ class FakeClient(object):
filters = filters or []
for query_filter in filters:
key = query_filter['key']
value = query_filter['value']
filter_dict[key] = value
# when querying ports, "fields" is passed in the query string to
# ask the server to only return necessary fields, which can reduce
the data being transferred. in test, we just return all the fields
# since there's no need to optimize
if key != 'fields':
value = query_filter['value']
filter_dict[key] = value
return self.client.get('', {'filters': filter_dict})['ports']
def get_ports(self, ctx, port_id):
@ -1009,6 +1014,12 @@ class FakeBaseRPCAPI(object):
self.xmanager.update_subnet(
ctxt, payload={constants.JT_SUBNET_UPDATE: combine_id})
def configure_security_group_rules(self, ctxt, project_id):
pass
def setup_shadow_ports(self, ctxt, pod_id, net_id):
pass
class FakeRPCAPI(FakeBaseRPCAPI):
def __init__(self, fake_plugin):
@ -1025,6 +1036,11 @@ class FakeRPCAPI(FakeBaseRPCAPI):
def configure_security_group_rules(self, ctxt, project_id):
pass
def setup_shadow_ports(self, ctxt, pod_id, net_id):
combine_id = '%s#%s' % (pod_id, net_id)
self.xmanager.setup_shadow_ports(
ctxt, payload={constants.JT_SHADOW_PORT_SETUP: combine_id})
class FakeExtension(object):
def __init__(self, ext_obj):
@ -1449,7 +1465,7 @@ class PluginTest(unittest.TestCase,
az_hints = []
region_names = []
t_net_id, _, _, _ = self._prepare_network_test(
t_net_id, _, _, _ = self._prepare_network_subnet(
tenant_id, t_ctx, 'pod_1', 1, az_hints=az_hints)
net_filter = {'id': [t_net_id]}
top_net = fake_plugin.get_networks(neutron_context, net_filter)
@ -1458,8 +1474,8 @@ class PluginTest(unittest.TestCase,
az_hints = '["az_name_1", "az_name_2"]'
region_names = ['pod_1', 'pod_2']
t_net_id, _, _, _ = self._prepare_network_test(
tenant_id, t_ctx, 'pod_1', 1, az_hints=az_hints)
t_net_id, _, _, _ = self._prepare_network_subnet(
tenant_id, t_ctx, 'pod_1', 2, az_hints=az_hints)
net_filter = {'id': [t_net_id]}
top_net = fake_plugin.get_networks(neutron_context, net_filter)
six.assertCountEqual(self, top_net[0]['availability_zone_hints'],
@ -1467,8 +1483,8 @@ class PluginTest(unittest.TestCase,
az_hints = '["pod_1", "pod_2"]'
region_names = ['pod_1', 'pod_2']
t_net_id, _, _, _ = self._prepare_network_test(
tenant_id, t_ctx, 'pod_1', 1, az_hints=az_hints)
t_net_id, _, _, _ = self._prepare_network_subnet(
tenant_id, t_ctx, 'pod_1', 3, az_hints=az_hints)
net_filter = {'id': [t_net_id]}
top_net = fake_plugin.get_networks(neutron_context, net_filter)
six.assertCountEqual(self, top_net[0]['availability_zone_hints'],
@ -1476,8 +1492,8 @@ class PluginTest(unittest.TestCase,
az_hints = '["pod_1", "az_name_2"]'
region_names = ['pod_1', 'pod_2']
t_net_id, _, _, _ = self._prepare_network_test(
tenant_id, t_ctx, 'pod_1', 1, az_hints=az_hints)
t_net_id, _, _, _ = self._prepare_network_subnet(
tenant_id, t_ctx, 'pod_1', 4, az_hints=az_hints)
net_filter = {'id': [t_net_id]}
top_net = fake_plugin.get_networks(neutron_context, net_filter)
six.assertCountEqual(self, top_net[0]['availability_zone_hints'],
@ -1489,8 +1505,8 @@ class PluginTest(unittest.TestCase,
db_api.create_pod(self.context, pod4)
az_hints = '["pod_1", "az_name_1"]'
region_names = ['pod_1', 'pod_4']
t_net_id, _, _, _ = self._prepare_network_test(
tenant_id, t_ctx, 'pod_1', 1, az_hints=az_hints)
t_net_id, _, _, _ = self._prepare_network_subnet(
tenant_id, t_ctx, 'pod_1', 5, az_hints=az_hints)
net_filter = {'id': [t_net_id]}
top_net = fake_plugin.get_networks(neutron_context, net_filter)
six.assertCountEqual(self, top_net[0]['availability_zone_hints'],
@ -1503,9 +1519,8 @@ class PluginTest(unittest.TestCase,
tenant_id = TEST_TENANT_ID
self._basic_pod_route_setup()
t_ctx = context.get_db_context()
t_net_id, _, b_net_id, _ = self._prepare_network_test(tenant_id,
t_ctx, 'pod_1',
1)
t_net_id, _, b_net_id, _ = self._prepare_network_subnet(
tenant_id, t_ctx, 'pod_1', 1)
fake_plugin = FakePlugin()
fake_client = FakeClient('pod_1')
neutron_context = FakeNeutronContext()
@ -1547,8 +1562,8 @@ class PluginTest(unittest.TestCase,
tenant_id = TEST_TENANT_ID
self._basic_pod_route_setup()
t_ctx = context.get_db_context()
t_net_id, _, _, _ = self._prepare_network_test(tenant_id, t_ctx,
'pod_1', 1)
t_net_id, _, _, _ = self._prepare_network_subnet(tenant_id, t_ctx,
'pod_1', 1)
fake_plugin = FakePlugin()
neutron_context = FakeNeutronContext()
mock_context.return_value = t_ctx
@ -1568,8 +1583,8 @@ class PluginTest(unittest.TestCase,
tenant_id = TEST_TENANT_ID
self._basic_pod_route_setup()
t_ctx = context.get_db_context()
t_net_id, _, _, _ = self._prepare_network_test(tenant_id, t_ctx,
'pod_1', 1)
t_net_id, _, _, _ = self._prepare_network_subnet(tenant_id, t_ctx,
'pod_1', 1)
fake_plugin = FakePlugin()
neutron_context = FakeNeutronContext()
mock_context.return_value = t_ctx
@ -1787,55 +1802,70 @@ class PluginTest(unittest.TestCase,
return t_port_id, b_port_id
@staticmethod
def _prepare_network_test(tenant_id, ctx, region_name, index,
enable_dhcp=True, az_hints=None,
network_type=constants.NT_LOCAL):
t_net_id = b_net_id = uuidutils.generate_uuid()
t_subnet_id = b_subnet_id = uuidutils.generate_uuid()
# no need to specify az, we will setup router in the pod where bottom
# network is created
t_net = {
'id': t_net_id,
'name': 'top_net_%d' % index,
'tenant_id': tenant_id,
'description': 'description',
'admin_state_up': False,
'shared': False,
'provider:network_type': network_type,
'availability_zone_hints': az_hints
}
t_subnet = {
'id': t_subnet_id,
'network_id': t_net_id,
'name': 'top_subnet_%d' % index,
'ip_version': 4,
'cidr': '10.0.%d.0/24' % index,
'allocation_pools': [],
'enable_dhcp': enable_dhcp,
'gateway_ip': '10.0.%d.1' % index,
'ipv6_address_mode': '',
'ipv6_ra_mode': '',
'tenant_id': tenant_id,
'description': 'description',
'host_routes': [],
'dns_nameservers': []
}
TOP_NETS.append(DotDict(t_net))
TOP_SUBNETS.append(DotDict(t_subnet))
def _prepare_network_subnet(project_id, ctx, region_name, index,
enable_dhcp=True, az_hints=None,
network_type=constants.NT_LOCAL):
t_client = FakeClient()
t_net_name = 'top_net_%d' % index
t_nets = t_client.list_networks(ctx, [{'key': 'name',
'comparator': 'eq',
'value': t_net_name}])
if not t_nets:
t_net_id = uuidutils.generate_uuid()
t_subnet_id = uuidutils.generate_uuid()
t_net = {
'id': t_net_id,
'name': 'top_net_%d' % index,
'tenant_id': project_id,
'description': 'description',
'admin_state_up': False,
'shared': False,
'provider:network_type': network_type,
'availability_zone_hints': az_hints
}
t_subnet = {
'id': t_subnet_id,
'network_id': t_net_id,
'name': 'top_subnet_%d' % index,
'ip_version': 4,
'cidr': '10.0.%d.0/24' % index,
'allocation_pools': [],
'enable_dhcp': True,
'gateway_ip': '10.0.%d.1' % index,
'ipv6_address_mode': '',
'ipv6_ra_mode': '',
'tenant_id': project_id,
'description': 'description',
'host_routes': [],
'dns_nameservers': []
}
TOP_NETS.append(DotDict(t_net))
TOP_SUBNETS.append(DotDict(t_subnet))
else:
t_net_id = t_nets[0]['id']
t_subnet_name = 'top_subnet_%d' % index
t_subnets = t_client.list_subnets(ctx, [{'key': 'name',
'comparator': 'eq',
'value': t_subnet_name}])
t_subnet_id = t_subnets[0]['id']
# top and bottom ids are the same
return t_net_id, t_subnet_id, t_net_id, t_subnet_id
b_net_id = t_net_id
b_subnet_id = t_subnet_id
b_net = {
'id': b_net_id,
'name': t_net_id,
'tenant_id': tenant_id,
'tenant_id': project_id,
'description': 'description',
'admin_state_up': False,
'shared': False
'shared': False,
'tenant_id': project_id
}
b_subnet = {
'id': b_subnet_id,
'network_id': b_net_id,
'name': b_subnet_id,
'name': t_subnet_id,
'ip_version': 4,
'cidr': '10.0.%d.0/24' % index,
'allocation_pools': [],
@ -1843,7 +1873,7 @@ class PluginTest(unittest.TestCase,
'gateway_ip': '10.0.%d.25' % index,
'ipv6_address_mode': '',
'ipv6_ra_mode': '',
'tenant_id': tenant_id,
'tenant_id': project_id,
'description': 'description',
'host_routes': [],
'dns_nameservers': []
@ -1860,21 +1890,74 @@ class PluginTest(unittest.TestCase,
{'top_id': t_net_id,
'bottom_id': b_net_id,
'pod_id': pod_id,
'project_id': tenant_id,
'project_id': project_id,
'resource_type': constants.RT_NETWORK})
core.create_resource(ctx, models.ResourceRouting,
{'top_id': t_subnet_id,
'bottom_id': b_subnet_id,
'pod_id': pod_id,
'project_id': tenant_id,
'project_id': project_id,
'resource_type': constants.RT_SUBNET})
return t_net_id, t_subnet_id, b_net_id, b_subnet_id
@staticmethod
def _prepare_port(project_id, ctx, region_name, index, extra_attrs={}):
t_client = FakeClient()
t_net_name = 'top_net_%d' % index
t_nets = t_client.list_networks(ctx, [{'key': 'name',
'comparator': 'eq',
'value': t_net_name}])
t_subnet_name = 'top_subnet_%d' % index
t_subnets = t_client.list_subnets(ctx, [{'key': 'name',
'comparator': 'eq',
'value': t_subnet_name}])
t_port_id = uuidutils.generate_uuid()
b_port_id = t_port_id
ip_suffix = index if region_name == 'pod_1' else 100 + index
t_port = {
'id': b_port_id,
'network_id': t_nets[0]['id'],
'device_id': 'vm%d_id' % index,
'device_owner': 'compute:None',
'fixed_ips': [{'subnet_id': t_subnets[0]['id'],
'ip_address': '10.0.%d.%d' % (index, ip_suffix)}],
'security_groups': [],
'tenant_id': project_id
}
t_port.update(extra_attrs)
# resource ids in top and bottom pod are the same
b_port = {
'id': t_port_id,
'network_id': t_nets[0]['id'],
'device_id': 'vm%d_id' % index,
'device_owner': 'compute:None',
'fixed_ips': [{'subnet_id': t_subnets[0]['id'],
'ip_address': '10.0.%d.%d' % (index, ip_suffix)}],
'security_groups': [],
'tenant_id': project_id
}
b_port.update(extra_attrs)
TOP_PORTS.append(DotDict(t_port))
if region_name == 'pod_1':
BOTTOM1_PORTS.append(DotDict(b_port))
else:
BOTTOM2_PORTS.append(DotDict(b_port))
pod_id = 'pod_id_1' if region_name == 'pod_1' else 'pod_id_2'
core.create_resource(ctx, models.ResourceRouting,
{'top_id': t_port_id,
'bottom_id': t_port_id,
'pod_id': pod_id,
'project_id': project_id,
'resource_type': constants.RT_PORT})
return t_port_id, b_port_id
def _prepare_router_test(self, tenant_id, ctx, region_name, index,
router_az_hints=None, net_az_hints=None,
create_new_router=False):
(t_net_id, t_subnet_id, b_net_id,
b_subnet_id) = self._prepare_network_test(
b_subnet_id) = self._prepare_network_subnet(
tenant_id, ctx, region_name, index, az_hints=net_az_hints)
t_router_id = uuidutils.generate_uuid()
t_router = {
@ -1943,7 +2026,7 @@ class PluginTest(unittest.TestCase,
self._basic_pod_route_setup()
neutron_context = FakeNeutronContext()
t_ctx = context.get_db_context()
_, t_subnet_id, _, b_subnet_id = self._prepare_network_test(
_, t_subnet_id, _, b_subnet_id = self._prepare_network_subnet(
tenant_id, t_ctx, 'pod_1', 1)
fake_plugin = FakePlugin()
@ -2012,7 +2095,7 @@ class PluginTest(unittest.TestCase,
self._basic_pod_route_setup()
neutron_context = FakeNeutronContext()
t_ctx = context.get_db_context()
_, t_subnet_id, _, b_subnet_id = self._prepare_network_test(
_, t_subnet_id, _, b_subnet_id = self._prepare_network_subnet(
tenant_id, t_ctx, 'pod_1', 1, enable_dhcp=False)
fake_plugin = FakePlugin()
@ -2066,7 +2149,7 @@ class PluginTest(unittest.TestCase,
neutron_context = FakeNeutronContext()
t_ctx = context.get_db_context()
(t_net_id, t_subnet_id,
b_net_id, b_subnet_id) = self._prepare_network_test(
b_net_id, b_subnet_id) = self._prepare_network_subnet(
project_id, t_ctx, 'pod_1', 1)
t_port_id, b_port_id = self._prepare_port_test(
project_id, t_ctx, 'pod_1', 1, t_net_id, b_net_id,
@ -2146,7 +2229,7 @@ class PluginTest(unittest.TestCase,
neutron_context = FakeNeutronContext()
t_ctx = context.get_db_context()
(t_net_id, t_subnet_id,
b_net_id, b_subnet_id) = self._prepare_network_test(
b_net_id, b_subnet_id) = self._prepare_network_subnet(
tenant_id, t_ctx, 'pod_1', 1)
(t_port_id, b_port_id) = self._prepare_port_test(
tenant_id, t_ctx, 'pod_1', 1, t_net_id, b_net_id,
@ -2176,7 +2259,7 @@ class PluginTest(unittest.TestCase,
neutron_context = FakeNeutronContext()
mock_context.return_value = t_ctx
(t_net_id, t_subnet_id,
b_net_id, b_subnet_id) = self._prepare_network_test(
b_net_id, b_subnet_id) = self._prepare_network_subnet(
tenant_id, t_ctx, 'pod_1', 1)
fake_plugin = FakePlugin()
fake_client = FakeClient('pod_1')
@ -2201,19 +2284,20 @@ class PluginTest(unittest.TestCase,
bottom_port = fake_client.get_ports(t_ctx, b_port_id)
self.assertEqual(bottom_port['binding:host_id'], 'zhiyuan-5')
@patch.object(FakeRPCAPI, 'setup_shadow_ports')
@patch.object(directory, 'get_plugin', new=fake_get_plugin)
@patch.object(driver.Pool, 'get_instance', new=fake_get_instance)
@patch.object(_utils, 'filter_non_model_columns',
new=fake_filter_non_model_columns)
@patch.object(context, 'get_context_from_neutron_context')
def test_update_vm_port(self, mock_context):
def test_update_vm_port(self, mock_context, mock_setup):
tenant_id = TEST_TENANT_ID
self._basic_pod_route_setup()
t_ctx = context.get_db_context()
neutron_context = FakeNeutronContext()
mock_context.return_value = t_ctx
(t_net_id, t_subnet_id,
b_net_id, b_subnet_id) = self._prepare_network_test(
b_net_id, b_subnet_id) = self._prepare_network_subnet(
tenant_id, t_ctx, 'pod_1', 1, network_type=constants.NT_LOCAL)
fake_plugin = FakePlugin()
@ -2233,6 +2317,7 @@ class PluginTest(unittest.TestCase,
agents = core.query_resource(t_ctx, models.ShadowAgent, [], [])
# we only create shadow agent for vxlan network
self.assertEqual(len(agents), 0)
self.assertFalse(mock_setup.called)
client = FakeClient()
# in fact provider attribute is not allowed to be updated, but in test
@ -2247,6 +2332,9 @@ class PluginTest(unittest.TestCase,
self.assertEqual(agents[0]['type'], 'Open vSwitch agent')
self.assertEqual(agents[0]['host'], 'fake_host')
self.assertEqual(agents[0]['tunnel_ip'], '192.168.1.101')
# we test the exact effect of setup_shadow_ports in
# test_update_port_trigger_l2pop
self.assertTrue(mock_setup.called)
@patch.object(directory, 'get_plugin', new=fake_get_plugin)
@patch.object(driver.Pool, 'get_instance', new=fake_get_instance)
@ -2361,7 +2449,7 @@ class PluginTest(unittest.TestCase,
net_az_hints = '["pod_1", "az_name_2"]'
(t_net_id, t_subnet_id,
t_router_id, b_net_id, b_subnet_id) = self._prepare_router_test(
tenant_id, t_ctx, 'pod_1', 1, router_az_hints, net_az_hints, True)
tenant_id, t_ctx, 'pod_1', 2, router_az_hints, net_az_hints, True)
router = fake_plugin._get_router(q_ctx, t_router_id)
net = fake_plugin.get_network(q_ctx, t_net_id)
self.assertRaises(t_exceptions.RouterNetworkLocationMismatch,
@ -2372,7 +2460,7 @@ class PluginTest(unittest.TestCase,
net_az_hints = '["az_name_1", "pod_2"]'
(t_net_id, t_subnet_id,
t_router_id, b_net_id, b_subnet_id) = self._prepare_router_test(
tenant_id, t_ctx, 'pod_1', 1, router_az_hints, net_az_hints, True)
tenant_id, t_ctx, 'pod_1', 3, router_az_hints, net_az_hints, True)
router = fake_plugin._get_router(q_ctx, t_router_id)
net = fake_plugin.get_network(q_ctx, t_net_id)
self.assertRaises(t_exceptions.RouterNetworkLocationMismatch,
@ -2383,7 +2471,7 @@ class PluginTest(unittest.TestCase,
net_az_hints = None
(t_net_id, t_subnet_id,
t_router_id, b_net_id, b_subnet_id) = self._prepare_router_test(
tenant_id, t_ctx, 'pod_1', 1, router_az_hints, net_az_hints, True)
tenant_id, t_ctx, 'pod_1', 4, router_az_hints, net_az_hints, True)
router = fake_plugin._get_router(q_ctx, t_router_id)
net = fake_plugin.get_network(q_ctx, t_net_id)
self.assertRaises(t_exceptions.RouterNetworkLocationMismatch,
@ -2394,7 +2482,7 @@ class PluginTest(unittest.TestCase,
net_az_hints = '["pod_1", "az_name_2"]'
(t_net_id, t_subnet_id,
t_router_id, b_net_id, b_subnet_id) = self._prepare_router_test(
tenant_id, t_ctx, 'pod_1', 1, router_az_hints, net_az_hints, True)
tenant_id, t_ctx, 'pod_1', 5, router_az_hints, net_az_hints, True)
router = fake_plugin._get_router(q_ctx, t_router_id)
net = fake_plugin.get_network(q_ctx, t_net_id)
is_local_router = helper.NetworkHelper.is_local_router(t_ctx, router)
@ -2405,7 +2493,7 @@ class PluginTest(unittest.TestCase,
net_az_hints = None
(t_net_id, t_subnet_id,
t_router_id, b_net_id, b_subnet_id) = self._prepare_router_test(
tenant_id, t_ctx, 'pod_1', 1, router_az_hints, net_az_hints, True)
tenant_id, t_ctx, 'pod_1', 6, router_az_hints, net_az_hints, True)
router = fake_plugin._get_router(q_ctx, t_router_id)
net = fake_plugin.get_network(q_ctx, t_net_id)
is_local_router = helper.NetworkHelper.is_local_router(t_ctx, router)
@ -2416,7 +2504,7 @@ class PluginTest(unittest.TestCase,
net_az_hints = '["pod_1"]'
(t_net_id, t_subnet_id,
t_router_id, b_net_id, b_subnet_id) = self._prepare_router_test(
tenant_id, t_ctx, 'pod_1', 1, router_az_hints, net_az_hints, True)
tenant_id, t_ctx, 'pod_1', 7, router_az_hints, net_az_hints, True)
router = fake_plugin._get_router(q_ctx, t_router_id)
net = fake_plugin.get_network(q_ctx, t_net_id)
is_local_router = helper.NetworkHelper.is_local_router(t_ctx, router)
@ -2427,7 +2515,7 @@ class PluginTest(unittest.TestCase,
net_az_hints = '["az_name_2"]'
(t_net_id, t_subnet_id,
t_router_id, b_net_id, b_subnet_id) = self._prepare_router_test(
tenant_id, t_ctx, 'pod_1', 1, router_az_hints, net_az_hints, True)
tenant_id, t_ctx, 'pod_1', 8, router_az_hints, net_az_hints, True)
router = fake_plugin._get_router(q_ctx, t_router_id)
net = fake_plugin.get_network(q_ctx, t_net_id)
is_local_router = helper.NetworkHelper.is_local_router(t_ctx, router)
@ -2439,7 +2527,7 @@ class PluginTest(unittest.TestCase,
t_ctx.is_admin = True
(t_net_id, t_subnet_id,
t_router_id, b_net_id, b_subnet_id) = self._prepare_router_test(
tenant_id, t_ctx, 'pod_1', 1, router_az_hints, net_az_hints, True)
tenant_id, t_ctx, 'pod_1', 9, router_az_hints, net_az_hints, True)
router = fake_plugin._get_router(q_ctx, t_router_id)
net = fake_plugin.get_network(q_ctx, t_net_id)
is_local_router = helper.NetworkHelper.is_local_router(t_ctx, router)
@ -2449,8 +2537,8 @@ class PluginTest(unittest.TestCase,
net_az_hints = '["pod_1"]'
t_ctx.is_admin = True
(t_net_id, t_subnet_id, b_net_id,
b_subnet_id) = self._prepare_network_test(
tenant_id, t_ctx, 'pod_1', 1, az_hints=net_az_hints)
b_subnet_id) = self._prepare_network_subnet(
tenant_id, t_ctx, 'pod_1', 10, az_hints=net_az_hints)
# add a use case: router's extra_attributes attr is not exist but
# availability_zone_hints attr exist
@ -3141,10 +3229,13 @@ class PluginTest(unittest.TestCase,
self.assertIsNone(TOP_FLOATINGIPS[0]['fixed_port_id'])
self.assertIsNone(TOP_FLOATINGIPS[0]['fixed_ip_address'])
self.assertIsNone(TOP_FLOATINGIPS[0]['router_id'])
# check routing entry for copied port has been created
# both creating a floating ip and booting an instance in a vxlan
# network will create shadow ports, so we leave shadow port deletion
# to the central plugin; it will delete the shadow port when deleting
# the instance port
cp_port_mappings = db_api.get_bottom_mappings_by_top_id(
t_ctx, t_port_id, constants.RT_SD_PORT)
self.assertEqual(0, len(cp_port_mappings))
self.assertEqual(1, len(cp_port_mappings))
@patch.object(directory, 'get_plugin', new=fake_get_plugin)
@patch.object(driver.Pool, 'get_instance', new=fake_get_instance)
@ -3366,6 +3457,57 @@ class PluginTest(unittest.TestCase,
fake_plugin, q_ctx, t_ctx, 'pod_id_1', TOP_SGS, TOP_SG_RULES,
BOTTOM1_SGS)
@patch.object(FakeBaseRPCAPI, 'setup_shadow_ports')
@patch.object(FakeClient, 'update_ports')
@patch.object(context, 'get_context_from_neutron_context')
def test_update_port_trigger_l2pop(self, mock_context, mock_update,
mock_setup):
fake_plugin = FakePlugin()
q_ctx = FakeNeutronContext()
t_ctx = context.get_db_context()
t_ctx.project_id = TEST_TENANT_ID
mock_context.return_value = t_ctx
self._basic_pod_route_setup()
(t_net_id, _, _, _) = self._prepare_network_subnet(
TEST_TENANT_ID, t_ctx, 'pod_1', 1, network_type=constants.NT_VxLAN)
self._prepare_network_subnet(TEST_TENANT_ID, t_ctx, 'pod_2', 1,
network_type=constants.NT_VxLAN)
t_port_id1, b_port_id1 = self._prepare_port(
TEST_TENANT_ID, t_ctx, 'pod_1', 1,
{'binding:host_id': 'host1',
'binding:vif_type': helper.VIF_TYPE_OVS})
update_body = {'port': {
'binding:profile': {
constants.PROFILE_REGION: 'pod_1',
constants.PROFILE_HOST: 'host1',
constants.PROFILE_AGENT_TYPE: q_constants.AGENT_TYPE_OVS,
constants.PROFILE_TUNNEL_IP: '192.168.1.101'}}}
fake_plugin.update_port(q_ctx, t_port_id1, update_body)
t_port_id2, b_port_id2 = self._prepare_port(
TEST_TENANT_ID, t_ctx, 'pod_2', 1,
{'binding:host_id': 'host2',
'binding:vif_type': helper.VIF_TYPE_OVS})
update_body = {'port': {
'binding:profile': {
constants.PROFILE_REGION: 'pod_2',
constants.PROFILE_HOST: 'host2',
constants.PROFILE_AGENT_TYPE: q_constants.AGENT_TYPE_OVS,
constants.PROFILE_TUNNEL_IP: '192.168.1.102'}}}
fake_plugin.update_port(q_ctx, t_port_id2, update_body)
# shadow port is created
b_sd_port_id1 = db_api.get_bottom_id_by_top_id_region_name(
t_ctx, t_port_id1, 'pod_2', constants.RT_SD_PORT)
# shadow port is updated to active
mock_update.assert_called_once_with(
t_ctx, b_sd_port_id1, {'port': {
'binding:profile': {constants.PROFILE_FORCE_UP: 'True'}}})
# asynchronous job in pod_1 is registered
mock_setup.assert_called_once_with(t_ctx, 'pod_id_1', t_net_id)
def tearDown(self):
core.ModelBase.metadata.drop_all(core.get_engine())
for res in RES_LIST:


@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import datetime
import mock
from mock import patch
@ -20,6 +21,7 @@ import six
from six.moves import xrange
import unittest
import neutron_lib.constants as q_constants
from oslo_config import cfg
from oslo_utils import uuidutils
@ -28,6 +30,7 @@ from tricircle.common import context
import tricircle.db.api as db_api
from tricircle.db import core
from tricircle.db import models
from tricircle.network import helper
from tricircle.xjob import xmanager
from tricircle.xjob import xservice
@ -72,11 +75,22 @@ RES_MAP = {'top': {'network': TOP_NETWORK,
'floatingips': BOTTOM2_FIP}}
def fake_get_client(self, region_name=None):
return FakeClient(region_name)
class FakeXManager(xmanager.XManager):
def __init__(self):
self.clients = {'top': FakeClient(),
'pod_1': FakeClient('pod_1'),
'pod_2': FakeClient('pod_2')}
self.helper = helper.NetworkHelper()
self.xjob_handler = FakeXJobAPI()
class FakeXJobAPI(object):
def setup_shadow_ports(self, ctx, pod_id, t_net_id):
pass
class FakeClient(object):
@ -92,6 +106,9 @@ class FakeClient(object):
for res in RES_MAP[self.region_name][resource]:
is_selected = True
for _filter in filters:
if _filter['key'] == 'fields':
# in test, we don't need to filter fields
continue
if _filter['key'] not in res:
is_selected = False
break
@ -99,12 +116,32 @@ class FakeClient(object):
is_selected = False
break
if is_selected:
res_list.append(res)
res_list.append(copy.copy(res))
return res_list
def create_resources(self, resource, cxt, body):
res = body[resource]
if 'id' not in res:
res['id'] = uuidutils.generate_uuid()
RES_MAP[self.region_name][resource].append(res)
return res
def update_resources(self, resource, cxt, _id, body):
for res in RES_MAP[self.region_name][resource]:
if res['id'] == _id:
res.update(body[resource])
def list_ports(self, cxt, filters=None):
return self.list_resources('port', cxt, filters)
def get_ports(self, cxt, port_id):
return self.list_resources(
'port', cxt,
[{'key': 'id', 'comparator': 'eq', 'value': port_id}])[0]
def update_ports(self, cxt, _id, body):
self.update_resources('port', cxt, _id, body)
def list_subnets(self, cxt, filters=None):
return self.list_resources('subnet', cxt, filters)
@ -113,6 +150,11 @@ class FakeClient(object):
'subnet', cxt,
[{'key': 'id', 'comparator': 'eq', 'value': subnet_id}])[0]
def get_networks(self, cxt, net_id):
return self.list_resources(
'network', cxt,
[{'key': 'id', 'comparator': 'eq', 'value': net_id}])[0]
def get_routers(self, cxt, router_id):
return self.list_resources(
'router', cxt,
@ -470,6 +512,100 @@ class XManagerTest(unittest.TestCase):
'security_group_id': sg_id}]})]
mock_create.assert_has_calls(calls)
@patch.object(helper.NetworkHelper, '_get_client', new=fake_get_client)
@patch.object(FakeXJobAPI, 'setup_shadow_ports')
def test_setup_shadow_ports(self, mock_setup):
project_id = uuidutils.generate_uuid()
net1_id = uuidutils.generate_uuid()
subnet1_id = uuidutils.generate_uuid()
port1_id = uuidutils.generate_uuid()
port2_id = uuidutils.generate_uuid()
for i in (1, 2):
pod_id = 'pod_id_%d' % i
pod_dict = {'pod_id': pod_id,
'region_name': 'pod_%d' % i,
'az_name': 'az_name_%d' % i}
db_api.create_pod(self.context, pod_dict)
db_api.create_resource_mapping(
self.context, net1_id, net1_id, pod_id, project_id,
constants.RT_NETWORK)
TOP_NETWORK.append({'id': net1_id, 'tenant_id': project_id})
BOTTOM1_PORT.append({'id': port1_id,
'network_id': net1_id,
'device_owner': 'compute:None',
'binding:vif_type': 'ovs',
'binding:host_id': 'host1',
'fixed_ips': [{'subnet_id': subnet1_id,
'ip_address': '10.0.1.3'}]})
BOTTOM2_PORT.append({'id': port2_id,
'network_id': net1_id,
'device_owner': 'compute:None',
'binding:vif_type': 'ovs',
'binding:host_id': 'host2',
'fixed_ips': [{'subnet_id': subnet1_id,
'ip_address': '10.0.1.4'}]})
db_api.ensure_agent_exists(
self.context, 'pod_id_1', 'host1', q_constants.AGENT_TYPE_OVS,
'192.168.1.101')
db_api.ensure_agent_exists(
self.context, 'pod_id_2', 'host2', q_constants.AGENT_TYPE_OVS,
'192.168.1.102')
resource_id = 'pod_id_1#' + net1_id
db_api.new_job(self.context, constants.JT_SHADOW_PORT_SETUP,
resource_id)
self.xmanager.setup_shadow_ports(
self.context,
payload={constants.JT_SHADOW_PORT_SETUP: resource_id})
# check shadow port in pod1 is created and updated
client1 = FakeClient('pod_1')
sd_ports = client1.list_ports(
self.context, [{'key': 'device_owner',
'comparator': 'eq',
'value': constants.DEVICE_OWNER_SHADOW}])
self.assertEqual(sd_ports[0]['fixed_ips'][0]['ip_address'],
'10.0.1.4')
self.assertIn(constants.PROFILE_FORCE_UP,
sd_ports[0]['binding:profile'])
# check job to setup shadow ports for pod2 is registered
mock_setup.assert_called_once_with(self.context, 'pod_id_2', net1_id)
# update shadow port to down and test again; this is possible when we
# succeed in creating the shadow port but fail to update it to active
profile = sd_ports[0]['binding:profile']
profile.pop(constants.PROFILE_FORCE_UP)
client1.update_ports(self.context, sd_ports[0]['id'],
{'port': {'status': q_constants.PORT_STATUS_DOWN,
'binding:profile': profile}})
db_api.new_job(self.context, constants.JT_SHADOW_PORT_SETUP,
resource_id)
self.xmanager.setup_shadow_ports(
self.context,
payload={constants.JT_SHADOW_PORT_SETUP: resource_id})
# check shadow port is updated to active again
sd_port = client1.get_ports(self.context, sd_ports[0]['id'])
self.assertIn(constants.PROFILE_FORCE_UP, sd_port['binding:profile'])
# manually trigger shadow ports setup in pod2
resource_id = 'pod_id_2#' + net1_id
db_api.new_job(self.context, constants.JT_SHADOW_PORT_SETUP,
resource_id)
self.xmanager.setup_shadow_ports(
self.context,
payload={constants.JT_SHADOW_PORT_SETUP: resource_id})
client2 = FakeClient('pod_2')
sd_ports = client2.list_ports(
self.context, [{'key': 'device_owner',
'comparator': 'eq',
'value': constants.DEVICE_OWNER_SHADOW}])
self.assertEqual(sd_ports[0]['fixed_ips'][0]['ip_address'],
'10.0.1.3')
def test_job_handle(self):
job_type = 'fake_resource'


@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import datetime
import eventlet
import netaddr
@ -33,8 +34,6 @@ from tricircle.common import constants
from tricircle.common.i18n import _LE, _LI, _LW
from tricircle.common import xrpcapi
import tricircle.db.api as db_api
from tricircle.db import core
from tricircle.db import models
import tricircle.network.exceptions as t_network_exc
from tricircle.network import helper
@ -152,7 +151,8 @@ class XManager(PeriodicTasks):
constants.JT_PORT_DELETE: self.delete_server_port,
constants.JT_SEG_RULE_SETUP: self.configure_security_group_rules,
constants.JT_NETWORK_UPDATE: self.update_network,
constants.JT_SUBNET_UPDATE: self.update_subnet}
constants.JT_SUBNET_UPDATE: self.update_subnet,
constants.JT_SHADOW_PORT_SETUP: self.setup_shadow_ports}
self.helper = helper.NetworkHelper()
self.xjob_handler = xrpcapi.XJobAPI()
super(XManager, self).__init__()
@ -419,10 +419,11 @@ class XManager(PeriodicTasks):
if b_ext_pod['pod_id'] != b_pod['pod_id']:
# if the internal port is not located in the external network
# pod, we need to create a copied port in that pod for floating
# pod, we need to create a shadow port in that pod for floating
# ip association purpose
t_int_net_id = t_int_port['network_id']
t_int_subnet_id = t_int_port['fixed_ips'][0]['subnet_id']
# TODO(zhiyuan) adapt shadow agent way to create shadow port
port_body = {
'port': {
'tenant_id': project_id,
@ -436,7 +437,7 @@ class XManager(PeriodicTasks):
self.helper.prepare_bottom_element(
ctx, project_id, b_ext_pod, t_int_port,
constants.RT_SD_PORT, port_body)
# create routing entries for copied network and subnet so we
# create routing entries for shadow network and subnet so we
# can easily find them during central network and subnet
# deletion, create_resource_mapping will catch DBDuplicateEntry
# exception and ignore it so it's safe to call this function
@ -456,44 +457,10 @@ class XManager(PeriodicTasks):
for del_fip in del_fips:
fip = b_ip_fip_map[del_fip]
if b_ext_pod['pod_id'] != b_pod['pod_id'] and fip['port_id']:
# expire the routing entry for copy port
with ctx.session.begin():
core.update_resources(
ctx, models.ResourceRouting,
[{'key': 'bottom_id', 'comparator': 'eq',
'value': fip['port_id']},
{'key': 'resource_type', 'comparator': 'eq',
'value': constants.RT_SD_PORT}],
{'bottom_id': None,
'created_at': constants.expire_time,
'updated_at': constants.expire_time})
# delete copy port
b_ext_client.delete_ports(ctx, fip['port_id'])
# delete the expired entry, even if this deletion fails, we
# still have a chance that lock_handle module will delete it
with ctx.session.begin():
core.delete_resources(ctx, models.ResourceRouting,
[{'key': 'top_id',
'comparator': 'eq',
'value': fip['port_id']},
{'key': 'resource_type',
'comparator': 'eq',
'value': constants.RT_SD_PORT}])
# delete port before floating ip disassociation, copy
# network and copy subnet are deleted during central
# network and subnet deletion
# shadow port is created in this case, but we leave shadow port
# deletion work to plugin, so do nothing
pass
b_ext_client.delete_floatingips(ctx, fip['id'])
# we first delete the internal port then delete the floating
# ip. during the deletion of the internal port, the floating
# ip will be disassociated automatically.
# the reason we delete the internal port first is that if we
# succeed to delete the internal port but fail to delete the
# floating ip, in the next run, we can still find the floating
# ip and try to delete it. but if we delete the floating ip
# first, after we fail to delete the internal port, it's not
# easy for us to find the internal port again because we cannot
# find the internal port id in the floating ip body
@_job_handle(constants.JT_ROUTER_SETUP)
def setup_bottom_router(self, ctx, payload):
@ -914,3 +881,144 @@ class XManager(PeriodicTasks):
LOG.error(_LE('subnet: %(subnet_id)s not found, '
'pod name: %(name)s'),
{'subnet_id': b_subnet_id, 'name': b_region_name})
@_job_handle(constants.JT_SHADOW_PORT_SETUP)
def setup_shadow_ports(self, ctx, payload):
"""Setup shadow ports for the target pod and network
this job workes as following:
(1) query all shadow ports from pods the target network is mapped to
(2) query all real ports from pods the target network is mapped to
(3) check the shadow ports and real ports in the target pod, create
needed shadow ports
(4) check the shadow ports and real ports in other pods, create a new
job if the pod lacks some shadow ports
:param ctx: tricircle context
:param payload: {JT_SHADOW_PORT_SETUP: pod_id#network_id}
:return: None
"""
run_label = 'during shadow ports setup'
(target_pod_id,
t_net_id) = payload[constants.JT_SHADOW_PORT_SETUP].split('#')
target_pod = db_api.get_pod(ctx, target_pod_id)
t_client = self._get_client()
t_net = t_client.get_networks(ctx, t_net_id)
if not t_net:
# we just end this job if top network no longer exists
return
project_id = t_net['tenant_id']
mappings = db_api.get_bottom_mappings_by_top_id(ctx, t_net_id,
constants.RT_NETWORK)
pod_ids = set([pod['pod_id'] for pod, _ in mappings])
pod_port_ids_map = collections.defaultdict(set)
pod_sw_port_ids_map = {}
port_info_map = {}
if target_pod_id not in pod_ids:
LOG.debug('Pod %s not found %s', target_pod_id, run_label)
# network is not mapped to the specified pod, nothing to do
return
for b_pod, b_net_id in mappings:
b_client = self._get_client(b_pod['region_name'])
# port table has (network_id, device_owner) index
b_sw_ports = b_client.list_ports(
ctx, filters=[{'key': 'network_id', 'comparator': 'eq',
'value': b_net_id},
{'key': 'device_owner', 'comparator': 'eq',
'value': constants.DEVICE_OWNER_SHADOW},
{'key': 'status', 'comparator': 'eq',
'value': q_constants.PORT_STATUS_ACTIVE},
{'key': 'fields', 'comparator': 'eq',
'value': 'id'}])
b_sw_port_ids = set([port['id'] for port in b_sw_ports])
pod_sw_port_ids_map[b_pod['pod_id']] = b_sw_port_ids
# port table has (network_id, device_owner) index
b_ports = b_client.list_ports(
ctx, filters=[{'key': 'network_id', 'comparator': 'eq',
'value': b_net_id},
{'key': 'fields', 'comparator': 'eq',
'value': ['id', 'binding:vif_type',
'binding:host_id', 'fixed_ips',
'device_owner']}])
LOG.debug('Shadow ports %s in pod %s %s',
b_sw_ports, target_pod_id, run_label)
LOG.debug('Ports %s in pod %s %s',
b_ports, target_pod_id, run_label)
for b_port in b_ports:
if not b_port['device_owner'].startswith('compute:'):
continue
if b_port['device_owner'] == constants.DEVICE_OWNER_SHADOW:
continue
b_port_id = b_port['id']
pod_port_ids_map[b_pod['pod_id']].add(b_port_id)
port_info_map[b_port_id] = b_port
all_port_ids = set()
for port_ids in six.itervalues(pod_port_ids_map):
all_port_ids |= port_ids
sync_port_ids = all_port_ids - (
pod_port_ids_map[target_pod_id] | pod_sw_port_ids_map[
target_pod_id])
sync_pod_list = []
for pod_id in pod_port_ids_map:
if pod_id == target_pod_id:
continue
if pod_port_ids_map[target_pod_id] - (
pod_port_ids_map[pod_id] | pod_sw_port_ids_map[pod_id]):
sync_pod_list.append(pod_id)
LOG.debug('Sync port ids %s %s', sync_port_ids, run_label)
LOG.debug('Sync pod ids %s %s', sync_pod_list, run_label)
agent_info_map = {}
for port_id in sync_port_ids:
port_body = port_info_map[port_id]
host = port_body['binding:host_id']
agent_type = self.helper.get_agent_type_by_vif(
port_body['binding:vif_type'])
if not agent_type:
continue
key = '%s#%s' % (host, agent_type)
if key in agent_info_map:
agent = agent_info_map[key]
else:
agent = db_api.get_agent_by_host_type(ctx, host, agent_type)
if not agent:
LOG.error(_LE('Agent of type %(agent_type)s in '
'host %(host)s not found during shadow '
'ports setup'), {'agent_type': agent_type,
'host': host})
continue
agent_info_map[key] = agent
create_body = {
'port': {
'tenant_id': project_id,
'admin_state_up': True,
'name': constants.shadow_port_name % port_id,
'network_id': t_net_id,
'fixed_ips': [{
'ip_address': port_body[
'fixed_ips'][0]['ip_address']}],
'device_owner': constants.DEVICE_OWNER_SHADOW,
'binding:host_id': host,
'binding:profile': {
constants.PROFILE_AGENT_TYPE: agent_type,
constants.PROFILE_TUNNEL_IP: agent['tunnel_ip']}
}
}
# value for key constants.PROFILE_FORCE_UP does not matter
update_body = {
'port': {
'binding:profile': {constants.PROFILE_FORCE_UP: 'True'}
}
}
_, sw_port_id = self.helper.prepare_bottom_element(
ctx, project_id, target_pod, {'id': port_id},
constants.RT_SD_PORT, create_body)
self._get_client(target_pod['region_name']).update_ports(
ctx, sw_port_id, update_body)
for pod_id in sync_pod_list:
self.xjob_handler.setup_shadow_ports(ctx, pod_id, t_net_id)
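
To make steps (3) and (4) of the docstring concrete, here is a
self-contained rerun of the set arithmetic with toy data (port and pod
ids are made up):

    pod_port_ids_map = {'pod_id_1': {'p1'},         # real instance ports
                        'pod_id_2': {'p2', 'p3'}}
    pod_sw_port_ids_map = {'pod_id_1': {'p2'},      # existing shadow ports
                           'pod_id_2': set()}
    target_pod_id = 'pod_id_1'

    all_port_ids = set()
    for port_ids in pod_port_ids_map.values():
        all_port_ids |= port_ids

    # ports the target pod sees neither as real nor as shadow ports
    sync_port_ids = all_port_ids - (
        pod_port_ids_map[target_pod_id] | pod_sw_port_ids_map[target_pod_id])
    assert sync_port_ids == {'p3'}  # a shadow port for p3 is created

    # pods still missing shadow ports for the target pod's real ports
    # each get a follow-up setup_shadow_ports job
    sync_pod_list = [
        pod_id for pod_id in pod_port_ids_map
        if pod_id != target_pod_id and pod_port_ids_map[target_pod_id] - (
            pod_port_ids_map[pod_id] | pod_sw_port_ids_map[pod_id])]
    assert sync_pod_list == ['pod_id_2']  # pod_id_2 lacks a shadow for p1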