Neutron integration with OVN
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

373 lines
16KB

  1. # Copyright 2017 Red Hat, Inc.
  2. # All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  5. # not use this file except in compliance with the License. You may obtain
  6. # a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  12. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  13. # License for the specific language governing permissions and limitations
  14. # under the License.
  15. import inspect
  16. import threading
  17. from futurist import periodics
  18. from neutron.common import config as n_conf
  19. from neutron_lib import context as n_context
  20. from neutron_lib import exceptions as n_exc
  21. from neutron_lib import worker
  22. from oslo_config import cfg
  23. from oslo_log import log
  24. from oslo_utils import timeutils
  25. from networking_ovn.common import constants as ovn_const
  26. from networking_ovn.db import maintenance as db_maint
  27. from networking_ovn.db import revision as db_rev
  28. from networking_ovn import ovn_db_sync
  29. CONF = cfg.CONF
  30. LOG = log.getLogger(__name__)
  31. DB_CONSISTENCY_CHECK_INTERVAL = 300 # 5 minutes
  32. INCONSISTENCY_TYPE_CREATE_UPDATE = 'create/update'
  33. INCONSISTENCY_TYPE_DELETE = 'delete'
  34. class MaintenanceWorker(worker.BaseWorker):
  35. def start(self):
  36. super(MaintenanceWorker, self).start()
  37. # NOTE(twilson) The super class will trigger the post_fork_initialize
  38. # in the driver, which starts the connection/IDL notify loop which
  39. # keeps the process from exiting
  40. def stop(self):
  41. """Stop service."""
  42. super(MaintenanceWorker, self).stop()
  43. def wait(self):
  44. """Wait for service to complete."""
  45. super(MaintenanceWorker, self).wait()
  46. @staticmethod
  47. def reset():
  48. n_conf.reset_service()
  49. class MaintenanceThread(object):
  50. def __init__(self):
  51. self._callables = []
  52. self._thread = None
  53. self._worker = None
  54. def add_periodics(self, obj):
  55. for name, member in inspect.getmembers(obj):
  56. if periodics.is_periodic(member):
  57. LOG.debug('Periodic task found: %(owner)s.%(member)s',
  58. {'owner': obj.__class__.__name__, 'member': name})
  59. self._callables.append((member, (), {}))
  60. def start(self):
  61. if self._thread is None:
  62. self._worker = periodics.PeriodicWorker(self._callables)
  63. self._thread = threading.Thread(target=self._worker.start)
  64. self._thread.daemon = True
  65. self._thread.start()
  66. def stop(self):
  67. self._worker.stop()
  68. self._worker.wait()
  69. self._thread.join()
  70. self._worker = self._thread = None
  71. class DBInconsistenciesPeriodics(object):
  72. def __init__(self, ovn_client):
  73. self._ovn_client = ovn_client
  74. # FIXME(lucasagomes): We should not be accessing private
  75. # attributes like that, perhaps we should extend the OVNClient
  76. # class and create an interface for the locks ?
  77. self._nb_idl = self._ovn_client._nb_idl
  78. self._idl = self._nb_idl.idl
  79. self._idl.set_lock('ovn_db_inconsistencies_periodics')
  80. self._sync_timer = timeutils.StopWatch()
  81. self._resources_func_map = {
  82. ovn_const.TYPE_NETWORKS: {
  83. 'neutron_get': self._ovn_client._plugin.get_network,
  84. 'ovn_get': self._nb_idl.get_lswitch,
  85. 'ovn_create': self._ovn_client.create_network,
  86. 'ovn_update': self._ovn_client.update_network,
  87. 'ovn_delete': self._ovn_client.delete_network,
  88. },
  89. ovn_const.TYPE_PORTS: {
  90. 'neutron_get': self._ovn_client._plugin.get_port,
  91. 'ovn_get': self._nb_idl.get_lswitch_port,
  92. 'ovn_create': self._ovn_client.create_port,
  93. 'ovn_update': self._ovn_client.update_port,
  94. 'ovn_delete': self._ovn_client.delete_port,
  95. },
  96. ovn_const.TYPE_FLOATINGIPS: {
  97. 'neutron_get': self._ovn_client._l3_plugin.get_floatingip,
  98. 'ovn_get': self._nb_idl.get_floatingip,
  99. 'ovn_create': self._ovn_client.create_floatingip,
  100. 'ovn_update': self._ovn_client.update_floatingip,
  101. 'ovn_delete': self._ovn_client.delete_floatingip,
  102. },
  103. ovn_const.TYPE_ROUTERS: {
  104. 'neutron_get': self._ovn_client._l3_plugin.get_router,
  105. 'ovn_get': self._nb_idl.get_lrouter,
  106. 'ovn_create': self._ovn_client.create_router,
  107. 'ovn_update': self._ovn_client.update_router,
  108. 'ovn_delete': self._ovn_client.delete_router,
  109. },
  110. ovn_const.TYPE_SECURITY_GROUPS: {
  111. 'neutron_get': self._ovn_client._plugin.get_security_group,
  112. 'ovn_get': self._get_security_group,
  113. 'ovn_create': self._ovn_client.create_security_group,
  114. 'ovn_delete': self._ovn_client.delete_security_group,
  115. },
  116. ovn_const.TYPE_SECURITY_GROUP_RULES: {
  117. 'neutron_get':
  118. self._ovn_client._plugin.get_security_group_rule,
  119. 'ovn_get': self._nb_idl.get_acl_by_id,
  120. 'ovn_create': self._ovn_client.create_security_group_rule,
  121. 'ovn_delete': self._ovn_client.delete_security_group_rule,
  122. },
  123. ovn_const.TYPE_ROUTER_PORTS: {
  124. 'neutron_get':
  125. self._ovn_client._plugin.get_port,
  126. 'ovn_get': self._nb_idl.get_lrouter_port,
  127. 'ovn_create': self._create_lrouter_port,
  128. 'ovn_update': self._ovn_client.update_router_port,
  129. 'ovn_delete': self._ovn_client.delete_router_port,
  130. },
  131. }
  132. def _get_security_group(self, uuid):
  133. return (self._nb_idl.get_address_set(uuid) or
  134. self._nb_idl.get_port_group(uuid))
  135. @property
  136. def has_lock(self):
  137. return not self._idl.is_lock_contended
  138. def _fix_create_update(self, row):
  139. res_map = self._resources_func_map[row.resource_type]
  140. admin_context = n_context.get_admin_context()
  141. try:
  142. # Get the latest version of the resource in Neutron DB
  143. n_obj = res_map['neutron_get'](admin_context, row.resource_uuid)
  144. except n_exc.NotFound:
  145. LOG.warning('Skip fixing resource %(res_uuid)s (type: '
  146. '%(res_type)s). Resource does not exist in Neutron '
  147. 'database anymore', {'res_uuid': row.resource_uuid,
  148. 'res_type': row.resource_type})
  149. return
  150. ovn_obj = res_map['ovn_get'](row.resource_uuid)
  151. if not ovn_obj:
  152. res_map['ovn_create'](n_obj)
  153. else:
  154. if row.resource_type == ovn_const.TYPE_SECURITY_GROUP_RULES:
  155. LOG.error("SG rule %s found with a revision number while "
  156. "this resource doesn't support updates",
  157. row.resource_uuid)
  158. elif row.resource_type == ovn_const.TYPE_SECURITY_GROUPS:
  159. # In OVN, we don't care about updates to security groups,
  160. # so just bump the revision number to whatever it's
  161. # supposed to be.
  162. db_rev.bump_revision(n_obj, row.resource_type)
  163. else:
  164. ext_ids = getattr(ovn_obj, 'external_ids', {})
  165. ovn_revision = int(ext_ids.get(
  166. ovn_const.OVN_REV_NUM_EXT_ID_KEY, -1))
  167. # If the resource exist in the OVN DB but the revision
  168. # number is different from Neutron DB, updated it.
  169. if ovn_revision != n_obj['revision_number']:
  170. res_map['ovn_update'](n_obj)
  171. else:
  172. # If the resource exist and the revision number
  173. # is equal on both databases just bump the revision on
  174. # the cache table.
  175. db_rev.bump_revision(n_obj, row.resource_type)
  176. def _fix_delete(self, row):
  177. res_map = self._resources_func_map[row.resource_type]
  178. ovn_obj = res_map['ovn_get'](row.resource_uuid)
  179. if not ovn_obj:
  180. db_rev.delete_revision(row.resource_uuid, row.resource_type)
  181. else:
  182. res_map['ovn_delete'](row.resource_uuid)
  183. def _fix_create_update_subnet(self, row):
  184. # Get the lasted version of the port in Neutron DB
  185. admin_context = n_context.get_admin_context()
  186. sn_db_obj = self._ovn_client._plugin.get_subnet(
  187. admin_context, row.resource_uuid)
  188. n_db_obj = self._ovn_client._plugin.get_network(
  189. admin_context, sn_db_obj['network_id'])
  190. if row.revision_number == ovn_const.INITIAL_REV_NUM:
  191. self._ovn_client.create_subnet(sn_db_obj, n_db_obj)
  192. else:
  193. self._ovn_client.update_subnet(sn_db_obj, n_db_obj)
  194. # The migration will run just once per neutron-server instance. If the lock
  195. # is held by some other neutron-server instance in the cloud, we'll attempt
  196. # to perform the migration every 10 seconds until completed.
  197. @periodics.periodic(spacing=10, run_immediately=True)
  198. def migrate_to_port_groups(self):
  199. """Perform the migration from Address Sets to Port Groups. """
  200. # TODO(dalvarez): Remove this in U cycle when we're sure that all
  201. # versions are running using Port Groups (and OVS >= 2.10).
  202. # If Port Groups are not supported or we've already migrated, we don't
  203. # need to attempt to migrate again.
  204. if (not self._nb_idl.is_port_groups_supported() or
  205. not self._nb_idl.get_address_sets()):
  206. raise periodics.NeverAgain()
  207. # Only the worker holding a valid lock within OVSDB will perform the
  208. # migration.
  209. if not self.has_lock:
  210. return
  211. admin_context = n_context.get_admin_context()
  212. nb_sync = ovn_db_sync.OvnNbSynchronizer(
  213. self._ovn_client._plugin, self._nb_idl, self._ovn_client._sb_idl,
  214. None, None)
  215. nb_sync.migrate_to_port_groups(admin_context)
  216. raise periodics.NeverAgain()
  217. def _log_maintenance_inconsistencies(self, create_update_inconsistencies,
  218. delete_inconsistencies):
  219. if not CONF.debug:
  220. return
  221. def _log(inconsistencies, type_):
  222. if not inconsistencies:
  223. return
  224. c = {}
  225. for f in inconsistencies:
  226. if f.resource_type not in c:
  227. c[f.resource_type] = 1
  228. else:
  229. c[f.resource_type] += 1
  230. fail_str = ', '.join('{}={}'.format(k, v) for k, v in c.items())
  231. LOG.debug('Maintenance task: Number of inconsistencies '
  232. 'found at %(type_)s: %(fail_str)s',
  233. {'type_': type_, 'fail_str': fail_str})
  234. _log(create_update_inconsistencies, INCONSISTENCY_TYPE_CREATE_UPDATE)
  235. _log(delete_inconsistencies, INCONSISTENCY_TYPE_DELETE)
  236. @periodics.periodic(spacing=DB_CONSISTENCY_CHECK_INTERVAL,
  237. run_immediately=True)
  238. def check_for_inconsistencies(self):
  239. # Only the worker holding a valid lock within OVSDB will run
  240. # this periodic
  241. if not self.has_lock:
  242. return
  243. create_update_inconsistencies = db_maint.get_inconsistent_resources()
  244. delete_inconsistencies = db_maint.get_deleted_resources()
  245. if not any([create_update_inconsistencies, delete_inconsistencies]):
  246. LOG.debug('Maintenance task: No inconsistencies found. Skipping')
  247. return
  248. LOG.debug('Maintenance task: Synchronizing Neutron '
  249. 'and OVN databases')
  250. self._log_maintenance_inconsistencies(create_update_inconsistencies,
  251. delete_inconsistencies)
  252. self._sync_timer.restart()
  253. dbg_log_msg = ('Maintenance task: Fixing resource %(res_uuid)s '
  254. '(type: %(res_type)s) at %(type_)s')
  255. # Fix the create/update resources inconsistencies
  256. for row in create_update_inconsistencies:
  257. LOG.debug(dbg_log_msg, {'res_uuid': row.resource_uuid,
  258. 'res_type': row.resource_type,
  259. 'type_': INCONSISTENCY_TYPE_CREATE_UPDATE})
  260. try:
  261. # NOTE(lucasagomes): The way to fix subnets is bit
  262. # different than other resources. A subnet in OVN language
  263. # is just a DHCP rule but, this rule only exist if the
  264. # subnet in Neutron has the "enable_dhcp" attribute set
  265. # to True. So, it's possible to have a consistent subnet
  266. # resource even when it does not exist in the OVN database.
  267. if row.resource_type == ovn_const.TYPE_SUBNETS:
  268. self._fix_create_update_subnet(row)
  269. else:
  270. self._fix_create_update(row)
  271. except Exception:
  272. LOG.exception('Maintenance task: Failed to fix resource '
  273. '%(res_uuid)s (type: %(res_type)s)',
  274. {'res_uuid': row.resource_uuid,
  275. 'res_type': row.resource_type})
  276. # Fix the deleted resources inconsistencies
  277. for row in delete_inconsistencies:
  278. LOG.debug(dbg_log_msg, {'res_uuid': row.resource_uuid,
  279. 'res_type': row.resource_type,
  280. 'type_': INCONSISTENCY_TYPE_DELETE})
  281. try:
  282. if row.resource_type == ovn_const.TYPE_SUBNETS:
  283. self._ovn_client.delete_subnet(row.resource_uuid)
  284. else:
  285. self._fix_delete(row)
  286. except Exception:
  287. LOG.exception('Maintenance task: Failed to fix deleted '
  288. 'resource %(res_uuid)s (type: %(res_type)s)',
  289. {'res_uuid': row.resource_uuid,
  290. 'res_type': row.resource_type})
  291. self._sync_timer.stop()
  292. LOG.info('Maintenance task: Synchronization finished '
  293. '(took %.2f seconds)', self._sync_timer.elapsed())
  294. def _create_lrouter_port(self, port):
  295. admin_context = n_context.get_admin_context()
  296. router_id = port['device_id']
  297. self._ovn_client._l3_plugin.add_router_interface(
  298. admin_context, router_id, {'port_id': port['id']}, may_exist=True)
  299. @periodics.periodic(spacing=600, run_immediately=True)
  300. def check_for_port_security_unknown_address(self):
  301. if not self.has_lock:
  302. return
  303. for port in self._nb_idl.lsp_list().execute(check_error=True):
  304. if port.type == ovn_const.LSP_TYPE_LOCALNET:
  305. continue
  306. addresses = port.addresses
  307. type_ = port.type.strip()
  308. if not port.port_security:
  309. if not type_ and ovn_const.UNKNOWN_ADDR not in addresses:
  310. addresses.append(ovn_const.UNKNOWN_ADDR)
  311. elif type_ and ovn_const.UNKNOWN_ADDR in addresses:
  312. addresses.remove(ovn_const.UNKNOWN_ADDR)
  313. else:
  314. if type_ and ovn_const.UNKNOWN_ADDR in addresses:
  315. addresses.remove(ovn_const.UNKNOWN_ADDR)
  316. elif not type_ and ovn_const.UNKNOWN_ADDR in addresses:
  317. addresses.remove(ovn_const.UNKNOWN_ADDR)
  318. self._nb_idl.lsp_set_addresses(
  319. port.name, addresses=addresses).execute(check_error=True)
  320. raise periodics.NeverAgain()