Merge "Fix keyboardinterrupt test on kombuserver"

This commit is contained in:
Zuul 2025-01-02 15:22:44 +00:00 committed by Gerrit Code Review
commit 55b33a6d5c

View File

@ -21,6 +21,8 @@ from mistral.tests.unit.rpc.kombu import fake_kombu
from unittest import mock
import socket
import time
from stevedore import driver
with mock.patch.dict('sys.modules', kombu=fake_kombu):
@ -37,6 +39,7 @@ class KombuServerTest(base.KombuTestCase):
super(KombuServerTest, self).setUp()
self.conf = mock.MagicMock()
self.conf.host = 'fakehost'
self.server = kombu_server.KombuRPCServer(self.conf)
self.ctx = type('context', (object,), {'to_dict': lambda self: {}})()
@ -134,6 +137,9 @@ class KombuServerTest(base.KombuTestCase):
fake_kombu.connection.acquire.return_value = acquire_mock
self.assertIsNone(self.server.run())
# Wait 1 sec so the thread start listening on RPC and receive the
# side_effect
time.sleep(1)
self.assertFalse(self.server.is_running)
@mock.patch.object(