Manage a pool of nodes for a distributed test infrastructure

provider.py 22KB

# Copyright (C) 2011-2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
import logging
import operator
import time

import openstack

from nodepool import exceptions
from nodepool.driver import Provider
from nodepool.driver.utils import QuotaInformation
from nodepool.nodeutils import iterate_timeout
from nodepool.task_manager import TaskManager
from nodepool import stats
from nodepool import version
from nodepool import zk
# Import entire module to avoid partial-loading, circular import
from nodepool.driver.openstack import handler

IPS_LIST_AGE = 5        # How long to keep a cached copy of the ip list
MAX_QUOTA_AGE = 5 * 60  # How long to keep the quota information cached

class OpenStackProvider(Provider):
    log = logging.getLogger("nodepool.driver.openstack.OpenStackProvider")

    def __init__(self, provider, use_taskmanager):
        self.provider = provider
        self._images = {}
        self._networks = {}
        self.__flavors = {}  # TODO(gtema): caching
        self.__azs = None
        self._use_taskmanager = use_taskmanager
        self._taskmanager = None
        self._current_nodepool_quota = None
        self._zk = None
        self._down_ports = set()
        self._last_port_cleanup = None
        self._port_cleanup_interval_secs = 180
        self._statsd = stats.get_client()
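
    # NOTE: start() expects a ZooKeeper connection before the provider is
    # used; stop() and join() shut down the optional TaskManager thread
    # created from the provider's rate setting.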
    def start(self, zk_conn):
        if self._use_taskmanager:
            self._taskmanager = TaskManager(self.provider.name,
                                            self.provider.rate)
            self._taskmanager.start()
        self.resetClient()
        self._zk = zk_conn

    def stop(self):
        if self._taskmanager:
            self._taskmanager.stop()

    def join(self):
        if self._taskmanager:
            self._taskmanager.join()

    def getRequestHandler(self, poolworker, request):
        return handler.OpenStackNodeRequestHandler(poolworker, request)

    # TODO(gtema): caching
    @property
    def _flavors(self):
        if not self.__flavors:
            self.__flavors = self._getFlavors()
        return self.__flavors

    def _getClient(self):
        if self._use_taskmanager:
            manager = self._taskmanager
        else:
            manager = None
        return openstack.connection.Connection(
            config=self.provider.cloud_config,
            task_manager=manager,
            app_name='nodepool',
            app_version=version.version_info.version_string()
        )
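
    # Translate a pool label into the quota (flavor resources) a node of
    # that type will consume, for comparison against available capacity.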
    def quotaNeededByNodeType(self, ntype, pool):
        provider_label = pool.labels[ntype]
        flavor = self.findFlavor(provider_label.flavor_name,
                                 provider_label.min_ram)
        return QuotaInformation.construct_from_flavor(flavor)

    def estimatedNodepoolQuota(self):
        '''
        Determine how much quota is available for nodepool managed resources.

        This takes into account the tenant's quota, resources used outside
        of nodepool, the resources currently used by nodepool, and the max
        settings in the nodepool config. The result is cached for
        MAX_QUOTA_AGE seconds.

        :return: Total amount of resources available to nodepool,
                 including currently existing nodes.
        '''
        if self._current_nodepool_quota:
            now = time.time()
            if now < self._current_nodepool_quota['timestamp'] + MAX_QUOTA_AGE:
                return copy.deepcopy(self._current_nodepool_quota['quota'])

        limits = self._client.get_compute_limits()

        # This is initialized with the full tenant quota and later becomes
        # the quota available for nodepool.
        nodepool_quota = QuotaInformation.construct_from_limits(limits)
        self.log.debug("Provider quota for %s: %s",
                       self.provider.name, nodepool_quota)

        # Subtract the unmanaged quota usage from nodepool_max
        # to get the quota available for us.
        nodepool_quota.subtract(self.unmanagedQuotaUsed())

        self._current_nodepool_quota = {
            'quota': nodepool_quota,
            'timestamp': time.time()
        }
        self.log.debug("Available quota for %s: %s",
                       self.provider.name, nodepool_quota)

        return copy.deepcopy(nodepool_quota)

    def invalidateQuotaCache(self):
        self._current_nodepool_quota['timestamp'] = 0

    def estimatedNodepoolQuotaUsed(self, pool=None):
        '''
        Sums up the quota currently used (or planned) by nodepool.

        :param pool: If given, only nodes of this pool are counted.
        :return: Calculated quota in use by nodepool
        '''
        used_quota = QuotaInformation()

        for node in self._zk.nodeIterator():
            if node.provider == self.provider.name:
                try:
                    if pool and not node.pool == pool.name:
                        continue
                    provider_pool = self.provider.pools.get(node.pool)
                    if not provider_pool:
                        self.log.warning(
                            "Cannot find provider pool for node %s" % node)
                        # This node is in a funny state; we log it for
                        # debugging but move on and don't account for it,
                        # since we can't properly calculate its cost
                        # without pool info.
                        continue
                    if node.type[0] not in provider_pool.labels:
                        self.log.warning("Node type is not in provider pool "
                                         "for node %s" % node)
                        # This node is also in a funny state; the config
                        # may have changed under it. It should settle out
                        # eventually when it's deleted.
                        continue
                    node_resources = self.quotaNeededByNodeType(
                        node.type[0], provider_pool)
                    used_quota.add(node_resources)
                except Exception:
                    self.log.exception("Couldn't consider invalid node %s "
                                       "for quota:" % node)
        return used_quota

    def unmanagedQuotaUsed(self):
        '''
        Sums up the quota used by servers not managed by nodepool.

        :return: Calculated quota in use by unmanaged servers
        '''
        flavors = self.listFlavorsById()
        used_quota = QuotaInformation()

        node_ids = set([n.id for n in self._zk.nodeIterator()])

        for server in self.listNodes():
            meta = server.get('metadata', {})

            nodepool_provider_name = meta.get('nodepool_provider_name')
            if (nodepool_provider_name and
                    nodepool_provider_name == self.provider.name):
                # This provider (regardless of the launcher) owns this
                # server, so it must not be counted against the unmanaged
                # quota unless it has leaked.
                nodepool_node_id = meta.get('nodepool_node_id')
                # FIXME(tobiash): Add a test case for this
                if nodepool_node_id and nodepool_node_id in node_ids:
                    # It has not leaked.
                    continue

            flavor = flavors.get(server.flavor.id)
            used_quota.add(QuotaInformation.construct_from_flavor(flavor))

        return used_quota

    def resetClient(self):
        self._client = self._getClient()

    def _getFlavors(self):
        flavors = self.listFlavors()
        flavors.sort(key=operator.itemgetter('ram'))
        return flavors

    # TODO(gtema): These next three methods duplicate logic that is in
    # openstacksdk, where caching is not enabled by default.
    # Remove them when caching is the default.
    def _findFlavorByName(self, flavor_name):
        for f in self._flavors:
            if flavor_name in (f['name'], f['id']):
                return f
        raise Exception("Unable to find flavor: %s" % flavor_name)

    def _findFlavorByRam(self, min_ram, flavor_name):
        for f in self._flavors:
            if (f['ram'] >= min_ram
                    and (not flavor_name or flavor_name in f['name'])):
                return f
        raise Exception("Unable to find flavor with min ram: %s" % min_ram)

    def findFlavor(self, flavor_name, min_ram):
        # Note: this will throw an error if the provider is offline
        # but all the callers are in threads (they call in via CreateServer)
        # so the mainloop won't be affected.
        # TODO(gtema): enable commented block when openstacksdk has caching
        # enabled by default
        # if min_ram:
        #     return self._client.get_flavor_by_ram(
        #         ram=min_ram,
        #         include=flavor_name,
        #         get_extra=False)
        # else:
        #     return self._client.get_flavor(flavor_name, get_extra=False)
        if min_ram:
            return self._findFlavorByRam(min_ram, flavor_name)
        else:
            return self._findFlavorByName(flavor_name)

    def findImage(self, name):
        if name in self._images:
            return self._images[name]

        image = self._client.get_image(name)
        self._images[name] = image
        return image

    def findNetwork(self, name):
        if name in self._networks:
            return self._networks[name]

        network = self._client.get_network(name)
        self._networks[name] = network
        return network

    def deleteImage(self, name):
        if name in self._images:
            del self._images[name]

        return self._client.delete_image(name)
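
    # Assemble the create_server() arguments from the label/pool settings
    # and submit the request without waiting (wait=False); callers can poll
    # with waitForServer() for completion.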
    def createServer(self, name, image,
                     flavor_name=None, min_ram=None,
                     az=None, key_name=None, config_drive=True,
                     nodepool_node_id=None, nodepool_node_label=None,
                     nodepool_image_name=None,
                     networks=None, security_groups=None,
                     boot_from_volume=False, volume_size=50,
                     instance_properties=None):
        if not networks:
            networks = []
        if not isinstance(image, dict):
            # If it's a dict, we already have the cloud id. If it's not,
            # we don't know if it's a name or an ID, so we need to look it up.
            image = self.findImage(image)
        flavor = self.findFlavor(flavor_name=flavor_name, min_ram=min_ram)
        create_args = dict(name=name,
                           image=image,
                           flavor=flavor,
                           config_drive=config_drive)
        if boot_from_volume:
            create_args['boot_from_volume'] = boot_from_volume
            create_args['volume_size'] = volume_size
            # NOTE(pabelanger): Always cleanup volumes when we delete a server.
            create_args['terminate_volume'] = True
        if key_name:
            create_args['key_name'] = key_name
        if az:
            create_args['availability_zone'] = az
        if security_groups:
            create_args['security_groups'] = security_groups

        nics = []
        for network in networks:
            net_id = self.findNetwork(network)['id']
            nics.append({'net-id': net_id})
        if nics:
            create_args['nics'] = nics

        # Put provider.name and image_name in as groups so that ansible
        # inventory can auto-create groups for us based on each of those
        # qualities.
        # Also list each of those values directly so that non-ansible
        # consumption programs don't need to play a game of knowing that
        # groups[0] is the image name or anything silly like that.
        groups_list = [self.provider.name]
        if nodepool_image_name:
            groups_list.append(nodepool_image_name)
        if nodepool_node_label:
            groups_list.append(nodepool_node_label)
        meta = dict(
            groups=",".join(groups_list),
            nodepool_provider_name=self.provider.name,
        )
        # Merge in any provided properties; nodepool's own keys take
        # precedence on conflict.
        if instance_properties:
            meta = {**instance_properties, **meta}
        if nodepool_node_id:
            meta['nodepool_node_id'] = nodepool_node_id
        if nodepool_image_name:
            meta['nodepool_image_name'] = nodepool_image_name
        if nodepool_node_label:
            meta['nodepool_node_label'] = nodepool_node_label
        create_args['meta'] = meta

        try:
            return self._client.create_server(wait=False, **create_args)
        except openstack.exceptions.BadRequestException:
            # We've gotten a 400 error from nova, which means the request
            # was malformed. The most likely cause of that, unless something
            # became functionally and systemically broken, is a stale az,
            # image or flavor cache. Log a message and invalidate the caches
            # so that they are rebuilt on the next attempt.
            self._images = {}
            self.__azs = None
            self.__flavors = {}  # TODO(gtema): caching
            self.log.info(
                "Clearing az, flavor and image caches due to 400 error "
                "from nova")
            raise

    def getServer(self, server_id):
        return self._client.get_server(server_id)

    def getServerConsole(self, server_id):
        try:
            return self._client.get_server_console(server_id)
        except openstack.exceptions.OpenStackCloudException:
            return None

    def waitForServer(self, server, timeout=3600, auto_ip=True):
        return self._client.wait_for_server(
            server=server, auto_ip=auto_ip,
            reuse=False, timeout=timeout)

    def waitForNodeCleanup(self, server_id, timeout=600):
        for count in iterate_timeout(
                timeout, exceptions.ServerDeleteException,
                "server %s deletion" % server_id):
            if not self.getServer(server_id):
                return

    def createImage(self, server, image_name, meta):
        return self._client.create_image_snapshot(
            image_name, server, **meta)

    def getImage(self, image_id):
        return self._client.get_image(image_id)

    def labelReady(self, label):
        if not label.cloud_image:
            return False

        # If an image ID was supplied, we'll assume it is ready since
        # we don't currently have a way of validating that (except during
        # server creation).
        if label.cloud_image.image_id:
            return True

        image = self.getImage(label.cloud_image.external_name)
        if not image:
            self.log.warning(
                "Provider %s is configured to use %s as the"
                " cloud-image for label %s and that"
                " cloud-image could not be found in the"
                " cloud." % (self.provider.name,
                             label.cloud_image.external_name,
                             label.name))
            return False
        return True

    def uploadImage(self, image_name, filename, image_type=None, meta=None,
                    md5=None, sha256=None):
        # Configure glance and upload the image. Note the meta flags
        # are provided as custom glance properties.
        # NOTE: we have wait=True set here. This is not how we normally
        # do things in nodepool, preferring to poll ourselves thankyouverymuch.
        # However - two things to note:
        #  - PUT has no async mechanism, so we have to handle it anyway
        #  - v2 w/task waiting is very strange and complex - but we have to
        #    block for our v1 clouds anyway, so we might as well
        #    have the interface be the same and treat faking out
        #    an openstacksdk-level fake-async interface later
        if not meta:
            meta = {}
        if image_type:
            meta['disk_format'] = image_type
        image = self._client.create_image(
            name=image_name,
            filename=filename,
            is_public=False,
            wait=True,
            md5=md5,
            sha256=sha256,
            **meta)
        return image.id

    def listPorts(self, status=None):
        '''
        List known ports.

        :param str status: A valid port status. E.g., 'ACTIVE' or 'DOWN'.
        '''
        if status:
            ports = self._client.list_ports(filters={'status': status})
        else:
            ports = self._client.list_ports()
        return ports

    def deletePort(self, port_id):
        self._client.delete_port(port_id)

    def listImages(self):
        return self._client.list_images()

    def listFlavors(self):
        return self._client.list_flavors(get_extra=False)

    def listFlavorsById(self):
        flavors = {}
        for flavor in self._client.list_flavors(get_extra=False):
            flavors[flavor.id] = flavor
        return flavors

    def listNodes(self):
        # list_servers carries the nodepool server list caching logic
        return self._client.list_servers()

    def deleteServer(self, server_id):
        return self._client.delete_server(server_id, delete_ips=True)

    def cleanupNode(self, server_id):
        server = self.getServer(server_id)
        if not server:
            raise exceptions.NotFound()

        self.log.debug('Deleting server %s' % server_id)
        self.deleteServer(server_id)

    def cleanupLeakedInstances(self):
        '''
        Delete any leaked server instances.

        Remove any servers found in this provider that are not recorded in
        the ZooKeeper data.
        '''
        deleting_nodes = {}

        for node in self._zk.nodeIterator():
            if node.state == zk.DELETING:
                if node.provider != self.provider.name:
                    continue
                if node.provider not in deleting_nodes:
                    deleting_nodes[node.provider] = []
                deleting_nodes[node.provider].append(node.external_id)

        for server in self.listNodes():
            meta = server.get('metadata', {})

            if 'nodepool_provider_name' not in meta:
                continue

            if meta['nodepool_provider_name'] != self.provider.name:
                # Another launcher, sharing this provider but configured
                # with a different name, owns this.
                continue

            if (self.provider.name in deleting_nodes and
                    server.id in deleting_nodes[self.provider.name]):
                # Already deleting this node
                continue

            if not self._zk.getNode(meta['nodepool_node_id']):
                self.log.warning(
                    "Marking for delete leaked instance %s (%s) in %s "
                    "(unknown node id %s)",
                    server.name, server.id, self.provider.name,
                    meta['nodepool_node_id']
                )
                # Create an artificial node to use for deleting the server.
                node = zk.Node()
                node.external_id = server.id
                node.provider = self.provider.name
                node.state = zk.DELETING
                self._zk.storeNode(node)

    def filterComputePorts(self, ports):
        '''
        Return a list of compute ports (or ports with no device owner).

        We are not interested in ports for routers or DHCP.
        '''
        ret = []
        for p in ports:
            if p.device_owner is None or p.device_owner.startswith("compute:"):
                ret.append(p)
        return ret
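
    # A DOWN port is deleted only if it shows up as DOWN on two consecutive
    # checks separated by _port_cleanup_interval_secs, so ports that are
    # only briefly DOWN are not removed.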
    def cleanupLeakedPorts(self):
        if not self._last_port_cleanup:
            self._last_port_cleanup = time.monotonic()
            ports = self.listPorts(status='DOWN')
            ports = self.filterComputePorts(ports)
            self._down_ports = set([p.id for p in ports])
            return

        # Return if not enough time has passed between cleanups.
        last_check_in_secs = int(time.monotonic() - self._last_port_cleanup)
        if last_check_in_secs <= self._port_cleanup_interval_secs:
            return

        ports = self.listPorts(status='DOWN')
        ports = self.filterComputePorts(ports)
        current_set = set([p.id for p in ports])
        remove_set = current_set & self._down_ports

        removed_count = 0
        for port_id in remove_set:
            try:
                self.deletePort(port_id)
            except Exception:
                self.log.exception("Exception deleting port %s in %s:",
                                   port_id, self.provider.name)
            else:
                removed_count += 1
                self.log.debug("Removed DOWN port %s in %s",
                               port_id, self.provider.name)

        if self._statsd and removed_count:
            key = 'nodepool.provider.%s.downPorts' % (self.provider.name)
            self._statsd.incr(key, removed_count)

        self._last_port_cleanup = time.monotonic()

        # Rely on OpenStack to tell us the down ports rather than doing our
        # own set adjustment.
        ports = self.listPorts(status='DOWN')
        ports = self.filterComputePorts(ports)
        self._down_ports = set([p.id for p in ports])

    def cleanupLeakedResources(self):
        self.cleanupLeakedInstances()
        self.cleanupLeakedPorts()
        if self.provider.clean_floating_ips:
            self._client.delete_unattached_floating_ips()

    def getAZs(self):
        if self.__azs is None:
            self.__azs = self._client.list_availability_zone_names()
            if not self.__azs:
                # If there are no zones, return a list containing None so that
                # random.choice can pick None and pass that to Nova. If this
                # feels dirty, please direct your ire to policy.json and the
                # ability to turn off random portions of the OpenStack API.
                self.__azs = [None]
        return self.__azs