OpenStack Networking (Neutron)

plugin.py (85 KB)

  1. # Copyright (c) 2013 OpenStack Foundation
  2. # All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  5. # not use this file except in compliance with the License. You may obtain
  6. # a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  12. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  13. # License for the specific language governing permissions and limitations
  14. # under the License.
  15. import copy
  16. from eventlet import greenthread
  17. from neutron_lib.api import validators
  18. from neutron_lib import constants as const
  19. from neutron_lib import exceptions as exc
  20. from oslo_config import cfg
  21. from oslo_db import exception as os_db_exception
  22. from oslo_log import helpers as log_helpers
  23. from oslo_log import log
  24. from oslo_serialization import jsonutils
  25. from oslo_utils import excutils
  26. from oslo_utils import importutils
  27. from oslo_utils import uuidutils
  28. from sqlalchemy.orm import exc as sa_exc
  29. from neutron._i18n import _, _LE, _LI, _LW
  30. from neutron.agent import securitygroups_rpc as sg_rpc
  31. from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
  32. from neutron.api.rpc.handlers import dhcp_rpc
  33. from neutron.api.rpc.handlers import dvr_rpc
  34. from neutron.api.rpc.handlers import metadata_rpc
  35. from neutron.api.rpc.handlers import resources_rpc
  36. from neutron.api.rpc.handlers import securitygroups_rpc
  37. from neutron.api.v2 import attributes
  38. from neutron.callbacks import events
  39. from neutron.callbacks import exceptions
  40. from neutron.callbacks import registry
  41. from neutron.callbacks import resources
  42. from neutron.common import constants as n_const
  43. from neutron.common import rpc as n_rpc
  44. from neutron.common import topics
  45. from neutron.common import utils
  46. from neutron.db import address_scope_db
  47. from neutron.db import agents_db
  48. from neutron.db import agentschedulers_db
  49. from neutron.db import allowedaddresspairs_db as addr_pair_db
  50. from neutron.db import api as db_api
  51. from neutron.db import db_base_plugin_v2
  52. from neutron.db import dvr_mac_db
  53. from neutron.db import external_net_db
  54. from neutron.db import extradhcpopt_db
  55. from neutron.db.models import securitygroup as sg_models
  56. from neutron.db import models_v2
  57. from neutron.db import provisioning_blocks
  58. from neutron.db.quota import driver # noqa
  59. from neutron.db import securitygroups_rpc_base as sg_db_rpc
  60. from neutron.db import segments_db
  61. from neutron.db import subnet_service_type_db_models as service_type_db
  62. from neutron.db import vlantransparent_db
  63. from neutron.extensions import allowedaddresspairs as addr_pair
  64. from neutron.extensions import availability_zone as az_ext
  65. from neutron.extensions import extra_dhcp_opt as edo_ext
  66. from neutron.extensions import multiprovidernet as mpnet
  67. from neutron.extensions import portbindings
  68. from neutron.extensions import portsecurity as psec
  69. from neutron.extensions import providernet as provider
  70. from neutron.extensions import vlantransparent
  71. from neutron import manager
  72. from neutron.plugins.common import constants as service_constants
  73. from neutron.plugins.ml2.common import exceptions as ml2_exc
  74. from neutron.plugins.ml2 import config # noqa
  75. from neutron.plugins.ml2 import db
  76. from neutron.plugins.ml2 import driver_api as api
  77. from neutron.plugins.ml2 import driver_context
  78. from neutron.plugins.ml2.extensions import qos as qos_ext
  79. from neutron.plugins.ml2 import managers
  80. from neutron.plugins.ml2 import models
  81. from neutron.plugins.ml2 import rpc
  82. from neutron.quota import resource_registry
  83. from neutron.services.qos import qos_consts
  84. from neutron.services.segments import plugin as segments_plugin
  85. LOG = log.getLogger(__name__)
  86. MAX_BIND_TRIES = 10
  87. SERVICE_PLUGINS_REQUIRED_DRIVERS = {
  88. 'qos': [qos_ext.QOS_EXT_DRIVER_ALIAS]
  89. }
  90. class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
  91. dvr_mac_db.DVRDbMixin,
  92. external_net_db.External_net_db_mixin,
  93. sg_db_rpc.SecurityGroupServerRpcMixin,
  94. agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
  95. addr_pair_db.AllowedAddressPairsMixin,
  96. vlantransparent_db.Vlantransparent_db_mixin,
  97. extradhcpopt_db.ExtraDhcpOptMixin,
  98. address_scope_db.AddressScopeDbMixin,
  99. service_type_db.SubnetServiceTypeMixin):
  100. """Implement the Neutron L2 abstractions using modules.
  101. Ml2Plugin is a Neutron plugin based on separately extensible sets
  102. of network types and mechanisms for connecting to networks of
  103. those types. The network types and mechanisms are implemented as
  104. drivers loaded via Python entry points. Networks can be made up of
  105. multiple segments (not yet fully implemented).
  106. """
  107. # This attribute specifies whether the plugin supports or not
  108. # bulk/pagination/sorting operations. Name mangling is used in
  109. # order to ensure it is qualified by class
  110. __native_bulk_support = True
  111. __native_pagination_support = True
  112. __native_sorting_support = True
  113. # List of supported extensions
  114. _supported_extension_aliases = ["provider", "external-net", "binding",
  115. "quotas", "security-group", "agent",
  116. "dhcp_agent_scheduler",
  117. "multi-provider", "allowed-address-pairs",
  118. "extra_dhcp_opt", "subnet_allocation",
  119. "net-mtu", "vlan-transparent",
  120. "address-scope",
  121. "availability_zone",
  122. "network_availability_zone",
  123. "default-subnetpools",
  124. "subnet-service-types"]
  125. @property
  126. def supported_extension_aliases(self):
  127. if not hasattr(self, '_aliases'):
  128. aliases = self._supported_extension_aliases[:]
  129. aliases += self.extension_manager.extension_aliases()
  130. sg_rpc.disable_security_group_extension_by_config(aliases)
  131. vlantransparent.disable_extension_by_config(aliases)
  132. self._aliases = aliases
  133. return self._aliases
  134. @resource_registry.tracked_resources(
  135. network=models_v2.Network,
  136. port=models_v2.Port,
  137. subnet=models_v2.Subnet,
  138. subnetpool=models_v2.SubnetPool,
  139. security_group=sg_models.SecurityGroup,
  140. security_group_rule=sg_models.SecurityGroupRule)
  141. def __init__(self):
  142. # First load drivers, then initialize DB, then initialize drivers
  143. self.type_manager = managers.TypeManager()
  144. self.extension_manager = managers.ExtensionManager()
  145. self.mechanism_manager = managers.MechanismManager()
  146. super(Ml2Plugin, self).__init__()
  147. self.type_manager.initialize()
  148. self.extension_manager.initialize()
  149. self.mechanism_manager.initialize()
  150. registry.subscribe(self._port_provisioned, resources.PORT,
  151. provisioning_blocks.PROVISIONING_COMPLETE)
  152. registry.subscribe(self._handle_segment_change, resources.SEGMENT,
  153. events.PRECOMMIT_CREATE)
  154. registry.subscribe(self._handle_segment_change, resources.SEGMENT,
  155. events.PRECOMMIT_DELETE)
  156. registry.subscribe(self._handle_segment_change, resources.SEGMENT,
  157. events.AFTER_CREATE)
  158. registry.subscribe(self._handle_segment_change, resources.SEGMENT,
  159. events.AFTER_DELETE)
  160. registry.subscribe(self._subnet_delete_precommit_handler,
  161. resources.SUBNET, events.PRECOMMIT_DELETE)
  162. registry.subscribe(self._subnet_delete_after_delete_handler,
  163. resources.SUBNET, events.AFTER_DELETE)
  164. self._setup_dhcp()
  165. self._start_rpc_notifiers()
  166. self.add_agent_status_check_worker(self.agent_health_check)
  167. self.add_workers(self.mechanism_manager.get_workers())
  168. self._verify_service_plugins_requirements()
  169. LOG.info(_LI("Modular L2 Plugin initialization complete"))
  170. def _setup_rpc(self):
  171. """Initialize components to support agent communication."""
  172. self.endpoints = [
  173. rpc.RpcCallbacks(self.notifier, self.type_manager),
  174. securitygroups_rpc.SecurityGroupServerRpcCallback(),
  175. dvr_rpc.DVRServerRpcCallback(),
  176. dhcp_rpc.DhcpRpcCallback(),
  177. agents_db.AgentExtRpcCallback(),
  178. metadata_rpc.MetadataRpcCallback(),
  179. resources_rpc.ResourcesPullRpcCallback()
  180. ]
  181. def _setup_dhcp(self):
  182. """Initialize components to support DHCP."""
  183. self.network_scheduler = importutils.import_object(
  184. cfg.CONF.network_scheduler_driver
  185. )
  186. self.add_periodic_dhcp_agent_status_check()
  187. def _verify_service_plugins_requirements(self):
  188. for service_plugin in cfg.CONF.service_plugins:
  189. extension_drivers = SERVICE_PLUGINS_REQUIRED_DRIVERS.get(
  190. service_plugin, []
  191. )
  192. for extension_driver in extension_drivers:
  193. if extension_driver not in self.extension_manager.names():
  194. raise ml2_exc.ExtensionDriverNotFound(
  195. driver=extension_driver, service_plugin=service_plugin
  196. )
  197. def _port_provisioned(self, rtype, event, trigger, context, object_id,
  198. **kwargs):
  199. port_id = object_id
  200. port = db.get_port(context.session, port_id)
  201. if not port or not port.port_binding:
  202. LOG.debug("Port %s was deleted so its status cannot be updated.",
  203. port_id)
  204. return
  205. if port.port_binding.vif_type in (portbindings.VIF_TYPE_BINDING_FAILED,
  206. portbindings.VIF_TYPE_UNBOUND):
  207. # NOTE(kevinbenton): we hit here when a port is created without
  208. # a host ID and the dhcp agent notifies that its wiring is done
  209. LOG.debug("Port %s cannot update to ACTIVE because it "
  210. "is not bound.", port_id)
  211. return
  212. else:
  213. # port is bound, but we have to check for new provisioning blocks
  214. # one last time to detect the case where we were triggered by an
  215. # unbound port and the port became bound with new provisioning
  216. # blocks before 'get_port' was called above
  217. if provisioning_blocks.is_object_blocked(context, port_id,
  218. resources.PORT):
  219. LOG.debug("Port %s had new provisioning blocks added so it "
  220. "will not transition to active.", port_id)
  221. return
  222. self.update_port_status(context, port_id, const.PORT_STATUS_ACTIVE)
  223. @property
  224. def supported_qos_rule_types(self):
  225. return self.mechanism_manager.supported_qos_rule_types
  226. @log_helpers.log_method_call
  227. def _start_rpc_notifiers(self):
  228. """Initialize RPC notifiers for agents."""
  229. self.notifier = rpc.AgentNotifierApi(topics.AGENT)
  230. self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
  231. dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
  232. )
  233. @log_helpers.log_method_call
  234. def start_rpc_listeners(self):
  235. """Start the RPC loop to let the plugin communicate with agents."""
  236. self._setup_rpc()
  237. self.topic = topics.PLUGIN
  238. self.conn = n_rpc.create_connection()
  239. self.conn.create_consumer(self.topic, self.endpoints, fanout=False)
  240. self.conn.create_consumer(
  241. topics.SERVER_RESOURCE_VERSIONS,
  242. [resources_rpc.ResourcesPushToServerRpcCallback()],
  243. fanout=True)
  244. # process state reports despite dedicated rpc workers
  245. self.conn.create_consumer(topics.REPORTS,
  246. [agents_db.AgentExtRpcCallback()],
  247. fanout=False)
  248. return self.conn.consume_in_threads()
  249. def start_rpc_state_reports_listener(self):
  250. self.conn_reports = n_rpc.create_connection()
  251. self.conn_reports.create_consumer(topics.REPORTS,
  252. [agents_db.AgentExtRpcCallback()],
  253. fanout=False)
  254. return self.conn_reports.consume_in_threads()
  255. def _filter_nets_provider(self, context, networks, filters):
  256. return [network
  257. for network in networks
  258. if self.type_manager.network_matches_filters(network, filters)
  259. ]
  260. def _check_mac_update_allowed(self, orig_port, port, binding):
  261. unplugged_types = (portbindings.VIF_TYPE_BINDING_FAILED,
  262. portbindings.VIF_TYPE_UNBOUND)
  263. new_mac = port.get('mac_address')
  264. mac_change = (new_mac is not None and
  265. orig_port['mac_address'] != new_mac)
  266. if (mac_change and binding.vif_type not in unplugged_types):
  267. raise exc.PortBound(port_id=orig_port['id'],
  268. vif_type=binding.vif_type,
  269. old_mac=orig_port['mac_address'],
  270. new_mac=port['mac_address'])
  271. return mac_change
  272. def _process_port_binding(self, mech_context, attrs):
  273. session = mech_context._plugin_context.session
  274. binding = mech_context._binding
  275. port = mech_context.current
  276. port_id = port['id']
  277. changes = False
  278. host = const.ATTR_NOT_SPECIFIED
  279. if attrs and portbindings.HOST_ID in attrs:
  280. host = attrs.get(portbindings.HOST_ID) or ''
  281. original_host = binding.host
  282. if (validators.is_attr_set(host) and
  283. original_host != host):
  284. binding.host = host
  285. changes = True
  286. vnic_type = attrs and attrs.get(portbindings.VNIC_TYPE)
  287. if (validators.is_attr_set(vnic_type) and
  288. binding.vnic_type != vnic_type):
  289. binding.vnic_type = vnic_type
  290. changes = True
  291. # treat None as clear of profile.
  292. profile = None
  293. if attrs and portbindings.PROFILE in attrs:
  294. profile = attrs.get(portbindings.PROFILE) or {}
  295. if profile not in (None, const.ATTR_NOT_SPECIFIED,
  296. self._get_profile(binding)):
  297. binding.profile = jsonutils.dumps(profile)
  298. if len(binding.profile) > models.BINDING_PROFILE_LEN:
  299. msg = _("binding:profile value too large")
  300. raise exc.InvalidInput(error_message=msg)
  301. changes = True
  302. # Unbind the port if needed.
  303. if changes:
  304. binding.vif_type = portbindings.VIF_TYPE_UNBOUND
  305. binding.vif_details = ''
  306. db.clear_binding_levels(session, port_id, original_host)
  307. mech_context._clear_binding_levels()
  308. port['status'] = const.PORT_STATUS_DOWN
  309. super(Ml2Plugin, self).update_port(
  310. mech_context._plugin_context, port_id,
  311. {attributes.PORT: {'status': const.PORT_STATUS_DOWN}})
  312. if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
  313. binding.vif_type = portbindings.VIF_TYPE_UNBOUND
  314. binding.vif_details = ''
  315. db.clear_binding_levels(session, port_id, original_host)
  316. mech_context._clear_binding_levels()
  317. binding.host = ''
  318. self._update_port_dict_binding(port, binding)
  319. # merging here brings binding changes into the session so they can be
  320. # committed since the binding attached to the context is detached from
  321. # the session
  322. mech_context._plugin_context.session.merge(binding)
  323. return changes
  324. def _bind_port_if_needed(self, context, allow_notify=False,
  325. need_notify=False):
  326. for count in range(1, MAX_BIND_TRIES + 1):
  327. if count > 1:
  328. # yield for binding retries so that we give other threads a
  329. # chance to do their work
  330. greenthread.sleep(0)
  331. # multiple attempts shouldn't happen very often so we log each
  332. # attempt after the 1st.
  333. LOG.info(_LI("Attempt %(count)s to bind port %(port)s"),
  334. {'count': count, 'port': context.current['id']})
  335. bind_context, need_notify, try_again = self._attempt_binding(
  336. context, need_notify)
  337. if count == MAX_BIND_TRIES or not try_again:
  338. if self._should_bind_port(context):
  339. # At this point, we attempted to bind a port and reached
  340. # its final binding state. Binding either succeeded or
  341. # exhausted all attempts, thus no need to try again.
  342. # Now, the port and its binding state should be committed.
  343. context, need_notify, try_again = (
  344. self._commit_port_binding(context, bind_context,
  345. need_notify, try_again))
  346. else:
  347. context = bind_context
  348. if not try_again:
  349. if allow_notify and need_notify:
  350. self._notify_port_updated(context)
  351. return context
  352. LOG.error(_LE("Failed to commit binding results for %(port)s "
  353. "after %(max)s tries"),
  354. {'port': context.current['id'], 'max': MAX_BIND_TRIES})
  355. return context
  356. def _should_bind_port(self, context):
  357. return (context._binding.host and context._binding.vif_type
  358. in (portbindings.VIF_TYPE_UNBOUND,
  359. portbindings.VIF_TYPE_BINDING_FAILED))
  360. def _attempt_binding(self, context, need_notify):
  361. try_again = False
  362. if self._should_bind_port(context):
  363. bind_context = self._bind_port(context)
  364. if bind_context.vif_type != portbindings.VIF_TYPE_BINDING_FAILED:
  365. # Binding succeeded. Suggest notifying of successful binding.
  366. need_notify = True
  367. else:
  368. # Current attempt binding failed, try to bind again.
  369. try_again = True
  370. context = bind_context
  371. return context, need_notify, try_again
  372. def _bind_port(self, orig_context):
  373. # Construct a new PortContext from the one from the previous
  374. # transaction.
  375. port = orig_context.current
  376. orig_binding = orig_context._binding
  377. new_binding = models.PortBinding(
  378. host=orig_binding.host,
  379. vnic_type=orig_binding.vnic_type,
  380. profile=orig_binding.profile,
  381. vif_type=portbindings.VIF_TYPE_UNBOUND,
  382. vif_details=''
  383. )
  384. self._update_port_dict_binding(port, new_binding)
  385. new_context = driver_context.PortContext(
  386. self, orig_context._plugin_context, port,
  387. orig_context.network.current, new_binding, None)
  388. # Attempt to bind the port and return the context with the
  389. # result.
  390. self.mechanism_manager.bind_port(new_context)
  391. return new_context
  392. def _commit_port_binding(self, orig_context, bind_context,
  393. need_notify, try_again):
  394. port_id = orig_context.current['id']
  395. plugin_context = orig_context._plugin_context
  396. session = plugin_context.session
  397. orig_binding = orig_context._binding
  398. new_binding = bind_context._binding
  399. # After we've attempted to bind the port, we begin a
  400. # transaction, get the current port state, and decide whether
  401. # to commit the binding results.
  402. with session.begin(subtransactions=True):
  403. # Get the current port state and build a new PortContext
  404. # reflecting this state as original state for subsequent
  405. # mechanism driver update_port_*commit() calls.
  406. port_db, cur_binding = db.get_locked_port_and_binding(session,
  407. port_id)
  408. # Since the mechanism driver bind_port() calls must be made
  409. # outside a DB transaction locking the port state, it is
  410. # possible (but unlikely) that the port's state could change
  411. # concurrently while these calls are being made. If another
  412. # thread or process succeeds in binding the port before this
  413. # thread commits its results, the already committed results are
  414. # used. If attributes such as binding:host_id, binding:profile,
  415. # or binding:vnic_type are updated concurrently, the try_again
  416. # flag is returned to indicate that the commit was unsuccessful.
  417. if not port_db:
  418. # The port has been deleted concurrently, so just
  419. # return the unbound result from the initial
  420. # transaction that completed before the deletion.
  421. LOG.debug("Port %s has been deleted concurrently", port_id)
  422. return orig_context, False, False
  423. oport = self._make_port_dict(port_db)
  424. port = self._make_port_dict(port_db)
  425. network = bind_context.network.current
  426. if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
  427. # REVISIT(rkukura): The PortBinding instance from the
  428. # ml2_port_bindings table, returned as cur_binding
  429. # from db.get_locked_port_and_binding() above, is
  430. # currently not used for DVR distributed ports, and is
  431. # replaced here with the DistributedPortBinding instance from
  432. # the ml2_distributed_port_bindings table specific to the host
  433. # on which the distributed port is being bound. It
  434. # would be possible to optimize this code to avoid
  435. # fetching the PortBinding instance in the DVR case,
  436. # and even to avoid creating the unused entry in the
  437. # ml2_port_bindings table. But the upcoming resolution
  438. # for bug 1367391 will eliminate the
  439. # ml2_distributed_port_bindings table, use the
  440. # ml2_port_bindings table to store non-host-specific
  441. # fields for both distributed and non-distributed
  442. # ports, and introduce a new ml2_port_binding_hosts
  443. # table for the fields that need to be host-specific
  444. # in the distributed case. Since the PortBinding
  445. # instance will then be needed, it does not make sense
  446. # to optimize this code to avoid fetching it.
  447. cur_binding = db.get_distributed_port_binding_by_host(
  448. session, port_id, orig_binding.host)
  449. cur_context = driver_context.PortContext(
  450. self, plugin_context, port, network, cur_binding, None,
  451. original_port=oport)
  452. # Commit our binding results only if port has not been
  453. # successfully bound concurrently by another thread or
  454. # process and no binding inputs have been changed.
  455. commit = ((cur_binding.vif_type in
  456. [portbindings.VIF_TYPE_UNBOUND,
  457. portbindings.VIF_TYPE_BINDING_FAILED]) and
  458. orig_binding.host == cur_binding.host and
  459. orig_binding.vnic_type == cur_binding.vnic_type and
  460. orig_binding.profile == cur_binding.profile)
  461. if commit:
  462. # Update the port's binding state with our binding
  463. # results.
  464. cur_binding.vif_type = new_binding.vif_type
  465. cur_binding.vif_details = new_binding.vif_details
  466. db.clear_binding_levels(session, port_id, cur_binding.host)
  467. db.set_binding_levels(session, bind_context._binding_levels)
  468. # refresh context with a snapshot of updated state
  469. cur_context._binding = copy.deepcopy(cur_binding)
  470. cur_context._binding_levels = bind_context._binding_levels
  471. # Update PortContext's port dictionary to reflect the
  472. # updated binding state.
  473. self._update_port_dict_binding(port, cur_binding)
  474. # Update the port status if requested by the bound driver.
  475. if (bind_context._binding_levels and
  476. bind_context._new_port_status):
  477. port_db.status = bind_context._new_port_status
  478. port['status'] = bind_context._new_port_status
  479. # Call the mechanism driver precommit methods, commit
  480. # the results, and call the postcommit methods.
  481. self.mechanism_manager.update_port_precommit(cur_context)
  482. if commit:
  483. # Continue, using the port state as of the transaction that
  484. # just finished, whether that transaction committed new
  485. # results or discovered concurrent port state changes.
  486. # Also, Trigger notification for successful binding commit.
  487. self.mechanism_manager.update_port_postcommit(cur_context)
  488. need_notify = True
  489. try_again = False
  490. else:
  491. try_again = True
  492. return cur_context, need_notify, try_again
  493. def _update_port_dict_binding(self, port, binding):
  494. port[portbindings.VNIC_TYPE] = binding.vnic_type
  495. port[portbindings.PROFILE] = self._get_profile(binding)
  496. if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
  497. port[portbindings.HOST_ID] = ''
  498. port[portbindings.VIF_TYPE] = portbindings.VIF_TYPE_DISTRIBUTED
  499. port[portbindings.VIF_DETAILS] = {}
  500. else:
  501. port[portbindings.HOST_ID] = binding.host
  502. port[portbindings.VIF_TYPE] = binding.vif_type
  503. port[portbindings.VIF_DETAILS] = self._get_vif_details(binding)
  504. def _get_vif_details(self, binding):
  505. if binding.vif_details:
  506. try:
  507. return jsonutils.loads(binding.vif_details)
  508. except Exception:
  509. LOG.error(_LE("Serialized vif_details DB value '%(value)s' "
  510. "for port %(port)s is invalid"),
  511. {'value': binding.vif_details,
  512. 'port': binding.port_id})
  513. return {}
  514. def _get_profile(self, binding):
  515. if binding.profile:
  516. try:
  517. return jsonutils.loads(binding.profile)
  518. except Exception:
  519. LOG.error(_LE("Serialized profile DB value '%(value)s' for "
  520. "port %(port)s is invalid"),
  521. {'value': binding.profile,
  522. 'port': binding.port_id})
  523. return {}
  524. def _ml2_extend_port_dict_binding(self, port_res, port_db):
  525. # None when called during unit tests for other plugins.
  526. if port_db.port_binding:
  527. self._update_port_dict_binding(port_res, port_db.port_binding)
  528. db_base_plugin_v2.NeutronDbPluginV2.register_dict_extend_funcs(
  529. attributes.PORTS, ['_ml2_extend_port_dict_binding'])
  530. # Register extend dict methods for network and port resources.
  531. # Each mechanism driver that supports extend attribute for the resources
  532. # can add those attribute to the result.
  533. db_base_plugin_v2.NeutronDbPluginV2.register_dict_extend_funcs(
  534. attributes.NETWORKS, ['_ml2_md_extend_network_dict'])
  535. db_base_plugin_v2.NeutronDbPluginV2.register_dict_extend_funcs(
  536. attributes.PORTS, ['_ml2_md_extend_port_dict'])
  537. db_base_plugin_v2.NeutronDbPluginV2.register_dict_extend_funcs(
  538. attributes.SUBNETS, ['_ml2_md_extend_subnet_dict'])
  539. def _ml2_md_extend_network_dict(self, result, netdb):
  540. session = db_api.get_session()
  541. with session.begin(subtransactions=True):
  542. self.extension_manager.extend_network_dict(session, netdb, result)
  543. def _ml2_md_extend_port_dict(self, result, portdb):
  544. session = db_api.get_session()
  545. with session.begin(subtransactions=True):
  546. self.extension_manager.extend_port_dict(session, portdb, result)
  547. def _ml2_md_extend_subnet_dict(self, result, subnetdb):
  548. session = db_api.get_session()
  549. with session.begin(subtransactions=True):
  550. self.extension_manager.extend_subnet_dict(
  551. session, subnetdb, result)
  552. # Note - The following hook methods have "ml2" in their names so
  553. # that they are not called twice during unit tests due to global
  554. # registration of hooks in portbindings_db.py used by other
  555. # plugins.
  556. def _ml2_port_result_filter_hook(self, query, filters):
  557. values = filters and filters.get(portbindings.HOST_ID, [])
  558. if not values:
  559. return query
  560. bind_criteria = models.PortBinding.host.in_(values)
  561. return query.filter(models_v2.Port.port_binding.has(bind_criteria))
  562. db_base_plugin_v2.NeutronDbPluginV2.register_model_query_hook(
  563. models_v2.Port,
  564. "ml2_port_bindings",
  565. None,
  566. None,
  567. '_ml2_port_result_filter_hook')
  568. def _notify_port_updated(self, mech_context):
  569. port = mech_context.current
  570. segment = mech_context.bottom_bound_segment
  571. if not segment:
  572. # REVISIT(rkukura): This should notify agent to unplug port
  573. network = mech_context.network.current
  574. LOG.debug("In _notify_port_updated(), no bound segment for "
  575. "port %(port_id)s on network %(network_id)s",
  576. {'port_id': port['id'], 'network_id': network['id']})
  577. return
  578. self.notifier.port_update(mech_context._plugin_context, port,
  579. segment[api.NETWORK_TYPE],
  580. segment[api.SEGMENTATION_ID],
  581. segment[api.PHYSICAL_NETWORK])
  582. def _delete_objects(self, context, resource, objects):
  583. delete_op = getattr(self, 'delete_%s' % resource)
  584. for obj in objects:
  585. try:
  586. delete_op(context, obj['result']['id'])
  587. except KeyError:
  588. LOG.exception(_LE("Could not find %s to delete."),
  589. resource)
  590. except Exception:
  591. LOG.exception(_LE("Could not delete %(res)s %(id)s."),
  592. {'res': resource,
  593. 'id': obj['result']['id']})
  594. def _create_bulk_ml2(self, resource, context, request_items):
  595. objects = []
  596. collection = "%ss" % resource
  597. items = request_items[collection]
  598. try:
  599. with context.session.begin(subtransactions=True):
  600. obj_creator = getattr(self, '_create_%s_db' % resource)
  601. for item in items:
  602. attrs = item[resource]
  603. result, mech_context = obj_creator(context, item)
  604. objects.append({'mech_context': mech_context,
  605. 'result': result,
  606. 'attributes': attrs})
  607. except Exception as e:
  608. with excutils.save_and_reraise_exception():
  609. utils.attach_exc_details(
  610. e, _LE("An exception occurred while creating "
  611. "the %(resource)s:%(item)s"),
  612. {'resource': resource, 'item': item})
  613. try:
  614. postcommit_op = getattr(self.mechanism_manager,
  615. 'create_%s_postcommit' % resource)
  616. for obj in objects:
  617. postcommit_op(obj['mech_context'])
  618. return objects
  619. except ml2_exc.MechanismDriverError:
  620. with excutils.save_and_reraise_exception():
  621. resource_ids = [res['result']['id'] for res in objects]
  622. LOG.exception(_LE("mechanism_manager.create_%(res)s"
  623. "_postcommit failed for %(res)s: "
  624. "'%(failed_id)s'. Deleting "
  625. "%(res)ss %(resource_ids)s"),
  626. {'res': resource,
  627. 'failed_id': obj['result']['id'],
  628. 'resource_ids': ', '.join(resource_ids)})
  629. self._delete_objects(context, resource, objects)
  630. def _get_network_mtu(self, network):
  631. mtus = []
  632. try:
  633. segments = network[mpnet.SEGMENTS]
  634. except KeyError:
  635. segments = [network]
  636. for s in segments:
  637. segment_type = s[provider.NETWORK_TYPE]
  638. try:
  639. type_driver = self.type_manager.drivers[segment_type].obj
  640. except KeyError:
  641. # NOTE(ihrachys) This can happen when type driver is not loaded
  642. # for an existing segment, or simply when the network has no
  643. # segments at the specific time this is computed.
  644. # In the former case, while it's probably an indication of
  645. # a bad setup, it's better to be safe than sorry here. Also,
  646. # several unit tests use non-existent driver types that may
  647. # trigger the exception here.
  648. if segment_type and s[provider.SEGMENTATION_ID]:
  649. LOG.warning(
  650. _LW("Failed to determine MTU for segment "
  651. "%(segment_type)s:%(segment_id)s; network "
  652. "%(network_id)s MTU calculation may be not "
  653. "accurate"),
  654. {
  655. 'segment_type': segment_type,
  656. 'segment_id': s[provider.SEGMENTATION_ID],
  657. 'network_id': network['id'],
  658. }
  659. )
  660. else:
  661. mtu = type_driver.get_mtu(s[provider.PHYSICAL_NETWORK])
  662. # Some drivers, like 'local', may return None; the assumption
  663. # then is that for the segment type, MTU has no meaning or
  664. # unlimited, and so we should then ignore those values.
  665. if mtu:
  666. mtus.append(mtu)
  667. return min(mtus) if mtus else 0
  668. def _create_network_db(self, context, network):
  669. net_data = network[attributes.NETWORK]
  670. tenant_id = net_data['tenant_id']
  671. session = context.session
  672. with session.begin(subtransactions=True):
  673. self._ensure_default_security_group(context, tenant_id)
  674. net_db = self.create_network_db(context, network)
  675. result = self._make_network_dict(net_db, process_extensions=False,
  676. context=context)
  677. self.extension_manager.process_create_network(context, net_data,
  678. result)
  679. self._process_l3_create(context, result, net_data)
  680. net_data['id'] = result['id']
  681. self.type_manager.create_network_segments(context, net_data,
  682. tenant_id)
  683. self.type_manager.extend_network_dict_provider(context, result)
  684. # Update the transparent vlan if configured
  685. if utils.is_extension_supported(self, 'vlan-transparent'):
  686. vlt = vlantransparent.get_vlan_transparent(net_data)
  687. net_db['vlan_transparent'] = vlt
  688. result['vlan_transparent'] = vlt
  689. mech_context = driver_context.NetworkContext(self, context,
  690. result)
  691. self.mechanism_manager.create_network_precommit(mech_context)
  692. result[api.MTU] = self._get_network_mtu(result)
  693. if az_ext.AZ_HINTS in net_data:
  694. self.validate_availability_zones(context, 'network',
  695. net_data[az_ext.AZ_HINTS])
  696. az_hints = az_ext.convert_az_list_to_string(
  697. net_data[az_ext.AZ_HINTS])
  698. net_db[az_ext.AZ_HINTS] = az_hints
  699. result[az_ext.AZ_HINTS] = az_hints
  700. self._apply_dict_extend_functions('networks', result, net_db)
  701. return result, mech_context
  702. @utils.transaction_guard
  703. @db_api.retry_if_session_inactive()
  704. def create_network(self, context, network):
  705. result, mech_context = self._create_network_db(context, network)
  706. kwargs = {'context': context, 'network': result}
  707. registry.notify(resources.NETWORK, events.AFTER_CREATE, self, **kwargs)
  708. try:
  709. self.mechanism_manager.create_network_postcommit(mech_context)
  710. except ml2_exc.MechanismDriverError:
  711. with excutils.save_and_reraise_exception():
  712. LOG.error(_LE("mechanism_manager.create_network_postcommit "
  713. "failed, deleting network '%s'"), result['id'])
  714. self.delete_network(context, result['id'])
  715. return result
  716. @utils.transaction_guard
  717. @db_api.retry_if_session_inactive()
  718. def create_network_bulk(self, context, networks):
  719. objects = self._create_bulk_ml2(attributes.NETWORK, context, networks)
  720. return [obj['result'] for obj in objects]
  721. @utils.transaction_guard
  722. @db_api.retry_if_session_inactive()
  723. def update_network(self, context, id, network):
  724. net_data = network[attributes.NETWORK]
  725. provider._raise_if_updates_provider_attributes(net_data)
  726. session = context.session
  727. with session.begin(subtransactions=True):
  728. original_network = super(Ml2Plugin, self).get_network(context, id)
  729. updated_network = super(Ml2Plugin, self).update_network(context,
  730. id,
  731. network)
  732. self.extension_manager.process_update_network(context, net_data,
  733. updated_network)
  734. self._process_l3_update(context, updated_network, net_data)
  735. self.type_manager.extend_network_dict_provider(context,
  736. updated_network)
  737. # ToDO(QoS): This would change once EngineFacade moves out
  738. db_network = self._get_network(context, id)
  739. # Expire the db_network in current transaction, so that the join
  740. # relationship can be updated.
  741. context.session.expire(db_network)
  742. updated_network = self.get_network(context, id)
  743. # TODO(QoS): Move out to the extension framework somehow.
  744. need_network_update_notify = (
  745. qos_consts.QOS_POLICY_ID in net_data and
  746. original_network[qos_consts.QOS_POLICY_ID] !=
  747. updated_network[qos_consts.QOS_POLICY_ID])
  748. mech_context = driver_context.NetworkContext(
  749. self, context, updated_network,
  750. original_network=original_network)
  751. self.mechanism_manager.update_network_precommit(mech_context)
  752. # TODO(apech) - handle errors raised by update_network, potentially
  753. # by re-calling update_network with the previous attributes. For
  754. # now the error is propagated to the caller, which is expected to
  755. # either undo/retry the operation or delete the resource.
  756. kwargs = {'context': context, 'network': updated_network,
  757. 'original_network': original_network}
  758. registry.notify(resources.NETWORK, events.AFTER_UPDATE, self, **kwargs)
  759. self.mechanism_manager.update_network_postcommit(mech_context)
  760. if need_network_update_notify:
  761. self.notifier.network_update(context, updated_network)
  762. return updated_network
  763. @db_api.retry_if_session_inactive()
  764. def get_network(self, context, id, fields=None):
  765. session = context.session
  766. with session.begin(subtransactions=True):
  767. result = super(Ml2Plugin, self).get_network(context, id, None)
  768. self.type_manager.extend_network_dict_provider(context, result)
  769. result[api.MTU] = self._get_network_mtu(result)
  770. return self._fields(result, fields)
  771. @db_api.retry_if_session_inactive()
  772. def get_networks(self, context, filters=None, fields=None,
  773. sorts=None, limit=None, marker=None, page_reverse=False):
  774. session = context.session
  775. with session.begin(subtransactions=True):
  776. nets = super(Ml2Plugin,
  777. self).get_networks(context, filters, None, sorts,
  778. limit, marker, page_reverse)
  779. self.type_manager.extend_networks_dict_provider(context, nets)
  780. nets = self._filter_nets_provider(context, nets, filters)
  781. for net in nets:
  782. net[api.MTU] = self._get_network_mtu(net)
  783. return [self._fields(net, fields) for net in nets]
  784. def _delete_ports(self, context, port_ids):
  785. for port_id in port_ids:
  786. try:
  787. self.delete_port(context, port_id)
  788. except (exc.PortNotFound, sa_exc.ObjectDeletedError):
  789. # concurrent port deletion can be performed by
  790. # release_dhcp_port caused by concurrent subnet_delete
  791. LOG.info(_LI("Port %s was deleted concurrently"), port_id)
  792. except Exception as e:
  793. with excutils.save_and_reraise_exception():
  794. utils.attach_exc_details(
  795. e,
  796. _LE("Exception auto-deleting port %s"), port_id)
  797. def _delete_subnets(self, context, subnet_ids):
  798. for subnet_id in subnet_ids:
  799. try:
  800. self.delete_subnet(context, subnet_id)
  801. except (exc.SubnetNotFound, sa_exc.ObjectDeletedError):
  802. LOG.info(_LI("Subnet %s was deleted concurrently"),
  803. subnet_id)
  804. except Exception as e:
  805. with excutils.save_and_reraise_exception():
  806. utils.attach_exc_details(
  807. e,
  808. _LE("Exception auto-deleting subnet %s"), subnet_id)
  809. @utils.transaction_guard
  810. @db_api.retry_if_session_inactive()
  811. def delete_network(self, context, id):
  812. # REVISIT(rkukura) The super(Ml2Plugin, self).delete_network()
  813. # function is not used because it auto-deletes ports and
  814. # subnets from the DB without invoking the derived class's
  815. # delete_port() or delete_subnet(), preventing mechanism
  816. # drivers from being called. This approach should be revisited
  817. # when the API layer is reworked during icehouse.
  818. LOG.debug("Deleting network %s", id)
  819. session = context.session
  820. while True:
  821. # NOTE(kevinbenton): this loop keeps db objects in scope
  822. # so we must expire them or risk stale reads.
  823. # see bug/1623990
  824. session.expire_all()
  825. try:
  826. # REVISIT: Serialize this operation with a semaphore
  827. # to prevent deadlock waiting to acquire a DB lock
  828. # held by another thread in the same process, leading
  829. # to 'lock wait timeout' errors.
  830. #
  831. # Process L3 first, since, depending on the L3 plugin, it may
  832. # involve sending RPC notifications, and/or calling delete_port
  833. # on this plugin.
  834. # Additionally, a rollback may not be enough to undo the
  835. # deletion of a floating IP with certain L3 backends.
  836. self._process_l3_delete(context, id)
  837. # Using query().with_lockmode isn't necessary. Foreign-key
  838. # constraints prevent deletion if concurrent creation happens.
  839. with session.begin(subtransactions=True):
  840. # Get ports to auto-delete.
  841. ports = (session.query(models_v2.Port).
  842. enable_eagerloads(False).
  843. filter_by(network_id=id).all())
  844. LOG.debug("Ports to auto-delete: %s", ports)
  845. only_auto_del = all(p.device_owner
  846. in db_base_plugin_v2.
  847. AUTO_DELETE_PORT_OWNERS
  848. for p in ports)
  849. if not only_auto_del:
  850. LOG.debug("Tenant-owned ports exist")
  851. raise exc.NetworkInUse(net_id=id)
  852. # Get subnets to auto-delete.
  853. subnets = (session.query(models_v2.Subnet).
  854. enable_eagerloads(False).
  855. filter_by(network_id=id).all())
  856. LOG.debug("Subnets to auto-delete: %s", subnets)
  857. if not (ports or subnets):
  858. network = self.get_network(context, id)
  859. mech_context = driver_context.NetworkContext(self,
  860. context,
  861. network)
  862. self.mechanism_manager.delete_network_precommit(
  863. mech_context)
  864. registry.notify(resources.NETWORK,
  865. events.PRECOMMIT_DELETE,
  866. self,
  867. context=context,
  868. network_id=id)
  869. record = self._get_network(context, id)
  870. LOG.debug("Deleting network record %s", record)
  871. session.delete(record)
  872. # The segment records are deleted via cascade from the
  873. # network record, so explicit removal is not necessary.
  874. LOG.debug("Committing transaction")
  875. break
  876. port_ids = [port.id for port in ports]
  877. subnet_ids = [subnet.id for subnet in subnets]
  878. except os_db_exception.DBDuplicateEntry:
  879. LOG.warning(_LW("A concurrent port creation has "
  880. "occurred"))
  881. continue
  882. self._delete_ports(context, port_ids)
  883. self._delete_subnets(context, subnet_ids)
  884. kwargs = {'context': context, 'network': network}
  885. registry.notify(resources.NETWORK, events.AFTER_DELETE, self, **kwargs)
  886. try:
  887. self.mechanism_manager.delete_network_postcommit(mech_context)
  888. except ml2_exc.MechanismDriverError:
  889. # TODO(apech) - One or more mechanism driver failed to
  890. # delete the network. Ideally we'd notify the caller of
  891. # the fact that an error occurred.
  892. LOG.error(_LE("mechanism_manager.delete_network_postcommit"
  893. " failed"))
  894. self.notifier.network_delete(context, id)
  895. def _create_subnet_db(self, context, subnet):
  896. session = context.session
  897. # FIXME(kevinbenton): this is a mess because create_subnet ends up
  898. # calling _update_router_gw_ports which ends up calling update_port
  899. # on a router port inside this transaction. Need to find a way to
  900. # separate router updates from the subnet update operation.
  901. setattr(context, 'GUARD_TRANSACTION', False)
  902. with session.begin(subtransactions=True):
  903. result = super(Ml2Plugin, self).create_subnet(context, subnet)
  904. self.extension_manager.process_create_subnet(
  905. context, subnet[attributes.SUBNET], result)
  906. network = self.get_network(context, result['network_id'])
  907. mech_context = driver_context.SubnetContext(self, context,
  908. result, network)
  909. self.mechanism_manager.create_subnet_precommit(mech_context)
  910. return result, mech_context
  911. @utils.transaction_guard
  912. @db_api.retry_if_session_inactive()
  913. def create_subnet(self, context, subnet):
  914. result, mech_context = self._create_subnet_db(context, subnet)
  915. kwargs = {'context': context, 'subnet': result}
  916. registry.notify(resources.SUBNET, events.AFTER_CREATE, self, **kwargs)
  917. try:
  918. self.mechanism_manager.create_subnet_postcommit(mech_context)
  919. except ml2_exc.MechanismDriverError:
  920. with excutils.save_and_reraise_exception():
  921. LOG.error(_LE("mechanism_manager.create_subnet_postcommit "
  922. "failed, deleting subnet '%s'"), result['id'])
  923. self.delete_subnet(context, result['id'])
  924. return result
  925. @utils.transaction_guard
  926. @db_api.retry_if_session_inactive()
  927. def create_subnet_bulk(self, context, subnets):
  928. objects = self._create_bulk_ml2(attributes.SUBNET, context, subnets)
  929. return [obj['result'] for obj in objects]
  930. @utils.transaction_guard
  931. @db_api.retry_if_session_inactive()
  932. def update_subnet(self, context, id, subnet):
  933. session = context.session
  934. with session.begin(subtransactions=True):
  935. original_subnet = super(Ml2Plugin, self).get_subnet(context, id)
  936. updated_subnet = super(Ml2Plugin, self).update_subnet(
  937. context, id, subnet)
  938. self.extension_manager.process_update_subnet(
  939. context, subnet[attributes.SUBNET], updated_subnet)
  940. updated_subnet = self.get_subnet(context, id)
  941. network = self.get_network(context, updated_subnet['network_id'])
  942. mech_context = driver_context.SubnetContext(
  943. self, context, updated_subnet, network,
  944. original_subnet=original_subnet)
  945. self.mechanism_manager.update_subnet_precommit(mech_context)
  946. # TODO(apech) - handle errors raised by update_subnet, potentially
  947. # by re-calling update_subnet with the previous attributes. For
  948. # now the error is propagated to the caller, which is expected to
  949. # either undo/retry the operation or delete the resource.
  950. kwargs = {'context': context, 'subnet': updated_subnet,
  951. 'original_subnet': original_subnet}
  952. registry.notify(resources.SUBNET, events.AFTER_UPDATE, self, **kwargs)
  953. self.mechanism_manager.update_subnet_postcommit(mech_context)
  954. return updated_subnet
  955. @utils.transaction_guard
  956. def delete_subnet(self, context, id):
  957. # the only purpose of this override is to protect this from being
  958. # called inside of a transaction.
  959. return super(Ml2Plugin, self).delete_subnet(context, id)
  960. def _subnet_delete_precommit_handler(self, rtype, event, trigger,
  961. context, subnet_id, **kwargs):
  962. record = self._get_subnet(context, subnet_id)
  963. subnet = self._make_subnet_dict(record, context=context)
  964. network = self.get_network(context, subnet['network_id'])
  965. mech_context = driver_context.SubnetContext(self, context,
  966. subnet, network)
  967. # TODO(kevinbenton): move this mech context into something like
  968. # a 'delete context' so it's not polluting the real context object
  969. setattr(context, '_mech_context', mech_context)
  970. self.mechanism_manager.delete_subnet_precommit(mech_context)
  971. def _subnet_delete_after_delete_handler(self, rtype, event, trigger,
  972. context, subnet, **kwargs):
  973. try:
  974. self.mechanism_manager.delete_subnet_postcommit(
  975. context._mech_context)
  976. except ml2_exc.MechanismDriverError:
  977. # TODO(apech) - One or more mechanism driver failed to
  978. # delete the subnet. Ideally we'd notify the caller of
  979. # the fact that an error occurred.
  980. LOG.error(_LE("mechanism_manager.delete_subnet_postcommit failed"))
  981. # TODO(yalei) - will be simplified after security group and address pair be
  982. # converted to ext driver too.
  983. def _portsec_ext_port_create_processing(self, context, port_data, port):
  984. attrs = port[attributes.PORT]
  985. port_security = ((port_data.get(psec.PORTSECURITY) is None) or
  986. port_data[psec.PORTSECURITY])
  987. # allowed address pair checks
  988. if self._check_update_has_allowed_address_pairs(port):
  989. if not port_security:
  990. raise addr_pair.AddressPairAndPortSecurityRequired()
  991. else:
  992. # remove ATTR_NOT_SPECIFIED
  993. attrs[addr_pair.ADDRESS_PAIRS] = []
  994. if port_security:
  995. self._ensure_default_security_group_on_port(context, port)
  996. elif self._check_update_has_security_groups(port):
  997. raise psec.PortSecurityAndIPRequiredForSecurityGroups()
  998. def _setup_dhcp_agent_provisioning_component(self, context, port):
  999. # NOTE(kevinbenton): skipping network ports is a workaround for
  1000. # the fact that we don't issue dhcp notifications from internal
  1001. # port creation like router ports and dhcp ports via RPC
  1002. if utils.is_port_trusted(port):
  1003. return
  1004. subnet_ids = [f['subnet_id'] for f in port['fixed_ips']]
  1005. if (db.is_dhcp_active_on_any_subnet(context, subnet_ids) and
  1006. any(self.get_configuration_dict(a).get('notifies_port_ready')
  1007. for a in self.get_dhcp_agents_hosting_networks(
  1008. context, [port['network_id']]))):
  1009. # at least one of the agents will tell us when the dhcp config
  1010. # is ready so we setup a provisioning component to prevent the
  1011. # port from going ACTIVE until a dhcp_ready_on_port
  1012. # notification is received.
  1013. provisioning_blocks.add_provisioning_component(
  1014. context, port['id'], resources.PORT,
  1015. provisioning_blocks.DHCP_ENTITY)
  1016. else:
  1017. provisioning_blocks.remove_provisioning_component(
  1018. context, port['id'], resources.PORT,
  1019. provisioning_blocks.DHCP_ENTITY)
  1020. def _create_port_db(self, context, port):
  1021. attrs = port[attributes.PORT]
  1022. if not attrs.get('status'):
  1023. attrs['status'] = const.PORT_STATUS_DOWN
  1024. session = context.session
  1025. with session.begin(subtransactions=True):
  1026. dhcp_opts = attrs.get(edo_ext.EXTRADHCPOPTS, [])
  1027. port_db = self.create_port_db(context, port)
  1028. result = self._make_port_dict(port_db, process_extensions=False)
  1029. self.extension_manager.process_create_port(context, attrs, result)
  1030. self._portsec_ext_port_create_processing(context, result, port)
  1031. # sgids must be got after portsec checked with security group
  1032. sgids = self._get_security_groups_on_port(context, port)
  1033. self._process_port_create_security_group(context, result, sgids)
  1034. network = self.get_network(context, result['network_id'])
  1035. binding = db.add_port_binding(session, result['id'])
  1036. mech_context = driver_context.PortContext(self, context, result,
  1037. network, binding, None)
  1038. self._process_port_binding(mech_context, attrs)
  1039. result[addr_pair.ADDRESS_PAIRS] = (
  1040. self._process_create_allowed_address_pairs(
  1041. context, result,
  1042. attrs.get(addr_pair.ADDRESS_PAIRS)))
  1043. self._process_port_create_extra_dhcp_opts(context, result,
  1044. dhcp_opts)
  1045. self.mechanism_manager.create_port_precommit(mech_context)
  1046. self._setup_dhcp_agent_provisioning_component(context, result)
  1047. self._apply_dict_extend_functions('ports', result, port_db)
  1048. return result, mech_context
  1049. @utils.transaction_guard
  1050. @db_api.retry_if_session_inactive()
  1051. def create_port(self, context, port):
        result, mech_context = self._create_port_db(context, port)
        # notify any plugin that is interested in port create events
        kwargs = {'context': context, 'port': result}
        registry.notify(resources.PORT, events.AFTER_CREATE, self, **kwargs)

        try:
            self.mechanism_manager.create_port_postcommit(mech_context)
        except ml2_exc.MechanismDriverError:
            with excutils.save_and_reraise_exception():
                LOG.error(_LE("mechanism_manager.create_port_postcommit "
                              "failed, deleting port '%s'"), result['id'])
                self.delete_port(context, result['id'], l3_port_check=False)

        # REVISIT(rkukura): Is there any point in calling this before
        # a binding has been successfully established?
        self.notify_security_groups_member_updated(context, result)

        try:
            bound_context = self._bind_port_if_needed(mech_context)
        except os_db_exception.DBDeadlock:
            # bind port can deadlock in normal operation so we just cleanup
            # the port and let the API retry
            with excutils.save_and_reraise_exception():
                LOG.debug("_bind_port_if_needed deadlock, deleting port %s",
                          result['id'])
                self.delete_port(context, result['id'])
        except ml2_exc.MechanismDriverError:
            with excutils.save_and_reraise_exception():
                LOG.error(_LE("_bind_port_if_needed "
                              "failed, deleting port '%s'"), result['id'])
                self.delete_port(context, result['id'], l3_port_check=False)

        return bound_context.current

    @utils.transaction_guard
    @db_api.retry_if_session_inactive()
    def create_port_bulk(self, context, ports):
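        """Create multiple ports, then attempt to bind each one.

        If binding fails for any port, all ports created by this call are
        deleted before the error is re-raised.
        """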
        objects = self._create_bulk_ml2(attributes.PORT, context, ports)

        # REVISIT(rkukura): Is there any point in calling this before
        # a binding has been successfully established?
        results = [obj['result'] for obj in objects]
        self.notify_security_groups_member_updated_bulk(context, results)

        for obj in objects:
            attrs = obj['attributes']
            if attrs and attrs.get(portbindings.HOST_ID):
                kwargs = {'context': context, 'port': obj['result']}
                registry.notify(
                    resources.PORT, events.AFTER_CREATE, self, **kwargs)

        try:
            for obj in objects:
                obj['bound_context'] = self._bind_port_if_needed(
                    obj['mech_context'])
            return [obj['bound_context'].current for obj in objects]
        except ml2_exc.MechanismDriverError:
            with excutils.save_and_reraise_exception():
                resource_ids = [res['result']['id'] for res in objects]
                LOG.error(_LE("_bind_port_if_needed failed. "
                              "Deleting all ports from create bulk '%s'"),
                          resource_ids)
                self._delete_objects(context, attributes.PORT, objects)

    # TODO(yalei) - will be simplified once security groups and address
    # pairs are converted to extension drivers as well.
    def _portsec_ext_port_update_processing(self, updated_port, context, port,
                                            id):
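        """Validate a port update against the port security setting.

        When port security is disabled on the port, the update must not
        add allowed address pairs or security groups, and any existing
        ones must be removed by the same update.
        """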
        port_security = ((updated_port.get(psec.PORTSECURITY) is None) or
                         updated_port[psec.PORTSECURITY])
        if port_security:
            return

        # check the address-pairs
        if self._check_update_has_allowed_address_pairs(port):
            # has address pairs in request
            raise addr_pair.AddressPairAndPortSecurityRequired()
        elif not self._check_update_deletes_allowed_address_pairs(port):
            # not a request for deleting the address-pairs
            updated_port[addr_pair.ADDRESS_PAIRS] = (
                self.get_allowed_address_pairs(context, id))

            # if address pairs are already in the db, reject the update;
            # this could be refined once address pairs move to an
            # extension driver.
            if updated_port[addr_pair.ADDRESS_PAIRS]:
                raise addr_pair.AddressPairAndPortSecurityRequired()

        # check whether the update adds or modifies security groups while
        # port security is disabled
        if self._check_update_has_security_groups(port):
            raise psec.PortSecurityAndIPRequiredForSecurityGroups()
        elif not self._check_update_deletes_security_groups(port):
            if not utils.is_extension_supported(self, 'security-group'):
                return
            # Update did not have security groups passed in. Check
            # that the port does not have any security groups already on it.
            filters = {'port_id': [id]}
            security_groups = (
                super(Ml2Plugin, self)._get_port_security_group_bindings(
                    context, filters)
            )
            if security_groups:
                raise psec.PortSecurityPortHasSecurityGroup()

    @utils.transaction_guard
    @db_api.retry_if_session_inactive()
    def update_port(self, context, id, port):
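        """Update a port, re-running extension and port security processing.

        Mechanism driver precommit calls happen inside the database
        transaction; postcommit calls, registry notifications and any
        required (re)binding happen after the transaction commits.
        """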
        attrs = port[attributes.PORT]
        need_port_update_notify = False
        session = context.session
        bound_mech_contexts = []

        with session.begin(subtransactions=True):
            port_db, binding = db.get_locked_port_and_binding(session, id)
            if not port_db:
                raise exc.PortNotFound(port_id=id)
            mac_address_updated = self._check_mac_update_allowed(
                port_db, attrs, binding)
            need_port_update_notify |= mac_address_updated
            original_port = self._make_port_dict(port_db)
            updated_port = super(Ml2Plugin, self).update_port(context, id,
                                                              port)
            self.extension_manager.process_update_port(context, attrs,
                                                       updated_port)
            self._portsec_ext_port_update_processing(updated_port, context,
                                                     port, id)
            if (psec.PORTSECURITY in attrs) and (
                    original_port[psec.PORTSECURITY] !=
                    updated_port[psec.PORTSECURITY]):
                need_port_update_notify = True
            # TODO(QoS): Move out to the extension framework somehow.
            # Follow https://review.openstack.org/#/c/169223 for a solution.
            if (qos_consts.QOS_POLICY_ID in attrs and
                    original_port[qos_consts.QOS_POLICY_ID] !=
                    updated_port[qos_consts.QOS_POLICY_ID]):
                need_port_update_notify = True

            if addr_pair.ADDRESS_PAIRS in attrs:
                need_port_update_notify |= (
                    self.update_address_pairs_on_port(context, id, port,
                                                      original_port,
                                                      updated_port))
            need_port_update_notify |= self.update_security_group_on_port(
                context, id, port, original_port, updated_port)
            network = self.get_network(context, original_port['network_id'])
            need_port_update_notify |= self._update_extra_dhcp_opts_on_port(
                context, id, port, updated_port)
            levels = db.get_binding_levels(session, id, binding.host)
            # one of the operations above may have altered the model;
            # call _make_port_dict again to ensure the latest state is
            # reflected so mech drivers, callback handlers, and the API
            # caller see the latest state.
            # We expire here to reflect changed relationships on the obj.
            # Repeatable read will ensure we still get the state from this
            # transaction in spite of concurrent updates/deletes.
            context.session.expire(port_db)
            updated_port.update(self._make_port_dict(port_db))
            mech_context = driver_context.PortContext(
                self, context, updated_port, network, binding, levels,
                original_port=original_port)
            need_port_update_notify |= self._process_port_binding(
                mech_context, attrs)
            # For DVR router interface ports we need to retrieve the
            # DVRPortbinding context instead of the normal port context.
            # The normal Portbinding context does not have the status
            # of the ports that are required by the l2pop to process the
            # postcommit events.
            # NOTE: Sometimes during the update_port call, the DVR router
            # interface port may not have the port binding, so we cannot
            # create a generic binding list that will address both the
            # DVR and non-DVR cases here.
            # TODO(Swami): This code needs to be revisited.
            if port_db['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
                dist_binding_list = db.get_distributed_port_bindings(session,
                                                                     id)
                for dist_binding in dist_binding_list:
                    levels = db.get_binding_levels(session, id,
                                                   dist_binding.host)
                    dist_mech_context = driver_context.PortContext(
                        self, context, updated_port, network,
                        dist_binding, levels, original_port=original_port)
                    self.mechanism_manager.update_port_precommit(
                        dist_mech_context)
                    bound_mech_contexts.append(dist_mech_context)
            else:
                self.mechanism_manager.update_port_precommit(mech_context)
                self._setup_dhcp_agent_provisioning_component(
                    context, updated_port)
                bound_mech_contexts.append(mech_context)
        # Notifications must be sent after the above transaction is complete
        kwargs = {
            'context': context,
            'port': updated_port,
            'mac_address_updated': mac_address_updated,
            'original_port': original_port,
        }
        registry.notify(resources.PORT, events.AFTER_UPDATE, self, **kwargs)

        # Note that DVR Interface ports will have bindings on
        # multiple hosts, and so will have multiple mech_contexts,
        # while other ports typically have just one.
        # Since bound_mech_contexts has both the DVR and non-DVR
        # contexts we can manage just with a single for loop.
        try:
            for mech_context in bound_mech_contexts:
                self.mechanism_manager.update_port_postcommit(
                    mech_context)
        except ml2_exc.MechanismDriverError:
            LOG.error(_LE("mechanism_manager.update_port_postcommit "
                          "failed for port %s"), id)
        self.check_and_notify_security_group_member_changed(
            context, original_port, updated_port)
        need_port_update_notify |= self.is_security_group_member_updated(
            context, original_port, updated_port)
        if original_port['admin_state_up'] != updated_port['admin_state_up']:
            need_port_update_notify = True
        # NOTE: In the case of DVR ports, the port-binding is done after
        # router scheduling when sync_routers is called and so this call
        # below may not be required for DVR routed interfaces. But still
        # since we don't have the mech_context for the DVR router interfaces
        # at certain times, we just pass the port-context and return it, so
        # that we don't disturb other methods that are expecting a return
        # value.
        bound_context = self._bind_port_if_needed(
            mech_context,
            allow_notify=True,
            need_notify=need_port_update_notify)
        return bound_context.current

    def _process_distributed_port_binding(self, mech_context, context, attrs):
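        """Reset a distributed (DVR) port binding for the requested host.

        Any existing binding is unbound and its binding levels cleared
        before the host and router_id from the request are applied.
        """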
        session = mech_context._plugin_context.session
        binding = mech_context._binding
        port = mech_context.current
        port_id = port['id']

        if binding.vif_type != portbindings.VIF_TYPE_UNBOUND:
            binding.vif_details = ''
            binding.vif_type = portbindings.VIF_TYPE_UNBOUND
            if binding.host:
                db.clear_binding_levels(session, port_id, binding.host)
            binding.host = ''

        self._update_port_dict_binding(port, binding)
        binding.host = attrs and attrs.get(portbindings.HOST_ID)
        binding.router_id = attrs and attrs.get('device_id')
        # merge into session to reflect changes
        mech_context._plugin_context.session.merge(binding)

    @utils.transaction_guard
    @db_api.retry_if_session_inactive()
    def update_distributed_port_binding(self, context, id, port):
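        """Create or refresh the DVR port binding for the requested host.

        A new binding is processed only when no binding exists for the
        host, the previous binding failed, or the owning router changed.
        """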
        attrs = port[attributes.PORT]

        host = attrs and attrs.get(portbindings.HOST_ID)
        host_set = validators.is_attr_set(host)

        if not host_set:
            LOG.error(_LE("No Host supplied to bind DVR Port %s"), id)
            return

        session = context.session
        binding = db.get_distributed_port_binding_by_host(session, id, host)
        device_id = attrs and attrs.get('device_id')
        router_id = binding and binding.get('router_id')
        update_required = (
            not binding or
            binding.vif_type == portbindings.VIF_TYPE_BINDING_FAILED or
            router_id != device_id)
        if update_required:
            try:
                with session.begin(subtransactions=True):
                    orig_port = self.get_port(context, id)
                    if not binding:
                        binding = db.ensure_distributed_port_binding(
                            session, id, host, router_id=device_id)
                    network = self.get_network(context,
                                               orig_port['network_id'])
                    levels = db.get_binding_levels(session, id, host)
                    mech_context = driver_context.PortContext(
                        self, context, orig_port, network,
                        binding, levels, original_port=orig_port)
                    self._process_distributed_port_binding(
                        mech_context, context, attrs)
            except (os_db_exception.DBReferenceError, exc.PortNotFound):
                LOG.debug("DVR Port %s has been deleted concurrently", id)
                return
            self._bind_port_if_needed(mech_context)

    def _pre_delete_port(self, context, port_id, port_check):
        """Do some preliminary operations before deleting the port."""
        LOG.debug("Deleting port %s", port_id)
        try:
            # notify interested parties of imminent port deletion;
            # a failure here prevents the operation from happening
            kwargs = {
                'context': context,
                'port_id': port_id,
                'port_check': port_check
            }
            registry.notify(
                resources.PORT, events.BEFORE_DELETE, self, **kwargs)
        except exceptions.CallbackFailure as e:
            # NOTE(armax): preserve old check's behavior
            if len(e.errors) == 1:
                raise e.errors[0].error
            raise exc.ServicePortInUse(port_id=port_id, reason=e)

    @utils.transaction_guard
    @db_api.retry_if_session_inactive()
    def delete_port(self, context, id, l3_port_check=True):
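        """Delete a port after running precommit processing.

        Floating IPs are disassociated via the L3 plugin inside the same
        transaction; postcommit calls and notifications are handled by
        _post_delete_port() once the transaction commits.
        """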
        self._pre_delete_port(context, id, l3_port_check)
        # TODO(armax): get rid of the l3 dependency in the with block
        router_ids = []
        l3plugin = manager.NeutronManager.get_service_plugins().get(
            service_constants.L3_ROUTER_NAT)

        session = context.session
        with session.begin(subtransactions=True):
            port_db, binding = db.get_locked_port_and_binding(session, id)
            if not port_db:
                LOG.debug("The port '%s' was deleted", id)
                return
            port = self._make_port_dict(port_db)

            network = self.get_network(context, port['network_id'])
            bound_mech_contexts = []
            device_owner = port['device_owner']
            if device_owner == const.DEVICE_OWNER_DVR_INTERFACE:
                bindings = db.get_distributed_port_bindings(context.session,
                                                            id)
                for bind in bindings:
                    levels = db.get_binding_levels(context.session, id,
                                                   bind.host)
                    mech_context = driver_context.PortContext(
                        self, context, port, network, bind, levels)
                    self.mechanism_manager.delete_port_precommit(mech_context)
                    bound_mech_contexts.append(mech_context)
            else:
                levels = db.get_binding_levels(context.session, id,
                                               binding.host)
                mech_context = driver_context.PortContext(
                    self, context, port, network, binding, levels)
                self.mechanism_manager.delete_port_precommit(mech_context)
                bound_mech_contexts.append(mech_context)
            if l3plugin:
                router_ids = l3plugin.disassociate_floatingips(
                    context, id, do_notify=False)

            LOG.debug("Calling delete_port for %(port_id)s owned by %(owner)s",
                      {"port_id": id, "owner": device_owner})
            super(Ml2Plugin, self).delete_port(context, id)

        self._post_delete_port(
            context, port, router_ids, bound_mech_contexts)

    def _post_delete_port(
            self, context, port, router_ids, bound_mech_contexts):
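        """Send AFTER_DELETE notifications and postcommit calls for a port."""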
        kwargs = {
            'context': context,
            'port': port,
            'router_ids': router_ids,
        }
        registry.notify(resources.PORT, events.AFTER_DELETE, self, **kwargs)
        try:
            # Note that DVR Interface ports will have bindings on
            # multiple hosts, and so will have multiple mech_contexts,
            # while other ports typically have just one.
            for mech_context in bound_mech_contexts:
                self.mechanism_manager.delete_port_postcommit(mech_context)
        except ml2_exc.MechanismDriverError:
            # TODO(apech) - One or more mechanism drivers failed to
            # delete the port. Ideally we'd notify the caller of the
            # fact that an error occurred.
            LOG.error(_LE("mechanism_manager.delete_port_postcommit failed for"
                          " port %s"), port['id'])
        self.notifier.port_delete(context, port['id'])
        self.notify_security_groups_member_updated(context, port)

    @utils.transaction_guard
    @db_api.retry_if_session_inactive(context_var_name='plugin_context')
    def get_bound_port_context(self, plugin_context, port_id, host=None,
                               cached_networks=None):
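        """Return a bound PortContext for a port id (or id prefix).

        Returns None if the port or its binding cannot be found.
        """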
        session = plugin_context.session
        with session.begin(subtransactions=True):
            try:
                port_db = (session.query(models_v2.Port).
                           enable_eagerloads(False).
                           filter(models_v2.Port.id.startswith(port_id)).
                           one())
            except sa_exc.NoResultFound:
                LOG.info(_LI("No ports have port_id starting with %s"),
                         port_id)
                return
            except sa_exc.MultipleResultsFound:
                LOG.error(_LE("Multiple ports have port_id starting with %s"),
                          port_id)
                return
            port = self._make_port_dict(port_db)
            network = (cached_networks or {}).get(port['network_id'])

            if not network:
                network = self.get_network(plugin_context,
                                           port['network_id'])

            if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
                binding = db.get_distributed_port_binding_by_host(
                    session, port['id'], host)
                if not binding:
                    LOG.error(_LE("Binding info for DVR port %s not found"),
                              port_id)
                    return None
                levels = db.get_binding_levels(session, port_db.id, host)
                port_context = driver_context.PortContext(
                    self, plugin_context, port, network, binding, levels)
            else:
                # since eager loads are disabled in the port_db query, the
                # related attribute port_binding can disappear under a
                # concurrent port deletion; it's not an error condition.
                binding = port_db.port_binding
                if not binding:
                    LOG.info(_LI("Binding info for port %s was not found, "
                                 "it might have been deleted already."),
                             port_id)
                    return
                levels = db.get_binding_levels(session, port_db.id,
                                               port_db.port_binding.host)
                port_context = driver_context.PortContext(
                    self, plugin_context, port, network, binding, levels)

        return self._bind_port_if_needed(port_context)
    @utils.transaction_guard
    @db_api.retry_if_session_inactive()
    def update_port_status(self, context, port_id, status, host=None,
                           network=None):
        """Update the status of a port and return its full id.

        Returns the port_id (non-truncated uuid) if the port exists,
        otherwise returns None. The network can be passed in to avoid
        another get_network call if one was already performed by the
        caller.
        """
        updated = False
        session = context.session
        with session.begin(subtransactions=True):
            port = db.get_port(session, port_id)
            if not port:
                LOG.debug("Port %(port)s update to %(val)s by agent not found",
                          {'port': port_id, 'val': status})
                return None
            if (port.status != status and
                    port['device_owner'] != const.DEVICE_OWNER_DVR_INTERFACE):
                original_port = self._make_port_dict(port)
                port.status = status
                # explicit flush before _make_port_dict to ensure extensions
                # listening for db events can modify the port if necessary
                context.session.flush()
                updated_port = self._make_port_dict(port)
                levels = db.get_binding_levels(session, port.id,
                                               port.port_binding.host)
                mech_context = driver_context.PortContext(
                    self, context, updated_port, network, port.port_binding,
                    levels, original_port=original_port)
                self.mechanism_manager.update_port_precommit(mech_context)
                updated = True
            elif port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
                binding = db.get_distributed_port_binding_by_host(
                    session, port['id'], host)
                if not binding:
                    return
                binding.status = status
                updated = True

        if (updated and
                port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE):
            with session.begin(subtransactions=True):
                port = db.get_port(session, port_id)
                if not port:
                    LOG.warning(_LW("Port %s not found during update"),
                                port_id)
                    return
                original_port = self._make_port_dict(port)
                network = network or self.get_network(
                    context, original_port['network_id'])
                port.status = db.generate_distributed_port_status(session,
                                                                  port['id'])
                updated_port = self._make_port_dict(port)
                levels = db.get_binding_levels(session, port_id, host)
                mech_context = (driver_context.PortContext(
                    self, context, updated_port, network,
                    binding, levels, original_port=original_port))
                self.mechanism_manager.update_port_precommit(mech_context)

        if updated:
            self.mechanism_manager.update_port_postcommit(mech_context)
            kwargs = {'context': context, 'port': mech_context.current,
                      'original_port': original_port}
            if status == const.PORT_STATUS_ACTIVE:
                # NOTE(kevinbenton): this kwarg was carried over from
                # the RPC handler that used to call this. it's not clear
                # who uses it so maybe it can be removed. added in commit
                # 3f3874717c07e2b469ea6c6fd52bcb4da7b380c7
                kwargs['update_device_up'] = True
            registry.notify(resources.PORT, events.AFTER_UPDATE, self,
                            **kwargs)

        if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
            db.delete_distributed_port_binding_if_stale(session, binding)

        return port['id']

    @db_api.retry_if_session_inactive()
    def port_bound_to_host(self, context, port_id, host):
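        """Return the port if it is bound to the given host, else None."""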
        if not host:
            return

        port = db.get_port(context.session, port_id)
        if not port:
            LOG.debug("No Port match for: %s", port_id)
            return
        if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
            bindings = db.get_distributed_port_bindings(context.session,
                                                        port_id)
            for b in bindings:
                if b.host == host:
                    return port
            LOG.debug("No binding found for DVR port %s", port['id'])
            return
        else:
            port_host = db.get_port_binding_host(context.session, port_id)
            return port if (port_host == host) else None

    @db_api.retry_if_session_inactive()
    def get_ports_from_devices(self, context, devices):
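        """Return ports (with security groups) matching the given devices.

        Each returned port dict carries the originally requested device
        name under the 'device' key.
        """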
        port_ids_to_devices = dict(
            (self._device_to_port_id(context, device), device)
            for device in devices)
        port_ids = list(port_ids_to_devices.keys())
        ports = db.get_ports_and_sgs(context, port_ids)
        for port in ports:
            # map back to original requested id
            port_id = next((port_id for port_id in port_ids
                            if port['id'].startswith(port_id)), None)
            port['device'] = port_ids_to_devices.get(port_id)

        return ports

    @staticmethod
    def _device_to_port_id(context, device):
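        """Convert an agent device name into a port id.

        Known interface prefixes are stripped; a non-uuid device name is
        looked up as a MAC address; anything else is returned unchanged.
        """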
        # REVISIT(rkukura): Consider calling into MechanismDrivers to
        # process device names, or having MechanismDrivers supply list
        # of device prefixes to strip.
        for prefix in n_const.INTERFACE_PREFIXES:
            if device.startswith(prefix):
                return device[len(prefix):]
        # REVISIT(irenab): Consider calling into bound MD to
        # handle the get_device_details RPC
        if not uuidutils.is_uuid_like(device):
            port = db.get_port_from_device_mac(context, device)
            if port:
                return port.id
        return device

    def filter_hosts_with_network_access(
            self, context, network_id, candidate_hosts):
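        """Return candidate hosts that can reach the network's segments."""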
        segments = segments_db.get_network_segments(context.session,
                                                    network_id)
        return self.mechanism_manager.filter_hosts_with_segment_access(
            context, segments, candidate_hosts, self.get_agents)

    def check_segment_for_agent(self, segment, agent):
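        """Check the segment against drivers matching the agent's type."""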
        for mech_driver in self.mechanism_manager.ordered_mech_drivers:
            driver_agent_type = getattr(mech_driver.obj, 'agent_type', None)
            if driver_agent_type and driver_agent_type == agent['agent_type']:
                if mech_driver.obj.check_segment_for_agent(segment, agent):
                    return True
        return False

    def _handle_segment_change(self, rtype, event, trigger, context, segment):
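        """Handle segment create/delete events from the callback registry.

        Reserves or releases the segment with the type manager on the
        precommit events and notifies mechanism drivers of the resulting
        network change.
        """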
        if (event == events.PRECOMMIT_CREATE and
            not isinstance(trigger, segments_plugin.Plugin)):
            # TODO(xiaohhui): Currently, when a network is created, ml2
            # reserves a segment and triggers this event handler. The
            # handler would then reserve the segment again, which fails
            # because it has already been reserved. This check could be
            # removed by unifying the segment creation procedure.
            return

        session = context.session
        network_id = segment.get('network_id')

        if event == events.PRECOMMIT_CREATE:
            updated_segment = self.type_manager.reserve_network_segment(
                session, segment)
            # The segmentation id might come from an ML2 type driver, so
            # update it in the original segment.
            segment[api.SEGMENTATION_ID] = updated_segment[api.SEGMENTATION_ID]
        elif event == events.PRECOMMIT_DELETE:
            self.type_manager.release_network_segment(session, segment)

        try:
            self._notify_mechanism_driver_for_segment_change(
                event, context, network_id)
        except ml2_exc.MechanismDriverError:
            with excutils.save_and_reraise_exception():
                LOG.error(_LE("mechanism_manager error occurred when "
                              "handling event %(event)s for segment "
                              "'%(segment)s'"),
                          {'event': event, 'segment': segment['id']})

    def _notify_mechanism_driver_for_segment_change(self, event,
                                                    context, network_id):
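        """Run network update (pre/post)commit calls for a segment change."""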
        network_with_segments = self.get_network(context, network_id)
        mech_context = driver_context.NetworkContext(
            self, context, network_with_segments,
            original_network=network_with_segments)
        if (event == events.PRECOMMIT_CREATE or
                event == events.PRECOMMIT_DELETE):
            self.mechanism_manager.update_network_precommit(mech_context)
        elif event == events.AFTER_CREATE or event == events.AFTER_DELETE:
            self.mechanism_manager.update_network_postcommit(mech_context)