Manage a pool of nodes for a distributed test infrastructure

provider.py 19KB

# Copyright (C) 2011-2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
import logging
import operator
import time

import openstack

from nodepool import exceptions
from nodepool.driver import Provider
from nodepool.driver.utils import QuotaInformation
from nodepool.nodeutils import iterate_timeout
from nodepool.task_manager import TaskManager
from nodepool import version
from nodepool import zk

# Import entire module to avoid partial-loading, circular import
from nodepool.driver.openstack import handler


IPS_LIST_AGE = 5        # How long to keep a cached copy of the ip list
MAX_QUOTA_AGE = 5 * 60  # How long to keep the quota information cached
class OpenStackProvider(Provider):
    log = logging.getLogger("nodepool.driver.openstack.OpenStackProvider")

    def __init__(self, provider, use_taskmanager):
        self.provider = provider
        self._images = {}
        self._networks = {}
        self.__flavors = {}  # TODO(gtema): caching
        self.__azs = None
        self._use_taskmanager = use_taskmanager
        self._taskmanager = None
        self._current_nodepool_quota = None
        self._zk = None

    def start(self, zk_conn):
        if self._use_taskmanager:
            self._taskmanager = TaskManager(self.provider.name,
                                            self.provider.rate)
            self._taskmanager.start()
        self.resetClient()
        self._zk = zk_conn

    def stop(self):
        if self._taskmanager:
            self._taskmanager.stop()

    def join(self):
        if self._taskmanager:
            self._taskmanager.join()

    def getRequestHandler(self, poolworker, request):
        return handler.OpenStackNodeRequestHandler(poolworker, request)

    # TODO(gtema): caching
    @property
    def _flavors(self):
        if not self.__flavors:
            self.__flavors = self._getFlavors()
        return self.__flavors

    def _getClient(self):
        if self._use_taskmanager:
            manager = self._taskmanager
        else:
            manager = None
        return openstack.connection.Connection(
            config=self.provider.cloud_config,
            task_manager=manager,
            app_name='nodepool',
            app_version=version.version_info.version_string()
        )
    def quotaNeededByNodeType(self, ntype, pool):
        provider_label = pool.labels[ntype]
        flavor = self.findFlavor(provider_label.flavor_name,
                                 provider_label.min_ram)
        return QuotaInformation.construct_from_flavor(flavor)

    def estimatedNodepoolQuota(self):
        '''
        Determine how much quota is available for nodepool-managed resources.
        This needs to take into account the tenant's quota, the resources
        used outside of nodepool, the resources currently used by nodepool,
        and the max settings in the nodepool config. The result is cached
        for MAX_QUOTA_AGE seconds.

        :return: Total amount of resources currently available to nodepool,
                 including currently existing nodes.
        '''
        if self._current_nodepool_quota:
            now = time.time()
            if now < self._current_nodepool_quota['timestamp'] + MAX_QUOTA_AGE:
                return copy.deepcopy(self._current_nodepool_quota['quota'])

        limits = self._client.get_compute_limits()

        # This is initialized with the full tenant quota and later becomes
        # the quota available for nodepool.
        nodepool_quota = QuotaInformation.construct_from_limits(limits)
        self.log.debug("Provider quota for %s: %s",
                       self.provider.name, nodepool_quota)

        # Subtract the unmanaged quota usage from nodepool_max
        # to get the quota available for us.
        nodepool_quota.subtract(self.unmanagedQuotaUsed())

        self._current_nodepool_quota = {
            'quota': nodepool_quota,
            'timestamp': time.time()
        }

        self.log.debug("Available quota for %s: %s",
                       self.provider.name, nodepool_quota)

        return copy.deepcopy(nodepool_quota)
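
    # Worked example (numbers are purely illustrative): with a tenant limit
    # of 100 VCPUs and 20 VCPUs used by servers that do not carry this
    # provider's nodepool metadata, estimatedNodepoolQuota() reports 80 VCPUs
    # available to nodepool; that figure still includes nodepool's own
    # existing nodes.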
    def invalidateQuotaCache(self):
        self._current_nodepool_quota['timestamp'] = 0

    def estimatedNodepoolQuotaUsed(self, pool=None):
        '''
        Sums up the quota currently used (or planned) by nodepool. If a pool
        is given, the sum is restricted to that pool.

        :param pool: If given, restrict the calculation to this pool.
        :return: Calculated quota in use by nodepool
        '''
        used_quota = QuotaInformation()

        for node in self._zk.nodeIterator():
            if node.provider == self.provider.name:
                try:
                    if pool and not node.pool == pool.name:
                        continue
                    provider_pool = self.provider.pools.get(node.pool)
                    if not provider_pool:
                        self.log.warning(
                            "Cannot find provider pool for node %s" % node)
                        # This node is in a funny state; we log it for
                        # debugging but move on and don't count it, as we
                        # can't properly calculate its cost without pool info.
                        continue
                    if node.type[0] not in provider_pool.labels:
                        self.log.warning("Node type is not in provider pool "
                                         "for node %s" % node)
                        # This node is also in a funny state; the config
                        # may have changed under it. It should settle out
                        # eventually when it's deleted.
                        continue
                    node_resources = self.quotaNeededByNodeType(
                        node.type[0], provider_pool)
                    used_quota.add(node_resources)
                except Exception:
                    self.log.exception("Couldn't consider invalid node %s "
                                       "for quota:" % node)
        return used_quota
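
    # Note: this calculation is driven entirely by the node records in
    # ZooKeeper, so it also counts nodes that are planned but not yet booted
    # on the cloud (hence "used (or planned)" in the docstring above).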
    def unmanagedQuotaUsed(self):
        '''
        Sums up the quota used by servers not managed by nodepool.

        :return: Calculated quota in use by unmanaged servers
        '''
        flavors = self.listFlavorsById()
        used_quota = QuotaInformation()

        node_ids = set([n.id for n in self._zk.nodeIterator()])

        for server in self.listNodes():
            meta = server.get('metadata', {})

            nodepool_provider_name = meta.get('nodepool_provider_name')
            if (nodepool_provider_name and
                    nodepool_provider_name == self.provider.name):
                # This provider (regardless of the launcher) owns this
                # server, so it must not be counted toward unmanaged
                # quota, unless it has leaked.
                nodepool_node_id = meta.get('nodepool_node_id')
                # FIXME(tobiash): Add a test case for this
                if nodepool_node_id and nodepool_node_id in node_ids:
                    # It has not leaked.
                    continue

            flavor = flavors.get(server.flavor.id)
            used_quota.add(QuotaInformation.construct_from_flavor(flavor))

        return used_quota
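
    # A server counts as "unmanaged" here if it lacks this provider's
    # nodepool metadata entirely (e.g. it was booted by hand or by another
    # tool), or if it carries a nodepool_node_id that ZooKeeper no longer
    # knows about (i.e. it leaked).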
    def resetClient(self):
        self._client = self._getClient()

    def _getFlavors(self):
        flavors = self.listFlavors()
        flavors.sort(key=operator.itemgetter('ram'))
        return flavors

    # TODO(gtema): These next three methods duplicate logic that is in
    # openstacksdk; caching is not enabled there by default.
    # Remove them when caching is the default.
    def _findFlavorByName(self, flavor_name):
        for f in self._flavors:
            if flavor_name in (f['name'], f['id']):
                return f
        raise Exception("Unable to find flavor: %s" % flavor_name)

    def _findFlavorByRam(self, min_ram, flavor_name):
        for f in self._flavors:
            if (f['ram'] >= min_ram
                    and (not flavor_name or flavor_name in f['name'])):
                return f
        raise Exception("Unable to find flavor with min ram: %s" % min_ram)

    def findFlavor(self, flavor_name, min_ram):
        # Note: this will throw an error if the provider is offline,
        # but all the callers are in threads (they call in via CreateServer),
        # so the mainloop won't be affected.
        # TODO(gtema): enable the commented block when openstacksdk has
        # caching enabled by default
        # if min_ram:
        #     return self._client.get_flavor_by_ram(
        #         ram=min_ram,
        #         include=flavor_name,
        #         get_extra=False)
        # else:
        #     return self._client.get_flavor(flavor_name, get_extra=False)
        if min_ram:
            return self._findFlavorByRam(min_ram, flavor_name)
        else:
            return self._findFlavorByName(flavor_name)
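
    # Because _getFlavors() sorts the flavor list by RAM in ascending order,
    # _findFlavorByRam() returns the smallest flavor that satisfies min_ram
    # (optionally restricted to flavors whose name contains flavor_name).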
    def findImage(self, name):
        if name in self._images:
            return self._images[name]

        image = self._client.get_image(name)
        self._images[name] = image
        return image

    def findNetwork(self, name):
        if name in self._networks:
            return self._networks[name]

        network = self._client.get_network(name)
        self._networks[name] = network
        return network

    def deleteImage(self, name):
        if name in self._images:
            del self._images[name]

        return self._client.delete_image(name)

    def createServer(self, name, image,
                     flavor_name=None, min_ram=None,
                     az=None, key_name=None, config_drive=True,
                     nodepool_node_id=None, nodepool_node_label=None,
                     nodepool_image_name=None,
                     networks=None, security_groups=None,
                     boot_from_volume=False, volume_size=50,
                     instance_properties=None, userdata=None):
        if not networks:
            networks = []
        if not isinstance(image, dict):
            # If it's a dict, we already have the cloud id. If it's not,
            # we don't know if it's a name or an ID, so we need to look it up.
            image = self.findImage(image)
        flavor = self.findFlavor(flavor_name=flavor_name, min_ram=min_ram)
        create_args = dict(name=name,
                           image=image,
                           flavor=flavor,
                           config_drive=config_drive)
        if boot_from_volume:
            create_args['boot_from_volume'] = boot_from_volume
            create_args['volume_size'] = volume_size
            # NOTE(pabelanger): Always cleanup volumes when we delete a server.
            create_args['terminate_volume'] = True
        if key_name:
            create_args['key_name'] = key_name
        if az:
            create_args['availability_zone'] = az
        if security_groups:
            create_args['security_groups'] = security_groups
        if userdata:
            create_args['userdata'] = userdata
        nics = []
        for network in networks:
            net_id = self.findNetwork(network)['id']
            nics.append({'net-id': net_id})
        if nics:
            create_args['nics'] = nics
        # Put provider.name and image_name in as groups so that ansible
        # inventory can auto-create groups for us based on each of those
        # qualities.
        # Also list each of those values directly so that non-ansible
        # consumers don't need to play a game of knowing that groups[0]
        # is the image name or anything silly like that.
        groups_list = [self.provider.name]
        if nodepool_image_name:
            groups_list.append(nodepool_image_name)
        if nodepool_node_label:
            groups_list.append(nodepool_node_label)
        meta = dict(
            groups=",".join(groups_list),
            nodepool_provider_name=self.provider.name,
        )
        # Merge in any provided properties; nodepool's own keys win on
        # conflict.
        if instance_properties:
            meta = {**instance_properties, **meta}
        if nodepool_node_id:
            meta['nodepool_node_id'] = nodepool_node_id
        if nodepool_image_name:
            meta['nodepool_image_name'] = nodepool_image_name
        if nodepool_node_label:
            meta['nodepool_node_label'] = nodepool_node_label
        create_args['meta'] = meta

        try:
            return self._client.create_server(wait=False, **create_args)
        except openstack.exceptions.BadRequestException:
            # We've gotten a 400 error from nova, which means the request
            # was malformed. The most likely cause of that, unless something
            # became functionally and systemically broken, is a stale az,
            # image or flavor cache. Log a message and invalidate the caches
            # so that we fetch fresh data next time.
            self._images = {}
            self.__azs = None
            self.__flavors = {}  # TODO(gtema): caching
            self.log.info(
                "Clearing az, flavor and image caches due to 400 error "
                "from nova")
            raise
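
    # For illustration only (the argument values below are hypothetical):
    # a server booted via
    #   createServer('node-1', image='bionic', nodepool_node_id='0001',
    #                nodepool_node_label='small',
    #                nodepool_image_name='bionic')
    # ends up with metadata roughly like:
    #   groups="<provider-name>,bionic,small"
    #   nodepool_provider_name="<provider-name>"
    #   nodepool_node_id="0001"
    #   nodepool_image_name="bionic"
    #   nodepool_node_label="small"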
    def getServer(self, server_id):
        return self._client.get_server(server_id)

    def getServerConsole(self, server_id):
        try:
            return self._client.get_server_console(server_id)
        except openstack.exceptions.OpenStackCloudException:
            return None

    def waitForServer(self, server, timeout=3600, auto_ip=True):
        return self._client.wait_for_server(
            server=server, auto_ip=auto_ip,
            reuse=False, timeout=timeout)

    def waitForNodeCleanup(self, server_id, timeout=600):
        for count in iterate_timeout(
                timeout, exceptions.ServerDeleteException,
                "server %s deletion" % server_id):
            if not self.getServer(server_id):
                return

    def createImage(self, server, image_name, meta):
        return self._client.create_image_snapshot(
            image_name, server, **meta)

    def getImage(self, image_id):
        return self._client.get_image(image_id)

    def labelReady(self, label):
        if not label.cloud_image:
            return False

        # If an image ID was supplied, we'll assume it is ready since
        # we don't currently have a way of validating that (except during
        # server creation).
        if label.cloud_image.image_id:
            return True

        image = self.getImage(label.cloud_image.external_name)
        if not image:
            self.log.warning(
                "Provider %s is configured to use %s as the"
                " cloud-image for label %s and that"
                " cloud-image could not be found in the"
                " cloud." % (self.provider.name,
                             label.cloud_image.external_name,
                             label.name))
            return False
        return True

    def uploadImage(self, image_name, filename, image_type=None, meta=None,
                    md5=None, sha256=None):
        # Configure glance and upload the image. Note the meta flags
        # are provided as custom glance properties.
        # NOTE: we have wait=True set here. This is not how we normally
        # do things in nodepool, preferring to poll ourselves thankyouverymuch.
        # However - two things to note:
        #  - PUT has no async mechanism, so we have to handle it anyway
        #  - v2 w/task waiting is very strange and complex - but we have to
        #    block for our v1 clouds anyway, so we might as well
        #    have the interface be the same and treat faking-out
        #    an openstacksdk-level fake-async interface later
        if not meta:
            meta = {}
        if image_type:
            meta['disk_format'] = image_type
        image = self._client.create_image(
            name=image_name,
            filename=filename,
            is_public=False,
            wait=True,
            md5=md5,
            sha256=sha256,
            **meta)
        return image.id
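
    # Because create_image() is called with wait=True, uploadImage() blocks
    # until glance has finished importing the image and then returns the new
    # image's id.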
    def listImages(self):
        return self._client.list_images()

    def listFlavors(self):
        return self._client.list_flavors(get_extra=False)

    def listFlavorsById(self):
        flavors = {}
        for flavor in self._client.list_flavors(get_extra=False):
            flavors[flavor.id] = flavor
        return flavors

    def listNodes(self):
        # list_servers carries the nodepool server list caching logic
        return self._client.list_servers()

    def deleteServer(self, server_id):
        return self._client.delete_server(server_id, delete_ips=True)

    def cleanupNode(self, server_id):
        server = self.getServer(server_id)
        if not server:
            raise exceptions.NotFound()

        self.log.debug('Deleting server %s' % server_id)
        self.deleteServer(server_id)

    def cleanupLeakedResources(self):
        '''
        Delete any leaked server instances.

        Remove any servers found in this provider that are not recorded in
        the ZooKeeper data.
        '''
        deleting_nodes = {}

        for node in self._zk.nodeIterator():
            if node.state == zk.DELETING:
                if node.provider != self.provider.name:
                    continue
                if node.provider not in deleting_nodes:
                    deleting_nodes[node.provider] = []
                deleting_nodes[node.provider].append(node.external_id)

        for server in self.listNodes():
            meta = server.get('metadata', {})

            if 'nodepool_provider_name' not in meta:
                continue

            if meta['nodepool_provider_name'] != self.provider.name:
                # Another launcher, sharing this provider but configured
                # with a different name, owns this.
                continue

            if (self.provider.name in deleting_nodes and
                    server.id in deleting_nodes[self.provider.name]):
                # Already deleting this node
                continue

            if not self._zk.getNode(meta['nodepool_node_id']):
                self.log.warning(
                    "Marking for delete leaked instance %s (%s) in %s "
                    "(unknown node id %s)",
                    server.name, server.id, self.provider.name,
                    meta['nodepool_node_id']
                )
                # Create an artificial node to use for deleting the server.
                node = zk.Node()
                node.external_id = server.id
                node.provider = self.provider.name
                node.state = zk.DELETING
                self._zk.storeNode(node)

        if self.provider.clean_floating_ips:
            self._client.delete_unattached_floating_ips()
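
    # In short: a server is treated as leaked when its metadata names this
    # provider but its nodepool_node_id is unknown to ZooKeeper. Rather than
    # deleting it inline, an artificial DELETING node record is stored so
    # that the regular node-deletion path can remove it.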
    def getAZs(self):
        if self.__azs is None:
            self.__azs = self._client.list_availability_zone_names()
            if not self.__azs:
                # If there are no zones, return a list containing None so that
                # random.choice can pick None and pass that to Nova. If this
                # feels dirty, please direct your ire to policy.json and the
                # ability to turn off random portions of the OpenStack API.
                self.__azs = [None]
        return self.__azs
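

# Minimal usage sketch, for orientation only. 'provider_config' and 'zk_conn'
# are hypothetical stand-ins for objects normally built by nodepool's
# launcher; the calls themselves are the methods defined above.
#
#   provider = OpenStackProvider(provider_config, use_taskmanager=True)
#   provider.start(zk_conn)             # builds the openstacksdk client
#   server = provider.createServer('node-1', image='bionic', min_ram=8192)
#   provider.waitForServer(server)
#   provider.cleanupNode(server.id)
#   provider.waitForNodeCleanup(server.id)
#   provider.stop()
#   provider.join()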