# Copyright 2015 Huawei Technologies Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import collections
import datetime
import eventlet
import netaddr
import random
import six

from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_service import periodic_task

import neutron_lib.constants as q_constants
import neutron_lib.exceptions as q_exceptions
import neutronclient.common.exceptions as q_cli_exceptions

from tricircle.common import client
from tricircle.common import constants
from tricircle.common.i18n import _LE, _LI, _LW
from tricircle.common import xrpcapi
import tricircle.db.api as db_api
import tricircle.network.exceptions as t_network_exc
from tricircle.network import helper


CONF = cfg.CONF
LOG = logging.getLogger(__name__)

IN_TEST = False
AZ_HINTS = 'availability_zone_hints'


def _job_handle(job_type):
    """Build a decorator that runs a handler under the job lock mechanism.

    The decorated function must be called as ``func(self, ctx,
    payload=...)``: the context is read from ``args[1]`` and the payload
    from ``kwargs['payload']``; ``payload[job_type]`` is the resource id
    the job is registered for.

    The wrapper loops until one of the following happens:

    * it has been running for ``CONF.worker_handle_timeout`` seconds,
    * there is no "new" job record, or the latest "success" record is not
      older than the latest "new" record,
    * another worker holds a still-valid running job,
    * the handler raises (the job is then marked as failed).

    The wrapper always returns None; the handler's return value is
    discarded.

    :param job_type: job type constant, also used as the payload key
    :return: decorator to apply to the handler function
    """
    def handle_func(func):
        @six.wraps(func)
        def handle_args(*args, **kwargs):
            if IN_TEST:
                # NOTE(zhiyuan) job mechanism will cause some unpredictable
                # result in unit test so we would like to bypass it. However
                # we have problem mocking a decorator which decorates member
                # functions, that's why we use this label, not an elegant
                # way though.
                func(*args, **kwargs)
                return
            ctx = args[1]
            payload = kwargs['payload']

            resource_id = payload[job_type]
            start_time = datetime.datetime.now()

            while True:
                current_time = datetime.datetime.now()
                delta = current_time - start_time
                # total_seconds() is used instead of the "seconds"
                # attribute, which only holds the seconds component of the
                # delta (it wraps after one day and is misleading for
                # negative deltas).
                if delta.total_seconds() >= CONF.worker_handle_timeout:
                    # quit when this handle is running for a long time
                    break
                time_new = db_api.get_latest_timestamp(ctx, constants.JS_New,
                                                       job_type, resource_id)
                if not time_new:
                    break
                time_success = db_api.get_latest_timestamp(
                    ctx, constants.JS_Success, job_type, resource_id)
                if time_success and time_success >= time_new:
                    break
                job = db_api.register_job(ctx, job_type, resource_id)
                if not job:
                    # fail to obtain the lock, let other worker handle the job
                    running_job = db_api.get_running_job(ctx, job_type,
                                                         resource_id)
                    if not running_job:
                        # there are two reasons that running_job is None. one
                        # is that the running job has just been finished, the
                        # other is that all workers fail to register the job
                        # due to deadlock exception. so we sleep and try again
                        eventlet.sleep(CONF.worker_sleep_time)
                        continue
                    job_time = running_job['timestamp']
                    current_time = datetime.datetime.now()
                    delta = current_time - job_time
                    if delta.total_seconds() > CONF.job_run_expire:
                        # previous running job expires, we set its status to
                        # fail and try again to obtain the lock
                        db_api.finish_job(ctx, running_job['id'], False,
                                          time_new)
                        LOG.warning(_LW('Job %(job)s of type %(job_type)s for '
                                        'resource %(resource)s expires, set '
                                        'its state to Fail'),
                                    {'job': running_job['id'],
                                     'job_type': job_type,
                                     'resource': resource_id})
                        eventlet.sleep(CONF.worker_sleep_time)
                        continue
                    else:
                        # previous running job is still valid, we just leave
                        # the job to the worker who holds the lock
                        break
                # successfully obtain the lock, start to execute handler
                try:
                    func(*args, **kwargs)
                except Exception:
                    db_api.finish_job(ctx, job['id'], False, time_new)
                    # LOG.exception keeps the traceback of the failure, which
                    # would otherwise be lost since the exception is not
                    # re-raised.
                    LOG.exception(_LE('Job %(job)s of type %(job_type)s for '
                                      'resource %(resource)s fails'),
                                  {'job': job['id'],
                                   'job_type': job_type,
                                   'resource': resource_id})
                    break
                db_api.finish_job(ctx, job['id'], True, time_new)
                eventlet.sleep(CONF.worker_sleep_time)
        return handle_args
    return handle_func


class PeriodicTasks(periodic_task.PeriodicTasks):
    """Periodic task base class bound to the global configuration."""

    def __init__(self):
        super(PeriodicTasks, self).__init__(CONF)


