# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright 2010-2011 OpenStack, LLC
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""Stubouts, mocks and fixtures for the test suite"""

import datetime
import httplib
import operator
import os
import shutil
import StringIO
import sys

import stubout
import webob

import glance.common.client
from glance.common import exception
from glance.registry import server as rserver
from glance.api import v1 as server
import glance.store
import glance.store.filesystem
import glance.store.http
import glance.registry.db.api

FAKE_FILESYSTEM_ROOTDIR = os.path.join('/tmp', 'glance-tests')
VERBOSE = False
DEBUG = False


def stub_out_http_backend(stubs):
    """
    Stubs out ``request`` and ``getresponse`` on httplib.HTTPConnection
    and httplib.HTTPSConnection so that no real HTTP traffic happens.

    The stubbed request() does nothing. The stubbed getresponse() returns
    a StringIO file-like object over the data
    'I am a teapot, short and stout' followed by a newline.

    :param stubs: Set of stubout stubs
    """

    class FakeHTTPConnection(object):

        DATA = 'I am a teapot, short and stout\n'

        def getresponse(self):
            return StringIO.StringIO(self.DATA)

        def request(self, *_args, **_kwargs):
            pass

    fake_http_conn = FakeHTTPConnection()
    stubs.Set(httplib.HTTPConnection, 'request',
              fake_http_conn.request)
    stubs.Set(httplib.HTTPSConnection, 'request',
              fake_http_conn.request)
    stubs.Set(httplib.HTTPConnection, 'getresponse',
              fake_http_conn.getresponse)
    stubs.Set(httplib.HTTPSConnection, 'getresponse',
              fake_http_conn.getresponse)


def clean_out_fake_filesystem_backend():
    """
    Removes any leftover directories used in fake filesystem
    backend
    """
    if os.path.exists(FAKE_FILESYSTEM_ROOTDIR):
        shutil.rmtree(FAKE_FILESYSTEM_ROOTDIR, ignore_errors=True)


def stub_out_filesystem_backend():
    """
    Prepares the on-disk data used by the Filesystem Glance store in tests.

    A clean directory is established at FAKE_FILESYSTEM_ROOTDIR
    (/tmp/glance-tests) containing the following file:

        /tmp/glance-tests/2 <-- file containing "chunk00000remainder"
    """
    # Establish a clean faked filesystem with dummy images
    clean_out_fake_filesystem_backend()
    os.mkdir(FAKE_FILESYSTEM_ROOTDIR)

    f = open(os.path.join(FAKE_FILESYSTEM_ROOTDIR, '2'), "wb")
    try:
        f.write("chunk00000remainder")
    finally:
        # Always release the handle, even if the write fails
        f.close()


def stub_out_s3_backend(stubs):
    """
    Stubs out the S3 Backend with fake data and calls.

    The stubbed ``get`` returns a generator that yields the data
    'I am a teapot, short and stout' (plus a trailing newline) in
    chunks of FakeS3Backend.CHUNK_SIZE characters.

    :param stubs: Set of stubout stubs
    """

    # NOTE: the two classes below are not referenced anywhere else in
    # this function.
    class FakeSwiftAuth(object):
        pass

    class FakeS3Connection(object):
        pass

    class FakeS3Backend(object):
        CHUNK_SIZE = 2
        DATA = 'I am a teapot, short and stout\n'

        @classmethod
        def get(cls, location, expected_size, conn_class=None):
            """Ignore all arguments and yield DATA in fixed-size chunks"""

            def chunk_it():
                for i in xrange(0, len(cls.DATA), cls.CHUNK_SIZE):
                    yield cls.DATA[i:i + cls.CHUNK_SIZE]

            return chunk_it()

    fake_s3_backend = FakeS3Backend()
    stubs.Set(glance.store.s3.S3Backend, 'get',
              fake_s3_backend.get)


def stub_out_registry_and_store_server(stubs):
    """
    Mocks calls to 0.0.0.0 on 9191 and 9292 for testing so
    that a real Glance server does not need to be up and
    running

    Requests are built as webob requests and dispatched in-process to
    the registry WSGI app (port 9191) or the API WSGI app (port 9292).

    :param stubs: Set of stubout stubs
    """

    def _add_read_method(res):
        """Give a webob response the read() method httplib callers use"""
        # httplib.Response has a read() method...fake it out
        def fake_reader():
            return res.body

        setattr(res, 'read', fake_reader)
        return res

    class FakeRegistryConnection(object):
        """In-process stand-in for an HTTP connection to the registry"""

        def __init__(self, *args, **kwargs):
            pass

        def connect(self):
            return True

        def close(self):
            return True

        def request(self, method, url, body=None, headers=None):
            # headers defaults to None rather than a shared mutable {}
            self.req = webob.Request.blank("/" + url.lstrip("/"))
            self.req.method = method
            if headers:
                self.req.headers = headers
            if body:
                self.req.body = body

        def getresponse(self):
            sql_connection = os.environ.get('GLANCE_SQL_CONNECTION',
                                            "sqlite://")
            options = {'sql_connection': sql_connection,
                       'verbose': VERBOSE,
                       'debug': DEBUG}
            res = self.req.get_response(rserver.API(options))
            return _add_read_method(res)

    class FakeGlanceConnection(object):
        """In-process stand-in for an HTTP connection to the API server"""

        def __init__(self, *args, **kwargs):
            pass

        def connect(self):
            return True

        def close(self):
            return True

        def putrequest(self, method, url):
            self.req = webob.Request.blank("/" + url.lstrip("/"))
            self.req.method = method

        def putheader(self, key, value):
            self.req.headers[key] = value

        def endheaders(self):
            pass

        def send(self, data):
            # send() is called during chunked-transfer encoding, and
            # data is of the form %x\r\n%s\r\n. Strip off the %x and
            # only write the actual data in tests.
            self.req.body += data.split("\r\n")[1]

        def request(self, method, url, body=None, headers=None):
            # headers defaults to None rather than a shared mutable {}
            self.req = webob.Request.blank("/" + url.lstrip("/"))
            self.req.method = method
            if headers:
                self.req.headers = headers
            if body:
                self.req.body = body

        def getresponse(self):
            options = {'verbose': VERBOSE,
                       'debug': DEBUG,
                       'registry_host': '0.0.0.0',
                       'registry_port': '9191',
                       'default_store': 'file',
                       'filesystem_store_datadir': FAKE_FILESYSTEM_ROOTDIR}
            res = self.req.get_response(server.API(options))
            return _add_read_method(res)

    def fake_get_connection_type(client):
        """
        Returns the proper connection type

        Returns None when the client's host/port match neither the
        API nor the registry endpoint.
        """
        DEFAULT_REGISTRY_PORT = 9191
        DEFAULT_API_PORT = 9292

        if (client.port == DEFAULT_API_PORT and
            client.host == '0.0.0.0'):
            return FakeGlanceConnection
        elif (client.port == DEFAULT_REGISTRY_PORT and
              client.host == '0.0.0.0'):
            return FakeRegistryConnection
        return None

    def fake_image_iter(self):
        for i in self.response.app_iter:
            yield i

    stubs.Set(glance.common.client.BaseClient, 'get_connection_type',
              fake_get_connection_type)
    stubs.Set(glance.common.client.ImageBodyIterator, '__iter__',
              fake_image_iter)


def stub_out_registry_db_image_api(stubs):
    """
    Stubs out the database set/fetch API calls for Registry
    so the calls are routed to an in-memory dict. This helps us
    avoid having to manually clear or flush the SQLite database.

    The "datastore" always starts with this set of image fixtures.

    :param stubs: Set of stubout stubs
    """

    class FakeDatastore(object):
        """In-memory replacement for the registry's image DB functions"""

        # The class body runs on every call of the enclosing function,
        # so each stub-out gets a fresh FIXTURES list.
        FIXTURES = [
            {'id': 1,
             'name': 'fake image #1',
             'status': 'active',
             'disk_format': 'ami',
             'container_format': 'ami',
             'is_public': False,
             'created_at': datetime.datetime.utcnow(),
             'updated_at': datetime.datetime.utcnow(),
             'deleted_at': None,
             'deleted': False,
             'checksum': None,
             'size': 13,
             'location': "swift://user:passwd@acct/container/obj.tar.0",
             'properties': [{'name': 'type',
                             'value': 'kernel',
                             'deleted': False}]},
            {'id': 2,
             'name': 'fake image #2',
             'status': 'active',
             'disk_format': 'vhd',
             'container_format': 'ovf',
             'is_public': True,
             'created_at': datetime.datetime.utcnow(),
             'updated_at': datetime.datetime.utcnow(),
             'deleted_at': None,
             'deleted': False,
             'checksum': None,
             'size': 19,
             'location': "file:///tmp/glance-tests/2",
             'properties': []}]

        def __init__(self):
            self.images = FakeDatastore.FIXTURES
            self.deleted_images = []
            self.next_id = 3

        @staticmethod
        def _make_property(name, value, timestamp):
            """Build one stored image-property record"""
            return {'name': name,
                    'value': value,
                    'deleted': False,
                    'created_at': timestamp,
                    'updated_at': timestamp,
                    'deleted_at': None}

        def image_create(self, _context, values):
            """
            Fill in defaults on ``values`` (in place), store and return it.

            :raises exception.Duplicate: if the image id is already in use
            """
            values['id'] = values.get('id', self.next_id)

            if values['id'] in [image['id'] for image in self.images]:
                raise exception.Duplicate("Duplicate image id: %s" %
                                          values['id'])

            glance.registry.db.api.validate_image(values)

            values['size'] = values.get('size', 0)
            values['checksum'] = values.get('checksum')
            values['deleted'] = False
            values['location'] = values.get('location')

            now = datetime.datetime.utcnow()
            values['created_at'] = values.get('created_at', now)
            values['updated_at'] = values.get('updated_at', now)
            values['deleted_at'] = None

            # Properties arrive as a {name: value} dict and are stored
            # as a list of property records.
            raw_properties = values.get('properties', {})
            values['properties'] = [self._make_property(k, v, now)
                                    for k, v in raw_properties.items()]

            self.next_id += 1
            self.images.append(values)
            return values

        def image_update(self, _context, image_id, values, purge_props=False):
            """
            Apply ``values`` to the stored image and return the image.

            New properties in ``values`` are appended to the image's
            existing property records unless ``purge_props`` is set.

            :raises exception.NotFound: via image_get, for an unknown id
            """
            image = self.image_get(_context, image_id)

            # Validate the would-be result before touching the stored image
            copy_image = image.copy()
            copy_image.update(values)
            glance.registry.db.api.validate_image(copy_image)

            props = []
            orig_properties = image['properties']

            # NOTE(review): with purge_props set, the existing properties
            # are kept and the new ones are dropped -- this mirrors the
            # original stub; confirm it is what callers expect.
            if not purge_props:
                now = datetime.datetime.utcnow()
                for k, v in values.get('properties', {}).items():
                    props.append(self._make_property(k, v, now))

            values['properties'] = orig_properties + props

            image.update(values)
            return image

        def image_destroy(self, _context, image_id):
            """Move the image from ``images`` to ``deleted_images``"""
            image = self.image_get(_context, image_id)
            self.images.remove(image)
            image['deleted_at'] = datetime.datetime.utcnow()
            self.deleted_images.append(image)

        def image_get(self, _context, image_id):
            """
            Return the single non-deleted image whose id matches.

            Ids are compared as strings, so 2 and '2' are equivalent.

            :raises exception.NotFound: if there is not exactly one match
                                        or the match is flagged deleted
            """
            images = [i for i in self.images if str(i['id']) == str(image_id)]

            if len(images) != 1 or images[0]['deleted']:
                raise exception.NotFound("No model for id %s %s" %
                                         (image_id, str(self.images)))
            else:
                return images[0]

        def image_get_all_pending_delete(self, _context, delete_time=None,
                                         limit=None):
            """
            Return destroyed images whose status is 'pending_delete'.

            :param delete_time: if given, only images with
                                deleted_at <= delete_time are returned
            :param limit: accepted for signature compatibility; not applied
            """
            images = [f for f in self.deleted_images
                      if f['status'] == 'pending_delete']
            # Comparing a datetime against the default of None would
            # raise, so only filter on time when one was supplied.
            if delete_time is not None:
                images = [f for f in images
                          if f['deleted_at'] <= delete_time]
            return images

        def image_get_all(self, _context, filters=None, marker=None,
                          limit=1000, sort_key=None, sort_dir=None):
            """
            Return a filtered, sorted and paginated list of images.

            :param filters: dict of filters; 'size_min', 'size_max' and
                            'properties' are handled specially and are
                            popped from the dict, every other key must
                            equal the image's value for that key
            :param marker: id of the last image of the previous page
            :param limit: maximum number of images to return
            :param sort_key: image key to sort on (default 'created_at')
            :param sort_dir: 'asc' or 'desc' (default 'desc')
            :raises exception.NotFound: if ``marker`` matches no image
            """
            if filters is None:
                # The default must behave like "no filters", not crash
                filters = {}

            images = self.images

            if 'size_min' in filters:
                size_min = int(filters.pop('size_min'))
                images = [f for f in images if int(f['size']) >= size_min]

            if 'size_max' in filters:
                size_max = int(filters.pop('size_max'))
                images = [f for f in images if int(f['size']) <= size_max]

            def _prop_filter(key, value):
                def _func(image):
                    for prop in image['properties']:
                        if prop['name'] == key:
                            return prop['value'] == value
                    return False
                return _func

            for k, v in filters.pop('properties', {}).items():
                matches = _prop_filter(k, v)
                images = [f for f in images if matches(f)]

            for k, v in filters.items():
                if v is not None:
                    images = [f for f in images if f[k] == v]

            # The list is reversed after sorting, so for 'desc' this
            # comparison orders ascending (and vice versa for 'asc').
            # Ties on the sort key are broken by image id.
            def image_cmp(x, y):
                _sort_dir = sort_dir or 'desc'
                multiplier = {
                    'asc': -1,
                    'desc': 1,
                }[_sort_dir]

                _sort_key = sort_key or 'created_at'
                if x[_sort_key] > y[_sort_key]:
                    return 1 * multiplier
                elif x[_sort_key] == y[_sort_key]:
                    if x['id'] > y['id']:
                        return 1 * multiplier
                    else:
                        return -1 * multiplier
                else:
                    return -1 * multiplier

            images = sorted(images, cmp=image_cmp)
            images.reverse()

            if marker is None:
                start_index = 0
            else:
                # Start right after the marker image
                start_index = -1
                for i, image in enumerate(images):
                    if image['id'] == marker:
                        start_index = i + 1
                        break
                if start_index == -1:
                    raise exception.NotFound(marker)

            return images[start_index:start_index + limit]

    fake_datastore = FakeDatastore()
    stubs.Set(glance.registry.db.api, 'image_create',
              fake_datastore.image_create)
    stubs.Set(glance.registry.db.api, 'image_update',
              fake_datastore.image_update)
    stubs.Set(glance.registry.db.api, 'image_destroy',
              fake_datastore.image_destroy)
    stubs.Set(glance.registry.db.api, 'image_get',
              fake_datastore.image_get)
    stubs.Set(glance.registry.db.api, 'image_get_all_pending_delete',
              fake_datastore.image_get_all_pending_delete)
    stubs.Set(glance.registry.db.api, 'image_get_all',
              fake_datastore.image_get_all)