Some pylink and pep8 cleanups. Added a pylintrc file.

This commit is contained in:
Eric Day 2010-08-06 23:47:15 +00:00 committed by Tarmac
commit 9586d648f4
2 changed files with 144 additions and 103 deletions

View File

@ -16,6 +16,10 @@
# License for the specific language governing permissions and limitations
# under the License.
"""
Unittets for S3 objectstore clone.
"""
import boto
import glob
import hashlib
@ -24,76 +28,69 @@ import os
import shutil
import tempfile
from nova import flags
from nova import objectstore
from nova.objectstore import bucket # for buckets_path flag
from nova.objectstore import image # for images_path flag
from nova import test
from nova.auth import manager
from nova.objectstore.handler import S3
from nova.exception import NotEmpty, NotFound, NotAuthorized
from boto.s3.connection import S3Connection, OrdinaryCallingFormat
from twisted.internet import reactor, threads, defer
from twisted.web import http, server
from nova import flags
from nova import objectstore
from nova import test
from nova.auth import manager
from nova.exception import NotEmpty, NotFound
from nova.objectstore import image
from nova.objectstore.handler import S3
FLAGS = flags.FLAGS
oss_tempdir = tempfile.mkdtemp(prefix='test_oss-')
# Create a unique temporary directory. We don't delete after test to
# allow checking the contents after running tests. Users and/or tools
# running the tests need to remove the tests directories.
OSS_TEMPDIR = tempfile.mkdtemp(prefix='test_oss-')
# Create bucket/images path
os.makedirs(os.path.join(OSS_TEMPDIR, 'images'))
os.makedirs(os.path.join(OSS_TEMPDIR, 'buckets'))
# delete tempdirs from previous runs (we don't delete after test to allow
# checking the contents after running tests)
# TODO: This fails on the test box with a permission denied error
# Also, doing these things in a global tempdir means that different runs of
# the test suite on the same box could clobber each other.
#for path in glob.glob(os.path.abspath(os.path.join(oss_tempdir, '../test_oss-*'))):
# if path != oss_tempdir:
# shutil.rmtree(path)
# create bucket/images path
os.makedirs(os.path.join(oss_tempdir, 'images'))
os.makedirs(os.path.join(oss_tempdir, 'buckets'))
class ObjectStoreTestCase(test.BaseTestCase):
def setUp(self):
"""Test objectstore API directly."""
def setUp(self): # pylint: disable-msg=C0103
"""Setup users and projects."""
super(ObjectStoreTestCase, self).setUp()
self.flags(buckets_path=os.path.join(oss_tempdir, 'buckets'),
images_path=os.path.join(oss_tempdir, 'images'),
self.flags(buckets_path=os.path.join(OSS_TEMPDIR, 'buckets'),
images_path=os.path.join(OSS_TEMPDIR, 'images'),
ca_path=os.path.join(os.path.dirname(__file__), 'CA'))
logging.getLogger().setLevel(logging.DEBUG)
self.um = manager.AuthManager()
try:
self.um.create_user('user1')
except: pass
try:
self.um.create_user('user2')
except: pass
try:
self.um.create_user('admin_user', admin=True)
except: pass
try:
self.um.create_project('proj1', 'user1', 'a proj', ['user1'])
except: pass
try:
self.um.create_project('proj2', 'user2', 'a proj', ['user2'])
except: pass
class Context(object): pass
self.auth_manager = manager.AuthManager()
self.auth_manager.create_user('user1')
self.auth_manager.create_user('user2')
self.auth_manager.create_user('admin_user', admin=True)
self.auth_manager.create_project('proj1', 'user1', 'a proj', ['user1'])
self.auth_manager.create_project('proj2', 'user2', 'a proj', ['user2'])
class Context(object):
"""Dummy context for running tests."""
user = None
project = None
self.context = Context()
def tearDown(self):
self.um.delete_project('proj1')
self.um.delete_project('proj2')
self.um.delete_user('user1')
self.um.delete_user('user2')
self.um.delete_user('admin_user')
def tearDown(self): # pylint: disable-msg=C0103
"""Tear down users and projects."""
self.auth_manager.delete_project('proj1')
self.auth_manager.delete_project('proj2')
self.auth_manager.delete_user('user1')
self.auth_manager.delete_user('user2')
self.auth_manager.delete_user('admin_user')
super(ObjectStoreTestCase, self).tearDown()
def test_buckets(self):
self.context.user = self.um.get_user('user1')
self.context.project = self.um.get_project('proj1')
"""Test the bucket API."""
self.context.user = self.auth_manager.get_user('user1')
self.context.project = self.auth_manager.get_project('proj1')
objectstore.bucket.Bucket.create('new_bucket', self.context)
bucket = objectstore.bucket.Bucket('new_bucket')
@ -101,12 +98,12 @@ class ObjectStoreTestCase(test.BaseTestCase):
self.assert_(bucket.is_authorized(self.context))
# another user is not authorized
self.context.user = self.um.get_user('user2')
self.context.project = self.um.get_project('proj2')
self.context.user = self.auth_manager.get_user('user2')
self.context.project = self.auth_manager.get_project('proj2')
self.assertFalse(bucket.is_authorized(self.context))
# admin is authorized to use bucket
self.context.user = self.um.get_user('admin_user')
self.context.user = self.auth_manager.get_user('admin_user')
self.context.project = None
self.assertTrue(bucket.is_authorized(self.context))
@ -136,8 +133,9 @@ class ObjectStoreTestCase(test.BaseTestCase):
self.assertRaises(NotFound, objectstore.bucket.Bucket, 'new_bucket')
def test_images(self):
self.context.user = self.um.get_user('user1')
self.context.project = self.um.get_project('proj1')
"Test the image API."
self.context.user = self.auth_manager.get_user('user1')
self.context.project = self.auth_manager.get_project('proj1')
# create a bucket for our bundle
objectstore.bucket.Bucket.create('image_bucket', self.context)
@ -149,10 +147,12 @@ class ObjectStoreTestCase(test.BaseTestCase):
bucket[os.path.basename(path)] = open(path, 'rb').read()
# register an image
objectstore.image.Image.register_aws_image('i-testing', 'image_bucket/1mb.manifest.xml', self.context)
image.Image.register_aws_image('i-testing',
'image_bucket/1mb.manifest.xml',
self.context)
# verify image
my_img = objectstore.image.Image('i-testing')
my_img = image.Image('i-testing')
result_image_file = os.path.join(my_img.path, 'image')
self.assertEqual(os.stat(result_image_file).st_size, 1048576)
@ -160,38 +160,48 @@ class ObjectStoreTestCase(test.BaseTestCase):
self.assertEqual(sha, '3b71f43ff30f4b15b5cd85dd9e95ebc7e84eb5a3')
# verify image permissions
self.context.user = self.um.get_user('user2')
self.context.project = self.um.get_project('proj2')
self.context.user = self.auth_manager.get_user('user2')
self.context.project = self.auth_manager.get_project('proj2')
self.assertFalse(my_img.is_authorized(self.context))
class TestHTTPChannel(http.HTTPChannel):
# Otherwise we end up with an unclean reactor
def checkPersistence(self, _, __):
"""Dummy site required for twisted.web"""
def checkPersistence(self, _, __): # pylint: disable-msg=C0103
"""Otherwise we end up with an unclean reactor."""
return False
class TestSite(server.Site):
"""Dummy site required for twisted.web"""
protocol = TestHTTPChannel
class S3APITestCase(test.TrialTestCase):
def setUp(self):
"""Test objectstore through S3 API."""
def setUp(self): # pylint: disable-msg=C0103
"""Setup users, projects, and start a test server."""
super(S3APITestCase, self).setUp()
FLAGS.auth_driver='nova.auth.ldapdriver.FakeLdapDriver',
FLAGS.buckets_path = os.path.join(oss_tempdir, 'buckets')
FLAGS.auth_driver = 'nova.auth.ldapdriver.FakeLdapDriver',
FLAGS.buckets_path = os.path.join(OSS_TEMPDIR, 'buckets')
self.um = manager.AuthManager()
self.admin_user = self.um.create_user('admin', admin=True)
self.admin_project = self.um.create_project('admin', self.admin_user)
self.auth_manager = manager.AuthManager()
self.admin_user = self.auth_manager.create_user('admin', admin=True)
self.admin_project = self.auth_manager.create_project('admin',
self.admin_user)
shutil.rmtree(FLAGS.buckets_path)
os.mkdir(FLAGS.buckets_path)
root = S3()
self.site = TestSite(root)
self.listening_port = reactor.listenTCP(0, self.site, interface='127.0.0.1')
# pylint: disable-msg=E1101
self.listening_port = reactor.listenTCP(0, self.site,
interface='127.0.0.1')
# pylint: enable-msg=E1101
self.tcp_port = self.listening_port.getHost().port
@ -205,65 +215,90 @@ class S3APITestCase(test.TrialTestCase):
is_secure=False,
calling_format=OrdinaryCallingFormat())
# Don't attempt to reuse connections
def get_http_connection(host, is_secure):
"""Get a new S3 connection, don't attempt to reuse connections."""
return self.conn.new_http_connection(host, is_secure)
self.conn.get_http_connection = get_http_connection
def _ensure_empty_list(self, l):
self.assertEquals(len(l), 0, "List was not empty")
def _ensure_no_buckets(self, buckets): # pylint: disable-msg=C0111
self.assertEquals(len(buckets), 0, "Bucket list was not empty")
return True
def _ensure_only_bucket(self, l, name):
self.assertEquals(len(l), 1, "List didn't have exactly one element in it")
self.assertEquals(l[0].name, name, "Wrong name")
def _ensure_one_bucket(self, buckets, name): # pylint: disable-msg=C0111
self.assertEquals(len(buckets), 1,
"Bucket list didn't have exactly one element in it")
self.assertEquals(buckets[0].name, name, "Wrong name")
return True
def test_000_list_buckets(self):
d = threads.deferToThread(self.conn.get_all_buckets)
d.addCallback(self._ensure_empty_list)
return d
"""Make sure we are starting with no buckets."""
deferred = threads.deferToThread(self.conn.get_all_buckets)
deferred.addCallback(self._ensure_no_buckets)
return deferred
def test_001_create_and_delete_bucket(self):
"""Test bucket creation and deletion."""
bucket_name = 'testbucket'
d = threads.deferToThread(self.conn.create_bucket, bucket_name)
d.addCallback(lambda _:threads.deferToThread(self.conn.get_all_buckets))
deferred = threads.deferToThread(self.conn.create_bucket, bucket_name)
deferred.addCallback(lambda _:
threads.deferToThread(self.conn.get_all_buckets))
def ensure_only_bucket(l, name):
self.assertEquals(len(l), 1, "List didn't have exactly one element in it")
self.assertEquals(l[0].name, name, "Wrong name")
d.addCallback(ensure_only_bucket, bucket_name)
deferred.addCallback(self._ensure_one_bucket, bucket_name)
d.addCallback(lambda _:threads.deferToThread(self.conn.delete_bucket, bucket_name))
d.addCallback(lambda _:threads.deferToThread(self.conn.get_all_buckets))
d.addCallback(self._ensure_empty_list)
return d
deferred.addCallback(lambda _:
threads.deferToThread(self.conn.delete_bucket,
bucket_name))
deferred.addCallback(lambda _:
threads.deferToThread(self.conn.get_all_buckets))
deferred.addCallback(self._ensure_no_buckets)
return deferred
def test_002_create_bucket_and_key_and_delete_key_again(self):
"""Test key operations on buckets."""
bucket_name = 'testbucket'
key_name = 'somekey'
key_contents = 'somekey'
d = threads.deferToThread(self.conn.create_bucket, bucket_name)
d.addCallback(lambda b:threads.deferToThread(b.new_key, key_name))
d.addCallback(lambda k:threads.deferToThread(k.set_contents_from_string, key_contents))
deferred = threads.deferToThread(self.conn.create_bucket, bucket_name)
deferred.addCallback(lambda b:
threads.deferToThread(b.new_key, key_name))
deferred.addCallback(lambda k:
threads.deferToThread(k.set_contents_from_string,
key_contents))
def ensure_key_contents(bucket_name, key_name, contents):
"""Verify contents for a key in the given bucket."""
bucket = self.conn.get_bucket(bucket_name)
key = bucket.get_key(key_name)
self.assertEquals(key.get_contents_as_string(), contents, "Bad contents")
d.addCallback(lambda _:threads.deferToThread(ensure_key_contents, bucket_name, key_name, key_contents))
self.assertEquals(key.get_contents_as_string(), contents,
"Bad contents")
deferred.addCallback(lambda _:
threads.deferToThread(ensure_key_contents,
bucket_name, key_name,
key_contents))
def delete_key(bucket_name, key_name):
"""Delete a key for the given bucket."""
bucket = self.conn.get_bucket(bucket_name)
key = bucket.get_key(key_name)
key.delete()
d.addCallback(lambda _:threads.deferToThread(delete_key, bucket_name, key_name))
d.addCallback(lambda _:threads.deferToThread(self.conn.get_bucket, bucket_name))
d.addCallback(lambda b:threads.deferToThread(b.get_all_keys))
d.addCallback(self._ensure_empty_list)
return d
def tearDown(self):
self.um.delete_user('admin')
self.um.delete_project('admin')
return defer.DeferredList([defer.maybeDeferred(self.listening_port.stopListening)])
super(S3APITestCase, self).tearDown()
deferred.addCallback(lambda _:
threads.deferToThread(delete_key, bucket_name,
key_name))
deferred.addCallback(lambda _:
threads.deferToThread(self.conn.get_bucket,
bucket_name))
deferred.addCallback(lambda b: threads.deferToThread(b.get_all_keys))
deferred.addCallback(self._ensure_no_buckets)
return deferred
def tearDown(self): # pylint: disable-msg=C0103
"""Tear down auth and test server."""
self.auth_manager.delete_user('admin')
self.auth_manager.delete_project('admin')
stop_listening = defer.maybeDeferred(self.listening_port.stopListening)
return defer.DeferredList([stop_listening])

6
pylintrc Normal file
View File

@ -0,0 +1,6 @@
[Basic]
method-rgx=[a-z_][a-z0-9_]{2,50}$
[Design]
max-public-methods=100
min-public-methods=0