Basic API server

Change-Id: Ib84b17db7da07ca9f6d50627e61390055895e89f
This commit is contained in:
Peter Balland 2013-09-26 13:43:58 -07:00
parent 9f768a7411
commit df3d06d96b
7 changed files with 937 additions and 4 deletions

View File

@ -27,7 +27,7 @@ SRCSCRIPT=`readlink -f $0`
SCRIPTDIR=`dirname $SRCSCRIPT`
ROOTDIR=`dirname $SCRIPTDIR`
PYSRCDIR=$ROOTDIR/src
WEBSERVERDIR=$PYSRCDIR/webserver
SERVERDIR=$PYSRCDIR/server
POLICYDIR=$PYSRCDIR/policy
THIRDPARTYDIR=$ROOTDIR/thirdparty
@ -42,6 +42,5 @@ SSLADDR=$INTERFACE:$SSLPORT
# Start Node API server
# TODO(pjb): make this the web server not the compiler
ARGS="$@"
cd $POLICYDIR
python -c "import compiler"
python compiler.pyc $ARGS
cd $SERVERDIR
python server.py $ARGS

0
src/server/__init__.py Normal file
View File

121
src/server/ad_sync.py Executable file
View File

@ -0,0 +1,121 @@
#!/usr/bin/env python
# Copyright (c) 2013 VMware, Inc. All rights reserved.
import argparse
import ldap
import sys
import time
import uuid
import ovs.daemon
import ovs.vlog
vlog = ovs.vlog.Vlog(__name__)
#NOTE: LDAP filters: http://tinyurl.com/la9jw7m
LDAP_URI = 'ldap://ad-server:389'
BASE_DN = 'dc=corp,dc=example,dc=com'
BIND_USER = 'cn=administrator,cn=Users' + ',' + BASE_DN
BIND_PW = 'p@ssw0rd'
class UserGroupDataModel(object):
"""An in-memory data model.
"""
def __init__(self):
self.items = {} # {uuid: (user, group)}
self.by_user = {} # {user: {group:uuid}}
def get_items(self):
"""Get items in model.
Returns: A dict of {id, item} for all items in model.
"""
return self.items
def get_item(self, id_):
"""Retrieve item with id id_ from model.
Args:
id_: The ID of the item to retrieve.
Returns:
The matching item or None if item with id_ does not exist.
"""
return self.items.get(id_)
def update_from_ad(self):
"""Fetch user group info from AD and update model.
Raises:
ldap.INVALID_CREDENTIALS
XXX: probably a bunch of ther ldap exceptions
"""
# TODO: rewrite to be scalable, robust
#vlog.dbg('Updating users from AD')
l = ldap.initialize(LDAP_URI)
l.simple_bind_s(BIND_USER, BIND_PW)
ret = l.search_s('cn=Users,%s' % BASE_DN, ldap.SCOPE_SUBTREE,
'(&(objectCategory=person)(objectClass=user))')
user_dns = [(u[1]['sAMAccountName'][0], u[0]) for u in ret]
users_to_del = set(self.by_user.keys()) - set([u[0] for u in user_dns])
for user in users_to_del:
num_groups = len(self.by_user[user])
vlog.info("User '%s' deleted (was in %s group%s)"
% (user, num_groups, '' if num_groups == 1 else 's'))
ids = self.by_user.pop(user).values()
for i in ids:
del self.items[i]
for user, dn in user_dns:
filter_ = '(member:1.2.840.113556.1.4.1941:= %s)' % dn
ret = l.search_s('cn=Users,%s' % BASE_DN, ldap.SCOPE_SUBTREE,
filter_)
new_groups = set([r[1]['cn'][0] for r in ret])
old_groups = set(self.by_user.get(user, {}).keys())
membership_to_del = old_groups - new_groups
membership_to_add = new_groups - old_groups
for group in membership_to_del:
id_ = self.by_user[user].pop(group)
vlog.info("User '%s' removed from group '%s' (%s)"
% (user, group, id_))
del self.items[id_]
for group in membership_to_add:
new_id = str(uuid.uuid4())
self.by_user.setdefault(user, {})[group] = new_id
vlog.info("User '%s' added to group '%s' (%s)"
% (user, group, new_id))
self.items[new_id] = (user, group)
def main():
parser = argparse.ArgumentParser()
ovs.vlog.add_args(parser)
args = parser.parse_args()
ovs.vlog.handle_args(args)
model = UserGroupDataModel()
vlog.info("Starting AD sync service")
while True:
model.update_from_ad()
time.sleep(3)
if __name__ == '__main__':
try:
main()
except SystemExit:
# Let system.exit() calls complete normally
raise
except:
vlog.exception("traceback")
sys.exit(ovs.daemon.RESTART_EXIT_CODE)

