Explicitly start and execute most transactions
Instead of having transactions that are not started using the multi() block command ensure that we start at the needed places to ensure that we correctly perform these options in bulk (and handle the results of the transaction running). Change-Id: Idd5b91c466ccf11280d30619b1263e30d8d01708
This commit is contained in:
parent
9c329cc535
commit
d79ddcc1f9
|
@ -488,6 +488,13 @@ class RedisDriver(coordination.CoordinationDriver):
|
||||||
def _create_group(p):
|
def _create_group(p):
|
||||||
if p.exists(encoded_group):
|
if p.exists(encoded_group):
|
||||||
raise coordination.GroupAlreadyExist(group_id)
|
raise coordination.GroupAlreadyExist(group_id)
|
||||||
|
# Start the transaction, all following should no longer be
|
||||||
|
# doing any reads (and should typically just be mutations from
|
||||||
|
# here on out). They need to be idempotent since transactions
|
||||||
|
# can fail half-way through if they key is of the wrong type
|
||||||
|
# or something else happens ... (redis doesn't have transactions
|
||||||
|
# that support rollbacks).
|
||||||
|
p.multi()
|
||||||
p.sadd(self._groups,
|
p.sadd(self._groups,
|
||||||
self._encode_group_id(group_id, apply_namespace=False))
|
self._encode_group_id(group_id, apply_namespace=False))
|
||||||
# Add our special key to avoid redis from deleting the dictionary
|
# Add our special key to avoid redis from deleting the dictionary
|
||||||
|
@ -509,6 +516,7 @@ class RedisDriver(coordination.CoordinationDriver):
|
||||||
if not p.hexists(encoded_group, encoded_member_id):
|
if not p.hexists(encoded_group, encoded_member_id):
|
||||||
raise coordination.MemberNotJoined(group_id, self._member_id)
|
raise coordination.MemberNotJoined(group_id, self._member_id)
|
||||||
else:
|
else:
|
||||||
|
p.multi()
|
||||||
p.hset(encoded_group, encoded_member_id,
|
p.hset(encoded_group, encoded_member_id,
|
||||||
self._dumps(capabilities))
|
self._dumps(capabilities))
|
||||||
|
|
||||||
|
@ -524,7 +532,9 @@ class RedisDriver(coordination.CoordinationDriver):
|
||||||
def _leave_group(p):
|
def _leave_group(p):
|
||||||
if not p.exists(encoded_group):
|
if not p.exists(encoded_group):
|
||||||
raise coordination.GroupNotCreated(group_id)
|
raise coordination.GroupNotCreated(group_id)
|
||||||
c = p.hdel(encoded_group, encoded_member_id)
|
p.multi()
|
||||||
|
p.hdel(encoded_group, encoded_member_id)
|
||||||
|
c = p.execute()[0]
|
||||||
if c == 0:
|
if c == 0:
|
||||||
raise coordination.MemberNotJoined(group_id, self._member_id)
|
raise coordination.MemberNotJoined(group_id, self._member_id)
|
||||||
else:
|
else:
|
||||||
|
@ -564,6 +574,7 @@ class RedisDriver(coordination.CoordinationDriver):
|
||||||
if gone_members:
|
if gone_members:
|
||||||
many_at_once, _version = self._check_fetch_redis_version(
|
many_at_once, _version = self._check_fetch_redis_version(
|
||||||
"2.4.0", not_existent=False)
|
"2.4.0", not_existent=False)
|
||||||
|
p.multi()
|
||||||
if not many_at_once:
|
if not many_at_once:
|
||||||
for m in gone_members:
|
for m in gone_members:
|
||||||
p.hdel(encoded_group, self._encode_member_id(m))
|
p.hdel(encoded_group, self._encode_member_id(m))
|
||||||
|
@ -571,6 +582,7 @@ class RedisDriver(coordination.CoordinationDriver):
|
||||||
encoded_gone_members = [self._encode_member_id(m)
|
encoded_gone_members = [self._encode_member_id(m)
|
||||||
for m in gone_members]
|
for m in gone_members]
|
||||||
p.hdel(encoded_group, *encoded_gone_members)
|
p.hdel(encoded_group, *encoded_gone_members)
|
||||||
|
p.execute()
|
||||||
return [m for m in potential_members if m not in gone_members]
|
return [m for m in potential_members if m not in gone_members]
|
||||||
else:
|
else:
|
||||||
return potential_members
|
return potential_members
|
||||||
|
@ -603,8 +615,10 @@ class RedisDriver(coordination.CoordinationDriver):
|
||||||
def _join_group(p):
|
def _join_group(p):
|
||||||
if not p.exists(encoded_group):
|
if not p.exists(encoded_group):
|
||||||
raise coordination.GroupNotCreated(group_id)
|
raise coordination.GroupNotCreated(group_id)
|
||||||
c = p.hset(encoded_group, encoded_member_id,
|
p.multi()
|
||||||
|
p.hset(encoded_group, encoded_member_id,
|
||||||
self._dumps(capabilities))
|
self._dumps(capabilities))
|
||||||
|
c = p.execute()[0]
|
||||||
if c == 0:
|
if c == 0:
|
||||||
# Field already exists...
|
# Field already exists...
|
||||||
raise coordination.MemberAlreadyExist(group_id,
|
raise coordination.MemberAlreadyExist(group_id,
|
||||||
|
@ -625,11 +639,14 @@ class RedisDriver(coordination.CoordinationDriver):
|
||||||
# its len is 1
|
# its len is 1
|
||||||
if p.hlen(encoded_group) > 1:
|
if p.hlen(encoded_group) > 1:
|
||||||
raise coordination.GroupNotEmpty(group_id)
|
raise coordination.GroupNotEmpty(group_id)
|
||||||
if not p.delete(encoded_group):
|
p.multi()
|
||||||
raise coordination.GroupNotCreated(group_id)
|
p.delete(encoded_group)
|
||||||
p.srem(self._groups,
|
p.srem(self._groups,
|
||||||
self._encode_group_id(group_id,
|
self._encode_group_id(group_id,
|
||||||
apply_namespace=False))
|
apply_namespace=False))
|
||||||
|
was_group_deleted, was_group_member_removed = p.execute()
|
||||||
|
if not any([was_group_deleted, was_group_member_removed]):
|
||||||
|
raise coordination.GroupNotCreated(group_id)
|
||||||
|
|
||||||
return RedisFutureResult(self._submit(self._client.transaction,
|
return RedisFutureResult(self._submit(self._client.transaction,
|
||||||
_delete_group,
|
_delete_group,
|
||||||
|
|
Loading…
Reference in New Issue