Reworked the if statement logic to only call stop() on not-stopped objects. added tests
This commit is contained in:
@@ -322,7 +322,7 @@ class Producer(object):
|
||||
self.thread.start()
|
||||
|
||||
def cleanup(obj):
|
||||
if obj.stopped:
|
||||
if not obj.stopped:
|
||||
obj.stop()
|
||||
self._cleanup_func = cleanup
|
||||
atexit.register(cleanup, self)
|
||||
|
||||
@@ -91,6 +91,20 @@ class TestKafkaProducer(unittest.TestCase):
|
||||
with self.assertRaises(FailedPayloadsError):
|
||||
producer.send_messages('foobar', b'test message')
|
||||
|
||||
def test_cleanup_stop_is_called_on_not_stopped_object(self):
|
||||
producer = Producer(MagicMock(), async=True)
|
||||
producer.stopped = True
|
||||
with patch('kafka.producer.base.Producer.stop') as base_stop:
|
||||
producer._cleanup_func(producer)
|
||||
self.assertEqual(base_stop.call_count, 0)
|
||||
|
||||
def test_cleanup_stop_is_not_called_on_stopped_object(self):
|
||||
producer = Producer(MagicMock(), async=True)
|
||||
producer.stopped = False
|
||||
with patch('kafka.producer.base.Producer.stop') as base_stop:
|
||||
producer._cleanup_func(producer)
|
||||
self.assertEqual(base_stop.call_count, 1)
|
||||
|
||||
|
||||
class TestKafkaProducerSendUpstream(unittest.TestCase):
|
||||
|
||||
|
||||
Reference in New Issue
Block a user