Add more tests, add method for lock in Client

This commit is contained in:
Davanum Srinivas 2017-03-25 12:57:59 -04:00
parent 6199d5bec5
commit 6f72189ba9
3 changed files with 49 additions and 4 deletions

View File

@ -12,11 +12,13 @@
import base64 import base64
import json import json
import uuid
import requests import requests
import six import six
from etcd3gw.lease import Lease from etcd3gw.lease import Lease
from etcd3gw.lock import Lock
from etcd3gw.utils import _encode from etcd3gw.utils import _encode
from etcd3gw.utils import DEFAULT_TIMEOUT from etcd3gw.utils import DEFAULT_TIMEOUT
@ -48,6 +50,9 @@ class Client(object):
json={"TTL": ttl, "ID": 0}) json={"TTL": ttl, "ID": 0})
return Lease(int(result['ID']), client=self) return Lease(int(result['ID']), client=self)
def lock(self, id=str(uuid.uuid4()), ttl=DEFAULT_TIMEOUT):
return Lock(id, ttl=ttl, client=self)
def put(self, key, value, lease=None): def put(self, key, value, lease=None):
payload = { payload = {
"key": _encode(key), "key": _encode(key),

View File

@ -39,5 +39,6 @@ class Lease(object):
result = self.client.post(self.client.get_url("/kv/lease/timetolive"), result = self.client.post(self.client.get_url("/kv/lease/timetolive"),
json={"ID": self.id, json={"ID": self.id,
"keys": True}) "keys": True})
keys = result['keys'] if 'keys' in result else []
return [base64.b64decode(six.b(key)).decode('utf-8') return [base64.b64decode(six.b(key)).decode('utf-8')
for key in result['keys']] for key in keys]

View File

@ -18,6 +18,7 @@ Tests for `etcd3gw` module.
""" """
from testtools.testcase import unittest from testtools.testcase import unittest
import time
import urllib3 import urllib3
from etcd3gw.client import Client from etcd3gw.client import Client
@ -33,8 +34,46 @@ def _is_etcd3_running():
class TestEtcd3Gateway(base.TestCase): class TestEtcd3Gateway(base.TestCase):
@classmethod
def setUpClass(cls):
cls.client = Client()
@unittest.skipUnless( @unittest.skipUnless(
_is_etcd3_running(), "etcd3 is not available") _is_etcd3_running(), "etcd3 is not available")
def test_something(self): def test_client_status(self):
client = Client() response = self.client.status()
self.assertIsNotNone(client.status()) self.assertIsNotNone(response)
self.assertIn('version', response)
self.assertIn('header', response)
self.assertIn('cluster_id', response['header'])
@unittest.skipUnless(
_is_etcd3_running(), "etcd3 is not available")
def test_client_lease(self):
lease = self.client.lease(ttl=60)
self.assertIsNotNone(lease)
ttl = lease.ttl()
self.assertTrue(0 <= ttl <= 60)
keys = lease.keys()
self.assertEqual([], keys)
ttl = lease.refresh()
self.assertTrue(0 <= ttl <= 60)
self.assertTrue(lease.revoke())
@unittest.skipUnless(
_is_etcd3_running(), "etcd3 is not available")
def test_client_locks(self):
lock = self.client.lock(id='xyz-%s' % time.clock(), ttl=60)
self.assertIsNotNone(lock)
self.assertTrue(lock.acquire())
ttl = lock.refresh()
self.assertTrue(0 <= ttl <= 60)
self.assertTrue(lock.is_acquired())
self.assertTrue(lock.release())
self.assertFalse(lock.is_acquired())