Migrate Octavia resources

The nsx-migration tool now migrates Octavia resources to the new
plugin driver without changing the Octavia DB, by calling the driver
directly over RPC.
In addition, since Octavia keeps references to neutron subnet IDs, the
nsx-migration logic was modified so that subnet IDs are preserved.

Change-Id: I7b729cb28da362a9fdca21068cb6006d7f743286
Author: asarfaty, 2019-12-17 11:10:12 +02:00 (committed by Adit Sarfaty)
Parent: 25fbd47253
Commit: 10cee60da7
9 changed files with 292 additions and 186 deletions
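In outline, the migration no longer replays loadbalancer objects through a destination Octavia API; it reads them once from the Octavia API and replays them straight to the NSX plugin driver over oslo.messaging RPC on a dedicated migration topic, while a no-op status-update endpoint keeps the Octavia DB unchanged and the driver endpoints now return True/False so failures can be detected. The following minimal sketch illustrates that call path using the topics and payload shape introduced in this change; the variable names and literal IDs are illustrative placeholders, and error handling, the status listener, and the Octavia API reads are omitted:

    # Illustrative sketch only: replay one loadbalancer to the NSX Octavia driver
    # over RPC, using the migration topic added by this change. The payload values
    # here are placeholders; the real tool builds them from Octavia API responses.
    from oslo_config import cfg
    import oslo_messaging as messaging

    from vmware_nsx.services.lbaas.octavia import constants as d_const

    transport = messaging.get_rpc_transport(cfg.CONF)
    target = messaging.Target(topic=d_const.OCTAVIA_TO_DRIVER_MIGRATION_TOPIC,
                              exchange="common", namespace='control',
                              fanout=False, version='1.0')
    rpc_client = messaging.RPCClient(transport, target)

    # The loadbalancer keeps its original Octavia ID and subnet ID.
    lb_body = {'id': 'original-lb-uuid', 'vip_subnet_id': 'original-subnet-uuid'}
    if not rpc_client.call({}, 'loadbalancer_create', loadbalancer=lb_body):
        raise RuntimeError('loadbalancer_create failed on the NSX driver')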


@@ -37,8 +37,15 @@ class ApiReplayCli(object):
             dest_os_auth_url=args.dest_os_auth_url,
             dest_plugin=args.dest_plugin,
             use_old_keystone=args.use_old_keystone,
-            max_retry=args.max_retry,
-            logfile=args.logfile)
+            octavia_os_tenant_name=args.octavia_os_project_name,
+            octavia_os_tenant_domain_id=args.octavia_os_project_domain_id,
+            octavia_os_username=args.octavia_os_username,
+            octavia_os_user_domain_id=args.octavia_os_user_domain_id,
+            octavia_os_password=args.octavia_os_password,
+            octavia_os_auth_url=args.octavia_os_auth_url,
+            neutron_conf=args.neutron_conf,
+            logfile=args.logfile,
+            max_retry=args.max_retry)

     def _setup_argparse(self):
         parser = argparse.ArgumentParser()
@@ -115,11 +122,42 @@ class ApiReplayCli(object):
             action='store_true',
             help="Use old keystone client for source authentication.")

+        # Arguments required to connect to the octavia client (read only)
+        parser.add_argument(
+            "--octavia-os-username",
+            help="The octavia os-username to use to "
+                 "gather loadbalancers resources with.")
+        parser.add_argument(
+            "--octavia-os-user-domain-id",
+            default=DEFAULT_DOMAIN_ID,
+            help="The octavia os-user-domain-id to use to "
+                 "gather loadbalancers resources with.")
+        parser.add_argument(
+            "--octavia-os-project-name",
+            help="The octavia os-project-name to use to "
+                 "gather loadbalancers resources with.")
+        parser.add_argument(
+            "--octavia-os-project-domain-id",
+            default=DEFAULT_DOMAIN_ID,
+            help="The octavia os-project-domain-id to use to "
+                 "gather loadbalancers resources with.")
+        parser.add_argument(
+            "--octavia-os-password",
+            help="The password for this octavia user.")
+        parser.add_argument(
+            "--octavia-os-auth-url",
+            help="The keystone api endpoint for this octavia user.")
+
         parser.add_argument(
             "--logfile",
             default=DEFAULT_LOGFILE,
             help="Output logfile.")
+        parser.add_argument(
+            "--neutron_conf",
+            default='/etc/neutron/neutron.conf',
+            help="neutron config file path.")
         parser.add_argument(
             "--max-retry",
             default=10,


@@ -10,8 +10,9 @@
 # License for the specific language governing permissions and limitations
 # under the License.

+import copy
 import logging
-import time
+import socket

 import six
@@ -20,13 +21,20 @@ from keystoneauth1 import session
 from neutronclient.common import exceptions as n_exc
 from neutronclient.v2_0 import client
 from octaviaclient.api.v2 import octavia
+from oslo_config import cfg
+import oslo_messaging as messaging
+from oslo_messaging.rpc import dispatcher
 from oslo_utils import excutils

+from neutron.common import config as neutron_config
+from octavia_lib.api.drivers import driver_lib
 from vmware_nsx.api_replay import utils
 from vmware_nsx.common import nsxv_constants
+from vmware_nsx.services.lbaas.octavia import constants as d_const

-logging.basicConfig(level=logging.INFO)
 LOG = logging.getLogger(__name__)
+LOG.setLevel(logging.INFO)

 # For internal testing only
 use_old_keystone_on_dest = False
@@ -41,7 +49,15 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
                  dest_os_username, dest_os_user_domain_id,
                  dest_os_tenant_name, dest_os_tenant_domain_id,
                  dest_os_password, dest_os_auth_url, dest_plugin,
-                 use_old_keystone, logfile, max_retry):
+                 use_old_keystone,
+                 octavia_os_username, octavia_os_user_domain_id,
+                 octavia_os_tenant_name, octavia_os_tenant_domain_id,
+                 octavia_os_password, octavia_os_auth_url,
+                 neutron_conf, logfile, max_retry):
+
+        # Init config and logging
+        if neutron_conf:
+            neutron_config.init(args=['--config-file', neutron_conf])

         if logfile:
             f_handler = logging.FileHandler(logfile)
@@ -54,6 +70,7 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
         # connect to both clients
         if use_old_keystone:
+            LOG.info("Using old keystone for source neutron")
             # Since we are not sure what keystone version will be used on the
             # source setup, we add an option to use the v2 client
             self.source_neutron = client.Client(
@@ -61,7 +78,6 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
                 tenant_name=source_os_tenant_name,
                 password=source_os_password,
                 auth_url=source_os_auth_url)
-            self.source_octavia = None
         else:
             self.source_neutron = self.connect_to_client(
                 username=source_os_username,
@@ -70,21 +86,14 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
                 tenant_domain_id=source_os_tenant_domain_id,
                 password=source_os_password,
                 auth_url=source_os_auth_url)
-            self.source_octavia = self.connect_to_octavia(
-                username=source_os_username,
-                user_domain_id=source_os_user_domain_id,
-                tenant_name=source_os_tenant_name,
-                tenant_domain_id=source_os_tenant_domain_id,
-                password=source_os_password,
-                auth_url=source_os_auth_url)

         if use_old_keystone_on_dest:
+            LOG.info("Using old keystone for destination neutron")
             self.dest_neutron = client.Client(
                 username=dest_os_username,
                 tenant_name=dest_os_tenant_name,
                 password=dest_os_password,
                 auth_url=dest_os_auth_url)
-            self.dest_octavia = None
         else:
             self.dest_neutron = self.connect_to_client(
                 username=dest_os_username,
@@ -93,13 +102,17 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
                 tenant_domain_id=dest_os_tenant_domain_id,
                 password=dest_os_password,
                 auth_url=dest_os_auth_url)
-            self.dest_octavia = self.connect_to_octavia(
-                username=dest_os_username,
-                user_domain_id=dest_os_user_domain_id,
-                tenant_name=dest_os_tenant_name,
-                tenant_domain_id=dest_os_tenant_domain_id,
-                password=dest_os_password,
-                auth_url=dest_os_auth_url)
+
+        if octavia_os_auth_url:
+            self.octavia = self.connect_to_octavia(
+                username=octavia_os_username,
+                user_domain_id=octavia_os_user_domain_id,
+                tenant_name=octavia_os_tenant_name,
+                tenant_domain_id=octavia_os_tenant_domain_id,
+                password=octavia_os_password,
+                auth_url=octavia_os_auth_url)
+        else:
+            self.octavia = None

         self.dest_plugin = dest_plugin
@@ -112,7 +125,7 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
         self.migrate_floatingips()
         self.migrate_routers_routes(routers_routes)
         self.migrate_fwaas()
-        if self.source_octavia and self.dest_octavia:
+        if self.octavia:
             self.migrate_octavia()
         LOG.info("NSX migration is Done.")
@@ -430,6 +443,7 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
         source_networks = self.source_neutron.list_networks()['networks']
         dest_networks = self.dest_neutron.list_networks()['networks']
         dest_ports = self.dest_neutron.list_ports()['ports']
+        dest_subnets = self.dest_neutron.list_subnets()['subnets']

         remove_qos = False
         if not self.dest_qos_support:
@@ -485,6 +499,12 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
             dhcp_subnets = []
             count_dhcp_subnet = 0
             for subnet_id in network['subnets']:
+                # only create subnet if the dest server doesn't have it
+                if self.have_id(subnet_id, dest_subnets):
+                    LOG.info("Skip network %s: Already exists on the "
+                             "destination", network['id'])
+                    continue
+
                 subnet = self.find_subnet_by_id(subnet_id, source_subnets)
                 body = self.prepare_subnet(subnet)
@@ -528,10 +548,6 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
                 except n_exc.BadRequest as e:
                     LOG.error("Failed to create subnet: %(subnet)s: %(e)s",
                               {'subnet': subnet, 'e': e})
-                    # NOTE(arosen): this occurs here if you run the script
-                    # multiple times as we don't currently
-                    # preserve the subnet_id. Also, 409 would be a better
-                    # response code for this in neutron :(

             # create the ports on the network
             ports = self.get_ports_on_network(network['id'], source_ports)
@@ -741,93 +757,75 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
         LOG.info("FWaaS V2 migration done")

-    def _wait_for_lb_up(self, lb_id):
-        retry_num = 0
-        while retry_num < self.max_retry:
-            lb = self.dest_octavia.load_balancer_show(lb_id)
-            if not lb['provisioning_status'].startswith('PENDING'):
-                if lb['provisioning_status'] == 'ACTIVE':
-                    return True
-                # No point in trying any more
-                return False
-            retry_num = retry_num + 1
-            time.sleep(1)
-        return False
+    def _delete_octavia_lb(self, body):
+        kw = {'loadbalancer': body}
+        self.octavia_rpc_client.call({}, 'loadbalancer_delete_cascade', **kw)

     def _migrate_octavia_lb(self, lb, orig_map):
+        # Creating all loadbalancers resources on the new nsx driver
+        # using RPC calls to the plugin listener.
         # Create the loadbalancer:
-        body = self.prepare_lb_loadbalancer(lb)
-        try:
-            new_lb = self.dest_octavia.load_balancer_create(
-                json={'loadbalancer': body})['loadbalancer']
-        except Exception as e:
-            LOG.error("Failed to create loadbalancer (%(lb)s): %(e)s",
-                      {'lb': lb, 'e': e})
-            return
-        new_lb_id = new_lb['id']
-        if not self._wait_for_lb_up(new_lb_id):
-            LOG.error("New loadbalancer %s does not become active", new_lb_id)
+        lb_body = self.prepare_lb_loadbalancer(lb)
+        kw = {'loadbalancer': lb_body}
+        if not self.octavia_rpc_client.call({}, 'loadbalancer_create', **kw):
+            LOG.error("Failed to create loadbalancer (%s)", lb_body)
+            self._delete_octavia_lb(lb_body)
             return

+        lb_id = lb['id']
+        lb_body_for_deletion = copy.deepcopy(lb_body)
+        lb_body_for_deletion['listeners'] = []
+        lb_body_for_deletion['pools'] = []
+
         listeners_map = {}
         for listener_dict in lb.get('listeners', []):
             listener_id = listener_dict['id']
             listener = orig_map['listeners'][listener_id]
-            body = self.prepare_lb_listener(listener)
-            # Update loadbalancer in listener
-            body['loadbalancer_id'] = new_lb_id
-            try:
-                new_listener = self.dest_octavia.listener_create(
-                    json={'listener': body})['listener']
-            except Exception as e:
-                LOG.error("Failed to create listener (%(list)s): %(e)s",
-                          {'list': listener, 'e': e})
+            body = self.prepare_lb_listener(listener, lb_body)
+            body['loadbalancer'] = lb_body
+            body['loadbalancer_id'] = lb_id
+            kw = {'listener': body, 'cert': None}
+            if not self.octavia_rpc_client.call({}, 'listener_create', **kw):
+                LOG.error("Failed to create loadbalancer %(lb)s listener "
+                          "(%(list)s)", {'list': listener, 'lb': lb_id})
+                self._delete_octavia_lb(lb_body_for_deletion)
                 return
-            if not self._wait_for_lb_up(new_lb_id):
-                LOG.error("New loadbalancer %s does not become active after "
-                          "listener creation", new_lb_id)
-                return
-            # map old-id to new
-            listeners_map[listener_id] = new_listener['id']
+            listeners_map[listener_id] = body
+            lb_body_for_deletion['listeners'].append(body)

-        pools_map = {}
         for pool_dict in lb.get('pools', []):
             pool_id = pool_dict['id']
             pool = orig_map['pools'][pool_id]
-            body = self.prepare_lb_pool(pool)
-            # Update loadbalancer and listeners in pool
-            body['loadbalancer_id'] = new_lb_id
+            pool_body = self.prepare_lb_pool(pool, lb_body)
+            # Update listeners in pool
             if pool.get('listeners'):
-                body['listener_id'] = listeners_map[pool['listeners'][0]['id']]
-            try:
-                new_pool = self.dest_octavia.pool_create(
-                    json={'pool': body})['pool']
-            except Exception as e:
-                LOG.error("Failed to create pool (%(pool)s): %(e)s",
-                          {'pool': pool, 'e': e})
+                listener_id = pool['listeners'][0]['id']
+                pool_body['listener_id'] = listener_id
+                pool_body['listener'] = listeners_map.get(listener_id)
+            kw = {'pool': pool_body}
+            if not self.octavia_rpc_client.call({}, 'pool_create', **kw):
+                LOG.error("Failed to create loadbalancer %(lb)s pool "
+                          "(%(pool)s)", {'pool': pool, 'lb': lb_id})
+                self._delete_octavia_lb(lb_body_for_deletion)
                 return
-            if not self._wait_for_lb_up(new_lb_id):
-                LOG.error("New loadbalancer %s does not become active after "
-                          "pool creation", new_lb_id)
-                return
-            # map old-id to new
-            pools_map[pool_id] = new_pool['id']
+            lb_body_for_deletion['pools'].append(pool)

             # Add members to this pool
-            source_members = self.source_octavia.member_list(pool_id)[
-                'members']
-            for member in source_members:
-                body = self.prepare_lb_member(member)
-                try:
-                    self.dest_octavia.member_create(
-                        new_pool['id'], json={'member': body})['member']
-                except Exception as e:
-                    LOG.error("Failed to create member (%(member)s): %(e)s",
-                              {'member': member, 'e': e})
-                    return
-                if not self._wait_for_lb_up(new_lb_id):
-                    LOG.error("New loadbalancer %s does not become active "
-                              "after member creation", new_lb_id)
+            pool_members = self.octavia.member_list(pool_id)['members']
+            for member in pool_members:
+                body = self.prepare_lb_member(member, lb_body)
+                if not member['subnet_id']:
+                    # Add the loadbalancer subnet
+                    body['subnet_id'] = lb_body['vip_subnet_id']
+
+                body['pool'] = pool_body
+                kw = {'member': body}
+                if not self.octavia_rpc_client.call({}, 'member_create', **kw):
+                    LOG.error("Failed to create pool %(pool)s member "
+                              "(%(member)s)",
+                              {'member': member, 'pool': pool_id})
+                    self._delete_octavia_lb(lb_body_for_deletion)
                     return

             # Add pool health monitor
@@ -835,62 +833,42 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
                 hm_id = pool['healthmonitor_id']
                 hm = orig_map['hms'][hm_id]
                 body = self.prepare_lb_hm(hm)
+                body['pool'] = pool_body
                 # Update pool id in hm
-                body['pool_id'] = new_pool['id']
-                try:
-                    self.dest_octavia.health_monitor_create(
-                        json={'healthmonitor': body})['healthmonitor']
-                except Exception as e:
-                    LOG.error("Failed to create healthmonitor (%(hm)s): %(e)s",
-                              {'hm': hm, 'e': e})
-                    return
-                if not self._wait_for_lb_up(new_lb_id):
-                    LOG.error("New loadbalancer %s does not become active "
-                              "after health monitor creation", new_lb_id)
+                kw = {'healthmonitor': body}
+                if not self.octavia_rpc_client.call(
+                        {}, 'healthmonitor_create', **kw):
+                    LOG.error("Failed to create pool %(pool)s healthmonitor "
+                              "(%(hm)s)", {'hm': hm, 'pool': pool_id})
+                    self._delete_octavia_lb(lb_body_for_deletion)
                     return
+                lb_body_for_deletion['pools'][-1]['healthmonitor'] = body

         # Add listeners L7 policies
-        for listener_id in listeners_map:
+        for listener_id in listeners_map.keys():
             listener = orig_map['listeners'][listener_id]
             for l7pol_dict in listener.get('l7policies', []):
-                l7pol = orig_map['l7pols'][l7pol_dict['id']]
-                body = self.prepare_lb_l7policy(l7pol)
-                # Update pool id in l7 policy
-                body['listener_id'] = listeners_map[listener_id]
-                # update redirect_pool_id
-                if l7pol.get('redirect_pool_id'):
-                    body['redirect_pool_id'] = pools_map[
-                        l7pol['redirect_pool_id']]
-                try:
-                    new_pol = self.dest_octavia.l7policy_create(
-                        json={'l7policy': body})['l7policy']
-                except Exception as e:
-                    LOG.error("Failed to create l7policy (%(l7pol)s): "
-                              "%(e)s", {'l7pol': l7pol, 'e': e})
-                    return
-                if not self._wait_for_lb_up(new_lb_id):
-                    LOG.error("New loadbalancer %s does not become active "
-                              "after L7 policy creation", new_lb_id)
-                    return
+                l7_pol_id = l7pol_dict['id']
+                l7pol = orig_map['l7pols'][l7_pol_id]
+                pol_body = self.prepare_lb_l7policy(l7pol)

                 # Add the rules of this policy
-                source_l7rules = self.source_octavia.l7rule_list(
-                    l7pol['id'])['rules']
+                source_l7rules = self.octavia.l7rule_list(
+                    l7_pol_id)['rules']
                 for rule in source_l7rules:
-                    body = self.prepare_lb_l7rule(rule)
-                    try:
-                        self.dest_octavia.l7rule_create(
-                            new_pol['id'], json={'rule': body})['rule']
-                    except Exception as e:
-                        LOG.error("Failed to create l7rule (%(rule)s): "
-                                  "%(e)s", {'rule': rule, 'e': e})
-                        return
-                    if not self._wait_for_lb_up(new_lb_id):
-                        LOG.error("New loadbalancer %s does not become "
-                                  "active after L7 rule creation",
-                                  new_lb_id)
-                        return
+                    rule_body = self.prepare_lb_l7rule(rule)
+                    pol_body['rules'].append(rule_body)
+
+                kw = {'l7policy': pol_body}
+                if not self.octavia_rpc_client.call(
+                        {}, 'l7policy_create', **kw):
+                    LOG.error("Failed to create l7policy (%(l7pol)s)",
+                              {'l7pol': l7pol})
+                    self._delete_octavia_lb(lb_body_for_deletion)
+                    return
+
+        LOG.info("Created loadbalancer %s", lb_id)

     def _map_orig_objects_of_type(self, source_objects):
         result = {}
         for obj in source_objects:
@@ -908,35 +886,58 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
         return result

     def migrate_octavia(self):
-        """Migrates Octavia objects from source to dest neutron.
+        """Migrates Octavia NSX objects to the new neutron driver.

-        Right now the Octavia objects are created with new IDS, and
-        do not keep their original IDs
+        The Octavia process & DB will remain unchanged.
+        Using RPC connection to connect directly with the new plugin driver.
         """
-        # TODO(asarfaty): Keep original ids
+        # Read all existing octavia resources
         try:
-            source_loadbalancers = self.source_octavia.\
-                load_balancer_list()['loadbalancers']
-            source_listeners = self.source_octavia.listener_list()['listeners']
-            source_pools = self.source_octavia.pool_list()['pools']
-            source_hms = self.source_octavia.\
-                health_monitor_list()['healthmonitors']
-            source_l7pols = self.source_octavia.l7policy_list()['l7policies']
+            loadbalancers = self.octavia.load_balancer_list()['loadbalancers']
+            listeners = self.octavia.listener_list()['listeners']
+            pools = self.octavia.pool_list()['pools']
+            hms = self.octavia.health_monitor_list()['healthmonitors']
+            l7pols = self.octavia.l7policy_list()['l7policies']
         except Exception as e:
             # Octavia might be disabled in the source
-            LOG.info("Octavia was not found on the source server: %s", e)
+            LOG.info("Octavia was not found on the server: %s", e)
             return

-        try:
-            self.dest_octavia.load_balancer_list()
-        except Exception as e:
-            # Octavia might be disabled in the destination
-            LOG.warning("Skipping Octavia migration. Octavia was not found "
-                        "on the destination server: %s", e)
-            return
-        orig_map = self._map_orig_lb_objects(source_listeners, source_pools,
-                                             source_hms, source_l7pols)
-        total_num = len(source_loadbalancers)
+        # Init the RPC connection for sending messages to the octavia driver
+        topic = d_const.OCTAVIA_TO_DRIVER_MIGRATION_TOPIC
+        transport = messaging.get_rpc_transport(cfg.CONF)
+        target = messaging.Target(topic=topic, exchange="common",
+                                  namespace='control', fanout=False,
+                                  version='1.0')
+        self.octavia_rpc_client = messaging.RPCClient(transport, target)
+
+        # Initialize RPC listener for getting status updates from the driver
+        # so that the resource status will not change in the octavia DB
+        topic = d_const.DRIVER_TO_OCTAVIA_MIGRATION_TOPIC
+        server = socket.gethostname()
+        target = messaging.Target(topic=topic, server=server,
+                                  exchange="common", fanout=False)
+
+        class MigrationOctaviaDriverEndpoint(driver_lib.DriverLibrary):
+            target = messaging.Target(namespace="control", version='1.0')
+
+            def update_loadbalancer_status(self, **kw):
+                # Do nothing
+                pass
+
+        endpoints = [MigrationOctaviaDriverEndpoint]
+        access_policy = dispatcher.DefaultRPCAccessPolicy
+        self.octavia_rpc_server = messaging.get_rpc_server(
+            transport, target, endpoints, executor='threading',
+            access_policy=access_policy)
+        self.octavia_rpc_server.start()
+
+        orig_map = self._map_orig_lb_objects(listeners, pools,
+                                             hms, l7pols)
+        total_num = len(loadbalancers)
         LOG.info("Migrating %d loadbalancer(s)", total_num)
-        for lb in source_loadbalancers:
-            self._migrate_octavia_lb(lb, orig_map)
+        for lb in loadbalancers:
+            if lb['provisioning_status'] == 'ACTIVE':
+                self._migrate_octavia_lb(lb, orig_map)
+            else:
+                LOG.info("Skipping %s loadbalancer %s",
+                         lb['provisioning_status'], lb['id'])


@@ -73,7 +73,6 @@ class PrepareObjectForMigration(object):
     drop_subnet_fields = basic_ignore_fields + [
         'advanced_service_providers',
-        'id',
         'service_types']

     drop_port_fields = basic_ignore_fields + [
@@ -103,14 +102,14 @@ class PrepareObjectForMigration(object):
     drop_fwaas_group_fields = ['status']

     lb_ignore_fields = ['created_at', 'updated_at', 'operating_status',
-                        'provisioning_status', 'id']
+                        'provisioning_status']
     drop_lb_loadbalancer_fields = lb_ignore_fields + [
-        'listeners', 'pools',  # Those objects will be created laster
-        'vip_subnet_id',  # vip_port_id will be used
+        'listeners', 'pools',  # Those objects will be created later
         'flavor_id',  # not supported by the driver
+        'vip_qos_policy_id',  # not supported by the driver
     ]
     drop_lb_listener_fields = lb_ignore_fields + [
-        'loadbalancers', 'l7policies', 'default_pool_id']
+        'l7policies', 'default_pool_id']
     drop_lb_pool_fields = lb_ignore_fields + [
         'loadbalancers', 'healthmonitor_id', 'listeners', 'members']
     drop_lb_member_fields = lb_ignore_fields
@@ -192,6 +191,7 @@ class PrepareObjectForMigration(object):
         # external networks needs some special care
         if body.get('router:external'):
             fields_reset = False
+            # TODO(asarfaty): map external network neutron ids to Policy tier0
             for field in ['provider:network_type', 'provider:segmentation_id',
                           'provider:physical_network']:
                 if field in body:
@@ -264,10 +264,6 @@ class PrepareObjectForMigration(object):
         if 'device_owner' not in body:
             body['device_owner'] = ""

-        if body.get('device_owner') == 'Octavia':
-            # remove device id & owner. Octavia will re-set it.
-            body['device_id'] = ""
-            body['device_owner'] = ""
-
         return body

     def prepare_floatingip(self, fip, direct_call=False):
@@ -297,20 +293,31 @@ class PrepareObjectForMigration(object):
     def prepare_lb_loadbalancer(self, lb_obj):
         return self.drop_fields(lb_obj, self.drop_lb_loadbalancer_fields)

-    def prepare_lb_listener(self, lb_obj):
-        return self.drop_fields(lb_obj, self.drop_lb_listener_fields)
+    def prepare_lb_listener(self, listener_obj, lb_body):
+        body = self.drop_fields(listener_obj, self.drop_lb_listener_fields)
+        body['loadbalancer'] = lb_body
+        body['loadbalancer_id'] = lb_body['id']
+        return body

-    def prepare_lb_pool(self, lb_obj):
-        return self.drop_fields(lb_obj, self.drop_lb_pool_fields)
+    def prepare_lb_pool(self, pool_obj, lb_body):
+        body = self.drop_fields(pool_obj, self.drop_lb_pool_fields)
+        body['loadbalancer'] = lb_body
+        body['loadbalancer_id'] = lb_body['id']
+        return body

-    def prepare_lb_member(self, lb_obj):
-        return self.drop_fields(lb_obj, self.drop_lb_member_fields)
+    def prepare_lb_member(self, mem_obj, lb_body):
+        body = self.drop_fields(mem_obj, self.drop_lb_member_fields)
+        body['loadbalancer'] = lb_body
+        body['loadbalancer_id'] = lb_body['id']
+        return body

     def prepare_lb_hm(self, lb_obj):
         return self.drop_fields(lb_obj, self.drop_lb_hm_fields)

     def prepare_lb_l7policy(self, lb_obj):
-        return self.drop_fields(lb_obj, self.drop_lb_l7policy_fields)
+        body = self.drop_fields(lb_obj, self.drop_lb_l7policy_fields)
+        body['rules'] = []
+        return body

     def prepare_lb_l7rule(self, lb_obj):
         return self.drop_fields(lb_obj, self.drop_lb_l7rule_fields)


@@ -37,6 +37,9 @@ RESOURCE_ATTRIBUTE_MAP = {
     'networks': {
         'id': ID_WITH_POST,
     },
+    'subnets': {
+        'id': ID_WITH_POST,
+    },
     'security_groups': {
         'id': ID_WITH_POST,
         'name': {'allow_post': True, 'allow_put': True,


@@ -945,6 +945,7 @@ class NsxPolicyPlugin(nsx_plugin_common.NsxPluginV3Base):
         self.nsxpolicy.tier1.update(router_id,
                                     ipv6_ndra_profile_id=profile_id)

+    @nsx_plugin_common.api_replay_mode_wrapper
     def create_subnet(self, context, subnet):
         return self._create_subnet(context, subnet)


@@ -1220,6 +1220,7 @@ class NsxV3Plugin(nsx_plugin_common.NsxPluginV3Base,
             LOG.warning("Failed to update network %(id)s dhcp server on "
                         "the NSX: %(e)s", {'id': network['id'], 'e': e})

+    @nsx_plugin_common.api_replay_mode_wrapper
     def create_subnet(self, context, subnet):
         return self._create_subnet(context, subnet)


@@ -103,8 +103,11 @@ class EdgeMemberManagerFromDict(base_mgr.NsxpLoadbalancerBaseManager):
     def create(self, context, member, completor):
         pool_client = self.core_plugin.nsxpolicy.load_balancer.lb_pool
         self._validate_member_lb_connectivity(context, member, completor)
-        network = lb_utils.get_network_from_subnet(
-            context, self.core_plugin, member['subnet_id'])
+        if member.get('subnet_id'):
+            network = lb_utils.get_network_from_subnet(
+                context, self.core_plugin, member['subnet_id'])
+        else:
+            network = None
         if network and network.get('router:external'):
             fixed_ip = self._get_info_from_fip(context, member['address'])
         else:


@@ -14,7 +14,9 @@
 # under the License.

 OCTAVIA_TO_DRIVER_TOPIC = 'vmware_nsx__lb_listener'
+OCTAVIA_TO_DRIVER_MIGRATION_TOPIC = 'vmware_nsx__lb_listener_migration'
 DRIVER_TO_OCTAVIA_TOPIC = 'vmware_nsx__driver_listener'
+DRIVER_TO_OCTAVIA_MIGRATION_TOPIC = 'vmware_nsx__driver_listener_migration'

 LOADBALANCER = 'loadbalancer'
 LISTENER = 'listener'


@@ -44,6 +44,9 @@ class NSXOctaviaListener(object):
                                 loadbalancer, member, pool)

     def _init_rpc_messaging(self):
-        topic = constants.DRIVER_TO_OCTAVIA_TOPIC
+        if cfg.CONF.api_replay_mode:
+            topic = constants.DRIVER_TO_OCTAVIA_MIGRATION_TOPIC
+        else:
+            topic = constants.DRIVER_TO_OCTAVIA_TOPIC
         transport = messaging.get_rpc_transport(cfg.CONF)
         target = messaging.Target(topic=topic, exchange="common",
@@ -54,6 +57,9 @@ class NSXOctaviaListener(object):
     def _init_rpc_listener(self, healthmonitor, l7policy, l7rule, listener,
                            loadbalancer, member, pool):
         # Initialize RPC listener
-        topic = constants.OCTAVIA_TO_DRIVER_TOPIC
+        if cfg.CONF.api_replay_mode:
+            topic = constants.OCTAVIA_TO_DRIVER_MIGRATION_TOPIC
+        else:
+            topic = constants.OCTAVIA_TO_DRIVER_TOPIC
         server = socket.gethostname()
         transport = messaging.get_rpc_transport(cfg.CONF)
@@ -299,6 +305,8 @@ class NSXOctaviaListenerEndpoint(object):
         except Exception as e:
             LOG.error('NSX driver loadbalancer_create failed %s', e)
             completor(success=False)
+            return False
+        return True

     @log_helpers.log_method_call
     def loadbalancer_delete_cascade(self, ctxt, loadbalancer):
@@ -336,6 +344,8 @@
         except Exception as e:
             LOG.error('NSX driver loadbalancer_delete_cascade failed %s', e)
             completor(success=False)
+            return False
+        return True

     @log_helpers.log_method_call
     def loadbalancer_delete(self, ctxt, loadbalancer, cascade=False):
@@ -350,6 +360,8 @@
         except Exception as e:
             LOG.error('NSX driver loadbalancer_delete failed %s', e)
             completor(success=False)
+            return False
+        return True

     @log_helpers.log_method_call
     def loadbalancer_update(self, ctxt, old_loadbalancer, new_loadbalancer):
@@ -362,6 +374,8 @@
         except Exception as e:
             LOG.error('NSX driver loadbalancer_update failed %s', e)
             completor(success=False)
+            return False
+        return True

     # Listener
     @log_helpers.log_method_call
@@ -375,6 +389,8 @@
         except Exception as e:
             LOG.error('NSX driver listener_create failed %s', e)
             completor(success=False)
+            return False
+        return True

     @log_helpers.log_method_call
     def listener_delete(self, ctxt, listener):
@@ -386,6 +402,8 @@
         except Exception as e:
             LOG.error('NSX driver listener_delete failed %s', e)
             completor(success=False)
+            return False
+        return True

     @log_helpers.log_method_call
     def listener_update(self, ctxt, old_listener, new_listener, cert):
@@ -398,6 +416,8 @@
         except Exception as e:
             LOG.error('NSX driver listener_update failed %s', e)
             completor(success=False)
+            return False
+        return True

     # Pool
     @log_helpers.log_method_call
@@ -410,6 +430,8 @@
         except Exception as e:
             LOG.error('NSX driver pool_create failed %s', e)
             completor(success=False)
+            return False
+        return True

     @log_helpers.log_method_call
     def pool_delete(self, ctxt, pool):
@@ -421,6 +443,8 @@
         except Exception as e:
             LOG.error('NSX driver pool_delete failed %s', e)
             completor(success=False)
+            return False
+        return True

     @log_helpers.log_method_call
     def pool_update(self, ctxt, old_pool, new_pool):
@@ -432,6 +456,8 @@
         except Exception as e:
             LOG.error('NSX driver pool_update failed %s', e)
             completor(success=False)
+            return False
+        return True

     # Member
     @log_helpers.log_method_call
@@ -444,6 +470,8 @@
         except Exception as e:
             LOG.error('NSX driver member_create failed %s', e)
             completor(success=False)
+            return False
+        return True

     @log_helpers.log_method_call
     def member_delete(self, ctxt, member):
@@ -455,6 +483,8 @@
         except Exception as e:
             LOG.error('NSX driver member_delete failed %s', e)
             completor(success=False)
+            return False
+        return True

     @log_helpers.log_method_call
     def member_update(self, ctxt, old_member, new_member):
@@ -466,6 +496,8 @@
         except Exception as e:
             LOG.error('NSX driver member_update failed %s', e)
             completor(success=False)
+            return False
+        return True

     # Health Monitor
     @log_helpers.log_method_call
@@ -478,6 +510,8 @@
         except Exception as e:
             LOG.error('NSX driver healthmonitor_create failed %s', e)
             completor(success=False)
+            return False
+        return True

     @log_helpers.log_method_call
     def healthmonitor_delete(self, ctxt, healthmonitor):
@@ -489,6 +523,8 @@
         except Exception as e:
             LOG.error('NSX driver healthmonitor_delete failed %s', e)
             completor(success=False)
+            return False
+        return True

     @log_helpers.log_method_call
     def healthmonitor_update(self, ctxt, old_healthmonitor, new_healthmonitor):
@@ -501,6 +537,8 @@
         except Exception as e:
             LOG.error('NSX driver healthmonitor_update failed %s', e)
             completor(success=False)
+            return False
+        return True

     # L7 Policy
     @log_helpers.log_method_call
@@ -513,6 +551,8 @@
         except Exception as e:
             LOG.error('NSX driver l7policy_create failed %s', e)
             completor(success=False)
+            return False
+        return True

     @log_helpers.log_method_call
     def l7policy_delete(self, ctxt, l7policy):
@@ -524,6 +564,8 @@
         except Exception as e:
             LOG.error('NSX driver l7policy_delete failed %s', e)
             completor(success=False)
+            return False
+        return True

     @log_helpers.log_method_call
     def l7policy_update(self, ctxt, old_l7policy, new_l7policy):
@@ -535,6 +577,8 @@
         except Exception as e:
             LOG.error('NSX driver l7policy_update failed %s', e)
             completor(success=False)
+            return False
+        return True

     # L7 Rule
     @log_helpers.log_method_call
@@ -546,6 +590,8 @@
         except Exception as e:
             LOG.error('NSX driver l7rule_create failed %s', e)
             completor(success=False)
+            return False
+        return True

     @log_helpers.log_method_call
     def l7rule_delete(self, ctxt, l7rule):
@@ -557,6 +603,8 @@
         except Exception as e:
             LOG.error('NSX driver l7rule_delete failed %s', e)
             completor(success=False)
+            return False
+        return True

     @log_helpers.log_method_call
     def l7rule_update(self, ctxt, old_l7rule, new_l7rule):
@@ -567,6 +615,8 @@
         except Exception as e:
             LOG.error('NSX driver l7rule_update failed %s', e)
             completor(success=False)
+            return False
+        return True


 class NSXOctaviaStatisticsCollector(object):