OpenStack Networking (Neutron)
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

287 lines
11KB

  1. # Copyright (c) 2015 Mellanox Technologies, Ltd
  2. # All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  5. # not use this file except in compliance with the License. You may obtain
  6. # a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  12. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  13. # License for the specific language governing permissions and limitations
  14. # under the License.
  15. import abc
  16. import collections
  17. from neutron_lib import exceptions
  18. from oslo_concurrency import lockutils
  19. from oslo_log import log as logging
  20. import six
  21. from neutron._i18n import _LW, _LI
  22. from neutron.agent.l2 import agent_extension
  23. from neutron.agent.linux import tc_lib
  24. from neutron.api.rpc.callbacks.consumer import registry
  25. from neutron.api.rpc.callbacks import events
  26. from neutron.api.rpc.callbacks import resources
  27. from neutron.api.rpc.handlers import resources_rpc
  28. from neutron import manager
# Module-level logger, named after this module; shared by every class below.
LOG = logging.getLogger(__name__)
  30. @six.add_metaclass(abc.ABCMeta)
  31. class QosAgentDriver(object):
  32. """Defines stable abstract interface for QoS Agent Driver.
  33. QoS Agent driver defines the interface to be implemented by Agent
  34. for applying QoS Rules on a port.
  35. """
  36. # Each QoS driver should define the set of rule types that it supports, and
  37. # corresponding handlers that has the following names:
  38. #
  39. # create_<type>
  40. # update_<type>
  41. # delete_<type>
  42. #
  43. # where <type> is one of VALID_RULE_TYPES
  44. SUPPORTED_RULES = set()
  45. @abc.abstractmethod
  46. def initialize(self):
  47. """Perform QoS agent driver initialization.
  48. """
  49. def create(self, port, qos_policy):
  50. """Apply QoS rules on port for the first time.
  51. :param port: port object.
  52. :param qos_policy: the QoS policy to be applied on port.
  53. """
  54. self._handle_update_create_rules('create', port, qos_policy)
  55. def consume_api(self, agent_api):
  56. """Consume the AgentAPI instance from the QoSAgentExtension class
  57. This allows QosAgentDrivers to gain access to resources limited to the
  58. NeutronAgent when this method is overridden.
  59. :param agent_api: An instance of an agent specific API
  60. """
  61. def update(self, port, qos_policy):
  62. """Apply QoS rules on port.
  63. :param port: port object.
  64. :param qos_policy: the QoS policy to be applied on port.
  65. """
  66. self._handle_update_create_rules('update', port, qos_policy)
  67. def delete(self, port, qos_policy=None):
  68. """Remove QoS rules from port.
  69. :param port: port object.
  70. :param qos_policy: the QoS policy to be removed from port.
  71. """
  72. if qos_policy is None:
  73. rule_types = self.SUPPORTED_RULES
  74. else:
  75. rule_types = set(
  76. [rule.rule_type
  77. for rule in self._iterate_rules(qos_policy.rules)])
  78. for rule_type in rule_types:
  79. self._handle_rule_delete(port, rule_type)
  80. def _iterate_rules(self, rules):
  81. for rule in rules:
  82. rule_type = rule.rule_type
  83. if rule_type in self.SUPPORTED_RULES:
  84. yield rule
  85. else:
  86. LOG.warning(_LW('Unsupported QoS rule type for %(rule_id)s: '
  87. '%(rule_type)s; skipping'),
  88. {'rule_id': rule.id, 'rule_type': rule_type})
  89. def _handle_rule_delete(self, port, rule_type):
  90. handler_name = "".join(("delete_", rule_type))
  91. handler = getattr(self, handler_name)
  92. handler(port)
  93. def _handle_update_create_rules(self, action, port, qos_policy):
  94. for rule in self._iterate_rules(qos_policy.rules):
  95. if rule.should_apply_to_port(port):
  96. handler_name = "".join((action, "_", rule.rule_type))
  97. handler = getattr(self, handler_name)
  98. handler(port, rule)
  99. else:
  100. LOG.debug("Port %(port)s excluded from QoS rule %(rule)s",
  101. {'port': port, 'rule': rule.id})
  102. def _get_egress_burst_value(self, rule):
  103. """Return burst value used for egress bandwidth limitation.
  104. Because Egress bw_limit is done on ingress qdisc by LB and ovs drivers
  105. so it will return burst_value used by tc on as ingress_qdisc.
  106. """
  107. return tc_lib.TcCommand.get_ingress_qdisc_burst_value(
  108. rule.max_kbps, rule.max_burst_kbps)
  109. class PortPolicyMap(object):
  110. def __init__(self):
  111. # we cannot use a dict of sets here because port dicts are not hashable
  112. self.qos_policy_ports = collections.defaultdict(dict)
  113. self.known_policies = {}
  114. self.port_policies = {}
  115. def get_ports(self, policy):
  116. return self.qos_policy_ports[policy.id].values()
  117. def get_policy(self, policy_id):
  118. return self.known_policies.get(policy_id)
  119. def update_policy(self, policy):
  120. self.known_policies[policy.id] = policy
  121. def has_policy_changed(self, port, policy_id):
  122. return self.port_policies.get(port['port_id']) != policy_id
  123. def get_port_policy(self, port):
  124. policy_id = self.port_policies.get(port['port_id'])
  125. if policy_id:
  126. return self.get_policy(policy_id)
  127. def set_port_policy(self, port, policy):
  128. """Attach a port to policy and return any previous policy on port."""
  129. port_id = port['port_id']
  130. old_policy = self.get_port_policy(port)
  131. self.known_policies[policy.id] = policy
  132. self.port_policies[port_id] = policy.id
  133. self.qos_policy_ports[policy.id][port_id] = port
  134. if old_policy and old_policy.id != policy.id:
  135. del self.qos_policy_ports[old_policy.id][port_id]
  136. return old_policy
  137. def clean_by_port(self, port):
  138. """Detach port from policy and cleanup data we don't need anymore."""
  139. port_id = port['port_id']
  140. if port_id in self.port_policies:
  141. del self.port_policies[port_id]
  142. for qos_policy_id, port_dict in self.qos_policy_ports.items():
  143. if port_id in port_dict:
  144. del port_dict[port_id]
  145. if not port_dict:
  146. self._clean_policy_info(qos_policy_id)
  147. return
  148. raise exceptions.PortNotFound(port_id=port['port_id'])
  149. def _clean_policy_info(self, qos_policy_id):
  150. del self.qos_policy_ports[qos_policy_id]
  151. del self.known_policies[qos_policy_id]
  152. class QosAgentExtension(agent_extension.AgentCoreResourceExtension):
  153. SUPPORTED_RESOURCES = [resources.QOS_POLICY]
  154. def initialize(self, connection, driver_type):
  155. """Perform Agent Extension initialization.
  156. """
  157. self.resource_rpc = resources_rpc.ResourcesPullRpcApi()
  158. self.qos_driver = manager.NeutronManager.load_class_for_provider(
  159. 'neutron.qos.agent_drivers', driver_type)()
  160. self.qos_driver.consume_api(self.agent_api)
  161. self.qos_driver.initialize()
  162. self.policy_map = PortPolicyMap()
  163. registry.subscribe(self._handle_notification, resources.QOS_POLICY)
  164. self._register_rpc_consumers(connection)
  165. def consume_api(self, agent_api):
  166. self.agent_api = agent_api
  167. def _register_rpc_consumers(self, connection):
  168. endpoints = [resources_rpc.ResourcesPushRpcCallback()]
  169. for resource_type in self.SUPPORTED_RESOURCES:
  170. # we assume that neutron-server always broadcasts the latest
  171. # version known to the agent
  172. topic = resources_rpc.resource_type_versioned_topic(resource_type)
  173. connection.create_consumer(topic, endpoints, fanout=True)
  174. @lockutils.synchronized('qos-port')
  175. def _handle_notification(self, qos_policy, event_type):
  176. # server does not allow to remove a policy that is attached to any
  177. # port, so we ignore DELETED events. Also, if we receive a CREATED
  178. # event for a policy, it means that there are no ports so far that are
  179. # attached to it. That's why we are interested in UPDATED events only
  180. if event_type == events.UPDATED:
  181. self._process_update_policy(qos_policy)
  182. @lockutils.synchronized('qos-port')
  183. def handle_port(self, context, port):
  184. """Handle agent QoS extension for port.
  185. This method applies a new policy to a port using the QoS driver.
  186. Update events are handled in _handle_notification.
  187. """
  188. port_id = port['port_id']
  189. port_qos_policy_id = port.get('qos_policy_id')
  190. network_qos_policy_id = port.get('network_qos_policy_id')
  191. qos_policy_id = port_qos_policy_id or network_qos_policy_id
  192. if qos_policy_id is None:
  193. self._process_reset_port(port)
  194. return
  195. if not self.policy_map.has_policy_changed(port, qos_policy_id):
  196. return
  197. qos_policy = self.resource_rpc.pull(
  198. context, resources.QOS_POLICY, qos_policy_id)
  199. if qos_policy is None:
  200. LOG.info(_LI("QoS policy %(qos_policy_id)s applied to port "
  201. "%(port_id)s is not available on server, "
  202. "it has been deleted. Skipping."),
  203. {'qos_policy_id': qos_policy_id, 'port_id': port_id})
  204. self._process_reset_port(port)
  205. else:
  206. old_qos_policy = self.policy_map.set_port_policy(port, qos_policy)
  207. if old_qos_policy:
  208. self.qos_driver.delete(port, old_qos_policy)
  209. self.qos_driver.update(port, qos_policy)
  210. else:
  211. self.qos_driver.create(port, qos_policy)
  212. def delete_port(self, context, port):
  213. self._process_reset_port(port)
  214. def _policy_rules_modified(self, old_policy, policy):
  215. return not (len(old_policy.rules) == len(policy.rules) and
  216. all(i in old_policy.rules for i in policy.rules))
  217. def _process_update_policy(self, qos_policy):
  218. old_qos_policy = self.policy_map.get_policy(qos_policy.id)
  219. if old_qos_policy:
  220. if self._policy_rules_modified(old_qos_policy, qos_policy):
  221. for port in self.policy_map.get_ports(qos_policy):
  222. #NOTE(QoS): for now, just reflush the rules on the port.
  223. # Later, we may want to apply the difference
  224. # between the old and new rule lists.
  225. self.qos_driver.delete(port, old_qos_policy)
  226. self.qos_driver.update(port, qos_policy)
  227. self.policy_map.update_policy(qos_policy)
  228. def _process_reset_port(self, port):
  229. try:
  230. self.policy_map.clean_by_port(port)
  231. self.qos_driver.delete(port)
  232. except exceptions.PortNotFound:
  233. LOG.info(_LI("QoS extension did have no information about the "
  234. "port %s that we were trying to reset"),
  235. port['port_id'])