94
src/server/server.py Executable file
View File

@ -0,0 +1,94 @@
#!/usr/bin/env python
# Copyright (c) 2013 VMware, Inc. All rights reserved.
import argparse
import sys
import time
import eventlet
eventlet.patcher.monkey_patch()
import ovs.daemon
import ovs.vlog
vlog = ovs.vlog.Vlog(__name__)
from ad_sync import UserGroupDataModel
from webservice import ApiApplication
from webservice import CollectionHandler
from webservice import ElementHandler
from webservice import PolicyDataModel
from webservice import RowCollectionHandler
from webservice import RowElementHandler
from webservice import SimpleDataModel
from wsgi import Server
DEFAULT_HTTP_ADDR = '0.0.0.0'
DEFAULT_HTTP_PORT = 8080
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--http_listen_port", default=DEFAULT_HTTP_PORT)
parser.add_argument("--http_listen_addr", default=DEFAULT_HTTP_ADDR)
ovs.vlog.add_args(parser)
ovs.daemon.add_args(parser)
args = parser.parse_args()
ovs.vlog.handle_args(args)
ovs.daemon.handle_args(args)
wsgi_server = Server("API Broker")
api = ApiApplication()
tables_model = SimpleDataModel()
table_collection_handler = CollectionHandler('/tables', tables_model)
api.register_handler(table_collection_handler)
table_element_handler = ElementHandler('/tables/([^/]+)', tables_model,
table_collection_handler)
api.register_handler(table_element_handler)
rows_model = SimpleDataModel()
#TODO: scope model per table
rows_collection_handler = RowCollectionHandler('/tables/([^/]+)/rows',
rows_model)
api.register_handler(rows_collection_handler)
rows_element_handler = RowElementHandler(
'/tables/([^/]+)/rows/([^/]+)', rows_model,
rows_collection_handler)
api.register_handler(rows_element_handler)
policy_model = PolicyDataModel()
policy_element_handler = ElementHandler('/policy', policy_model)
api.register_handler(policy_element_handler)
ad_model = UserGroupDataModel()
def ad_update_thread():
while True:
ad_model.update_from_ad() # XXX: blocks eventlet
time.sleep(3)
wsgi_server.pool.spawn_n(ad_update_thread)
ad_row_handler = CollectionHandler( '/tables/ad-groups/rows', ad_model)
api.register_handler(ad_row_handler, 0)
# Add static tables to model
tables_model.add_item({'sample': 'schema', 'id': 'ad-groups'}, 'ad-groups')
vlog.info("Starting congress server")
wsgi_server.start(api, args.http_listen_port,
args.http_listen_addr)
wsgi_server.wait()
#TODO: trigger watcher for policy outputs
if __name__ == '__main__':
try:
main()
except SystemExit:
# Let system.exit() calls complete normally
raise
except:
vlog.exception("traceback")
sys.exit(ovs.daemon.RESTART_EXIT_CODE)

375
src/server/webservice.py Executable file
View File

