From dd569f0702ed0b3442b8cace4db88d4fed123efb Mon Sep 17 00:00:00 2001
From: asarfaty
Date: Sun, 21 Feb 2021 11:50:34 +0200
Subject: [PATCH] V2T migration: Support octavia barbican certificate

Also improve logging of errors

Change-Id: I261fb6c9eb8d6051ebf9d9a0bf68d417c72a0a1a
---
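Note (kept below the "---" marker, so "git am" drops it from the commit
message): the new _create_lb_certificate() helper resolves a listener's
Barbican TLS container into the plain dict that the NSX Octavia driver's
listener_create RPC receives as its 'cert' argument. A minimal sketch of the
resulting shape; the listener value and container ref below are illustrative
placeholders, not values from this change:

    # Illustrative only: 'client' is an ApiReplayClient instance and the
    # container ref is a placeholder.
    listener = {'id': 'listener-1',
                'default_tls_container_ref':
                    'http://barbican:9311/v1/containers/abc'}
    cert = client._create_lb_certificate(listener)
    # cert == {'ref': 'http://barbican:9311/v1/containers/abc',
    #          'certificate': ...,  # UTF-8 bytes of the PEM cert, or None
    #          'private_key': ...,  # UTF-8 bytes of the key, or None
    #          'passphrase': ...}   # UTF-8 bytes, or None
    # A listener without default_tls_container_ref yields None, so the
    # listener_create RPC is still called with cert=None as before.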
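The error-logging change replaces the module-level n_errors counter with
per-instance accounting. Roughly (a sketch of the pattern, not the full
diff):

    def add_error(self, msg):
        self.n_errors = self.n_errors + 1  # keeps exit(self.n_errors) meaningful
        LOG.error(msg)                     # still logged at the point of failure
        self.errors.append(msg)            # replayed as a summary before exit

Callers switch from LOG.error(fmt, args) plus manual n_errors bookkeeping to
a single self.add_error(fmt % args) call, which is why the format strings in
the diff change from comma-style logging arguments to %-interpolation.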
 vmware_nsx/api_replay/client.py | 245 ++++++++++++++++++--------------
 1 file changed, 140 insertions(+), 105 deletions(-)

diff --git a/vmware_nsx/api_replay/client.py b/vmware_nsx/api_replay/client.py
index 1e9fb61de1..0a6ea848b9 100644
--- a/vmware_nsx/api_replay/client.py
+++ b/vmware_nsx/api_replay/client.py
@@ -16,6 +16,7 @@ import socket
 
 import six
 
+from barbicanclient.v1 import client as barbican
 from keystoneauth1 import identity
 from keystoneauth1 import session
 from neutronclient.common import exceptions as n_exc
@@ -25,7 +26,7 @@ from oslo_config import cfg
 import oslo_messaging as messaging
 from oslo_messaging.rpc import dispatcher
 from oslo_serialization import jsonutils
-from oslo_utils import excutils
+from oslo_utils import encodeutils
 
 from neutron.common import config as neutron_config
 from neutron_lib import constants as nl_constants
@@ -41,9 +42,6 @@ LOG.setLevel(logging.INFO)
 # For internal testing only
 use_old_keystone_on_dest = False
 
-# Error counter for the migration
-n_errors = 0
-
 
 class ApiReplayClient(utils.PrepareObjectForMigration):
 
@@ -121,8 +119,17 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
                 password=octavia_os_password,
                 auth_url=octavia_os_auth_url,
                 cert_file=cert_file)
+            self.barbican = self.connect_to_barbican(
+                username=octavia_os_username,
+                user_domain_id=octavia_os_user_domain_id,
+                tenant_name=octavia_os_tenant_name,
+                tenant_domain_id=octavia_os_tenant_domain_id,
+                password=octavia_os_password,
+                auth_url=octavia_os_auth_url,
+                cert_file=cert_file)
         else:
             self.octavia = None
+            self.barbican = None
 
         self.dest_plugin = dest_plugin
 
@@ -147,6 +154,9 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
         else:
             self.int_vni_map = None
 
+        self.n_errors = 0
+        self.errors = []
+
         LOG.info("Starting NSX migration to %s.", self.dest_plugin)
         # Migrate all the objects
         self.migrate_quotas()
@@ -159,9 +169,14 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
             self.migrate_fwaas()
         if self.octavia:
             self.migrate_octavia()
-        global n_errors
-        LOG.info("NSX migration is Done with %s errors.", n_errors)
-        exit(n_errors)
+
+        if self.n_errors:
+            LOG.error("NSX migration is Done with %s errors:", self.n_errors)
+            for err in self.errors:
+                LOG.error(err)
+        else:
+            LOG.info("NSX migration is Done with no errors")
+        exit(self.n_errors)
 
     def _get_session(self, username, user_domain_id,
                      tenant_name, tenant_domain_id,
@@ -204,6 +219,20 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
         )
         return client
 
+    def connect_to_barbican(self, username, user_domain_id,
+                            tenant_name, tenant_domain_id,
+                            password, auth_url, cert_file):
+        sess = self._get_session(username, user_domain_id,
+                                 tenant_name, tenant_domain_id,
+                                 password, auth_url, cert_file)
+        endpoint = sess.get_endpoint(service_type='key-manager')
+        client = barbican.Client(
+            session=sess,
+            service_type='key-manager',
+            endpoint=endpoint,
+        )
+        return client
+
     def find_subnet_by_id(self, subnet_id, subnets):
         for subnet in subnets:
             if subnet['id'] == subnet_id:
@@ -225,9 +254,12 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
 
         return False
 
-    def migrate_quotas(self):
-        global n_errors
+    def add_error(self, msg):
+        self.n_errors = self.n_errors + 1
+        LOG.error(msg)
+        self.errors.append(msg)
 
+    def migrate_quotas(self):
         source_quotas = self.source_neutron.list_quotas()['quotas']
         dest_quotas = self.dest_neutron.list_quotas()['quotas']
 
@@ -244,9 +276,8 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
                          {'count': count, 'total': total_num,
                           'q': new_quota})
             except Exception as e:
-                LOG.error("Failed to create quota %(q)s: %(e)s",
-                          {'q': quota, 'e': e})
-                n_errors = n_errors + 1
+                self.add_error("Failed to create quota %(q)s: %(e)s" %
+                               {'q': quota, 'e': e})
 
     def migrate_qos_rule(self, dest_policy, source_rule):
         """Add the QoS rule from the source to the QoS policy
@@ -254,7 +285,6 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
         If there is already a rule of that type, skip it since
         the QoS policy can have only one rule of each type
         """
-        global n_errors
         #TODO(asarfaty) also take rule direction into account once
         #ingress support is upstream
         rule_type = source_rule.get('type')
@@ -279,14 +309,12 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
                           {'rule': rule_type, 'pol': pol_id})
                 LOG.info("created QoS policy %s rule %s", pol_id, rule)
         except Exception as e:
-            LOG.error("Failed to create QoS rule %(rule)s for policy %(pol)s: "
-                      "%(e)s", {'rule': body, 'pol': pol_id, 'e': e})
-            n_errors = n_errors + 1
+            self.add_error("Failed to create QoS rule %(rule)s for policy "
+                           "%(pol)s: %(e)s" % {'rule': body, 'pol': pol_id,
                                                'e': e})
 
     def migrate_qos_policies(self):
         """Migrates QoS policies from source to dest neutron."""
-        global n_errors
-
         # first fetch the QoS policies from both the
         # source and destination neutron server
         try:
@@ -321,9 +349,8 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
                     new_pol = self.dest_neutron.create_qos_policy(
                         body={'policy': body})
                 except Exception as e:
-                    LOG.error("Failed to create QoS policy %(pol)s: %(e)s",
-                              {'pol': pol['id'], 'e': e})
-                    n_errors = n_errors + 1
+                    self.add_error("Failed to create QoS policy %(pol)s: "
+                                   "%(e)s" % {'pol': pol['id'], 'e': e})
                     continue
                 else:
                     LOG.info("Created QoS policy %s", new_pol)
@@ -332,8 +359,6 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
 
     def migrate_security_groups(self):
         """Migrates security groups from source to dest neutron."""
-        global n_errors
-
         # first fetch the security groups from both the
         # source and dest neutron server
         source_sec_groups = self.source_neutron.list_security_groups()
@@ -378,10 +403,8 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
                          {'count': count, 'total': total_num,
                           'sg': new_sg})
             except Exception as e:
-                LOG.error("Failed to create security group (%(sg)s): "
-                          "%(e)s",
-                          {'sg': sg, 'e': e})
-                n_errors = n_errors + 1
+                self.add_error("Failed to create security group (%(sg)s): "
+                               "%(e)s" % {'sg': sg, 'e': e})
 
             # Use bulk rules creation for the rules of the SG
             if not sg_rules:
@@ -412,9 +435,8 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
                     LOG.debug("created %s security group rules for SG %s",
                               len(rules), sg['id'])
                 except Exception as e:
-                    LOG.error("Failed to create security group %s "
-                              "rules: %s", sg['id'], e)
-                    n_errors = n_errors + 1
+                    self.add_error("Failed to create security group %s "
+                                   "rules: %s" % (sg['id'], e))
 
     def get_dest_availablity_zones(self, resource):
         azs = self.dest_neutron.list_availability_zones()['availability_zones']
@@ -429,7 +451,6 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
         ports are set.
         And return a dictionary of external gateway info per router
         """
-        global n_errors
         try:
             source_routers = self.source_neutron.list_routers()['routers']
         except Exception:
@@ -468,10 +489,9 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
                 try:
                     self.dest_neutron.create_network({'network': net_body})
                 except Exception as e:
-                    LOG.error("Failed to create internal network for router "
-                              "%(rtr)s: %(e)s",
-                              {'rtr': router['id'], 'e': e})
-                    n_errors = n_errors + 1
+                    self.add_error("Failed to create internal network for "
+                                   "router %(rtr)s: %(e)s" %
+                                   {'rtr': router['id'], 'e': e})
             dest_router = self.have_id(router['id'], dest_routers)
             if dest_router is False:
                 body = self.prepare_router(router, dest_azs=dest_azs)
@@ -482,14 +502,12 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
                              {'count': count, 'total': total_num,
                               'rtr': new_router})
                 except Exception as e:
-                    LOG.error("Failed to create router %(rtr)s: %(e)s",
-                              {'rtr': router, 'e': e})
-                    n_errors = n_errors + 1
+                    self.add_error("Failed to create router %(rtr)s: %(e)s" %
+                                   {'rtr': router, 'e': e})
         return update_routes, gw_info
 
     def migrate_routers_routes(self, routers_routes):
         """Add static routes to the created routers."""
-        global n_errors
         total_num = len(routers_routes)
         LOG.info("Migrating %s routers routes", total_num)
         for count, (router_id, routes) in enumerate(
@@ -501,13 +519,11 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
                          {'count': count, 'total': total_num,
                           'rtr': router_id})
             except Exception as e:
-                LOG.error("Failed to add routes %(routes)s to router "
-                          "%(rtr)s: %(e)s",
-                          {'routes': routes, 'rtr': router_id, 'e': e})
-                n_errors = n_errors + 1
+                self.add_error("Failed to add routes %(routes)s to router "
+                               "%(rtr)s: %(e)s" %
+                               {'routes': routes, 'rtr': router_id, 'e': e})
 
     def migrate_subnetpools(self):
-        global n_errors
         subnetpools_map = {}
         try:
             source_subnetpools = self.source_neutron.list_subnetpools()[
@@ -541,14 +557,12 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
                     dest_subnetpools = self.dest_neutron.list_subnetpools()[
                         'subnetpools']
                 except Exception as e:
-                    LOG.error("Failed to create subnetpool %(pool)s: %(e)s",
-                              {'pool': pool, 'e': e})
-                    n_errors = n_errors + 1
+                    self.add_error("Failed to create subnetpool %(pool)s: "
+                                   "%(e)s" % {'pool': pool, 'e': e})
         return subnetpools_map
 
     def migrate_networks_subnets_ports(self, routers_gw_info):
         """Migrates networks/ports/router-uplinks from src to dest neutron."""
-        global n_errors
        source_ports = self.source_neutron.list_ports()['ports']
         source_subnets = self.source_neutron.list_subnets()['subnets']
         source_networks = self.source_neutron.list_networks()['networks']
@@ -602,11 +616,8 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
                              {'count': count, 'total': total_num,
                               'net': created_net})
             except Exception as e:
-                # Print the network and exception to help debugging
-                with excutils.save_and_reraise_exception():
-                    LOG.error("Failed to create network %s", body)
-                    n_errors = n_errors + 1
-                    raise e
+                self.add_error("Failed to create network: %s : %s" % (body, e))
+                continue
 
             subnets_map = {}
             dhcp_subnets = []
@@ -664,9 +675,8 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
                         dhcp_subnets.append({'id': created_subnet['id'],
                                              'host_routes': sub_host_routes})
                 except n_exc.BadRequest as e:
-                    LOG.error("Failed to create subnet: %(subnet)s: %(e)s",
-                              {'subnet': subnet, 'e': e})
-                    n_errors = n_errors + 1
+                    self.add_error("Failed to create subnet: %(subnet)s: "
+                                   "%(e)s" % {'subnet': subnet, 'e': e})
 
             # create the ports on the network
             ports = self.get_ports_on_network(network['id'], source_ports)
@@ -715,10 +725,9 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
                                      'net': port['network_id']})
 
                         except Exception as e:
-                            LOG.error("Failed to add router gateway with port "
-                                      "(%(port)s): %(e)s",
-                                      {'port': port, 'e': e})
-                            n_errors = n_errors + 1
+                            self.add_error("Failed to add router gateway with "
+                                           "port (%(port)s): %(e)s" %
+                                           {'port': port, 'e': e})
                             continue
 
                     # Let the neutron dhcp-agent recreate this on its own
@@ -758,10 +767,9 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
                             # Note(asarfaty): also if the same network in
                            # source is attached to 2 routers, which the v3
                             # plugins does not support.
-                            LOG.error("Failed to add router interface port"
-                                      "(%(port)s): %(e)s",
-                                      {'port': port, 'e': e})
-                            n_errors = n_errors + 1
+                            self.add_error("Failed to add router interface "
+                                           "port (%(port)s): %(e)s" %
+                                           {'port': port, 'e': e})
                             continue
 
                     try:
@@ -770,9 +778,8 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
                     except Exception as e:
                         # NOTE(arosen): this occurs here if you run the
                         # script multiple times as we don't track this.
-                        LOG.error("Failed to create port (%(port)s) : %(e)s",
-                                  {'port': port, 'e': e})
-                        n_errors = n_errors + 1
+                        self.add_error("Failed to create port (%(port)s) : "
+                                       "%(e)s" % {'port': port, 'e': e})
                     else:
                         ip_addr = None
                         if created_port.get('fixed_ips'):
@@ -794,14 +801,12 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
                     self.dest_neutron.update_subnet(subnet['id'],
                                                     {'subnet': data})
                 except Exception as e:
-                    LOG.error("Failed to enable DHCP on subnet %(subnet)s: "
-                              "%(e)s",
-                              {'subnet': subnet['id'], 'e': e})
-                    n_errors = n_errors + 1
+                    self.add_error("Failed to enable DHCP on subnet "
+                                   "%(subnet)s: %(e)s" %
+                                   {'subnet': subnet['id'], 'e': e})
 
     def migrate_floatingips(self):
         """Migrates floatingips from source to dest neutron."""
-        global n_errors
         try:
             source_fips = self.source_neutron.list_floatingips()['floatingips']
         except Exception:
@@ -816,15 +821,19 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
                 LOG.info("Created floatingip %(count)s/%(total)s : %(fip)s",
                          {'count': count, 'total': total_num, 'fip': fip})
             except Exception as e:
-                LOG.error("Failed to create floating ip (%(fip)s) : %(e)s",
-                          {'fip': source_fip, 'e': e})
-                n_errors = n_errors + 1
+                self.add_error("Failed to create floating ip (%(fip)s) : "
+                               "%(e)s" % {'fip': source_fip, 'e': e})
+
+    def _plural_res_type(self, resource_type):
+        if resource_type.endswith('y'):
+            return "%sies" % resource_type[:-1]
+        return "%ss" % resource_type
 
     def _migrate_fwaas_resource(self, resource_type, source_objects,
                                 dest_objects, prepare_method, create_method):
-        global n_errors
         total_num = len(source_objects)
-        LOG.info("Migrating %s %ss", total_num, resource_type)
+        LOG.info("Migrating %s %s", total_num,
+                 self._plural_res_type(resource_type))
         for count, source_obj in enumerate(source_objects, 1):
             # Check if the object already exists
             if self.have_id(source_obj['id'], dest_objects):
@@ -845,10 +854,10 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
                          {'resource': resource_type, 'count': count,
                           'total': total_num, 'obj': new_obj})
             except Exception as e:
-                LOG.error("Failed to create %(resource)s (%(obj)s) : %(e)s",
-                          {'resource': resource_type, 'obj': source_obj,
-                           'e': e})
-                n_errors = n_errors + 1
+                self.add_error("Failed to create %(resource)s (%(obj)s) : "
+                               "%(e)s" %
+                               {'resource': resource_type, 'obj': source_obj,
+                                'e': e})
 
     def migrate_fwaas(self):
         """Migrates FWaaS V2 objects from source to dest neutron."""
neutron.""" @@ -901,16 +910,44 @@ class ApiReplayClient(utils.PrepareObjectForMigration): kw = {'loadbalancer': body} self.octavia_rpc_client.call({}, 'loadbalancer_delete_cascade', **kw) - def _migrate_octavia_lb(self, lb, orig_map): + def _create_lb_certificate(self, listener_dict): + # Extract Octavia certificate data into a dict which is readable by + # the listener_mgr + + def get_certificate(cert_data): + if cert_data.certificate: + return encodeutils.to_utf8( + cert_data.certificate.payload) + return None + + def get_private_key(cert_data): + if cert_data.private_key: + return encodeutils.to_utf8( + cert_data.private_key.payload) + return None + + def get_private_key_passphrase(cert_data): + if cert_data.private_key_passphrase: + return encodeutils.to_utf8( + cert_data.private_key_passphrase.payload) + return None + + if listener_dict.get('default_tls_container_ref'): + cert_data = self.barbican.containers.get( + container_ref=listener_dict['default_tls_container_ref']) + return {'ref': listener_dict['default_tls_container_ref'], + 'certificate': get_certificate(cert_data), + 'private_key': get_private_key(cert_data), + 'passphrase': get_private_key_passphrase(cert_data)} + + def _migrate_octavia_lb(self, lb, orig_map, count, total_num): # Creating all loadbalancers resources on the new nsx driver # using RPC calls to the plugin listener. - global n_errors - # Create the loadbalancer: lb_body = self.prepare_lb_loadbalancer(lb) kw = {'loadbalancer': lb_body} if not self.octavia_rpc_client.call({}, 'loadbalancer_create', **kw): - LOG.error("Failed to create loadbalancer (%s)", lb_body) + self.add_error("Failed to create loadbalancer (%s)" % lb_body) self._delete_octavia_lb(lb_body) return @@ -923,15 +960,15 @@ class ApiReplayClient(utils.PrepareObjectForMigration): for listener_dict in lb.get('listeners', []): listener_id = listener_dict['id'] listener = orig_map['listeners'][listener_id] + cert = self._create_lb_certificate(listener) body = self.prepare_lb_listener(listener, lb_body) body['loadbalancer'] = lb_body body['loadbalancer_id'] = lb_id - kw = {'listener': body, 'cert': None} + kw = {'listener': body, 'cert': cert} if not self.octavia_rpc_client.call({}, 'listener_create', **kw): - LOG.error("Failed to create loadbalancer %(lb)s listener " - "(%(list)s)", {'list': listener, 'lb': lb_id}) + self.add_error("Failed to create loadbalancer %(lb)s listener " + "(%(list)s)" % {'list': listener, 'lb': lb_id}) self._delete_octavia_lb(lb_body_for_deletion) - n_errors = n_errors + 1 return listeners_map[listener_id] = body lb_body_for_deletion['listeners'].append(body) @@ -947,10 +984,9 @@ class ApiReplayClient(utils.PrepareObjectForMigration): pool_body['listener'] = listeners_map.get(listener_id) kw = {'pool': pool_body} if not self.octavia_rpc_client.call({}, 'pool_create', **kw): - LOG.error("Failed to create loadbalancer %(lb)s pool " - "(%(pool)s)", {'pool': pool, 'lb': lb_id}) + self.add_error("Failed to create loadbalancer %(lb)s pool " + "(%(pool)s)" % {'pool': pool, 'lb': lb_id}) self._delete_octavia_lb(lb_body_for_deletion) - n_errors = n_errors + 1 return lb_body_for_deletion['pools'].append(pool) @@ -965,11 +1001,10 @@ class ApiReplayClient(utils.PrepareObjectForMigration): body['pool'] = pool_body kw = {'member': body} if not self.octavia_rpc_client.call({}, 'member_create', **kw): - LOG.error("Failed to create pool %(pool)s member " - "(%(member)s)", - {'member': member, 'pool': pool_id}) + self.add_error("Failed to create pool %(pool)s member " + "(%(member)s)" % + 
                    self._delete_octavia_lb(lb_body_for_deletion)
-                    n_errors = n_errors + 1
                     return
 
             # Add pool health monitor
@@ -982,10 +1017,10 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
                 kw = {'healthmonitor': body}
                 if not self.octavia_rpc_client.call(
                         {}, 'healthmonitor_create', **kw):
-                    LOG.error("Failed to create pool %(pool)s healthmonitor "
-                              "(%(hm)s)", {'hm': hm, 'pool': pool_id})
+                    self.add_error("Failed to create pool %(pool)s "
+                                   "healthmonitor (%(hm)s)" %
+                                   {'hm': hm, 'pool': pool_id})
                     self._delete_octavia_lb(lb_body_for_deletion)
-                    n_errors = n_errors + 1
                     return
                 lb_body_for_deletion['pools'][-1]['healthmonitor'] = body
 
@@ -1007,13 +1042,13 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
                 kw = {'l7policy': pol_body}
                 if not self.octavia_rpc_client.call(
                         {}, 'l7policy_create', **kw):
-                    LOG.error("Failed to create l7policy (%(l7pol)s)",
-                              {'l7pol': l7pol})
+                    self.add_error("Failed to create l7policy "
+                                   "(%(l7pol)s)" %
+                                   {'l7pol': l7pol})
                     self._delete_octavia_lb(lb_body_for_deletion)
-                    n_errors = n_errors + 1
                     return
 
-        LOG.info("Created loadbalancer %s", lb_id)
+        LOG.info("Created loadbalancer %s/%s: %s", count, total_num, lb_id)
 
     def _map_orig_objects_of_type(self, source_objects):
         result = {}
@@ -1085,9 +1120,9 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
                                                hms, l7pols)
         total_num = len(loadbalancers)
         LOG.info("Migrating %d loadbalancer(s)", total_num)
-        for lb in loadbalancers:
+        for count, lb in enumerate(loadbalancers, 1):
             if lb['provisioning_status'] == 'ACTIVE':
-                self._migrate_octavia_lb(lb, orig_map)
+                self._migrate_octavia_lb(lb, orig_map, count, total_num)
             else:
                 LOG.info("Skipping %s loadbalancer %s",
                          lb['provisioning_status'], lb['id'])