diff --git a/releasenotes/notes/sentinel-fallbacks-6fe2ab0d68959cdf.yaml b/releasenotes/notes/sentinel-fallbacks-6fe2ab0d68959cdf.yaml new file mode 100644 index 000000000..cbe5edda7 --- /dev/null +++ b/releasenotes/notes/sentinel-fallbacks-6fe2ab0d68959cdf.yaml @@ -0,0 +1,5 @@ +--- +features: + - | + The redis driver now supports ``sentinel_fallbacks`` option. This allows + using additional sentinel servers as fallbacks. diff --git a/taskflow/jobs/backends/impl_redis.py b/taskflow/jobs/backends/impl_redis.py index 1beb42227..f18ff61db 100644 --- a/taskflow/jobs/backends/impl_redis.py +++ b/taskflow/jobs/backends/impl_redis.py @@ -17,6 +17,7 @@ import contextlib import datetime import functools +import re import string import threading import time @@ -558,6 +559,18 @@ return cmsgpack.pack(result) ut-were-afraid-to-ask """ + @classmethod + def _parse_sentinel(cls, sentinel): + # IPv6 (eg. [::1]:6379 ) + match = re.search(r'\[(\S+)\]:(\d+)', sentinel) + if match: + return (match[1], int(match[2])) + # IPv4 or hostname (eg. 127.0.0.1:6379 or localhost:6379) + match = re.search(r'(\S+):(\d+)', sentinel) + if match: + return (match[1], int(match[2])) + raise ValueError('Malformed sentinel server format') + @classmethod def _make_client(cls, conf): client_conf = {} @@ -569,6 +582,8 @@ return cmsgpack.pack(result) client_conf[key] = conf[key] if conf.get('sentinel') is not None: sentinels = [(client_conf.pop('host'), client_conf.pop('port'))] + for fallback in conf.get('sentinel_fallbacks', []): + sentinels.append(cls._parse_sentinel(fallback)) sentinel_kwargs = conf.get('sentinel_kwargs', {}) for key in ('username', 'password', 'socket_timeout'): if key in conf: diff --git a/taskflow/tests/unit/jobs/test_redis_job.py b/taskflow/tests/unit/jobs/test_redis_job.py index 5a4a53b19..20918edd5 100644 --- a/taskflow/tests/unit/jobs/test_redis_job.py +++ b/taskflow/tests/unit/jobs/test_redis_job.py @@ -146,6 +146,32 @@ class RedisJobboardTest(test.TestCase, base.BoardTestMixin): **test_conf) mock_sentinel().master_for.assert_called_once_with('mymaster') + def test__make_client_sentinel_fallbacks(self): + conf = {'host': '127.0.0.1', + 'port': 26379, + 'username': 'default', + 'password': 'secret', + 'namespace': 'test', + 'sentinel': 'mymaster', + 'sentinel_fallbacks': [ + '[::1]:26379', '127.0.0.2:26379', 'localhost:26379' + ]} + with mock.patch('redis.sentinel.Sentinel') as mock_sentinel: + impl_redis.RedisJobBoard('test-board', conf) + test_conf = { + 'username': 'default', + 'password': 'secret', + } + mock_sentinel.assert_called_once_with( + [('127.0.0.1', 26379), ('::1', 26379), + ('127.0.0.2', 26379), ('localhost', 26379)], + sentinel_kwargs={ + 'username': 'default', + 'password': 'secret' + }, + **test_conf) + mock_sentinel().master_for.assert_called_once_with('mymaster') + def test__make_client_sentinel_ssl(self): conf = {'host': '127.0.0.1', 'port': 26379,