@ -0,0 +1,375 @@
#!/usr/bin/env python
# Copyright (c) 2013 VMware, Inc. All rights reserved.
import httplib
import json
import re
import uuid
import webob
import webob.dec
import ovs.vlog
vlog = ovs.vlog.Vlog(__name__)
NOT_SUPPORTED_RESPONSE = webob.Response(body="Method not supported",
status=httplib.NOT_IMPLEMENTED)
def errorResponse(status, error_code, description, data=None):
"""Construct and return an error response.
Args:
status: The HTTP status code of the response.
error_code: The application-specific error code.
description: Friendly G11N-enabled string corresponding ot error_code.
data: Additional data (not G11N-enabled) for the API consumer.
"""
data = {
'error_code': error_code,
'descripton': description,
'error_data': data
}
body = '%s\n' % json.dumps(data)
return webob.Response(body=body, status=status,
content_type='application/json')
class ApiApplication(object):
def __init__(self):
self.handlers = []
@webob.dec.wsgify(RequestClass=webob.Request)
def __call__(self, request):
handler = self.get_handler(request)
if handler:
vlog.dbg("Handling request '%s %s' with %s"
% (request.method, request.path, str(handler)))
return handler.handle_request(request)
else:
return NOT_SUPPORTED_RESPONSE
def register_handler(self, handler, search_index=None):
if search_index is not None:
self.handlers.insert(search_index, handler)
else:
self.handlers.append(handler)
def get_handler(self, request):
"""Find a handler for a REST request.
Args:
request: A webob request object.
Returns:
A handler instance or None.
"""
for h in self.handlers:
if h.handles_request(request):
return h
return None
class AbstractApiHandler(object):
def __init__(self, path_regex):
self.parent_handler = None
self.child_handlers = []
if path_regex[-1] != '$':
path_regex += "$"
# we only use 'match' so no need to mark the beginning of string
self.path_regex = path_regex
self.path_re = re.compile(path_regex)
def __str__(self):
return "%s(%s)" % (self.__class__.__name__, self.path_re.pattern)
def handles_request(self, request):
m = self.path_re.match(request.path)
return m is not None
def handle_request(self, request):
"""Handle a REST request.
Args:
request: A webob request object.
Returns:
A webob response object.
"""
return NOT_SUPPORTED_RESPONSE
class ElementHandler(AbstractApiHandler):
"""API handler for REST element resources.
"""
#TODO: validation
def __init__(self, path_regex, model, collection_handler=None):
"""Initialize an element handler.
Args:
path_regex: A regular expression that matches the full path
to the element. If multiple handlers match a request path,
the handler with the highhest registration search_index wins.
model: A resource data model instance
collection_handler: The collection handler this elemeent
is a member of or None if the element is not a member of a
collection.
"""
super(ElementHandler, self).__init__(path_regex)
self.model = model
self.collection_handler = collection_handler
def _get_element_id(self, request):
m = self.path_re.match(request.path)
if m.groups():
return m.groups()[-1] #TODO: make robust
return None
def handle_request(self, request):
"""Handle a REST request.
Args:
request: A webob request object.
Returns:
A webob response object.
"""
if request.method == 'GET':
return self.read(request)
#TODO(pjb): POST for controller semantics
elif request.method == 'PUT':
return self.replace(request)
elif request.method == 'PATCH':
return self.update(request)
elif request.method == 'DELETE':
return self.delete(request)
return NOT_SUPPORTED_RESPONSE
def read(self, request):
if not hasattr(self.model, 'get_item'):
return NOT_SUPPORTED_RESPONSE
id_ = self._get_element_id(request)
item = self.model.get_item(id_)
if item is None:
return errorResponse(httplib.NOT_FOUND, 404, 'Not found')
return webob.Response(body=json.dumps(item), status=httplib.OK,
content_type='application/json')
def replace(self, request):
if not hasattr(self.model, 'update_item'):
return NOT_SUPPORTED_RESPONSE
id_ = self._get_element_id(request)
try:
item = json.loads(request.body)
self.model.update_item(id_, item)
except KeyError:
if (self.collection_handler and
getattr(self.collection_handler, 'allow_named_create', False)):
return self.collection_handler.create_member(request, id_=id_)
return errorResponse(httplib.NOT_FOUND, 404, 'Not found')
return webob.Response(body=json.dumps(item), status=httplib.OK,
content_type='application/json')
def update(self, request):
if not (hasattr(self.model, 'update_item') or
hasattr(self.model, 'get_tiem')):
return NOT_SUPPORTED_RESPONSE
id_ = self._get_element_id(request)
item = self.model.get_item(id_)
if item is None:
return errorResponse(httplib.NOT_FOUND, 404, 'Not found')
updates = json.loads(request.body)
item.update(updates)
self.model.update_item(id_, item)
return webob.Response(body=json.dumps(item), status=httplib.OK,
content_type='application/json')
def delete(self, request):
if not hasattr(self.model, 'delete_item'):
return NOT_SUPPORTED_RESPONSE
id_ = self._get_element_id(request)
try:
item = self.model.delete_item(id_)
return webob.Response(body=json.dumps(item), status=httplib.OK,
content_type='application/json')
except KeyError:
return errorResponse(httplib.NOT_FOUND, 404, 'Not found')
class CollectionHandler(AbstractApiHandler):
"""API handler for REST collection resources.
"""
#TODO: validation
def __init__(self, path_regex, model, allow_named_create=True):
"""Initialize a collection handler.
Args:
path_regex: A regular expression matching the collection base path.
model: TODO
element_handler_factor: A callable that returns a new element
handler.
"""
super(CollectionHandler, self).__init__(path_regex)
self.model = model
self.allow_named_create = allow_named_create
def handle_request(self, request):
"""Handle a REST request.
Args:
request: A webob request object.
Returns:
A webob response object.
"""
if request.method == 'GET':
return self.list_members(request)
elif request.method == 'POST':
return self.create_member(request)
return NOT_SUPPORTED_RESPONSE
def list_members(self, request):
items = self.model.get_items().values()
body = "%s\n" % json.dumps(items, indent=2)
return webob.Response(body=body, status=httplib.OK,
content_type='application/json')
def create_member(self, request, id_=None):
item = json.loads(request.body)
try:
id_ = self.model.add_item(item, id_)
except KeyError:
return errorResponse(httplib.CONFLICT, httplib.CONFLICT,
'Element already exists')
item['id'] = id_
return webob.Response(body=json.dumps(item), status=httplib.CREATED,
content_type='application/json',
location="%s/%s" %(request.path, id_))
class RowCollectionHandler(CollectionHandler):
pass
class RowElementHandler(ElementHandler):
"""API handler for table row elements.
"""
def _get_element_id(self, request):
m = self.path_re.match(request.path)
print 'groups', m.groups()
if m.groups():
return m.groups()[-1] #TODO: make robust
return None
class SimpleDataModel(object):
"""An in-memory data model.
"""
def __init__(self):
self.items = {}
def get_items(self):
"""Get items in model.
Returns: A dict of {id, item} for all items in model.
"""
return self.items
def add_item(self, item, id_=None):
"""Add item to model.
Args:
item: The item to add to the model.
id_: The ID of the item, or None if an ID should be generated
Returns:
The ID of the newly added item.
Raises:
KeyError: ID already exists.
"""
if id_ is None:
id_ = str(uuid.uuid4())
if id_ in self.items:
raise KeyError("Cannot create item with ID '%s': "
"ID already exists")
self.items[id_] = item
return id_
def get_item(self, id_):
"""Retrieve item with id id_ from model.
Args:
id_: The ID of the item to retrieve.
Returns:
The matching item or None if item with id_ does not exist.
"""
return self.items.get(id_)
def update_item(self, id_, item):
"""Update item with id_ with new data.
Args:
id_: The ID of the item to be updated.
item: The new item.
Returns:
The updated item.
Raises:
KeyError: Item with specified id_ not present.
"""
if id_ not in self.items:
raise KeyError("Cannot update item with ID '%s': "
"ID does not exist")
self.items[id_] = item
return item
def delete_item(self, id_):
"""Remove item from model.
Args:
id_: The ID of the item to be removed.
Returns:
The removed item.
Raises:
KeyError: Item with specified id_ not present.
"""
ret = self.items[id_]
del self.items[id_]
return ret
class PolicyDataModel(object):
"""An in-memory policy data model.
"""
def __init__(self):
self.rules = []
def get_item(self, id_):
return {'rules': self.rules}
def update_item(self, id_, item):
self.rules = item['rules']
return self.get_item(None)

