Browse Source

V2T migration: Support octavia barbican certificate

Also improve logging of errors

Change-Id: I261fb6c9eb8d6051ebf9d9a0bf68d417c72a0a1a
changes/27/776827/4
asarfaty 7 months ago
parent
commit
dd569f0702
  1. 245
      vmware_nsx/api_replay/client.py

245
vmware_nsx/api_replay/client.py

@ -16,6 +16,7 @@ import socket
import six
from barbicanclient.v1 import client as barbican
from keystoneauth1 import identity
from keystoneauth1 import session
from neutronclient.common import exceptions as n_exc
@ -25,7 +26,7 @@ from oslo_config import cfg
import oslo_messaging as messaging
from oslo_messaging.rpc import dispatcher
from oslo_serialization import jsonutils
from oslo_utils import excutils
from oslo_utils import encodeutils
from neutron.common import config as neutron_config
from neutron_lib import constants as nl_constants
@ -41,9 +42,6 @@ LOG.setLevel(logging.INFO)
# For internal testing only
use_old_keystone_on_dest = False
# Error counter for the migration
n_errors = 0
class ApiReplayClient(utils.PrepareObjectForMigration):
@ -121,8 +119,17 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
password=octavia_os_password,
auth_url=octavia_os_auth_url,
cert_file=cert_file)
self.barbican = self.connect_to_barbican(
username=octavia_os_username,
user_domain_id=octavia_os_user_domain_id,
tenant_name=octavia_os_tenant_name,
tenant_domain_id=octavia_os_tenant_domain_id,
password=octavia_os_password,
auth_url=octavia_os_auth_url,
cert_file=cert_file)
else:
self.octavia = None
self.barbican = None
self.dest_plugin = dest_plugin
@ -147,6 +154,9 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
else:
self.int_vni_map = None
self.n_errors = 0
self.errors = []
LOG.info("Starting NSX migration to %s.", self.dest_plugin)
# Migrate all the objects
self.migrate_quotas()
@ -159,9 +169,14 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
self.migrate_fwaas()
if self.octavia:
self.migrate_octavia()
global n_errors
LOG.info("NSX migration is Done with %s errors.", n_errors)
exit(n_errors)
if self.n_errors:
LOG.error("NSX migration is Done with %s errors:", self.n_errors)
for err in self.errors:
LOG.error(err)
else:
LOG.info("NSX migration is Done with no errors")
exit(self.n_errors)
def _get_session(self, username, user_domain_id,
tenant_name, tenant_domain_id,
@ -204,6 +219,20 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
)
return client
def connect_to_barbican(self, username, user_domain_id,
tenant_name, tenant_domain_id,
password, auth_url, cert_file):
sess = self._get_session(username, user_domain_id,
tenant_name, tenant_domain_id,
password, auth_url, cert_file)
endpoint = sess.get_endpoint(service_type='key-manager')
client = barbican.Client(
session=sess,
service_type='key-manager',
endpoint=endpoint,
)
return client
def find_subnet_by_id(self, subnet_id, subnets):
for subnet in subnets:
if subnet['id'] == subnet_id:
@ -225,9 +254,12 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
return False
def migrate_quotas(self):
global n_errors
def add_error(self, msg):
self.n_errors = self.n_errors + 1
LOG.error(msg)
self.errors.append(msg)
def migrate_quotas(self):
source_quotas = self.source_neutron.list_quotas()['quotas']
dest_quotas = self.dest_neutron.list_quotas()['quotas']
@ -244,9 +276,8 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
{'count': count, 'total': total_num,
'q': new_quota})
except Exception as e:
LOG.error("Failed to create quota %(q)s: %(e)s",
{'q': quota, 'e': e})
n_errors = n_errors + 1
self.add_error("Failed to create quota %(q)s: %(e)s" %
{'q': quota, 'e': e})
def migrate_qos_rule(self, dest_policy, source_rule):
"""Add the QoS rule from the source to the QoS policy
@ -254,7 +285,6 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
If there is already a rule of that type, skip it since
the QoS policy can have only one rule of each type
"""
global n_errors
#TODO(asarfaty) also take rule direction into account once
#ingress support is upstream
rule_type = source_rule.get('type')
@ -279,14 +309,12 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
{'rule': rule_type, 'pol': pol_id})
LOG.info("created QoS policy %s rule %s", pol_id, rule)
except Exception as e:
LOG.error("Failed to create QoS rule %(rule)s for policy %(pol)s: "
"%(e)s", {'rule': body, 'pol': pol_id, 'e': e})
n_errors = n_errors + 1
self.add_error("Failed to create QoS rule %(rule)s for policy "
"%(pol)s: %(e)s" % {'rule': body, 'pol': pol_id,
'e': e})
def migrate_qos_policies(self):
"""Migrates QoS policies from source to dest neutron."""
global n_errors
# first fetch the QoS policies from both the
# source and destination neutron server
try:
@ -321,9 +349,8 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
new_pol = self.dest_neutron.create_qos_policy(
body={'policy': body})
except Exception as e:
LOG.error("Failed to create QoS policy %(pol)s: %(e)s",
{'pol': pol['id'], 'e': e})
n_errors = n_errors + 1
self.add_error("Failed to create QoS policy %(pol)s: "
"%(e)s" % {'pol': pol['id'], 'e': e})
continue
else:
LOG.info("Created QoS policy %s", new_pol)
@ -332,8 +359,6 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
def migrate_security_groups(self):
"""Migrates security groups from source to dest neutron."""
global n_errors
# first fetch the security groups from both the
# source and dest neutron server
source_sec_groups = self.source_neutron.list_security_groups()
@ -378,10 +403,8 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
{'count': count, 'total': total_num,
'sg': new_sg})
except Exception as e:
LOG.error("Failed to create security group (%(sg)s): "
"%(e)s",
{'sg': sg, 'e': e})
n_errors = n_errors + 1
self.add_error("Failed to create security group (%(sg)s): "
"%(e)s" % {'sg': sg, 'e': e})
# Use bulk rules creation for the rules of the SG
if not sg_rules:
@ -412,9 +435,8 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
LOG.debug("created %s security group rules for SG %s",
len(rules), sg['id'])
except Exception as e:
LOG.error("Failed to create security group %s "
"rules: %s", sg['id'], e)
n_errors = n_errors + 1
self.add_error("Failed to create security group %s "
"rules: %s" % (sg['id'], e))
def get_dest_availablity_zones(self, resource):
azs = self.dest_neutron.list_availability_zones()['availability_zones']
@ -429,7 +451,6 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
ports are set.
And return a dictionary of external gateway info per router
"""
global n_errors
try:
source_routers = self.source_neutron.list_routers()['routers']
except Exception:
@ -468,10 +489,9 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
try:
self.dest_neutron.create_network({'network': net_body})
except Exception as e:
LOG.error("Failed to create internal network for router "
"%(rtr)s: %(e)s",
{'rtr': router['id'], 'e': e})
n_errors = n_errors + 1
self.add_error("Failed to create internal network for "
"router %(rtr)s: %(e)s" %
{'rtr': router['id'], 'e': e})
dest_router = self.have_id(router['id'], dest_routers)
if dest_router is False:
body = self.prepare_router(router, dest_azs=dest_azs)
@ -482,14 +502,12 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
{'count': count, 'total': total_num,
'rtr': new_router})
except Exception as e:
LOG.error("Failed to create router %(rtr)s: %(e)s",
{'rtr': router, 'e': e})
n_errors = n_errors + 1
self.add_error("Failed to create router %(rtr)s: %(e)s" %
{'rtr': router, 'e': e})
return update_routes, gw_info
def migrate_routers_routes(self, routers_routes):
"""Add static routes to the created routers."""
global n_errors
total_num = len(routers_routes)
LOG.info("Migrating %s routers routes", total_num)
for count, (router_id, routes) in enumerate(
@ -501,13 +519,11 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
{'count': count, 'total': total_num,
'rtr': router_id})
except Exception as e:
LOG.error("Failed to add routes %(routes)s to router "
"%(rtr)s: %(e)s",
{'routes': routes, 'rtr': router_id, 'e': e})
n_errors = n_errors + 1
self.add_error("Failed to add routes %(routes)s to router "
"%(rtr)s: %(e)s" %
{'routes': routes, 'rtr': router_id, 'e': e})
def migrate_subnetpools(self):
global n_errors
subnetpools_map = {}
try:
source_subnetpools = self.source_neutron.list_subnetpools()[
@ -541,14 +557,12 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
dest_subnetpools = self.dest_neutron.list_subnetpools()[
'subnetpools']
except Exception as e:
LOG.error("Failed to create subnetpool %(pool)s: %(e)s",
{'pool': pool, 'e': e})
n_errors = n_errors + 1
self.add_error("Failed to create subnetpool %(pool)s: "
"%(e)s" % {'pool': pool, 'e': e})
return subnetpools_map
def migrate_networks_subnets_ports(self, routers_gw_info):
"""Migrates networks/ports/router-uplinks from src to dest neutron."""
global n_errors
source_ports = self.source_neutron.list_ports()['ports']
source_subnets = self.source_neutron.list_subnets()['subnets']
source_networks = self.source_neutron.list_networks()['networks']
@ -602,11 +616,8 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
{'count': count, 'total': total_num,
'net': created_net})
except Exception as e:
# Print the network and exception to help debugging
with excutils.save_and_reraise_exception():
LOG.error("Failed to create network %s", body)
n_errors = n_errors + 1
raise e
self.add_error("Failed to create network: %s : %s" % (body, e))
continue
subnets_map = {}
dhcp_subnets = []
@ -664,9 +675,8 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
dhcp_subnets.append({'id': created_subnet['id'],
'host_routes': sub_host_routes})
except n_exc.BadRequest as e:
LOG.error("Failed to create subnet: %(subnet)s: %(e)s",
{'subnet': subnet, 'e': e})
n_errors = n_errors + 1
self.add_error("Failed to create subnet: %(subnet)s: "
"%(e)s" % {'subnet': subnet, 'e': e})
# create the ports on the network
ports = self.get_ports_on_network(network['id'], source_ports)
@ -715,10 +725,9 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
'net': port['network_id']})
except Exception as e:
LOG.error("Failed to add router gateway with port "
"(%(port)s): %(e)s",
{'port': port, 'e': e})
n_errors = n_errors + 1
self.add_error("Failed to add router gateway with "
"port (%(port)s): %(e)s" %
{'port': port, 'e': e})
continue
# Let the neutron dhcp-agent recreate this on its own
@ -758,10 +767,9 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
# Note(asarfaty): also if the same network in
# source is attached to 2 routers, which the v3
# plugins does not support.
LOG.error("Failed to add router interface port"
"(%(port)s): %(e)s",
{'port': port, 'e': e})
n_errors = n_errors + 1
self.add_error("Failed to add router interface "
"port (%(port)s): %(e)s" %
{'port': port, 'e': e})
continue
try:
@ -770,9 +778,8 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
except Exception as e:
# NOTE(arosen): this occurs here if you run the
# script multiple times as we don't track this.
LOG.error("Failed to create port (%(port)s) : %(e)s",
{'port': port, 'e': e})
n_errors = n_errors + 1
self.add_error("Failed to create port (%(port)s) : "
"%(e)s" % {'port': port, 'e': e})
else:
ip_addr = None
if created_port.get('fixed_ips'):
@ -794,14 +801,12 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
self.dest_neutron.update_subnet(subnet['id'],
{'subnet': data})
except Exception as e:
LOG.error("Failed to enable DHCP on subnet %(subnet)s: "
"%(e)s",
{'subnet': subnet['id'], 'e': e})
n_errors = n_errors + 1
self.add_error("Failed to enable DHCP on subnet "
"%(subnet)s: %(e)s" %
{'subnet': subnet['id'], 'e': e})
def migrate_floatingips(self):
"""Migrates floatingips from source to dest neutron."""
global n_errors
try:
source_fips = self.source_neutron.list_floatingips()['floatingips']
except Exception:
@ -816,15 +821,19 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
LOG.info("Created floatingip %(count)s/%(total)s : %(fip)s",
{'count': count, 'total': total_num, 'fip': fip})
except Exception as e:
LOG.error("Failed to create floating ip (%(fip)s) : %(e)s",
{'fip': source_fip, 'e': e})
n_errors = n_errors + 1
self.add_error("Failed to create floating ip (%(fip)s) : "
"%(e)s" % {'fip': source_fip, 'e': e})
def _plural_res_type(self, resource_type):
if resource_type.endswith('y'):
return "%sies" % resource_type[:-1]
return "%ss" % resource_type
def _migrate_fwaas_resource(self, resource_type, source_objects,
dest_objects, prepare_method, create_method):
global n_errors
total_num = len(source_objects)
LOG.info("Migrating %s %ss", total_num, resource_type)
LOG.info("Migrating %s %s", total_num,
self._plural_res_type(resource_type))
for count, source_obj in enumerate(source_objects, 1):
# Check if the object already exists
if self.have_id(source_obj['id'], dest_objects):
@ -845,10 +854,10 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
{'resource': resource_type, 'count': count,
'total': total_num, 'obj': new_obj})
except Exception as e:
LOG.error("Failed to create %(resource)s (%(obj)s) : %(e)s",
{'resource': resource_type, 'obj': source_obj,
'e': e})
n_errors = n_errors + 1
self.add_error("Failed to create %(resource)s (%(obj)s) : "
"%(e)s" %
{'resource': resource_type, 'obj': source_obj,
'e': e})
def migrate_fwaas(self):
"""Migrates FWaaS V2 objects from source to dest neutron."""
@ -901,16 +910,44 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
kw = {'loadbalancer': body}
self.octavia_rpc_client.call({}, 'loadbalancer_delete_cascade', **kw)
def _migrate_octavia_lb(self, lb, orig_map):
def _create_lb_certificate(self, listener_dict):
# Extract Octavia certificate data into a dict which is readable by
# the listener_mgr
def get_certificate(cert_data):
if cert_data.certificate:
return encodeutils.to_utf8(
cert_data.certificate.payload)
return None
def get_private_key(cert_data):
if cert_data.private_key:
return encodeutils.to_utf8(
cert_data.private_key.payload)
return None
def get_private_key_passphrase(cert_data):
if cert_data.private_key_passphrase:
return encodeutils.to_utf8(
cert_data.private_key_passphrase.payload)
return None
if listener_dict.get('default_tls_container_ref'):
cert_data = self.barbican.containers.get(
container_ref=listener_dict['default_tls_container_ref'])
return {'ref': listener_dict['default_tls_container_ref'],
'certificate': get_certificate(cert_data),
'private_key': get_private_key(cert_data),
'passphrase': get_private_key_passphrase(cert_data)}
def _migrate_octavia_lb(self, lb, orig_map, count, total_num):
# Creating all loadbalancers resources on the new nsx driver
# using RPC calls to the plugin listener.
global n_errors
# Create the loadbalancer:
lb_body = self.prepare_lb_loadbalancer(lb)
kw = {'loadbalancer': lb_body}
if not self.octavia_rpc_client.call({}, 'loadbalancer_create', **kw):
LOG.error("Failed to create loadbalancer (%s)", lb_body)
self.add_error("Failed to create loadbalancer (%s)" % lb_body)
self._delete_octavia_lb(lb_body)
return
@ -923,15 +960,15 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
for listener_dict in lb.get('listeners', []):
listener_id = listener_dict['id']
listener = orig_map['listeners'][listener_id]
cert = self._create_lb_certificate(listener)
body = self.prepare_lb_listener(listener, lb_body)
body['loadbalancer'] = lb_body
body['loadbalancer_id'] = lb_id
kw = {'listener': body, 'cert': None}
kw = {'listener': body, 'cert': cert}
if not self.octavia_rpc_client.call({}, 'listener_create', **kw):
LOG.error("Failed to create loadbalancer %(lb)s listener "
"(%(list)s)", {'list': listener, 'lb': lb_id})
self.add_error("Failed to create loadbalancer %(lb)s listener "
"(%(list)s)" % {'list': listener, 'lb': lb_id})
self._delete_octavia_lb(lb_body_for_deletion)
n_errors = n_errors + 1
return
listeners_map[listener_id] = body
lb_body_for_deletion['listeners'].append(body)
@ -947,10 +984,9 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
pool_body['listener'] = listeners_map.get(listener_id)
kw = {'pool': pool_body}
if not self.octavia_rpc_client.call({}, 'pool_create', **kw):
LOG.error("Failed to create loadbalancer %(lb)s pool "
"(%(pool)s)", {'pool': pool, 'lb': lb_id})
self.add_error("Failed to create loadbalancer %(lb)s pool "
"(%(pool)s)" % {'pool': pool, 'lb': lb_id})
self._delete_octavia_lb(lb_body_for_deletion)
n_errors = n_errors + 1
return
lb_body_for_deletion['pools'].append(pool)
@ -965,11 +1001,10 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
body['pool'] = pool_body
kw = {'member': body}
if not self.octavia_rpc_client.call({}, 'member_create', **kw):
LOG.error("Failed to create pool %(pool)s member "
"(%(member)s)",
{'member': member, 'pool': pool_id})
self.add_error("Failed to create pool %(pool)s member "
"(%(member)s)" %
{'member': member, 'pool': pool_id})
self._delete_octavia_lb(lb_body_for_deletion)
n_errors = n_errors + 1
return
# Add pool health monitor
@ -982,10 +1017,10 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
kw = {'healthmonitor': body}
if not self.octavia_rpc_client.call(
{}, 'healthmonitor_create', **kw):
LOG.error("Failed to create pool %(pool)s healthmonitor "
"(%(hm)s)", {'hm': hm, 'pool': pool_id})
self.add_error("Failed to create pool %(pool)s "
"healthmonitor (%(hm)s)" %
{'hm': hm, 'pool': pool_id})
self._delete_octavia_lb(lb_body_for_deletion)
n_errors = n_errors + 1
return
lb_body_for_deletion['pools'][-1]['healthmonitor'] = body
@ -1007,13 +1042,13 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
kw = {'l7policy': pol_body}
if not self.octavia_rpc_client.call(
{}, 'l7policy_create', **kw):
LOG.error("Failed to create l7policy (%(l7pol)s)",
{'l7pol': l7pol})
self.add_error("Failed to create l7policy "
"(%(l7pol)s)" %
{'l7pol': l7pol})
self._delete_octavia_lb(lb_body_for_deletion)
n_errors = n_errors + 1
return
LOG.info("Created loadbalancer %s", lb_id)
LOG.info("Created loadbalancer %s/%s: %s", count, total_num, lb_id)
def _map_orig_objects_of_type(self, source_objects):
result = {}
@ -1085,9 +1120,9 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
hms, l7pols)
total_num = len(loadbalancers)
LOG.info("Migrating %d loadbalancer(s)", total_num)
for lb in loadbalancers:
for count, lb in enumerate(loadbalancers, 1):
if lb['provisioning_status'] == 'ACTIVE':
self._migrate_octavia_lb(lb, orig_map)
self._migrate_octavia_lb(lb, orig_map, count, total_num)
else:
LOG.info("Skipping %s loadbalancer %s",
lb['provisioning_status'], lb['id'])
Loading…
Cancel
Save