Big Switch Networks plugins and drivers for the networking project
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1287 lines
54KB

  1. # Copyright 2014 Big Switch Networks, Inc.
  2. # All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  5. # not use this file except in compliance with the License. You may obtain
  6. # a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  12. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  13. # License for the specific language governing permissions and limitations
  14. # under the License.
  15. """
  16. This module manages the HTTP and HTTPS connections to the backend controllers.
  17. The main class it provides for external use is ServerPool which manages a set
  18. of ServerProxy objects that correspond to individual backend controllers.
  19. The following functionality is handled by this module:
  20. - Translation of rest_* function calls to HTTP/HTTPS calls to the controllers
  21. - Automatic failover between controllers
  22. - SSL Certificate enforcement
  23. - HTTP Authentication
  24. """
  25. import base64
  26. import httplib
  27. import re
  28. import socket
  29. import ssl
  30. import time
  31. from neutron_lib import exceptions
  32. from oslo_log import log as logging
  33. import eventlet
  34. import eventlet.corolocal
  35. from keystoneauth1.identity import v3
  36. from keystoneauth1 import session
  37. from keystoneclient.v3 import client as ksclient
  38. from networking_bigswitch.plugins.bigswitch.db import consistency_db as cdb
  39. from networking_bigswitch.plugins.bigswitch.i18n import _
  40. from networking_bigswitch.plugins.bigswitch.i18n import _LE
  41. from networking_bigswitch.plugins.bigswitch.i18n import _LI
  42. from networking_bigswitch.plugins.bigswitch.i18n import _LW
  43. from networking_bigswitch.plugins.bigswitch import state_syncer
  44. from networking_bigswitch.plugins.bigswitch.utils import Util
  45. import os
  46. from oslo_config import cfg
  47. from oslo_serialization import jsonutils
  48. from oslo_utils import excutils
  49. from sqlalchemy.types import Enum
# Module-level logger.
LOG = logging.getLogger(__name__)

# The following are used to invoke the API on the external controller
CAPABILITIES_PATH = "/capabilities"
NET_RESOURCE_PATH = "/tenants/%s/networks"
PORT_RESOURCE_PATH = "/tenants/%s/networks/%s/ports"
ROUTER_RESOURCE_PATH = "/tenants/%s/routers"
ROUTER_INTF_OP_PATH = "/tenants/%s/routers/%s/interfaces"
SECURITY_GROUP_RESOURCE_PATH = "/securitygroups"
TENANT_RESOURCE_PATH = "/tenants"
NETWORKS_PATH = "/tenants/%s/networks/%s"
FLOATINGIPS_PATH = "/tenants/%s/floatingips/%s"
PORTS_PATH = "/tenants/%s/networks/%s/ports/%s"
ATTACHMENT_PATH = "/tenants/%s/networks/%s/ports/%s/attachment"
ROUTERS_PATH = "/tenants/%s/routers/%s"
ROUTER_INTF_PATH = "/tenants/%s/routers/%s/interfaces/%s"
SECURITY_GROUP_PATH = "/securitygroups/%s"
TENANT_PATH = "/tenants/%s"
TOPOLOGY_PATH = "/topology"
HEALTH_PATH = "/health"
SWITCHES_PATH = "/switches/%s"
# query-string style path for the controller's testpath (troubleshooting) API
TESTPATH_PATH = ('/testpath/controller-view'
                 '?src-tenant=%(src-tenant)s'
                 '&src-segment=%(src-segment)s&src-ip=%(src-ip)s'
                 '&dst-ip=%(dst-ip)s')
TENANTPOLICY_RESOURCE_PATH = "/tenants/%s/policies"
TENANTPOLICIES_PATH = "/tenants/%s/policies/%s"
OOSM_OSP_RESOURCE_PATH = "/orchestrator/openstack"
OOSM_OSP_LAST_UPDATE_PATH = OOSM_OSP_RESOURCE_PATH + "/%s/last-update"
# HTTP status classification used by ServerProxy/ServerPool
SUCCESS_CODES = range(200, 207)
FAILURE_CODES = [0, 301, 302, 303, 400, 401, 403, 404, 500, 501, 502, 503,
                 504, 505]
BASE_URI = '/networkService/v2.0'
ORCHESTRATION_SERVICE_ID = 'Neutron v2.0'
HASH_MATCH_HEADER = 'X-BSN-BVS-HASH-MATCH'
SERVICE_TENANT = 'VRRP_Service'
# config-file group names read via get_keystoneauth_cfg / get_novaauth_cfg
KS_AUTH_GROUP_NAME = 'keystone_authtoken'
KS_AUTH_DOMAIN_DEFAULT = 'default'
# nova auth group name
NOVA_AUTH_GROUP_NAME = 'nova'
# error messages
NXNETWORK = 'NXVNS'
# retry policy for HTTP 503 (service unavailable) responses from a controller
HTTP_SERVICE_UNAVAILABLE_RETRY_COUNT = 3
HTTP_SERVICE_UNAVAILABLE_RETRY_INTERVAL = 3
KEYSTONE_SYNC_RATE_LIMIT = 30 # Limit KeyStone sync to once in 30 secs
# TOPO_SYNC Responses
TOPO_RESPONSE_OK = (httplib.OK, httplib.OK, True, True)
TOPO_RESPONSE_FAIL = (0, None, None, None)
# RE pattern for checking BCF supported names
BCF_IDENTIFIER_UUID_RE = re.compile(r"[0-9a-zA-Z][-.0-9a-zA-Z_]*")
class TenantIDNotFound(exceptions.NeutronException):
    """Raised when a tenant/project id cannot be resolved via keystone."""
    message = _("Tenant: %(tenant)s is not known by keystone.")
    status = None

    def __init__(self, **kwargs):
        # keep the offending tenant id around for callers that inspect it
        self.tenant = kwargs.get('tenant')
        super(TenantIDNotFound, self).__init__(**kwargs)
class UnsupportedNameException(exceptions.NeutronException):
    """UnsupportedNameException

    Exception class to be raised when encountering object names with
    unsupported names. Namely those that do not conform to the regular
    expression BCF_IDENTIFIER_UUID_RE

    :keyword obj_type
    :keyword obj_id
    :keyword obj_name
    """
    # adjacent literals are concatenated; each fragment after the first
    # starts with a space so the words do not run together
    message = _("Object of type %(obj_type)s and id %(obj_id)s has unsupported"
                " character in name \"%(obj_name)s\". It should begin with an"
                " alphanumeric character [0-9a-zA-Z] and can contain space,"
                " underscore, apostrophe, forward slash, opening and closing"
                " square brackets.")
    status = None
  120. class UnsupportedTenantNameInObjectException(exceptions.NeutronException):
  121. """Unsupported Tenant Name
  122. Exception class to be raised when objects have tenant names with
  123. unsupported characters. Namely those that do not conform to the regular
  124. expression BCF_IDENTIFIER_UUID_RE
  125. :keyword obj_type
  126. :keyword obj_id
  127. :keyword obj_name
  128. :keyword tenant_name
  129. """
  130. message = _("Object of type %(obj_type)s, id %(obj_id)s and name "
  131. "%(obj_name)s has unsupported character in its tenant name "
  132. "\"%(tenant_name)s\". It should begin with an"
  133. "alphanumeric character [0-9a-zA-Z] and can contain space,"
  134. "underscore, apostrophe and double-quotes.")
  135. status = None
