implement and test MonotonicTimestampGenerator
This commit is contained in:
parent
e989d398a9
commit
0cee875696
@ -70,6 +70,7 @@ from cassandra.pool import (Host, _ReconnectionHandler, _HostReconnectionHandler
|
||||
from cassandra.query import (SimpleStatement, PreparedStatement, BoundStatement,
|
||||
BatchStatement, bind_params, QueryTrace,
|
||||
named_tuple_factory, dict_factory, tuple_factory, FETCH_SIZE_UNSET)
|
||||
from cassandra.timestamps import MonotonicTimestampGenerator
|
||||
|
||||
|
||||
def _is_eventlet_monkey_patched():
|
||||
@ -771,7 +772,8 @@ class Cluster(object):
|
||||
prepare_on_all_hosts=True,
|
||||
reprepare_on_up=True,
|
||||
execution_profiles=None,
|
||||
allow_beta_protocol_version=False):
|
||||
allow_beta_protocol_version=False,
|
||||
timestamp_generator=None):
|
||||
"""
|
||||
``executor_threads`` defines the number of threads in a pool for handling asynchronous tasks such as
|
||||
extablishing connection pools or refreshing metadata.
|
||||
@ -830,6 +832,13 @@ class Cluster(object):
|
||||
if connection_class is not None:
|
||||
self.connection_class = connection_class
|
||||
|
||||
if timestamp_generator is not None:
|
||||
if not callable(timestamp_generator):
|
||||
raise ValueError("timestamp_generator must be callable")
|
||||
self.timestamp_generator = timestamp_generator
|
||||
else:
|
||||
self.timestamp_generator = MonotonicTimestampGenerator()
|
||||
|
||||
self.profile_manager = ProfileManager()
|
||||
self.profile_manager.profiles[EXEC_PROFILE_DEFAULT] = ExecutionProfile(self.load_balancing_policy,
|
||||
self.default_retry_policy,
|
||||
@ -1893,6 +1902,27 @@ class Session(object):
|
||||
.. versionadded:: 2.1.0
|
||||
"""
|
||||
|
||||
timestamp_generator = None
|
||||
"""
|
||||
When :attr:`use_client_timestamp` is set, sessions call this object and use
|
||||
the result as the timestamp. (Note that timestamps specified within a CQL
|
||||
query will override this timestamp.) By default, a new
|
||||
:class:`~.MonotonicTimestampGenerator` is created for
|
||||
each :class:`Cluster` instance.
|
||||
|
||||
Applications can set this value for custom timestamp behavior. For
|
||||
example, an application could share a timestamp generator across
|
||||
:class:`Cluster` objects to guarantee that the application will use unique,
|
||||
increasing timestamps across clusters, or set it to to ``lambda:
|
||||
int(time.time() * 1e6)`` if losing records over clock inconsistencies is
|
||||
acceptable for the application. Custom :attr:`timestamp_generator` s should
|
||||
be callable, and calling them should return an integer representing seconds
|
||||
since some point in time, typically UNIX epoch.
|
||||
|
||||
.. versionadded:: 3.8.0
|
||||
"""
|
||||
|
||||
|
||||
encoder = None
|
||||
"""
|
||||
A :class:`~cassandra.encoder.Encoder` instance that will be used when
|
||||
@ -2085,7 +2115,7 @@ class Session(object):
|
||||
|
||||
start_time = time.time()
|
||||
if self._protocol_version >= 3 and self.use_client_timestamp:
|
||||
timestamp = int(start_time * 1e6)
|
||||
timestamp = self.cluster.timestamp_generator()
|
||||
else:
|
||||
timestamp = None
|
||||
|
||||
|
107
cassandra/timestamps.py
Normal file
107
cassandra/timestamps.py
Normal file
@ -0,0 +1,107 @@
|
||||
# Copyright 2013-2016 DataStax, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
This module contains utilities for generating timestamps for client-side
|
||||
timestamp specification.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import time
|
||||
from threading import Lock
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
class MonotonicTimestampGenerator(object):
|
||||
"""
|
||||
An object that, when called, returns ``int(time.time() * 1e6)`` when
|
||||
possible, but, if the value returned by ``time.time`` doesn't increase,
|
||||
drifts into the future and logs warnings.
|
||||
Exposed configuration attributes can be configured with arguments to
|
||||
``__init__`` or by changing attributes on an initialized object.
|
||||
|
||||
.. versionadded:: 3.8.0
|
||||
"""
|
||||
|
||||
warn_on_drift = True
|
||||
"""
|
||||
If true, log warnings when timestamps drift into the future as allowed by
|
||||
:attr:`warning_threshold` and :attr:`warning_interval`.
|
||||
"""
|
||||
|
||||
warning_threshold = 0
|
||||
"""
|
||||
This object will only issue warnings when the returned timestamp drifts
|
||||
more than ``warning_threshold`` seconds into the future.
|
||||
"""
|
||||
|
||||
warning_interval = 0
|
||||
"""
|
||||
This object will only issue warnings every ``warning_interval`` seconds.
|
||||
"""
|
||||
|
||||
def __init__(self, warn_on_drift=True, warning_threshold=0, warning_interval=0):
|
||||
self.lock = Lock()
|
||||
with self.lock:
|
||||
self.last = 0
|
||||
self._last_warn = 0
|
||||
self.warn_on_drift = warn_on_drift
|
||||
self.warning_threshold = warning_threshold
|
||||
self.warning_interval = warning_interval
|
||||
|
||||
def _next_timestamp(self, now, last):
|
||||
"""
|
||||
Returns the timestamp that should be used if ``now`` is the current
|
||||
time and ``last`` is the last timestamp returned by this object.
|
||||
Intended for internal and testing use only; to generate timestamps,
|
||||
call an instantiated ``MonotonicTimestampGenerator`` object.
|
||||
|
||||
:param int now: an integer to be used as the current time, typically
|
||||
representing the current time in seconds since the UNIX epoch
|
||||
:param int last: an integer representing the last timestamp returned by
|
||||
this object
|
||||
"""
|
||||
if now > last:
|
||||
self.last = now
|
||||
return now
|
||||
else:
|
||||
self._maybe_warn(now=now)
|
||||
self.last = last + 1
|
||||
return self.last
|
||||
|
||||
def __call__(self):
|
||||
"""
|
||||
Makes ``MonotonicTimestampGenerator`` objects callable; defers
|
||||
internally to _next_timestamp.
|
||||
"""
|
||||
with self.lock:
|
||||
return self._next_timestamp(now=int(time.time() * 1e6),
|
||||
last=self.last)
|
||||
|
||||
def _maybe_warn(self, now):
|
||||
# should be called from inside the self.lock.
|
||||
diff = self.last - now
|
||||
since_last_warn = now - self._last_warn
|
||||
|
||||
warn = (self.warn_on_drift and
|
||||
(diff >= self.warning_threshold) and
|
||||
(since_last_warn >= self.warning_interval))
|
||||
if warn:
|
||||
log.warn(
|
||||
"Clock skew detected: current tick ({now}) was {diff} "
|
||||
"microseconds behind the last generated timestamp "
|
||||
"({last}), returned timestamps will be artificially "
|
||||
"incremented to guarantee monotonicity.".format(
|
||||
now=now, diff=diff, last=self.last))
|
||||
self._last_warn = now
|
@ -134,6 +134,8 @@
|
||||
|
||||
.. autoattribute:: use_client_timestamp
|
||||
|
||||
.. autoattribute:: timestamp_generator
|
||||
|
||||
.. autoattribute:: encoder
|
||||
|
||||
.. autoattribute:: client_protocol_handler
|
||||
|
14
docs/api/cassandra/timestamps.rst
Normal file
14
docs/api/cassandra/timestamps.rst
Normal file
@ -0,0 +1,14 @@
|
||||
``cassandra.timestamps`` - Timestamp Generation
|
||||
=============================================
|
||||
|
||||
.. module:: cassandra.timestamps
|
||||
|
||||
.. autoclass:: MonotonicTimestampGenerator (warn_on_drift=True, warning_threshold=0, warning_interval=0)
|
||||
|
||||
.. autoattribute:: warn_on_drift
|
||||
|
||||
.. autoattribute:: warning_threshold
|
||||
|
||||
.. autoattribute:: warning_interval
|
||||
|
||||
.. automethod:: _next_timestamp
|
168
tests/unit/test_timestamps.py
Normal file
168
tests/unit/test_timestamps.py
Normal file
@ -0,0 +1,168 @@
|
||||
# Copyright 2013-2016 DataStax, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
try:
|
||||
import unittest2 as unittest
|
||||
except ImportError:
|
||||
import unittest # noqa
|
||||
|
||||
import mock
|
||||
|
||||
from cassandra import timestamps
|
||||
|
||||
|
||||
class _TimestampTestMixin(object):
|
||||
|
||||
@mock.patch('cassandra.timestamps.time')
|
||||
def _call_and_check_results(self,
|
||||
patched_time_module,
|
||||
system_time_expected_stamp_pairs,
|
||||
timestamp_generator=None):
|
||||
"""
|
||||
For each element in an iterable of (system_time, expected_timestamp)
|
||||
pairs, call a :class:`cassandra.timestamps.MonotonicTimestampGenerator`
|
||||
with system_times as the underlying time.time() result, then assert
|
||||
that the result is expected_timestamp. Skips the check if
|
||||
expected_timestamp is None.
|
||||
"""
|
||||
patched_time_module.time = mock.Mock()
|
||||
system_times, expected_timestamps = zip(*system_time_expected_stamp_pairs)
|
||||
|
||||
patched_time_module.time.side_effect = system_times
|
||||
tsg = timestamp_generator or timestamps.MonotonicTimestampGenerator()
|
||||
|
||||
for expected in expected_timestamps:
|
||||
actual = tsg()
|
||||
if expected is not None:
|
||||
self.assertEqual(actual, expected)
|
||||
|
||||
# assert we patched timestamps.time.time correctly
|
||||
with self.assertRaises(StopIteration):
|
||||
tsg()
|
||||
|
||||
|
||||
class TestTimestampGeneratorOutput(unittest.TestCase, _TimestampTestMixin):
|
||||
"""
|
||||
Mock time.time and test the output of MonotonicTimestampGenerator.__call__
|
||||
given different patterns of changing results.
|
||||
"""
|
||||
|
||||
def test_timestamps_during_and_after_same_system_time(self):
|
||||
"""
|
||||
Timestamps should increase monotonically over repeated system time.
|
||||
|
||||
Test that MonotonicTimestampGenerator's output increases by 1 when the
|
||||
underlying system time is the same, then returns to normal when the
|
||||
system time increases again.
|
||||
"""
|
||||
self._call_and_check_results(
|
||||
system_time_expected_stamp_pairs=(
|
||||
(15.0, 15 * 1e6),
|
||||
(15.0, 15 * 1e6 + 1),
|
||||
(15.0, 15 * 1e6 + 2),
|
||||
(15.01, 15.01 * 1e6))
|
||||
)
|
||||
|
||||
def test_timestamps_during_and_after_backwards_system_time(self):
|
||||
"""
|
||||
Timestamps should increase monotonically over system time going backwards.
|
||||
|
||||
Test that MonotonicTimestampGenerator's output increases by 1 when the
|
||||
underlying system time goes backward, then returns to normal when the
|
||||
system time increases again.
|
||||
"""
|
||||
self._call_and_check_results(
|
||||
system_time_expected_stamp_pairs=(
|
||||
(15.0, 15 * 1e6),
|
||||
(13.0, 15 * 1e6 + 1),
|
||||
(14.0, 15 * 1e6 + 2),
|
||||
(13.5, 15 * 1e6 + 3),
|
||||
(15.01, 15.01 * 1e6))
|
||||
)
|
||||
|
||||
|
||||
class TestTimestampGeneratorLogging(unittest.TestCase, _TimestampTestMixin):
|
||||
|
||||
def setUp(self):
|
||||
self.log_patcher = mock.patch('cassandra.timestamps.log')
|
||||
self.addCleanup(self.log_patcher.stop)
|
||||
self.patched_timestamp_log = self.log_patcher.start()
|
||||
|
||||
def assertLastCallArgRegex(self, call, pattern):
|
||||
last_warn_args, last_warn_kwargs = call
|
||||
self.assertEqual(len(last_warn_args), 1)
|
||||
self.assertEqual(len(last_warn_kwargs), 0)
|
||||
self.assertRegexpMatches(
|
||||
last_warn_args[0],
|
||||
pattern,
|
||||
)
|
||||
|
||||
def test_basic_log_content(self):
|
||||
tsg = timestamps.MonotonicTimestampGenerator()
|
||||
tsg._last_warn = 12
|
||||
|
||||
tsg._next_timestamp(20, tsg.last)
|
||||
self.assertEqual(len(self.patched_timestamp_log.warn.call_args_list), 0)
|
||||
tsg._next_timestamp(16, tsg.last)
|
||||
|
||||
self.assertEqual(len(self.patched_timestamp_log.warn.call_args_list), 1)
|
||||
self.assertLastCallArgRegex(
|
||||
self.patched_timestamp_log.warn.call_args,
|
||||
r'Clock skew detected:.*\b16\b.*\b4\b.*\b20\b'
|
||||
)
|
||||
|
||||
def test_disable_logging(self):
|
||||
no_warn_tsg = timestamps.MonotonicTimestampGenerator(warn_on_drift=False)
|
||||
|
||||
no_warn_tsg.last = 100
|
||||
no_warn_tsg._next_timestamp(99, no_warn_tsg.last)
|
||||
self.assertEqual(len(self.patched_timestamp_log.warn.call_args_list), 0)
|
||||
|
||||
def test_warning_threshold_respected_no_logging(self):
|
||||
tsg = timestamps.MonotonicTimestampGenerator(
|
||||
warning_threshold=2,
|
||||
)
|
||||
tsg.last, tsg._last_warn = 100, 97
|
||||
tsg._next_timestamp(98, tsg.last)
|
||||
self.assertEqual(len(self.patched_timestamp_log.warn.call_args_list), 0)
|
||||
|
||||
def test_warning_threshold_respected_logs(self):
|
||||
tsg = timestamps.MonotonicTimestampGenerator(
|
||||
warning_threshold=1
|
||||
)
|
||||
tsg.last, tsg._last_warn = 100, 97
|
||||
tsg._next_timestamp(98, tsg.last)
|
||||
self.assertEqual(len(self.patched_timestamp_log.warn.call_args_list), 1)
|
||||
|
||||
def test_warning_interval_respected_no_logging(self):
|
||||
tsg = timestamps.MonotonicTimestampGenerator(
|
||||
warning_interval=2
|
||||
)
|
||||
tsg.last = 100
|
||||
tsg._next_timestamp(70, tsg.last)
|
||||
self.assertEqual(len(self.patched_timestamp_log.warn.call_args_list), 1)
|
||||
|
||||
tsg._next_timestamp(71, tsg.last)
|
||||
self.assertEqual(len(self.patched_timestamp_log.warn.call_args_list), 1)
|
||||
|
||||
def test_warning_interval_respected_logs(self):
|
||||
tsg = timestamps.MonotonicTimestampGenerator(
|
||||
warning_interval=1
|
||||
)
|
||||
tsg.last = 100
|
||||
tsg._next_timestamp(70, tsg.last)
|
||||
self.assertEqual(len(self.patched_timestamp_log.warn.call_args_list), 1)
|
||||
|
||||
tsg._next_timestamp(72, tsg.last)
|
||||
self.assertEqual(len(self.patched_timestamp_log.warn.call_args_list), 2)
|
Loading…
Reference in New Issue
Block a user