From 635f554055abb4f5c815615867be4903ab8b904b Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Mon, 22 Feb 2016 13:38:05 -0600 Subject: [PATCH] Add a task counter to cluster task scheduler PYTHON-473 Fixes a TypeError when tasks collide in time and function comparison is attempted (Python 3 only). https://docs.python.org/2/library/heapq.html#priority-queue-implementation-notes --- cassandra/cluster.py | 11 ++++++----- tests/unit/test_connection.py | 12 +++--------- 2 files changed, 9 insertions(+), 14 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index ae7a3ece..1954a139 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -41,7 +41,7 @@ except ImportError: from cassandra.util import WeakSet # NOQA from functools import partial, wraps -from itertools import groupby +from itertools import groupby, count from cassandra import (ConsistencyLevel, AuthenticationFailed, OperationTimedOut, UnsupportedOperation, @@ -2542,6 +2542,7 @@ class _Scheduler(object): def __init__(self, executor): self._queue = Queue.PriorityQueue() self._scheduled_tasks = set() + self._count = count() self._executor = executor t = Thread(target=self.run, name="Task Scheduler") @@ -2559,7 +2560,7 @@ class _Scheduler(object): # this can happen on interpreter shutdown pass self.is_shutdown = True - self._queue.put_nowait((0, None)) + self._queue.put_nowait((0, 0, None)) def schedule(self, delay, fn, *args, **kwargs): self._insert_task(delay, (fn, args, tuple(kwargs.items()))) @@ -2575,7 +2576,7 @@ class _Scheduler(object): if not self.is_shutdown: run_at = time.time() + delay self._scheduled_tasks.add(task) - self._queue.put_nowait((run_at, task)) + self._queue.put_nowait((run_at, next(self._count), task)) else: log.debug("Ignoring scheduled task after shutdown: %r", task) @@ -2586,7 +2587,7 @@ class _Scheduler(object): try: while True: - run_at, task = self._queue.get(block=True, timeout=None) + run_at, i, task = self._queue.get(block=True, timeout=None) if self.is_shutdown: log.debug("Not executing scheduled task due to Scheduler shutdown") return @@ -2597,7 +2598,7 @@ class _Scheduler(object): future = self._executor.submit(fn, *args, **kwargs) future.add_done_callback(self._log_if_failed) else: - self._queue.put_nowait((run_at, task)) + self._queue.put_nowait((run_at, i, task)) break except Queue.Empty: pass diff --git a/tests/unit/test_connection.py b/tests/unit/test_connection.py index ed03b9a1..15fa6e72 100644 --- a/tests/unit/test_connection.py +++ b/tests/unit/test_connection.py @@ -419,19 +419,13 @@ class TimerTest(unittest.TestCase): def test_timer_collision(self): # simple test demonstrating #466 - def f1(): - pass - - def f2(): - pass - # same timeout, comparison will defer to the Timer object itself - t1 = Timer(0, f1) - t2 = Timer(0, f2) + t1 = Timer(0, lambda: None) + t2 = Timer(0, lambda: None) t2.end = t1.end tm = TimerManager() tm.add_timer(t1) tm.add_timer(t2) - # Prior to $466: "TypeError: unorderable types: Timer() < Timer()" + # Prior to #466: "TypeError: unorderable types: Timer() < Timer()" tm.service_timeouts()