OpenStack Networking (Neutron)
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

type_tunnel.py 24KB


  1. # Copyright (c) 2013 OpenStack Foundation
  2. # All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  5. # not use this file except in compliance with the License. You may obtain
  6. # a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  12. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  13. # License for the specific language governing permissions and limitations
  14. # under the License.
  15. import abc
  16. import itertools
  17. import operator
  18. import netaddr
  19. from neutron_lib.agent import topics
  20. from neutron_lib import constants as p_const
  21. from neutron_lib import context
  22. from neutron_lib.db import api as db_api
  23. from neutron_lib import exceptions as exc
  24. from neutron_lib.plugins import constants as plugin_constants
  25. from neutron_lib.plugins import directory
  26. from neutron_lib.plugins.ml2 import api
  27. from neutron_lib.plugins import utils as plugin_utils
  28. from oslo_config import cfg
  29. from oslo_db import exception as db_exc
  30. from oslo_log import log
  31. from oslo_utils import uuidutils
  32. import six
  33. from six import moves
  34. from sqlalchemy import or_
  35. from neutron._i18n import _
  36. from neutron.objects import base as base_obj
  37. from neutron.objects import network_segment_range as range_obj
  38. from neutron.plugins.ml2.drivers import helpers
  39. from neutron.services.network_segment_range import plugin as range_plugin
  # Topic component passed to topics.get_topic_name() when the tunnel
  # update/delete RPC topics are built.
  TUNNEL = 'tunnel'

  # Logger shared by every driver and mixin in this module.
  LOG = log.getLogger(__name__)
  def chunks(iterable, chunk_size):
      """Yield successive lists holding at most ``chunk_size`` items.

      The input is consumed lazily through a single iterator, so one-shot
      iterables such as generators are supported.

      :param iterable: the items to split up
      :param chunk_size: maximum length of every yielded list; ``None``
                         yields all items as one single chunk
      :returns: a generator of non-empty lists
      :raises ValueError: if ``chunk_size`` is smaller than 1 (raised when
                          the generator is first advanced)
      """
      if chunk_size is not None and chunk_size < 1:
          # A size of 0 used to finish the generator immediately, silently
          # dropping every item, while negative sizes already failed with
          # ValueError inside islice(). Reject both the same way.
          raise ValueError("chunk_size must be at least 1")
      iterator = iter(iterable)
      while True:
          chunk = list(itertools.islice(iterator, chunk_size))
          if not chunk:
              # The iterator is exhausted.
              return
          yield chunk
  @six.add_metaclass(abc.ABCMeta)
  class _TunnelTypeDriverBase(helpers.SegmentTypeDriver):
      """Shared base of the tunnel type drivers defined in this module.

      Declares the abstract endpoint management interface and implements
      tunnel ID range parsing, synchronisation of the allocation table with
      the allocatable ranges, provider segment validation and MTU
      computation.
      """

      # Maximum number of tunnel IDs handled per DELETE/INSERT statement in
      # sync_allocations().
      BULK_SIZE = 100

      def __init__(self, model):
          """Set up the driver.

          :param model: model class passed on to the parent initializer
          """
          super(_TunnelTypeDriverBase, self).__init__(model)
          # The first primary key names the attribute holding the tunnel ID.
          # NOTE(review): primary_keys is presumably populated by the parent
          # initializer - confirm against helpers.SegmentTypeDriver.
          self.segmentation_key = next(iter(self.primary_keys))

      @abc.abstractmethod
      def add_endpoint(self, ip, host):
          """Register the endpoint in the type_driver database.

          :param ip: the IP address of the endpoint
          :param host: the Host name of the endpoint
          """

      @abc.abstractmethod
      def get_endpoints(self):
          """Get every endpoint managed by the type_driver

          :returns: a list of dict [{ip_address:endpoint_ip,
                    host:endpoint_host}, ..]
          """

      @abc.abstractmethod
      def get_endpoint_by_host(self, host):
          """Get endpoint for a given host managed by the type_driver

          :param host: the Host name of the endpoint
          :returns: the db object for that particular host if it is found in
                    the type_driver database, None otherwise
          """

      @abc.abstractmethod
      def get_endpoint_by_ip(self, ip):
          """Get endpoint for a given tunnel ip managed by the type_driver

          :param ip: the IP address of the endpoint
          :returns: the db object for that particular ip if it is found in
                    the type_driver database, None otherwise
          """

      @abc.abstractmethod
      def delete_endpoint(self, ip):
          """Delete the endpoint in the type_driver database.

          :param ip: the IP address of the endpoint
          """

      @abc.abstractmethod
      def delete_endpoint_by_host_or_ip(self, host, ip):
          """Delete the endpoint in the type_driver database.

          This function will delete any endpoint matching the specified
          ip or host.

          :param host: the host name of the endpoint
          :param ip: the IP address of the endpoint
          """

      def _initialize(self, raw_tunnel_ranges):
          """Parse the configured ranges and sync allocations if applicable.

          :param raw_tunnel_ranges: iterable of ``"<min>:<max>"`` strings
          """
          self.tunnel_ranges = []
          self._parse_tunnel_ranges(raw_tunnel_ranges, self.tunnel_ranges)
          if not range_plugin.is_network_segment_range_enabled():
              # service plugins are initialized/loaded after the ML2 driver
              # initialization. Thus, we base on the information whether
              # ``network_segment_range`` service plugin is enabled/defined
              # in ``neutron.conf`` to decide whether to skip the first time
              # sync allocation during driver initialization, instead of
              # using the directory.get_plugin() method - the normal way
              # used elsewhere to check if a plugin is loaded.
              self.sync_allocations()

      def _parse_tunnel_ranges(self, tunnel_ranges, current_range):
          """Convert ``"<min>:<max>"`` entries into ``(min, max)`` int tuples.

          Every parsed range is checked with
          ``plugin_utils.verify_tunnel_range`` for this driver's type.

          :param tunnel_ranges: iterable of range strings
          :param current_range: list the parsed tuples are appended to
          :raises NetworkTunnelRangeError: if an entry does not consist of
              exactly two colon separated integers
          """
          for entry in tunnel_ranges:
              entry = entry.strip()
              try:
                  tun_min, tun_max = entry.split(':')
                  tun_min = tun_min.strip()
                  tun_max = tun_max.strip()
                  tunnel_range = int(tun_min), int(tun_max)
              except ValueError as ex:
                  raise exc.NetworkTunnelRangeError(tunnel_range=entry,
                                                    error=ex)
              plugin_utils.verify_tunnel_range(tunnel_range, self.get_type())
              current_range.append(tunnel_range)
          LOG.info("%(type)s ID ranges: %(range)s",
                   {'type': self.get_type(), 'range': current_range})

      @db_api.retry_db_errors
      def _populate_new_default_network_segment_ranges(self):
          """Create one default, shared range object per configured range."""
          ctx = context.get_admin_context()
          for tun_min, tun_max in self.tunnel_ranges:
              res = {
                  'id': uuidutils.generate_uuid(),
                  'name': '',
                  'default': True,
                  'shared': True,
                  'network_type': self.get_type(),
                  'minimum': tun_min,
                  'maximum': tun_max}
              with db_api.CONTEXT_WRITER.using(ctx):
                  new_default_range_obj = (
                      range_obj.NetworkSegmentRange(ctx, **res))
                  new_default_range_obj.create()

      @db_api.retry_db_errors
      def _delete_expired_default_network_segment_ranges(self):
          """Delete every default range object of this driver's type."""
          ctx = context.get_admin_context()
          with db_api.CONTEXT_WRITER.using(ctx):
              filters = {
                  'default': True,
                  'network_type': self.get_type(),
              }
              old_default_range_objs = (
                  range_obj.NetworkSegmentRange.get_objects(ctx, **filters))
              for obj in old_default_range_objs:
                  obj.delete()

      @db_api.retry_db_errors
      def _get_network_segment_ranges_from_db(self):
          """Load the ranges of this driver's type from the database.

          :returns: a list of ``(minimum, maximum)`` tuples
          """
          ranges = []
          ctx = context.get_admin_context()
          with db_api.CONTEXT_READER.using(ctx):
              range_objs = (range_obj.NetworkSegmentRange.get_objects(
                  ctx, network_type=self.get_type()))
              for obj in range_objs:
                  ranges.append((obj['minimum'], obj['maximum']))
          return ranges

      def initialize_network_segment_range_support(self):
          """Recreate the default ranges in the DB and sync allocations."""
          self._delete_expired_default_network_segment_ranges()
          self._populate_new_default_network_segment_ranges()
          # Override self.tunnel_ranges with the network segment range
          # information from DB and then do a sync_allocations since the
          # segment range service plugin has not yet been loaded at this
          # initialization time.
          self.tunnel_ranges = self._get_network_segment_ranges_from_db()
          self.sync_allocations()

      def update_network_segment_range_allocations(self):
          """Re-synchronise the allocation table with the current ranges."""
          self.sync_allocations()

      @db_api.retry_db_errors
      def sync_allocations(self):
          """Align the allocation table with the allocatable tunnel IDs.

          Unallocated rows whose ID is not allocatable any more are deleted
          and rows are inserted for allocatable IDs that have none yet. Both
          operations are issued in chunks of ``BULK_SIZE`` IDs.
          """
          # determine current configured allocatable tunnel ids
          tunnel_ids = set()
          ranges = self.get_network_segment_ranges()
          for tun_min, tun_max in ranges:
              tunnel_ids |= set(moves.range(tun_min, tun_max + 1))

          tunnel_id_getter = operator.attrgetter(self.segmentation_key)
          tunnel_col = getattr(self.model, self.segmentation_key)
          ctx = context.get_admin_context()
          with db_api.CONTEXT_WRITER.using(ctx):
              # remove from table unallocated tunnels not currently
              # allocatable fetch results as list via all() because we'll be
              # iterating through them twice
              allocs = ctx.session.query(self.model).all()

              # collect those vnis that needs to be deleted from db
              unallocateds = (
                  tunnel_id_getter(a) for a in allocs if not a.allocated)
              to_remove = (x for x in unallocateds if x not in tunnel_ids)
              # Immediately delete tunnels in chunks. This leaves no work for
              # flush at the end of transaction
              for chunk in chunks(to_remove, self.BULK_SIZE):
                  (ctx.session.query(self.model).
                   filter(tunnel_col.in_(chunk)).
                   filter_by(allocated=False).
                   delete(synchronize_session=False))

              # collect vnis that need to be added
              existings = {tunnel_id_getter(a) for a in allocs}
              missings = list(tunnel_ids - existings)
              for chunk in chunks(missings, self.BULK_SIZE):
                  bulk = [{self.segmentation_key: x, 'allocated': False}
                          for x in chunk]
                  ctx.session.execute(self.model.__table__.insert(), bulk)

      def is_partial_segment(self, segment):
          """Return True if the segment carries no segmentation ID."""
          return segment.get(api.SEGMENTATION_ID) is None

      def validate_provider_segment(self, segment):
          """Check that a provider segment only uses supported attributes.

          :param segment: dict describing the provider segment
          :raises InvalidInput: if a physical network is given or if any key
              other than the network type and the segmentation ID has a
              truthy value
          """
          physical_network = segment.get(api.PHYSICAL_NETWORK)
          if physical_network:
              msg = _("provider:physical_network specified for %s "
                      "network") % segment.get(api.NETWORK_TYPE)
              raise exc.InvalidInput(error_message=msg)

          for key, value in segment.items():
              if value and key not in [api.NETWORK_TYPE,
                                       api.SEGMENTATION_ID]:
                  # Interpolate the message; passing a (format, args) tuple
                  # would hand the raw tuple to the exception.
                  msg = (_("%(key)s prohibited for %(tunnel)s provider "
                           "network") %
                         {'key': key,
                          'tunnel': segment.get(api.NETWORK_TYPE)})
                  raise exc.InvalidInput(error_message=msg)

      def get_mtu(self, physical_network=None):
          """Return the MTU usable by networks of this tunnel type.

          The smallest positive value among the parent class MTU and
          ``ml2.path_mtu`` is reduced by the IP header length matching
          ``ml2.overlay_ip_version``.

          :param physical_network: not used by this implementation
          :returns: the computed MTU, or 0 if neither value is positive
          """
          seg_mtu = super(_TunnelTypeDriverBase, self).get_mtu()
          mtu = []
          if seg_mtu > 0:
              mtu.append(seg_mtu)
          if cfg.CONF.ml2.path_mtu > 0:
              mtu.append(cfg.CONF.ml2.path_mtu)
          version = cfg.CONF.ml2.overlay_ip_version
          ip_header_length = p_const.IP_HEADER_LENGTH[version]
          return min(mtu) - ip_header_length if mtu else 0

      def get_network_segment_ranges(self):
          """Get the driver network segment ranges.

          Queries all tunnel network segment ranges from DB if the
          ``NETWORK_SEGMENT_RANGE`` service plugin is enabled. Otherwise,
          they will be loaded from the host config file - `ml2_conf.ini`.

          :returns: a list of ``(minimum, maximum)`` tuples
          """
          ranges = self.tunnel_ranges
          if directory.get_plugin(plugin_constants.NETWORK_SEGMENT_RANGE):
              ranges = self._get_network_segment_ranges_from_db()

          return ranges
  @six.add_metaclass(abc.ABCMeta)
  class TunnelTypeDriver(_TunnelTypeDriverBase):
      """Stable abstract interface for session based ML2 tunnel type drivers.

      Tunnel type networks rely on tunnel endpoints; the abstract methods to
      manage them are inherited from the base class.

      This flavour receives a DB session as first argument of:
      - reserve_provider_segment
      - allocate_tenant_segment
      - release_segment
      - get_allocation
      """

      def reserve_provider_segment(self, session, segment, filters=None):
          """Reserve a tunnel ID for a provider segment.

          :param session: the DB session to work in
          :param segment: provider segment dict; without a segmentation ID
                          any free tunnel is taken
          :param filters: optional extra criteria for the partial allocation
          :returns: the reserved segment as a dict
          :raises NoNetworkAvailable: if no free tunnel could be allocated
          :raises TunnelIdInUse: if the requested ID could not be allocated
          """
          if not self.is_partial_segment(segment):
              requested_id = segment.get(api.SEGMENTATION_ID)
              allocation = self.allocate_fully_specified_segment(
                  session, **{self.segmentation_key: requested_id})
              if not allocation:
                  raise exc.TunnelIdInUse(tunnel_id=requested_id)
          else:
              allocation = self.allocate_partially_specified_segment(
                  session, **(filters or {}))
              if not allocation:
                  raise exc.NoNetworkAvailable()

          return {
              api.NETWORK_TYPE: self.get_type(),
              api.PHYSICAL_NETWORK: None,
              api.SEGMENTATION_ID: getattr(allocation,
                                           self.segmentation_key),
              api.MTU: self.get_mtu(),
          }

      def allocate_tenant_segment(self, session, filters=None):
          """Allocate any free tunnel for a tenant network.

          :param session: the DB session to work in
          :param filters: optional extra criteria for the allocation
          :returns: the allocated segment as a dict, or None if nothing
                    could be allocated
          """
          allocation = self.allocate_partially_specified_segment(
              session, **(filters or {}))
          if allocation:
              return {
                  api.NETWORK_TYPE: self.get_type(),
                  api.PHYSICAL_NETWORK: None,
                  api.SEGMENTATION_ID: getattr(allocation,
                                               self.segmentation_key),
                  api.MTU: self.get_mtu(),
              }
          return None

      def release_segment(self, session, segment):
          """Give the tunnel ID of a segment back.

          IDs inside one of the current ranges are marked as unallocated,
          IDs outside of them are removed from the table.

          :param session: the DB session to work in
          :param segment: segment dict holding the segmentation ID
          """
          tunnel_id = segment[api.SEGMENTATION_ID]

          in_pool = False
          for low, high in self.get_network_segment_ranges():
              if low <= tunnel_id <= high:
                  in_pool = True
                  break

          log_args = {'type': self.get_type(), 'id': tunnel_id}
          with session.begin(subtransactions=True):
              matching = session.query(self.model).filter_by(
                  **{self.segmentation_key: tunnel_id})
              if in_pool:
                  affected = matching.update({"allocated": False})
                  released_msg = "Releasing %(type)s tunnel %(id)s to pool"
              else:
                  affected = matching.delete()
                  released_msg = (
                      "Releasing %(type)s tunnel %(id)s outside pool")
              if affected:
                  LOG.debug(released_msg, log_args)

          if not affected:
              LOG.warning("%(type)s tunnel %(id)s not found", log_args)

      def get_allocation(self, session, tunnel_id):
          """Return the first allocation row for ``tunnel_id``, if any."""
          rows = session.query(self.model)
          return rows.filter_by(
              **{self.segmentation_key: tunnel_id}).first()
  @six.add_metaclass(abc.ABCMeta)
  class ML2TunnelTypeDriver(_TunnelTypeDriverBase):
      """Stable abstract interface for context based ML2 tunnel type drivers.

      Tunnel type networks rely on tunnel endpoints; the abstract methods to
      manage them are inherited from the base class.

      This flavour receives a context as first argument of:
      - reserve_provider_segment
      - allocate_tenant_segment
      - release_segment
      - get_allocation
      """

      def reserve_provider_segment(self, context, segment, filters=None):
          """Reserve a tunnel ID for a provider segment.

          :param context: the request context to work with
          :param segment: provider segment dict; without a segmentation ID
                          any free tunnel is taken
          :param filters: optional extra criteria for the partial allocation
          :returns: the reserved segment as a dict
          :raises NoNetworkAvailable: if no free tunnel could be allocated
          :raises TunnelIdInUse: if the requested ID could not be allocated
          """
          if not self.is_partial_segment(segment):
              requested_id = segment.get(api.SEGMENTATION_ID)
              allocation = self.allocate_fully_specified_segment(
                  context, **{self.segmentation_key: requested_id})
              if not allocation:
                  raise exc.TunnelIdInUse(tunnel_id=requested_id)
          else:
              allocation = self.allocate_partially_specified_segment(
                  context, **(filters or {}))
              if not allocation:
                  raise exc.NoNetworkAvailable()

          return {
              api.NETWORK_TYPE: self.get_type(),
              api.PHYSICAL_NETWORK: None,
              api.SEGMENTATION_ID: getattr(allocation,
                                           self.segmentation_key),
              api.MTU: self.get_mtu(),
          }

      def allocate_tenant_segment(self, context, filters=None):
          """Allocate any free tunnel for a tenant network.

          :param context: the request context to work with
          :param filters: optional extra criteria for the allocation
          :returns: the allocated segment as a dict, or None if nothing
                    could be allocated
          """
          allocation = self.allocate_partially_specified_segment(
              context, **(filters or {}))
          if allocation:
              return {
                  api.NETWORK_TYPE: self.get_type(),
                  api.PHYSICAL_NETWORK: None,
                  api.SEGMENTATION_ID: getattr(allocation,
                                               self.segmentation_key),
                  api.MTU: self.get_mtu(),
              }
          return None

      def release_segment(self, context, segment):
          """Give the tunnel ID of a segment back.

          IDs inside one of the current ranges are marked as unallocated,
          IDs outside of them are removed from the table.

          :param context: the request context to work with
          :param segment: segment dict holding the segmentation ID
          """
          tunnel_id = segment[api.SEGMENTATION_ID]

          in_pool = False
          for low, high in self.get_network_segment_ranges():
              if low <= tunnel_id <= high:
                  in_pool = True
                  break

          log_args = {'type': self.get_type(), 'id': tunnel_id}
          with db_api.CONTEXT_WRITER.using(context):
              matching = context.session.query(self.model).filter_by(
                  **{self.segmentation_key: tunnel_id})
              if in_pool:
                  affected = matching.update({"allocated": False})
                  released_msg = "Releasing %(type)s tunnel %(id)s to pool"
              else:
                  affected = matching.delete()
                  released_msg = (
                      "Releasing %(type)s tunnel %(id)s outside pool")
              if affected:
                  LOG.debug(released_msg, log_args)

          if not affected:
              LOG.warning("%(type)s tunnel %(id)s not found", log_args)

      @db_api.CONTEXT_READER
      def get_allocation(self, context, tunnel_id):
          """Return the first allocation row for ``tunnel_id``, if any."""
          rows = context.session.query(self.model)
          return rows.filter_by(
              **{self.segmentation_key: tunnel_id}).first()
  class EndpointTunnelTypeDriver(ML2TunnelTypeDriver):
      """Tunnel type driver that stores its endpoints through a DB model."""

      def __init__(self, segment_model, endpoint_model):
          """Set up the driver.

          :param segment_model: model passed on to the parent initializer
          :param endpoint_model: endpoint model class; for a
              ``NeutronDbObject`` subclass its ``db_model`` is used instead
          """
          super(EndpointTunnelTypeDriver, self).__init__(segment_model)
          self.endpoint_model = (
              endpoint_model.db_model
              if issubclass(endpoint_model, base_obj.NeutronDbObject)
              else endpoint_model)
          self.segmentation_key = next(iter(self.primary_keys))

      def get_endpoint_by_host(self, host):
          """Return the first endpoint row of ``host`` or None."""
          LOG.debug("get_endpoint_by_host() called for host %s", host)
          reader = db_api.get_reader_session()
          endpoints = reader.query(self.endpoint_model)
          return endpoints.filter_by(host=host).first()

      def get_endpoint_by_ip(self, ip):
          """Return the first endpoint row with address ``ip`` or None."""
          LOG.debug("get_endpoint_by_ip() called for ip %s", ip)
          reader = db_api.get_reader_session()
          endpoints = reader.query(self.endpoint_model)
          return endpoints.filter_by(ip_address=ip).first()

      def delete_endpoint(self, ip):
          """Delete the endpoint rows with address ``ip``."""
          LOG.debug("delete_endpoint() called for ip %s", ip)
          writer = db_api.get_writer_session()
          endpoints = writer.query(self.endpoint_model)
          endpoints.filter_by(ip_address=ip).delete()

      def delete_endpoint_by_host_or_ip(self, host, ip):
          """Delete the endpoint rows matching ``host`` or ``ip``."""
          LOG.debug("delete_endpoint_by_host_or_ip() called for "
                    "host %(host)s or %(ip)s", {'host': host, 'ip': ip})
          writer = db_api.get_writer_session()
          endpoints = writer.query(self.endpoint_model)
          host_or_ip = or_(self.endpoint_model.host == host,
                           self.endpoint_model.ip_address == ip)
          endpoints.filter(host_or_ip).delete()

      def _get_endpoints(self):
          """Return a query over all endpoint rows."""
          LOG.debug("_get_endpoints() called")
          reader = db_api.get_reader_session()
          return reader.query(self.endpoint_model)

      def _add_endpoint(self, ip, host, **kwargs):
          """Store a new endpoint, falling back to the existing row.

          :param ip: the IP address of the endpoint
          :param host: the host name of the endpoint
          :param kwargs: additional attributes for the endpoint model
          :returns: the saved endpoint, or the row already stored for ``ip``
                    if saving raised a duplicate entry error
          """
          LOG.debug("_add_endpoint() called for ip %s", ip)
          writer = db_api.get_writer_session()
          try:
              created = self.endpoint_model(ip_address=ip, host=host,
                                            **kwargs)
              created.save(writer)
              return created
          except db_exc.DBDuplicateEntry:
              existing = writer.query(self.endpoint_model).filter_by(
                  ip_address=ip).one()
              LOG.warning("Endpoint with ip %s already exists", ip)
              return existing
  class TunnelRpcCallbackMixin(object):
      """Server side handling of the ``tunnel_sync`` RPC call."""

      def setup_tunnel_callback_mixin(self, notifier, type_manager):
          """Store the collaborators used by tunnel_sync().

          :param notifier: object providing ``tunnel_update()`` and
                           ``tunnel_delete()``
          :param type_manager: object whose ``drivers`` mapping is looked up
                               by tunnel type
          """
          self._notifier = notifier
          self._type_manager = type_manager

      def tunnel_sync(self, rpc_context, **kwargs):
          """Update new tunnel.

          Updates the database with the tunnel IP. All listening agents will
          also be notified about the new tunnel IP.

          :param rpc_context: the context passed on to the notifier
          :param kwargs: ``tunnel_ip`` and ``tunnel_type`` are required,
                         ``host`` is optional
          :returns: ``{'tunnels': <result of get_endpoints()>}``
          :raises InvalidInput: if the tunnel IP or type is missing, if the
              IP version differs from ``ml2.overlay_ip_version`` or if the
              tunnel type has no driver
          """
          tunnel_ip = kwargs.get('tunnel_ip')
          if not tunnel_ip:
              msg = _("Tunnel IP value needed by the ML2 plugin")
              raise exc.InvalidInput(error_message=msg)

          host = kwargs.get('host')
          version = netaddr.IPAddress(tunnel_ip).version
          if version != cfg.CONF.ml2.overlay_ip_version:
              # Interpolate the message; passing a (format, args) tuple
              # would hand the raw tuple to the exception.
              msg = (_("Tunnel IP version does not match ML2 "
                       "overlay_ip_version, host: %(host)s, "
                       "tunnel_ip: %(ip)s") %
                     {'host': host, 'ip': tunnel_ip})
              raise exc.InvalidInput(error_message=msg)

          tunnel_type = kwargs.get('tunnel_type')
          if not tunnel_type:
              msg = _("Network type value needed by the ML2 plugin")
              raise exc.InvalidInput(error_message=msg)

          driver = self._type_manager.drivers.get(tunnel_type)
          if driver:
              # The given conditional statements will verify the following
              # things:
              # 1. If host is not passed from an agent, it is a legacy mode.
              # 2. If passed host and tunnel_ip are not found in the DB,
              #    it is a new endpoint.
              # 3. If host is passed from an agent and it is not found in DB
              #    but the passed tunnel_ip is found, delete the endpoint
              #    from DB and add the endpoint with (tunnel_ip, host),
              #    it is an upgrade case.
              # 4. If passed host is found in DB and passed tunnel ip is not
              #    found, delete the endpoint belonging to that host and
              #    add endpoint with latest (tunnel_ip, host), it is a case
              #    where local_ip of an agent got changed.
              # 5. If the passed host had another ip in the DB the host-id
              #    has roamed to a different IP then delete any reference to
              #    the new local_ip or the host id. Don't notify
              #    tunnel_delete for the old IP since that one could have
              #    been taken by a different agent host-id
              #    (neutron-ovs-cleanup should be used to clean up the stale
              #    endpoints).
              #    Finally create a new endpoint for the (tunnel_ip, host).
              if host:
                  host_endpoint = driver.obj.get_endpoint_by_host(host)
                  ip_endpoint = driver.obj.get_endpoint_by_ip(tunnel_ip)

                  if (ip_endpoint and ip_endpoint.host is None and
                          host_endpoint is None):
                      driver.obj.delete_endpoint(ip_endpoint.ip_address)
                  elif (ip_endpoint and ip_endpoint.host != host):
                      LOG.info(
                          "Tunnel IP %(ip)s was used by host %(host)s and "
                          "will be assigned to %(new_host)s",
                          {'ip': ip_endpoint.ip_address,
                           'host': ip_endpoint.host,
                           'new_host': host})
                      driver.obj.delete_endpoint_by_host_or_ip(
                          host, ip_endpoint.ip_address)
                  elif (host_endpoint and
                          host_endpoint.ip_address != tunnel_ip):
                      # Notify all other listening agents to delete stale
                      # tunnels
                      self._notifier.tunnel_delete(
                          rpc_context, host_endpoint.ip_address, tunnel_type)
                      driver.obj.delete_endpoint(host_endpoint.ip_address)

              tunnel = driver.obj.add_endpoint(tunnel_ip, host)
              tunnels = driver.obj.get_endpoints()
              entry = {'tunnels': tunnels}
              # Notify all other listening agents
              self._notifier.tunnel_update(rpc_context, tunnel.ip_address,
                                           tunnel_type)
              # Return the list of tunnels IP's to the agent
              return entry
          else:
              msg = _("Network type value '%s' not supported") % tunnel_type
              raise exc.InvalidInput(error_message=msg)
  class TunnelAgentRpcApiMixin(object):
      """Fanout casts announcing tunnel updates and deletions.

      The class using this mixin has to provide ``self.topic`` and
      ``self.client``.
      """

      def _get_tunnel_update_topic(self):
          """Return the topic name used for tunnel update casts."""
          return topics.get_topic_name(self.topic, TUNNEL, topics.UPDATE)

      def tunnel_update(self, context, tunnel_ip, tunnel_type):
          """Cast ``tunnel_update`` on the tunnel update topic.

          :param context: the context passed to the cast
          :param tunnel_ip: the IP address of the tunnel endpoint
          :param tunnel_type: the type of the tunnel
          """
          payload = {'tunnel_ip': tunnel_ip, 'tunnel_type': tunnel_type}
          fanout_ctxt = self.client.prepare(
              fanout=True, topic=self._get_tunnel_update_topic())
          fanout_ctxt.cast(context, 'tunnel_update', **payload)

      def _get_tunnel_delete_topic(self):
          """Return the topic name used for tunnel delete casts."""
          return topics.get_topic_name(self.topic, TUNNEL, topics.DELETE)

      def tunnel_delete(self, context, tunnel_ip, tunnel_type):
          """Cast ``tunnel_delete`` on the tunnel delete topic.

          :param context: the context passed to the cast
          :param tunnel_ip: the IP address of the tunnel endpoint
          :param tunnel_type: the type of the tunnel
          """
          payload = {'tunnel_ip': tunnel_ip, 'tunnel_type': tunnel_type}
          fanout_ctxt = self.client.prepare(
              fanout=True, topic=self._get_tunnel_delete_topic())
          fanout_ctxt.cast(context, 'tunnel_delete', **payload)