class NetworkNameChangeError(exceptions.NeutronException):
    """Raised on an attempt to rename an existing network (not allowed)."""
    message = _("network name is not allowed to be changed.")
    status = None
class RemoteRestError(exceptions.NeutronException):
    """Raised when a REST call to the BCF controller fails.

    Carries the failure reason (interpolated into the message) and,
    optionally, the HTTP status of the failed call.
    """
    message = _("Error in REST call to BCF "
                "controller: %(reason)s")
    status = None

    def __init__(self, **kwargs):
        # pop 'status' so it is stored on the instance but is NOT passed
        # to the base class for message interpolation
        self.status = kwargs.pop('status', None)
        self.reason = kwargs.get('reason')
        super(RemoteRestError, self).__init__(**kwargs)
  147. class DictDiffer(object):
  148. """Calculate the difference between two dictionaries as:
  149. (1) items added
  150. (2) items removed
  151. (3) keys same in both but changed values
  152. (4) keys same in both and unchanged values
  153. """
  154. def __init__(self, current_dict, past_dict):
  155. self.current_dict, self.past_dict = current_dict, past_dict
  156. self.set_current, self.set_past = (set(current_dict.keys()),
  157. set(past_dict.keys()))
  158. self.intersect = self.set_current.intersection(self.set_past)
  159. def added(self):
  160. return self.set_current - self.intersect
  161. def removed(self):
  162. return self.set_past - self.intersect
  163. def changed(self):
  164. return set(o for o in self.intersect
  165. if self.past_dict[o] != self.current_dict[o])
  166. def unchanged(self):
  167. return set(o for o in self.intersect
  168. if self.past_dict[o] == self.current_dict[o])
class ObjTypeEnum(Enum):
    """Enum

    Enum to represent various object types whose name requires sanitization
    before syncing to the controller.
    """
    # NOTE(review): the base class here is sqlalchemy.types.Enum (a column
    # type), not the stdlib enum module -- presumably the attributes below
    # double as the allowed column values; confirm before changing the base.
    network = "network"
    router = "router"
    security_group = "security_group"
    subnet = "subnet"
    tenant = "tenant"
  179. def is_valid_bcf_name(name):
  180. """Check if bcf name is valid
  181. :returns True if name matches BCF_IDENTIFIER_UUID_RE
  182. :returns False otherwise
  183. """
  184. match_obj = BCF_IDENTIFIER_UUID_RE.match(name)
  185. if match_obj and match_obj.group(0) == name:
  186. return True
  187. return False
  188. def get_keystoneauth_cfg(conf, name):
  189. """get the keystone auth cfg
  190. Fetch value of keystone_authtoken group from config file when not
  191. available as part of GroupAttr.
  192. :rtype: String
  193. :param conf: oslo config cfg.CONF
  194. :param name: property name to be retrieved
  195. """
  196. try:
  197. value_list = conf._namespace._get_file_value([(KS_AUTH_GROUP_NAME,
  198. name)])
  199. return value_list[0]
  200. except KeyError as e:
  201. LOG.warning("Config does not have property %(name)s "
  202. "in group keystone_authtoken", {'name': name})
  203. raise e
def get_novaauth_cfg(conf, name):
    """get the nova auth cfg

    Fetch value of nova group from config file when not
    available as part of GroupAttr.

    :rtype: String
    :param conf: oslo config cfg.CONF
    :param name: property name to be retrieved
    :returns: the configured value, or None (implicitly) when the
        property is absent from the [nova] group
    """
    try:
        value_list = conf._namespace._get_file_value([(NOVA_AUTH_GROUP_NAME,
                                                       name)])
        return value_list[0]
    except KeyError:
        LOG.debug("Config does not have property %(name)s "
                  "in group nova", {'name': name})
        # no need to raise exception - if property is missing in nova group,
        # we reuse what was set for keystone (caller treats the implicit
        # None return as "fall back to the keystone_authtoken value")