class XManager(PeriodicTasks):
    """RPC endpoint and periodic task runner of the xjob service.

    Each job type listed in ``self.job_handles`` is mapped to the method
    handling it; those methods are wrapped by ``_job_handle``.
    """

    target = messaging.Target(version='1.0')

    def __init__(self, host=None, service_name='xjob'):
        """Initialize the manager.

        :param host: host name, defaults to ``CONF.host`` when not given
        :param service_name: name of the service
        """
        LOG.debug('XManager initialization...')

        if not host:
            host = CONF.host
        self.host = host
        self.service_name = service_name
        # self.notifier = rpc.get_notifier(self.service_name, self.host)
        self.additional_endpoints = []
        # clients are cached per region name, constants.TOP is the key of
        # the client created without a region name
        self.clients = {constants.TOP: client.Client()}
        self.job_handles = {
            constants.JT_ROUTER: self.configure_extra_routes,
            constants.JT_ROUTER_SETUP: self.setup_bottom_router,
            constants.JT_PORT_DELETE: self.delete_server_port,
            constants.JT_SEG_RULE_SETUP: self.configure_security_group_rules,
            constants.JT_NETWORK_UPDATE: self.update_network,
            constants.JT_SUBNET_UPDATE: self.update_subnet,
            constants.JT_SHADOW_PORT_SETUP: self.setup_shadow_ports}
        self.helper = helper.NetworkHelper()
        self.xjob_handler = xrpcapi.XJobAPI()
        super(XManager, self).__init__()

    def _get_client(self, region_name=None):
        """Return the cached client of a region, creating it if needed.

        :param region_name: region name, the top client is returned when
            it is empty
        :return: client object
        """
        if not region_name:
            return self.clients[constants.TOP]
        if region_name not in self.clients:
            self.clients[region_name] = client.Client(region_name)
        return self.clients[region_name]

    def periodic_tasks(self, context, raise_on_error=False):
        """Tasks to be run at a periodic interval."""
        return self.run_periodic_tasks(context, raise_on_error=raise_on_error)

    def init_host(self):
        """init_host

        Hook to do additional manager initialization when one requests
        the service be started.  This is called before any service record
        is created.
        Child classes should override this method.
        """
        LOG.debug('XManager init_host...')

    def cleanup_host(self):
        """cleanup_host

        Hook to do cleanup work when the service shuts down.
        Child classes should override this method.
        """
        LOG.debug('XManager cleanup_host...')

    def pre_start_hook(self):
        """pre_start_hook

        Hook to provide the manager the ability to do additional
        start-up work before any RPC queues/consumers are created. This is
        called after other initialization has succeeded and a service
        record is created.
        Child classes should override this method.
        """
        LOG.debug('XManager pre_start_hook...')

    def post_start_hook(self):
        """post_start_hook

        Hook to provide the manager the ability to do additional
        start-up work immediately after a service creates RPC consumers
        and starts 'running'.
        Child classes should override this method.
        """
        LOG.debug('XManager post_start_hook...')

    # rpc message endpoint handling
    def test_rpc(self, ctx, payload):
        """Log the received payload and return the logged text.

        :param ctx: tricircle context (unused)
        :param payload: payload to report
        :return: text describing the received payload
        """
        LOG.info(_LI("xmanager receive payload: %s"), payload)

        info_text = "xmanager receive payload: %s" % payload

        return info_text

    @staticmethod
    def _get_resource_by_name(cli, cxt, _type, name):
        """Return the first resource of the given type with the given name.

        Raises IndexError when the query result is an empty list.
        """
        return cli.list_resources(_type, cxt, filters=[{'key': 'name',
                                                        'comparator': 'eq',
                                                        'value': name}])[0]

    @staticmethod
    def _get_router_interfaces(cli, cxt, router_id, net_id):
        """List the router interface ports of a router in a network.

        Only ports whose device owner is a (DVR) router interface are
        returned.
        """
        interfaces = cli.list_ports(
            cxt, filters=[{'key': 'network_id', 'comparator': 'eq',
                           'value': net_id},
                          {'key': 'device_id', 'comparator': 'eq',
                           'value': router_id}])
        return [inf for inf in interfaces if inf['device_owner'] in (
            q_constants.DEVICE_OWNER_ROUTER_INTF,
            q_constants.DEVICE_OWNER_DVR_INTERFACE)]

    @periodic_task.periodic_task
    def redo_failed_or_new_job(self, ctx):
        """Pick one pending job at random and run its handler.

        New jobs take precedence over failed ones. For a failed job, a new
        job record is created before the handler is invoked. Jobs whose
        type has no registered handler are ignored.

        :param ctx: tricircle context
        :return: None
        """
        failed_jobs, new_jobs = db_api.get_latest_failed_or_new_jobs(ctx)
        failed_jobs = [
            job for job in failed_jobs if job['type'] in self.job_handles]
        new_jobs = [
            job for job in new_jobs if job['type'] in self.job_handles]
        if not failed_jobs and not new_jobs:
            return
        if new_jobs:
            jobs = new_jobs
            is_new_job = True
        else:
            jobs = failed_jobs
            is_new_job = False
        # in one run we only pick one job to handle
        job_index = random.randint(0, len(jobs) - 1)
        job_type = jobs[job_index]['type']
        resource_id = jobs[job_index]['resource_id']
        payload = {job_type: resource_id}
        LOG.debug('Redo %(status)s job for %(resource_id)s of type '
                  '%(job_type)s',
                  {'status': 'new' if is_new_job else 'failed',
                   'resource_id': resource_id, 'job_type': job_type})
        if not is_new_job:
            db_api.new_job(ctx, job_type, resource_id)
        self.job_handles[job_type](ctx, payload=payload)

    @staticmethod
    def _safe_create_bottom_floatingip(t_ctx, pod, client, fip_net_id,
                                       fip_address, port_id):
        """Create a bottom floating ip, tolerating an existing one.

        When the address is already in use, the existing floating ip is
        accepted if it is associated with ``port_id``, updated if it is
        not associated with any port, and otherwise the original
        IpAddressInUseClient exception is re-raised.

        :raises BottomPodOperationFailure: the address is reported in use
            but the floating ip can not be found
        """
        try:
            client.create_floatingips(
                t_ctx, {'floatingip': {'floating_network_id': fip_net_id,
                                       'floating_ip_address': fip_address,
                                       'port_id': port_id}})
        except q_cli_exceptions.IpAddressInUseClient:
            fips = client.list_floatingips(t_ctx,
                                           [{'key': 'floating_ip_address',
                                             'comparator': 'eq',
                                             'value': fip_address}])
            if not fips:
                # this is rare case that we got IpAddressInUseClient exception
                # a second ago but now the floating ip is missing
                raise t_network_exc.BottomPodOperationFailure(
                    resource='floating ip', region_name=pod['region_name'])
            associated_port_id = fips[0].get('port_id')
            if associated_port_id == port_id:
                # if the internal port associated with the existing fip is what
                # we expect, just ignore this exception
                pass
            elif not associated_port_id:
                # if the existing fip is not associated with any internal port,
                # update the fip to add association
                client.update_floatingips(t_ctx, fips[0]['id'],
                                          {'floatingip': {'port_id': port_id}})
            else:
                raise

    def _setup_router_one_pod(self, ctx, t_pod, b_pod, t_client, t_net,
                              t_router, t_bridge_net, t_bridge_subnet,
                              is_ext_net_pod):
        """Set up the bottom router of one pod for a top router and network.

        Prepares the bottom router, attaches the bridge network as gateway
        for non-local routers, synchronizes the router interface of the
        network, then synchronizes floating ips if the top router has an
        external gateway.

        :param ctx: tricircle context
        :param t_pod: top pod
        :param b_pod: target bottom pod
        :param t_client: top client
        :param t_net: top network
        :param t_router: top router
        :param t_bridge_net: top bridge network, only read for non-local
            routers
        :param t_bridge_subnet: top bridge subnet, only read for non-local
            routers
        :param is_ext_net_pod: not used currently, see the note below
        :return: None
        """
        # NOTE(zhiyuan) after the bridge network combination, external network
        # is attached to a separate router, which is created in central plugin,
        # so is_ext_net_pod is not used in the current implementation, but we
        # choose to keep this parameter since it's an important attribute of a
        # pod and we may need to use it later.
        b_client = self._get_client(b_pod['region_name'])

        is_distributed = t_router.get('distributed', False)
        router_body = {'router': {'name': t_router['id'],
                                  'distributed': is_distributed}}
        project_id = t_router['tenant_id']

        # create bottom router in target bottom pod
        _, b_router_id = self.helper.prepare_bottom_element(
            ctx, project_id, b_pod, t_router, constants.RT_ROUTER, router_body)

        q_ctx = None  # no need to pass neutron context when using client
        is_local_router = self.helper.is_local_router(ctx, t_router)
        if not is_local_router:
            # create top bridge port
            t_bridge_port_id = self.helper.get_bridge_interface(
                ctx, q_ctx, project_id, t_pod, t_bridge_net['id'], b_router_id)

            # create bottom bridge port
            # if target bottom pod is hosting real external network, we create
            # another bottom router and attach the bridge network as internal
            # network, but this work is done by central plugin when user sets
            # router gateway.
            t_bridge_port = t_client.get_ports(ctx, t_bridge_port_id)
            (is_new, b_bridge_port_id, b_bridge_subnet_id,
             b_bridge_net_id) = self.helper.get_bottom_bridge_elements(
                ctx, project_id, b_pod, t_bridge_net,
                True, t_bridge_subnet, None)

            # we attach the bridge port as router gateway
            # add_gateway is update operation, which can run multiple times
            gateway_ip = t_bridge_port['fixed_ips'][0]['ip_address']
            b_client.action_routers(
                ctx, 'add_gateway', b_router_id,
                {'network_id': b_bridge_net_id,
                 'enable_snat': False,
                 'external_fixed_ips': [{'subnet_id': b_bridge_subnet_id,
                                         'ip_address': gateway_ip}]})

        # attach internal port to bottom router
        t_ports = self._get_router_interfaces(t_client, ctx, t_router['id'],
                                              t_net['id'])
        b_net_id = db_api.get_bottom_id_by_top_id_region_name(
            ctx, t_net['id'], b_pod['region_name'], constants.RT_NETWORK)
        if b_net_id:
            b_ports = self._get_router_interfaces(b_client, ctx, b_router_id,
                                                  b_net_id)
        else:
            b_ports = []
        if not t_ports and b_ports:
            # remove redundant bottom interface
            b_port = b_ports[0]
            request_body = {'port_id': b_port['id']}
            b_client.action_routers(ctx, 'remove_interface', b_router_id,
                                    request_body)
        elif t_ports and not b_ports:
            # create new bottom interface
            t_port = t_ports[0]

            # only consider ipv4 address currently
            t_subnet_id = t_port['fixed_ips'][0]['subnet_id']
            t_subnet = t_client.get_subnets(ctx, t_subnet_id)

            if CONF.enable_api_gateway:
                (b_net_id,
                 subnet_map) = self.helper.prepare_bottom_network_subnets(
                    ctx, q_ctx, project_id, b_pod, t_net, [t_subnet])
            else:
                (b_net_id,
                 subnet_map) = (t_net['id'],
                                {t_subnet['id']: t_subnet['id']})

            # the gateway ip of bottom subnet is set to the ip of t_port, so
            # we just attach the bottom subnet to the bottom router and neutron
            # server in the bottom pod will create the interface for us, using
            # the gateway ip.
            b_client.action_routers(ctx, 'add_interface', b_router_id,
                                    {'subnet_id': subnet_map[t_subnet_id]})

        if not t_router['external_gateway_info']:
            return

        # handle floatingip
        t_ext_net_id = t_router['external_gateway_info']['network_id']
        t_fips = t_client.list_floatingips(ctx, [{'key': 'floating_network_id',
                                                  'comparator': 'eq',
                                                  'value': t_ext_net_id}])
        # skip unbound top floatingip
        t_ip_fip_map = {fip['floating_ip_address']: fip
                        for fip in t_fips if fip['port_id']}
        mappings = db_api.get_bottom_mappings_by_top_id(ctx, t_ext_net_id,
                                                        constants.RT_NETWORK)
        # bottom external network should exist
        b_ext_pod, b_ext_net_id = mappings[0]
        b_ext_client = self._get_client(b_ext_pod['region_name'])
        b_fips = b_ext_client.list_floatingips(
            ctx, [{'key': 'floating_network_id', 'comparator': 'eq',
                   'value': b_ext_net_id}])
        b_ip_fip_map = {fip['floating_ip_address']: fip for fip in b_fips}
        add_fips = [ip for ip in t_ip_fip_map if ip not in b_ip_fip_map]
        del_fips = [ip for ip in b_ip_fip_map if ip not in t_ip_fip_map]

        for add_fip in add_fips:
            fip = t_ip_fip_map[add_fip]
            t_int_port_id = fip['port_id']
            b_int_port_id = db_api.get_bottom_id_by_top_id_region_name(
                ctx, t_int_port_id, b_pod['region_name'], constants.RT_PORT)
            if not b_int_port_id:
                LOG.warning(_LW('Port %(port_id)s associated with floating ip '
                                '%(fip)s is not mapped to bottom pod'),
                            {'port_id': t_int_port_id, 'fip': add_fip})
                continue
            t_int_port = t_client.get_ports(ctx, t_int_port_id)
            if t_int_port['network_id'] != t_net['id']:
                # only handle floating ip association for the given top network
                continue

            if b_ext_pod['pod_id'] != b_pod['pod_id']:
                # if the internal port is not located in the external network
                # pod, we need to create a shadow port in that pod for floating
                # ip association purpose
                t_int_net_id = t_int_port['network_id']
                t_int_subnet_id = t_int_port['fixed_ips'][0]['subnet_id']

                b_int_port = b_client.get_ports(ctx, b_int_port_id)
                host = b_int_port['binding:host_id']
                agent_type = self.helper.get_agent_type_by_vif(
                    b_int_port['binding:vif_type'])
                agent = db_api.get_agent_by_host_type(ctx, host, agent_type)
                max_bulk_size = CONF.client.max_shadow_port_bulk_size
                self.helper.prepare_shadow_ports(
                    ctx, project_id, b_ext_pod, t_int_net_id,
                    [b_int_port], [agent], max_bulk_size)

                # create routing entries for shadow network and subnet so we
                # can easily find them during central network and subnet
                # deletion, create_resource_mapping will catch DBDuplicateEntry
                # exception and ignore it so it's safe to call this function
                # multiple times
                db_api.create_resource_mapping(ctx, t_int_net_id, t_int_net_id,
                                               b_ext_pod['pod_id'], project_id,
                                               constants.RT_SD_NETWORK)
                db_api.create_resource_mapping(ctx, t_int_subnet_id,
                                               t_int_subnet_id,
                                               b_ext_pod['pod_id'], project_id,
                                               constants.RT_SD_SUBNET)

            self._safe_create_bottom_floatingip(
                ctx, b_pod, b_ext_client, b_ext_net_id, add_fip,
                b_int_port_id)

        for del_fip in del_fips:
            fip = b_ip_fip_map[del_fip]
            if b_ext_pod['pod_id'] != b_pod['pod_id'] and fip['port_id']:
                # shadow port is created in this case, but we leave shadow port
                # deletion work to plugin, so do nothing
                pass
            b_ext_client.delete_floatingips(ctx, fip['id'])

    @_job_handle(constants.JT_ROUTER_SETUP)
    def setup_bottom_router(self, ctx, payload):
        """Set up the bottom router for a top router and network.

        If bottom pod id equals POD_NOT_SPECIFIED, one job is dispatched
        via RPC for every pod the network is mapped to, otherwise the
        router is set up in the specified pod.

        :param ctx: tricircle context
        :param payload: dict whose key is JT_ROUTER_SETUP and value is
            "bottom_pod_id#top_router_id#top_network_id"
        :return: None
        """
        (b_pod_id,
         t_router_id, t_net_id) = payload[
            constants.JT_ROUTER_SETUP].split('#')

        if b_pod_id == constants.POD_NOT_SPECIFIED:
            mappings = db_api.get_bottom_mappings_by_top_id(
                ctx, t_net_id, constants.RT_NETWORK)
            b_pods = [mapping[0] for mapping in mappings]
            for b_pod in b_pods:
                # NOTE(zhiyuan) we create one job for each pod to avoid
                # conflict caused by different workers operating the same pod
                self.xjob_handler.setup_bottom_router(
                    ctx, t_net_id, t_router_id, b_pod['pod_id'])
            return

        t_client = self._get_client()
        t_pod = db_api.get_top_pod(ctx)
        t_router = t_client.get_routers(ctx, t_router_id)
        if not t_router:
            # we just end this job if top router no longer exists
            return
        t_net = t_client.get_networks(ctx, t_net_id)
        if not t_net:
            # we just end this job if top network no longer exists
            return
        project_id = t_router['tenant_id']

        b_pod = db_api.get_pod(ctx, b_pod_id)
        is_local_router = self.helper.is_local_router(ctx, t_router)
        if is_local_router:
            t_bridge_net = None
            t_bridge_subnet = None
        else:
            t_bridge_net_name = constants.bridge_net_name % project_id
            t_bridge_subnet_name = constants.bridge_subnet_name % project_id
            t_bridge_net = self._get_resource_by_name(t_client, ctx, 'network',
                                                      t_bridge_net_name)
            t_bridge_subnet = self._get_resource_by_name(
                t_client, ctx, 'subnet', t_bridge_subnet_name)

        ext_nets = t_client.list_networks(ctx,
                                          filters=[{'key': 'router:external',
                                                    'comparator': 'eq',
                                                    'value': True}])
        # the first availability zone hint of each external network is
        # compared with the region name of the bottom pod
        ext_net_region_names = {ext_net[AZ_HINTS][0] for ext_net in ext_nets}
        is_ext_net_pod = b_pod['region_name'] in ext_net_region_names

        self._setup_router_one_pod(
            ctx, t_pod, b_pod, t_client, t_net, t_router, t_bridge_net,
            t_bridge_subnet, is_ext_net_pod)
        if not is_local_router:
            self.xjob_handler.configure_extra_routes(ctx, t_router_id)

    @_job_handle(constants.JT_ROUTER)
    def configure_extra_routes(self, ctx, payload):
        """Configure extra routes of the bottom routers of a top router.

        For each east-west bottom router, a /32 route is added for every
        vm ip attached to the other bottom routers, using the bridge ip
        of the owning router as next hop, plus a default route via the
        north-south router when the top router has an external gateway.
        The north-south router, if it exists, gets /32 routes to all vm
        ips (or has its routes cleared when there is no external gateway).

        :param ctx: tricircle context
        :param payload: dict whose key is JT_ROUTER and value is the top
            router id
        :return: None
        """
        t_router_id = payload[constants.JT_ROUTER]

        t_client = self._get_client()
        t_router = t_client.get_routers(ctx, t_router_id)
        if not t_router:
            return
        if t_router.get('external_gateway_info'):
            t_ext_net_id = t_router['external_gateway_info']['network_id']
        else:
            t_ext_net_id = None

        non_vm_port_types = [q_constants.DEVICE_OWNER_ROUTER_INTF,
                             q_constants.DEVICE_OWNER_DVR_INTERFACE,
                             q_constants.DEVICE_OWNER_ROUTER_SNAT,
                             q_constants.DEVICE_OWNER_ROUTER_GW,
                             q_constants.DEVICE_OWNER_DHCP,
                             constants.DEVICE_OWNER_SHADOW]
        ew_attached_port_types = [q_constants.DEVICE_OWNER_ROUTER_INTF,
                                  q_constants.DEVICE_OWNER_DVR_INTERFACE,
                                  q_constants.DEVICE_OWNER_ROUTER_GW]
        ns_attached_port_types = [q_constants.DEVICE_OWNER_ROUTER_INTF,
                                  q_constants.DEVICE_OWNER_DVR_INTERFACE]

        mappings = db_api.get_bottom_mappings_by_top_id(ctx, t_router_id,
                                                        constants.RT_ROUTER)
        if not mappings:
            b_pods, b_router_ids = [], []
        else:
            b_pods, b_router_ids = map(list, zip(*mappings))
        ns_mappings = db_api.get_bottom_mappings_by_top_id(
            ctx, t_router_id, constants.RT_NS_ROUTER)
        b_ns_pod, b_ns_router_id = None, None
        if ns_mappings:
            b_ns_pod, b_ns_router_id = ns_mappings[0]
            b_pods.append(b_ns_pod)
            b_router_ids.append(b_ns_router_id)

        router_ew_bridge_ip_map = {}
        router_ns_bridge_ip_map = {}
        router_ips_map = {}
        for i, b_pod in enumerate(b_pods):
            is_ns_router = b_router_ids[i] == b_ns_router_id
            bottom_client = self._get_client(b_pod['region_name'])
            if is_ns_router:
                device_owner_filter = ns_attached_port_types
            else:
                device_owner_filter = ew_attached_port_types
            b_interfaces = bottom_client.list_ports(
                ctx, filters=[{'key': 'device_id',
                               'comparator': 'eq',
                               'value': b_router_ids[i]},
                              {'key': 'device_owner',
                               'comparator': 'eq',
                               'value': device_owner_filter}])
            router_ips_map[b_router_ids[i]] = {}
            for b_interface in b_interfaces:
                ip = b_interface['fixed_ips'][0]['ip_address']
                bridge_cidr = CONF.client.bridge_cidr
                if netaddr.IPAddress(ip) in netaddr.IPNetwork(bridge_cidr):
                    if is_ns_router:
                        # this ip is the default gateway ip for north-south
                        # networking
                        router_ns_bridge_ip_map[b_router_ids[i]] = ip
                    else:
                        # this ip is the next hop for east-west networking
                        router_ew_bridge_ip_map[b_router_ids[i]] = ip
                    continue
                b_net_id = b_interface['network_id']
                b_subnet = bottom_client.get_subnets(
                    ctx, b_interface['fixed_ips'][0]['subnet_id'])

                b_ports = bottom_client.list_ports(
                    ctx, filters=[{'key': 'network_id',
                                   'comparator': 'eq',
                                   'value': b_net_id}])
                b_vm_ports = [b_port for b_port in b_ports if b_port.get(
                    'device_owner', '') not in non_vm_port_types]
                ips = [vm_port['fixed_ips'][0][
                    'ip_address'] for vm_port in b_vm_ports]
                router_ips_map[b_router_ids[i]][b_subnet['cidr']] = ips

        # handle extra routes for east-west traffic
        for i, b_router_id in enumerate(b_router_ids):
            if b_router_id == b_ns_router_id:
                continue
            bottom_client = self._get_client(b_pods[i]['region_name'])
            extra_routes = []
            if not router_ips_map[b_router_id]:
                bottom_client.update_routers(
                    ctx, b_router_id, {'router': {'routes': extra_routes}})
                continue
            for router_id, cidr_ips_map in six.iteritems(router_ips_map):
                if router_id == b_router_id:
                    continue
                for cidr, ips in six.iteritems(cidr_ips_map):
                    if router_ips_map[b_router_id].get(cidr):
                        # if the ip list is not empty, meaning that there are
                        # already vm ports in the pod of b_router, so no need
                        # to add extra routes
                        continue
                    for ip in ips:
                        route = {'nexthop': router_ew_bridge_ip_map[router_id],
                                 'destination': ip + '/32'}
                        extra_routes.append(route)

            if router_ns_bridge_ip_map and t_ext_net_id:
                # dict views can not be indexed in python 3, so the first
                # value is fetched through an iterator
                ns_bridge_ip = next(six.itervalues(router_ns_bridge_ip_map))
                extra_routes.append(
                    {'nexthop': ns_bridge_ip,
                     'destination': constants.DEFAULT_DESTINATION})
            bottom_client.update_routers(
                ctx, b_router_id, {'router': {'routes': extra_routes}})

        if not b_ns_router_id:
            # router for north-south networking not exist, skip extra routes
            # configuration for north-south router
            return
        if not t_ext_net_id:
            # router not attached to external gateway but router for north-
            # south networking exists, clear the extra routes
            bottom_client = self._get_client(b_ns_pod['region_name'])
            bottom_client.update_routers(
                ctx, b_ns_router_id, {'router': {'routes': []}})
            return

        # handle extra routes for north-south router
        ip_bridge_ip_map = {}
        for router_id, cidr_ips_map in six.iteritems(router_ips_map):
            if router_id not in router_ew_bridge_ip_map:
                continue
            for cidr, ips in six.iteritems(cidr_ips_map):
                for ip in ips:
                    nexthop = router_ew_bridge_ip_map[router_id]
                    destination = ip + '/32'
                    ip_bridge_ip_map[destination] = nexthop

        bottom_client = self._get_client(b_ns_pod['region_name'])
        extra_routes = []
        for fixed_ip in ip_bridge_ip_map:
            extra_routes.append(
                {'nexthop': ip_bridge_ip_map[fixed_ip],
                 'destination': fixed_ip})
        bottom_client.update_routers(
            ctx, b_ns_router_id, {'router': {'routes': extra_routes}})

    @_job_handle(constants.JT_PORT_DELETE)
    def delete_server_port(self, ctx, payload):
        """Delete a port in a bottom pod.

        :param ctx: tricircle context
        :param payload: dict whose key is JT_PORT_DELETE and value is
            "bottom_pod_id#bottom_port_id"
        :return: None
        """
        b_pod_id, b_port_id = payload[constants.JT_PORT_DELETE].split('#')
        b_pod = db_api.get_pod(ctx, b_pod_id)
        self._get_client(b_pod['region_name']).delete_ports(ctx, b_port_id)

    @staticmethod
    def _safe_create_security_group_rule(context, client, body):
        """Create security group rules, ignoring Conflict exception."""
        try:
            client.create_security_group_rules(context, body)
        except q_exceptions.Conflict:
            return

    @staticmethod
    def _safe_delete_security_group_rule(context, client, _id):
        """Delete a security group rule, ignoring NotFound exception."""
        try:
            client.delete_security_group_rules(context, _id)
        except q_exceptions.NotFound:
            return

    @staticmethod
    def _construct_bottom_rule(rule, sg_id, ip=None):
        """Build the body of a bottom rule from a top rule.

        :param rule: top security group rule
        :param sg_id: id of the bottom security group
        :param ip: remote ip prefix overriding the one of the top rule
        :return: rule dict whose remote_group_id is always None
        """
        ip = ip or rule['remote_ip_prefix']
        # if ip is passed, this is an extended rule for remote group
        return {'remote_group_id': None,
                'direction': rule['direction'],
                'remote_ip_prefix': ip,
                'protocol': rule.get('protocol'),
                'ethertype': rule['ethertype'],
                'port_range_max': rule.get('port_range_max'),
                'port_range_min': rule.get('port_range_min'),
                'security_group_id': sg_id}

    @staticmethod
    def _compare_rule(rule1, rule2):
        """Return True if two rules are equal on all compared fields."""
        for key in ('direction', 'remote_ip_prefix', 'protocol', 'ethertype',
                    'port_range_max', 'port_range_min'):
            if rule1[key] != rule2[key]:
                return False
        return True

    @_job_handle(constants.JT_SEG_RULE_SETUP)
    def configure_security_group_rules(self, ctx, payload):
        """Synchronize bottom security group rules with the top ones.

        For every top security group of the project, the expected bottom
        rules are computed, then for each mapped bottom security group,
        rules not expected are deleted and expected rules not present are
        created.

        :param ctx: tricircle context
        :param payload: dict whose key is JT_SEG_RULE_SETUP and value is
            the project id
        :return: None
        """
        project_id = payload[constants.JT_SEG_RULE_SETUP]
        top_client = self._get_client()
        sg_filters = [{'key': 'tenant_id', 'comparator': 'eq',
                       'value': project_id}]
        top_sgs = top_client.list_security_groups(ctx, sg_filters)
        for top_sg in top_sgs:
            new_b_rules = []
            for t_rule in top_sg['security_group_rules']:
                if not t_rule['remote_group_id']:
                    # leave sg_id empty here
                    new_b_rules.append(
                        self._construct_bottom_rule(t_rule, ''))
                    continue
                if top_sg['name'] != 'default':
                    # currently we only handle rules containing remote_group_id
                    # for default security group
                    continue
                if t_rule['ethertype'] != 'IPv4':
                    continue
                subnets = top_client.list_subnets(
                    ctx, [{'key': 'tenant_id', 'comparator': 'eq',
                           'value': project_id}])
                bridge_ip_net = netaddr.IPNetwork(CONF.client.bridge_cidr)
                for subnet in subnets:
                    ip_net = netaddr.IPNetwork(subnet['cidr'])
                    if ip_net in bridge_ip_net:
                        continue
                    # leave sg_id empty here.
                    # Tricircle has not supported IPv6 well yet,
                    # so we ignore seg rules temporarily.
                    if subnet['ip_version'] == q_constants.IP_VERSION_4:
                        new_b_rules.append(
                            self._construct_bottom_rule(t_rule, '',
                                                        subnet['cidr']))

            mappings = db_api.get_bottom_mappings_by_top_id(
                ctx, top_sg['id'], constants.RT_SG)
            for pod, b_sg_id in mappings:
                b_client = self._get_client(pod['region_name'])
                b_sg = b_client.get_security_groups(ctx, b_sg_id)
                add_rules = []
                del_rules = []
                match_index = set()
                for b_rule in b_sg['security_group_rules']:
                    match = False
                    for i, rule in enumerate(new_b_rules):
                        if self._compare_rule(b_rule, rule):
                            match = True
                            match_index.add(i)
                            break
                    if not match:
                        del_rules.append(b_rule)
                for i, rule in enumerate(new_b_rules):
                    if i not in match_index:
                        add_rules.append(rule)

                for del_rule in del_rules:
                    self._safe_delete_security_group_rule(
                        ctx, b_client, del_rule['id'])
                if add_rules:
                    rule_body = {'security_group_rules': []}
                    for add_rule in add_rules:
                        # the rule dict is shared between pods, the sg id is
                        # overwritten right before each creation request
                        add_rule['security_group_id'] = b_sg_id
                        rule_body['security_group_rules'].append(add_rule)
                    self._safe_create_security_group_rule(
                        ctx, b_client, rule_body)

    @_job_handle(constants.JT_NETWORK_UPDATE)
    def update_network(self, ctx, payload):
        """update bottom network

        if bottom pod id equal to POD_NOT_SPECIFIED, dispatch jobs for every
        mapped bottom pod via RPC, otherwise update network in the specified
        pod.

        :param ctx: tricircle context
        :param payload: dict whose key is JT_NETWORK_UPDATE and value
            is "bottom_pod_id#top_network_id"
        :return: None
        """
        (b_pod_id, t_network_id) = payload[
            constants.JT_NETWORK_UPDATE].split('#')
        if b_pod_id == constants.POD_NOT_SPECIFIED:
            mappings = db_api.get_bottom_mappings_by_top_id(
                ctx, t_network_id, constants.RT_NETWORK)
            b_pods = [mapping[0] for mapping in mappings]
            for b_pod in b_pods:
                self.xjob_handler.update_network(ctx, t_network_id,
                                                 b_pod['pod_id'])
            return

        t_client = self._get_client()
        t_network = t_client.get_networks(ctx, t_network_id)
        if not t_network:
            return
        b_pod = db_api.get_pod(ctx, b_pod_id)
        b_region_name = b_pod['region_name']
        b_client = self._get_client(region_name=b_region_name)
        b_network_id = db_api.get_bottom_id_by_top_id_region_name(
            ctx, t_network_id, b_region_name, constants.RT_NETWORK)
        # name is not allowed to be updated, because it is used by
        # lock_handle to retrieve bottom/local resources that have been
        # created but not registered in the resource routing table
        body = {
            'network': {
                'description': t_network['description'],
                'admin_state_up': t_network['admin_state_up'],
                'shared': t_network['shared']
            }
        }

        try:
            b_client.update_networks(ctx, b_network_id, body)
        except q_cli_exceptions.NotFound:
            LOG.error(_LE('network: %(net_id)s not found,'
                          'pod name: %(name)s'),
                      {'net_id': b_network_id, 'name': b_region_name})

    @_job_handle(constants.JT_SUBNET_UPDATE)
    def update_subnet(self, ctx, payload):
        """update bottom subnet

        if bottom pod id equal to POD_NOT_SPECIFIED, dispatch jobs for every
        mapped bottom pod via RPC, otherwise update subnet in the specified
        pod.

        :param ctx: tricircle context
        :param payload: dict whose key is JT_SUBNET_UPDATE and value
            is "bottom_pod_id#top_subnet_id"
        :return: None
        """
        (b_pod_id, t_subnet_id) = payload[
            constants.JT_SUBNET_UPDATE].split('#')
        if b_pod_id == constants.POD_NOT_SPECIFIED:
            mappings = db_api.get_bottom_mappings_by_top_id(
                ctx, t_subnet_id, constants.RT_SUBNET)
            b_pods = [mapping[0] for mapping in mappings]
            for b_pod in b_pods:
                self.xjob_handler.update_subnet(ctx, t_subnet_id,
                                                b_pod['pod_id'])
            return

        t_client = self._get_client()
        t_subnet = t_client.get_subnets(ctx, t_subnet_id)
        if not t_subnet:
            return
        b_pod = db_api.get_pod(ctx, b_pod_id)
        b_region_name = b_pod['region_name']
        b_subnet_id = db_api.get_bottom_id_by_top_id_region_name(
            ctx, t_subnet_id, b_region_name, constants.RT_SUBNET)
        b_client = self._get_client(region_name=b_region_name)
        b_subnet = b_client.get_subnets(ctx, b_subnet_id)
        b_gateway_ip = b_subnet['gateway_ip']

        # we need to remove the bottom subnet gateway ip from the top subnet
        # allocation pools
        b_allocation_pools = helper.NetworkHelper.get_bottom_subnet_pools(
            t_subnet, b_gateway_ip)

        # bottom gateway_ip doesn't need to be updated, because it is reserved
        # by top pod.
        # name is not allowed to be updated, because it is used by
        # lock_handle to retrieve bottom/local resources that have been
        # created but not registered in the resource routing table
        body = {
            'subnet':
                {'description': t_subnet['description'],
                 'enable_dhcp': t_subnet['enable_dhcp'],
                 'allocation_pools': b_allocation_pools,
                 'host_routes': t_subnet['host_routes'],
                 'dns_nameservers': t_subnet['dns_nameservers']}
        }
        try:
            b_client.update_subnets(ctx, b_subnet_id, body)
        except q_cli_exceptions.NotFound:
            LOG.error(_LE('subnet: %(subnet_id)s not found, '
                          'pod name: %(name)s'),
                      {'subnet_id': b_subnet_id, 'name': b_region_name})

    @_job_handle(constants.JT_SHADOW_PORT_SETUP)
    def setup_shadow_ports(self, ctx, payload):
        """Setup shadow ports for the target pod and network

        this job works as follows:
        (1) query all shadow ports from pods the target network is mapped to
        (2) query all real ports from pods the target network is mapped to
        (3) check the shadow ports and real ports in the target pod, create
            needed shadow ports
        (4) check the shadow ports and real ports in other pods, create a new
            job if the pod lacks some shadow ports

        :param ctx: tricircle context
        :param payload: {JT_SHADOW_PORT_SETUP: pod_id#network_id}
        :return: None
        """
        run_label = 'during shadow ports setup'

        (target_pod_id,
         t_net_id) = payload[constants.JT_SHADOW_PORT_SETUP].split('#')
        target_pod = db_api.get_pod(ctx, target_pod_id)
        t_client = self._get_client()
        t_net = t_client.get_networks(ctx, t_net_id)
        if not t_net:
            # we just end this job if top network no longer exists
            return
        project_id = t_net['tenant_id']
        mappings = db_api.get_bottom_mappings_by_top_id(ctx, t_net_id,
                                                        constants.RT_NETWORK)
        pod_ids = {pod['pod_id'] for pod, _ in mappings}
        pod_port_ids_map = collections.defaultdict(set)
        pod_sw_port_ids_map = {}
        port_info_map = {}
        # ids of the shadow ports of the target pod whose status is down,
        # initialized here so the name is always bound after the loop
        b_down_sw_port_ids = set()
        if target_pod_id not in pod_ids:
            LOG.debug('Pod %s not found %s', target_pod_id, run_label)
            # network is not mapped to the specified pod, nothing to do
            return
        for b_pod, b_net_id in mappings:
            b_client = self._get_client(b_pod['region_name'])
            # port table has (network_id, device_owner) index
            b_sw_ports = b_client.list_ports(
                ctx, filters=[{'key': 'network_id', 'comparator': 'eq',
                               'value': b_net_id},
                              {'key': 'device_owner', 'comparator': 'eq',
                               'value': constants.DEVICE_OWNER_SHADOW},
                              {'key': 'fields', 'comparator': 'eq',
                               'value': ['id', 'status']}])
            b_sw_port_ids = {port['id'] for port in b_sw_ports}
            if b_pod['pod_id'] == target_pod['pod_id']:
                b_down_sw_port_ids = {
                    port['id'] for port in b_sw_ports if (
                        port['status'] == q_constants.PORT_STATUS_DOWN)}
            pod_sw_port_ids_map[b_pod['pod_id']] = b_sw_port_ids
            # port table has (network_id, device_owner) index
            b_ports = b_client.list_ports(
                ctx, filters=[{'key': 'network_id', 'comparator': 'eq',
                               'value': b_net_id},
                              {'key': 'fields', 'comparator': 'eq',
                               'value': ['id', 'binding:vif_type',
                                         'binding:host_id', 'fixed_ips',
                                         'device_owner', 'mac_address']}])
            # report the pod the ports were actually listed from
            LOG.debug('Shadow ports %s in pod %s %s',
                      b_sw_ports, b_pod['pod_id'], run_label)
            LOG.debug('Ports %s in pod %s %s',
                      b_ports, b_pod['pod_id'], run_label)
            for b_port in b_ports:
                if not self.helper.is_need_top_sync_port(
                        b_port, cfg.CONF.client.bridge_cidr):
                    continue
                if b_port['device_owner'] == constants.DEVICE_OWNER_SHADOW:
                    continue
                b_port_id = b_port['id']
                pod_port_ids_map[b_pod['pod_id']].add(b_port_id)
                port_info_map[b_port_id] = b_port

        all_port_ids = set()
        for port_ids in six.itervalues(pod_port_ids_map):
            all_port_ids |= port_ids
        # ports located in other pods that have neither a real port nor a
        # shadow port in the target pod
        sync_port_ids = all_port_ids - (
            pod_port_ids_map[target_pod_id] | pod_sw_port_ids_map[
                target_pod_id])
        sync_pod_list = []
        for pod_id in pod_port_ids_map:
            if pod_id == target_pod_id:
                continue
            if pod_port_ids_map[target_pod_id] - (
                    pod_port_ids_map[pod_id] | pod_sw_port_ids_map[pod_id]):
                sync_pod_list.append(pod_id)

        LOG.debug('Sync port ids %s %s', sync_port_ids, run_label)
        LOG.debug('Sync pod ids %s %s', sync_pod_list, run_label)

        agent_info_map = {}
        port_bodys = []
        agents = []
        for port_id in sync_port_ids:
            port_body = port_info_map[port_id]
            host = port_body['binding:host_id']
            agent_type = self.helper.get_agent_type_by_vif(
                port_body['binding:vif_type'])
            if not agent_type:
                continue
            key = '%s#%s' % (host, agent_type)
            if key in agent_info_map:
                agent = agent_info_map[key]
            else:
                agent = db_api.get_agent_by_host_type(ctx, host, agent_type)
                if not agent:
                    LOG.error(_LE('Agent of type %(agent_type)s in '
                                  'host %(host)s not found during shadow '
                                  'ports setup'), {'agent_type': agent_type,
                                                   'host': host})
                    continue
                agent_info_map[key] = agent
            port_bodys.append(port_body)
            agents.append(agent)

        max_bulk_size = CONF.client.max_shadow_port_bulk_size
        sw_port_ids = self.helper.prepare_shadow_ports(
            ctx, project_id, target_pod, t_net_id, port_bodys, agents,
            max_bulk_size)
        b_down_sw_port_ids = b_down_sw_port_ids | set(sw_port_ids)
        # value for key constants.PROFILE_FORCE_UP does not matter
        update_body = {
            'port': {
                'binding:profile': {constants.PROFILE_FORCE_UP: 'True'}
            }
        }
        for sw_port_id in b_down_sw_port_ids:
            self._get_client(target_pod['region_name']).update_ports(
                ctx, sw_port_id, update_body)

        for pod_id in sync_pod_list:
            self.xjob_handler.setup_shadow_ports(ctx, pod_id, t_net_id)