Manage a pool of nodes for a distributed test infrastructure
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

handler.py 4.8KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. # Copyright 2018 Red Hat
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  4. # not use this file except in compliance with the License. You may obtain
  5. # a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  11. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  12. # License for the specific language governing permissions and limitations
  13. # under the License.
  14. import logging
  15. from kazoo import exceptions as kze
  16. from nodepool import exceptions
  17. from nodepool import zk
  18. from nodepool.driver.utils import NodeLauncher
  19. from nodepool.driver import NodeRequestHandler
  class OpenShiftLauncher(NodeLauncher):
      """Launcher that creates the OpenShift project backing a single node.

      For labels of type "pod" a pod is additionally created inside the
      project and the node is exposed through a "kubectl" connection;
      every other label type is exposed as a bare "project".
      """

      def __init__(self, handler, node, provider_config, provider_label):
          """Remember the request handler, label and retry budget.

          :param handler: The request handler that owns this launch; its
              ``zk``, ``pool`` and ``manager`` attributes are used.
          :param node: The node record to populate and store.
          :param provider_config: Provider configuration; supplies
              ``launch_retries``.
          :param provider_label: The label being launched (``type`` and
              ``name`` are read).
          """
          super().__init__(handler.zk, node, provider_config)
          self.handler = handler
          self.zk = handler.zk
          self.label = provider_label
          self._retries = provider_config.launch_retries

      def _launchLabel(self):
          """Perform one launch attempt and mark the node READY.

          Any exception raised by the manager or by ZooKeeper propagates
          to the caller, which decides whether to retry.
          """
          node = self.node
          manager = self.handler.manager

          self.log.debug("Creating resource")
          project_name = "%s-%s" % (self.handler.pool.name, node.id)

          # Persist the external id right away so that a failure later in
          # this attempt still leaves a record of what was created.
          node.external_id = manager.createProject(project_name)
          self.zk.storeNode(node)

          access = manager.prepareProject(project_name)
          if self.label.type != "pod":
              node.connection_type = "project"
          else:
              manager.createPod(project_name, self.label)
              access['pod'] = self.label.name
              node.connection_type = "kubectl"
              node.interface_ip = self.label.name

          node.state = zk.READY
          # NOTE: resource access token may be encrypted here
          node.connection_port = access
          self.zk.storeNode(node)
          self.log.info("Resource %s is ready", project_name)

      def launch(self):
          """Run ``_launchLabel`` until it succeeds or retries run out.

          After a failed attempt, any project that was created is cleaned
          up and the node's ``external_id`` / ``interface_ip`` are reset
          before trying again. Nothing is attempted when the configured
          retry count is below one.

          :raises kazoo.exceptions.SessionExpiredError: Re-raised
              immediately, without cleanup or retry.
          :raises nodepool.exceptions.QuotaException: When the failure
              message contains "exceeded quota" (case-insensitive).
          :raises Exception: The last failure, once the final attempt has
              been used.
          """
          attempt = 1
          while attempt <= self._retries:
              try:
                  self._launchLabel()
              except kze.SessionExpiredError:
                  # If we lost our ZooKeeper session, we've lost our node
                  # lock so there's no need to continue.
                  raise
              except Exception as err:
                  self.log.exception(
                      "Launch attempt %d/%d failed for node %s:",
                      attempt, self._retries, self.node.id)

                  # If we created an instance, delete it.
                  stale_id = self.node.external_id
                  if stale_id:
                      manager = self.handler.manager
                      manager.cleanupNode(stale_id)
                      manager.waitForNodeCleanup(stale_id)
                      self.node.external_id = None
                      self.node.interface_ip = None
                      self.zk.storeNode(self.node)

                  # A quota failure is reported as its own exception type
                  # rather than being retried.
                  if 'exceeded quota' in str(err).lower():
                      self.log.info("%s: quota exceeded", self.node.id)
                      raise exceptions.QuotaException("Quota exceeded")

                  # Out of attempts: surface the original failure.
                  if attempt == self._retries:
                      raise
                  attempt += 1
              else:
                  break
  class OpenshiftNodeRequestHandler(NodeRequestHandler):
      """Node request handler for the OpenShift driver.

      Starts one ``OpenShiftLauncher`` per requested node and reports
      completion once every launcher has stopped and every node in the
      nodeset is in a final state.
      """

      log = logging.getLogger("nodepool.driver.openshift."
                              "OpenshiftNodeRequestHandler")

      def __init__(self, pw, request):
          """Initialise the base handler and the list of started launchers.

          :param pw: Passed through unchanged to ``NodeRequestHandler``.
          :param request: Passed through unchanged to
              ``NodeRequestHandler``.
          """
          super().__init__(pw, request)
          # Launchers started by launch(); polled by launchesComplete().
          self._threads = []

      @property
      def alive_thread_count(self):
          """Return how many started launchers are still running.

          Uses ``is_alive()``: the camelCase ``isAlive()`` alias was
          removed from ``threading.Thread`` in Python 3.9.
          NOTE(review): this presumes ``NodeLauncher`` is a
          ``threading.Thread`` subclass (it is started with ``.start()``)
          -- confirm against nodepool.driver.utils.
          """
          return sum(1 for t in self._threads if t.is_alive())

      def imagesAvailable(self):
          """Always report images as available."""
          return True

      def launchesComplete(self):
          '''
          Check if all launch requests have completed.

          When all of the Node objects have reached a final state (READY,
          FAILED or ABORTED), we'll know all threads have finished the
          launch process.

          :returns: True when nothing was launched, or when no launcher is
              alive and every node in the nodeset is in a final state;
              False otherwise.
          '''
          if not self._threads:
              return True

          # Give the NodeLaunch threads time to finish.
          if self.alive_thread_count:
              return False

          # NOTE: It is very important that NodeLauncher always sets one of
          # these states, no matter what.
          final_states = (zk.READY, zk.FAILED, zk.ABORTED)
          return all(node.state in final_states for node in self.nodeset)

      def hasRemainingQuota(self, node_types):
          """Return whether one more project fits under ``max_projects``.

          :param node_types: Not consulted; the check only counts the
              nodes currently listed by the manager.
          """
          if len(self.manager.listNodes()) + 1 > self.provider.max_projects:
              return False
          return True

      def launch(self, node):
          """Start a launcher for ``node`` and track it.

          :param node: The node to launch; the first entry of ``node.type``
              selects the label from the pool configuration.
          """
          label = self.pool.labels[node.type[0]]
          thd = OpenShiftLauncher(self, node, self.provider, label)
          thd.start()
          self._threads.append(thd)
  116. self._threads.append(thd)