OpenStack Networking (Neutron)
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

263 lines
11KB

  1. #
  2. # Licensed under the Apache License, Version 2.0 (the "License");
  3. # you may not use this file except in compliance with the License.
  4. # You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software
  9. # distributed under the License is distributed on an "AS IS" BASIS,
  10. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. # See the License for the specific language governing permissions and
  12. # limitations under the License.
  13. import re
  14. import eventlet
  15. import netaddr
  16. from oslo_concurrency import lockutils
  17. from oslo_log import log as logging
  18. from neutron.agent.linux import utils as linux_utils
  19. from neutron.common import constants as n_const
  20. from neutron.common import exceptions as n_exc
  21. LOG = logging.getLogger(__name__)
  22. CONTRACK_MGRS = {}
  23. MAX_CONNTRACK_ZONES = 65535
  24. ZONE_START = 4097
  25. class IpConntrackUpdate(object):
  26. """Encapsulates a conntrack update
  27. An instance of this object carries the information necessary to
  28. process a request to update the conntrack table.
  29. """
  30. def __init__(self, device_info_list, rule, remote_ips):
  31. self.device_info_list = device_info_list
  32. self.rule = rule
  33. self.remote_ips = remote_ips
  34. class IpConntrackProcessingQueue(object):
  35. """Manager of the queue of conntrack updates to process."""
  36. def __init__(self):
  37. self._queue = eventlet.queue.LightQueue()
  38. def add(self, update):
  39. self._queue.put(update)
  40. def updates(self):
  41. """Grabs the next conntrack update from the queue and processes."""
  42. while not self._queue.empty():
  43. try:
  44. update = self._queue.get()
  45. yield update
  46. except IndexError:
  47. # queue was empty, another worker stole our entry
  48. continue
  49. except Exception as e:
  50. LOG.error("Failed to yield ip_conntrack process queue "
  51. "entry: %s", e)
  52. @lockutils.synchronized('conntrack')
  53. def get_conntrack(get_rules_for_table_func, filtered_ports, unfiltered_ports,
  54. execute=None, namespace=None, zone_per_port=False):
  55. try:
  56. return CONTRACK_MGRS[namespace]
  57. except KeyError:
  58. ipconntrack = IpConntrackManager(get_rules_for_table_func,
  59. filtered_ports, unfiltered_ports,
  60. execute, namespace, zone_per_port)
  61. CONTRACK_MGRS[namespace] = ipconntrack
  62. return CONTRACK_MGRS[namespace]
  63. class IpConntrackManager(object):
  64. """Smart wrapper for ip conntrack."""
  65. def __init__(self, get_rules_for_table_func, filtered_ports,
  66. unfiltered_ports, execute=None, namespace=None,
  67. zone_per_port=False):
  68. self.get_rules_for_table_func = get_rules_for_table_func
  69. self.execute = execute or linux_utils.execute
  70. self.namespace = namespace
  71. self.filtered_ports = filtered_ports
  72. self.unfiltered_ports = unfiltered_ports
  73. self.zone_per_port = zone_per_port # zone per port vs per network
  74. self._populate_initial_zone_map()
  75. self._queue = IpConntrackProcessingQueue()
  76. # Don't start the queue processing thread here, do later when
  77. # the first entry is added to the queue.
  78. self._process_queue_started = False
  79. def _start_process_queue(self):
  80. eventlet.spawn_n(self._process_queue_loop)
  81. def _process_queue_loop(self):
  82. LOG.debug("Starting ipconntrack _process_queue_loop()")
  83. pool = eventlet.GreenPool(size=8)
  84. while True:
  85. pool.spawn_n(self._process_queue)
  86. def _process_queue(self):
  87. for update in self._queue.updates():
  88. if update.remote_ips:
  89. for remote_ip in update.remote_ips:
  90. self._delete_conntrack_state(
  91. update.device_info_list, update.rule, remote_ip)
  92. else:
  93. self._delete_conntrack_state(update.device_info_list,
  94. update.rule)
  95. def _process(self, device_info_list, rule, remote_ips=None):
  96. if not self._process_queue_started:
  97. self._process_queue_started = True
  98. self._start_process_queue()
  99. # queue the update to allow the caller to resume its work
  100. update = IpConntrackUpdate(device_info_list, rule, remote_ips)
  101. self._queue.add(update)
  102. @staticmethod
  103. def _generate_conntrack_cmd_by_rule(rule, namespace):
  104. ethertype = rule.get('ethertype')
  105. protocol = rule.get('protocol')
  106. direction = rule.get('direction')
  107. cmd = ['conntrack', '-D']
  108. if protocol:
  109. cmd.extend(['-p', str(protocol)])
  110. cmd.extend(['-f', str(ethertype).lower()])
  111. cmd.append('-d' if direction == 'ingress' else '-s')
  112. cmd_ns = []
  113. if namespace:
  114. cmd_ns.extend(['ip', 'netns', 'exec', namespace])
  115. cmd_ns.extend(cmd)
  116. return cmd_ns
  117. def _get_conntrack_cmds(self, device_info_list, rule, remote_ip=None):
  118. conntrack_cmds = set()
  119. cmd = self._generate_conntrack_cmd_by_rule(rule, self.namespace)
  120. ethertype = rule.get('ethertype')
  121. for device_info in device_info_list:
  122. zone_id = self.get_device_zone(device_info, create=False)
  123. if not zone_id:
  124. LOG.debug("No zone for device %(dev)s. Will not try to "
  125. "clear conntrack state. Zone map: %(zm)s",
  126. {'dev': device_info['device'],
  127. 'zm': self._device_zone_map})
  128. continue
  129. ips = device_info.get('fixed_ips', [])
  130. for ip in ips:
  131. net = netaddr.IPNetwork(ip)
  132. if str(net.version) not in ethertype:
  133. continue
  134. ip_cmd = [str(net.ip), '-w', zone_id]
  135. if remote_ip and str(
  136. netaddr.IPNetwork(remote_ip).version) in ethertype:
  137. if rule.get('direction') == 'ingress':
  138. direction = '-s'
  139. else:
  140. direction = '-d'
  141. ip_cmd.extend([direction, str(remote_ip)])
  142. conntrack_cmds.add(tuple(cmd + ip_cmd))
  143. return conntrack_cmds
  144. def _delete_conntrack_state(self, device_info_list, rule, remote_ip=None):
  145. conntrack_cmds = self._get_conntrack_cmds(device_info_list,
  146. rule, remote_ip)
  147. for cmd in conntrack_cmds:
  148. try:
  149. self.execute(list(cmd), run_as_root=True,
  150. check_exit_code=True,
  151. extra_ok_codes=[1])
  152. except RuntimeError:
  153. LOG.exception("Failed execute conntrack command %s", cmd)
  154. def delete_conntrack_state_by_rule(self, device_info_list, rule):
  155. self._process(device_info_list, rule)
  156. def delete_conntrack_state_by_remote_ips(self, device_info_list,
  157. ethertype, remote_ips):
  158. for direction in ['ingress', 'egress']:
  159. rule = {'ethertype': str(ethertype).lower(),
  160. 'direction': direction}
  161. self._process(device_info_list, rule, remote_ips)
  162. def _populate_initial_zone_map(self):
  163. """Setup the map between devices and zones based on current rules."""
  164. self._device_zone_map = {}
  165. rules = self.get_rules_for_table_func('raw')
  166. for rule in rules:
  167. match = re.match(r'.* --physdev-in (?P<dev>[a-zA-Z0-9\-]+)'
  168. r'.* -j CT --zone (?P<zone>\d+).*', rule)
  169. if match:
  170. # strip off any prefix that the interface is using
  171. short_port_id = (match.group('dev')
  172. [n_const.LINUX_DEV_PREFIX_LEN:])
  173. self._device_zone_map[short_port_id] = int(match.group('zone'))
  174. LOG.debug("Populated conntrack zone map: %s", self._device_zone_map)
  175. def _device_key(self, port):
  176. # we have to key the device_zone_map based on the fragment of the
  177. # UUID that shows up in the interface name. This is because the initial
  178. # map is populated strictly based on interface names that we don't know
  179. # the full UUID of.
  180. if self.zone_per_port:
  181. identifier = port['device'][n_const.LINUX_DEV_PREFIX_LEN:]
  182. else:
  183. identifier = port['network_id']
  184. return identifier[:(n_const.LINUX_DEV_LEN -
  185. n_const.LINUX_DEV_PREFIX_LEN)]
  186. def get_device_zone(self, port, create=True):
  187. device_key = self._device_key(port)
  188. try:
  189. return self._device_zone_map[device_key]
  190. except KeyError:
  191. if create:
  192. return self._generate_device_zone(device_key)
  193. def _free_zones_from_removed_ports(self):
  194. """Clears any entries from the zone map of removed ports."""
  195. existing_ports = [
  196. self._device_key(port)
  197. for port in (list(self.filtered_ports.values()) +
  198. list(self.unfiltered_ports.values()))
  199. ]
  200. removed = set(self._device_zone_map) - set(existing_ports)
  201. for dev in removed:
  202. self._device_zone_map.pop(dev, None)
  203. def _generate_device_zone(self, short_device_id):
  204. """Generates a unique conntrack zone for the passed in ID."""
  205. try:
  206. zone = self._find_open_zone()
  207. except n_exc.CTZoneExhaustedError:
  208. # Free some zones and try again, repeat failure will not be caught
  209. self._free_zones_from_removed_ports()
  210. zone = self._find_open_zone()
  211. self._device_zone_map[short_device_id] = zone
  212. LOG.debug("Assigned CT zone %(z)s to device %(dev)s.",
  213. {'z': zone, 'dev': short_device_id})
  214. return self._device_zone_map[short_device_id]
  215. def _find_open_zone(self):
  216. # call set to dedup because old ports may be mapped to the same zone.
  217. zones_in_use = sorted(set(self._device_zone_map.values()))
  218. if not zones_in_use:
  219. return ZONE_START
  220. # attempt to increment onto the highest used zone first. if we hit the
  221. # end, go back and look for any gaps left by removed devices.
  222. last = zones_in_use[-1]
  223. if last < MAX_CONNTRACK_ZONES:
  224. return max(last + 1, ZONE_START)
  225. for index, used in enumerate(zones_in_use):
  226. if used - index != ZONE_START:
  227. # gap found, let's use it!
  228. return index + ZONE_START
  229. # conntrack zones exhausted :( :(
  230. raise n_exc.CTZoneExhaustedError()