OpenStack Networking (Neutron)
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

211 lines
8.5KB

  1. # Copyright 2016 Hewlett Packard Enterprise Development LP
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  4. # not use this file except in compliance with the License. You may obtain
  5. # a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  11. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  12. # License for the specific language governing permissions and limitations
  13. # under the License.
  14. import collections
  15. from neutron_lib.api.definitions import portbindings
  16. from neutron_lib.db import api as db_api
  17. from neutron_lib.plugins import directory
  18. from neutron_lib import rpc as n_rpc
  19. from oslo_log import helpers as log_helpers
  20. from oslo_log import log as logging
  21. import oslo_messaging
  22. from sqlalchemy.orm import exc
  23. from neutron.api.rpc.callbacks import events
  24. from neutron.api.rpc.callbacks.producer import registry
  25. from neutron.api.rpc.callbacks import resources
  26. from neutron.api.rpc.handlers import resources_rpc
  27. from neutron.objects import trunk as trunk_objects
  28. from neutron.services.trunk import constants as trunk_consts
  29. from neutron.services.trunk import exceptions as trunk_exc
  30. from neutron.services.trunk.rpc import constants
  31. LOG = logging.getLogger(__name__)
  32. # This module contains stub (client-side) and skeleton (server-side)
  33. # proxy code that executes in the Neutron server process space. This
  34. # is needed if any of the trunk service plugin drivers has a remote
  35. # component (e.g. agent), that needs to communicate with the Neutron
  36. # Server.
  37. # The Server side exposes the following remote methods:
  38. #
  39. # - lookup method to retrieve trunk details: used by the agent to learn
  40. # about the trunk.
  41. # - update methods for trunk and its subports: used by the agent to
  42. # inform the server about local trunk status changes.
  43. #
  44. # For agent-side stub and skeleton proxy code, please look at agent.py
  def trunk_by_port_provider(resource, port_id, context, **kwargs):
      """Provider callback: look up the trunk whose parent port is port_id.

      ``resource`` and any extra keyword arguments are accepted but not
      used by the lookup.

      :param resource: The resource type being requested (unused here)
      :param port_id: The id of the trunk's parent port
      :param context: The context to use for the lookup
      :returns: Whatever ``Trunk.get_object`` returns for that port id
      """
      trunk = trunk_objects.Trunk.get_object(context, port_id=port_id)
      return trunk
  class TrunkSkeleton(object):
      """Skeleton proxy code for agent->server communication.

      Creating an instance registers the trunk lookup provider and starts
      consuming RPC messages on the trunk base topic.
      """

      # API version history:
      #   1.0 Initial version
      target = oslo_messaging.Target(
          version='1.0', namespace=constants.TRUNK_BASE_NAMESPACE)

      # Filled in on first access through the ``core_plugin`` property.
      _core_plugin = None

      def __init__(self):
          # Used to provide trunk lookups for the agent.
          registry.provide(trunk_by_port_provider, resources.TRUNK)

          self._connection = n_rpc.Connection()
          self._connection.create_consumer(
              constants.TRUNK_BASE_TOPIC, [self], fanout=False)
          self._connection.consume_in_threads()

      @property
      def core_plugin(self):
          """Return the core plugin, caching it once a truthy one is found."""
          plugin = self._core_plugin
          if not plugin:
              plugin = directory.get_plugin()
              self._core_plugin = plugin
          return plugin

      @log_helpers.log_method_call
      def update_subport_bindings(self, context, subports):
          """Update subport bindings to match trunk host binding.

          :param context: The request context; an elevated copy of it is
                          used for every lookup and update
          :param subports: Iterable of mappings, each providing a
                           'trunk_id' and a 'port_id' key
          :returns: A defaultdict mapping trunk id to the list of ports
                    updated for that trunk; trunks that are not found are
                    skipped
          """
          admin_ctx = context.elevated()

          # Group the requested subport ids by the trunk they belong to.
          subport_ids_by_trunk = {}
          for subport in subports:
              subport_ids_by_trunk.setdefault(
                  subport['trunk_id'], []).append(subport['port_id'])

          result = collections.defaultdict(list)
          for trunk_id, subport_ids in subport_ids_by_trunk.items():
              trunk = trunk_objects.Trunk.get_object(admin_ctx, id=trunk_id)
              if trunk:
                  bound = self._process_trunk_subport_bindings(
                      admin_ctx, trunk, subport_ids)
                  result[trunk.id].extend(bound)
              else:
                  LOG.debug("Trunk not found. id: %s", trunk_id)
          return result

      def _safe_update_trunk(self, trunk, **kwargs):
          """Update the trunk, retrying when the update hits stale data.

          At most ``db_api.MAX_RETRIES`` attempts are made; the
          StaleDataError from the final attempt is propagated.

          :param trunk: The trunk object to update
          :param kwargs: Passed through unchanged to ``trunk.update``
          """
          final_attempt = db_api.MAX_RETRIES - 1
          for attempt in range(db_api.MAX_RETRIES):
              try:
                  trunk.update(**kwargs)
              except exc.StaleDataError as e:
                  if attempt >= final_attempt:
                      # re-raise when all tries failed
                      raise
                  LOG.debug("Got StaleDataError exception: %s", e)
              else:
                  return

      def update_trunk_status(self, context, trunk_id, status):
          """Update the trunk status to reflect outcome of data plane wiring.

          Nothing is updated when no trunk is found for ``trunk_id``.

          :param context: The context to use for the operation
          :param trunk_id: The id of the trunk to update
          :param status: The status value to store on the trunk
          """
          with db_api.autonested_transaction(context.session):
              trunk = trunk_objects.Trunk.get_object(context, id=trunk_id)
              if not trunk:
                  return
              self._safe_update_trunk(trunk, status=status)

      def _process_trunk_subport_bindings(self, context, trunk, port_ids):
          """Process port bindings for subports on the given trunk.

          :param context: The context to use for the operation
          :param trunk: The trunk whose subports are being bound
          :param port_ids: The ids of the subports to bind
          :returns: The list of ports that were updated; an empty list
                    when a SubPortBindingError put the trunk in ERROR
          """
          parent_port_id = trunk.port_id
          parent_port = self.core_plugin.get_port(context, parent_port_id)
          host = parent_port.get(portbindings.HOST_ID)

          # NOTE(status_police) Set the trunk in BUILD state before
          # processing subport bindings. The trunk will stay in BUILD
          # state until an attempt has been made to bind all subports
          # passed here and the agent acknowledges the operation was
          # successful.
          self._safe_update_trunk(trunk, status=trunk_consts.BUILD_STATUS)

          bound_ports = []
          for port_id in port_ids:
              try:
                  # NOTE(fitoduarte): consider trimming down the content
                  # of the port data structure.
                  bound_ports.append(
                      self._handle_port_binding(
                          context, port_id, trunk, host))
              except trunk_exc.SubPortBindingError as e:
                  LOG.error("Failed to bind subport: %s", e)

                  # NOTE(status_police) The subport binding has failed in
                  # a manner in which we cannot proceed and the user must
                  # take action to bring the trunk back to a sane state.
                  self._safe_update_trunk(
                      trunk, status=trunk_consts.ERROR_STATUS)
                  return []
              except Exception as e:
                  # Any other failure is logged and the remaining ports
                  # are still attempted.
                  msg = ("Failed to bind subport port %(port)s on trunk "
                         "%(trunk)s: %(exc)s")
                  LOG.error(
                      msg, {'port': port_id, 'trunk': trunk.id, 'exc': e})

          # Fewer updated ports than requested means some bindings failed.
          if len(bound_ports) != len(port_ids):
              self._safe_update_trunk(
                  trunk, status=trunk_consts.DEGRADED_STATUS)
          return bound_ports

      def _handle_port_binding(self, context, port_id, trunk, trunk_host):
          """Bind the given port to the given host.

          :param context: The context to use for the operation
          :param port_id: The UUID of the port to be bound
          :param trunk: The trunk that the given port belongs to
          :param trunk_host: The host to bind the given port to
          :returns: The port as returned by the core plugin update
          :raises SubPortBindingError: if the updated port reports a
                                       binding-failed VIF type
          """
          port_update = {
              'port': {
                  portbindings.HOST_ID: trunk_host,
                  'device_owner': trunk_consts.TRUNK_SUBPORT_OWNER,
              },
          }
          port = self.core_plugin.update_port(context, port_id, port_update)

          binding_failed = (port.get(portbindings.VIF_TYPE) ==
                            portbindings.VIF_TYPE_BINDING_FAILED)
          if binding_failed:
              raise trunk_exc.SubPortBindingError(
                  port_id=port_id, trunk_id=trunk.id)
          return port
  class TrunkStub(object):
      """Stub proxy code for server->agent communication.

      Each public method pushes the given objects through a
      ``ResourcesPushRpcApi`` instance with the matching event type.
      """

      def __init__(self):
          self._resource_rpc = resources_rpc.ResourcesPushRpcApi()

      def _push(self, context, objects, event_type):
          # Single exit point towards the resources push RPC API.
          self._resource_rpc.push(context, objects, event_type)

      @log_helpers.log_method_call
      def trunk_created(self, context, trunk):
          """Tell the agent about a trunk being created."""
          self._push(context, [trunk], events.CREATED)

      @log_helpers.log_method_call
      def trunk_deleted(self, context, trunk):
          """Tell the agent about a trunk being deleted."""
          self._push(context, [trunk], events.DELETED)

      @log_helpers.log_method_call
      def subports_added(self, context, subports):
          """Tell the agent about new subports to add."""
          self._push(context, subports, events.CREATED)

      @log_helpers.log_method_call
      def subports_deleted(self, context, subports):
          """Tell the agent about existing subports to remove."""
          self._push(context, subports, events.DELETED)