Merge "Allow PDs to associate RPC listeners with RPC worker pool"
This commit is contained in:
@@ -1091,6 +1091,9 @@ class GroupPolicyDbPlugin(gpolicy.GroupPolicyPluginBase,
|
||||
raise gpolicy.SubnetPrefixLengthExceedsIpPool(
|
||||
ip_pool=pool, subnet_size=new_prefix_length)
|
||||
|
||||
def start_rpc_listeners(self):
|
||||
pass
|
||||
|
||||
@log.log_method_call
|
||||
def create_policy_target(self, context, policy_target):
|
||||
pt = policy_target['policy_target']
|
||||
|
||||
@@ -1306,3 +1306,7 @@ class GroupPolicyPluginBase(service_base.ServicePluginBase):
|
||||
@abc.abstractmethod
|
||||
def delete_nat_pool(self, context, nat_pool_id):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def start_rpc_listeners(self):
|
||||
pass
|
||||
|
||||
@@ -186,7 +186,6 @@ class AIMMappingDriver(nrd.CommonNeutronBase, aim_rpc.AIMMappingRPCMixin):
|
||||
'per L2 Policy'))
|
||||
self.create_per_l3p_implicit_contracts = (
|
||||
cfg.CONF.aim_mapping.create_per_l3p_implicit_contracts)
|
||||
self.setup_opflex_rpc_listeners()
|
||||
self.advertise_mtu = cfg.CONF.aim_mapping.advertise_mtu
|
||||
local_api.QUEUE_OUT_OF_PROCESS_NOTIFICATIONS = True
|
||||
if self.create_per_l3p_implicit_contracts:
|
||||
@@ -194,6 +193,10 @@ class AIMMappingDriver(nrd.CommonNeutronBase, aim_rpc.AIMMappingRPCMixin):
|
||||
'for l3_policies which do not have them.'))
|
||||
self._create_per_l3p_implicit_contracts()
|
||||
|
||||
@log.log_method_call
|
||||
def start_rpc_listeners(self):
|
||||
return self.setup_opflex_rpc_listeners()
|
||||
|
||||
@property
|
||||
def aim_mech_driver(self):
|
||||
if not self._apic_aim_mech_driver:
|
||||
@@ -2384,7 +2387,8 @@ class AIMMappingDriver(nrd.CommonNeutronBase, aim_rpc.AIMMappingRPCMixin):
|
||||
return AUTO_PTG_NAME_PREFIX % l2p['id']
|
||||
|
||||
def _get_auto_ptg_id(self, l2p_id):
|
||||
return AUTO_PTG_ID_PREFIX % hashlib.md5(l2p_id).hexdigest()
|
||||
if l2p_id:
|
||||
return AUTO_PTG_ID_PREFIX % hashlib.md5(l2p_id).hexdigest()
|
||||
|
||||
def _is_auto_ptg(self, ptg):
|
||||
return ptg['id'].startswith(AUTO_PTG_PREFIX)
|
||||
|
||||
@@ -47,7 +47,7 @@ class AIMMappingRPCMixin(ha_ip_db.HAIPOwnerDbMixin):
|
||||
self.opflex_conn = n_rpc.create_connection()
|
||||
self.opflex_conn.create_consumer(
|
||||
self.opflex_topic, self.opflex_endpoints, fanout=False)
|
||||
self.opflex_conn.consume_in_threads()
|
||||
return self.opflex_conn.consume_in_threads()
|
||||
|
||||
@db_api.retry_db_errors
|
||||
def _retrieve_vrf_details(self, context, **kwargs):
|
||||
|
||||
@@ -1379,6 +1379,13 @@ class PolicyDriver(object):
|
||||
"""
|
||||
pass
|
||||
|
||||
def start_rpc_listeners(self):
|
||||
"""Start the RPC listeners for the policy drivers.
|
||||
|
||||
When implemented it should return the RPC server object.
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class ExtensionDriver(object):
|
||||
|
||||
@@ -69,6 +69,9 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
|
||||
self._aliases = aliases
|
||||
return self._aliases
|
||||
|
||||
def start_rpc_listeners(self):
|
||||
return self.policy_driver_manager.start_rpc_listeners()
|
||||
|
||||
@property
|
||||
def servicechain_plugin(self):
|
||||
# REVISIT(rkukura): Need initialization method after all
|
||||
|
||||
@@ -105,7 +105,7 @@ class PolicyDriverManager(stevedore.named.NamedExtensionManager):
|
||||
self.native_bulk_support &= getattr(driver.obj,
|
||||
'native_bulk_support', True)
|
||||
|
||||
def _call_on_drivers(self, method_name, context,
|
||||
def _call_on_drivers(self, method_name, context=None,
|
||||
continue_on_failure=False):
|
||||
"""Helper method for calling a method across all policy drivers.
|
||||
|
||||
@@ -120,9 +120,16 @@ class PolicyDriverManager(stevedore.named.NamedExtensionManager):
|
||||
drivers = (self.ordered_policy_drivers if not
|
||||
method_name.startswith('delete') else
|
||||
self.reverse_ordered_policy_drivers)
|
||||
if method_name == 'start_rpc_listeners':
|
||||
servers = []
|
||||
for driver in drivers:
|
||||
try:
|
||||
getattr(driver.obj, method_name)(context)
|
||||
if method_name == 'start_rpc_listeners':
|
||||
server = getattr(driver.obj, method_name)()
|
||||
if server:
|
||||
servers.extend(server)
|
||||
else:
|
||||
getattr(driver.obj, method_name)(context)
|
||||
except Exception as e:
|
||||
if db_api.is_retriable(e):
|
||||
with excutils.save_and_reraise_exception():
|
||||
@@ -151,6 +158,9 @@ class PolicyDriverManager(stevedore.named.NamedExtensionManager):
|
||||
if error:
|
||||
raise gp_exc.GroupPolicyDriverError(method=method_name)
|
||||
|
||||
if method_name == 'start_rpc_listeners':
|
||||
return servers
|
||||
|
||||
def ensure_tenant(self, plugin_context, tenant_id):
|
||||
for driver in self.ordered_policy_drivers:
|
||||
if isinstance(driver.obj, group_policy_driver_api.PolicyDriver):
|
||||
@@ -474,3 +484,6 @@ class PolicyDriverManager(stevedore.named.NamedExtensionManager):
|
||||
|
||||
def get_nat_pool_status(self, context):
|
||||
self._call_on_drivers("get_nat_pool_status", context)
|
||||
|
||||
def start_rpc_listeners(self):
|
||||
return self._call_on_drivers("start_rpc_listeners")
|
||||
|
||||
Reference in New Issue
Block a user