OpenStack Networking (Neutron)
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

505 lines
21 KiB

  1. # Copyright (c) 2013 OpenStack Foundation
  2. # All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  5. # not use this file except in compliance with the License. You may obtain
  6. # a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  12. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  13. # License for the specific language governing permissions and limitations
  14. # under the License.
  15. import abc
  16. import itertools
  17. import operator
  18. import netaddr
  19. from neutron_lib.agent import topics
  20. from neutron_lib import constants as p_const
  21. from neutron_lib import context
  22. from neutron_lib.db import api as db_api
  23. from neutron_lib import exceptions as exc
  24. from neutron_lib.plugins import constants as plugin_constants
  25. from neutron_lib.plugins import directory
  26. from neutron_lib.plugins.ml2 import api
  27. from neutron_lib.plugins import utils as plugin_utils
  28. from oslo_config import cfg
  29. from oslo_db import exception as db_exc
  30. from oslo_log import log
  31. from oslo_utils import uuidutils
  32. from sqlalchemy import or_
  33. from neutron._i18n import _
  34. from neutron.objects import network_segment_range as range_obj
  35. from neutron.plugins.ml2.drivers import helpers
  36. from neutron.services.network_segment_range import plugin as range_plugin
# Module-level logger, named after this module.
LOG = log.getLogger(__name__)
# Topic component passed to topics.get_topic_name() when building the
# tunnel update/delete fanout topics.
TUNNEL = 'tunnel'
  39. def chunks(iterable, chunk_size):
  40. """Chunks data into chunk with size<=chunk_size."""
  41. iterator = iter(iterable)
  42. chunk = list(itertools.islice(iterator, 0, chunk_size))
  43. while chunk:
  44. yield chunk
  45. chunk = list(itertools.islice(iterator, 0, chunk_size))
  46. class _TunnelTypeDriverBase(helpers.SegmentTypeDriver, metaclass=abc.ABCMeta):
  47. BULK_SIZE = 100
  48. def __init__(self, model):
  49. super(_TunnelTypeDriverBase, self).__init__(model)
  50. self.segmentation_key = next(iter(self.primary_keys))
  51. @abc.abstractmethod
  52. def add_endpoint(self, ip, host):
  53. """Register the endpoint in the type_driver database.
  54. :param ip: the IP address of the endpoint
  55. :param host: the Host name of the endpoint
  56. """
  57. @abc.abstractmethod
  58. def get_endpoints(self):
  59. """Get every endpoint managed by the type_driver
  60. :returns: a list of dict [{ip_address:endpoint_ip, host:endpoint_host},
  61. ..]
  62. """
  63. @abc.abstractmethod
  64. def get_endpoint_by_host(self, host):
  65. """Get endpoint for a given host managed by the type_driver
  66. :param host: the Host name of the endpoint
  67. if host found in type_driver database
  68. :returns: db object for that particular host
  69. else
  70. :returns: None
  71. """
  72. @abc.abstractmethod
  73. def get_endpoint_by_ip(self, ip):
  74. """Get endpoint for a given tunnel ip managed by the type_driver
  75. :param ip: the IP address of the endpoint
  76. if ip found in type_driver database
  77. :returns: db object for that particular ip
  78. else
  79. :returns: None
  80. """
  81. @abc.abstractmethod
  82. def delete_endpoint(self, ip):
  83. """Delete the endpoint in the type_driver database.
  84. :param ip: the IP address of the endpoint
  85. """
  86. @abc.abstractmethod
  87. def delete_endpoint_by_host_or_ip(self, host, ip):
  88. """Delete the endpoint in the type_driver database.
  89. This function will delete any endpoint matching the specified
  90. ip or host.
  91. :param host: the host name of the endpoint
  92. :param ip: the IP address of the endpoint
  93. """
  94. def _initialize(self, raw_tunnel_ranges):
  95. self.tunnel_ranges = []
  96. self._parse_tunnel_ranges(raw_tunnel_ranges, self.tunnel_ranges)
  97. if not range_plugin.is_network_segment_range_enabled():
  98. # service plugins are initialized/loaded after the ML2 driver
  99. # initialization. Thus, we base on the information whether
  100. # ``network_segment_range`` service plugin is enabled/defined in
  101. # ``neutron.conf`` to decide whether to skip the first time sync
  102. # allocation during driver initialization, instead of using the
  103. # directory.get_plugin() method - the normal way used elsewhere to
  104. # check if a plugin is loaded.
  105. self.sync_allocations()
  106. def _parse_tunnel_ranges(self, tunnel_ranges, current_range):
  107. for entry in tunnel_ranges:
  108. entry = entry.strip()
  109. try:
  110. tun_min, tun_max = entry.split(':')
  111. tun_min = tun_min.strip()
  112. tun_max = tun_max.strip()
  113. tunnel_range = int(tun_min), int(tun_max)
  114. except ValueError as ex:
  115. raise exc.NetworkTunnelRangeError(tunnel_range=entry, error=ex)
  116. plugin_utils.verify_tunnel_range(tunnel_range, self.get_type())
  117. current_range.append(tunnel_range)
  118. LOG.info("%(type)s ID ranges: %(range)s",
  119. {'type': self.get_type(), 'range': current_range})
  120. @db_api.retry_db_errors
  121. def _populate_new_default_network_segment_ranges(self):
  122. ctx = context.get_admin_context()
  123. for tun_min, tun_max in self.tunnel_ranges:
  124. res = {
  125. 'id': uuidutils.generate_uuid(),
  126. 'name': '',
  127. 'default': True,
  128. 'shared': True,
  129. 'network_type': self.get_type(),
  130. 'minimum': tun_min,
  131. 'maximum': tun_max}
  132. with db_api.CONTEXT_WRITER.using(ctx):
  133. new_default_range_obj = (
  134. range_obj.NetworkSegmentRange(ctx, **res))
  135. new_default_range_obj.create()
  136. @db_api.retry_db_errors
  137. def _get_network_segment_ranges_from_db(self):
  138. ranges = []
  139. ctx = context.get_admin_context()
  140. with db_api.CONTEXT_READER.using(ctx):
  141. range_objs = (range_obj.NetworkSegmentRange.get_objects(
  142. ctx, network_type=self.get_type()))
  143. for obj in range_objs:
  144. ranges.append((obj['minimum'], obj['maximum']))
  145. return ranges
  146. def initialize_network_segment_range_support(self):
  147. self._delete_expired_default_network_segment_ranges()
  148. self._populate_new_default_network_segment_ranges()
  149. # Override self.tunnel_ranges with the network segment range
  150. # information from DB and then do a sync_allocations since the
  151. # segment range service plugin has not yet been loaded at this
  152. # initialization time.
  153. self.tunnel_ranges = self._get_network_segment_ranges_from_db()
  154. self.sync_allocations()
  155. def update_network_segment_range_allocations(self):
  156. self.sync_allocations()
  157. @db_api.retry_db_errors
  158. def sync_allocations(self):
  159. # determine current configured allocatable tunnel ids
  160. tunnel_ids = set()
  161. ranges = self.get_network_segment_ranges()
  162. for tun_min, tun_max in ranges:
  163. tunnel_ids |= set(range(tun_min, tun_max + 1))
  164. tunnel_id_getter = operator.attrgetter(self.segmentation_key)
  165. tunnel_col = getattr(self.model, self.segmentation_key)
  166. ctx = context.get_admin_context()
  167. with db_api.CONTEXT_WRITER.using(ctx):
  168. # remove from table unallocated tunnels not currently allocatable
  169. # fetch results as list via all() because we'll be iterating
  170. # through them twice
  171. allocs = ctx.session.query(self.model).all()
  172. # collect those vnis that needs to be deleted from db
  173. unallocateds = (
  174. tunnel_id_getter(a) for a in allocs if not a.allocated)
  175. to_remove = (x for x in unallocateds if x not in tunnel_ids)
  176. # Immediately delete tunnels in chunks. This leaves no work for
  177. # flush at the end of transaction
  178. for chunk in chunks(to_remove, self.BULK_SIZE):
  179. (ctx.session.query(self.model).filter(tunnel_col.in_(chunk)).
  180. filter_by(allocated=False).delete(synchronize_session=False))
  181. # collect vnis that need to be added
  182. existings = {tunnel_id_getter(a) for a in allocs}
  183. missings = list(tunnel_ids - existings)
  184. for chunk in chunks(missings, self.BULK_SIZE):
  185. bulk = [{self.segmentation_key: x, 'allocated': False}
  186. for x in chunk]
  187. ctx.session.execute(self.model.__table__.insert(), bulk)
  188. def is_partial_segment(self, segment):
  189. return segment.get(api.SEGMENTATION_ID) is None
  190. def validate_provider_segment(self, segment):
  191. physical_network = segment.get(api.PHYSICAL_NETWORK)
  192. if physical_network:
  193. msg = _("provider:physical_network specified for %s "
  194. "network") % segment.get(api.NETWORK_TYPE)
  195. raise exc.InvalidInput(error_message=msg)
  196. for key, value in segment.items():
  197. if value and key not in [api.NETWORK_TYPE,
  198. api.SEGMENTATION_ID]:
  199. msg = (_("%(key)s prohibited for %(tunnel)s provider network"),
  200. {'key': key, 'tunnel': segment.get(api.NETWORK_TYPE)})
  201. raise exc.InvalidInput(error_message=msg)
  202. def get_mtu(self, physical_network=None):
  203. seg_mtu = super(_TunnelTypeDriverBase, self).get_mtu()
  204. mtu = []
  205. if seg_mtu > 0:
  206. mtu.append(seg_mtu)
  207. if cfg.CONF.ml2.path_mtu > 0:
  208. mtu.append(cfg.CONF.ml2.path_mtu)
  209. version = cfg.CONF.ml2.overlay_ip_version
  210. ip_header_length = p_const.IP_HEADER_LENGTH[version]
  211. return min(mtu) - ip_header_length if mtu else 0
  212. def get_network_segment_ranges(self):
  213. """Get the driver network segment ranges.
  214. Queries all tunnel network segment ranges from DB if the
  215. ``NETWORK_SEGMENT_RANGE`` service plugin is enabled. Otherwise,
  216. they will be loaded from the host config file - `ml2_conf.ini`.
  217. """
  218. ranges = self.tunnel_ranges
  219. if directory.get_plugin(plugin_constants.NETWORK_SEGMENT_RANGE):
  220. ranges = self._get_network_segment_ranges_from_db()
  221. return ranges
  222. class ML2TunnelTypeDriver(_TunnelTypeDriverBase, metaclass=abc.ABCMeta):
  223. """Define stable abstract interface for ML2 type drivers.
  224. tunnel type networks rely on tunnel endpoints. This class defines abstract
  225. methods to manage these endpoints.
  226. ML2 type driver that passes context as argument to functions:
  227. - reserve_provider_segment
  228. - allocate_tenant_segment
  229. - release_segment
  230. - get_allocation
  231. """
  232. def reserve_provider_segment(self, context, segment, filters=None):
  233. if self.is_partial_segment(segment):
  234. filters = filters or {}
  235. alloc = self.allocate_partially_specified_segment(context,
  236. **filters)
  237. if not alloc:
  238. raise exc.NoNetworkAvailable()
  239. else:
  240. segmentation_id = segment.get(api.SEGMENTATION_ID)
  241. alloc = self.allocate_fully_specified_segment(
  242. context, **{self.segmentation_key: segmentation_id})
  243. if not alloc:
  244. raise exc.TunnelIdInUse(tunnel_id=segmentation_id)
  245. return {api.NETWORK_TYPE: self.get_type(),
  246. api.PHYSICAL_NETWORK: None,
  247. api.SEGMENTATION_ID: getattr(alloc, self.segmentation_key),
  248. api.MTU: self.get_mtu()}
  249. def allocate_tenant_segment(self, context, filters=None):
  250. filters = filters or {}
  251. alloc = self.allocate_partially_specified_segment(context, **filters)
  252. if not alloc:
  253. return
  254. return {api.NETWORK_TYPE: self.get_type(),
  255. api.PHYSICAL_NETWORK: None,
  256. api.SEGMENTATION_ID: getattr(alloc, self.segmentation_key),
  257. api.MTU: self.get_mtu()}
  258. def release_segment(self, context, segment):
  259. tunnel_id = segment[api.SEGMENTATION_ID]
  260. ranges = self.get_network_segment_ranges()
  261. inside = any(lo <= tunnel_id <= hi for lo, hi in ranges)
  262. info = {'type': self.get_type(), 'id': tunnel_id}
  263. with db_api.CONTEXT_WRITER.using(context):
  264. query = (context.session.query(self.model).
  265. filter_by(**{self.segmentation_key: tunnel_id}))
  266. if inside:
  267. count = query.update({"allocated": False})
  268. if count:
  269. LOG.debug("Releasing %(type)s tunnel %(id)s to pool",
  270. info)
  271. else:
  272. count = query.delete()
  273. if count:
  274. LOG.debug("Releasing %(type)s tunnel %(id)s outside pool",
  275. info)
  276. if not count:
  277. LOG.warning("%(type)s tunnel %(id)s not found", info)
  278. @db_api.CONTEXT_READER
  279. def get_allocation(self, context, tunnel_id):
  280. return (context.session.query(self.model).
  281. filter_by(**{self.segmentation_key: tunnel_id}).
  282. first())
  283. class EndpointTunnelTypeDriver(ML2TunnelTypeDriver):
  284. def __init__(self, segment_model, endpoint_model):
  285. super(EndpointTunnelTypeDriver, self).__init__(segment_model)
  286. self.endpoint_model = endpoint_model.db_model
  287. self.segmentation_key = next(iter(self.primary_keys))
  288. def get_endpoint_by_host(self, host):
  289. LOG.debug("get_endpoint_by_host() called for host %s", host)
  290. session = db_api.get_reader_session()
  291. return (session.query(self.endpoint_model).
  292. filter_by(host=host).first())
  293. def get_endpoint_by_ip(self, ip):
  294. LOG.debug("get_endpoint_by_ip() called for ip %s", ip)
  295. session = db_api.get_reader_session()
  296. return (session.query(self.endpoint_model).
  297. filter_by(ip_address=ip).first())
  298. def delete_endpoint(self, ip):
  299. LOG.debug("delete_endpoint() called for ip %s", ip)
  300. session = db_api.get_writer_session()
  301. session.query(self.endpoint_model).filter_by(ip_address=ip).delete()
  302. def delete_endpoint_by_host_or_ip(self, host, ip):
  303. LOG.debug("delete_endpoint_by_host_or_ip() called for "
  304. "host %(host)s or %(ip)s", {'host': host, 'ip': ip})
  305. session = db_api.get_writer_session()
  306. session.query(self.endpoint_model).filter(
  307. or_(self.endpoint_model.host == host,
  308. self.endpoint_model.ip_address == ip)).delete()
  309. def _get_endpoints(self):
  310. LOG.debug("_get_endpoints() called")
  311. session = db_api.get_reader_session()
  312. return session.query(self.endpoint_model)
  313. def _add_endpoint(self, ip, host, **kwargs):
  314. LOG.debug("_add_endpoint() called for ip %s", ip)
  315. session = db_api.get_writer_session()
  316. try:
  317. endpoint = self.endpoint_model(ip_address=ip, host=host, **kwargs)
  318. endpoint.save(session)
  319. except db_exc.DBDuplicateEntry:
  320. endpoint = (session.query(self.endpoint_model).
  321. filter_by(ip_address=ip).one())
  322. LOG.warning("Endpoint with ip %s already exists", ip)
  323. return endpoint
  324. class TunnelRpcCallbackMixin(object):
  325. def setup_tunnel_callback_mixin(self, notifier, type_manager):
  326. self._notifier = notifier
  327. self._type_manager = type_manager
  328. def tunnel_sync(self, rpc_context, **kwargs):
  329. """Update new tunnel.
  330. Updates the database with the tunnel IP. All listening agents will also
  331. be notified about the new tunnel IP.
  332. """
  333. tunnel_ip = kwargs.get('tunnel_ip')
  334. if not tunnel_ip:
  335. msg = _("Tunnel IP value needed by the ML2 plugin")
  336. raise exc.InvalidInput(error_message=msg)
  337. host = kwargs.get('host')
  338. version = netaddr.IPAddress(tunnel_ip).version
  339. if version != cfg.CONF.ml2.overlay_ip_version:
  340. msg = (_("Tunnel IP version does not match ML2 "
  341. "overlay_ip_version: %(overlay)s, host: %(host)s, "
  342. "tunnel_ip: %(ip)s"),
  343. {'overlay': cfg.CONF.ml2.overlay_ip_version,
  344. 'host': host, 'ip': tunnel_ip})
  345. raise exc.InvalidInput(error_message=msg)
  346. tunnel_type = kwargs.get('tunnel_type')
  347. if not tunnel_type:
  348. msg = _("Network type value needed by the ML2 plugin")
  349. raise exc.InvalidInput(error_message=msg)
  350. driver = self._type_manager.drivers.get(tunnel_type)
  351. if driver:
  352. # The given conditional statements will verify the following
  353. # things:
  354. # 1. If host is not passed from an agent, it is a legacy mode.
  355. # 2. If passed host and tunnel_ip are not found in the DB,
  356. # it is a new endpoint.
  357. # 3. If host is passed from an agent and it is not found in DB
  358. # but the passed tunnel_ip is found, delete the endpoint
  359. # from DB and add the endpoint with (tunnel_ip, host),
  360. # it is an upgrade case.
  361. # 4. If passed host is found in DB and passed tunnel ip is not
  362. # found, delete the endpoint belonging to that host and
  363. # add endpoint with latest (tunnel_ip, host), it is a case
  364. # where local_ip of an agent got changed.
  365. # 5. If the passed host had another ip in the DB the host-id has
  366. # roamed to a different IP then delete any reference to the new
  367. # local_ip or the host id. Don't notify tunnel_delete for the
  368. # old IP since that one could have been taken by a different
  369. # agent host-id (neutron-ovs-cleanup should be used to clean up
  370. # the stale endpoints).
  371. # Finally create a new endpoint for the (tunnel_ip, host).
  372. if host:
  373. host_endpoint = driver.obj.get_endpoint_by_host(host)
  374. ip_endpoint = driver.obj.get_endpoint_by_ip(tunnel_ip)
  375. if (ip_endpoint and ip_endpoint.host is None and
  376. host_endpoint is None):
  377. driver.obj.delete_endpoint(ip_endpoint.ip_address)
  378. elif (ip_endpoint and ip_endpoint.host != host):
  379. LOG.info(
  380. "Tunnel IP %(ip)s was used by host %(host)s and "
  381. "will be assigned to %(new_host)s",
  382. {'ip': ip_endpoint.ip_address,
  383. 'host': ip_endpoint.host,
  384. 'new_host': host})
  385. driver.obj.delete_endpoint_by_host_or_ip(
  386. host, ip_endpoint.ip_address)
  387. elif (host_endpoint and host_endpoint.ip_address != tunnel_ip):
  388. # Notify all other listening agents to delete stale tunnels
  389. self._notifier.tunnel_delete(
  390. rpc_context, host_endpoint.ip_address, tunnel_type)
  391. driver.obj.delete_endpoint(host_endpoint.ip_address)
  392. tunnel = driver.obj.add_endpoint(tunnel_ip, host)
  393. tunnels = driver.obj.get_endpoints()
  394. entry = {'tunnels': tunnels}
  395. # Notify all other listening agents
  396. self._notifier.tunnel_update(rpc_context, tunnel.ip_address,
  397. tunnel_type)
  398. # Return the list of tunnels IP's to the agent
  399. return entry
  400. else:
  401. msg = (_("Network type value %(type)s not supported, "
  402. "host: %(host)s with tunnel IP: %(ip)s") %
  403. {'type': tunnel_type,
  404. 'host': host or 'legacy mode (no host provided by agent)',
  405. 'ip': tunnel_ip})
  406. raise exc.InvalidInput(error_message=msg)
  407. class TunnelAgentRpcApiMixin(object):
  408. def _get_tunnel_update_topic(self):
  409. return topics.get_topic_name(self.topic,
  410. TUNNEL,
  411. topics.UPDATE)
  412. def tunnel_update(self, context, tunnel_ip, tunnel_type):
  413. cctxt = self.client.prepare(topic=self._get_tunnel_update_topic(),
  414. fanout=True)
  415. cctxt.cast(context, 'tunnel_update', tunnel_ip=tunnel_ip,
  416. tunnel_type=tunnel_type)
  417. def _get_tunnel_delete_topic(self):
  418. return topics.get_topic_name(self.topic,
  419. TUNNEL,
  420. topics.DELETE)
  421. def tunnel_delete(self, context, tunnel_ip, tunnel_type):
  422. cctxt = self.client.prepare(topic=self._get_tunnel_delete_topic(),
  423. fanout=True)
  424. cctxt.cast(context, 'tunnel_delete', tunnel_ip=tunnel_ip,
  425. tunnel_type=tunnel_type)