Browse Source

Merge "Implement group support for etcd3gw"

tags/1.62.0^0
Zuul 1 year ago
parent
commit
2ac3ef36d2

+ 4
- 0
releasenotes/notes/etcd3gw-group-support-598832a8764a8aa6.yaml View File

@@ -0,0 +1,4 @@
1
+---
2
+features:
3
+  - |
4
+    The etcd3gw driver now supports the group membership API.

+ 206
- 3
tooz/drivers/etcd3gw.py View File

@@ -29,6 +29,11 @@ from tooz import locking
29 29
 from tooz import utils
30 30
 
31 31
 
32
+def _encode(data):
33
+    """Safely encode data for consumption of the gateway."""
34
+    return base64.b64encode(data).decode("ascii")
35
+
36
+
32 37
 def _translate_failures(func):
33 38
     """Translates common requests exceptions into tooz exceptions."""
34 39
 
@@ -66,8 +71,8 @@ class Etcd3Lock(locking.Lock):
66 71
         self._timeout = timeout
67 72
         self._coord = coord
68 73
         self._key = self.LOCK_PREFIX + name
69
-        self._key_b64 = base64.b64encode(self._key).decode("ascii")
70
-        self._uuid = base64.b64encode(uuid.uuid4().bytes).decode("ascii")
74
+        self._key_b64 = _encode(self._key)
75
+        self._uuid = _encode(uuid.uuid4().bytes)
71 76
         self._exclusive_access = threading.Lock()
72 77
 
73 78
     @_translate_failures
@@ -156,7 +161,7 @@ class Etcd3Lock(locking.Lock):
156 161
         return False
157 162
 
158 163
 
