From d79ddcc1f9c2f473168c15d6e2041f04be8bbf50 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Mon, 4 May 2015 17:12:33 -0700 Subject: [PATCH] Explicitly start and execute most transactions Instead of having transactions that are not started using the multi() block command ensure that we start at the needed places to ensure that we correctly perform these options in bulk (and handle the results of the transaction running). Change-Id: Idd5b91c466ccf11280d30619b1263e30d8d01708 --- tooz/drivers/redis.py | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/tooz/drivers/redis.py b/tooz/drivers/redis.py index 6e773aa9..d3006639 100644 --- a/tooz/drivers/redis.py +++ b/tooz/drivers/redis.py @@ -488,6 +488,13 @@ class RedisDriver(coordination.CoordinationDriver): def _create_group(p): if p.exists(encoded_group): raise coordination.GroupAlreadyExist(group_id) + # Start the transaction, all following should no longer be + # doing any reads (and should typically just be mutations from + # here on out). They need to be idempotent since transactions + # can fail half-way through if they key is of the wrong type + # or something else happens ... (redis doesn't have transactions + # that support rollbacks). + p.multi() p.sadd(self._groups, self._encode_group_id(group_id, apply_namespace=False)) # Add our special key to avoid redis from deleting the dictionary @@ -509,6 +516,7 @@ class RedisDriver(coordination.CoordinationDriver): if not p.hexists(encoded_group, encoded_member_id): raise coordination.MemberNotJoined(group_id, self._member_id) else: + p.multi() p.hset(encoded_group, encoded_member_id, self._dumps(capabilities)) @@ -524,7 +532,9 @@ class RedisDriver(coordination.CoordinationDriver): def _leave_group(p): if not p.exists(encoded_group): raise coordination.GroupNotCreated(group_id) - c = p.hdel(encoded_group, encoded_member_id) + p.multi() + p.hdel(encoded_group, encoded_member_id) + c = p.execute()[0] if c == 0: raise coordination.MemberNotJoined(group_id, self._member_id) else: @@ -564,6 +574,7 @@ class RedisDriver(coordination.CoordinationDriver): if gone_members: many_at_once, _version = self._check_fetch_redis_version( "2.4.0", not_existent=False) + p.multi() if not many_at_once: for m in gone_members: p.hdel(encoded_group, self._encode_member_id(m)) @@ -571,6 +582,7 @@ class RedisDriver(coordination.CoordinationDriver): encoded_gone_members = [self._encode_member_id(m) for m in gone_members] p.hdel(encoded_group, *encoded_gone_members) + p.execute() return [m for m in potential_members if m not in gone_members] else: return potential_members @@ -603,8 +615,10 @@ class RedisDriver(coordination.CoordinationDriver): def _join_group(p): if not p.exists(encoded_group): raise coordination.GroupNotCreated(group_id) - c = p.hset(encoded_group, encoded_member_id, - self._dumps(capabilities)) + p.multi() + p.hset(encoded_group, encoded_member_id, + self._dumps(capabilities)) + c = p.execute()[0] if c == 0: # Field already exists... raise coordination.MemberAlreadyExist(group_id, @@ -625,11 +639,14 @@ class RedisDriver(coordination.CoordinationDriver): # its len is 1 if p.hlen(encoded_group) > 1: raise coordination.GroupNotEmpty(group_id) - if not p.delete(encoded_group): - raise coordination.GroupNotCreated(group_id) + p.multi() + p.delete(encoded_group) p.srem(self._groups, self._encode_group_id(group_id, apply_namespace=False)) + was_group_deleted, was_group_member_removed = p.execute() + if not any([was_group_deleted, was_group_member_removed]): + raise coordination.GroupNotCreated(group_id) return RedisFutureResult(self._submit(self._client.transaction, _delete_group,