Manage a pool of nodes for a distributed test infrastructure
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

__init__.py 21KB


  1. # Copyright (C) 2013 Hewlett-Packard Development Company, L.P.
  2. # Copyright (C) 2014 OpenStack Foundation
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  5. # not use this file except in compliance with the License. You may obtain
  6. # a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  12. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  13. # License for the specific language governing permissions and limitations
  14. # under the License.
  15. """Common utilities used in testing"""
  16. import glob
  17. import itertools
  18. import kubernetes.config.kube_config
  19. import logging
  20. import os
  21. import random
  22. import select
  23. import string
  24. import socket
  25. import subprocess
  26. import threading
  27. import tempfile
  28. import time
  29. import fixtures
  30. import kazoo.client
  31. import testtools
  32. from nodepool import builder
  33. from nodepool import launcher
  34. from nodepool import webapp
  35. from nodepool import zk
  36. from nodepool.cmd.config_validator import ConfigValidator
  37. TRUE_VALUES = ('true', '1', 'yes')
  38. class LoggingPopen(subprocess.Popen):
  39. pass
  40. class ZookeeperServerFixture(fixtures.Fixture):
  41. def _setUp(self):
  42. zk_host = os.environ.get('NODEPOOL_ZK_HOST', 'localhost')
  43. if ':' in zk_host:
  44. host, port = zk_host.split(':')
  45. else:
  46. host = zk_host
  47. port = None
  48. self.zookeeper_host = host
  49. if not port:
  50. self.zookeeper_port = 2181
  51. else:
  52. self.zookeeper_port = int(port)
  53. class ChrootedKazooFixture(fixtures.Fixture):
  54. def __init__(self, zookeeper_host, zookeeper_port):
  55. super(ChrootedKazooFixture, self).__init__()
  56. self.zookeeper_host = zookeeper_host
  57. self.zookeeper_port = zookeeper_port
  58. def _setUp(self):
  59. # Make sure the test chroot paths do not conflict
  60. random_bits = ''.join(random.choice(string.ascii_lowercase +
  61. string.ascii_uppercase)
  62. for x in range(8))
  63. rand_test_path = '%s_%s' % (random_bits, os.getpid())
  64. self.zookeeper_chroot = "/nodepool_test/%s" % rand_test_path
  65. # Ensure the chroot path exists and clean up any pre-existing znodes.
  66. _tmp_client = kazoo.client.KazooClient(
  67. hosts='%s:%s' % (self.zookeeper_host, self.zookeeper_port))
  68. _tmp_client.start()
  69. if _tmp_client.exists(self.zookeeper_chroot):
  70. _tmp_client.delete(self.zookeeper_chroot, recursive=True)
  71. _tmp_client.ensure_path(self.zookeeper_chroot)
  72. _tmp_client.stop()
  73. _tmp_client.close()
  74. self.addCleanup(self._cleanup)
  75. def _cleanup(self):
  76. '''Remove the chroot path.'''
  77. # Need a non-chroot'ed client to remove the chroot path
  78. _tmp_client = kazoo.client.KazooClient(
  79. hosts='%s:%s' % (self.zookeeper_host, self.zookeeper_port))
  80. _tmp_client.start()
  81. _tmp_client.delete(self.zookeeper_chroot, recursive=True)
  82. _tmp_client.stop()
  83. _tmp_client.close()
  84. class StatsdFixture(fixtures.Fixture):
  85. def _setUp(self):
  86. self.running = True
  87. self.thread = threading.Thread(target=self.run)
  88. self.thread.daemon = True
  89. self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  90. self.sock.bind(('', 0))
  91. self.port = self.sock.getsockname()[1]
  92. self.wake_read, self.wake_write = os.pipe()
  93. self.stats = []
  94. self.thread.start()
  95. self.addCleanup(self._cleanup)
  96. def run(self):
  97. while self.running:
  98. poll = select.poll()
  99. poll.register(self.sock, select.POLLIN)
  100. poll.register(self.wake_read, select.POLLIN)
  101. ret = poll.poll()
  102. for (fd, event) in ret:
  103. if fd == self.sock.fileno():
  104. data = self.sock.recvfrom(1024)
  105. if not data:
  106. return
  107. self.stats.append(data[0])
  108. if fd == self.wake_read:
  109. return
  110. def _cleanup(self):
  111. self.running = False
  112. os.write(self.wake_write, b'1\n')
  113. self.thread.join()
  114. class BaseTestCase(testtools.TestCase):
  115. def setUp(self):
  116. super(BaseTestCase, self).setUp()
  117. test_timeout = os.environ.get('OS_TEST_TIMEOUT', 60)
  118. try:
  119. test_timeout = int(test_timeout)
  120. except ValueError:
  121. # If timeout value is invalid, fail hard.
  122. print("OS_TEST_TIMEOUT set to invalid value"
  123. " defaulting to no timeout")
  124. test_timeout = 0
  125. if test_timeout > 0:
  126. self.useFixture(fixtures.Timeout(test_timeout, gentle=True))
  127. if os.environ.get('OS_STDOUT_CAPTURE') in TRUE_VALUES:
  128. stdout = self.useFixture(fixtures.StringStream('stdout')).stream
  129. self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
  130. if os.environ.get('OS_STDERR_CAPTURE') in TRUE_VALUES:
  131. stderr = self.useFixture(fixtures.StringStream('stderr')).stream
  132. self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
  133. if os.environ.get('OS_LOG_CAPTURE') in TRUE_VALUES:
  134. fs = '%(asctime)s %(levelname)s [%(name)s] %(message)s'
  135. self.useFixture(fixtures.FakeLogger(level=logging.DEBUG,
  136. format=fs))
  137. else:
  138. logging.basicConfig(level=logging.DEBUG)
  139. l = logging.getLogger('kazoo')
  140. l.setLevel(logging.INFO)
  141. l.propagate = False
  142. l = logging.getLogger('stevedore')
  143. l.setLevel(logging.INFO)
  144. l.propagate = False
  145. self.useFixture(fixtures.NestedTempfile())
  146. self.subprocesses = []
  147. def LoggingPopenFactory(*args, **kw):
  148. p = LoggingPopen(*args, **kw)
  149. self.subprocesses.append(p)
  150. return p
  151. self.statsd = StatsdFixture()
  152. self.useFixture(self.statsd)
  153. # note, use 127.0.0.1 rather than localhost to avoid getting ipv6
  154. # see: https://github.com/jsocol/pystatsd/issues/61
  155. os.environ['STATSD_HOST'] = '127.0.0.1'
  156. os.environ['STATSD_PORT'] = str(self.statsd.port)
  157. self.useFixture(fixtures.MonkeyPatch('subprocess.Popen',
  158. LoggingPopenFactory))
  159. self.setUpFakes()
  160. self.addCleanup(self._cleanup)
  161. def _cleanup(self):
  162. # This is a hack to cleanup kubernetes temp files during test runs.
  163. # The kube_config maintains a global dict of temporary files. During
  164. # running the tests those can get deleted during the cleanup phase of
  165. # the tests without kube_config knowing about this so forcefully tell
  166. # kube_config to clean this up.
  167. kubernetes.config.kube_config._cleanup_temp_files()
  168. def setUpFakes(self):
  169. clouds_path = os.path.join(os.path.dirname(__file__),
  170. 'fixtures', 'clouds.yaml')
  171. self.useFixture(fixtures.MonkeyPatch(
  172. 'openstack.config.loader.CONFIG_FILES', [clouds_path]))
  173. def wait_for_threads(self):
  174. # Wait until all transient threads (node launches, deletions,
  175. # etc.) are all complete. Whitelist any long-running threads.
  176. whitelist = ['MainThread',
  177. 'NodePool',
  178. 'NodePool Builder',
  179. 'fake-provider',
  180. 'fake-provider1',
  181. 'fake-provider2',
  182. 'fake-provider3',
  183. 'CleanupWorker',
  184. 'DeletedNodeWorker',
  185. 'StatsWorker',
  186. 'pydevd.CommandThread',
  187. 'pydevd.Reader',
  188. 'pydevd.Writer',
  189. ]
  190. while True:
  191. done = True
  192. for t in threading.enumerate():
  193. if t.name.startswith("Thread-"):
  194. # Kazoo
  195. continue
  196. if t.name.startswith("worker "):
  197. # paste web server
  198. continue
  199. if t.name.startswith("UploadWorker"):
  200. continue
  201. if t.name.startswith("BuildWorker"):
  202. continue
  203. if t.name.startswith("CleanupWorker"):
  204. continue
  205. if t.name.startswith("PoolWorker"):
  206. continue
  207. if t.name not in whitelist:
  208. done = False
  209. if done:
  210. return
  211. time.sleep(0.1)
  212. def assertReportedStat(self, key, value=None, kind=None):
  213. """Check statsd output
  214. Check statsd return values. A ``value`` should specify a
  215. ``kind``, however a ``kind`` may be specified without a
  216. ``value`` for a generic match. Leave both empy to just check
  217. for key presence.
  218. :arg str key: The statsd key
  219. :arg str value: The expected value of the metric ``key``
  220. :arg str kind: The expected type of the metric ``key`` For example
  221. - ``c`` counter
  222. - ``g`` gauge
  223. - ``ms`` timing
  224. - ``s`` set
  225. """
  226. if value:
  227. self.assertNotEqual(kind, None)
  228. start = time.time()
  229. while time.time() < (start + 5):
  230. # Note our fake statsd just queues up results in a queue.
  231. # We just keep going through them until we find one that
  232. # matches, or fail out. If statsd pipelines are used,
  233. # large single packets are sent with stats separated by
  234. # newlines; thus we first flatten the stats out into
  235. # single entries.
  236. stats = itertools.chain.from_iterable(
  237. [s.decode('utf-8').split('\n') for s in self.statsd.stats])
  238. for stat in stats:
  239. k, v = stat.split(':')
  240. if key == k:
  241. if kind is None:
  242. # key with no qualifiers is found
  243. return True
  244. s_value, s_kind = v.split('|')
  245. # if no kind match, look for other keys
  246. if kind != s_kind:
  247. continue
  248. if value:
  249. # special-case value|ms because statsd can turn
  250. # timing results into float of indeterminate
  251. # length, hence foiling string matching.
  252. if kind == 'ms':
  253. if float(value) == float(s_value):
  254. return True
  255. if value == s_value:
  256. return True
  257. # otherwise keep looking for other matches
  258. continue
  259. # this key matches
  260. return True
  261. time.sleep(0.1)
  262. raise Exception("Key %s not found in reported stats" % key)
  263. class BuilderFixture(fixtures.Fixture):
  264. log = logging.getLogger("tests.BuilderFixture")
  265. def __init__(self, configfile, cleanup_interval, securefile=None,
  266. num_uploaders=1):
  267. super(BuilderFixture, self).__init__()
  268. self.configfile = configfile
  269. self.securefile = securefile
  270. self.cleanup_interval = cleanup_interval
  271. self.builder = None
  272. self.num_uploaders = num_uploaders
  273. def setUp(self):
  274. super(BuilderFixture, self).setUp()
  275. self.builder = builder.NodePoolBuilder(
  276. self.configfile, secure_path=self.securefile,
  277. num_uploaders=self.num_uploaders)
  278. self.builder.cleanup_interval = self.cleanup_interval
  279. self.builder.build_interval = .1
  280. self.builder.upload_interval = .1
  281. self.builder.start()
  282. self.addCleanup(self.cleanup)
  283. def cleanup(self):
  284. # The NodePoolBuilder.stop() method does not intentionally stop the
  285. # upload workers for reasons documented in that method. But we can
  286. # safely do so in tests.
  287. for worker in self.builder._upload_workers:
  288. worker.shutdown()
  289. worker.join()
  290. self.log.debug("Stopped worker %s", worker.name)
  291. self.builder.stop()
  292. class DBTestCase(BaseTestCase):
  293. def setUp(self):
  294. super(DBTestCase, self).setUp()
  295. self.log = logging.getLogger("tests")
  296. self.setupZK()
  297. def setup_config(self, filename, images_dir=None, context_name=None):
  298. if images_dir is None:
  299. images_dir = fixtures.TempDir()
  300. self.useFixture(images_dir)
  301. build_log_dir = fixtures.TempDir()
  302. self.useFixture(build_log_dir)
  303. if filename.startswith('/'):
  304. path = filename
  305. else:
  306. configfile = os.path.join(os.path.dirname(__file__),
  307. 'fixtures', filename)
  308. (fd, path) = tempfile.mkstemp()
  309. with open(configfile, 'rb') as conf_fd:
  310. config = conf_fd.read().decode('utf8')
  311. data = config.format(images_dir=images_dir.path,
  312. build_log_dir=build_log_dir.path,
  313. context_name=context_name,
  314. zookeeper_host=self.zookeeper_host,
  315. zookeeper_port=self.zookeeper_port,
  316. zookeeper_chroot=self.zookeeper_chroot)
  317. os.write(fd, data.encode('utf8'))
  318. os.close(fd)
  319. self._config_images_dir = images_dir
  320. self._config_build_log_dir = build_log_dir
  321. validator = ConfigValidator(path)
  322. ret = validator.validate()
  323. if ret != 0:
  324. raise ValueError("Config file %s could not be validated" % path)
  325. return path
  326. def replace_config(self, configfile, filename):
  327. self.log.debug("Replacing config with %s", filename)
  328. new_configfile = self.setup_config(filename, self._config_images_dir)
  329. os.rename(new_configfile, configfile)
  330. def setup_secure(self, filename):
  331. # replace entries in secure.conf
  332. configfile = os.path.join(os.path.dirname(__file__),
  333. 'fixtures', filename)
  334. (fd, path) = tempfile.mkstemp()
  335. with open(configfile, 'rb') as conf_fd:
  336. config = conf_fd.read().decode('utf8')
  337. data = config.format(
  338. zookeeper_host=self.zookeeper_host,
  339. zookeeper_port=self.zookeeper_port,
  340. zookeeper_chroot=self.zookeeper_chroot)
  341. os.write(fd, data.encode('utf8'))
  342. os.close(fd)
  343. return path
  344. def wait_for_config(self, pool):
  345. for x in range(300):
  346. if pool.config is not None:
  347. return
  348. time.sleep(0.1)
  349. def waitForImage(self, provider_name, image_name, ignore_list=None):
  350. while True:
  351. self.wait_for_threads()
  352. image = self.zk.getMostRecentImageUpload(image_name, provider_name)
  353. if image:
  354. if ignore_list and image not in ignore_list:
  355. break
  356. elif not ignore_list:
  357. break
  358. time.sleep(1)
  359. self.wait_for_threads()
  360. return image
  361. def waitForUploadRecordDeletion(self, provider_name, image_name,
  362. build_id, upload_id):
  363. while True:
  364. self.wait_for_threads()
  365. uploads = self.zk.getUploads(image_name, build_id, provider_name)
  366. if not uploads or upload_id not in [u.id for u in uploads]:
  367. break
  368. time.sleep(1)
  369. self.wait_for_threads()
  370. def waitForImageDeletion(self, provider_name, image_name, match=None):
  371. while True:
  372. self.wait_for_threads()
  373. image = self.zk.getMostRecentImageUpload(image_name, provider_name)
  374. if not image or (match and image != match):
  375. break
  376. time.sleep(1)
  377. self.wait_for_threads()
  378. def waitForBuild(self, image_name, build_id, states=None):
  379. if states is None:
  380. states = (zk.READY,)
  381. base = "-".join([image_name, build_id])
  382. while True:
  383. self.wait_for_threads()
  384. build = self.zk.getBuild(image_name, build_id)
  385. if build and build.state in states:
  386. break
  387. time.sleep(1)
  388. # We should only expect a dib manifest with a successful build.
  389. while build.state == zk.READY:
  390. self.wait_for_threads()
  391. files = builder.DibImageFile.from_image_id(
  392. self._config_images_dir.path, base)
  393. if files:
  394. break
  395. time.sleep(1)
  396. self.wait_for_threads()
  397. return build
  398. def waitForBuildDeletion(self, image_name, build_id):
  399. base = "-".join([image_name, build_id])
  400. while True:
  401. self.wait_for_threads()
  402. files = builder.DibImageFile.from_image_id(
  403. self._config_images_dir.path, base)
  404. if not files:
  405. break
  406. time.sleep(1)
  407. while True:
  408. self.wait_for_threads()
  409. # Now, check the disk to ensure we didn't leak any files.
  410. matches = glob.glob('%s/%s.*' % (self._config_images_dir.path,
  411. base))
  412. if not matches:
  413. break
  414. time.sleep(1)
  415. while True:
  416. self.wait_for_threads()
  417. build = self.zk.getBuild(image_name, build_id)
  418. if not build:
  419. break
  420. time.sleep(1)
  421. self.wait_for_threads()
  422. def waitForNodeDeletion(self, node):
  423. while True:
  424. exists = False
  425. for n in self.zk.nodeIterator():
  426. if node.id == n.id:
  427. exists = True
  428. break
  429. if not exists:
  430. break
  431. time.sleep(1)
  432. def waitForInstanceDeletion(self, manager, instance_id):
  433. while True:
  434. servers = manager.listNodes()
  435. if not (instance_id in [s.id for s in servers]):
  436. break
  437. time.sleep(1)
  438. def waitForNodeRequestLockDeletion(self, request_id):
  439. while True:
  440. exists = False
  441. for lock_id in self.zk.getNodeRequestLockIDs():
  442. if request_id == lock_id:
  443. exists = True
  444. break
  445. if not exists:
  446. break
  447. time.sleep(1)
  448. def waitForNodes(self, label, count=1):
  449. while True:
  450. self.wait_for_threads()
  451. ready_nodes = self.zk.getReadyNodesOfTypes([label])
  452. if label in ready_nodes and len(ready_nodes[label]) == count:
  453. break
  454. time.sleep(1)
  455. self.wait_for_threads()
  456. return ready_nodes[label]
  457. def waitForNodeRequest(self, req, states=None):
  458. '''
  459. Wait for a node request to transition to a final state.
  460. '''
  461. if states is None:
  462. states = (zk.FULFILLED, zk.FAILED)
  463. while True:
  464. req = self.zk.getNodeRequest(req.id)
  465. if req.state in states:
  466. break
  467. time.sleep(1)
  468. return req
  469. def useNodepool(self, *args, **kwargs):
  470. secure_conf = kwargs.pop('secure_conf', None)
  471. args = (secure_conf,) + args
  472. pool = launcher.NodePool(*args, **kwargs)
  473. pool.cleanup_interval = .5
  474. pool.delete_interval = .5
  475. self.addCleanup(pool.stop)
  476. return pool
  477. def useWebApp(self, *args, **kwargs):
  478. app = webapp.WebApp(*args, **kwargs)
  479. self.addCleanup(app.stop)
  480. return app
  481. def useBuilder(self, configfile, securefile=None, cleanup_interval=.5,
  482. num_uploaders=1):
  483. builder_fixture = self.useFixture(
  484. BuilderFixture(configfile, cleanup_interval, securefile,
  485. num_uploaders)
  486. )
  487. return builder_fixture.builder
  488. def setupZK(self):
  489. f = ZookeeperServerFixture()
  490. self.useFixture(f)
  491. self.zookeeper_host = f.zookeeper_host
  492. self.zookeeper_port = f.zookeeper_port
  493. kz_fxtr = self.useFixture(ChrootedKazooFixture(
  494. self.zookeeper_host,
  495. self.zookeeper_port))
  496. self.zookeeper_chroot = kz_fxtr.zookeeper_chroot
  497. self.zk = zk.ZooKeeper(enable_cache=False)
  498. host = zk.ZooKeeperConnectionConfig(
  499. self.zookeeper_host, self.zookeeper_port, self.zookeeper_chroot
  500. )
  501. self.zk.connect([host])
  502. self.addCleanup(self.zk.disconnect)
  503. def printZKTree(self, node):
  504. def join(a, b):
  505. if a.endswith('/'):
  506. return a + b
  507. return a + '/' + b
  508. data, stat = self.zk.client.get(node)
  509. self.log.debug("Node: %s" % (node,))
  510. if data:
  511. self.log.debug(data)
  512. for child in self.zk.client.get_children(node):
  513. self.printZKTree(join(node, child))
  514. class IntegrationTestCase(DBTestCase):
  515. def setUpFakes(self):
  516. pass