OpenStack/Neutron integration for Calico networking

mech_calico.py

# -*- coding: utf-8 -*-
#
# Copyright (c) 2014, 2015 Metaswitch Networks
# Copyright (c) 2013 OpenStack Foundation
# Copyright (c) 2018 Tigera, Inc. All rights reserved.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

# Calico/OpenStack Plugin
#
# This module is the OpenStack-specific implementation of the Plugin component
# of the new Calico architecture (described by the "Calico Architecture"
# document at http://docs.projectcalico.org/en/latest/architecture.html).
#
# It is implemented as a Neutron/ML2 mechanism driver.
import contextlib
from functools import wraps
import inspect
import os
import re
import uuid

# OpenStack imports.
import eventlet
from eventlet.queue import PriorityQueue
from eventlet.semaphore import Semaphore
from neutron.agent import rpc as agent_rpc
try:
    from neutron_lib.agent import topics
except ImportError:
    # Neutron code prior to d996758fb4 (13th March 2018).
    from neutron.common import topics
try:
    from neutron_lib import context as ctx
except ImportError:
    # Neutron code prior to ca751a1486 (6th March 2017).
    from neutron import context as ctx
try:
    from neutron_lib.plugins.ml2 import api
except ImportError:
    # Neutron code prior to a2c36d7e (10th November 2017).
    from neutron.plugins.ml2 import driver_api as api
from neutron.plugins.ml2.drivers import mech_agent
from sqlalchemy import exc as sa_exc

# Monkeypatch import
import neutron.plugins.ml2.rpc as rpc

# Calico imports.
from networking_calico.common import config as calico_config
from networking_calico.compat import cfg
from networking_calico.compat import constants
from networking_calico.compat import db_exc
from networking_calico.compat import lockutils
from networking_calico.compat import log
from networking_calico.compat import plugin_dir
from networking_calico import datamodel_v1
from networking_calico import datamodel_v2
from networking_calico import datamodel_v3
from networking_calico import etcdv3
from networking_calico.logutils import logging_exceptions
from networking_calico.monotonic import monotonic_time
from networking_calico.plugins.ml2.drivers.calico.election import Elector
from networking_calico.plugins.ml2.drivers.calico.endpoints import \
    _port_is_endpoint_port
from networking_calico.plugins.ml2.drivers.calico.endpoints import \
    WorkloadEndpointSyncer
from networking_calico.plugins.ml2.drivers.calico.policy import PolicySyncer
from networking_calico.plugins.ml2.drivers.calico.status import StatusWatcher
from networking_calico.plugins.ml2.drivers.calico.subnets import SubnetSyncer

# Imports for a Keystone client.
from keystoneauth1.identity import v3
from keystoneauth1 import session
from keystoneclient.v3.client import Client as KeystoneClient

LOG = log.getLogger(__name__)

calico_opts = [
    cfg.IntOpt('num_port_status_threads', default=4,
               help="Number of threads to use for writing port status "
                    "updates to the database."),
    cfg.IntOpt('etcd_compaction_period_mins', default=60,
               help="Interval in minutes between periodic etcd compactions. "
                    "A setting of 0 tells this Calico driver not to request "
                    "any etcd compaction; in that case the deployment must "
                    "take its own steps to prevent the etcd database from "
                    "growing without any disk usage bound."),
    cfg.IntOpt('etcd_compaction_min_revisions', default=1000,
               help="The minimum number of revisions to keep when requesting "
                    "an etcd compaction. We also keep at least the history "
                    "of the previous etcd_compaction_period_mins interval."),
    cfg.IntOpt('project_name_cache_max', default=100,
               help="The maximum allowed size of our cache of project names."),
]
cfg.CONF.register_opts(calico_opts, 'calico')
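
# For illustration only (not consumed by this module): with the options
# registered above, a deployment would set the [calico] section of
# neutron.conf along these lines (the values shown are the defaults):
#
#   [calico]
#   num_port_status_threads = 4
#   etcd_compaction_period_mins = 60
#   etcd_compaction_min_revisions = 1000
#   project_name_cache_max = 100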

# In order to rate limit warning logs about queue lengths, we check if we've
# already logged within this interval (seconds) before logging.
QUEUE_WARN_LOG_INTERVAL_SECS = 10

# An OpenStack agent type name for Felix, the Calico agent component in the
# new architecture.
AGENT_TYPE_FELIX = 'Calico per-host agent (felix)'
AGENT_ID_FELIX = 'calico-felix'

# Mapping from our endpoint status to neutron's port status.
PORT_STATUS_MAPPING = {
    datamodel_v1.ENDPOINT_STATUS_UP: constants.PORT_STATUS_ACTIVE,
    datamodel_v1.ENDPOINT_STATUS_DOWN: constants.PORT_STATUS_DOWN,
    datamodel_v1.ENDPOINT_STATUS_ERROR: constants.PORT_STATUS_ERROR,
}

# The interval between periodic resyncs, in seconds.
# TODO(nj): Increase this to a longer interval for product code.
RESYNC_INTERVAL_SECS = 60

# When we're not the master, how often we check if we have become the master.
MASTER_CHECK_INTERVAL_SECS = 5

# Delay before retrying a failed port status update to the Neutron DB.
PORT_UPDATE_RETRY_DELAY_SECS = 5

# We wait for a short period of time before we initialize our state to avoid
# problems with Neutron forking.
STARTUP_DELAY_SECS = 10

# Set a low refresh interval on the master key. This reduces the chance of
# the etcd event buffer wrapping while non-masters are waiting for the key to
# be refreshed.
MASTER_REFRESH_INTERVAL = 10
MASTER_TIMEOUT = 60
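# With these values the master refreshes its key six times per timeout
# period, so it can tolerate several consecutive refresh failures before the
# 60 second timeout expires and mastership is lost.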

PRIORITY_HIGH = 0
PRIORITY_LOW = 1
PRIORITY_RETRY = 2
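# Note: PriorityQueue retrieves the smallest item first, so a numerically
# lower value here means a higher scheduling priority.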

# This terrible global variable points to the running instance of the
# Calico Mechanism Driver. This variable relies on the basic assertion that
# any Neutron process, forked or not, should only ever have *one* Calico
# Mechanism Driver in it. It's used by our monkeypatch of the
# security_groups_rule_updated method below to locate the mechanism driver.
# TODO(nj): Let's not do this any more. Please?
mech_driver = None


def requires_state(f):
    """requires_state

    This decorator is used to ensure that any method that requires that
    state be initialized will do that. This is to make sure that, if a user
    attempts an action before STARTUP_DELAY_SECS have passed, they don't
    have to wait.

    This decorator only needs to be applied to top-level functions of the
    CalicoMechanismDriver class: specifically, those that are called directly
    from Neutron.
    """
    @wraps(f)
    def wrapper(self, *args, **kwargs):
        self._post_fork_init()
        return f(self, *args, **kwargs)

    return wrapper
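
# Illustrative sketch only (the real call sites are the Neutron-facing
# *_postcommit methods defined further down this file):
#
#   @requires_state
#   def create_port_postcommit(self, context):
#       ...  # initialisation is guaranteed to have run by this point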


class CalicoMechanismDriver(mech_agent.SimpleAgentMechanismDriverBase):
    """Neutron/ML2 mechanism driver for Project Calico.

    CalicoMechanismDriver communicates information about endpoints and
    security configuration, via etcd, to the Felix and DHCP agent instances
    running on each compute host.
    """

    def __init__(self):
        super(CalicoMechanismDriver, self).__init__(
            AGENT_TYPE_FELIX,
            'tap',
            {'port_filter': True,
             'mac_address': '00:61:fe:ed:ca:fe'})

        # Lock to prevent concurrent initialisation.
        self._init_lock = Semaphore()

        # Generally initialize attributes to nil values. They get initialized
        # properly, as needed, in _post_fork_init().
        self.db = None
        self.elector = None
        self._agent_update_context = None
        self._etcd_watcher = None
        self._etcd_watcher_thread = None
        self._my_pid = None
        self._epoch = 0
        self.in_resync = False

        # Mapping from (hostname, port-id) to Calico's status for a port. The
        # hostname is included to disambiguate between multiple copies of a
        # port, which may exist during a migration or a re-schedule.
        self._port_status_cache = {}
        # Queue used to fan out port status updates to worker threads. Notes:
        # * we don't recreate the queue in _post_fork_init() so that we can't
        #   possibly lose updates that had already been queued.
        # * the queue contains tuples (priority, <status key>); we use a
        #   higher priority for events and a lower priority for snapshot
        #   keys, so that current data skips the queue.
        self._port_status_queue = PriorityQueue()
        self._port_status_queue_too_long = False

        # RPC client for fanning out agent state reports.
        self.state_report_rpc = None

        # Whether the version of update_port_status() available in this
        # version of OpenStack has the host argument. Computed on first use.
        self._cached_update_port_status_has_host_param = None

        # Last time we logged about a long port-status queue. Used for rate
        # limiting. Note: monotonic_time() uses its own epoch so it's only
        # safe to compare this with other values returned by monotonic_time().
        self._last_status_queue_log_time = monotonic_time()

        # Tell the monkeypatch where we are.
        global mech_driver
        assert mech_driver is None
        mech_driver = self

        # Make sure we initialise even if we don't see any API calls.
        eventlet.spawn_after(STARTUP_DELAY_SECS, self._post_fork_init)

        LOG.info("Created Calico mechanism driver %s", self)

    @logging_exceptions(LOG)
    def _post_fork_init(self):
        """_post_fork_init

        Creates the connection state required for talking to the Neutron DB
        and to etcd. This is a no-op if it has been executed before.

        This is split out from __init__ to allow us to defer this
        initialisation until after Neutron has forked off its worker
        children. If we initialise the DB and etcd connections before
        the fork (as would happen in __init__()) then the workers
        would share sockets incorrectly.
        """
        # The self._init_lock semaphore mediates if two or more eventlet
        # threads call _post_fork_init at the same time, within the same
        # Neutron server fork. This can happen if the timed initialization
        # (after STARTUP_DELAY_SECS) coincides with the handling of a Neutron
        # API request, or if this fork processes multiple Neutron API
        # requests at the same time.
        with self._init_lock:
            current_pid = os.getpid()
            if self._my_pid == current_pid:
                # We've initialised our PID and it hasn't changed since last
                # time, nothing to do.
                LOG.info("Calico state already initialised for PID %s",
                         current_pid)
                return

            # else: either this is the first call or our PID has changed:
            # (re)initialise.
            if self._my_pid is not None:
                # This is unexpected but we can deal with it: Neutron should
                # fork before we trigger the first call to _post_fork_init().
                LOG.warning("PID changed from %s to %s; unexpected fork after "
                            "initialisation? Reinitialising Calico driver.",
                            self._my_pid, current_pid)
            else:
                LOG.info("Doing Calico mechanism driver initialisation in"
                         " process %s", current_pid)

            # (Re)init the DB.
            self.db = None
            self._get_db()

            # Create a Keystone client.
            authcfg = cfg.CONF.keystone_authtoken
            LOG.debug("authcfg = %r", authcfg)
            for key in authcfg:
                if 'password' in key:
                    LOG.debug("authcfg[%s] = %s", key, '***')
                else:
                    LOG.debug("authcfg[%s] = %s", key, authcfg[key])
            # Compatibility with older versions of OpenStack (Mitaka and
            # below).
            try:
                user_domain_name = authcfg.user_domain_name
            except cfg.NoSuchOptError:
                user_domain_name = 'Default'
                LOG.debug("authcfg[user_domain_name] fallback = %s",
                          user_domain_name)
            try:
                username = authcfg.username
            except cfg.NoSuchOptError:
                username = authcfg.admin_user
                LOG.debug("authcfg[username] fallback[admin_user] = %s",
                          username)
            try:
                password = authcfg.password
            except cfg.NoSuchOptError:
                password = authcfg.admin_password
                LOG.debug("authcfg[password] fallback[admin_password] = %s",
                          '***')
            try:
                project_domain_name = authcfg.project_domain_name
            except cfg.NoSuchOptError:
                project_domain_name = 'Default'
                LOG.debug("authcfg[project_domain_name] fallback = %s",
                          project_domain_name)
            try:
                project_name = authcfg.project_name
            except cfg.NoSuchOptError:
                project_name = authcfg.admin_tenant_name
                LOG.debug("authcfg[project_name] fallback[admin_tenant_name]"
                          " = %s", project_name)
            try:
                auth_url = authcfg.auth_url
            except cfg.NoSuchOptError:
                auth_url = authcfg.identity_uri
                LOG.debug("authcfg[auth_url] fallback[identity_uri] = %s",
                          auth_url)
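
            # Normalise the auth URL so that it ends in exactly one '/v3'
            # suffix, whether or not the configured value already included
            # it.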
            auth = v3.Password(user_domain_name=user_domain_name,
                               username=username,
                               password=password,
                               project_domain_name=project_domain_name,
                               project_name=project_name,
                               auth_url=re.sub(r'/v3/?$', '', auth_url) +
                               '/v3')
            sess = session.Session(auth=auth)
            keystone_client = KeystoneClient(session=sess)
            LOG.debug("Keystone client = %r", keystone_client)

            # Create syncers.
            self.subnet_syncer = \
                SubnetSyncer(self.db, self._txn_from_context)
            self.policy_syncer = \
                PolicySyncer(self.db, self._txn_from_context)
            self.endpoint_syncer = \
                WorkloadEndpointSyncer(self.db,
                                       self._txn_from_context,
                                       self.policy_syncer,
                                       keystone_client)

            # Admin context used by (only) the thread that updates Felix
            # agent status.
            self._agent_update_context = ctx.get_admin_context()

            # Get RPC connection for fanning out Felix state reports.
            try:
                state_report_topic = topics.REPORTS
            except AttributeError:
                # Older versions of OpenStack share the PLUGIN topic.
                state_report_topic = topics.PLUGIN
            self.state_report_rpc = agent_rpc.PluginReportStateAPI(
                state_report_topic
            )

            # Elector, for performing leader election.
            self.elector = Elector(cfg.CONF.calico.elector_name,
                                   datamodel_v2.neutron_election_key(
                                       calico_config.get_region_string()),
                                   old_key=datamodel_v1.NEUTRON_ELECTION_KEY,
                                   interval=MASTER_REFRESH_INTERVAL,
                                   ttl=MASTER_TIMEOUT)

            self._my_pid = current_pid

            # Start our resynchronization process and status updating. Just
            # in case we ever get two of the same threads running, use an
            # epoch counter to tell the old thread to die.
            #
            # We deliberately do this last, to ensure that all of the setup
            # above is complete before we start running.
            self._epoch += 1
            eventlet.spawn(self.periodic_resync_thread, self._epoch)
            eventlet.spawn(self._status_updating_thread, self._epoch)
            for _ in range(cfg.CONF.calico.num_port_status_threads):
                eventlet.spawn(self._loop_writing_port_statuses, self._epoch)

            LOG.info("Calico mechanism driver initialisation done in process "
                     "%s", current_pid)

    @logging_exceptions(LOG)
    def _status_updating_thread(self, expected_epoch):
        """_status_updating_thread

        This method implements the status-update handling logic for the
        Calico mechanism driver: it watches for Felix status updates in etcd
        and passes the relevant information to the Neutron database.
        """
        LOG.info("Status updating thread started.")
        while self._epoch == expected_epoch:
            # Only handle updates if we are the master node.
            if self.elector.master():
                if self._etcd_watcher is None:
                    LOG.info("Became the master, starting StatusWatcher")
                    self._etcd_watcher = StatusWatcher(self)
                    self._etcd_watcher_thread = eventlet.spawn(
                        self._etcd_watcher.start
                    )
                    LOG.info("Started %s as %s",
                             self._etcd_watcher, self._etcd_watcher_thread)
                elif not self._etcd_watcher_thread:
                    LOG.error("StatusWatcher %s died", self._etcd_watcher)
                    self._etcd_watcher.stop()
                    self._etcd_watcher = None
            else:
                if self._etcd_watcher is not None:
                    LOG.warning("No longer master, stopping StatusWatcher")
                    self._etcd_watcher.stop()
                    self._etcd_watcher = None
            # Short sleep interval before we check if we've become
            # the master.
            eventlet.sleep(MASTER_CHECK_INTERVAL_SECS)
        else:
            LOG.warning("Unexpected: epoch changed. "
                        "Handling status updates thread exiting.")

    def on_felix_alive(self, felix_hostname, new):
        LOG.info("Felix on host %s is alive; fanning out status report",
                 felix_hostname)
        # Rather than writing directly to the database, we use the RPC
        # mechanism to fan out the request to another process. This
        # distributes the DB write load and avoids turning the db-access lock
        # into a bottleneck.
        agent_state = felix_agent_state(felix_hostname, start_flag=new)
        self.state_report_rpc.report_state(self._agent_update_context,
                                           agent_state,
                                           use_call=False)

    def on_port_status_changed(self, hostname, port_id, status_dict,
                               priority="low"):
        """Called when etcd tells us that a port status has changed.

        :param hostname: hostname of the host containing the port.
        :param port_id: the port ID.
        :param status_dict: new status dict for the port, or None if the
               status was deleted.
        :param priority: "high" for updates spotted live by Felix, "low" for
               updates seen during a datastore resync.
        """
        port_status_key = (intern(hostname.encode("utf8")), port_id)

        # Unwrap the dict around the actual status.
        if status_dict is not None:
            # Update.
            calico_status = status_dict.get("status")
        else:
            # Deletion.
            calico_status = None

        # Check whether this update gives us new information to pass to
        # Neutron. "high" priority updates come from changes spotted by
        # Felix, including interface flaps caused by, for example, VM
        # rebuild. In those cases, we may be out-of-sync with Neutron because
        # the port can be marked as down/removed by another component.
        #
        # "low" priority updates come from datastore resyncs. In those cases
        # we rely on our cache of port status to avoid spamming Neutron with
        # many no-op updates. It _is_ possible for our cache to be out of
        # sync in the resync case too; however,
        #
        # - the impact on the database of sending port status updates to
        #   Neutron for all ports is significant (we do have to do it at
        #   start-of-day, because our cache is empty)
        #
        # - the impact of an incorrect port status for a normal, live VM is
        #   minimal (and it shouldn't get out of sync unless another
        #   component updates the port anyway, in which case they'll have
        #   updated the database)
        #
        # - the impact of missing an update for a VM that is being (re)built
        #   is that the VM (re)build fails; but if we're doing a resync then
        #   we must have been disconnected from the datastore and that means
        #   the (re)build is already likely to fail due to the disconnection.
        if (priority == "high" or
                self._port_status_cache.get(port_status_key) != calico_status):
            LOG.info("Status of port %s on host %s changed to %s",
                     port_status_key, hostname, calico_status)
            # We write the update to our in-memory cache, which is shared
            # with the DB writer threads. This means that the next write for
            # a particular key always goes directly to the correct state.
            # Python's dict is thread-safe for set and get, which is what we
            # need.
            if calico_status is not None:
                if calico_status in PORT_STATUS_MAPPING:
                    # Intern the status to avoid keeping thousands of copies
                    # of the status strings. We know the .encode() is safe
                    # because we just checked this was one of our expected
                    # strings.
                    interned_status = intern(calico_status.encode("utf8"))
                    self._port_status_cache[port_status_key] = interned_status
                else:
                    LOG.error("Unknown port status: %r", calico_status)
                    self._port_status_cache.pop(port_status_key, None)
            else:
                self._port_status_cache.pop(port_status_key, None)
            # Defer the actual update to the background thread so that we
            # don't hold up reading from etcd. In particular, we don't want
            # to block Felix status updates while we wait on the DB.
            sortable_priority = (
                PRIORITY_HIGH if priority == "high" else PRIORITY_LOW,
                monotonic_time(),
            )
            self._port_status_queue.put((sortable_priority, port_status_key))
            qsize = self._port_status_queue.qsize()
            if qsize > 10:
                now = monotonic_time()
                if (now - self._last_status_queue_log_time >
                        QUEUE_WARN_LOG_INTERVAL_SECS):
                    LOG.warning("Port status update queue length is high: %s",
                                qsize)
                    self._last_status_queue_log_time = now
                    self._port_status_queue_too_long = True
                # Queue is getting large, make sure the DB writer threads
                # get CPU.
                eventlet.sleep()
            elif self._port_status_queue_too_long and qsize < 5:
                self._port_status_queue_too_long = False
                LOG.warning("Port status update queue back to normal: %s",
                            qsize)

    @logging_exceptions(LOG)
    def _loop_writing_port_statuses(self, expected_epoch):
        LOG.info("Port status write thread started epoch=%s", expected_epoch)
        admin_context = ctx.get_admin_context()
        while self._epoch == expected_epoch:
            # Wait for work to do.
            _, port_status_key = self._port_status_queue.get()
            # Actually do the update.
            self._try_to_update_port_status(admin_context, port_status_key)

    def _try_to_update_port_status(self, admin_context, port_status_key):
        """Attempts to update the given port status.

        :param admin_context: Admin context to pass to Neutron. Should be
               unique for each thread.
        :param port_status_key: tuple of hostname, port_id.
        """
        hostname, port_id = port_status_key
        calico_status = self._port_status_cache.get(port_status_key)
        if calico_status:
            neutron_status = PORT_STATUS_MAPPING[calico_status]
            LOG.info("Updating port %s status to %s", port_id, neutron_status)
        else:
            # Report deletion as error. Either the port has genuinely been
            # deleted, in which case this update is ignored by
            # update_port_status(), or the port still exists but we disagree,
            # which is an error.
            neutron_status = constants.PORT_STATUS_ERROR
            LOG.info("Reporting port %s deletion", port_id)

        try:
            if self._update_port_status_has_host_param():
                # Later OpenStack versions support passing the hostname.
                LOG.debug("update_port_status() supports host parameter")
                self.db.update_port_status(admin_context,
                                           port_id,
                                           neutron_status,
                                           host=hostname)
            else:
                # Older versions don't have a way to specify the hostname so
                # we do our best.
                LOG.debug("update_port_status() missing host parameter")
                self.db.update_port_status(admin_context,
                                           port_id,
                                           neutron_status)
        except (db_exc.DBError,
                sa_exc.SQLAlchemyError) as e:
            # Defensive: pre-Liberty, it was easy to cause deadlocks here if
            # any code path (in another loaded plugin, say) failed to take
            # the db-access lock. Post-Liberty, we shouldn't see any
            # exceptions here because update_port_status() is wrapped with a
            # retry decorator in the neutron code.
            LOG.warning("Failed to update port status for %s due to %r.",
                        port_id, e)
            # Queue up a retry after a delay.
            eventlet.spawn_after(PORT_UPDATE_RETRY_DELAY_SECS,
                                 self._retry_port_status_update,
                                 port_status_key)
        else:
            LOG.debug("Updated port status for %s", port_id)

    @logging_exceptions(LOG)
    def _retry_port_status_update(self, port_status_key):
        LOG.info("Retrying update to port %s", port_status_key)
        # Queue up the update so that we'll go via the normal writer threads.
        # They will re-read the current state of the port from the cache.
        self._port_status_queue.put(((PRIORITY_RETRY, monotonic_time()),
                                     port_status_key))

    def _update_port_status_has_host_param(self):
        """Check whether update_port_status() supports the host parameter."""
        if self._cached_update_port_status_has_host_param is None:
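            # Note: inspect.getargspec() is the Python 2 API (this module
            # also uses intern(), so it targets Python 2); on Python 3 it is
            # deprecated in favour of inspect.signature().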
            args, _, varkw, _ = inspect.getargspec(self.db.update_port_status)
            has_host_param = varkw or "host" in args
            self._cached_update_port_status_has_host_param = has_host_param
            LOG.info("update_port_status() supports host arg: %s",
                     has_host_param)
        return self._cached_update_port_status_has_host_param

    def _get_db(self):
        if not self.db:
            self.db = plugin_dir.get_plugin()
            LOG.info("db = %s" % self.db)
            # Update the reference to ourselves.
            global mech_driver
            mech_driver = self

    def bind_port(self, context):
        """bind_port

        Checks that the DHCP agent is alive on the host and then defers
        to the superclass, which will check that felix is alive and then
        call back into our check_segment_for_agent() method, which does
        further checks.
        """
        # FIXME: Actually for now we don't check for a DHCP agent,
        # because we haven't yet worked out the future architecture
        # for this. The key point is that we don't want to do this
        # via the Neutron database and RPC mechanisms, because that is
        # what causes the scaling problem that led us to switch to an
        # etcd-driven DHCP agent.
        return super(CalicoMechanismDriver, self).bind_port(context)

    def check_segment_for_agent(self, segment, agent):
        LOG.debug("Checking segment %s with agent %s" % (segment, agent))
        if segment[api.NETWORK_TYPE] in ['local', 'flat']:
            return True
        else:
            LOG.warning(
                "Calico does not support network type %s, on network %s",
                segment[api.NETWORK_TYPE],
                segment[api.ID],
            )
            return False

    def get_allowed_network_types(self, agent=None):
        return ('local', 'flat')

    def get_mappings(self, agent):
        # We override this primarily to satisfy the ABC checker: this method
        # never actually gets called because we also override
        # check_segment_for_agent.
        assert False

    # For network and subnet actions we have nothing to do, so we provide
    # these no-op methods.
    def create_network_postcommit(self, context):
        LOG.info("CREATE_NETWORK_POSTCOMMIT: %s" % context)

    def update_network_postcommit(self, context):
        LOG.info("UPDATE_NETWORK_POSTCOMMIT: %s" % context)

    def delete_network_postcommit(self, context):
        LOG.info("DELETE_NETWORK_POSTCOMMIT: %s" % context)

    @requires_state
    def create_subnet_postcommit(self, context):
        LOG.info("CREATE_SUBNET_POSTCOMMIT: %s" % context)
        # Re-read the subnet from the DB. This ensures that a change to the
        # same subnet can't be processed by another controller process while
        # we're writing the effects of this call into etcd.
        subnet = context.current
        plugin_context = context._plugin_context
        with self._txn_from_context(plugin_context, tag="create-subnet"):
            subnet = self.db.get_subnet(plugin_context, subnet['id'])
            if subnet['enable_dhcp']:
                self.subnet_syncer.subnet_created(subnet, context)

    @requires_state
    def update_subnet_postcommit(self, context):
        LOG.info("UPDATE_SUBNET_POSTCOMMIT: %s" % context)
        # Re-read the subnet from the DB. This ensures that a change to the
        # same subnet can't be processed by another controller process while
        # we're writing the effects of this call into etcd.
        subnet = context.current
        plugin_context = context._plugin_context
        with self._txn_from_context(plugin_context, tag="update-subnet"):
            subnet = self.db.get_subnet(plugin_context, subnet['id'])
            if subnet['enable_dhcp']:
                self.subnet_syncer.subnet_created(subnet, context)
            else:
                self.subnet_syncer.subnet_deleted(subnet['id'])

    @requires_state
    def delete_subnet_postcommit(self, context):
        LOG.info("DELETE_SUBNET_POSTCOMMIT: %s" % context)
        self.subnet_syncer.subnet_deleted(context.current['id'])

    # Idealised method forms.
    @requires_state
    def create_port_postcommit(self, context):
        """create_port_postcommit

        Called after Neutron has committed a port creation event to the
        database.

        Process this event by taking and holding a database transaction and
        re-reading the port. Once we do that, we know the port will remain
        unchanged while we hold the transaction. We can then write the port
        to etcd, along with any other information we may need.
        """
        LOG.info('CREATE_PORT_POSTCOMMIT: %s', context)
        port = context._port

        # Ignore if this is not an endpoint port.
        if not _port_is_endpoint_port(port):
            return

        # Ignore if the port binding VIF type is 'unbound'; then this port
        # doesn't need to be networked yet.
        if port['binding:vif_type'] == 'unbound':
            LOG.info("Creating unbound port: no work required.")
            return

        plugin_context = context._plugin_context
        with self._txn_from_context(plugin_context, tag="create-port"):
            self.endpoint_syncer.write_endpoint(port, plugin_context)

    @requires_state
    def update_port_postcommit(self, context):
        """update_port_postcommit

        Called after Neutron has committed a port update event to the
        database.

        This is a tricky event, because it can be called in a number of ways
        during VM migration. We farm out to the appropriate method from here.
        """
        LOG.info('UPDATE_PORT_POSTCOMMIT: %s', context)
        port = context._port
        original = context.original

        # Abort early if we're managing non-endpoint ports.
        if not _port_is_endpoint_port(port):
            return

        # If this port update is purely for a status change, don't do
        # anything: we don't care about port statuses.
        if port_status_change(port, original):
            LOG.info('Port status changed from %s to %s, no action.',
                     original.get("status"), port.get("status"))
            return

        # Now, re-read the port.
        plugin_context = context._plugin_context
        with self._txn_from_context(plugin_context, tag="update-port"):
            port = self.db.get_port(plugin_context, port['id'])

            # Now, fork execution based on the type of update we're
            # performing. There are a few:
            # - a port becoming bound (binding vif_type from unbound to
            #   bound);
            # - a port becoming unbound (binding vif_type from bound to
            #   unbound);
            # - an Icehouse migration (binding host id changed and port
            #   bound);
            # - an update (port bound at all times);
            # - a change to an unbound port (which we don't care about,
            #   because we do nothing with unbound ports).
            if port_bound(port) and not port_bound(original):
                self._port_bound_update(context, port)
            elif port_bound(original) and not port_bound(port):
                self._port_unbound_update(context, original)
            elif original['binding:host_id'] != port['binding:host_id']:
                LOG.info("Icehouse migration")
                self._icehouse_migration_step(context, port, original)
            elif port_bound(original) and port_bound(port):
                LOG.info("Port update")
                self._update_port(plugin_context, port)
            else:
                LOG.info("Update on unbound port: no action")

    @requires_state
    def update_floatingip(self, plugin_context):
        """update_floatingip

        Called after a Neutron floating IP has been associated with or
        disassociated from a port.
        """
        LOG.info('UPDATE_FLOATINGIP: %s', plugin_context)
        with self._txn_from_context(plugin_context, tag="update_floatingip"):
            port = self.db.get_port(plugin_context,
                                    plugin_context.fip_update_port_id)
            self._update_port(plugin_context, port)

    @requires_state
    def delete_port_postcommit(self, context):
        """delete_port_postcommit

        Called after Neutron has committed a port deletion event to the
        database.

        There's no database row for us to lock on here, so don't bother.
        """
        LOG.info('DELETE_PORT_POSTCOMMIT: %s', context)
        port = context._port

        # Immediately halt processing if this is not an endpoint port.
        if not _port_is_endpoint_port(port):
            return

        self.endpoint_syncer.delete_endpoint(port)

    @requires_state
    def send_sg_updates(self, sgids, context):
        """Called whenever security group rules or membership change.

        When a security group rule is added, we need to do the following
        steps:

        1. Reread the security rules from the Neutron DB.
        2. Write the updated policy to etcd.
        """
        LOG.info("Updating security group IDs %s", sgids)
        with self._txn_from_context(context, tag="sg-update"):
            self.policy_syncer.write_sgs_to_etcd(sgids, context)

    @contextlib.contextmanager
    def _txn_from_context(self, context, tag="<unset>"):
        """Context manager: opens a DB transaction against the given context.

        If required, this also takes the Neutron-wide db-access semaphore.

        :return: context manager for use with with:.
        """
        session = context.session
        conn_url = str(session.connection().engine.url).lower()
        if (conn_url.startswith("mysql:") or
                conn_url.startswith("mysql+mysqldb:")):
            # Neutron is using the mysqldb driver for accessing the database.
            # This has a known incompatibility with eventlet that leads to
            # deadlock. Take the neutron-wide db-access lock as a workaround.
            # See https://bugs.launchpad.net/oslo.db/+bug/1350149 for a
            # description of the issue.
            LOG.debug("Waiting for db-access lock tag=%s...", tag)
            try:
                with lockutils.lock('db-access'):
                    LOG.debug("...acquired db-access lock tag=%s", tag)
                    with context.session.begin(subtransactions=True) as txn:
                        yield txn
            finally:
                LOG.debug("Released db-access lock tag=%s", tag)
        else:
            # Liberty or later uses an eventlet-safe mysql library. (Or,
            # we're not using mysql at all.)
            LOG.debug("Not using mysqldb driver, skipping db-access lock")
            with context.session.begin(subtransactions=True) as txn:
                yield txn
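
    # Illustrative usage (a sketch only; the real call sites appear
    # throughout this class):
    #
    #   with self._txn_from_context(plugin_context, tag="some-tag"):
    #       port = self.db.get_port(plugin_context, port_id)
    #       ...  # the row stays consistent while the transaction is held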

    def _port_unbound_update(self, context, port):
        """_port_unbound_update

        This is called when a port is unbound during a port update. This
        destroys the port in etcd.
        """
        LOG.info("Port becoming unbound: destroy.")
        self.endpoint_syncer.delete_endpoint(port)

    def _port_bound_update(self, context, port):
        """_port_bound_update

        This is called when a port is bound during a port update. This
        creates the port in etcd.

        This method expects to be called from within a database transaction,
        and does not create one itself.
        """
        # TODO(nj): Can we avoid re-writing policy here? Put another way, can
        # security groups change during migration steps, or does a separate
        # port update event occur?
        LOG.info("Port becoming bound: create.")
        self.endpoint_syncer.write_endpoint(port, context._plugin_context)

    def _icehouse_migration_step(self, context, port, original):
        """_icehouse_migration_step

        This is called when migrating on Icehouse. Here, we basically just
        perform an unbinding and a binding at exactly the same time, but we
        hold a DB lock the entire time.

        This method expects to be called from within a database transaction,
        and does not create one itself.
        """
        # TODO(nj): Can we avoid re-writing policy here? Put another way, can
        # security groups change during migration steps, or does a separate
        # port update event occur?
        LOG.info("Migration as implemented in Icehouse")
        self._port_unbound_update(context, original)
        self._port_bound_update(context, port)

    def _update_port(self, plugin_context, port):
        """_update_port

        Called during port updates that have nothing to do with migration.

        This method assumes it's being called from within a database
        transaction and does not take out another one.
        """
        # TODO(nj): There's a lot of redundant code in these methods, with
        # the only key difference being taking out transactions. Come back
        # and shorten these.
        LOG.info("Updating port %s", port)

        # If the binding VIF type is unbound, we consider this port
        # 'disabled', and should attempt to delete it. Otherwise, the port is
        # enabled: re-process it.
        port_disabled = port['binding:vif_type'] == 'unbound'
        if not port_disabled:
            LOG.info("Port enabled, attempting to update.")
            self.endpoint_syncer.write_endpoint(port, plugin_context)
        else:
            # Port unbound, attempt to delete.
            LOG.info("Port disabled, attempting delete if needed.")
            self.endpoint_syncer.delete_endpoint(port)

    def periodic_resync_thread(self, expected_epoch):
        """Periodic Neutron DB -> etcd resynchronization logic.

        On a fixed interval, spin over relevant Neutron DB objects and
        reconcile them with etcd, ensuring that the etcd database and Neutron
        are in synchronization with each other.
        """
        try:
            LOG.info("Periodic resync thread started")
            while self._epoch == expected_epoch:
                # Only do the resync logic if we're actually the master node.
                if self.elector.master():
                    LOG.info("I am master: doing periodic resync")

                    # Since this thread is not associated with any particular
                    # request, we use our own admin context for accessing the
                    # database.
                    admin_context = ctx.get_admin_context()

                    try:
                        # Resync subnets.
                        self.subnet_syncer.resync(admin_context)

                        # Resync policies. Do this before endpoints because
                        # it's worse to have incorrect or missing policy for
                        # a known endpoint, than it is to have a briefly
                        # incorrect or missing endpoint.
                        self.policy_syncer.resync(admin_context)

                        # Resync endpoints.
                        self.endpoint_syncer.resync(admin_context)

                        # Resync ClusterInformation and FelixConfiguration.
                        self.provide_felix_config()

                        # Possibly request an etcd compaction.
                        check_request_etcd_compaction()
                    except Exception:
                        LOG.exception("Error in periodic resync thread.")

                    # Reschedule ourselves.
                    eventlet.sleep(RESYNC_INTERVAL_SECS)
                else:
                    # Shorter sleep interval before we check if we've become
                    # the master. Avoids waiting a whole RESYNC_INTERVAL_SECS
                    # if we just miss the master update.
                    LOG.debug("I am not master")
                    eventlet.sleep(MASTER_CHECK_INTERVAL_SECS)
        except Exception:
            # TODO(nj): Should we tear down the process?
            LOG.exception("Periodic resync thread died!")
            if self.elector:
                # Stop the elector so that we give up the mastership.
                self.elector.stop()
            raise
        else:
            LOG.warning("Periodic resync thread exiting.")

    @etcdv3.logging_exceptions
    def provide_felix_config(self):
        """provide_felix_config

        Specify the prefix of the TAP interfaces that Felix should
        look for and work with. This config setting does not have a
        default value, because different cloud systems will do
        different things. Here we provide the prefix that Neutron
        uses.
        """
        LOG.info("Providing Felix configuration")

        rewrite_cluster_info = True
        while rewrite_cluster_info:
            # Get existing global ClusterInformation. We will add to this,
            # rather than trampling on anything that may already be there,
            # and will also take care to avoid an overlapping write with some
            # other orchestrator.
            try:
                cluster_info, ci_mod_revision = datamodel_v3.get(
                    "ClusterInformation",
                    "default")
            except etcdv3.KeyNotFound:
                cluster_info = {}
                ci_mod_revision = 0
            rewrite_cluster_info = False
            LOG.info("Read ClusterInformation %s mod_revision %r",
                     cluster_info,
                     ci_mod_revision)

            # Generate a cluster GUID if there isn't one already.
            if not cluster_info.get(datamodel_v3.CLUSTER_GUID):
                cluster_info[datamodel_v3.CLUSTER_GUID] = \
                    uuid.uuid4().get_hex()
                rewrite_cluster_info = True

            # Add "openstack" to the cluster type, unless already there.
            cluster_type = cluster_info.get(datamodel_v3.CLUSTER_TYPE, "")
            if cluster_type:
                if "openstack" not in cluster_type:
                    cluster_info[datamodel_v3.CLUSTER_TYPE] = \
                        cluster_type + ",openstack"
                    rewrite_cluster_info = True
            else:
                cluster_info[datamodel_v3.CLUSTER_TYPE] = "openstack"
                rewrite_cluster_info = True

            # Note, we don't touch the Calico version field here, as we don't
            # know it. (With other orchestrators, it is calico/node's
            # responsibility to set the Calico version. But we don't run
            # calico/node in Calico for OpenStack.)

            # Set the datastore to ready, if the datastore readiness state
            # isn't already set at all. This field is intentionally
            # tri-state, i.e. it can be explicitly True, explicitly False, or
            # not set. If it has been set explicitly to False, that is
            # probably because another orchestrator is doing an upgrade or
            # wants for some other reason to suspend processing of the Calico
            # datastore.
            if datamodel_v3.DATASTORE_READY not in cluster_info:
                cluster_info[datamodel_v3.DATASTORE_READY] = True
                rewrite_cluster_info = True

            # Rewrite ClusterInformation, if we changed anything above.
            if rewrite_cluster_info:
                LOG.info("New ClusterInformation: %s", cluster_info)
                if datamodel_v3.put("ClusterInformation",
                                    datamodel_v3.NOT_NAMESPACED,
                                    "default",
                                    cluster_info,
                                    mod_revision=ci_mod_revision):
                    rewrite_cluster_info = False
                else:
                    # Short sleep to avoid a tight loop.
                    eventlet.sleep(1)

        rewrite_felix_config = True
        while rewrite_felix_config:
            # Get existing global FelixConfiguration. We will add to this,
            # rather than trampling on anything that may already be there,
            # and will also take care to avoid an overlapping write with some
            # other orchestrator.
            try:
                felix_config, fc_mod_revision = datamodel_v3.get(
                    "FelixConfiguration",
                    "default")
            except etcdv3.KeyNotFound:
                felix_config = {}
                fc_mod_revision = 0
            rewrite_felix_config = False
            LOG.info("Read FelixConfiguration %s mod_revision %r",
                     felix_config,
                     fc_mod_revision)

            # Enable endpoint reporting.
            if not felix_config.get(datamodel_v3.ENDPOINT_REPORTING_ENABLED,
                                    False):
                felix_config[datamodel_v3.ENDPOINT_REPORTING_ENABLED] = True
                rewrite_felix_config = True

            # Ensure that interface prefixes include 'tap'.
            interface_prefix = felix_config.get(datamodel_v3.INTERFACE_PREFIX)
            prefixes = interface_prefix.split(',') if interface_prefix else []
            if 'tap' not in prefixes:
                prefixes.append('tap')
                felix_config[datamodel_v3.INTERFACE_PREFIX] = \
                    ','.join(prefixes)
                rewrite_felix_config = True

            # Rewrite FelixConfiguration, if we changed anything above.
            if rewrite_felix_config:
                LOG.info("New FelixConfiguration: %s", felix_config)
                if datamodel_v3.put("FelixConfiguration",
                                    datamodel_v3.NOT_NAMESPACED,
                                    "default",
                                    felix_config,
                                    mod_revision=fc_mod_revision):
                    rewrite_felix_config = False
                else:
                    # Short sleep to avoid a tight loop.
                    eventlet.sleep(1)
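
        # For illustration (an assumption about a fresh cluster, not a
        # guaranteed result): starting from an empty etcd, the loops above
        # would leave a FelixConfiguration of roughly
        #   {ENDPOINT_REPORTING_ENABLED: True, INTERFACE_PREFIX: 'tap'}
        # and a ClusterInformation containing a fresh GUID, cluster type
        # "openstack", and DATASTORE_READY = True.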


# This section monkeypatches the AgentNotifierApi.security_groups_rule_updated
# method to ensure that the Calico driver gets told about security group
# updates at all times. This is a deeply unpleasant hack. Please, do as I
# say, not as I do.
#
# For more info, please see issues #635 and #641.
original_sgr_updated = rpc.AgentNotifierApi.security_groups_rule_updated


def security_groups_rule_updated(self, context, sgids):
    LOG.info("security_groups_rule_updated: %s %s" % (context, sgids))
    mech_driver.send_sg_updates(sgids, context)
    original_sgr_updated(self, context, sgids)


rpc.AgentNotifierApi.security_groups_rule_updated = (
    security_groups_rule_updated
)


def port_status_change(port, original):
    """port_status_change

    Checks whether a port update is being called for a port status change
    event.

    Port activation events are triggered by our own action: if the only
    change in the port dictionary is activation state, we don't want to do
    any processing.
    """
    # Be defensive here: if Neutron is going to use these port dicts later we
    # don't want to have taken away data they want. Take copies.
    port = port.copy()
    original = original.copy()

    port.pop('status')
    original.pop('status')

    return port == original


def port_bound(port):
    """Returns true if the port is bound."""
    return port['binding:vif_type'] != 'unbound'


def felix_agent_state(hostname, start_flag=False):
    """felix_agent_state

    :param bool start_flag: True if this is a new felix, that is starting
           up. False if this is a refresh of an existing felix.
    :returns dict: agent status dict appropriate for inserting into Neutron
             DB.
    """
    state = {'agent_type': AGENT_TYPE_FELIX,
             'binary': AGENT_ID_FELIX,
             'host': hostname,
             'topic': constants.L2_AGENT_TOPIC}
    if start_flag:
        # Felix has told us that it has only just started, report that to
        # neutron, which will use it to reset its view of the uptime.
        state['start_flag'] = True
    return state
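

# For example (with an illustrative hostname), felix_agent_state('compute-1',
# start_flag=True) returns:
#
#   {'agent_type': AGENT_TYPE_FELIX,
#    'binary': 'calico-felix',
#    'host': 'compute-1',
#    'topic': constants.L2_AGENT_TOPIC,
#    'start_flag': True}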


COMPACTION_PREFIX = "/calico/compaction/v1/"
COMPACTION_TRIGGER_KEY = COMPACTION_PREFIX + "trigger"
COMPACTION_LAST_KEY = COMPACTION_PREFIX + "last"
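# Sketch of the scheme implemented below: the trigger key is written with a
# lease whose TTL is etcd_compaction_period_mins, so its absence (or an
# expired lease) signals that a compaction check is due; the last key records
# the last compaction revision as its value, and the revision at the previous
# check implicitly, as its mod_revision.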


def check_request_etcd_compaction():
    """Possibly request an etcd compaction.

    Without any compaction, etcd's disk usage will grow without bound
    because it retains previous revisions for all known keys. Compaction, at
    a particular revision, tells etcd to forget the detailed information for
    all revisions before that, and so keeps etcd memory usage in check.

    By default, therefore, networking-calico requests an etcd compaction
    every 60 minutes. This period is controlled by the
    etcd_compaction_period_mins config setting, and requesting compaction
    can be disabled by setting that to 0.

    Each time we consider a compaction, we ensure that we retain history for
    the previous etcd_compaction_period_mins interval, and also for at least
    the last etcd_compaction_min_revisions revisions.

    We piggyback on the master election infrastructure so that only one
    thread of the Neutron server requests compaction, each time that it
    becomes due.
    """
    # If periodic etcd compaction is disabled, do nothing here.
    if cfg.CONF.calico.etcd_compaction_period_mins == 0:
        return

    try:
        # Try to read the compaction trigger key.
        try:
            _, _, lease = etcdv3.get(COMPACTION_TRIGGER_KEY, with_lease=True)
            # No exception, so the key still exists. Check that it still has
            # a lease and TTL as expected. (For example, the lease could be
            # missing or have an unreasonably large TTL, if the etcd cluster
            # has been restarted after restoring from an incomplete or
            # corrupt backup.)
            if lease is None:
                # Start from scratch as though neither of the compaction
                # keys is present.
                LOG.warning("Compaction key has lost its lease; rewriting")
                write_compaction_keys(0)
                return

            # We're now going to sanity check the lease, but that involves
            # further requests to the etcd server, and it's possible for
            # those to fail if the lease is expiring _right now_. We will
            # catch that and handle it the same as if the key was not there.
            try:
                ttl = lease.ttl()
                if ttl > cfg.CONF.calico.etcd_compaction_period_mins * 60:
                    # Start from scratch as though neither of the compaction
                    # keys is present.
                    LOG.warning("Unreasonably large lease TTL (%r)", ttl)
                    write_compaction_keys(0)
                    return

                # Lease is there and TTL is reasonable: just wait for more
                # time to pass then.
                LOG.info("Compaction trigger TTL is %r", ttl)
                return
            except (etcdv3.Etcd3Exception, KeyError) as e:
                # Etcd3Exception "Not Found" is expected if the lease has
                # just expired and been removed. We can also get KeyError
                # 'TTL' because of JSON missing the 'TTL' field; for example
                # here's what we see if we create a lease with TTL 5s and
                # then query it every 0.5s:
                #
                #   {..., u'grantedTTL': u'5', u'ID': u'75...', u'TTL': u'4'}
                #   {..., u'grantedTTL': u'5', u'ID': u'75...', u'TTL': u'4'}
                #   {..., u'grantedTTL': u'5', u'ID': u'75...', u'TTL': u'3'}
                #   {..., u'grantedTTL': u'5', u'ID': u'75...', u'TTL': u'3'}
                #   {..., u'grantedTTL': u'5', u'ID': u'75...', u'TTL': u'2'}
                #   {..., u'grantedTTL': u'5', u'ID': u'75...', u'TTL': u'2'}
                #   {..., u'grantedTTL': u'5', u'ID': u'75...', u'TTL': u'1'}
                #   {..., u'grantedTTL': u'5', u'ID': u'75...', u'TTL': u'1'}
                #   {..., u'ID': u'75...', u'grantedTTL': u'5'}
                #   {..., u'ID': u'75...', u'grantedTTL': u'5'}
                #   {..., u'ID': u'75...', u'grantedTTL': u'5'}
                #   {..., u'ID': u'75...', u'TTL': u'-1'}
                #   {..., u'ID': u'75...', u'TTL': u'-1'}
                #   {..., u'ID': u'75...', u'TTL': u'-1'}
                #
                # Strange but true!
                LOG.info("Lease expired as we were checking it: %r", e)
                # Now fall through to the code below to consider requesting
                # a compaction.
        except etcdv3.KeyNotFound:
            # The key has timed out, so etcd_compaction_period_mins has
            # passed since the last time we considered compaction. (Or else
            # the key has never existed yet.)
            pass

        # Find out when the last compaction happened, and what the current
        # revision was etcd_compaction_period_mins ago.
        try:
            last_compaction_rev, last_check_rev = etcdv3.get(
                COMPACTION_LAST_KEY
            )
            last_compaction_rev = int(last_compaction_rev)
            last_check_rev = int(last_check_rev)
            LOG.info("Last compaction %r, last check %r",
                     last_compaction_rev, last_check_rev)
        except etcdv3.KeyNotFound:
            # This is the first time we've checked for compaction. No
            # possibility of compacting this time, because we always want to
            # keep history for at least one etcd_compaction_period_mins
            # interval, and we can't yet tell what that means. Write the
            # keys so that we will be able to tell this next time round.
            LOG.info("First check")
            write_compaction_keys(0)
            return

        # Get the current revision.
        _, current_revision = etcdv3.get_status()
        current_revision = int(current_revision)
        LOG.info("Current etcd revision is %r", current_revision)

        # Defensive sanity check that the read last_compaction_rev is less
        # than the current revision. Conceivably a user could restore from
        # backup and throw off the revisions. In that case, rewrite the keys
        # with last_compaction 0 and return without compacting. (Note: it
        # isn't possible for last_check_rev to be similarly bogus, because
        # it is current etcd cluster metadata from the same source as
        # current_revision.)
        if last_compaction_rev > current_revision:
            LOG.info("Bogus last compaction revision (%r > %r)",
                     last_compaction_rev,
                     current_revision)
            write_compaction_keys(0)
            return

        # We must keep at least etcd_compaction_min_revisions of history.
        # If there aren't that many yet, we can't compact.
        if current_revision <= cfg.CONF.calico.etcd_compaction_min_revisions:
            LOG.info("Not enough revisions to compact yet (%r <= %r)",
                     current_revision,
                     cfg.CONF.calico.etcd_compaction_min_revisions)
            # Note: there could still be a non-zero last_compaction_rev
            # here, if the Neutron server has been restarted with an
            # increased value of etcd_compaction_min_revisions.
            write_compaction_keys(last_compaction_rev)
            return

        # Calculate the amount of history to keep. This must be at least
        # etcd_compaction_min_revisions.
        keep_revisions = cfg.CONF.calico.etcd_compaction_min_revisions
        # But must also be at least the history of the whole previous
        # etcd_compaction_period_mins interval.
        if keep_revisions < (current_revision - last_check_rev):
            keep_revisions = current_revision - last_check_rev

        # So that would mean compacting at:
        compact_revision = current_revision - keep_revisions
        if compact_revision <= last_compaction_rev:
            # We've already compacted at or after that revision. Wait for
            # more time to pass, or history to accumulate.
            LOG.info("No compactable history yet (%r <= %r)",
                     compact_revision, last_compaction_rev)
            write_compaction_keys(last_compaction_rev)
            return

        # Request compaction at that revision.
        LOG.info("Request compaction at %r", compact_revision)
        try:
            etcdv3.request_compaction(compact_revision)
        except etcdv3.Etcd3Exception:
            # An exception here most likely means that the revision we're
            # asking to compact at has already been compacted - which means
            # that there is some other service in the deployment which is
            # also taking some responsibility for etcd compaction. (For
            # example, it could be libcalico-go.)
            #
            # In that case, and given that it isn't straightforward for us
            # to discover exactly what the current compacted revision is,
            # just imagine that it's the same as the current revision. That
            # means that this code won't consider compacting again until
            # another etcd_compaction_min_revisions revisions and
            # etcd_compaction_period_mins minutes have passed.
            #
            # (On the other hand, if the exception is for some other reason
            # such as connectivity to the etcd cluster, the following write
            # will hit that too, and that will be handled below.)
            LOG.info("Someone else has requested etcd compaction")
            write_compaction_keys(current_revision)
            return

        # Record the new compaction revision.
        write_compaction_keys(compact_revision)
    except Exception:
        # Something wrong with etcd connectivity; clearly then we can't do
        # any compaction. Just log, and we'll try again on the next resync.
        LOG.exception("Failed to check/request compaction")


def write_compaction_keys(compaction_revision):
    # Write the last key to record the last compaction and check revisions
    # (the latter implicitly, as mod_revision).
    if not etcdv3.put(COMPACTION_LAST_KEY, str(compaction_revision)):
        # Writing should always succeed; but in case it doesn't we will
        # retry as part of the next resync, so just a warning is sufficient
        # here.
        LOG.warning("Failed to write %s", COMPACTION_LAST_KEY)

    # Write the trigger key, with TTL such that it will disappear again
    # after etcd_compaction_period_mins.
    lease = etcdv3.get_lease(
        cfg.CONF.calico.etcd_compaction_period_mins * 60
    )
    if not etcdv3.put(COMPACTION_TRIGGER_KEY, str(os.getpid()), lease=lease):
        # Writing should always succeed; but in case it doesn't we will
        # retry as part of the next resync, so just a warning is sufficient
        # here.
        LOG.warning("Failed to write %s", COMPACTION_TRIGGER_KEY)