class ServerProxy(object):
    """REST server proxy to a network controller.

    Wraps a single BCF controller endpoint: holds the endpoint address,
    authentication material, capability list and (optionally) a cached
    HTTP(S) connection.  Instances are owned by a ServerPool, which
    performs failover across several proxies.
    """

    def __init__(self, server, port, ssl, auth, neutron_id, timeout,
                 base_uri, name, mypool, combined_cert):
        # host/IP of the controller this proxy talks to
        self.server = server
        self.port = port
        # True -> use HTTPS (HTTPSConnectionWithValidation)
        self.ssl = ssl
        self.base_uri = base_uri
        self.timeout = timeout
        self.name = name
        self.success_codes = SUCCESS_CODES
        # exactly one of auth (HTTP basic) / auth_token (session cookie)
        # is populated below, depending on the form of the 'auth' argument
        self.auth = None
        self.auth_token = None
        self.neutron_id = neutron_id
        # set True/False by the pool depending on call outcomes
        self.failed = False
        self.capabilities = []
        # enable server to reference parent pool
        self.mypool = mypool
        # cache connection here to avoid a SSL handshake for every connection
        self.currentconn = None
        if auth:
            if ':' in auth:
                # 'user:password' form -> HTTP basic auth header value
                self.auth = 'Basic ' + base64.encodestring(auth).strip()
            else:
                # otherwise treat the value as a pre-issued session cookie
                self.auth_token = 'session_cookie="' + auth + '"'
        self.combined_cert = combined_cert

    def get_capabilities(self):
        """Fetch and cache this controller's capability list.

        On any error the previously cached (possibly empty) list is kept.

        :return: list of capability strings reported by the server
        """
        try:
            body = self.rest_call('GET', CAPABILITIES_PATH)[2]
            if body:
                self.capabilities = jsonutils.loads(body)
        except Exception:
            LOG.exception("Couldn't retrieve capabilities on server "
                          "%(server)s. ", {'server': self.server})
        LOG.info("The following capabilities were received "
                 "for %(server)s: %(cap)s",
                 {'server': self.server, 'cap': self.capabilities})
        return self.capabilities

    def rest_call(self, action, resource, data='', headers=None,
                  timeout=False, reconnect=False):
        """Issue a single REST call to this controller.

        :param action: HTTP method ('GET'/'POST'/'PUT'/'DELETE')
        :param resource: path appended to this proxy's base_uri
        :param data: python object, serialized to a JSON request body
        :param headers: optional dict of extra HTTP headers
        :param timeout: socket timeout; False means "use the configured
            default" (None explicitly disables the timeout)
        :param reconnect: when True, discard any cached connection and
            open a fresh one before sending
        :return: tuple (status, reason, response-string, parsed-response);
            (0, None, None, None) on socket failure
        """
        uri = self.base_uri + resource
        body = jsonutils.dumps(data)
        headers = headers or {}
        headers['Content-type'] = 'application/json'
        headers['Accept'] = 'application/json'
        headers['NeutronProxy-Agent'] = self.name
        headers['Instance-ID'] = self.neutron_id
        headers['Orchestration-Service-ID'] = ORCHESTRATION_SERVICE_ID
        # TODO(kevinbenton): Re-enable keep-alive in a thread-safe fashion.
        # When multiple workers are enabled the saved connection gets mangled
        # by multiple threads so we always reconnect.
        if 'keep-alive' in self.capabilities and False:
            headers['Connection'] = 'keep-alive'
        else:
            reconnect = True
        if self.auth_token:
            headers['Cookie'] = self.auth_token
        elif self.auth:
            headers['Authorization'] = self.auth
        LOG.debug("ServerProxy: server=%(server)s, port=%(port)d, "
                  "ssl=%(ssl)r",
                  {'server': self.server, 'port': self.port, 'ssl': self.ssl})
        LOG.debug("ServerProxy: resource=%(resource)s, data=%(data)r, "
                  "headers=%(headers)r, action=%(action)s",
                  {'resource': resource, 'data': data, 'headers': headers,
                   'action': action})
        # unspecified timeout is False because a timeout can be specified as
        # None to indicate no timeout.
        if timeout is False:
            timeout = self.timeout
        if timeout != self.timeout:
            # need a new connection if timeout has changed
            reconnect = True
        # NOTE(review): the fresh connection below is bound to a local
        # variable and is never stored back to self.currentconn, so the
        # connection cache appears defunct while keep-alive is force-disabled
        # above (reconnect is always True) -- confirm against upstream
        # before re-enabling keep-alive.
        if not self.currentconn or reconnect:
            if self.currentconn:
                self.currentconn.close()
            if self.ssl:
                currentconn = HTTPSConnectionWithValidation(
                    self.server, self.port, timeout=timeout)
                if currentconn is None:
                    LOG.error('ServerProxy: Could not establish HTTPS '
                              'connection')
                    return 0, None, None, None
                currentconn.combined_cert = self.combined_cert
            else:
                currentconn = httplib.HTTPConnection(
                    self.server, self.port, timeout=timeout)
                if currentconn is None:
                    LOG.error('ServerProxy: Could not establish HTTP '
                              'connection')
                    return 0, None, None, None
        try:
            bcf_request_time = time.time()
            currentconn.request(action, uri, body, headers)
            response = currentconn.getresponse()
            respstr = response.read()
            respdata = respstr
            bcf_response_time = time.time()
            LOG.debug("Time waited to get response from BCF %.2fsecs",
                      (bcf_response_time - bcf_request_time))
            if response.status in self.success_codes:
                try:
                    respdata = jsonutils.loads(respstr)
                except ValueError:
                    # response was not JSON, ignore the exception
                    pass
            ret = (response.status, response.reason, respstr, respdata)
        except httplib.HTTPException:
            # If we were using a cached connection, try again with a new one.
            with excutils.save_and_reraise_exception() as ctxt:
                currentconn.close()
                if reconnect:
                    # if reconnect is true, this was on a fresh connection so
                    # reraise since this server seems to be broken
                    ctxt.reraise = True
                else:
                    # if reconnect is false, it was a cached connection so
                    # try one more time before re-raising
                    ctxt.reraise = False
                    return self.rest_call(action, resource, data, headers,
                                          timeout=timeout, reconnect=True)
        except (socket.timeout, socket.error) as e:
            currentconn.close()
            LOG.error('ServerProxy: %(action)s failure, %(e)r',
                      {'action': action, 'e': e})
            ret = 0, None, None, None
        LOG.debug("ServerProxy: status=%(status)d, reason=%(reason)r, "
                  "ret=%(ret)s, data=%(data)r", {'status': ret[0],
                                                 'reason': ret[1],
                                                 'ret': ret[2],
                                                 'data': ret[3]})
        return ret
  354. class ServerPool(object):
  355. _instance = None
  356. @classmethod
  357. def get_instance(cls):
  358. if cls._instance:
  359. return cls._instance
  360. cls._instance = cls()
  361. return cls._instance
    def __init__(self, timeout=False,
                 base_uri=BASE_URI, name='NeutronRestProxy'):
        """Build the pool from oslo config and start background tasks.

        :param timeout: socket timeout override; False keeps the
            RESTPROXY.server_timeout value
        :param base_uri: URI prefix for all controller REST calls
        :param name: value sent as the NeutronProxy-Agent header
        :raises cfg.Error: when no servers are configured or an entry is
            not in <ip>:<port> form
        """
        LOG.debug("ServerPool: initializing")
        # 'servers' is the list of network controller REST end-points
        # (used in order specified till one succeeds, and it is sticky
        # till next failure). Use 'server_auth' to encode api-key
        servers = cfg.CONF.RESTPROXY.servers
        self.auth = cfg.CONF.RESTPROXY.server_auth
        self.ssl = cfg.CONF.RESTPROXY.server_ssl
        self.neutron_id = cfg.CONF.RESTPROXY.neutron_id
        # unicode config
        self.cfg_unicode_enabled = cfg.CONF.RESTPROXY.naming_scheme_unicode
        self.capabilities = set()
        # cluster state sync interval
        self.state_sync_interval = cfg.CONF.RESTPROXY.state_sync_interval
        # prefer keystone credentials from [keystone_authtoken]; fall back
        # to the RESTPROXY group (unit-test setups) when absent
        if 'keystone_authtoken' in cfg.CONF:
            self.auth_user = get_keystoneauth_cfg(cfg.CONF, 'username')
            self.auth_password = get_keystoneauth_cfg(cfg.CONF, 'password')
            self.auth_url = get_keystoneauth_cfg(cfg.CONF, 'auth_url')
            self.auth_tenant = get_keystoneauth_cfg(cfg.CONF, 'project_name')
            self.project_domain_name = get_keystoneauth_cfg(
                cfg.CONF, 'project_domain_name')
            self.user_domain_name = get_keystoneauth_cfg(
                cfg.CONF, 'user_domain_name')
        else:
            # this is for UT only
            LOG.warning("keystone_authtoken not found in "
                        "/etc/neutron/neutron.conf. "
                        "Please check config file")
            self.auth_url = cfg.CONF.RESTPROXY.auth_url
            self.auth_user = cfg.CONF.RESTPROXY.auth_user
            self.auth_password = cfg.CONF.RESTPROXY.auth_password
            self.auth_tenant = cfg.CONF.RESTPROXY.auth_tenant
            self.project_domain_name = KS_AUTH_DOMAIN_DEFAULT
            self.user_domain_name = KS_AUTH_DOMAIN_DEFAULT
        # Use Keystonev3 URL for authentication
        if "v2.0" in self.auth_url:
            self.auth_url = self.auth_url.replace("v2.0", "v3")
        elif "v3" not in self.auth_url:
            self.auth_url = "%s/v3" % self.auth_url
        # load nova auth separately if available (true for Devstack)
        # each [nova] option falls back to the keystone value when unset
        nova_auth_dict = {}
        if 'nova' in cfg.CONF:
            nova_auth_dict['auth_url'] = self.auth_url
            username = get_novaauth_cfg(cfg.CONF, 'username')
            nova_auth_dict['username'] = (self.auth_user
                                          if username is None else username)
            password = get_novaauth_cfg(cfg.CONF, 'password')
            nova_auth_dict['password'] = (self.auth_password
                                          if password is None else password)
            project_name = get_novaauth_cfg(cfg.CONF, 'project_name')
            nova_auth_dict['project_name'] = (
                self.auth_tenant
                if project_name is None else project_name)
            user_domain_name = get_novaauth_cfg(cfg.CONF, 'user_domain_name')
            nova_auth_dict['user_domain_name'] = (
                self.user_domain_name
                if user_domain_name is None else user_domain_name)
            project_domain_name = get_novaauth_cfg(
                cfg.CONF, 'project_domain_name')
            nova_auth_dict['project_domain_name'] = (
                self.project_domain_name
                if project_domain_name is None else project_domain_name)
        # create a dict for keystone and nova auth to pass to state_syncer
        ks_auth_dict = {}
        ks_auth_dict['auth_url'] = self.auth_url
        ks_auth_dict['username'] = self.auth_user
        ks_auth_dict['password'] = self.auth_password
        ks_auth_dict['project_name'] = self.auth_tenant
        ks_auth_dict['user_domain_name'] = self.user_domain_name
        ks_auth_dict['project_domain_name'] = self.project_domain_name
        # once all auth info is loaded, init the state_syncer
        self.state_syncer = state_syncer.StateSyncer(
            ks_auth_dict=ks_auth_dict, nova_auth_dict=nova_auth_dict)
        self.base_uri = base_uri
        self.name = name
        # Cache for Openstack projects
        # The cache is maintained in a separate thread and sync'ed with
        # Keystone periodically.
        self.keystone_tenants = {}
        self.timeout = cfg.CONF.RESTPROXY.server_timeout
        self.always_reconnect = not cfg.CONF.RESTPROXY.cache_connections
        default_port = 8000
        if timeout is not False:
            self.timeout = timeout
        # Function to use to retrieve topology for consistency syncs.
        # Needs to be set by module that uses the servermanager.
        self.get_topo_function = None
        self.get_topo_function_args = {}
        if not servers:
            raise cfg.Error(_('Servers not defined. Aborting server manager.'))
        # append the default port to any entry lacking an explicit port
        # NOTE(review): a bracketed IPv6 address without a port contains
        # ':' and therefore does not get the default port appended --
        # presumably operators always supply ports; confirm.
        servers = [s if len(s.rsplit(':', 1)) == 2
                   else "%s:%d" % (s, default_port)
                   for s in servers]
        if any((len(spl) != 2 or not spl[1].isdigit())
               for spl in [sp.rsplit(':', 1)
                           for sp in servers]):
            raise cfg.Error(_('Servers must be defined as <ip>:<port>. '
                              'Configuration was %s') % servers)
        self.servers = []
        for s in servers:
            server, port = s.rsplit(':', 1)
            if server.startswith("[") and server.endswith("]"):
                # strip [] for ipv6 address
                server = server[1:-1]
            self.servers.append(self.server_proxy_for(server, int(port)))
        # update capabilities
        self.get_capabilities()
        self._update_tenant_cache(reconcile=False)
        self.start_background_tasks()
        ServerPool._instance = self
        LOG.debug("ServerPool: initialization done")
    def start_background_tasks(self):
        """Spawn the pool's periodic greenthreads.

        Starts the consistency watchdog, the (delayed) keystone tenant
        sync, and optionally the BCF state syncer, all on eventlet.
        """
        # consistency check, starts after 1* consistency_interval
        eventlet.spawn(self._consistency_watchdog,
                       cfg.CONF.RESTPROXY.consistency_interval)
        # Start keystone sync thread after 5 consistency sync
        # to give enough time for topology to sync over when
        # neutron-server starts.
        eventlet.spawn_after(
            5 * cfg.CONF.RESTPROXY.consistency_interval,
            self._keystone_sync,
            cfg.CONF.RESTPROXY.keystone_sync_interval)
        # start a periodic state syncer with BCF
        # this provides hypervisor, vm and other misc info to BCF for
        # integration dashboard
        if self.state_sync_interval > 0:
            if 'orchestrator-state-sync' in self.get_capabilities():
                eventlet.spawn_after(60,
                                     self.state_syncer.periodic_update,
                                     self.state_sync_interval)
                LOG.info('Orchestrator State Sync is enabled.')
            else:
                LOG.info('BCF does not support orchestrator-state-sync, '
                         'disabled it.')
        else:
            LOG.info('Orchestrator State Sync is disabled.')
    def get_capabilities(self):
        """Get capabilities

        If cache has the value, use it
        If Not, do REST calls to BCF controllers to check it

        :return: supported capability list (union across all servers)
        """
        # lookup on first try
        # if capabilities is empty, the check is either not done, or failed
        if self.capabilities:
            return self.capabilities
        # Do REST calls to update capabilities at startup
        # Servers should be the same version
        # If one server is down, use online server's capabilities
        capability_list = [set(server.get_capabilities())
                           for server in self.servers]
        new_capabilities = set.union(*capability_list)
        if new_capabilities:
            # Log unicode status
            self.log_unicode_status(new_capabilities)
            self.capabilities = new_capabilities
        # With multiple workers enabled, the fork may occur after the
        # connections to the DB have been established. We need to clear the
        # connections after the first attempt to call the backend to ensure
        # that the established connections will all be local to the thread and
        # not shared. Placing it here in the capabilities call is the easiest
        # way to ensure that its done after everything is initialized and the
        # first call to the backend is made.
        # This is necessary in our plugin and not others because we have a
        # completely separate DB connection for the consistency records. The
        # main connection is made thread-safe in the neutron service init.
        # https://github.com/openstack/neutron/blob/
        # ec716b9e68b8b66a88218913ae4c9aa3a26b025a/neutron/wsgi.py#L104
        if cdb.HashHandler._FACADE:
            cdb.HashHandler._FACADE.get_engine().pool.dispose()
        if not new_capabilities:
            LOG.error('Failed to get capabilities on any controller. ')
        # may still be empty when every controller was unreachable
        return self.capabilities
    def log_unicode_status(self, new_capabilities):
        """Log unicode running status

        Logs whether unicode object names are effectively enabled, based
        on the local config flag and the controller's capability set.

        :param new_capabilities: new capabilities
        :return: None
        """
        # unicode disabled by user
        if not self.cfg_unicode_enabled:
            LOG.info('naming_scheme_unicode is set to False,'
                     ' Unicode names Disabled')
        # unicode enabled and supported by controller
        elif 'display-name' in new_capabilities:
            LOG.info('naming_scheme_unicode is set to True,'
                     ' Unicode names Enabled')
        # unicode enabled, but not supported by controller
        else:
            LOG.warning('naming_scheme_unicode is set to True,'
                        ' but BCF does not support it.'
                        ' Unicode names Disabled')
  554. def is_unicode_enabled(self):
  555. """Check unicode running status
  556. True: enabled
  557. False: disabled
  558. """
  559. LOG.debug('Current Capabilities: %s', self.get_capabilities())
  560. if not self.get_capabilities():
  561. msg = 'Capabilities unknown! Please check BCF controller status.'
  562. raise RemoteRestError(reason=msg)
  563. if self.cfg_unicode_enabled and 'display-name' in \
  564. self.get_capabilities():
  565. LOG.debug('Unicode Check=True')
  566. return True
  567. else:
  568. LOG.debug('Unicode Check=False')
  569. return False
  570. def server_proxy_for(self, server, port):
  571. combined_cert = self._get_combined_cert_for_server(server, port)
  572. return ServerProxy(server, port, self.ssl, self.auth, self.neutron_id,
  573. self.timeout, self.base_uri, self.name, mypool=self,
  574. combined_cert=combined_cert)
    def _get_combined_cert_for_server(self, server, port):
        """Build the combined certificate file for one controller.

        The ssl library requires a combined file with all trusted certs
        so we make one containing the trusted CAs and the corresponding
        host cert for this server

        :returns: path of the combined PEM file, or None when SSL is
            disabled or validation is turned off
        :raises cfg.Error: when the cert directory is missing or no
            certificates are available
        """
        combined_cert = None
        if self.ssl and not cfg.CONF.RESTPROXY.no_ssl_validation:
            base_ssl = cfg.CONF.RESTPROXY.ssl_cert_directory
            host_dir = os.path.join(base_ssl, 'host_certs')
            ca_dir = os.path.join(base_ssl, 'ca_certs')
            combined_dir = os.path.join(base_ssl, 'combined')
            combined_cert = os.path.join(combined_dir, '%s.pem' % server)
            if not os.path.exists(base_ssl):
                raise cfg.Error(_('ssl_cert_directory [%s] does not exist. '
                                  'Create it or disable ssl.') % base_ssl)
            # create any missing sub-directories
            for automake in [combined_dir, ca_dir, host_dir]:
                if not os.path.exists(automake):
                    os.makedirs(automake)
            # get all CA certs
            certs = self._get_ca_cert_paths(ca_dir)
            # check for a host specific cert
            hcert, exists = self._get_host_cert_path(host_dir, server)
            if exists:
                certs.append(hcert)
            elif cfg.CONF.RESTPROXY.ssl_sticky:
                # "sticky" trust-on-first-use: fetch the server's current
                # cert and persist it for future validation
                self._fetch_and_store_cert(server, port, hcert)
                certs.append(hcert)
            if not certs:
                raise cfg.Error(_('No certificates were found to verify '
                                  'controller %s') % (server))
            self._combine_certs_to_file(certs, combined_cert)
        return combined_cert
  606. def _combine_certs_to_file(self, certs, cfile):
  607. """_combine_certs_to_file
  608. Concatenates the contents of each certificate in a list of
  609. certificate paths to one combined location for use with ssl
  610. sockets.
  611. """
  612. with open(cfile, 'w') as combined:
  613. for c in certs:
  614. with open(c, 'r') as cert_handle:
  615. combined.write(cert_handle.read())
  616. def _get_host_cert_path(self, host_dir, server):
  617. """returns full path and boolean indicating existence"""
  618. hcert = os.path.join(host_dir, '%s.pem' % server)
  619. if os.path.exists(hcert):
  620. return hcert, True
  621. return hcert, False
  622. def _get_ca_cert_paths(self, ca_dir):
  623. certs = [os.path.join(root, name)
  624. for name in [
  625. name for (root, dirs, files) in os.walk(ca_dir)
  626. for name in files]
  627. if name.endswith('.pem')]
  628. return certs
    def _fetch_and_store_cert(self, server, port, path):
        """Fetch a server's TLS certificate and persist it.

        Grabs a certificate from a server and writes it to
        a given path.

        :returns: the PEM-encoded certificate string
        :raises cfg.Error: when the certificate cannot be retrieved
        """
        try:
            cert = ssl.get_server_certificate((server, port),
                                              ssl_version=ssl.PROTOCOL_SSLv23)
        except Exception as e:
            raise cfg.Error(_('Could not retrieve initial '
                              'certificate from controller %(server)s. '
                              'Error details: %(error)s') %
                            {'server': server, 'error': e})
        # warn loudly: trust-on-first-use stores whatever cert the server
        # presented right now
        LOG.warning("Storing to certificate for host %(server)s "
                    "at %(path)s", {'server': server, 'path': path})
        self._file_put_contents(path, cert)
        return cert
  646. def _file_put_contents(self, path, contents):
  647. # Simple method to write to file.
  648. # Created for easy Mocking
  649. with open(path, 'w') as handle:
  650. handle.write(contents)
  651. def server_failure(self, resp, ignore_codes=None):
  652. """Define failure codes as required.
  653. Note: We assume 301-303 is a failure, and try the next server in
  654. the server pool.
  655. """
  656. if ignore_codes is None:
  657. ignore_codes = []
  658. return (resp[0] in FAILURE_CODES and resp[0] not in ignore_codes)
  659. def action_success(self, resp):
  660. """Defining success codes as required.
  661. Note: We assume any valid 2xx as being successful response.
  662. """
  663. return resp[0] in SUCCESS_CODES
    def rest_call(self, action, resource, data, headers, ignore_codes,
                  timeout=False):
        """Try the call against each server in the pool until one succeeds.

        Healthy servers are tried before previously-failed ones; a 503
        from a server is retried a few times before moving on.

        :returns: the first successful response tuple, or the first
            server's (failed) response when every server fails
        """
        # previously-failed servers sort last (False < True)
        good_first = sorted(self.servers, key=lambda x: x.failed)
        first_response = None
        for active_server in good_first:
            LOG.debug("ServerProxy: %(action)s to servers: "
                      "%(server)r, %(resource)s",
                      {'action': action,
                       'server': (active_server.server,
                                  active_server.port),
                       'resource': resource})
            for x in range(HTTP_SERVICE_UNAVAILABLE_RETRY_COUNT + 1):
                ret = active_server.rest_call(action, resource, data, headers,
                                              timeout,
                                              reconnect=self.always_reconnect)
                if ret[0] != httplib.SERVICE_UNAVAILABLE:
                    break
                # 503: back off briefly and retry the same server
                eventlet.sleep(HTTP_SERVICE_UNAVAILABLE_RETRY_INTERVAL)
            # Store the first response as the error to be bubbled up to the
            # user since it was a good server. Subsequent servers will most
            # likely be cluster slaves and won't have a useful error for the
            # user (e.g. 302 redirect to master)
            if not first_response:
                first_response = ret
            if not self.server_failure(ret, ignore_codes):
                active_server.failed = False
                LOG.debug("ServerProxy: %(action)s succeed for servers: "
                          "%(server)r Response: %(response)s",
                          {'action': action,
                           'server': (active_server.server,
                                      active_server.port),
                           'response': ret[3]})
                return ret
            else:
                LOG.warning('ServerProxy: %(action)s failure for servers:'
                            '%(server)r Response: %(response)s',
                            {'action': action,
                             'server': (active_server.server,
                                        active_server.port),
                             'response': ret[3]})
                LOG.warning("ServerProxy: Error details: "
                            "status=%(status)d, reason=%(reason)r, "
                            "ret=%(ret)s, data=%(data)r",
                            {'status': ret[0], 'reason': ret[1],
                             'ret': ret[2], 'data': ret[3]})
                active_server.failed = True
        # All servers failed, reset server list and try again next time
        LOG.error('ServerProxy: %(action)s failure for all servers: '
                  '%(server)r',
                  {'action': action,
                   'server': tuple((s.server,
                                    s.port) for s in self.servers)})
        return first_response
  717. def rest_action(self, action, resource, data='', errstr='%s',
  718. ignore_codes=None, headers=None, timeout=False):
  719. """rest_action
  720. Wrapper for rest_call that verifies success and raises a
  721. RemoteRestError on failure with a provided error string
  722. By default, 404 errors on DELETE calls are ignored because
  723. they already do not exist on the backend.
  724. """
  725. ignore_codes = ignore_codes or []
  726. headers = headers or {}
  727. if not ignore_codes and action == 'DELETE':
  728. ignore_codes = [404]
  729. resp = self.rest_call(action, resource, data, headers, ignore_codes,
  730. timeout)
  731. if self.server_failure(resp, ignore_codes):
  732. # Request wasn't success, nor can be ignored,
  733. # do a full synchronization if auto_sync_on_failure is True
  734. if (cfg.CONF.RESTPROXY.auto_sync_on_failure and
  735. resource != TOPOLOGY_PATH):
  736. LOG.error(_LE("NeutronRestProxyV2: Inconsistency with backend "
  737. "controller, triggering full synchronization. "
  738. "%(action)s %(resource)s."),
  739. {'action': action, 'resource': resource})
  740. sync_executed, topo_resp = self.force_topo_sync(check_ts=True)
  741. if sync_executed:
  742. return topo_resp
  743. # either topo_sync not executed or topo_sync itself failed
  744. LOG.error(errstr, resp[2])
  745. raise RemoteRestError(reason=resp[2], status=resp[0])
  746. if resp[0] in ignore_codes:
  747. LOG.info("NeutronRestProxyV2: Received and ignored error "
  748. "code %(code)s on %(action)s action to resource "
  749. "%(resource)s",
  750. {'code': resp[2], 'action': action,
  751. 'resource': resource})
  752. return resp
  753. def _check_and_raise_exception_unsupported_name(self, obj_type, obj):
  754. """_check_and_raise_exception_unsupported_name
  755. Used to sanity check object names and tenant names within an object.
  756. If they do not comply with the BCF expectation, raises an exception.
  757. :returns None if all ok
  758. :raises UnsupportedNameException or
  759. UnsupportedTenantNameInObjectException if name does not match
  760. BCF expectation
  761. """
  762. if ('name' in obj and obj['name'] and
  763. not is_valid_bcf_name(obj['name'])):
  764. raise UnsupportedNameException(obj_type=obj_type,
  765. obj_id=obj['id'],
  766. obj_name=obj['name'])
  767. if ('tenant_name' in obj and
  768. not is_valid_bcf_name(obj['tenant_name'])):
  769. raise UnsupportedTenantNameInObjectException(
  770. obj_type=obj_type, obj_id=obj['id'], obj_name=obj['name'],
  771. tenant_name=obj['tenant_name'])
  772. def rest_create_tenant(self, tenant_id):
  773. self._update_tenant_cache()
  774. self._rest_create_tenant(tenant_id)
  775. def _rest_create_tenant(self, tenant_id):
  776. tenant_name = self.keystone_tenants.get(tenant_id)
  777. if not tenant_name:
  778. raise TenantIDNotFound(tenant=tenant_id)
  779. if self.is_unicode_enabled():
  780. data = {"tenant_id": tenant_id, 'tenant_name': tenant_id,
  781. 'display-name': tenant_name}
  782. else:
  783. if not is_valid_bcf_name(tenant_name):
  784. raise UnsupportedNameException(obj_type=ObjTypeEnum.tenant,
  785. obj_id=tenant_id,
  786. obj_name=tenant_name)
  787. data = {"tenant_id": tenant_id, 'tenant_name': tenant_name}
  788. resource = TENANT_RESOURCE_PATH
  789. errstr = _("Unable to create tenant: %s")
  790. self.rest_action('POST', resource, data, errstr)
  791. def rest_delete_tenant(self, tenant_id):
  792. resource = TENANT_PATH % tenant_id
  793. errstr = _("Unable to delete tenant: %s")
  794. self.rest_action('DELETE', resource, errstr=errstr)
  795. def rest_create_router(self, tenant_id, router):
  796. self._check_and_raise_exception_unsupported_name(ObjTypeEnum.router,
  797. router)
  798. resource = ROUTER_RESOURCE_PATH % tenant_id
  799. data = {"router": router}
  800. errstr = _("Unable to create remote router: %s")
  801. self.rest_action('POST', resource, data, errstr)
  802. def rest_update_router(self, tenant_id, router, router_id):
  803. self._check_and_raise_exception_unsupported_name(ObjTypeEnum.router,
  804. router)
  805. resource = ROUTERS_PATH % (tenant_id, router_id)
  806. data = {"router": router}
  807. errstr = _("Unable to update remote router: %s")
  808. self.rest_action('PUT', resource, data, errstr)
  809. def rest_delete_router(self, tenant_id, router_id):
  810. resource = ROUTERS_PATH % (tenant_id, router_id)
  811. errstr = _("Unable to delete remote router: %s")
  812. self.rest_action('DELETE', resource, errstr=errstr)
  813. def rest_add_router_interface(self, tenant_id, router_id, intf_details):
  814. resource = ROUTER_INTF_OP_PATH % (tenant_id, router_id)
  815. data = {"interface": intf_details}
  816. errstr = _("Unable to add router interface: %s")
  817. self.rest_action('POST', resource, data, errstr)
  818. def rest_remove_router_interface(self, tenant_id, router_id, interface_id):
  819. resource = ROUTER_INTF_PATH % (tenant_id, router_id, interface_id)
  820. errstr = _("Unable to delete remote intf: %s")
  821. self.rest_action('DELETE', resource, errstr=errstr)
  822. def rest_create_network(self, tenant_id, network):
  823. self._check_and_raise_exception_unsupported_name(ObjTypeEnum.network,
  824. network)
  825. if 'subnets' in network:
  826. for subnet in network['subnets']:
  827. self._check_and_raise_exception_unsupported_name(
  828. ObjTypeEnum.subnet, subnet)
  829. resource = NET_RESOURCE_PATH % tenant_id
  830. data = {"network": network}
  831. errstr = _("Unable to create remote network: %s")
  832. self.rest_action('POST', resource, data, errstr)
  833. def rest_update_network(self, tenant_id, net_id, network):
  834. self._check_and_raise_exception_unsupported_name(ObjTypeEnum.network,
  835. network)
  836. if 'subnets' in network:
  837. for subnet in network['subnets']:
  838. self._check_and_raise_exception_unsupported_name(
  839. ObjTypeEnum.subnet, subnet)
  840. resource = NETWORKS_PATH % (tenant_id, net_id)
  841. data = {"network": network}
  842. errstr = _("Unable to update remote network: %s")
  843. self.rest_action('PUT', resource, data, errstr)
  844. def rest_delete_network(self, tenant_id, net_id):
  845. resource = NETWORKS_PATH % (tenant_id, net_id)
  846. errstr = _("Unable to delete remote network: %s")
  847. self.rest_action('DELETE', resource, errstr=errstr)
  848. def rest_create_securitygroup(self, sg):
  849. self._check_and_raise_exception_unsupported_name(
  850. ObjTypeEnum.security_group, sg)
  851. resource = SECURITY_GROUP_RESOURCE_PATH
  852. data = {"security-group": sg}
  853. errstr = _("Unable to create security group: %s")
  854. self.rest_action('POST', resource, data, errstr)
  855. def rest_delete_securitygroup(self, sg_id):
  856. resource = SECURITY_GROUP_PATH % sg_id
  857. errstr = _("Unable to delete security group: %s")
  858. self.rest_action('DELETE', resource, errstr=errstr)
  859. def rest_get_port(self, tenant_id, net_id, port_id):
  860. resource = ATTACHMENT_PATH % (tenant_id, net_id, port_id)
  861. errstr = _("Unable to retrieve port: %s")
  862. resp = self.rest_action('GET', resource, errstr=errstr,
  863. ignore_codes=[404])
  864. return None if resp[0] == 404 else resp[3]
  865. def rest_create_port(self, tenant_id, net_id, port):
  866. resource = ATTACHMENT_PATH % (tenant_id, net_id, port["id"])
  867. # security-group info might be useful for some higher level
  868. # operations, so we modify the json right before sending
  869. if not cfg.CONF.RESTPROXY.sync_security_groups:
  870. if 'security_groups' in port:
  871. del port['security_groups']
  872. data = {"port": port}
  873. device_id = port.get("device_id")
  874. if not port["mac_address"] or not device_id:
  875. # controller only cares about ports attached to devices
  876. LOG.warning("No device MAC attached to port %s. "
  877. "Skipping notification to controller.",
  878. port["id"])
  879. return
  880. data["attachment"] = {"id": device_id,
  881. "mac": port["mac_address"]}
  882. errstr = _("Unable to create remote port: %s")
  883. self.rest_action('PUT', resource, data, errstr)
  884. def rest_delete_port(self, tenant_id, network_id, port_id):
  885. resource = ATTACHMENT_PATH % (tenant_id, network_id, port_id)
  886. errstr = _("Unable to delete remote port: %s")
  887. self.rest_action('DELETE', resource, errstr=errstr)
  888. def rest_update_port(self, tenant_id, net_id, port):
  889. # Controller has no update operation for the port endpoint
  890. # the create PUT method will replace
  891. self.rest_create_port(tenant_id, net_id, port)
  892. def rest_create_floatingip(self, tenant_id, floatingip):
  893. resource = FLOATINGIPS_PATH % (tenant_id, floatingip['id'])
  894. errstr = _("Unable to create floating IP: %s")
  895. self._ensure_tenant_cache(tenant_id)
  896. self.rest_action('PUT', resource, floatingip, errstr=errstr)
  897. def rest_update_floatingip(self, tenant_id, floatingip, oldid):
  898. resource = FLOATINGIPS_PATH % (tenant_id, oldid)
  899. errstr = _("Unable to update floating IP: %s")
  900. self.rest_action('PUT', resource, floatingip, errstr=errstr)
  901. def rest_delete_floatingip(self, tenant_id, oldid):
  902. resource = FLOATINGIPS_PATH % (tenant_id, oldid)
  903. errstr = _("Unable to delete floating IP: %s")
  904. self.rest_action('DELETE', resource, errstr=errstr)
  905. def rest_get_switch(self, switch_id):
  906. resource = SWITCHES_PATH % switch_id
  907. errstr = _("Unable to retrieve switch: %s")
  908. resp = self.rest_action('GET', resource, errstr=errstr,
  909. ignore_codes=[404])
  910. # return None if switch not found, else return switch info
  911. return None if resp[0] == 404 else resp[3]
  912. def rest_get_testpath(self, src, dst):
  913. resource = TESTPATH_PATH % {'src-tenant': src['tenant'],
  914. 'src-segment': src['segment'],
  915. 'src-ip': src['ip'],
  916. 'dst-ip': dst['ip']}
  917. errstr = _("Unable to retrieve results for testpath ID: %s")
  918. resp = self.rest_action('GET', resource, errstr=errstr,
  919. ignore_codes=[404])
  920. # return None if testpath not found, else return testpath info
  921. return None if (resp[0] not in range(200, 300)) else resp[3]
  922. def rest_create_tenantpolicy(self, tenant_id, policy):
  923. policy['ipproto'] = policy['protocol']
  924. resource = TENANTPOLICY_RESOURCE_PATH % tenant_id
  925. data = {"policy": policy}
  926. errstr = _("Unable to create policy: %s")
  927. self.rest_action('POST', resource, data, errstr)
  928. def rest_update_tenantpolicy(self, tenant_id, policy):
  929. policy['ipproto'] = policy['protocol']
  930. resource = TENANTPOLICIES_PATH % (tenant_id, policy['priority'])
  931. data = {"policy": policy}
  932. errstr = _("Unable to update policy: %s")
  933. self.rest_action('PUT', resource, data, errstr)
  934. def rest_delete_tenantpolicy(self, tenant_id, policy_prio):
  935. resource = TENANTPOLICIES_PATH % (tenant_id, policy_prio)
  936. errstr = _("Unable to delete remote policy: %s")
  937. self.rest_action('DELETE', resource, errstr=errstr)
  938. def rest_update_osp_cluster_info(self, data):
  939. resource = OOSM_OSP_RESOURCE_PATH
  940. # nsapi does not append origination for cluster info
  941. data['origination'] = self.neutron_id
  942. errstr = _("Unable to update cluster visibility info: %s")
  943. self.rest_action('POST', resource, data, errstr)
  944. def rest_get_osp_cluster_last_update(self):
  945. resource = OOSM_OSP_LAST_UPDATE_PATH % self.neutron_id
  946. errstr = _("Failed to get last update time for the cluster: %s")
  947. resp = self.rest_action('GET', resource, errstr=errstr)
  948. return resp[3].get('last-update')
  949. def _consistency_watchdog(self, polling_interval=60):
  950. if 'consistency' not in self.get_capabilities():
  951. LOG.warning("Backend server(s) do not support automated "
  952. "consistency checks.")
  953. return
  954. if not polling_interval:
  955. LOG.warning("Consistency watchdog disabled by polling "
  956. "interval setting of %s.", polling_interval)
  957. return
  958. while True:
  959. # If consistency is supported, all we have to do is make any
  960. # rest call and the consistency header will be added. If it
  961. # doesn't match, the backend will return a synchronization error
  962. # that will be handled by the rest_action.
  963. eventlet.sleep(polling_interval)
  964. try:
  965. self.rest_action('GET', HEALTH_PATH)
  966. except Exception:
  967. LOG.exception("Encountered an error checking controller "
  968. "health.")
    def force_topo_sync(self, check_ts=True):
        """Execute a topology_sync between OSP and BCF.

        Topology sync collects all data from Openstack and pushes to BCF in
        one single REST call. This is a heavy operation and is not executed
        automatically.

        Conditions when this would be called:
        (1) during ServerPool initialization
            (a) must check previous timestamp
        (2) if periodic keystone tenant_cache find a diff in tenant list
            (a) should not check previous timestamp
        (3) externally triggered by neutron force-bcf-sync command
            (a) should not check previous timestamp
        (4) a rest_call to BCF fails on both servers and failure_code is not
            part of the ignore_codes list
            (a) must check previous timestamp

        :param check_ts: boolean flag to check previous
                         timestamp < TOPO_SYNC_EXPIRED_SECS
                         prev_resp: a REST response tuple from the previous
                         failed REST call if available. If we skip topo_sync,
                         return the failure as previously observed.
        :return: (sync_executed, response)
                 sync_executed - Boolean - returns True if we were able to
                                 acquire a lock to perform topo_sync,
                                 False otherwise
                 response - tuple of the typical HTTP response from REST call
                            (response.status, response.reason, respstr,
                             respdata)
        """
        LOG.info(_LI('TOPO_SYNC requested with check_ts %s'), check_ts)
        if not self.get_topo_function:
            raise cfg.Error(_('Server requires synchronization, '
                              'but no topology function was defined.'))
        # get current timestamp
        curr_ts = str(time.time())
        hash_handler = cdb.HashHandler(timestamp_ms=curr_ts)
        if not hash_handler.lock(check_ts):
            # Lock not acquired: either another worker is syncing, or (with
            # check_ts) the previous sync is recent enough to skip.
            LOG.info(_LI("TOPO_SYNC: lock() returned False. Skipping."))
            return False, TOPO_RESPONSE_OK
        # else, perform topo_sync
        try:
            LOG.debug("TOPO_SYNC: requested at %(request_ts)s started at "
                      "%(start_ts)s",
                      {'request_ts': cdb.convert_ts_to_datetime(curr_ts),
                       'start_ts': cdb.convert_ts_to_datetime(time.time())})
            data = self.get_topo_function(
                **self.get_topo_function_args)
            if not data:
                # when keystone sync fails, it fails silently with data = None
                # that is wrong, we need to raise an exception
                raise Exception(_("TOPO_SYNC: failed to retrieve data."))
            LOG.debug("TOPO_SYNC: data received from OSP, sending "
                      "request to BCF.")
            errstr = _("Unable to perform forced topology_sync: %s")
            return True, self.rest_action('POST', TOPOLOGY_PATH, data, errstr)
        except Exception as e:
            # if encountered an exception, set to previous timestamp
            # NOTE(review): unlock() runs here with set_prev_ts=True and then
            # again (without it) in the finally block below -- verify that
            # HashHandler.unlock() tolerates the second call and does not
            # clobber the restored previous timestamp.
            LOG.warning(_LW("TOPO_SYNC: Exception during topology sync. "
                            "Consistency DB timestamp will not be updated."))
            hash_handler.unlock(set_prev_ts=True)
            raise e
        finally:
            # Always release the consistency-DB lock and log the duration,
            # whether the sync succeeded, failed, or raised.
            hash_handler.unlock()
            diff = time.time() - float(hash_handler.lock_ts)
            LOG.info(_LI("TOPO_SYNC: took %s seconds to execute topo_sync. "
                         "consistency_db unlocked."),
                     str(diff))
  1034. def _ensure_tenant_cache(self, tenant_id):
  1035. if tenant_id not in self.keystone_tenants:
  1036. self._update_tenant_cache()
    def _update_tenant_cache(self, reconcile=True, ratelimit=False):
        """Refresh the tenant-id -> tenant-name cache from keystone.

        :param reconcile: when True, push tenant creates/deletes to BCF for
            tenants that appeared/disappeared, and force a full topo_sync
            if any tenant changed (e.g. renamed)
        :param ratelimit: when True, skip entirely if the previous sync ran
            within KEYSTONE_SYNC_RATE_LIMIT seconds
        :returns: True on success, False if any exception occurred (None
            when skipped by the rate limit)
        """
        if ratelimit is True and self._last_keystone_sync_time is not None:
            if time.time() - self._last_keystone_sync_time <= \
                    KEYSTONE_SYNC_RATE_LIMIT:
                return
        try:
            # Build a fresh keystone session each time; credentials come
            # from the plugin configuration.
            auth = v3.Password(auth_url=self.auth_url,
                               username=self.auth_user,
                               password=self.auth_password,
                               project_name=self.auth_tenant,
                               user_domain_name=self.user_domain_name,
                               project_domain_name=self.project_domain_name)
            sess = session.Session(auth=auth)
            keystone_client = ksclient.Client(session=sess)
            tenants = keystone_client.projects.list()
            if self.is_unicode_enabled():
                new_cached_tenants = {tn.id: tn.name
                                      for tn in tenants}
            else:
                # Non-unicode mode: normalize names to BCF-safe form.
                new_cached_tenants = {tn.id: Util.format_resource_name(tn.name)
                                      for tn in tenants}
            # Add SERVICE_TENANT to handle hidden network for VRRP
            new_cached_tenants[SERVICE_TENANT] = SERVICE_TENANT
            LOG.debug("New TENANTS: %s \nPrevious Tenants %s",
                      new_cached_tenants, self.keystone_tenants)
            # Diff against the previous cache, then swap the cache before
            # reconciling so the rest_* helpers see the new names.
            diff = DictDiffer(new_cached_tenants, self.keystone_tenants)
            self.keystone_tenants = new_cached_tenants
            if reconcile:
                for tenant_id in diff.added():
                    LOG.debug("TENANT create: id %s name %s",
                              tenant_id, self.keystone_tenants[tenant_id])
                    self._rest_create_tenant(tenant_id)
                for tenant_id in diff.removed():
                    LOG.debug("TENANT delete: id %s", tenant_id)
                    self.rest_delete_tenant(tenant_id)
                if diff.changed():
                    # A rename cannot be patched incrementally; resync all.
                    LOG.debug("Tenant cache outdated. Forcing topo_sync.")
                    self.force_topo_sync(check_ts=False)
            return True
        except Exception:
            # Best-effort: keystone being unreachable must not crash the
            # caller; report failure via the return value.
            LOG.exception("Encountered an error syncing with keystone.")
            return False
        finally:
            self._last_keystone_sync_time = time.time()
  1081. def _keystone_sync(self, polling_interval=300):
  1082. while True:
  1083. eventlet.sleep(polling_interval)
  1084. self._update_tenant_cache()
class HTTPSConnectionWithValidation(httplib.HTTPSConnection):
    """HTTPS connection that can validate the server certificate against a
    combined CA/certificate file when one is configured on the class.
    """

    # If combined_cert is None, the connection will continue without
    # any certificate validation.
    combined_cert = None

    def connect(self):
        # Open the raw TCP connection ourselves so we control how the SSL
        # layer is wrapped around it.
        sock = socket.create_connection((self.host, self.port),
                                        self.timeout, self.source_address)
        if self._tunnel_host:
            # HTTP CONNECT proxy tunneling: establish the tunnel over the
            # plain socket before negotiating SSL.
            self.sock = sock
            self._tunnel()
        if self.combined_cert:
            # Enforce server certificate validation against the combined
            # certificate file.
            self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
                                        cert_reqs=ssl.CERT_REQUIRED,
                                        ca_certs=self.combined_cert,
                                        ssl_version=ssl.PROTOCOL_SSLv23)
        else:
            # NOTE(review): CERT_NONE disables server certificate
            # verification entirely -- acceptable only when the operator
            # has explicitly opted out of cert enforcement upstream.
            self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
                                        cert_reqs=ssl.CERT_NONE,
                                        ssl_version=ssl.PROTOCOL_SSLv23)