Stop sharing Gerrit event queues in tests

When connections are set up in tests, multiple Gerrit connections which
are configured to connect to the same fake Gerrit server share a change
database so that changes sent to Gerrit via one connection are reflected
back to Zuul on another.  They also share an event queue so that events
injected on one are seen by another.

Unfortunately, that part doesn't work, and in fact, events are only seen
by one of the gerrit connections.  This happens to work since it doesn't
matter which gerrit connection receives an event, which is why we haven't
noticed the problem in tests.

Where we do see the problem in Zuulv3 is in shutting down the connections.
When a GerritConnection is stopped, a sentinal object (None) is added to
the event queue.  When the GerritConnection gets an event from the queue,
it first checks whether it has stopped before processing that event.
Because in tests (but not in production) multiple GerritConnections share
an event queue, the connection that adds the None object to the queue
may not be the one that receives it, which causes the test to raise an
exception and not stop correctly.

We did not notice this in v2 because the order in which the Queue.Queue
class decides to awaken a thread is deterministic enough that the thread
which submitted the sentinel was always the one that received it.  In
v3, the thread order is sufficiently different that the thread for the
*other* connection is reliably the one which receives it.

To correct this, stop using a shared queue between the differing
GerritConnection objects, and instead add a helper method to the testcase
class which will add an event to every connection for a given server.

Change-Id: Idd3238f5ab8f5e09e295c0fa028e140c089a2a3f
This commit is contained in:
James E. Blair 2016-08-08 15:37:15 -07:00
parent e7b99a0baa
commit 7fc8daa372
3 changed files with 40 additions and 17 deletions

View File

@ -396,11 +396,11 @@ class FakeGerritConnection(zuul.connection.gerrit.GerritConnection):
log = logging.getLogger("zuul.test.FakeGerritConnection")
def __init__(self, connection_name, connection_config,
changes_db=None, queues_db=None, upstream_root=None):
changes_db=None, upstream_root=None):
super(FakeGerritConnection, self).__init__(connection_name,
connection_config)
self.event_queue = queues_db
self.event_queue = Queue.Queue()
self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit')
self.change_number = 0
self.changes = changes_db
@ -1094,7 +1094,6 @@ class ZuulTestCase(BaseTestCase):
# Set a changes database so multiple FakeGerrit's can report back to
# a virtual canonical database given by the configured hostname
self.gerrit_changes_dbs = {}
self.gerrit_queues_dbs = {}
self.connections = zuul.lib.connections.ConnectionRegistry()
for section_name in self.config.sections():
@ -1115,17 +1114,13 @@ class ZuulTestCase(BaseTestCase):
if con_driver == 'gerrit':
if con_config['server'] not in self.gerrit_changes_dbs.keys():
self.gerrit_changes_dbs[con_config['server']] = {}
if con_config['server'] not in self.gerrit_queues_dbs.keys():
self.gerrit_queues_dbs[con_config['server']] = \
Queue.Queue()
self.event_queues.append(
self.gerrit_queues_dbs[con_config['server']])
self.connections.connections[con_name] = FakeGerritConnection(
con_name, con_config,
changes_db=self.gerrit_changes_dbs[con_config['server']],
queues_db=self.gerrit_queues_dbs[con_config['server']],
upstream_root=self.upstream_root
)
self.event_queues.append(
self.connections.connections[con_name].event_queue)
setattr(self, 'fake_' + con_name,
self.connections.connections[con_name])
elif con_driver == 'smtp':
@ -1140,12 +1135,11 @@ class ZuulTestCase(BaseTestCase):
if 'gerrit' in self.config.sections():
self.gerrit_changes_dbs['gerrit'] = {}
self.gerrit_queues_dbs['gerrit'] = Queue.Queue()
self.event_queues.append(self.gerrit_queues_dbs['gerrit'])
self.event_queues.append(
self.connections.connections[con_name].event_queue)
self.connections.connections['gerrit'] = FakeGerritConnection(
'_legacy_gerrit', dict(self.config.items('gerrit')),
changes_db=self.gerrit_changes_dbs['gerrit'],
queues_db=self.gerrit_queues_dbs['gerrit'])
changes_db=self.gerrit_changes_dbs['gerrit'])
if 'smtp' in self.config.sections():
self.connections.connections['smtp'] = \
@ -1494,6 +1488,36 @@ tenants:
if tag:
repo.create_tag(tag)
def addEvent(self, connection, event):
"""Inject a Fake (Gerrit) event.
This method accepts a JSON-encoded event and simulates Zuul
having received it from Gerrit. It could (and should)
eventually apply to any connection type, but is currently only
used with Gerrit connections. The name of the connection is
used to look up the corresponding server, and the event is
simulated as having been received by all Zuul connections
attached to that server. So if two Gerrit connections in Zuul
are connected to the same Gerrit server, and you invoke this
method specifying the name of one of them, the event will be
received by both.
.. note::
"self.fake_gerrit.addEvent" calls should be migrated to
this method.
:arg str connection: The name of the connection corresponding
to the gerrit server.
:arg str event: The JSON-encoded event.
"""
specified_conn = self.connections.connections[connection]
for conn in self.connections.connections.values():
if (isinstance(conn, specified_conn.__class__) and
specified_conn.server == conn.server):
conn.addEvent(event)
class AnsibleZuulTestCase(ZuulTestCase):
"""ZuulTestCase but with an actual ansible launcher running"""

View File

@ -42,7 +42,7 @@ class TestConnections(ZuulTestCase):
"Test multiple connections to the one gerrit"
A = self.fake_review_gerrit.addFakeChange('org/project', 'master', 'A')
self.fake_review_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.addEvent('review_gerrit', A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
@ -54,7 +54,7 @@ class TestConnections(ZuulTestCase):
B = self.fake_review_gerrit.addFakeChange('org/project', 'master', 'B')
self.worker.addFailTest('project-test2', B)
self.fake_review_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
self.addEvent('review_gerrit', B.getPatchsetCreatedEvent(1))
self.waitUntilSettled()

View File

@ -243,7 +243,7 @@ class GerritConnection(BaseConnection):
self.port = int(self.connection_config.get('port', 29418))
self.keyfile = self.connection_config.get('sshkey', None)
self.watcher_thread = None
self.event_queue = None
self.event_queue = Queue.Queue()
self.client = None
self.baseurl = self.connection_config.get('baseurl',
@ -770,7 +770,6 @@ class GerritConnection(BaseConnection):
self.watcher_thread.join()
def _start_watcher_thread(self):
self.event_queue = Queue.Queue()
self.watcher_thread = GerritWatcher(
self,
self.user,