Big Switch Networks plugins and drivers for the networking project

servermanager.py 51KB

# Copyright 2014 Big Switch Networks, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
This module manages the HTTP and HTTPS connections to the backend controllers.

The main class it provides for external use is ServerPool, which manages a set
of ServerProxy objects that correspond to individual backend controllers.

The following functionality is handled by this module:
- Translation of rest_* function calls to HTTP/HTTPS calls to the controllers
- Automatic failover between controllers
- SSL certificate enforcement
- HTTP authentication
"""
import base64
import httplib
import os
import re
import socket
import ssl
import time

import eventlet
import eventlet.corolocal
from keystoneauth1.identity import v3
from keystoneauth1 import session
from keystoneclient.v3 import client as ksclient
from neutron_lib import exceptions
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
from oslo_utils import excutils
from sqlalchemy.types import Enum

from networking_bigswitch.plugins.bigswitch.db import consistency_db as cdb
from networking_bigswitch.plugins.bigswitch.i18n import _
from networking_bigswitch.plugins.bigswitch.i18n import _LE
from networking_bigswitch.plugins.bigswitch.i18n import _LI
from networking_bigswitch.plugins.bigswitch.i18n import _LW
from networking_bigswitch.plugins.bigswitch.utils import Util

LOG = logging.getLogger(__name__)

# The following are used to invoke the API on the external controller
CAPABILITIES_PATH = "/capabilities"
NET_RESOURCE_PATH = "/tenants/%s/networks"
PORT_RESOURCE_PATH = "/tenants/%s/networks/%s/ports"
ROUTER_RESOURCE_PATH = "/tenants/%s/routers"
ROUTER_INTF_OP_PATH = "/tenants/%s/routers/%s/interfaces"
SECURITY_GROUP_RESOURCE_PATH = "/securitygroups"
TENANT_RESOURCE_PATH = "/tenants"
NETWORKS_PATH = "/tenants/%s/networks/%s"
FLOATINGIPS_PATH = "/tenants/%s/floatingips/%s"
PORTS_PATH = "/tenants/%s/networks/%s/ports/%s"
ATTACHMENT_PATH = "/tenants/%s/networks/%s/ports/%s/attachment"
ROUTERS_PATH = "/tenants/%s/routers/%s"
ROUTER_INTF_PATH = "/tenants/%s/routers/%s/interfaces/%s"
SECURITY_GROUP_PATH = "/securitygroups/%s"
TENANT_PATH = "/tenants/%s"
TOPOLOGY_PATH = "/topology"
HEALTH_PATH = "/health"
SWITCHES_PATH = "/switches/%s"
TESTPATH_PATH = ('/testpath/controller-view'
                 '?src-tenant=%(src-tenant)s'
                 '&src-segment=%(src-segment)s&src-ip=%(src-ip)s'
                 '&dst-ip=%(dst-ip)s')
TENANTPOLICY_RESOURCE_PATH = "/tenants/%s/policies"
TENANTPOLICIES_PATH = "/tenants/%s/policies/%s"

SUCCESS_CODES = range(200, 207)
FAILURE_CODES = [0, 301, 302, 303, 400, 401, 403, 404, 500, 501, 502, 503,
                 504, 505]

BASE_URI = '/networkService/v2.0'
ORCHESTRATION_SERVICE_ID = 'Neutron v2.0'
HASH_MATCH_HEADER = 'X-BSN-BVS-HASH-MATCH'
SERVICE_TENANT = 'VRRP_Service'
KS_AUTH_GROUP_NAME = 'keystone_authtoken'
KS_AUTH_DOMAIN_DEFAULT = 'default'
# error messages
NXNETWORK = 'NXVNS'

HTTP_SERVICE_UNAVAILABLE_RETRY_COUNT = 3
HTTP_SERVICE_UNAVAILABLE_RETRY_INTERVAL = 3
KEYSTONE_SYNC_RATE_LIMIT = 30  # Limit Keystone sync to once in 30 secs

# TOPO_SYNC Responses
TOPO_RESPONSE_OK = (httplib.OK, httplib.OK, True, True)
TOPO_RESPONSE_FAIL = (0, None, None, None)

# RE pattern for checking BCF supported names
BCF_IDENTIFIER_UUID_RE = re.compile(r"[0-9a-zA-Z][-.0-9a-zA-Z_]*")
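

# --- Editor's illustrative sketch (not part of the upstream module) ---
# The *_PATH constants above are printf-style templates that the rest_*
# helpers below fill in before issuing a REST call. The IDs used here are
# made-up sample values.
def _example_path_templates():
    assert (PORTS_PATH % ('tenant-1', 'net-1', 'port-1') ==
            '/tenants/tenant-1/networks/net-1/ports/port-1')
    assert (TESTPATH_PATH % {'src-tenant': 't1', 'src-segment': 'seg1',
                             'src-ip': '10.0.0.1', 'dst-ip': '10.0.0.2'} ==
            '/testpath/controller-view?src-tenant=t1&src-segment=seg1'
            '&src-ip=10.0.0.1&dst-ip=10.0.0.2')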


class TenantIDNotFound(exceptions.NeutronException):
    message = _("Tenant: %(tenant)s is not known by keystone.")
    status = None

    def __init__(self, **kwargs):
        self.tenant = kwargs.get('tenant')
        super(TenantIDNotFound, self).__init__(**kwargs)


class UnsupportedNameException(exceptions.NeutronException):
    """UnsupportedNameException

    Exception class to be raised when encountering object names with
    unsupported characters, namely those that do not conform to the regular
    expression BCF_IDENTIFIER_UUID_RE.

    :keyword obj_type
    :keyword obj_id
    :keyword obj_name
    """
    message = _("Object of type %(obj_type)s and id %(obj_id)s has unsupported"
                " character in name \"%(obj_name)s\". It should begin with an"
                " alphanumeric character [0-9a-zA-Z] and can contain space,"
                " underscore, apostrophe, forward slash, opening and closing"
                " square brackets.")
    status = None


class UnsupportedTenantNameInObjectException(exceptions.NeutronException):
    """Unsupported Tenant Name

    Exception class to be raised when objects have tenant names with
    unsupported characters, namely those that do not conform to the regular
    expression BCF_IDENTIFIER_UUID_RE.

    :keyword obj_type
    :keyword obj_id
    :keyword obj_name
    :keyword tenant_name
    """
    message = _("Object of type %(obj_type)s, id %(obj_id)s and name "
                "%(obj_name)s has unsupported character in its tenant name "
                "\"%(tenant_name)s\". It should begin with an "
                "alphanumeric character [0-9a-zA-Z] and can contain space, "
                "underscore, apostrophe and double-quotes.")
    status = None


class NetworkNameChangeError(exceptions.NeutronException):
    message = _("network name is not allowed to be changed.")
    status = None


class RemoteRestError(exceptions.NeutronException):
    message = _("Error in REST call to BCF "
                "controller: %(reason)s")
    status = None

    def __init__(self, **kwargs):
        self.status = kwargs.pop('status', None)
        self.reason = kwargs.get('reason')
        super(RemoteRestError, self).__init__(**kwargs)


class DictDiffer(object):
    """Calculate the difference between two dictionaries as:

    (1) items added
    (2) items removed
    (3) keys same in both but changed values
    (4) keys same in both and unchanged values
    """
    def __init__(self, current_dict, past_dict):
        self.current_dict, self.past_dict = current_dict, past_dict
        self.set_current, self.set_past = (set(current_dict.keys()),
                                           set(past_dict.keys()))
        self.intersect = self.set_current.intersection(self.set_past)

    def added(self):
        return self.set_current - self.intersect

    def removed(self):
        return self.set_past - self.intersect

    def changed(self):
        return set(o for o in self.intersect
                   if self.past_dict[o] != self.current_dict[o])

    def unchanged(self):
        return set(o for o in self.intersect
                   if self.past_dict[o] == self.current_dict[o])
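

# --- Editor's illustrative sketch (not part of the upstream module) ---
# Shows how DictDiffer classifies keys when a new tenant cache is compared
# against the previous one; the sample dictionaries are made up.
def _example_dictdiffer_usage():
    past = {'t1': 'admin', 't2': 'demo'}
    current = {'t1': 'admin', 't2': 'demo-renamed', 't3': 'service'}
    diff = DictDiffer(current, past)
    assert diff.added() == set(['t3'])
    assert diff.removed() == set()
    assert diff.changed() == set(['t2'])
    assert diff.unchanged() == set(['t1'])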


class ObjTypeEnum(Enum):
    """Enum

    Enum to represent various object types whose name requires sanitization
    before syncing to the controller.
    """
    network = "network"
    router = "router"
    security_group = "security_group"
    subnet = "subnet"
    tenant = "tenant"


def is_valid_bcf_name(name):
    """Check if bcf name is valid

    :returns True if name matches BCF_IDENTIFIER_UUID_RE
    :returns False otherwise
    """
    match_obj = BCF_IDENTIFIER_UUID_RE.match(name)
    if match_obj and match_obj.group(0) == name:
        return True
    return False
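

# --- Editor's illustrative sketch (not part of the upstream module) ---
# A name is accepted only when the regular expression consumes the whole
# string: it must start with an alphanumeric character and may then contain
# dashes, dots, underscores and alphanumerics. Sample names are made up.
def _example_is_valid_bcf_name():
    assert is_valid_bcf_name('net-01_prod.v2')
    assert not is_valid_bcf_name('-starts-with-dash')
    assert not is_valid_bcf_name('has space')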


def get_keystoneauth_cfg(conf, name):
    """get the keystone auth cfg

    Fetch value of keystone_authtoken group from config file when not
    available as part of GroupAttr.

    :rtype: String
    :param conf: oslo config cfg.CONF
    :param name: property name to be retrieved
    """
    try:
        # expected response is a tuple consisting of
        # (value_list, source_of_value). since keystone attributes are fields
        # like username, password and domain name - they're not actually lists.
        # so we pick the first item from the list
        value_list, source_of_value = conf._namespace._get_file_value(
            [(KS_AUTH_GROUP_NAME, name)])
        LOG.debug('KEYSTONE_AUTHTOKEN value for %(name)s is %(value)s',
                  {'name': name, 'value': value_list[0]})
        return value_list[0]
    except KeyError as e:
        LOG.warning("Config does not have property %(name)s "
                    "in group keystone_authtoken", {'name': name})
        raise e
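

# --- Editor's illustrative sketch (not part of the upstream module) ---
# Assuming neutron.conf has been parsed into cfg.CONF and contains a
# [keystone_authtoken] section, the helper above reads individual options
# straight from the file-backed namespace, e.g.:
#
#     username = get_keystoneauth_cfg(cfg.CONF, 'username')
#     auth_url = get_keystoneauth_cfg(cfg.CONF, 'auth_url')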


class ServerProxy(object):
    """REST server proxy to a network controller."""

    def __init__(self, server, port, ssl, auth, neutron_id, timeout,
                 base_uri, name, mypool, combined_cert):
        self.server = server
        self.port = port
        self.ssl = ssl
        self.base_uri = base_uri
        self.timeout = timeout
        self.name = name
        self.success_codes = SUCCESS_CODES
        self.auth = None
        self.auth_token = None
        self.neutron_id = neutron_id
        self.failed = False
        self.capabilities = []
        # enable server to reference parent pool
        self.mypool = mypool
        # cache connection here to avoid a SSL handshake for every connection
        self.currentconn = None
        if auth:
            if ':' in auth:
                self.auth = 'Basic ' + base64.encodestring(auth).strip()
            else:
                self.auth_token = 'session_cookie="' + auth + '"'
        self.combined_cert = combined_cert

    def get_capabilities(self):
        try:
            body = self.rest_call('GET', CAPABILITIES_PATH)[2]
            if body:
                self.capabilities = jsonutils.loads(body)
        except Exception:
            LOG.exception("Couldn't retrieve capabilities on server "
                          "%(server)s. ", {'server': self.server})
        LOG.info("The following capabilities were received "
                 "for %(server)s: %(cap)s",
                 {'server': self.server, 'cap': self.capabilities})
        return self.capabilities

    def rest_call(self, action, resource, data='', headers=None,
                  timeout=False, reconnect=False):
        uri = self.base_uri + resource
        body = jsonutils.dumps(data)
        headers = headers or {}
        headers['Content-type'] = 'application/json'
        headers['Accept'] = 'application/json'
        headers['NeutronProxy-Agent'] = self.name
        headers['Instance-ID'] = self.neutron_id
        headers['Orchestration-Service-ID'] = ORCHESTRATION_SERVICE_ID
        # TODO(kevinbenton): Re-enable keep-alive in a thread-safe fashion.
        # When multiple workers are enabled the saved connection gets mangled
        # by multiple threads so we always reconnect.
        if 'keep-alive' in self.capabilities and False:
            headers['Connection'] = 'keep-alive'
        else:
            reconnect = True
        if self.auth_token:
            headers['Cookie'] = self.auth_token
        elif self.auth:
            headers['Authorization'] = self.auth
        LOG.debug("ServerProxy: server=%(server)s, port=%(port)d, "
                  "ssl=%(ssl)r",
                  {'server': self.server, 'port': self.port, 'ssl': self.ssl})
        LOG.debug("ServerProxy: resource=%(resource)s, data=%(data)r, "
                  "headers=%(headers)r, action=%(action)s",
                  {'resource': resource, 'data': data, 'headers': headers,
                   'action': action})
        # unspecified timeout is False because a timeout can be specified as
        # None to indicate no timeout.
        if timeout is False:
            timeout = self.timeout
        if timeout != self.timeout:
            # need a new connection if timeout has changed
            reconnect = True
        if not self.currentconn or reconnect:
            if self.currentconn:
                self.currentconn.close()
            if self.ssl:
                currentconn = HTTPSConnectionWithValidation(
                    self.server, self.port, timeout=timeout)
                if currentconn is None:
                    LOG.error('ServerProxy: Could not establish HTTPS '
                              'connection')
                    return 0, None, None, None
                currentconn.combined_cert = self.combined_cert
            else:
                currentconn = httplib.HTTPConnection(
                    self.server, self.port, timeout=timeout)
                if currentconn is None:
                    LOG.error('ServerProxy: Could not establish HTTP '
                              'connection')
                    return 0, None, None, None
        try:
            bcf_request_time = time.time()
            currentconn.request(action, uri, body, headers)
            response = currentconn.getresponse()
            respstr = response.read()
            respdata = respstr
            bcf_response_time = time.time()
            LOG.debug("Time waited to get response from BCF %.2fsecs",
                      (bcf_response_time - bcf_request_time))
            if response.status in self.success_codes:
                try:
                    respdata = jsonutils.loads(respstr)
                except ValueError:
                    # response was not JSON, ignore the exception
                    pass
            ret = (response.status, response.reason, respstr, respdata)
        except httplib.HTTPException:
            # If we were using a cached connection, try again with a new one.
            with excutils.save_and_reraise_exception() as ctxt:
                currentconn.close()
                if reconnect:
                    # if reconnect is true, this was on a fresh connection so
                    # reraise since this server seems to be broken
                    ctxt.reraise = True
                else:
                    # if reconnect is false, it was a cached connection so
                    # try one more time before re-raising
                    ctxt.reraise = False
                    return self.rest_call(action, resource, data, headers,
                                          timeout=timeout, reconnect=True)
        except (socket.timeout, socket.error) as e:
            currentconn.close()
            LOG.error('ServerProxy: %(action)s failure, %(e)r',
                      {'action': action, 'e': e})
            ret = 0, None, None, None
        LOG.debug("ServerProxy: status=%(status)d, reason=%(reason)r, "
                  "ret=%(ret)s, data=%(data)r", {'status': ret[0],
                                                 'reason': ret[1],
                                                 'ret': ret[2],
                                                 'data': ret[3]})
        return ret
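

# --- Editor's illustrative sketch (not part of the upstream module) ---
# Every rest_call() returns a 4-tuple (status, reason, raw_body, parsed_body);
# a status of 0 means the HTTP call itself failed. A caller might unpack it
# like this (the proxy object is assumed to exist already):
#
#     status, reason, raw, data = proxy.rest_call('GET', CAPABILITIES_PATH)
#     if status in SUCCESS_CODES:
#         capabilities = data      # JSON already decoded on success
#     else:
#         LOG.error("capabilities query failed: %s %s", status, reason)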


class ServerPool(object):

    _instance = None

    @classmethod
    def get_instance(cls):
        if cls._instance:
            return cls._instance
        cls._instance = cls()
        return cls._instance

    def __init__(self, timeout=False,
                 base_uri=BASE_URI, name='NeutronRestProxy'):
        LOG.debug("ServerPool: initializing")
        # 'servers' is the list of network controller REST end-points
        # (used in order specified till one succeeds, and it is sticky
        # till next failure). Use 'server_auth' to encode api-key
        servers = cfg.CONF.RESTPROXY.servers
        self.auth = cfg.CONF.RESTPROXY.server_auth
        self.ssl = cfg.CONF.RESTPROXY.server_ssl
        self.neutron_id = cfg.CONF.RESTPROXY.neutron_id
        # unicode config
        self.cfg_unicode_enabled = cfg.CONF.RESTPROXY.naming_scheme_unicode
        if 'keystone_authtoken' in cfg.CONF:
            self.auth_user = get_keystoneauth_cfg(cfg.CONF, 'username')
            self.auth_password = get_keystoneauth_cfg(cfg.CONF, 'password')
            self.auth_url = get_keystoneauth_cfg(cfg.CONF, 'auth_url')
            self.auth_tenant = get_keystoneauth_cfg(cfg.CONF, 'project_name')
            self.project_domain_name = get_keystoneauth_cfg(
                cfg.CONF, 'project_domain_name')
            self.user_domain_name = get_keystoneauth_cfg(
                cfg.CONF, 'user_domain_name')
        else:
            # this is for UT only
            LOG.warning("keystone_authtoken not found in "
                        "/etc/neutron/neutron.conf. "
                        "Please check config file")
            self.auth_url = cfg.CONF.RESTPROXY.auth_url
            self.auth_user = cfg.CONF.RESTPROXY.auth_user
            self.auth_password = cfg.CONF.RESTPROXY.auth_password
            self.auth_tenant = cfg.CONF.RESTPROXY.auth_tenant
            self.project_domain_name = KS_AUTH_DOMAIN_DEFAULT
            self.user_domain_name = KS_AUTH_DOMAIN_DEFAULT
        # Use Keystone v3 URL for authentication
        if "v2.0" in self.auth_url:
            self.auth_url = self.auth_url.replace("v2.0", "v3")
        elif "v3" not in self.auth_url:
            self.auth_url = "%s/v3" % self.auth_url
        self.base_uri = base_uri
        self.name = name
        # Cache for OpenStack projects.
        # The cache is maintained in a separate thread and synced with
        # Keystone periodically.
        self.keystone_tenants = {}
        self._update_tenant_cache(reconcile=False)
        self.timeout = cfg.CONF.RESTPROXY.server_timeout
        self.always_reconnect = not cfg.CONF.RESTPROXY.cache_connections
        self.capabilities = []
        default_port = 8000
        if timeout is not False:
            self.timeout = timeout
        # Function to use to retrieve topology for consistency syncs.
        # Needs to be set by module that uses the servermanager.
        self.get_topo_function = None
        self.get_topo_function_args = {}
        if not servers:
            raise cfg.Error(_('Servers not defined. Aborting server manager.'))
        servers = [s if len(s.rsplit(':', 1)) == 2
                   else "%s:%d" % (s, default_port)
                   for s in servers]
        if any((len(spl) != 2 or not spl[1].isdigit())
               for spl in [sp.rsplit(':', 1)
                           for sp in servers]):
            raise cfg.Error(_('Servers must be defined as <ip>:<port>. '
                              'Configuration was %s') % servers)
        self.servers = []
        for s in servers:
            server, port = s.rsplit(':', 1)
            if server.startswith("[") and server.endswith("]"):
                # strip [] for ipv6 address
                server = server[1:-1]
            self.servers.append(self.server_proxy_for(server, int(port)))
        self.start_background_tasks()
        ServerPool._instance = self
        LOG.debug("ServerPool: initialization done")

    def start_background_tasks(self):
        # update capabilities, starts immediately
        # updates every 5 minutes, mostly for bcf upgrade/downgrade cases
        eventlet.spawn(self._capability_watchdog, 300)
        # consistency check, starts after 1 * consistency_interval
        eventlet.spawn(self._consistency_watchdog,
                       cfg.CONF.RESTPROXY.consistency_interval)
        # Start keystone sync thread after 5 consistency syncs
        # to give enough time for topology to sync over when
        # neutron-server starts.
        eventlet.spawn_after(
            5 * cfg.CONF.RESTPROXY.consistency_interval,
            self._keystone_sync,
            cfg.CONF.RESTPROXY.keystone_sync_interval)
    def get_capabilities(self):
        """Get capabilities

        If the cache has the value, use it.
        If not, do REST calls to the BCF controllers to check it.

        :return: supported capability list
        """
        # lookup on first try
        # if capabilities is empty, the check is either not done, or failed
        if self.capabilities:
            return self.capabilities
        else:
            return self.get_capabilities_force_update()

    def get_capabilities_force_update(self):
        """Do REST calls to update capabilities

        Logs a unicode change message when:
        1. the plugin gets capabilities from BCF for the first time
        2. the plugin notices that the unicode mode has changed

        :return: combined capability list from all servers
        """
        # Servers should be the same version.
        # If one server is down, use the online server's capabilities.
        capability_list = [set(server.get_capabilities())
                           for server in self.servers]
        new_capabilities = set.union(*capability_list)
        self.log_unicode_status_change(new_capabilities)
        self.capabilities = new_capabilities
        # With multiple workers enabled, the fork may occur after the
        # connections to the DB have been established. We need to clear the
        # connections after the first attempt to call the backend to ensure
        # that the established connections will all be local to the thread and
        # not shared. Placing it here in the capabilities call is the easiest
        # way to ensure that it's done after everything is initialized and the
        # first call to the backend is made.
        # This is necessary in our plugin and not others because we have a
        # completely separate DB connection for the consistency records. The
        # main connection is made thread-safe in the neutron service init.
        # https://github.com/openstack/neutron/blob/
        # ec716b9e68b8b66a88218913ae4c9aa3a26b025a/neutron/wsgi.py#L104
        if cdb.HashHandler._FACADE:
            cdb.HashHandler._FACADE.get_engine().pool.dispose()
        if not new_capabilities:
            LOG.error('Failed to get capabilities on any controller. ')
        return self.capabilities

    def log_unicode_status_change(self, new_capabilities):
        """Log unicode status, if capabilities is initialized or if changed

        Compares old capabilities with new capabilities.

        :param new_capabilities: new capabilities
        :return:
        """
        if new_capabilities and self.capabilities != new_capabilities:
            # unicode disabled by user
            if not self.cfg_unicode_enabled:
                # Log only during Initialization
                if not self.capabilities:
                    LOG.info('naming_scheme_unicode is set to False,'
                             ' Unicode names Disabled')
            # unicode enabled and supported by controller
            elif 'display-name' in new_capabilities:
                # Log for 2 situations:
                # 1. Initialization
                # 2. BCF is upgraded to support unicode
                if 'display-name' not in self.capabilities:
                    LOG.info('naming_scheme_unicode is set to True,'
                             ' Unicode names Enabled')
            # unicode enabled, but not supported by controller
            else:
                # Log for 2 situations:
                # 1. Initialization
                # 2. BCF is downgraded, no longer supports unicode
                if not self.capabilities or 'display-name' in \
                        self.capabilities:
                    LOG.warning('naming_scheme_unicode is set to True,'
                                ' but BCF does not support it.'
                                ' Unicode names Disabled')

    def is_unicode_enabled(self):
        """Check unicode running status

        True: enabled
        False: disabled
        """
        if not self.get_capabilities():
            msg = 'Capabilities unknown! Please check BCF controller status.'
            raise RemoteRestError(reason=msg)
        if self.cfg_unicode_enabled and 'display-name' in \
                self.get_capabilities():
            return True
        else:
            return False

    def server_proxy_for(self, server, port):
        combined_cert = self._get_combined_cert_for_server(server, port)
        return ServerProxy(server, port, self.ssl, self.auth, self.neutron_id,
                           self.timeout, self.base_uri, self.name,
                           mypool=self, combined_cert=combined_cert)

    def _get_combined_cert_for_server(self, server, port):
        # The ssl library requires a combined file with all trusted certs
        # so we make one containing the trusted CAs and the corresponding
        # host cert for this server
        combined_cert = None
        if self.ssl and not cfg.CONF.RESTPROXY.no_ssl_validation:
            base_ssl = cfg.CONF.RESTPROXY.ssl_cert_directory
            host_dir = os.path.join(base_ssl, 'host_certs')
            ca_dir = os.path.join(base_ssl, 'ca_certs')
            combined_dir = os.path.join(base_ssl, 'combined')
            combined_cert = os.path.join(combined_dir, '%s.pem' % server)
            if not os.path.exists(base_ssl):
                raise cfg.Error(_('ssl_cert_directory [%s] does not exist. '
                                  'Create it or disable ssl.') % base_ssl)
            for automake in [combined_dir, ca_dir, host_dir]:
                if not os.path.exists(automake):
                    os.makedirs(automake)
            # get all CA certs
            certs = self._get_ca_cert_paths(ca_dir)
            # check for a host specific cert
            hcert, exists = self._get_host_cert_path(host_dir, server)
            if exists:
                certs.append(hcert)
            elif cfg.CONF.RESTPROXY.ssl_sticky:
                self._fetch_and_store_cert(server, port, hcert)
                certs.append(hcert)
            if not certs:
                raise cfg.Error(_('No certificates were found to verify '
                                  'controller %s') % (server))
            self._combine_certs_to_file(certs, combined_cert)
        return combined_cert

    def _combine_certs_to_file(self, certs, cfile):
        """_combine_certs_to_file

        Concatenates the contents of each certificate in a list of
        certificate paths to one combined location for use with ssl
        sockets.
        """
        with open(cfile, 'w') as combined:
            for c in certs:
                with open(c, 'r') as cert_handle:
                    combined.write(cert_handle.read())

    def _get_host_cert_path(self, host_dir, server):
        """returns full path and boolean indicating existence"""
        hcert = os.path.join(host_dir, '%s.pem' % server)
        if os.path.exists(hcert):
            return hcert, True
        return hcert, False

    def _get_ca_cert_paths(self, ca_dir):
        certs = [os.path.join(root, name)
                 for name in [
                     name for (root, dirs, files) in os.walk(ca_dir)
                     for name in files]
                 if name.endswith('.pem')]
        return certs

    def _fetch_and_store_cert(self, server, port, path):
        """_fetch_and_store_cert

        Grabs a certificate from a server and writes it to
        a given path.
        """
        try:
            cert = ssl.get_server_certificate((server, port),
                                              ssl_version=ssl.PROTOCOL_SSLv23)
        except Exception as e:
            raise cfg.Error(_('Could not retrieve initial '
                              'certificate from controller %(server)s. '
                              'Error details: %(error)s') %
                            {'server': server, 'error': e})
        LOG.warning("Storing certificate for host %(server)s "
                    "at %(path)s", {'server': server, 'path': path})
        self._file_put_contents(path, cert)
        return cert

    def _file_put_contents(self, path, contents):
        # Simple method to write to file.
        # Created for easy Mocking
        with open(path, 'w') as handle:
            handle.write(contents)

    def server_failure(self, resp, ignore_codes=None):
        """Define failure codes as required.

        Note: We assume 301-303 is a failure, and try the next server in
        the server pool.
        """
        if ignore_codes is None:
            ignore_codes = []
        return (resp[0] in FAILURE_CODES and resp[0] not in ignore_codes)

    def action_success(self, resp):
        """Defining success codes as required.

        Note: We assume any valid 2xx as being successful response.
        """
        return resp[0] in SUCCESS_CODES

    def rest_call(self, action, resource, data, headers, ignore_codes,
                  timeout=False):
        good_first = sorted(self.servers, key=lambda x: x.failed)
        first_response = None
        for active_server in good_first:
            LOG.debug("ServerProxy: %(action)s to servers: "
                      "%(server)r, %(resource)s",
                      {'action': action,
                       'server': (active_server.server,
                                  active_server.port),
                       'resource': resource})
            for x in range(HTTP_SERVICE_UNAVAILABLE_RETRY_COUNT + 1):
                ret = active_server.rest_call(action, resource, data, headers,
                                              timeout,
                                              reconnect=self.always_reconnect)
                if ret[0] != httplib.SERVICE_UNAVAILABLE:
                    break
                eventlet.sleep(HTTP_SERVICE_UNAVAILABLE_RETRY_INTERVAL)
            # Store the first response as the error to be bubbled up to the
            # user since it was a good server. Subsequent servers will most
            # likely be cluster slaves and won't have a useful error for the
            # user (e.g. 302 redirect to master)
            if not first_response:
                first_response = ret
            if not self.server_failure(ret, ignore_codes):
                active_server.failed = False
                LOG.debug("ServerProxy: %(action)s succeed for servers: "
                          "%(server)r Response: %(response)s",
                          {'action': action,
                           'server': (active_server.server,
                                      active_server.port),
                           'response': ret[3]})
                return ret
            else:
                LOG.warning('ServerProxy: %(action)s failure for servers:'
                            '%(server)r Response: %(response)s',
                            {'action': action,
                             'server': (active_server.server,
                                        active_server.port),
                             'response': ret[3]})
                LOG.warning("ServerProxy: Error details: "
                            "status=%(status)d, reason=%(reason)r, "
                            "ret=%(ret)s, data=%(data)r",
                            {'status': ret[0], 'reason': ret[1],
                             'ret': ret[2], 'data': ret[3]})
                active_server.failed = True
        # All servers failed, reset server list and try again next time
        LOG.error('ServerProxy: %(action)s failure for all servers: '
                  '%(server)r',
                  {'action': action,
                   'server': tuple((s.server,
                                    s.port) for s in self.servers)})
        return first_response

    def rest_action(self, action, resource, data='', errstr='%s',
                    ignore_codes=None, headers=None, timeout=False):
        """rest_action

        Wrapper for rest_call that verifies success and raises a
        RemoteRestError on failure with a provided error string.

        By default, 404 errors on DELETE calls are ignored because
        they already do not exist on the backend.
        """
        ignore_codes = ignore_codes or []
        headers = headers or {}
        if not ignore_codes and action == 'DELETE':
            ignore_codes = [404]
        resp = self.rest_call(action, resource, data, headers, ignore_codes,
                              timeout)
        if self.server_failure(resp, ignore_codes):
            # Request wasn't successful, nor can it be ignored;
            # do a full synchronization if auto_sync_on_failure is True
            if (cfg.CONF.RESTPROXY.auto_sync_on_failure and
                    resource != TOPOLOGY_PATH):
                LOG.error(_LE("NeutronRestProxyV2: Inconsistency with backend "
                              "controller, triggering full synchronization. "
                              "%(action)s %(resource)s."),
                          {'action': action, 'resource': resource})
                sync_executed, topo_resp = self.force_topo_sync(check_ts=True)
                if sync_executed:
                    return topo_resp
            # either topo_sync was not executed or topo_sync itself failed
            LOG.error(errstr, resp[2])
            raise RemoteRestError(reason=resp[2], status=resp[0])
        if resp[0] in ignore_codes:
            LOG.info("NeutronRestProxyV2: Received and ignored error "
                     "code %(code)s on %(action)s action to resource "
                     "%(resource)s",
                     {'code': resp[0], 'action': action,
                      'resource': resource})
        return resp

    def _check_and_raise_exception_unsupported_name(self, obj_type, obj):
        """_check_and_raise_exception_unsupported_name

        Used to sanity check object names and tenant names within an object.
        If they do not comply with the BCF expectation, raises an exception.

        :returns None if all ok
        :raises UnsupportedNameException or
            UnsupportedTenantNameInObjectException if name does not match
            BCF expectation
        """
        if ('name' in obj and obj['name'] and
                not is_valid_bcf_name(obj['name'])):
            raise UnsupportedNameException(obj_type=obj_type,
                                           obj_id=obj['id'],
                                           obj_name=obj['name'])
        if ('tenant_name' in obj and
                not is_valid_bcf_name(obj['tenant_name'])):
            raise UnsupportedTenantNameInObjectException(
                obj_type=obj_type, obj_id=obj['id'], obj_name=obj['name'],
                tenant_name=obj['tenant_name'])

    def rest_create_tenant(self, tenant_id):
        self._update_tenant_cache()
        self._rest_create_tenant(tenant_id)

    def _rest_create_tenant(self, tenant_id):
        tenant_name = self.keystone_tenants.get(tenant_id)
        if not tenant_name:
            raise TenantIDNotFound(tenant=tenant_id)
        if self.is_unicode_enabled():
            data = {"tenant_id": tenant_id, 'tenant_name': tenant_id,
                    'display-name': tenant_name}
        else:
            if not is_valid_bcf_name(tenant_name):
                raise UnsupportedNameException(obj_type=ObjTypeEnum.tenant,
                                               obj_id=tenant_id,
                                               obj_name=tenant_name)
            data = {"tenant_id": tenant_id, 'tenant_name': tenant_name}
        resource = TENANT_RESOURCE_PATH
        errstr = _("Unable to create tenant: %s")
        self.rest_action('POST', resource, data, errstr)

    def rest_delete_tenant(self, tenant_id):
        resource = TENANT_PATH % tenant_id
        errstr = _("Unable to delete tenant: %s")
        self.rest_action('DELETE', resource, errstr=errstr)

    def rest_create_router(self, tenant_id, router):
        self._check_and_raise_exception_unsupported_name(ObjTypeEnum.router,
                                                         router)
        resource = ROUTER_RESOURCE_PATH % tenant_id
        data = {"router": router}
        errstr = _("Unable to create remote router: %s")
        self.rest_action('POST', resource, data, errstr)

    def rest_update_router(self, tenant_id, router, router_id):
        self._check_and_raise_exception_unsupported_name(ObjTypeEnum.router,
                                                         router)
        resource = ROUTERS_PATH % (tenant_id, router_id)
        data = {"router": router}
        errstr = _("Unable to update remote router: %s")
        self.rest_action('PUT', resource, data, errstr)

    def rest_delete_router(self, tenant_id, router_id):
        resource = ROUTERS_PATH % (tenant_id, router_id)
        errstr = _("Unable to delete remote router: %s")
        self.rest_action('DELETE', resource, errstr=errstr)

    def rest_add_router_interface(self, tenant_id, router_id, intf_details):
        resource = ROUTER_INTF_OP_PATH % (tenant_id, router_id)
        data = {"interface": intf_details}
        errstr = _("Unable to add router interface: %s")
        self.rest_action('POST', resource, data, errstr)

    def rest_remove_router_interface(self, tenant_id, router_id, interface_id):
        resource = ROUTER_INTF_PATH % (tenant_id, router_id, interface_id)
        errstr = _("Unable to delete remote intf: %s")
        self.rest_action('DELETE', resource, errstr=errstr)

    def rest_create_network(self, tenant_id, network):
        self._check_and_raise_exception_unsupported_name(ObjTypeEnum.network,
                                                         network)
        if 'subnets' in network:
            for subnet in network['subnets']:
                self._check_and_raise_exception_unsupported_name(
                    ObjTypeEnum.subnet, subnet)
        resource = NET_RESOURCE_PATH % tenant_id
        data = {"network": network}
        errstr = _("Unable to create remote network: %s")
        self.rest_action('POST', resource, data, errstr)

    def rest_update_network(self, tenant_id, net_id, network):
        self._check_and_raise_exception_unsupported_name(ObjTypeEnum.network,
                                                         network)
        if 'subnets' in network:
            for subnet in network['subnets']:
                self._check_and_raise_exception_unsupported_name(
                    ObjTypeEnum.subnet, subnet)
        resource = NETWORKS_PATH % (tenant_id, net_id)
        data = {"network": network}
        errstr = _("Unable to update remote network: %s")
        self.rest_action('PUT', resource, data, errstr)

    def rest_delete_network(self, tenant_id, net_id):
        resource = NETWORKS_PATH % (tenant_id, net_id)
        errstr = _("Unable to delete remote network: %s")
        self.rest_action('DELETE', resource, errstr=errstr)

    def rest_create_securitygroup(self, sg):
        self._check_and_raise_exception_unsupported_name(
            ObjTypeEnum.security_group, sg)
        resource = SECURITY_GROUP_RESOURCE_PATH
        data = {"security-group": sg}
        errstr = _("Unable to create security group: %s")
        self.rest_action('POST', resource, data, errstr)

    def rest_delete_securitygroup(self, sg_id):
        resource = SECURITY_GROUP_PATH % sg_id
        errstr = _("Unable to delete security group: %s")
        self.rest_action('DELETE', resource, errstr=errstr)

    def rest_get_port(self, tenant_id, net_id, port_id):
        resource = ATTACHMENT_PATH % (tenant_id, net_id, port_id)
        errstr = _("Unable to retrieve port: %s")
        resp = self.rest_action('GET', resource, errstr=errstr,
                                ignore_codes=[404])
        return None if resp[0] == 404 else resp[3]

    def rest_create_port(self, tenant_id, net_id, port):
        resource = ATTACHMENT_PATH % (tenant_id, net_id, port["id"])
        data = {"port": port}
        device_id = port.get("device_id")
        if not port["mac_address"] or not device_id:
            # controller only cares about ports attached to devices
            LOG.warning("No device MAC attached to port %s. "
                        "Skipping notification to controller.",
                        port["id"])
            return
        data["attachment"] = {"id": device_id,
                              "mac": port["mac_address"]}
        errstr = _("Unable to create remote port: %s")
        self.rest_action('PUT', resource, data, errstr)

    def rest_delete_port(self, tenant_id, network_id, port_id):
        resource = ATTACHMENT_PATH % (tenant_id, network_id, port_id)
        errstr = _("Unable to delete remote port: %s")
        self.rest_action('DELETE', resource, errstr=errstr)

    def rest_update_port(self, tenant_id, net_id, port):
        # Controller has no update operation for the port endpoint
        # the create PUT method will replace
        self.rest_create_port(tenant_id, net_id, port)

    def rest_create_floatingip(self, tenant_id, floatingip):
        resource = FLOATINGIPS_PATH % (tenant_id, floatingip['id'])
        errstr = _("Unable to create floating IP: %s")
        self._ensure_tenant_cache(tenant_id)
        self.rest_action('PUT', resource, floatingip, errstr=errstr)

    def rest_update_floatingip(self, tenant_id, floatingip, oldid):
        resource = FLOATINGIPS_PATH % (tenant_id, oldid)
        errstr = _("Unable to update floating IP: %s")
        self.rest_action('PUT', resource, floatingip, errstr=errstr)

    def rest_delete_floatingip(self, tenant_id, oldid):
        resource = FLOATINGIPS_PATH % (tenant_id, oldid)
        errstr = _("Unable to delete floating IP: %s")
        self.rest_action('DELETE', resource, errstr=errstr)

    def rest_get_switch(self, switch_id):
        resource = SWITCHES_PATH % switch_id
        errstr = _("Unable to retrieve switch: %s")
        resp = self.rest_action('GET', resource, errstr=errstr,
                                ignore_codes=[404])
        # return None if switch not found, else return switch info
        return None if resp[0] == 404 else resp[3]

    def rest_get_testpath(self, src, dst):
        resource = TESTPATH_PATH % {'src-tenant': src['tenant'],
                                    'src-segment': src['segment'],
                                    'src-ip': src['ip'],
                                    'dst-ip': dst['ip']}
        errstr = _("Unable to retrieve results for testpath ID: %s")
        resp = self.rest_action('GET', resource, errstr=errstr,
                                ignore_codes=[404])
        # return None if testpath not found, else return testpath info
        return None if (resp[0] not in range(200, 300)) else resp[3]

    def rest_create_tenantpolicy(self, tenant_id, policy):
        policy['ipproto'] = policy['protocol']
        resource = TENANTPOLICY_RESOURCE_PATH % tenant_id
        data = {"policy": policy}
        errstr = _("Unable to create policy: %s")
        self.rest_action('POST', resource, data, errstr)

    def rest_update_tenantpolicy(self, tenant_id, policy):
        policy['ipproto'] = policy['protocol']
        resource = TENANTPOLICIES_PATH % (tenant_id, policy['priority'])
        data = {"policy": policy}
        errstr = _("Unable to update policy: %s")
        self.rest_action('PUT', resource, data, errstr)

    def rest_delete_tenantpolicy(self, tenant_id, policy_prio):
        resource = TENANTPOLICIES_PATH % (tenant_id, policy_prio)
        errstr = _("Unable to delete remote policy: %s")
        self.rest_action('DELETE', resource, errstr=errstr)

    def _consistency_watchdog(self, polling_interval=60):
        if 'consistency' not in self.get_capabilities():
            LOG.warning("Backend server(s) do not support automated "
                        "consistency checks.")
            return
        if not polling_interval:
            LOG.warning("Consistency watchdog disabled by polling "
                        "interval setting of %s.", polling_interval)
            return
        while True:
            # If consistency is supported, all we have to do is make any
            # rest call and the consistency header will be added. If it
            # doesn't match, the backend will return a synchronization error
            # that will be handled by the rest_action.
            eventlet.sleep(polling_interval)
            try:
                self.rest_action('GET', HEALTH_PATH)
            except Exception:
                LOG.exception("Encountered an error checking controller "
                              "health.")

    def _capability_watchdog(self, polling_interval=300):
        """Check capabilities based on polling_interval

        :param polling_interval: interval in seconds
        """
        while True:
            try:
                self.get_capabilities_force_update()
            except Exception:
                LOG.exception("Encountered an error checking capabilities.")
            finally:
                eventlet.sleep(polling_interval)

    def force_topo_sync(self, check_ts=True):
        """Execute a topology_sync between OSP and BCF.

        Topology sync collects all data from OpenStack and pushes it to BCF
        in one single REST call. This is a heavy operation and is not
        executed automatically.

        Conditions when this would be called:
        (1) during ServerPool initialization
            (a) must check previous timestamp
        (2) if periodic keystone tenant_cache finds a diff in tenant list
            (a) should not check previous timestamp
        (3) externally triggered by neutron force-bcf-sync command
            (a) should not check previous timestamp
        (4) a rest_call to BCF fails on both servers and failure_code is not
            part of the ignore_codes list
            (a) must check previous timestamp

        :param check_ts: boolean flag to check previous
                         timestamp < TOPO_SYNC_EXPIRED_SECS
            prev_resp: a REST response tuple from the previous failed REST
                       call if available. If we skip topo_sync, return the
                       failure as previously observed.
        :return: (sync_executed, response)
                 sync_executed - Boolean - returns True if we were able to
                                 acquire a lock to perform topo_sync,
                                 False otherwise
                 response - tuple of the typical HTTP response from REST call
                            (response.status, response.reason, respstr,
                            respdata)
        """
        LOG.info(_LI('TOPO_SYNC requested with check_ts %s'), check_ts)
        if not self.get_topo_function:
            raise cfg.Error(_('Server requires synchronization, '
                              'but no topology function was defined.'))
        # get current timestamp
        curr_ts = str(time.time())
        hash_handler = cdb.HashHandler(timestamp_ms=curr_ts)
        if not hash_handler.lock(check_ts):
            LOG.info(_LI("TOPO_SYNC: lock() returned False. Skipping."))
            return False, TOPO_RESPONSE_OK
        # else, perform topo_sync
        try:
            LOG.debug("TOPO_SYNC: requested at %(request_ts)s started at "
                      "%(start_ts)s",
                      {'request_ts': cdb.convert_ts_to_datetime(curr_ts),
                       'start_ts': cdb.convert_ts_to_datetime(time.time())})
            data = self.get_topo_function(
                **self.get_topo_function_args)
            if not data:
                # when keystone sync fails, it fails silently with data = None
                # that is wrong, we need to raise an exception
                raise Exception(_("TOPO_SYNC: failed to retrieve data."))
            LOG.debug("TOPO_SYNC: data received from OSP, sending "
                      "request to BCF.")
            errstr = _("Unable to perform forced topology_sync: %s")
            return True, self.rest_action('POST', TOPOLOGY_PATH, data, errstr)
        except Exception as e:
            # if encountered an exception, set to previous timestamp
            LOG.warning(_LW("TOPO_SYNC: Exception during topology sync. "
                            "Consistency DB timestamp will not be updated."))
            hash_handler.unlock(set_prev_ts=True)
            raise e
        finally:
            hash_handler.unlock()
            diff = time.time() - float(hash_handler.lock_ts)
            LOG.info(_LI("TOPO_SYNC: took %s seconds to execute topo_sync. "
                         "consistency_db unlocked."),
                     str(diff))
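
    # --- Editor's illustrative sketch (not part of the upstream module) ---
    # Callers treat the return value as "did a sync actually run, and what
    # did BCF answer", e.g. (hypothetical caller code):
    #
    #     executed, resp = pool.force_topo_sync(check_ts=False)
    #     if executed and resp[0] in SUCCESS_CODES:
    #         LOG.info("topology pushed to BCF")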

    def _ensure_tenant_cache(self, tenant_id):
        if tenant_id not in self.keystone_tenants:
            self._update_tenant_cache()

    def _update_tenant_cache(self, reconcile=True, ratelimit=False):
        if ratelimit is True and self._last_keystone_sync_time is not None:
            if time.time() - self._last_keystone_sync_time <= \
                    KEYSTONE_SYNC_RATE_LIMIT:
                return
        try:
            auth = v3.Password(auth_url=self.auth_url,
                               username=self.auth_user,
                               password=self.auth_password,
                               project_name=self.auth_tenant,
                               user_domain_name=self.user_domain_name,
                               project_domain_name=self.project_domain_name)
            sess = session.Session(auth=auth)
            keystone_client = ksclient.Client(session=sess)
            tenants = keystone_client.projects.list()
            if self.is_unicode_enabled():
                new_cached_tenants = {tn.id: tn.name
                                      for tn in tenants}
            else:
                new_cached_tenants = {tn.id: Util.format_resource_name(tn.name)
                                      for tn in tenants}
            # Add SERVICE_TENANT to handle hidden network for VRRP
            new_cached_tenants[SERVICE_TENANT] = SERVICE_TENANT
            LOG.debug("New TENANTS: %s \nPrevious Tenants %s",
                      new_cached_tenants, self.keystone_tenants)
            diff = DictDiffer(new_cached_tenants, self.keystone_tenants)
            self.keystone_tenants = new_cached_tenants
            if reconcile:
                for tenant_id in diff.added():
                    LOG.debug("TENANT create: id %s name %s",
                              tenant_id, self.keystone_tenants[tenant_id])
                    self._rest_create_tenant(tenant_id)
                for tenant_id in diff.removed():
                    LOG.debug("TENANT delete: id %s", tenant_id)
                    self.rest_delete_tenant(tenant_id)
                if diff.changed():
                    LOG.debug("Tenant cache outdated. Forcing topo_sync.")
                    self.force_topo_sync(check_ts=False)
            return True
        except Exception:
            LOG.exception("Encountered an error syncing with keystone.")
            return False
        finally:
            self._last_keystone_sync_time = time.time()

    def _keystone_sync(self, polling_interval=300):
        while True:
            eventlet.sleep(polling_interval)
            self._update_tenant_cache()
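

# --- Editor's illustrative sketch (not part of the upstream module) ---
# Typical use from the plugin side, assuming the RESTPROXY and
# keystone_authtoken config groups are already loaded into cfg.CONF:
#
#     pool = ServerPool.get_instance()
#     pool.get_topo_function = my_topology_callback   # hypothetical callback
#     pool.rest_create_tenant(tenant_id)   # tenant_id must be known to keystone
#     status, reason, raw, data = pool.rest_action(
#         'GET', HEALTH_PATH, errstr=_("health check failed: %s"))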


class HTTPSConnectionWithValidation(httplib.HTTPSConnection):

    # If combined_cert is None, the connection will continue without
    # any certificate validation.
    combined_cert = None

    def connect(self):
        sock = socket.create_connection((self.host, self.port),
                                        self.timeout, self.source_address)
        if self._tunnel_host:
            self.sock = sock
            self._tunnel()

        if self.combined_cert:
            self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
                                        cert_reqs=ssl.CERT_REQUIRED,
                                        ca_certs=self.combined_cert,
                                        ssl_version=ssl.PROTOCOL_SSLv23)
        else:
            self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
                                        cert_reqs=ssl.CERT_NONE,
                                        ssl_version=ssl.PROTOCOL_SSLv23)
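

# --- Editor's illustrative sketch (not part of the upstream module) ---
# ServerProxy.rest_call sets combined_cert on the connection it creates, so
# certificate validation is driven entirely by whether a combined bundle was
# built by _get_combined_cert_for_server. A standalone use might look like
# this (host, port and bundle path below are made-up samples):
#
#     conn = HTTPSConnectionWithValidation('192.0.2.10', 8443, timeout=10)
#     conn.combined_cert = '/path/to/combined/192.0.2.10.pem'
#     conn.request('GET', BASE_URI + HEALTH_PATH)
#     resp = conn.getresponse()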