Factorize member_id in the base coordinator class
Any coordinator that implements (and they all should) group membership needs a member_id field and copy it to self. Just do that by default for all driver at __init__ time. Change-Id: I66d82815afd4c6e9354212e5ab3fba17931c1bbc
This commit is contained in:
		| @@ -231,8 +231,9 @@ class CoordinationDriver(object): | |||||||
|     enum member(s) that can be used to interogate how this driver works. |     enum member(s) that can be used to interogate how this driver works. | ||||||
|     """ |     """ | ||||||
|  |  | ||||||
|     def __init__(self): |     def __init__(self, member_id): | ||||||
|         super(CoordinationDriver, self).__init__() |         super(CoordinationDriver, self).__init__() | ||||||
|  |         self._member_id = member_id | ||||||
|         self._started = False |         self._started = False | ||||||
|         self._hooks_join_group = collections.defaultdict(Hooks) |         self._hooks_join_group = collections.defaultdict(Hooks) | ||||||
|         self._hooks_leave_group = collections.defaultdict(Hooks) |         self._hooks_leave_group = collections.defaultdict(Hooks) | ||||||
| @@ -610,8 +611,8 @@ class CoordinationDriverCachedRunWatchers(CoordinationDriver): | |||||||
|  |  | ||||||
|     """ |     """ | ||||||
|  |  | ||||||
|     def __init__(self): |     def __init__(self, member_id): | ||||||
|         super(CoordinationDriverCachedRunWatchers, self).__init__() |         super(CoordinationDriverCachedRunWatchers, self).__init__(member_id) | ||||||
|         # A cache for group members |         # A cache for group members | ||||||
|         self._group_members = collections.defaultdict(set) |         self._group_members = collections.defaultdict(set) | ||||||
|         self._joined_groups = set() |         self._joined_groups = set() | ||||||
|   | |||||||
| @@ -107,7 +107,7 @@ class ConsulDriver(coordination.CoordinationDriver): | |||||||
|     DEFAULT_PORT = 8500 |     DEFAULT_PORT = 8500 | ||||||
|  |  | ||||||
|     def __init__(self, member_id, parsed_url, options): |     def __init__(self, member_id, parsed_url, options): | ||||||
|         super(ConsulDriver, self).__init__() |         super(ConsulDriver, self).__init__(member_id) | ||||||
|         options = utils.collapse(options) |         options = utils.collapse(options) | ||||||
|         self._executor = utils.ProxyExecutor.build("Consul", options) |         self._executor = utils.ProxyExecutor.build("Consul", options) | ||||||
|         self._host = parsed_url.hostname |         self._host = parsed_url.hostname | ||||||
|   | |||||||
| @@ -200,7 +200,7 @@ class EtcdDriver(coordination.CoordinationDriver): | |||||||
|     lock_encoder_cls = utils.Base64LockEncoder |     lock_encoder_cls = utils.Base64LockEncoder | ||||||
|  |  | ||||||
|     def __init__(self, member_id, parsed_url, options): |     def __init__(self, member_id, parsed_url, options): | ||||||
|         super(EtcdDriver, self).__init__() |         super(EtcdDriver, self).__init__(member_id) | ||||||
|         host = parsed_url.hostname or self.DEFAULT_HOST |         host = parsed_url.hostname or self.DEFAULT_HOST | ||||||
|         port = parsed_url.port or self.DEFAULT_PORT |         port = parsed_url.port or self.DEFAULT_PORT | ||||||
|         options = utils.collapse(options) |         options = utils.collapse(options) | ||||||
|   | |||||||
| @@ -205,8 +205,7 @@ class FileDriver(coordination.CoordinationDriverCachedRunWatchers): | |||||||
|  |  | ||||||
|     def __init__(self, member_id, parsed_url, options): |     def __init__(self, member_id, parsed_url, options): | ||||||
|         """Initialize the file driver.""" |         """Initialize the file driver.""" | ||||||
|         super(FileDriver, self).__init__() |         super(FileDriver, self).__init__(member_id) | ||||||
|         self._member_id = member_id |  | ||||||
|         self._dir = self._normalize_path(parsed_url.path) |         self._dir = self._normalize_path(parsed_url.path) | ||||||
|         self._executor = utils.ProxyExecutor.build("File", options) |         self._executor = utils.ProxyExecutor.build("File", options) | ||||||
|         self._group_dir = os.path.join(self._dir, 'groups') |         self._group_dir = os.path.join(self._dir, 'groups') | ||||||
|   | |||||||
| @@ -166,7 +166,7 @@ class IPCDriver(coordination.CoordinationDriver): | |||||||
|  |  | ||||||
|     def __init__(self, member_id, parsed_url, options): |     def __init__(self, member_id, parsed_url, options): | ||||||
|         """Initialize the IPC driver.""" |         """Initialize the IPC driver.""" | ||||||
|         super(IPCDriver, self).__init__() |         super(IPCDriver, self).__init__(member_id) | ||||||
|         self._executor = utils.ProxyExecutor.build("IPC", options) |         self._executor = utils.ProxyExecutor.build("IPC", options) | ||||||
|  |  | ||||||
|     def _start(self): |     def _start(self): | ||||||
|   | |||||||
| @@ -216,10 +216,9 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers): | |||||||
|     STILL_ALIVE = b"It's alive!" |     STILL_ALIVE = b"It's alive!" | ||||||
|  |  | ||||||
|     def __init__(self, member_id, parsed_url, options): |     def __init__(self, member_id, parsed_url, options): | ||||||
|         super(MemcachedDriver, self).__init__() |         super(MemcachedDriver, self).__init__(member_id) | ||||||
|         options = utils.collapse(options) |         options = utils.collapse(options) | ||||||
|         self._options = options |         self._options = options | ||||||
|         self._member_id = member_id |  | ||||||
|         self._executor = utils.ProxyExecutor.build("Memcached", options) |         self._executor = utils.ProxyExecutor.build("Memcached", options) | ||||||
|         self.host = (parsed_url.hostname or "localhost", |         self.host = (parsed_url.hostname or "localhost", | ||||||
|                      parsed_url.port or 11211) |                      parsed_url.port or 11211) | ||||||
|   | |||||||
| @@ -117,7 +117,7 @@ class MySQLDriver(coordination.CoordinationDriver): | |||||||
|  |  | ||||||
|     def __init__(self, member_id, parsed_url, options): |     def __init__(self, member_id, parsed_url, options): | ||||||
|         """Initialize the MySQL driver.""" |         """Initialize the MySQL driver.""" | ||||||
|         super(MySQLDriver, self).__init__() |         super(MySQLDriver, self).__init__(member_id) | ||||||
|         self._parsed_url = parsed_url |         self._parsed_url = parsed_url | ||||||
|         self._options = utils.collapse(options) |         self._options = utils.collapse(options) | ||||||
|  |  | ||||||
|   | |||||||
| @@ -173,7 +173,7 @@ class PostgresDriver(coordination.CoordinationDriver): | |||||||
|  |  | ||||||
|     def __init__(self, member_id, parsed_url, options): |     def __init__(self, member_id, parsed_url, options): | ||||||
|         """Initialize the PostgreSQL driver.""" |         """Initialize the PostgreSQL driver.""" | ||||||
|         super(PostgresDriver, self).__init__() |         super(PostgresDriver, self).__init__(member_id) | ||||||
|         self._parsed_url = parsed_url |         self._parsed_url = parsed_url | ||||||
|         self._options = utils.collapse(options) |         self._options = utils.collapse(options) | ||||||
|  |  | ||||||
|   | |||||||
| @@ -315,7 +315,7 @@ return 1 | |||||||
|     """ |     """ | ||||||
|  |  | ||||||
|     def __init__(self, member_id, parsed_url, options): |     def __init__(self, member_id, parsed_url, options): | ||||||
|         super(RedisDriver, self).__init__() |         super(RedisDriver, self).__init__(member_id) | ||||||
|         options = utils.collapse(options, exclude=self.CLIENT_LIST_ARGS) |         options = utils.collapse(options, exclude=self.CLIENT_LIST_ARGS) | ||||||
|         self._parsed_url = parsed_url |         self._parsed_url = parsed_url | ||||||
|         self._options = options |         self._options = options | ||||||
| @@ -332,7 +332,6 @@ return 1 | |||||||
|         self._beat_prefix = self._namespace + b"_beats" |         self._beat_prefix = self._namespace + b"_beats" | ||||||
|         self._groups = self._namespace + b"_groups" |         self._groups = self._namespace + b"_groups" | ||||||
|         self._client = None |         self._client = None | ||||||
|         self._member_id = utils.to_binary(member_id, encoding=self._encoding) |  | ||||||
|         self._acquired_locks = set() |         self._acquired_locks = set() | ||||||
|         self._executor = utils.ProxyExecutor.build("Redis", options) |         self._executor = utils.ProxyExecutor.build("Redis", options) | ||||||
|         self._started = False |         self._started = False | ||||||
|   | |||||||
| @@ -135,9 +135,8 @@ class KazooDriver(coordination.CoordinationDriverCachedRunWatchers): | |||||||
|     """ |     """ | ||||||
|  |  | ||||||
|     def __init__(self, member_id, parsed_url, options): |     def __init__(self, member_id, parsed_url, options): | ||||||
|         super(KazooDriver, self).__init__() |         super(KazooDriver, self).__init__(member_id) | ||||||
|         options = utils.collapse(options, exclude=['hosts']) |         options = utils.collapse(options, exclude=['hosts']) | ||||||
|         self._member_id = member_id |  | ||||||
|         self.timeout = int(options.get('timeout', '10')) |         self.timeout = int(options.get('timeout', '10')) | ||||||
|         self._namespace = options.get('namespace', self.TOOZ_NAMESPACE) |         self._namespace = options.get('namespace', self.TOOZ_NAMESPACE) | ||||||
|         self._coord = self._make_client(parsed_url, options) |         self._coord = self._make_client(parsed_url, options) | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Julien Danjou
					Julien Danjou