Add unittests and cleanup codes
This patch works around StorletGatewayDocker.invocation_flow and RnTimePaths. To assert *REAL* behavior, this patch adds a few unittests to run in real os dir. This patch also add an unit test to assert the communication of docker container (basically wrap up the response from the container with mock) and check the real handling. This could be the first step to emurate real environment in *UNIT TEST* rather than functional tests for easy reproducing and finding real bugs. In the future work, we could add a temporary server process communicating with DockerGateway instance via fds. Change-Id: I5fbd7cd415053b7913310973f71ad5a7ab48faa0
This commit is contained in:
@@ -20,9 +20,9 @@ from storlet_gateway.common.exceptions import StorletTimeout
|
||||
|
||||
|
||||
class FileDescriptorIterator(object):
|
||||
def __init__(self, obj_data, timeout, cancel_func):
|
||||
def __init__(self, fd, timeout, cancel_func):
|
||||
self.closed = False
|
||||
self.obj_data = obj_data
|
||||
self.fd = fd
|
||||
self.timeout = timeout
|
||||
self.cancel_func = cancel_func
|
||||
self.buf = b''
|
||||
@@ -33,7 +33,7 @@ class FileDescriptorIterator(object):
|
||||
def read_with_timeout(self, size):
|
||||
try:
|
||||
with StorletTimeout(self.timeout):
|
||||
chunk = os.read(self.obj_data, size)
|
||||
chunk = os.read(self.fd, size)
|
||||
except StorletTimeout:
|
||||
if self.cancel_func:
|
||||
self.cancel_func()
|
||||
@@ -46,11 +46,11 @@ class FileDescriptorIterator(object):
|
||||
|
||||
def next(self, size=64 * 1024):
|
||||
if len(self.buf) < size:
|
||||
r, w, e = select.select([self.obj_data], [], [], self.timeout)
|
||||
r, w, e = select.select([self.fd], [], [], self.timeout)
|
||||
if len(r) == 0:
|
||||
self.close()
|
||||
|
||||
if self.obj_data in r:
|
||||
if self.fd in r:
|
||||
self.buf += self.read_with_timeout(size - len(self.buf))
|
||||
if self.buf == b'':
|
||||
raise StopIteration('Stopped iterator ex')
|
||||
@@ -119,7 +119,7 @@ class FileDescriptorIterator(object):
|
||||
def close(self):
|
||||
if self.closed:
|
||||
return
|
||||
os.close(self.obj_data)
|
||||
os.close(self.fd)
|
||||
self.closed = True
|
||||
|
||||
def __del__(self):
|
||||
|
||||
@@ -88,6 +88,13 @@ class StorletGatewayDocker(StorletGatewayBase):
|
||||
request_class = DockerStorletRequest
|
||||
|
||||
def __init__(self, sconf, logger, scope):
|
||||
"""
|
||||
:param sconf: a dict for storlets conf
|
||||
:param logger: a logger instance
|
||||
:param scope: a string for sandbox path basically consists of
|
||||
PREFIX_projectid comming from swift request path
|
||||
(e.g. AUTH_<project id>)
|
||||
"""
|
||||
super(StorletGatewayDocker, self).__init__(sconf, logger, scope)
|
||||
# TODO(eranr): Add sconf defaults, and get rid of validate_conf below
|
||||
self.storlet_timeout = int(self.sconf['storlet_timeout'])
|
||||
@@ -169,7 +176,7 @@ class StorletGatewayDocker(StorletGatewayBase):
|
||||
"""
|
||||
Auxiliary function that:
|
||||
|
||||
(1) Brings from Swift obj_name, whether this is a
|
||||
(1) Brings from Swift obj_name, either this is in a
|
||||
storlet or a storlet dependency.
|
||||
(2) Copies from local cache into the Docker conrainer
|
||||
If this is a Storlet then also validates that the cache is updated
|
||||
|
||||
@@ -118,6 +118,7 @@ class RunTimePaths(object):
|
||||
|
||||
def __init__(self, scope, conf):
|
||||
self.scope = scope
|
||||
self.reseller_prefix = conf.get('reseller_prefix', 'AUTH')
|
||||
self.factory_pipe_suffix = 'factory_pipe'
|
||||
self.sandbox_pipe_prefix = '/mnt/channels'
|
||||
self.storlet_pipe_suffix = '_storlet_pipe'
|
||||
@@ -214,7 +215,7 @@ class RunTimeSandbox(object):
|
||||
self.sandbox_wait_timeout = \
|
||||
int(conf.get('restart_linux_container_timeout', 3))
|
||||
|
||||
self.docker_repo = conf['docker_repo']
|
||||
self.docker_repo = conf.get('docker_repo', 'localhost:5001')
|
||||
self.docker_image_name_prefix = 'tenant'
|
||||
|
||||
# TODO(should come from upper layer Storlet metadata)
|
||||
@@ -491,6 +492,37 @@ class RemoteFDMetadata(object):
|
||||
|
||||
|
||||
class StorletInvocationProtocol(object):
|
||||
"""
|
||||
StorletInvocationProtocol class
|
||||
|
||||
This class serves communictaion with a Docker container to run an
|
||||
application
|
||||
|
||||
:param srequest: StorletRequest instance
|
||||
:param storlet_pipe_path:
|
||||
:param storlet_logger_path:
|
||||
:param timeout:
|
||||
"""
|
||||
def __init__(self, srequest, storlet_pipe_path, storlet_logger_path,
|
||||
timeout):
|
||||
self.srequest = srequest
|
||||
self.storlet_pipe_path = storlet_pipe_path
|
||||
self.storlet_logger_path = storlet_logger_path
|
||||
self.storlet_logger = StorletLogger(self.storlet_logger_path,
|
||||
'storlet_invoke')
|
||||
self.timeout = timeout
|
||||
|
||||
# local side file descriptors
|
||||
self.data_read_fd = None
|
||||
self.data_write_fd = None
|
||||
self.metadata_read_fd = None
|
||||
self.metadata_write_fd = None
|
||||
self.execution_str_read_fd = None
|
||||
self.execution_str_write_fd = None
|
||||
self.task_id = None
|
||||
|
||||
if not os.path.exists(storlet_logger_path):
|
||||
os.makedirs(storlet_logger_path)
|
||||
|
||||
@property
|
||||
def input_data_read_fd(self):
|
||||
@@ -595,30 +627,10 @@ class StorletInvocationProtocol(object):
|
||||
raise StorletRuntimeException("Failed to send execute command")
|
||||
|
||||
self._wait_for_read_with_timeout(self.execution_str_read_fd)
|
||||
# TODO(kota_): need an assertion for task_id format
|
||||
self.task_id = os.read(self.execution_str_read_fd, 10)
|
||||
os.close(self.execution_str_read_fd)
|
||||
|
||||
def __init__(self, srequest, storlet_pipe_path, storlet_logger_path,
|
||||
timeout):
|
||||
self.srequest = srequest
|
||||
self.storlet_pipe_path = storlet_pipe_path
|
||||
self.storlet_logger_path = storlet_logger_path
|
||||
self.storlet_logger = StorletLogger(self.storlet_logger_path,
|
||||
'storlet_invoke')
|
||||
self.timeout = timeout
|
||||
|
||||
# local side file descriptors
|
||||
self.data_read_fd = None
|
||||
self.data_write_fd = None
|
||||
self.metadata_read_fd = None
|
||||
self.metadata_write_fd = None
|
||||
self.execution_str_read_fd = None
|
||||
self.execution_str_write_fd = None
|
||||
self.task_id = None
|
||||
|
||||
if not os.path.exists(storlet_logger_path):
|
||||
os.makedirs(storlet_logger_path)
|
||||
|
||||
def _wait_for_read_with_timeout(self, fd):
|
||||
"""
|
||||
Wait while the read file descriptor gets ready
|
||||
@@ -680,7 +692,7 @@ class StorletInvocationProtocol(object):
|
||||
def communicate(self):
|
||||
try:
|
||||
with self.storlet_logger.activate(),\
|
||||
self._activate_invocation_descriptors():
|
||||
self._activate_invocation_descriptors():
|
||||
self._invoke()
|
||||
|
||||
if not self.srequest.has_fd:
|
||||
|
||||
@@ -44,6 +44,8 @@ class SwiftFileManager(FileManager):
|
||||
|
||||
@property
|
||||
def client(self):
|
||||
# TODO(kota_): IMO, we need to make this to self._client environ to
|
||||
# get rid of redundant instanciation
|
||||
return InternalClient(self.conf_file, 'SA', 1)
|
||||
|
||||
def _get_object(self, container, obj, headers=None):
|
||||
|
||||
@@ -0,0 +1,41 @@
|
||||
# Copyright (c) 2010-2016 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from tempfile import mkdtemp
|
||||
from shutil import rmtree
|
||||
import functools
|
||||
|
||||
|
||||
def with_tempdir(f):
|
||||
"""
|
||||
Decorator to give a single test a tempdir as argument to test method.
|
||||
"""
|
||||
@functools.wraps(f)
|
||||
def wrapped(*args, **kwargs):
|
||||
tempdir = mkdtemp()
|
||||
args = list(args)
|
||||
args.append(tempdir)
|
||||
try:
|
||||
return f(*args, **kwargs)
|
||||
finally:
|
||||
rmtree(tempdir)
|
||||
return wrapped
|
||||
|
||||
|
||||
class MockSBus(object):
|
||||
@classmethod
|
||||
def send(self, path, datagram):
|
||||
# return success code
|
||||
return 0
|
||||
|
||||
@@ -17,10 +17,23 @@ from contextlib import contextmanager
|
||||
import unittest
|
||||
from six import StringIO
|
||||
from swift.common.swob import HTTPException, Request
|
||||
from swift.common.utils import FileLikeIter
|
||||
from tests.unit.swift import FakeLogger
|
||||
from tests.unit.swift.storlet_middleware import FakeApp
|
||||
from storlet_gateway.gateways.docker.gateway import DockerStorletRequest, \
|
||||
StorletGatewayDocker
|
||||
from tests.unit import MockSBus
|
||||
import os
|
||||
import os.path
|
||||
from tempfile import mkdtemp
|
||||
from shutil import rmtree
|
||||
import mock
|
||||
import json
|
||||
|
||||
|
||||
class MockInternalClient(object):
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
|
||||
class TestDockerStorletRequest(unittest.TestCase):
|
||||
@@ -107,17 +120,14 @@ class TestDockerStorletRequest(unittest.TestCase):
|
||||
self.assertTrue(dsreq.has_range)
|
||||
|
||||
|
||||
class TestStorletGatewayDocker(unittest.TestCase):
|
||||
class TestStorletDockerGateway(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
# TODO(takashi): take these values from config file
|
||||
self.tempdir = mkdtemp()
|
||||
self.sconf = {
|
||||
'lxc_root': '/home/docker_device/scopes',
|
||||
'cache_dir': '/home/docker_device/cache/scopes',
|
||||
'log_dir': '/home/docker_device/logs/scopes',
|
||||
'script_dir': '/home/docker_device/scripts',
|
||||
'storlets_dir': '/home/docker_device/storlets/scopes',
|
||||
'pipes_dir': '/home/docker_device/pipes/scopes',
|
||||
'host_root': self.tempdir,
|
||||
'swift_dir': self.tempdir,
|
||||
'storlet_timeout': '9',
|
||||
'storlet_container': 'storlet',
|
||||
'storlet_dependency': 'dependency',
|
||||
@@ -130,11 +140,39 @@ class TestStorletGatewayDocker(unittest.TestCase):
|
||||
self.storlet_dependency = self.sconf['storlet_dependency']
|
||||
|
||||
self.version = 'v1'
|
||||
self.account = 'a'
|
||||
self.container = 'c'
|
||||
self.obj = 'o'
|
||||
self.account = 'AUTH_account'
|
||||
self.container = 'container'
|
||||
self.obj = 'object'
|
||||
self.sobj = 'storlet-1.0.jar'
|
||||
|
||||
# TODO(kota_): shoudl be 'storlet-internal-client.conf' actually
|
||||
ic_conf_path = os.path.join(self.tempdir,
|
||||
'storlet-proxy-server.conf')
|
||||
with open(ic_conf_path, 'w') as f:
|
||||
f.write("""
|
||||
[DEFAULT]
|
||||
[pipeline:main]
|
||||
pipeline = catch_errors proxy-logging cache proxy-server
|
||||
|
||||
[app:proxy-server]
|
||||
use = egg:swift#proxy
|
||||
|
||||
[filter:cache]
|
||||
use = egg:swift#memcache
|
||||
|
||||
[filter:proxy-logging]
|
||||
use = egg:swift#proxy_logging
|
||||
|
||||
[filter:catch_errors]
|
||||
use = egg:swift#catch_errors
|
||||
""")
|
||||
|
||||
self.gateway = StorletGatewayDocker(
|
||||
self.sconf, self.logger, self.account)
|
||||
|
||||
def tearDown(self):
|
||||
rmtree(self.tempdir)
|
||||
|
||||
@property
|
||||
def req_path(self):
|
||||
return self._create_proxy_path(
|
||||
@@ -147,13 +185,6 @@ class TestStorletGatewayDocker(unittest.TestCase):
|
||||
self.version, self.account, self.storlet_container,
|
||||
self.sobj)
|
||||
|
||||
def tearDown(self):
|
||||
pass
|
||||
|
||||
def _create_gateway(self):
|
||||
return StorletGatewayDocker(
|
||||
self.sconf, self.logger, self.app, self.account)
|
||||
|
||||
def _create_proxy_path(self, version, account, container, obj):
|
||||
return '/'.join(['', version, account, container, obj])
|
||||
|
||||
@@ -259,13 +290,84 @@ class TestStorletGatewayDocker(unittest.TestCase):
|
||||
'Dependency-Version': '1.0'}
|
||||
with self.assertRaises(ValueError):
|
||||
StorletGatewayDocker.validate_dependency_registration(params, obj)
|
||||
|
||||
params = {
|
||||
'Dependency-Permissions': '888',
|
||||
'Dependency-Version': '1.0'}
|
||||
with self.assertRaises(ValueError):
|
||||
StorletGatewayDocker.validate_dependency_registration(params, obj)
|
||||
|
||||
def test_docker_gateway_communicate(self):
|
||||
sw_req = Request.blank(
|
||||
self.req_path, environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Run-Storlet': self.sobj}, body='body')
|
||||
|
||||
reader = sw_req.environ['wsgi.input'].read
|
||||
body_iter = iter(lambda: reader(65536), '')
|
||||
options = {'generate_log': False,
|
||||
'scope': 'AUTH_account',
|
||||
'storlet_main': 'org.openstack.storlet.Storlet',
|
||||
'storlet_dependency': 'dep1,dep2'}
|
||||
|
||||
st_req = DockerStorletRequest(
|
||||
storlet_id=sw_req.headers['X-Run-Storlet'],
|
||||
params=sw_req.params,
|
||||
user_metadata={},
|
||||
data_iter=body_iter, options=options)
|
||||
|
||||
# TODO(kota_): need more efficient way for emuration of return value
|
||||
# from SDaemon
|
||||
value_generator = iter([
|
||||
# Firt is for confirmation for SDaemon running
|
||||
'True: daemon running confirmation',
|
||||
# Second is stop SDaemon in activation
|
||||
'True: stop daemon',
|
||||
# Third is start SDaemon again in activation
|
||||
'True: start daemon',
|
||||
# Forth is return value for invoking as task_id
|
||||
'This is task id',
|
||||
# Fifth is for getting meta
|
||||
json.dumps({'metadata': 'return'}),
|
||||
# At last return body and EOF
|
||||
'something', '',
|
||||
])
|
||||
|
||||
def mock_read(fd, size):
|
||||
try:
|
||||
value = next(value_generator)
|
||||
except StopIteration:
|
||||
raise Exception('called more then expected')
|
||||
return value
|
||||
|
||||
# prepare nested mock patch
|
||||
# SBus -> mock SBus.send() for container communication
|
||||
# os.read -> mock reading the file descriptor from container
|
||||
# select.slect -> mock fd communication wich can be readable
|
||||
@mock.patch('storlet_gateway.gateways.docker.runtime.SBus', MockSBus)
|
||||
@mock.patch('storlet_gateway.gateways.docker.runtime.os.read',
|
||||
mock_read)
|
||||
@mock.patch('storlet_gateway.gateways.docker.runtime.select.select',
|
||||
lambda r, w, x, timeout=None: (r, w, x))
|
||||
@mock.patch('storlet_gateway.common.stob.os.read',
|
||||
mock_read)
|
||||
def test_invocation_flow():
|
||||
sresp = self.gateway.invocation_flow(st_req)
|
||||
file_like = FileLikeIter(sresp.data_iter)
|
||||
self.assertEqual('something', file_like.read())
|
||||
|
||||
# I hate the decorator to return an instance but to track current
|
||||
# implementation, we have to make a mock class for this. Need to fix.
|
||||
|
||||
class MockFileManager(object):
|
||||
def get_storlet(self, req):
|
||||
return StringIO('mock'), None
|
||||
|
||||
def get_dependency(self, req):
|
||||
return StringIO('mock'), None
|
||||
|
||||
st_req.file_manager = MockFileManager()
|
||||
|
||||
test_invocation_flow()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
@@ -19,6 +19,7 @@ import unittest
|
||||
import tempfile
|
||||
from contextlib import contextmanager
|
||||
from six import StringIO
|
||||
from stat import ST_MODE
|
||||
|
||||
from storlet_gateway.common.exceptions import StorletRuntimeException
|
||||
from storlet_gateway.gateways.docker.gateway import DockerStorletRequest
|
||||
@@ -26,6 +27,7 @@ from storlet_gateway.gateways.docker.runtime import RunTimeSandbox, \
|
||||
RunTimePaths, StorletInvocationProtocol
|
||||
from tests.unit.swift import FakeLogger
|
||||
from exceptions import AssertionError
|
||||
from tests.unit import with_tempdir
|
||||
|
||||
|
||||
@contextmanager
|
||||
@@ -110,8 +112,8 @@ class TestRuntimePaths(unittest.TestCase):
|
||||
|
||||
# When the directory exists
|
||||
with mock.patch('os.path.exists', return_value=True), \
|
||||
mock.patch('os.makedirs') as m, \
|
||||
mock.patch('os.chmod') as c:
|
||||
mock.patch('os.makedirs') as m, \
|
||||
mock.patch('os.chmod') as c:
|
||||
self.paths.create_host_pipe_prefix()
|
||||
self.assertEqual(m.call_count, 0)
|
||||
cargs, ckwargs = c.call_args
|
||||
@@ -120,8 +122,8 @@ class TestRuntimePaths(unittest.TestCase):
|
||||
|
||||
# When the directory does not exist
|
||||
with mock.patch('os.path.exists', return_value=False), \
|
||||
mock.patch('os.makedirs') as m, \
|
||||
mock.patch('os.chmod') as c:
|
||||
mock.patch('os.makedirs') as m, \
|
||||
mock.patch('os.chmod') as c:
|
||||
self.paths.create_host_pipe_prefix(),
|
||||
self.assertEqual(m.call_count, 1)
|
||||
# Make sure about the target directory
|
||||
@@ -188,9 +190,72 @@ class TestRuntimePaths(unittest.TestCase):
|
||||
self.paths.get_host_dependency_cache_dir(),
|
||||
os.path.join(self.cache_dir, self.scope, 'dependency'))
|
||||
|
||||
def test_runtime_paths_default(self):
|
||||
# CHECK: docs says we need 4 dirs for communicate
|
||||
# ====================================================================
|
||||
# |1| host_factory_pipe_path | <pipes_dir>/<account>/factory_pipe |
|
||||
# ====================================================================
|
||||
# |2| host_storlet_pipe_path | <pipes_dir>/<account>/<storlet_id> |
|
||||
# ====================================================================
|
||||
# |3| sandbox_factory_pipe_path | /mnt/channels/factory_pipe |
|
||||
# ====================================================================
|
||||
# |4| sandbox_storlet_pipe_path | /mnt/channels/<storlet_id> |
|
||||
# ====================================================================
|
||||
#
|
||||
# With this test, the account value is "account" (because of w/o
|
||||
# reseller_prefix), and the storlet_id is "Storlet-1.0.jar" (app name?)
|
||||
# ok, let's check for these values
|
||||
|
||||
runtime_paths = RunTimePaths('AUTH_account', {})
|
||||
storlet_id = 'Storlet-1.0.jar'
|
||||
|
||||
# For pipe
|
||||
self.assertEqual('/home/docker_device/pipes/scopes/AUTH_account',
|
||||
runtime_paths.host_pipe_prefix())
|
||||
|
||||
# 1. host_factory_pipe_path <pipes_dir>/<scope>/factory_pipe
|
||||
self.assertEqual(
|
||||
'/home/docker_device/pipes/scopes/AUTH_account/factory_pipe',
|
||||
runtime_paths.host_factory_pipe())
|
||||
# 2. host_storlet_pipe_path <pipes_dir>/<scope>/<storlet_id>
|
||||
self.assertEqual(
|
||||
'/home/docker_device/pipes/scopes/AUTH_account/Storlet-1.0.jar',
|
||||
runtime_paths.host_storlet_pipe(storlet_id))
|
||||
# 3. Yes, right now, we don't have the path for #3 in Python
|
||||
# 4. sandbox_storlet_pipe_path | /mnt/channels/<storlet_id>
|
||||
self.assertEqual('/mnt/channels/Storlet-1.0.jar',
|
||||
runtime_paths.sbox_storlet_pipe(storlet_id))
|
||||
|
||||
# This looks like for jar load?
|
||||
self.assertEqual('/home/docker_device/storlets/scopes/AUTH_account',
|
||||
runtime_paths.host_storlet_prefix())
|
||||
self.assertEqual(
|
||||
'/home/docker_device/storlets/scopes/AUTH_account/Storlet-1.0.jar',
|
||||
runtime_paths.host_storlet(storlet_id))
|
||||
# And this one is a mount poit in sand box?
|
||||
self.assertEqual('/home/swift/Storlet-1.0.jar',
|
||||
runtime_paths.sbox_storlet_exec(storlet_id))
|
||||
|
||||
@with_tempdir
|
||||
def test_create_host_pipe_prefix_with_real_dir(self, temp_dir):
|
||||
runtime_paths = RunTimePaths('AUTH_account', {'host_root': temp_dir})
|
||||
runtime_paths.create_host_pipe_prefix()
|
||||
path = runtime_paths.host_pipe_prefix()
|
||||
self.assertTrue(os.path.exists(path))
|
||||
self.assertTrue(os.path.isdir(path))
|
||||
permission = oct(os.stat(path)[ST_MODE])[-3:]
|
||||
# TODO(kota_): make sure if this is really acceptable
|
||||
self.assertEqual('777', permission)
|
||||
|
||||
|
||||
class TestRuntimePathsTempauth(TestRuntimePaths):
|
||||
def setUp(self):
|
||||
self.account = 'AUTH_test'
|
||||
self.scope = 'test'
|
||||
self._initialize()
|
||||
|
||||
|
||||
class TestRunTimeSandbox(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.logger = FakeLogger()
|
||||
# TODO(takashi): take these values from config file
|
||||
|
||||
Reference in New Issue
Block a user