106
src/server/wsgi.py Normal file
View File

@ -0,0 +1,106 @@
import errno
import socket
import sys
import time
import eventlet.wsgi
eventlet.patcher.monkey_patch(all=False, socket=True)
import webob.dec
import ovs.vlog
vlog = ovs.vlog.Vlog(__name__)
# Number of seconds to keep retrying to listen
RETRY_UNTIL_WINDOW = 30
# Sets the value of TCP_KEEPIDLE in seconds for each server socket.
TCP_KEEPIDLE = 600
# Number of backlog requests to configure the socket with
BACKLOG = 4096
class Server(object):
"""Server class to manage multiple WSGI sockets and applications."""
def __init__(self, name, threads=1000):
self.pool = eventlet.GreenPool(threads)
self.name = name
def _get_socket(self, host, port, backlog):
bind_addr = (host, port)
# TODO(dims): eventlet's green dns/socket module does not actually
# support IPv6 in getaddrinfo(). We need to get around this in the
# future or monitor upstream for a fix
try:
info = socket.getaddrinfo(bind_addr[0],
bind_addr[1],
socket.AF_UNSPEC,
socket.SOCK_STREAM)[0]
family = info[0]
bind_addr = info[-1]
except Exception:
vlog.exception(("Unable to listen on %(host)s:%(port)s") %
{'host': host, 'port': port})
sys.exit(1)
sock = None
retry_until = time.time() + RETRY_UNTIL_WINDOW
while not sock and time.time() < retry_until:
try:
sock = eventlet.listen(bind_addr,
backlog=backlog,
family=family)
except socket.error as err:
if err.errno != errno.EADDRINUSE:
raise
eventlet.sleep(0.1)
if not sock:
raise RuntimeError(("Could not bind to %(host)s:%(port)s "
"after trying for %(time)d seconds") %
{'host': host,
'port': port,
'time': RETRY_UNTIL_WINDOW})
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# sockets can hang around forever without keepalive
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
if hasattr(socket, 'TCP_KEEPIDLE'):
sock.setsockopt(socket.IPPROTO_TCP,
socket.TCP_KEEPIDLE,
TCP_KEEPIDLE)
return sock
def start(self, application, port, host='0.0.0.0'):
"""Run a WSGI server with the given application."""
self._host = host
self._port = port
backlog = BACKLOG
self._socket = self._get_socket(self._host,
self._port,
backlog=backlog)
self._server = self.pool.spawn(self._run, application, self._socket)
@property
def host(self):
return self._socket.getsockname()[0] if self._socket else self._host
@property
def port(self):
return self._socket.getsockname()[1] if self._socket else self._port
def stop(self):
self._server.kill()
def wait(self):
"""Wait until all servers have completed running."""
try:
self.pool.waitall()
except KeyboardInterrupt:
pass
def _run(self, application, socket):
"""Start a WSGI server in a new green thread."""
eventlet.wsgi.server(socket, application, custom_pool=self.pool)

