Manage a pool of nodes for a distributed test infrastructure
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

__init__.py 20KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573
  1. # Copyright (C) 2013 Hewlett-Packard Development Company, L.P.
  2. # Copyright (C) 2014 OpenStack Foundation
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  5. # not use this file except in compliance with the License. You may obtain
  6. # a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  12. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  13. # License for the specific language governing permissions and limitations
  14. # under the License.
  15. """Common utilities used in testing"""
  16. import glob
  17. import itertools
  18. import logging
  19. import os
  20. import random
  21. import select
  22. import string
  23. import socket
  24. import subprocess
  25. import threading
  26. import tempfile
  27. import time
  28. import fixtures
  29. import kazoo.client
  30. import testtools
  31. from nodepool import builder
  32. from nodepool import launcher
  33. from nodepool import webapp
  34. from nodepool import zk
  35. from nodepool.cmd.config_validator import ConfigValidator
  36. TRUE_VALUES = ('true', '1', 'yes')
  37. class LoggingPopen(subprocess.Popen):
  38. pass
  39. class ZookeeperServerFixture(fixtures.Fixture):
  40. def _setUp(self):
  41. zk_host = os.environ.get('NODEPOOL_ZK_HOST', 'localhost')
  42. if ':' in zk_host:
  43. host, port = zk_host.split(':')
  44. else:
  45. host = zk_host
  46. port = None
  47. self.zookeeper_host = host
  48. if not port:
  49. self.zookeeper_port = 2181
  50. else:
  51. self.zookeeper_port = int(port)
  52. class ChrootedKazooFixture(fixtures.Fixture):
  53. def __init__(self, zookeeper_host, zookeeper_port):
  54. super(ChrootedKazooFixture, self).__init__()
  55. self.zookeeper_host = zookeeper_host
  56. self.zookeeper_port = zookeeper_port
  57. def _setUp(self):
  58. # Make sure the test chroot paths do not conflict
  59. random_bits = ''.join(random.choice(string.ascii_lowercase +
  60. string.ascii_uppercase)
  61. for x in range(8))
  62. rand_test_path = '%s_%s' % (random_bits, os.getpid())
  63. self.zookeeper_chroot = "/nodepool_test/%s" % rand_test_path
  64. # Ensure the chroot path exists and clean up any pre-existing znodes.
  65. _tmp_client = kazoo.client.KazooClient(
  66. hosts='%s:%s' % (self.zookeeper_host, self.zookeeper_port))
  67. _tmp_client.start()
  68. if _tmp_client.exists(self.zookeeper_chroot):
  69. _tmp_client.delete(self.zookeeper_chroot, recursive=True)
  70. _tmp_client.ensure_path(self.zookeeper_chroot)
  71. _tmp_client.stop()
  72. _tmp_client.close()
  73. self.addCleanup(self._cleanup)
  74. def _cleanup(self):
  75. '''Remove the chroot path.'''
  76. # Need a non-chroot'ed client to remove the chroot path
  77. _tmp_client = kazoo.client.KazooClient(
  78. hosts='%s:%s' % (self.zookeeper_host, self.zookeeper_port))
  79. _tmp_client.start()
  80. _tmp_client.delete(self.zookeeper_chroot, recursive=True)
  81. _tmp_client.stop()
  82. _tmp_client.close()
  83. class StatsdFixture(fixtures.Fixture):
  84. def _setUp(self):
  85. self.running = True
  86. self.thread = threading.Thread(target=self.run)
  87. self.thread.daemon = True
  88. self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  89. self.sock.bind(('', 0))
  90. self.port = self.sock.getsockname()[1]
  91. self.wake_read, self.wake_write = os.pipe()
  92. self.stats = []
  93. self.thread.start()
  94. self.addCleanup(self._cleanup)
  95. def run(self):
  96. while self.running:
  97. poll = select.poll()
  98. poll.register(self.sock, select.POLLIN)
  99. poll.register(self.wake_read, select.POLLIN)
  100. ret = poll.poll()
  101. for (fd, event) in ret:
  102. if fd == self.sock.fileno():
  103. data = self.sock.recvfrom(1024)
  104. if not data:
  105. return
  106. self.stats.append(data[0])
  107. if fd == self.wake_read:
  108. return
  109. def _cleanup(self):
  110. self.running = False
  111. os.write(self.wake_write, b'1\n')
  112. self.thread.join()
  113. class BaseTestCase(testtools.TestCase):
  114. def setUp(self):
  115. super(BaseTestCase, self).setUp()
  116. test_timeout = os.environ.get('OS_TEST_TIMEOUT', 60)
  117. try:
  118. test_timeout = int(test_timeout)
  119. except ValueError:
  120. # If timeout value is invalid, fail hard.
  121. print("OS_TEST_TIMEOUT set to invalid value"
  122. " defaulting to no timeout")
  123. test_timeout = 0
  124. if test_timeout > 0:
  125. self.useFixture(fixtures.Timeout(test_timeout, gentle=True))
  126. if os.environ.get('OS_STDOUT_CAPTURE') in TRUE_VALUES:
  127. stdout = self.useFixture(fixtures.StringStream('stdout')).stream
  128. self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
  129. if os.environ.get('OS_STDERR_CAPTURE') in TRUE_VALUES:
  130. stderr = self.useFixture(fixtures.StringStream('stderr')).stream
  131. self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
  132. if os.environ.get('OS_LOG_CAPTURE') in TRUE_VALUES:
  133. fs = '%(asctime)s %(levelname)s [%(name)s] %(message)s'
  134. self.useFixture(fixtures.FakeLogger(level=logging.DEBUG,
  135. format=fs))
  136. else:
  137. logging.basicConfig(level=logging.DEBUG)
  138. l = logging.getLogger('kazoo')
  139. l.setLevel(logging.INFO)
  140. l.propagate = False
  141. l = logging.getLogger('stevedore')
  142. l.setLevel(logging.INFO)
  143. l.propagate = False
  144. self.useFixture(fixtures.NestedTempfile())
  145. self.subprocesses = []
  146. def LoggingPopenFactory(*args, **kw):
  147. p = LoggingPopen(*args, **kw)
  148. self.subprocesses.append(p)
  149. return p
  150. self.statsd = StatsdFixture()
  151. self.useFixture(self.statsd)
  152. # note, use 127.0.0.1 rather than localhost to avoid getting ipv6
  153. # see: https://github.com/jsocol/pystatsd/issues/61
  154. os.environ['STATSD_HOST'] = '127.0.0.1'
  155. os.environ['STATSD_PORT'] = str(self.statsd.port)
  156. self.useFixture(fixtures.MonkeyPatch('subprocess.Popen',
  157. LoggingPopenFactory))
  158. self.setUpFakes()
  159. def setUpFakes(self):
  160. clouds_path = os.path.join(os.path.dirname(__file__),
  161. 'fixtures', 'clouds.yaml')
  162. self.useFixture(fixtures.MonkeyPatch(
  163. 'openstack.config.loader.CONFIG_FILES', [clouds_path]))
  164. def wait_for_threads(self):
  165. # Wait until all transient threads (node launches, deletions,
  166. # etc.) are all complete. Whitelist any long-running threads.
  167. whitelist = ['MainThread',
  168. 'NodePool',
  169. 'NodePool Builder',
  170. 'fake-provider',
  171. 'fake-provider1',
  172. 'fake-provider2',
  173. 'fake-provider3',
  174. 'CleanupWorker',
  175. 'DeletedNodeWorker',
  176. 'StatsWorker',
  177. 'pydevd.CommandThread',
  178. 'pydevd.Reader',
  179. 'pydevd.Writer',
  180. ]
  181. while True:
  182. done = True
  183. for t in threading.enumerate():
  184. if t.name.startswith("Thread-"):
  185. # Kazoo
  186. continue
  187. if t.name.startswith("worker "):
  188. # paste web server
  189. continue
  190. if t.name.startswith("UploadWorker"):
  191. continue
  192. if t.name.startswith("BuildWorker"):
  193. continue
  194. if t.name.startswith("CleanupWorker"):
  195. continue
  196. if t.name.startswith("PoolWorker"):
  197. continue
  198. if t.name not in whitelist:
  199. done = False
  200. if done:
  201. return
  202. time.sleep(0.1)
  203. def assertReportedStat(self, key, value=None, kind=None):
  204. """Check statsd output
  205. Check statsd return values. A ``value`` should specify a
  206. ``kind``, however a ``kind`` may be specified without a
  207. ``value`` for a generic match. Leave both empy to just check
  208. for key presence.
  209. :arg str key: The statsd key
  210. :arg str value: The expected value of the metric ``key``
  211. :arg str kind: The expected type of the metric ``key`` For example
  212. - ``c`` counter
  213. - ``g`` gauge
  214. - ``ms`` timing
  215. - ``s`` set
  216. """
  217. if value:
  218. self.assertNotEqual(kind, None)
  219. start = time.time()
  220. while time.time() < (start + 5):
  221. # Note our fake statsd just queues up results in a queue.
  222. # We just keep going through them until we find one that
  223. # matches, or fail out. If statsd pipelines are used,
  224. # large single packets are sent with stats separated by
  225. # newlines; thus we first flatten the stats out into
  226. # single entries.
  227. stats = itertools.chain.from_iterable(
  228. [s.decode('utf-8').split('\n') for s in self.statsd.stats])
  229. for stat in stats:
  230. k, v = stat.split(':')
  231. if key == k:
  232. if kind is None:
  233. # key with no qualifiers is found
  234. return True
  235. s_value, s_kind = v.split('|')
  236. # if no kind match, look for other keys
  237. if kind != s_kind:
  238. continue
  239. if value:
  240. # special-case value|ms because statsd can turn
  241. # timing results into float of indeterminate
  242. # length, hence foiling string matching.
  243. if kind == 'ms':
  244. if float(value) == float(s_value):
  245. return True
  246. if value == s_value:
  247. return True
  248. # otherwise keep looking for other matches
  249. continue
  250. # this key matches
  251. return True
  252. time.sleep(0.1)
  253. raise Exception("Key %s not found in reported stats" % key)
  254. class BuilderFixture(fixtures.Fixture):
  255. def __init__(self, configfile, cleanup_interval, securefile=None):
  256. super(BuilderFixture, self).__init__()
  257. self.configfile = configfile
  258. self.securefile = securefile
  259. self.cleanup_interval = cleanup_interval
  260. self.builder = None
  261. def setUp(self):
  262. super(BuilderFixture, self).setUp()
  263. self.builder = builder.NodePoolBuilder(
  264. self.configfile, secure_path=self.securefile)
  265. self.builder.cleanup_interval = self.cleanup_interval
  266. self.builder.build_interval = .1
  267. self.builder.upload_interval = .1
  268. self.builder.dib_cmd = 'nodepool/tests/fake-image-create'
  269. self.builder.start()
  270. self.addCleanup(self.cleanup)
  271. def cleanup(self):
  272. self.builder.stop()
  273. class DBTestCase(BaseTestCase):
  274. def setUp(self):
  275. super(DBTestCase, self).setUp()
  276. self.log = logging.getLogger("tests")
  277. self.setupZK()
  278. def setup_config(self, filename, images_dir=None, context_name=None):
  279. if images_dir is None:
  280. images_dir = fixtures.TempDir()
  281. self.useFixture(images_dir)
  282. build_log_dir = fixtures.TempDir()
  283. self.useFixture(build_log_dir)
  284. configfile = os.path.join(os.path.dirname(__file__),
  285. 'fixtures', filename)
  286. (fd, path) = tempfile.mkstemp()
  287. with open(configfile, 'rb') as conf_fd:
  288. config = conf_fd.read().decode('utf8')
  289. data = config.format(images_dir=images_dir.path,
  290. build_log_dir=build_log_dir.path,
  291. context_name=context_name,
  292. zookeeper_host=self.zookeeper_host,
  293. zookeeper_port=self.zookeeper_port,
  294. zookeeper_chroot=self.zookeeper_chroot)
  295. os.write(fd, data.encode('utf8'))
  296. os.close(fd)
  297. self._config_images_dir = images_dir
  298. self._config_build_log_dir = build_log_dir
  299. validator = ConfigValidator(path)
  300. validator.validate()
  301. return path
  302. def replace_config(self, configfile, filename):
  303. self.log.debug("Replacing config with %s", filename)
  304. new_configfile = self.setup_config(filename, self._config_images_dir)
  305. os.rename(new_configfile, configfile)
  306. def setup_secure(self, filename):
  307. # replace entries in secure.conf
  308. configfile = os.path.join(os.path.dirname(__file__),
  309. 'fixtures', filename)
  310. (fd, path) = tempfile.mkstemp()
  311. with open(configfile, 'rb') as conf_fd:
  312. config = conf_fd.read().decode('utf8')
  313. data = config.format(
  314. zookeeper_host=self.zookeeper_host,
  315. zookeeper_port=self.zookeeper_port,
  316. zookeeper_chroot=self.zookeeper_chroot)
  317. os.write(fd, data.encode('utf8'))
  318. os.close(fd)
  319. return path
  320. def wait_for_config(self, pool):
  321. for x in range(300):
  322. if pool.config is not None:
  323. return
  324. time.sleep(0.1)
  325. def waitForImage(self, provider_name, image_name, ignore_list=None):
  326. while True:
  327. self.wait_for_threads()
  328. image = self.zk.getMostRecentImageUpload(image_name, provider_name)
  329. if image:
  330. if ignore_list and image not in ignore_list:
  331. break
  332. elif not ignore_list:
  333. break
  334. time.sleep(1)
  335. self.wait_for_threads()
  336. return image
  337. def waitForUploadRecordDeletion(self, provider_name, image_name,
  338. build_id, upload_id):
  339. while True:
  340. self.wait_for_threads()
  341. uploads = self.zk.getUploads(image_name, build_id, provider_name)
  342. if not uploads or upload_id not in [u.id for u in uploads]:
  343. break
  344. time.sleep(1)
  345. self.wait_for_threads()
  346. def waitForImageDeletion(self, provider_name, image_name, match=None):
  347. while True:
  348. self.wait_for_threads()
  349. image = self.zk.getMostRecentImageUpload(image_name, provider_name)
  350. if not image or (match and image != match):
  351. break
  352. time.sleep(1)
  353. self.wait_for_threads()
  354. def waitForBuild(self, image_name, build_id, states=None):
  355. if states is None:
  356. states = (zk.READY,)
  357. base = "-".join([image_name, build_id])
  358. while True:
  359. self.wait_for_threads()
  360. build = self.zk.getBuild(image_name, build_id)
  361. if build and build.state in states:
  362. break
  363. time.sleep(1)
  364. # We should only expect a dib manifest with a successful build.
  365. while build.state == zk.READY:
  366. self.wait_for_threads()
  367. files = builder.DibImageFile.from_image_id(
  368. self._config_images_dir.path, base)
  369. if files:
  370. break
  371. time.sleep(1)
  372. self.wait_for_threads()
  373. return build
  374. def waitForBuildDeletion(self, image_name, build_id):
  375. base = "-".join([image_name, build_id])
  376. while True:
  377. self.wait_for_threads()
  378. files = builder.DibImageFile.from_image_id(
  379. self._config_images_dir.path, base)
  380. if not files:
  381. break
  382. time.sleep(1)
  383. while True:
  384. self.wait_for_threads()
  385. # Now, check the disk to ensure we didn't leak any files.
  386. matches = glob.glob('%s/%s.*' % (self._config_images_dir.path,
  387. base))
  388. if not matches:
  389. break
  390. time.sleep(1)
  391. while True:
  392. self.wait_for_threads()
  393. build = self.zk.getBuild(image_name, build_id)
  394. if not build:
  395. break
  396. time.sleep(1)
  397. self.wait_for_threads()
  398. def waitForNodeDeletion(self, node):
  399. while True:
  400. exists = False
  401. for n in self.zk.nodeIterator():
  402. if node.id == n.id:
  403. exists = True
  404. break
  405. if not exists:
  406. break
  407. time.sleep(1)
  408. def waitForInstanceDeletion(self, manager, instance_id):
  409. while True:
  410. servers = manager.listNodes()
  411. if not (instance_id in [s.id for s in servers]):
  412. break
  413. time.sleep(1)
  414. def waitForNodeRequestLockDeletion(self, request_id):
  415. while True:
  416. exists = False
  417. for lock_id in self.zk.getNodeRequestLockIDs():
  418. if request_id == lock_id:
  419. exists = True
  420. break
  421. if not exists:
  422. break
  423. time.sleep(1)
  424. def waitForNodes(self, label, count=1):
  425. while True:
  426. self.wait_for_threads()
  427. ready_nodes = self.zk.getReadyNodesOfTypes([label])
  428. if label in ready_nodes and len(ready_nodes[label]) == count:
  429. break
  430. time.sleep(1)
  431. self.wait_for_threads()
  432. return ready_nodes[label]
  433. def waitForNodeRequest(self, req, states=None):
  434. '''
  435. Wait for a node request to transition to a final state.
  436. '''
  437. if states is None:
  438. states = (zk.FULFILLED, zk.FAILED)
  439. while True:
  440. req = self.zk.getNodeRequest(req.id)
  441. if req.state in states:
  442. break
  443. time.sleep(1)
  444. return req
  445. def useNodepool(self, *args, **kwargs):
  446. secure_conf = kwargs.pop('secure_conf', None)
  447. args = (secure_conf,) + args
  448. pool = launcher.NodePool(*args, **kwargs)
  449. pool.cleanup_interval = .5
  450. pool.delete_interval = .5
  451. self.addCleanup(pool.stop)
  452. return pool
  453. def useWebApp(self, *args, **kwargs):
  454. app = webapp.WebApp(*args, **kwargs)
  455. self.addCleanup(app.stop)
  456. return app
  457. def useBuilder(self, configfile, securefile=None, cleanup_interval=.5):
  458. self.useFixture(
  459. BuilderFixture(configfile, cleanup_interval, securefile)
  460. )
  461. def setupZK(self):
  462. f = ZookeeperServerFixture()
  463. self.useFixture(f)
  464. self.zookeeper_host = f.zookeeper_host
  465. self.zookeeper_port = f.zookeeper_port
  466. kz_fxtr = self.useFixture(ChrootedKazooFixture(
  467. self.zookeeper_host,
  468. self.zookeeper_port))
  469. self.zookeeper_chroot = kz_fxtr.zookeeper_chroot
  470. self.zk = zk.ZooKeeper(enable_cache=False)
  471. host = zk.ZooKeeperConnectionConfig(
  472. self.zookeeper_host, self.zookeeper_port, self.zookeeper_chroot
  473. )
  474. self.zk.connect([host])
  475. self.addCleanup(self.zk.disconnect)
  476. def printZKTree(self, node):
  477. def join(a, b):
  478. if a.endswith('/'):
  479. return a + b
  480. return a + '/' + b
  481. data, stat = self.zk.client.get(node)
  482. self.log.debug("Node: %s" % (node,))
  483. if data:
  484. self.log.debug(data)
  485. for child in self.zk.client.get_children(node):
  486. self.printZKTree(join(node, child))
  487. class IntegrationTestCase(DBTestCase):
  488. def setUpFakes(self):
  489. pass