159
-class Etcd3Driver(coordination.CoordinationDriver):
164
+class Etcd3Driver(coordination.CoordinationDriverWithExecutor):
160 165
     """An etcd based driver.
161 166
 
162 167
     This driver uses etcd provide the coordination driver semantics and
@@ -172,6 +177,8 @@ class Etcd3Driver(coordination.CoordinationDriver):
172 177
     #: Default port used if none provided (4001 or 2379 are the common ones).
173 178
     DEFAULT_PORT = 2379
174 179
 
180
+    GROUP_PREFIX = b"tooz/groups/"
181
+
175 182
     def __init__(self, member_id, parsed_url, options):
176 183
         super(Etcd3Driver, self).__init__(member_id, parsed_url, options)
177 184
         host = parsed_url.hostname or self.DEFAULT_HOST
@@ -180,8 +187,14 @@ class Etcd3Driver(coordination.CoordinationDriver):
180 187
         timeout = int(options.get('timeout', self.DEFAULT_TIMEOUT))
181 188
         self.client = etcd3gw.client(host=host, port=port, timeout=timeout)
182 189
         self.lock_timeout = int(options.get('lock_timeout', timeout))
190
+        self.membership_timeout = int(options.get(
191
+            'membership_timeout', timeout))
183 192
         self._acquired_locks = set()
184 193
 
194
+    def _start(self):
195
+        super(Etcd3Driver, self)._start()
196
+        self._membership_lease = self.client.lease(self.membership_timeout)
197
+
185 198
     def get_lock(self, name):
186 199
         return Etcd3Lock(self, name, self.lock_timeout)
187 200
 
@@ -202,3 +215,193 @@ class Etcd3Driver(coordination.CoordinationDriver):
202 215
 
203 216
     def unwatch_leave_group(self, group_id, callback):
204 217
         raise tooz.NotImplemented
218
+
219
+    def _encode_group_id(self, group_id):
220
+        return _encode(self._prefix_group(group_id))
221
+
222
+    def _prefix_group(self, group_id):
223
+        return b"%s%s/" % (self.GROUP_PREFIX, group_id)
224
+
225
+    def create_group(self, group_id):
226
+        @_translate_failures
227
+        def _create_group():
228
+            encoded_group = self._encode_group_id(group_id)
229
+            txn = {
230
+                'compare': [{
231
+                    'key': encoded_group,
232
+                    'result': 'EQUAL',
233
+                    'target': 'VERSION',
234
+                    'version': 0
235
+                }],
236
+                'success': [{
237
+                    'request_put': {
238
+                        'key': encoded_group,
239
+                        # We shouldn't need a value, but etcd3gw needs it for
240
+                        # now
241
+                        'value': encoded_group
242
+                    }
243
+                }],
244
+                'failure': []
245
+            }
246
+            result = self.client.transaction(txn)
247
+            if not result.get("succeeded"):
248
+                raise coordination.GroupAlreadyExist(group_id)
249
+
250
+        return coordination.CoordinatorResult(
251
+            self._executor.submit(_create_group))
252
+
253
+    def _destroy_group(self, group_id):
254
+        self.client.delete(group_id)
255
+
256
+    def delete_group(self, group_id):
257
+        @_translate_failures
258
+        def _delete_group():
259
+            prefix_group = self._prefix_group(group_id)
260
+            members = self.client.get_prefix(prefix_group)
261
+            if len(members) > 1:
262
+                raise coordination.GroupNotEmpty(group_id)
263
+
264
+            encoded_group = self._encode_group_id(group_id)
265
+            txn = {
266
+                'compare': [{
267
+                    'key': encoded_group,
268
+                    'result': 'NOT_EQUAL',
269
+                    'target': 'VERSION',
270
+                    'version': 0
271
+                }],
272
+                'success': [{
273
+                    'request_delete_range': {
274
+                        'key': encoded_group,
275
+                    }
276
+                }],
277
+                'failure': []
278
+            }
279
+            result = self.client.transaction(txn)
280
+
281
+            if not result.get("succeeded"):
282
+                raise coordination.GroupNotCreated(group_id)
283
+
284
+        return coordination.CoordinatorResult(
285
+            self._executor.submit(_delete_group))
286
+
287
+    def join_group(self, group_id, capabilities=b""):
288
+        @_retry.retry()
289
+        @_translate_failures
290
+        def _join_group():
291
+            prefix_group = self._prefix_group(group_id)
292
+            prefix_member = prefix_group + self._member_id
293
+            members = self.client.get_prefix(prefix_group)
294
+
295
+            encoded_member = _encode(prefix_member)
296
+
297
+            group_metadata = None
298
+            for cap, metadata in members:
299
+                if metadata['key'] == prefix_member:
300
+                    raise coordination.MemberAlreadyExist(group_id,
301
+                                                          self._member_id)
302
+                if metadata['key'] == prefix_group:
303
+                    group_metadata = metadata
304
+
305
+            if group_metadata is None:
306
+                raise coordination.GroupNotCreated(group_id)
307
+
308
+            encoded_group = self._encode_group_id(group_id)
309
+            txn = {
310
+                'compare': [{
311
+                    'key': encoded_group,
312
+                    'result': 'EQUAL',
313
+                    'target': 'VERSION',
314
+                    'version': int(group_metadata['version'])
315
+                }],
316
+                'success': [{
317
+                    'request_put': {
318
+                        'key': encoded_member,
319
+                        'value': _encode(utils.dumps(capabilities)),
320
+                        'lease': self._membership_lease.id
321
+                    }
322
+                }],
323
+                'failure': []
324
+            }
325
+            result = self.client.transaction(txn)
326
+            if not result.get('succeeded'):
327
+                raise _retry.TryAgain
328
+            else:
329
+                self._joined_groups.add(group_id)
330
+
331
+        return coordination.CoordinatorResult(
332
+            self._executor.submit(_join_group))
333
+
334
+    def leave_group(self, group_id):
335
+        @_translate_failures
336
+        def _leave_group():
337
+            prefix_group = self._prefix_group(group_id)
338
+            prefix_member = prefix_group + self._member_id
339
+            members = self.client.get_prefix(prefix_group)
340
+            for capabilities, metadata in members:
341
+                if metadata['key'] == prefix_member:
342
+                    break
343
+            else:
344
+                raise coordination.MemberNotJoined(group_id,
345
+                                                   self._member_id)
346
+
347
+            self.client.delete(prefix_member)
348
+            self._joined_groups.discard(group_id)
349
+
350
+        return coordination.CoordinatorResult(
351
+            self._executor.submit(_leave_group))
352
+
353
+    def get_members(self, group_id):
354
+        @_translate_failures
355
+        def _get_members():
356
+            prefix_group = self._prefix_group(group_id)
357
+            members = set()
358
+            group_found = False
359
+
360
+            for cap, metadata in self.client.get_prefix(prefix_group):
361
+                if metadata['key'] == prefix_group:
362
+                    group_found = True
363
+                else:
364
+                    members.add(metadata['key'][len(prefix_group):])
365
+
366
+            if not group_found:
367
+                raise coordination.GroupNotCreated(group_id)
368
+
369
+            return members
370
+
371
+        return coordination.CoordinatorResult(
372
+            self._executor.submit(_get_members))
373
+
374
+    def get_member_capabilities(self, group_id, member_id):
375
+        @_translate_failures
376
+        def _get_member_capabilities():
377
+            prefix_member = self._prefix_group(group_id) + member_id
378
+            result = self.client.get(prefix_member)
379
+            if not result:
380
+                raise coordination.MemberNotJoined(group_id, member_id)
381
+            return utils.loads(result[0])
382
+
383
+        return coordination.CoordinatorResult(
384
+            self._executor.submit(_get_member_capabilities))
385
+
386
+    def update_capabilities(self, group_id, capabilities):
387
+        @_translate_failures
388
+        def _update_capabilities():
389
+            prefix_member = self._prefix_group(group_id) + self._member_id
390
+            result = self.client.get(prefix_member)
391
+            if not result:
392
+                raise coordination.MemberNotJoined(group_id, self._member_id)
393
+
394
+            self.client.put(prefix_member, utils.dumps(capabilities),
395
+                            lease=self._membership_lease)
396
+
397
+        return coordination.CoordinatorResult(
398
+            self._executor.submit(_update_capabilities))
399
+
400
+    def get_groups(self):
401
+        @_translate_failures
402
+        def _get_groups():
403
+            groups = self.client.get_prefix(self.GROUP_PREFIX)
404
+            return [
405
+                group[1]['key'][len(self.GROUP_PREFIX):-1] for group in groups]
406
+        return coordination.CoordinatorResult(
407
+            self._executor.submit(_get_groups))

Loading…
Cancel
Save