238
tests/functional/test_api.py Executable file
View File

@ -0,0 +1,238 @@
#!/usr/bin/python
#
# Copyright (c) 2013 VMware, Inc. All rights reserved.
#
import httplib
import json
import os
import socket
import subprocess
import time
import unittest
import uuid
FUNCTIONAL_TESTS_PATH = os.path.dirname(os.path.realpath(__file__))
TESTS_PATH = os.path.dirname(FUNCTIONAL_TESTS_PATH)
PROJECT_PATH = os.path.dirname(TESTS_PATH)
SRC_PATH = os.path.join(PROJECT_PATH, 'src')
class AbstractApiTest(unittest.TestCase):
API_SERVER_PATH = None # Subclass must override
API_SERVER_PORT = '8888'
API_SERVER_ADDR = '127.0.0.1'
API_SERVER_ARGS = ['--http_listen_port', API_SERVER_PORT,
'--http_listen_addr', API_SERVER_ADDR, '--verbose']
SERVER_STARTUP_WAIT_MS = 5000
SERVER_SHUTDOWN_WAIT_MS = 3000
@classmethod
def setUpClass(cls):
cls.server = subprocess.Popen(
[cls.API_SERVER_PATH] + cls.API_SERVER_ARGS)
hconn = httplib.HTTPConnection(cls.API_SERVER_ADDR,
cls.API_SERVER_PORT)
starttm = time.time() * 1000
exc = None
while (hconn.sock is None and
(time.time() * 1000 - starttm) < cls.SERVER_STARTUP_WAIT_MS):
time.sleep(0.1)
try:
hconn.connect()
except socket.error, e:
exc = e
if hconn.sock is None:
raise exc
@classmethod
def tearDownClass(cls):
cls.server.terminate()
starttm = time.time() * 1000
while (cls.server.poll() is None and
(time.time() * 1000 - starttm) < cls.SERVER_SHUTDOWN_WAIT_MS):
time.sleep(0.01)
if cls.server.poll() is None:
cls.server.kill()
def setUp(self):
self.hconn = httplib.HTTPConnection(self.API_SERVER_ADDR,
self.API_SERVER_PORT)
self.hconn.connect()
def tearDown(self):
self.hconn.close()
def check_response(self, response, description, status=httplib.OK,
content_type='text/plain'):
body = response.read()
self.assertTrue(response.status == status,
'%s response status == %s' %(description, status))
if content_type is not None:
if ';' in content_type:
self.assertTrue(
response.getheader('content-type') == content_type,
"'%s response Content-Type (with params) is '%s'"
% (description, content_type))
else:
ct_start = response.getheader('content-type').split(';', 1)[0]
self.assertTrue(ct_start == content_type,
"'%s response Content-Type (no params) is '%s'"
% (description, content_type))
return body
def check_json_response(self, response, description, status=httplib.OK):
raw_body = self.check_response(
response, description, status, 'application/json')
body = json.loads(raw_body)
return body
class TestTablesApi(AbstractApiTest):
API_SERVER_PATH = os.path.join(SRC_PATH, 'server', 'server.py')
STATIC_TABLES = ['ad-groups']
def test_tables(self):
"""Test table list API method."""
# List (empty)
self.hconn.request('GET', '/tables')
r = self.hconn.getresponse()
body = self.check_json_response(r, 'List tables (empty)')
self.assertIsInstance(body, list, 'List tables (empty) returns a list')
self.assertTrue(len(body) == len(self.STATIC_TABLES),
'List tables (empty) contains default tables only')
# Create
ids = []
for table in ['table1', 'table2', 'table3']:
self.hconn.request('POST', '/tables', '{"%s": "foo"}' % table)
r = self.hconn.getresponse()
body = self.check_json_response(r, 'Create table',
status=httplib.CREATED)
self.assertIsInstance(body, dict, 'Create table returns a dict')
#TODO: validate object
#TODO: validate location header
ids.append(body['id'])
self.hconn.request('GET', '/tables')
r = self.hconn.getresponse()
body = self.check_json_response(r, 'List tables')
self.assertIsInstance(body, list, 'List tables returns a list')
self.assertTrue(len(body) == 3 + len(self.STATIC_TABLES),
'List contains proper number of results')
# Create Named
self.hconn.request('PUT', '/tables/foo_id', '{"%s": "foo"}' % table)
r = self.hconn.getresponse()
body = self.check_json_response(r, 'Create named table',
status=httplib.CREATED)
self.assertIsInstance(body, dict, 'Create table returns a dict')
self.assertTrue(body['id'] == 'foo_id',
'Created table has specified ID')
# List
self.hconn.request('GET', '/tables')
r = self.hconn.getresponse()
body = self.check_json_response(r, 'List tables')
self.assertIsInstance(body, list, 'List tables returns a list')
self.assertTrue(len(body) == 4 + len(self.STATIC_TABLES),
'List contains proper # results (after create)')
# Read
self.hconn.request('GET', '/tables/%s' % ids[0])
r = self.hconn.getresponse()
body = self.check_json_response(r, 'Read table')
self.assertIsInstance(body, dict, 'Read table returns a dict')
self.assertEqual(body['id'], ids[0], 'Read expected table instance')
self.assertTrue('table1' in body, 'Read expected table instance data')
# Read Invalid
self.hconn.request('GET', '/tables/%s' % uuid.uuid4())
r = self.hconn.getresponse()
body = self.check_json_response(r, 'Read missing table',
status=httplib.NOT_FOUND)
# Replace
new = json.loads('{"id": "%s", "table4": "bar"}' % ids[0])
self.hconn.request('PUT', '/tables/%s' % ids[0], json.dumps(new))
r = self.hconn.getresponse()
body = self.check_json_response(r, 'Replace table')
self.assertEqual(body, new, 'Replaced table returns new data')
self.hconn.request('GET', '/tables/%s' % ids[0])
r = self.hconn.getresponse()
body = self.check_json_response(r, 'Read replaced table')
self.assertEqual(body, new, 'GET replaced table returns new data')
# Update
self.hconn.request('GET', '/tables/%s' % ids[1])
r = self.hconn.getresponse()
old_body = self.check_json_response(r, 'Read old table')
new = json.loads('{"newkey": "baz"}')
self.hconn.request('PATCH', '/tables/%s' % ids[1], json.dumps(new))
r = self.hconn.getresponse()
body = self.check_json_response(r, 'Update table')
expected = old_body.copy()
expected.update(new)
self.assertEqual(body, expected, 'Updated table returns new data')
self.hconn.request('GET', '/tables/%s' % ids[1])
r = self.hconn.getresponse()
body = self.check_json_response(r, 'Read updated table')
self.assertEqual(body, expected, 'GET replaced table returns new data')
# Delete
self.hconn.request('DELETE', '/tables/%s' % ids[1])
r = self.hconn.getresponse()
body = self.check_json_response(r, 'Delete table')
self.hconn.request('DELETE', '/tables/%s' % ids[1])
r = self.hconn.getresponse()
body = self.check_json_response(r, 'Delete missing table',
status=httplib.NOT_FOUND)
#TODO: validate objects
class TestPolicyApi(AbstractApiTest):
API_SERVER_PATH = os.path.join(SRC_PATH, 'server', 'server.py')
def test_policy(self):
"""Test table list API method."""
# POST
self.hconn.request('POST', '/policy')
r = self.hconn.getresponse()
body = self.check_response(r, 'POST policy',
status=httplib.NOT_IMPLEMENTED,
content_type=None)
empty_policy = {'rules': []}
# Get (empty)
self.hconn.request('GET', '/policy')
r = self.hconn.getresponse()
body = self.check_json_response(r, 'Get policy (empty)')
self.assertEqual(body, empty_policy,
'Get policy (empty) returns empty ruleset')
# Update
fake_policy = {'rules': ["foo", "bar"]}
self.hconn.request('PUT', '/policy', json.dumps(fake_policy))
r = self.hconn.getresponse()
body = self.check_json_response(r, 'Update policy')
self.assertEqual(body, fake_policy, 'Update policy returns new policy')
# Get
self.hconn.request('GET', '/policy')
r = self.hconn.getresponse()
body = self.check_json_response(r, 'Get policy')
self.assertEqual(body, fake_policy, 'Get policy returns new policy')
# Delete
self.hconn.request('DELETE', '/policy')
r = self.hconn.getresponse()
body = self.check_response(r, 'DELETE policy',
status=httplib.NOT_IMPLEMENTED,
content_type=None)
if __name__ == '__main__':
unittest.main(verbosity=2)