Test statsd.
Add a fake statsd listener, and actually listen on a random udp port for statsd messages. Configure STATSD env vars to use it before each test. Add a test that exercises all of the statsd methods that we use explicitly. Add checks of the statsd output to the basic "run jobs" test case to make sure that we're covering some of the statsd code in the scheduler. Change-Id: I3361c4e334155dda413bf343e6f1c6f4e277a3ff Reviewed-on: https://review.openstack.org/27306 Reviewed-by: Clark Boylan <clark.boylan@gmail.com> Approved: Jeremy Stanley <fungi@yuggoth.org> Reviewed-by: Jeremy Stanley <fungi@yuggoth.org> Tested-by: Jenkins
This commit is contained in:
parent
a14a2e89e8
commit
412e558cd2
|
@ -28,11 +28,13 @@ import pprint
|
|||
import re
|
||||
import urllib2
|
||||
import urlparse
|
||||
import select
|
||||
import statsd
|
||||
import shutil
|
||||
import socket
|
||||
import string
|
||||
import git
|
||||
|
||||
import zuul
|
||||
import zuul.scheduler
|
||||
import zuul.launcher.jenkins
|
||||
import zuul.trigger.gerrit
|
||||
|
@ -642,6 +644,34 @@ class FakeGerritTrigger(zuul.trigger.gerrit.Gerrit):
|
|||
return os.path.join(UPSTREAM_ROOT, project.name)
|
||||
|
||||
|
||||
class FakeStatsd(threading.Thread):
|
||||
def __init__(self):
|
||||
threading.Thread.__init__(self)
|
||||
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
self.sock.bind(('', 0))
|
||||
self.port = self.sock.getsockname()[1]
|
||||
self.wake_read, self.wake_write = os.pipe()
|
||||
self.stats = []
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
poll = select.poll()
|
||||
poll.register(self.sock, select.POLLIN)
|
||||
poll.register(self.wake_read, select.POLLIN)
|
||||
ret = poll.poll()
|
||||
for (fd, event) in ret:
|
||||
if fd == self.sock.fileno():
|
||||
data = self.sock.recvfrom(1024)
|
||||
if not data:
|
||||
return
|
||||
self.stats.append(data[0])
|
||||
if fd == self.wake_read:
|
||||
return
|
||||
|
||||
def stop(self):
|
||||
os.write(self.wake_write, '1\n')
|
||||
|
||||
|
||||
class testScheduler(unittest.TestCase):
|
||||
log = logging.getLogger("zuul.test")
|
||||
|
||||
|
@ -660,6 +690,15 @@ class testScheduler(unittest.TestCase):
|
|||
init_repo("org/one-job-project")
|
||||
init_repo("org/nonvoting-project")
|
||||
self.config = CONFIG
|
||||
|
||||
self.statsd = FakeStatsd()
|
||||
os.environ['STATSD_HOST'] = 'localhost'
|
||||
os.environ['STATSD_PORT'] = str(self.statsd.port)
|
||||
self.statsd.start()
|
||||
# the statsd client object is configured in the statsd module import
|
||||
reload(statsd)
|
||||
reload(zuul.scheduler)
|
||||
|
||||
self.sched = zuul.scheduler.Scheduler()
|
||||
|
||||
def jenkinsFactory(*args, **kw):
|
||||
|
@ -699,6 +738,8 @@ class testScheduler(unittest.TestCase):
|
|||
self.gerrit.stop()
|
||||
self.sched.stop()
|
||||
self.sched.join()
|
||||
self.statsd.stop()
|
||||
self.statsd.join()
|
||||
#shutil.rmtree(TEST_ROOT)
|
||||
|
||||
def waitUntilSettled(self):
|
||||
|
@ -745,6 +786,21 @@ class testScheduler(unittest.TestCase):
|
|||
print 'heads', queue.severed_heads
|
||||
assert len(queue.severed_heads) == 0
|
||||
|
||||
def assertReportedStat(self, key, value=None):
|
||||
start = time.time()
|
||||
while time.time() < (start + 5):
|
||||
for stat in self.statsd.stats:
|
||||
k, v = stat.split(':')
|
||||
if key == k:
|
||||
if value is None:
|
||||
return
|
||||
if value == v:
|
||||
return
|
||||
time.sleep(0.1)
|
||||
|
||||
pprint.pprint(self.statsd.stats)
|
||||
raise Exception("Key %s not found in reported stats" % key)
|
||||
|
||||
def test_jobs_launched(self):
|
||||
"Test that jobs are launched and a change is merged"
|
||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||
|
@ -763,6 +819,16 @@ class testScheduler(unittest.TestCase):
|
|||
assert A.reported == 2
|
||||
self.assertEmptyQueues()
|
||||
|
||||
self.assertReportedStat('gerrit.event.comment-added', '1|c')
|
||||
self.assertReportedStat('zuul.pipeline.gate.current_changes', '1|g')
|
||||
self.assertReportedStat('zuul.job.project-merge')
|
||||
self.assertReportedStat('zuul.pipeline.gate.resident_time')
|
||||
self.assertReportedStat('zuul.pipeline.gate.total_changes', '1|c')
|
||||
self.assertReportedStat(
|
||||
'zuul.pipeline.gate.org.project.resident_time')
|
||||
self.assertReportedStat(
|
||||
'zuul.pipeline.gate.org.project.total_changes', '1|c')
|
||||
|
||||
def test_parallel_changes(self):
|
||||
"Test that changes are tested in parallel and merged in series"
|
||||
self.fake_jenkins.hold_jobs_in_build = True
|
||||
|
@ -1970,6 +2036,17 @@ class testScheduler(unittest.TestCase):
|
|||
assert D.reported == 2
|
||||
self.assertEmptyQueues()
|
||||
|
||||
def test_statsd(self):
|
||||
"Test each of the statsd methods used in the scheduler"
|
||||
import extras
|
||||
statsd = extras.try_import('statsd.statsd')
|
||||
statsd.incr('test-incr')
|
||||
statsd.timing('test-timing', 3)
|
||||
statsd.gauge('test-guage', 12)
|
||||
self.assertReportedStat('test-incr', '1|c')
|
||||
self.assertReportedStat('test-timing', '3|ms')
|
||||
self.assertReportedStat('test-guage', '12|g')
|
||||
|
||||
def test_file_jobs(self):
|
||||
"Test that file jobs run only when appropriate"
|
||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||
|
|
Loading…
Reference in New Issue