Split out general implementation for containers

This is a prep work to support different container runtimes such as
podman.

Change-Id: Ia7787bdb1d86683d316a32b34c0a080b7b4ebb87
This commit is contained in:
Takashi Kajinami 2024-02-11 00:47:00 +09:00
parent f1e59923ee
commit caae8c45a7
16 changed files with 2142 additions and 2066 deletions

View File

@ -36,8 +36,8 @@ paste.filter_factory =
storlet_handler = storlets.swift_middleware.storlet_handler:filter_factory storlet_handler = storlets.swift_middleware.storlet_handler:filter_factory
storlets.gateways = storlets.gateways =
stub = storlets.gateway.gateways.stub:StorletGatewayStub stub = storlets.gateway.gateways.stub:StubStorletGateway
docker = storlets.gateway.gateways.docker:StorletGatewayDocker docker = storlets.gateway.gateways.docker:DockerStorletGateway
console_scripts = console_scripts =
sbus = storlets.sbus.cli:main sbus = storlets.sbus.cli:main

View File

@ -0,0 +1,372 @@
# Copyright IBM Corp. 2015, 2015 All Rights Reserved
# Copyright (c) 2010-2016 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import shutil
from storlets.gateway.common.stob import StorletRequest
from storlets.gateway.gateways.base import StorletGatewayBase
from storlets.gateway.gateways.container.runtime import RunTimePaths, \
StorletInvocationProtocol
class ContainerStorletRequest(StorletRequest):
"""
The ContainerStorletRequest class represents a request to be processed by
the storlet the request is derived from the Swift request and
essentially consists of:
1. A data stream to be processed
2. Metadata identifying the stream
"""
# TODO(takashi): Some of following parameters should be defined common
# parameters for StorletRequest
required_options = ['storlet_main', 'storlet_language', 'file_manager']
def __init__(self, storlet_id, params, user_metadata, data_iter=None,
data_fd=None, options=None):
"""
:param storlet_id: storlet id
:param params: execution parameters
:param user_metadata: user metadata
:param data_iter: an iterator to read data
:param data_fd: a file descriptor to read data
:param options: a dictionaly which stores gateway specific options.
:raises ValueError: when some of the required options (storlet_main
and file_manager) are missing
"""
super(ContainerStorletRequest, self).__init__(
storlet_id, params, user_metadata, data_iter, data_fd,
options=options)
self.generate_log = self.options.get('generate_log', False)
self.storlet_main = self.options['storlet_main']
self.storlet_language = self.options['storlet_language']
self.storlet_language_version = \
self.options.get('storlet_language_version')
if self.options.get('storlet_dependency'):
self.dependencies = [
x.strip() for x
in self.options['storlet_dependency'].split(',')
if x.strip()]
else:
self.dependencies = []
self.file_manager = self.options['file_manager']
self.start = self.options.get('range_start')
self.end = self.options.get('range_end')
@property
def has_range(self):
"""
Whether the input range is given
"""
return self.start is not None and self.end is not None
class StorletGatewayContainer(StorletGatewayBase):
request_class = ContainerStorletRequest
sandbox = None
def __init__(self, conf, logger, scope):
"""
:param conf: a dict for gateway conf
:param logger: a logger instance
:param scope: scope name to identify the container
"""
super(StorletGatewayContainer, self).__init__(conf, logger, scope)
self.storlet_timeout = float(self.conf.get('storlet_timeout', 40))
self.paths = RunTimePaths(scope, conf)
@classmethod
def validate_storlet_registration(cls, params, name):
"""
Validate required parameters for storlet file
:param params: parameters related to the storlet file
:param name: name of the storlet file
:raises ValueError: if some of the required parameters are missing,
or some of the parameters are invalid
"""
mandatory = ['Language', 'Interface-Version', 'Object-Metadata',
'Main']
cls._check_mandatory_params(params, mandatory)
if params['Language'].lower() == 'java':
if '-' not in name or '.' not in name:
raise ValueError('Storlet name is incorrect')
elif params['Language'].lower() == 'python':
try:
version = float(params.get('Language-Version', 3))
except ValueError:
raise ValueError('Language-Version is invalid')
if int(version) != 3:
# TODO(kota_): more strict version check should be nice.
raise ValueError('Not supported version specified')
if name.endswith('.py'):
cls_name = params['Main']
if not cls_name.startswith(name[:-3] + '.'):
raise ValueError('Main class should be included in '
'storlet file')
if len(cls_name.split('.')) != 2:
raise ValueError('Submodule is currently not supported')
# TODO(takashi): Add support for sdist tar.gz
else:
raise ValueError('Storlet name is incorrect')
else:
raise ValueError('Unsupported Language')
dep = params.get('Dependency')
if dep:
deps = dep.split(',')
if name in deps:
raise ValueError('Using the same name for storlet and '
'dependency is not allowed')
if len(deps) != len(set(deps)):
raise ValueError('Duplicated name in dependencies')
@classmethod
def validate_dependency_registration(cls, params, name):
"""
Validate required parameters for dependency file
:param params: parameters related to the dependency file
:param name: name of the dependency file
:raises ValueError: if some of the required parameters are missing,
or some of the parameters are invalid
"""
mandatory = ['Dependency-Version']
cls._check_mandatory_params(params, mandatory)
perm = params.get('Dependency-Permissions')
if perm is not None:
try:
perm_int = int(perm, 8)
except ValueError:
raise ValueError('Dependency permission is incorrect')
if (perm_int & int('600', 8)) != int('600', 8):
raise ValueError('The owner should have rw permission')
@classmethod
def _check_mandatory_params(cls, params, mandatory):
"""
Ensure that we have all mandatory parameters in the given parameters
:param params: file parameters
:param mandatory: required parameters
:raises ValueError: if some of the required parameters are missing
"""
for md in mandatory:
if md not in params:
raise ValueError('Mandatory parameter is missing'
': {0}'.format(md))
def invocation_flow(self, sreq, extra_sources=None):
"""
Invoke the backend protocol with gateway
:param sreq: StorletRequest instance
:param extra_sources (WIP): A list of StorletRequest instance to gather
as extra resoureces to feed to storlet
container as data source
:return: StorletResponse instance
"""
run_time_sbox = self.sandbox(self.scope, self.conf, self.logger)
container_updated = self.update_container_from_cache(sreq)
run_time_sbox.activate_storlet_daemon(sreq, container_updated)
self._add_system_params(sreq)
slog_path = self.paths.get_host_slog_path(sreq.storlet_main)
storlet_pipe_path = \
self.paths.get_host_storlet_pipe(sreq.storlet_main)
sprotocol = StorletInvocationProtocol(sreq,
storlet_pipe_path,
slog_path,
self.storlet_timeout,
self.logger,
extra_sources=extra_sources)
sresp = sprotocol.communicate()
self._upload_storlet_logs(slog_path, sreq)
return sresp
def _add_system_params(self, sreq):
"""
Adds Storlet engine specific parameters to the invocation
currently, this consists only of the execution path of the
Storlet within the container.
:params params: Request parameters
"""
sreq.params['storlet_execution_path'] = self. \
paths.get_sbox_storlet_dir(sreq.storlet_main)
def _upload_storlet_logs(self, slog_path, sreq):
"""
Upload storlet execution log as a swift object
:param slog_path: target path
:params sreq: ContainerStorletRequest instance
"""
if sreq.generate_log:
with open(slog_path, 'rb') as logfile:
storlet_name = sreq.storlet_id.split('-')[0]
log_obj_name = '%s.log' % storlet_name
sreq.file_manager.put_log(log_obj_name, logfile)
def bring_from_cache(self, obj_name, sreq, is_storlet):
"""
Auxiliary function that:
(1) Brings from Swift obj_name, either this is in a
storlet or a storlet dependency.
(2) Copies from local cache into the conrainer
If this is a Storlet then also validates that the cache is updated
with most recent copy of the Storlet compared to the copy residing in
Swift.
:params obj_name: name of the object
:params sreq: ContainerStorletRequest instance
:params is_storlet: True if the object is a storlet object
False if the object is a dependency object
:returns: Whether the container was updated with obj_name
"""
# Determine the cache we are to work with
# e.g. dependency or storlet
if is_storlet:
cache_dir = self.paths.host_storlet_cache_dir
get_func = sreq.file_manager.get_storlet
else:
cache_dir = self.paths.host_dependency_cache_dir
get_func = sreq.file_manager.get_dependency
if not os.path.exists(cache_dir):
os.makedirs(cache_dir, 0o700)
# cache_target_path is the actual object we need to deal with
# e.g. a concrete storlet or dependency we need to bring/update
cache_target_path = os.path.join(cache_dir, obj_name)
# Determine if we need to update the cache for cache_target_path
# We default for no
update_cache = False
# If it does not exist in cache, we obviously need to bring
if not os.path.isfile(cache_target_path):
update_cache = True
elif is_storlet:
# The cache_target_path exists, we test if it is up-to-date
# with the metadata we got.
# We mention that this is currently applicable for storlets
# only, and not for dependencies.
# This will change when we will head dependencies as well
fstat = os.stat(cache_target_path)
storlet_or_size = int(
sreq.options['storlet_content_length'].rstrip("L"))
storlet_or_time = float(sreq.options['storlet_x_timestamp'])
b_storlet_size_changed = fstat.st_size != storlet_or_size
b_storlet_file_updated = float(fstat.st_mtime) < storlet_or_time
if b_storlet_size_changed or b_storlet_file_updated:
update_cache = True
if update_cache:
# If the cache needs to be updated, then get on with it
# bring the object from storage
data_iter, perm = get_func(obj_name)
if perm:
perm = int(perm, 8) & 0o700
else:
perm = 0o600
# TODO(takashi): Do not directly write to target path
with open(cache_target_path, 'wb') as fn:
os.chmod(cache_target_path, perm)
for data in data_iter:
fn.write(data)
# The node's local cache is now updated.
# We now verify if we need to update the
# container itself.
# The container needs to be updated if:
# 1. The container does not hold a copy of the object
# 2. The container holds an older version of the object
update_container = False
container_storlet_path = \
self.paths.get_host_storlet_dir(sreq.storlet_main)
container_target_path = os.path.join(container_storlet_path, obj_name)
if not os.path.exists(container_storlet_path):
os.makedirs(container_storlet_path, 0o700)
update_container = True
elif not os.path.isfile(container_target_path):
update_container = True
else:
fstat_cached = os.stat(cache_target_path)
fstat_container = os.stat(container_target_path)
if fstat_cached.st_size != fstat_container.st_size:
update_container = True
if fstat_cached.st_mtime < fstat_container.st_mtime:
update_container = True
if update_container:
# need to copy from cache to container
# copy2 also copies the permissions
shutil.copy2(cache_target_path, container_target_path)
return update_container
def update_container_from_cache(self, sreq):
"""
Iterates over the storlet name and its dependencies appearing
in the invocation data and make sure they are brought to the
local cache, and from there to the container.
Uses the bring_from_cache auxiliary function.
:params sreq: ContainerStorletRequest instance
:returns: True if the container was updated
"""
# where at the host side, reside the storlet containers
storlet_path = self.paths.host_storlet_base_dir
if not os.path.exists(storlet_path):
os.makedirs(storlet_path, 0o755)
# Iterate over storlet and dependencies, and make sure
# they are updated within the container.
# return True if any of them wea actually
# updated within the container
container_updated = False
updated = self.bring_from_cache(sreq.storlet_id, sreq, True)
container_updated = container_updated or updated
for dep in sreq.dependencies:
updated = self.bring_from_cache(dep, sreq, False)
container_updated = container_updated or updated
return container_updated

View File

@ -0,0 +1,744 @@
# Copyright (c) 2015, 2016 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import errno
import os
import select
import stat
import sys
import time
import eventlet
import json
from contextlib import contextmanager
from storlets.sbus.client import SBusClient
from storlets.sbus.client.exceptions import SBusClientException
from storlets.sbus.datagram import SBusFileDescriptor
from storlets.sbus import file_description as sbus_fd
from storlets.gateway.common.exceptions import StorletRuntimeException, \
StorletTimeout
from storlets.gateway.common.logger import StorletLogger
from storlets.gateway.common.stob import StorletResponse
MAX_METADATA_SIZE = 4096
eventlet.monkey_patch()
class RunTimePaths(object):
"""
The Storlet Engine need to be access stuff located in many paths:
1. The various communication channels represented as pipes in the
filesystem
2. Directories where to place Storlets
3. Directories where to place logs
Communication channels
----------------------
The RunTimeSandbox communicates with the Sandbox via two types of pipes
1. factory pipe - defined per scope, used for communication with the
sandbox
for e.g. start/stop a storlet daemon
2. Storlet pipe - defined per scope and Storlet, used for communication
with a storlet daemon, e.g. to call the invoke API
Each pipe type has two paths:
1. A path that is inside the sandbox
2. A path that is outside of the sandbox or at the host side. As such
this path is prefixed by 'host_'
Thus, we have the following 4 paths of interest:
1. sandbox_factory_pipe_path
2. host_factory_pipe_path
3. sandbox_storlet_pipe_path
4. host_storlet_pipe_path
Our implementation uses the following path structure for the various pipes:
In the host, all pipes belonging to a given scope are prefixed by
<pipes_dir>/<scope>, where <pipes_dir> comes from the configuration
Thus:
host_factory_pipe_path is of the form <pipes_dir>/<scope>/factory_pipe
host_storlet_pipe_path is of the form <pipes_dir>/<scope>/<storlet_id>
In The sandbox side
sandbox_factory_pipe_path is of the form /mnt/channels/factory_pipe
sandbox_storlet_pipe_path is of the form /mnt/channels/<storlet_id>
Storlets Locations
------------------
The Storlet binaries are accessible from the sandbox using a mounted
directory.
This directory is called the storlet directories.
On the host side it is of the form <storlet_dir>/<scope>/<storlet_name>
On the sandbox side it is of the form /home/swift/<storlet_name>
<storlet_dir> comes from the configuration
<storlet_name> is the prefix of the jar.
Logs
----
Logs are located in paths of the form:
<log_dir>/<scope>/<storlet_name>.log
"""
def __init__(self, scope, conf):
"""
Construct RunTimePaths instance
:param scope: scope name to be used as container name
:param conf: gateway conf
"""
self.scope = scope
self.factory_pipe_name = 'factory_pipe'
self.sandbox_pipe_dir = '/mnt/channels'
self.sandbox_storlet_base_dir = '/home/swift'
self.host_root_dir = conf.get('host_root', '/var/lib/storlets')
self.host_pipe_root_dir = \
conf.get('pipes_dir',
os.path.join(self.host_root_dir, 'pipes', 'scopes'))
self.host_storlet_root_dir = \
conf.get('storlets_dir',
os.path.join(self.host_root_dir, 'storlets', 'scopes'))
self.host_log_root_dir = \
conf.get('log_dir',
os.path.join(self.host_root_dir, 'logs', 'scopes'))
self.host_cache_root_dir = \
conf.get('cache_dir',
os.path.join(self.host_root_dir, 'cache', 'scopes'))
self.host_restart_script_dir = \
conf.get('script_dir',
os.path.join(self.host_root_dir, 'scripts'))
self.host_storlet_native_lib_dir = '/usr/local/lib/storlets'
self.sandbox_storlet_native_lib_dir = '/usr/local/lib/storlets'
self.host_storlet_native_bin_dir = '/usr/local/libexec/storlets'
self.sandbox_storlet_native_bin_dir = '/usr/local/libexec/storlets'
@property
def host_pipe_dir(self):
return os.path.join(self.host_pipe_root_dir, self.scope)
def create_host_pipe_dir(self):
path = self.host_pipe_dir
if not os.path.exists(path):
os.makedirs(path)
# 0777 should be 0700 when we get user namespaces in container
os.chmod(path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
return path
@property
def host_factory_pipe(self):
return os.path.join(self.host_pipe_dir, self.factory_pipe_name)
@property
def sandbox_factory_pipe(self):
return os.path.join(self.sandbox_pipe_dir, self.factory_pipe_name)
def get_host_storlet_pipe(self, storlet_id):
return os.path.join(self.host_pipe_dir, storlet_id)
def get_sbox_storlet_pipe(self, storlet_id):
return os.path.join(self.sandbox_pipe_dir, storlet_id)
def get_sbox_storlet_dir(self, storlet_id):
return os.path.join(self.sandbox_storlet_base_dir, storlet_id)
@property
def host_storlet_base_dir(self):
return os.path.join(self.host_storlet_root_dir, self.scope)
def get_host_storlet_dir(self, storlet_id):
return os.path.join(self.host_storlet_base_dir, storlet_id)
def get_host_slog_path(self, storlet_id):
return os.path.join(
self.host_log_root_dir, self.scope, storlet_id,
'storlet_invoke.log')
@property
def host_storlet_cache_dir(self):
return os.path.join(self.host_cache_root_dir, self.scope, 'storlet')
@property
def host_dependency_cache_dir(self):
return os.path.join(self.host_cache_root_dir, self.scope, 'dependency')
class RunTimeSandbox(object, metaclass=abc.ABCMeta):
"""
The RunTimeSandbox represents a reusable per scope sandbox.
The sandbox is reusable in the sense that it can run several storlet
daemons.
The following methods are supported:
ping - pings the sandbox for liveness
wait - wait for the sandbox to be ready for processing commands
restart - restart the sandbox
start_storlet_daemon - start a daemon for a given storlet
stop_storlet_daemon - stop a daemon of a given storlet
get_storlet_daemon_status - test if a given storlet daemon is running
"""
def __init__(self, scope, conf, logger):
"""
:param scope: scope name to be used as container name
:param conf: gateway conf
:param logger: logger instance
"""
self.paths = RunTimePaths(scope, conf)
self.scope = scope
self.sandbox_ping_interval = \
float(conf.get('sandbox_ping_interval', 0.5))
self.sandbox_stop_timeout = \
float(conf.get('stop_linux_container_timeout', 1))
self.sandbox_wait_timeout = \
float(conf.get('restart_linux_container_timeout', 10))
self.container_image_namespace = \
conf.get('docker_repo', conf.get('container_image_namespace'))
self.container_image_name_prefix = 'tenant'
# TODO(add line in conf)
self.storlet_daemon_thread_pool_size = \
int(conf.get('storlet_daemon_thread_pool_size', 5))
self.storlet_daemon_factory_debug_level = \
conf.get('storlet_daemon_factory_debug_level', 'DEBUG')
self.storlet_daemon_debug_level = \
conf.get('storlet_daemon_debug_level', 'DEBUG')
# TODO(change logger's route if possible)
self.logger = logger
self.default_container_image_name = conf.get(
'default_docker_image_name',
conf.get('default_container_image_name', 'storlet_engine_image')
)
self.max_containers_per_node = \
int(conf.get('max_containers_per_node', 0))
self.container_cpu_period = int(conf.get('container_cpu_period', 0))
self.container_cpu_quota = int(conf.get('container_cpu_quota', 0))
self.container_mem_limit = conf.get('container_mem_limit', 0)
# NOTE(tkajinam): memory limit can be a string with unit like 1024m
try:
self.container_mem_limit = int(self.container_mem_limit)
except TypeError:
pass
self.container_cpuset_cpus = conf.get('container_cpuset_cpus')
self.container_cpuset_mems = conf.get('container_cpuset_mems')
self.container_pids_limit = int(conf.get('container_pids_limit', 0))
def ping(self):
"""
Ping to daemon factory process inside container
:returns: True when the daemon factory is responsive
False when the daemon factory is not responsive or it fails
to send command to the process
"""
pipe_path = self.paths.host_factory_pipe
client = SBusClient(pipe_path)
try:
resp = client.ping()
if not resp.status:
self.logger.error('Failed to ping to daemon factory: %s' %
resp.message)
return resp.status
except SBusClientException:
return False
def wait(self):
"""
Wait while scope's sandbox is starting
:raises StorletTimeout: the sandbox has not started in
sandbox_wait_timeout
"""
with StorletTimeout(self.sandbox_wait_timeout):
while not self.ping():
time.sleep(self.sandbox_ping_interval)
@abc.abstractmethod
def _restart(self, container_image_name):
"""
Restarts the scope's sandbox using the specified container image
:param container_image_name: name of the container image to start
:raises StorletRuntimeException: when failed to restart the container
"""
pass
def restart(self):
"""
Restarts the scope's sandbox
"""
self.paths.create_host_pipe_dir()
container_image_name = self.scope
try:
self._restart(container_image_name)
self.wait()
except StorletTimeout:
raise
except StorletRuntimeException:
# We were unable to start a container from the tenant image.
# Let us try to start a container from default image.
self.logger.exception("Failed to start a container from "
"tenant image %s" % container_image_name)
self.logger.info("Trying to start a container from default "
"image: %s" % self.default_container_image_name)
self._restart(self.default_container_image_name)
self.wait()
def start_storlet_daemon(
self, spath, storlet_id, language, language_version=None):
"""
Start SDaemon process in the scope's sandbox
"""
pipe_path = self.paths.host_factory_pipe
client = SBusClient(pipe_path)
try:
resp = client.start_daemon(
language.lower(), spath, storlet_id,
self.paths.get_sbox_storlet_pipe(storlet_id),
self.storlet_daemon_debug_level,
self.storlet_daemon_thread_pool_size,
language_version)
if not resp.status:
self.logger.error('Failed to start storlet daemon: %s' %
resp.message)
raise StorletRuntimeException('Daemon start failed')
except SBusClientException:
raise StorletRuntimeException('Daemon start failed')
def stop_storlet_daemon(self, storlet_id):
"""
Stop SDaemon process in the scope's sandbox
"""
pipe_path = self.paths.host_factory_pipe
client = SBusClient(pipe_path)
try:
resp = client.stop_daemon(storlet_id)
if not resp.status:
self.logger.error('Failed to stop storlet daemon: %s' %
resp.message)
raise StorletRuntimeException('Daemon stop failed')
except SBusClientException:
raise StorletRuntimeException('Daemon stop failed')
def get_storlet_daemon_status(self, storlet_id):
"""
Get the status of SDaemon process in the scope's sandbox
"""
pipe_path = self.paths.host_factory_pipe
client = SBusClient(pipe_path)
try:
resp = client.daemon_status(storlet_id)
if resp.status:
return 1
else:
self.logger.error('Failed to get status about storlet '
'daemon: %s' % resp.message)
return 0
except SBusClientException:
return -1
def _get_storlet_classpath(self, storlet_main, storlet_id, dependencies):
"""
Get classpath required to run storlet application
:param storlet_main: Main class name of the storlet
:param storlet_id: Name of the storlet file
:param dependencies: A list of dependency file
:returns: classpath string
"""
class_path = os.path.join(
self.paths.get_sbox_storlet_dir(storlet_main), storlet_id)
dep_path_list = \
[os.path.join(self.paths.get_sbox_storlet_dir(storlet_main), dep)
for dep in dependencies]
return class_path + ':' + ':'.join(dep_path_list)
def activate_storlet_daemon(self, sreq, cache_updated=True):
storlet_daemon_status = \
self.get_storlet_daemon_status(sreq.storlet_main)
if (storlet_daemon_status == -1):
# We failed to send a command to the factory.
# Best we can do is execute the container.
self.logger.debug('Failed to check the storlet daemon status. '
'Restart its container')
self.restart()
storlet_daemon_status = 0
if (cache_updated is True and storlet_daemon_status == 1):
# The cache was updated while the daemon is running we need to
# stop it.
self.logger.debug('The cache was updated, and the storlet daemon '
'is running. Stopping daemon')
try:
self.stop_storlet_daemon(sreq.storlet_main)
except StorletRuntimeException:
self.logger.warning('Failed to stop the storlet daemon. '
'Restart its container')
self.restart()
else:
self.logger.debug('Deamon stopped')
storlet_daemon_status = 0
if (storlet_daemon_status == 0):
self.logger.debug('Going to start the storlet daemon!')
# TODO(takashi): This is not needed for python application
classpath = self._get_storlet_classpath(
sreq.storlet_main, sreq.storlet_id, sreq.dependencies)
self.start_storlet_daemon(
classpath, sreq.storlet_main, sreq.storlet_language,
sreq.storlet_language_version)
self.logger.debug('Daemon started')
class StorletInvocationProtocol(object):
"""
StorletInvocationProtocol class
This class serves communictaion with a container to run an
application
:param srequest: StorletRequest instance
:param storlet_pipe_path: path string to pipe
:param storlet_logger_path: path string to log file
:param timeout: integer of timeout for waiting the resp from container
:param logger: logger instance
:param extra_sources (WIP): a list of StorletRequest instances
which keep data_iter for adding extra source
as data stream
"""
def __init__(self, srequest, storlet_pipe_path, storlet_logger_path,
timeout, logger, extra_sources=None):
self.srequest = srequest
self.storlet_pipe_path = storlet_pipe_path
self.storlet_logger = StorletLogger(storlet_logger_path)
self.logger = logger
self.timeout = timeout
# local side file descriptors
self.data_read_fd = None
self.data_write_fd = None
self.metadata_read_fd = None
self.metadata_write_fd = None
self.task_id = None
self._input_data_read_fd = None
self._input_data_write_fd = None
self.extra_data_sources = []
extra_sources = extra_sources or []
for source in extra_sources:
if source.has_fd:
# TODO(kota_): it may be data_fd in the future.
raise Exception(
'extra_source no requires data_fd just data_iter')
self.extra_data_sources.append(
{'read_fd': None, 'write_fd': None,
'user_metadata': source.user_metadata,
'data_iter': source.data_iter})
@property
def input_data_read_fd(self):
"""
File descriptor to read the input body content
"""
if self.srequest.has_fd:
return self.srequest.data_fd
else:
return self._input_data_read_fd
@property
def remote_fds(self):
"""
A list of sbus file descriptors passed to remote side
"""
storlets_metadata = {}
if self.srequest.has_range:
storlets_metadata.update(
{'start': str(self.srequest.start),
'end': str(self.srequest.end)})
fds = [SBusFileDescriptor(sbus_fd.SBUS_FD_INPUT_OBJECT,
self.input_data_read_fd,
storage_metadata=self.srequest.user_metadata,
storlets_metadata=storlets_metadata),
SBusFileDescriptor(sbus_fd.SBUS_FD_OUTPUT_OBJECT,
self.data_write_fd),
SBusFileDescriptor(sbus_fd.SBUS_FD_OUTPUT_OBJECT_METADATA,
self.metadata_write_fd),
SBusFileDescriptor(sbus_fd.SBUS_FD_LOGGER,
self.storlet_logger.getfd())]
for source in self.extra_data_sources:
fd = SBusFileDescriptor(
sbus_fd.SBUS_FD_INPUT_OBJECT,
source['read_fd'],
storage_metadata=source['user_metadata'])
fds.append(fd)
return fds
@contextmanager
def _activate_invocation_descriptors(self):
"""
Contextmanager about file descriptors used in storlet invocation
NOTE: This context manager now only closes remote side fds,
so you should close local side fds
"""
self._prepare_invocation_descriptors()
try:
yield
finally:
self._close_remote_side_descriptors()
def _prepare_invocation_descriptors(self):
"""
Create all pipse used for Storlet execution
"""
if not self.srequest.has_fd:
self._input_data_read_fd, self._input_data_write_fd = os.pipe()
self.data_read_fd, self.data_write_fd = os.pipe()
self.metadata_read_fd, self.metadata_write_fd = os.pipe()
for source in self.extra_data_sources:
source['read_fd'], source['write_fd'] = os.pipe()
def _safe_close(self, fds):
"""
Make sure that all of the file descriptors get closed
:param fds: a list of file descriptors
"""
for fd in fds:
try:
os.close(fd)
except OSError as err:
if err.errno != errno.EBADF:
raise
# TODO(kota_): fd might be closed already, so if already
# closed, OSError will be raised. we need more refactor to
# keep clean the file descriptors.
pass
def _close_remote_side_descriptors(self):
"""
Close all of the container side descriptors
"""
fds = [self.data_write_fd, self.metadata_write_fd]
if not self.srequest.has_fd:
fds.append(self.input_data_read_fd)
fds.extend([source['read_fd'] for source in self.extra_data_sources])
for fd in fds:
os.close(fd)
def _close_local_side_descriptors(self):
"""
Close all of the host side descriptors
"""
fds = [self.data_read_fd, self.metadata_read_fd]
fds.extend([source['write_fd'] for source in self.extra_data_sources])
self._safe_close(fds)
def _cancel(self):
"""
Cancel on-going storlet execution
"""
client = SBusClient(self.storlet_pipe_path)
try:
resp = client.cancel(self.task_id)
if not resp.status:
raise StorletRuntimeException('Failed to cancel task')
except SBusClientException:
raise StorletRuntimeException('Failed to cancel task')
def _invoke(self):
"""
Send an execution command to the remote daemon factory
"""
with self.storlet_logger.activate(),\
self._activate_invocation_descriptors():
self._send_execute_command()
def _send_execute_command(self):
"""
Send execute command to the remote daemon factory to invoke storlet
execution
"""
client = SBusClient(self.storlet_pipe_path)
try:
resp = client.execute(self.srequest.params, self.remote_fds)
if not resp.status:
raise StorletRuntimeException("Failed to send execute command")
if not resp.task_id:
raise StorletRuntimeException("Missing task id")
else:
self.task_id = resp.task_id
except SBusClientException:
raise StorletRuntimeException("Failed to send execute command")
def _wait_for_read_with_timeout(self, fd):
"""
Wait while the read file descriptor gets ready
:param fd: File descriptor to read
:raises StorletTimeout: Exception raised when it times out to cancel
the existing task
:raises StorletRuntimeException: Exception raised when it fails to
cancel the existing task
"""
try:
with StorletTimeout(self.timeout):
r, w, e = select.select([fd], [], [])
except StorletTimeout:
exc_type, exc_value, exc_traceback = sys.exc_info()
# When there is a task already running, we should cancel it.
if self.task_id:
try:
self._cancel()
except StorletRuntimeException:
self.logger.warning(
'Task %s timed out, but failed to get canceled'
% self.task_id)
pass
if exc_value is None:
exc_value = exc_traceback
if exc_value.__traceback__ is not exc_traceback:
raise exc_value.with_traceback(exc_traceback)
raise exc_value
if fd not in r:
raise StorletRuntimeException('Read fd is not ready')
def _read_metadata(self):
"""
Read metadata in the storlet execution result from fd
:returns: a dict of metadata
"""
self._wait_for_read_with_timeout(self.metadata_read_fd)
flat_json = os.read(self.metadata_read_fd, MAX_METADATA_SIZE)
os.close(self.metadata_read_fd)
try:
return json.loads(flat_json)
except ValueError:
self.logger.exception('Failed to load metadata from json')
raise StorletRuntimeException('Got invalid format about metadata')
def _wait_for_write_with_timeout(self, fd):
"""
Wait while the write file descriptor gets ready
:param fd: File descriptor to write
:raises StorletTimeout: Exception raised when it times out to cancel
the existing task
:raises StorletRuntimeException: Exception raised when it fails to
cancel the existing task
"""
with StorletTimeout(self.timeout):
r, w, e = select.select([], [fd], [])
if fd not in w:
raise StorletRuntimeException('Write fd is not ready')
def _close_input_data_descriptors(self):
fds = [self._input_data_read_fd, self._input_data_write_fd]
self._safe_close(fds)
def communicate(self):
try:
self._invoke()
if not self.srequest.has_fd:
self._wait_for_write_with_timeout(self._input_data_write_fd)
# We do the writing in a different thread.
# Otherwise, we can run into the following deadlock
# 1. middleware writes to Storlet
# 2. Storlet reads and starts to write metadata and then data
# 3. middleware continues writing
# 4. Storlet continues writing and gets stuck as middleware
# is busy writing, but still not consuming the reader end
# of the Storlet writer.
eventlet.spawn_n(self._write_input_data,
self._input_data_write_fd,
self.srequest.data_iter)
for source in self.extra_data_sources:
# NOTE(kota_): not sure right now if using eventlet.spawn_n is
# right way. GreenPool is better? I don't get
# whole for the dead lock described in above.
self._wait_for_write_with_timeout(source['write_fd'])
eventlet.spawn_n(self._write_input_data,
source['write_fd'],
source['data_iter'])
out_md = self._read_metadata()
self._wait_for_read_with_timeout(self.data_read_fd)
return StorletResponse(out_md, data_fd=self.data_read_fd,
cancel=self._cancel)
except Exception:
self._close_local_side_descriptors()
if not self.srequest.has_fd:
self._close_input_data_descriptors()
raise
@contextmanager
def _open_writer(self, fd):
with os.fdopen(fd, 'wb') as writer:
yield writer
def _write_input_data(self, fd, data_iter):
try:
# double try/except block saving from unexpected errors
try:
with self._open_writer(fd) as writer:
for chunk in data_iter:
with StorletTimeout(self.timeout):
writer.write(chunk)
except (OSError, TypeError, ValueError):
self.logger.exception('fdopen failed')
except IOError:
# this will happen at sort of broken pipe while writer.write
self.logger.exception('IOError with writing fd %s' % fd)
except StorletTimeout:
self.logger.exception(
'Timeout (%s)s with writing fd %s' % (self.timeout, fd))
except Exception:
# _write_input_data is designed to run eventlet thread
# so that we should catch and suppress it here
self.logger.exception('Unexpected error at writing input data')

View File

@ -13,8 +13,8 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from storlets.gateway.gateways.docker.gateway import StorletGatewayDocker from storlets.gateway.gateways.docker.gateway import DockerStorletGateway
__all__ = [ __all__ = [
'StorletGatewayDocker', 'DockerStorletGateway',
] ]

View File

@ -14,370 +14,9 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import os from storlets.gateway.gateways.container.gateway import StorletGatewayContainer
import shutil from storlets.gateway.gateways.docker.runtime import DockerRunTimeSandbox
from storlets.gateway.common.stob import StorletRequest
from storlets.gateway.gateways.base import StorletGatewayBase
from storlets.gateway.gateways.docker.runtime import RunTimePaths, \
RunTimeSandbox, StorletInvocationProtocol
"""--------------------------------------------------------------------------- class DockerStorletGateway(StorletGatewayContainer):
The Storlet Gateway API sandbox = DockerRunTimeSandbox
The API is made of:
(1) The class DockerStorletRequest. This encapsulates what goes in and comes
out of the gateway
(2) The StorletGateway is the Docker flavor of the StorletGateway API:
validate_storlet_registration
validate_dependency_registration
invocation_flow
---------------------------------------------------------------------------"""
class DockerStorletRequest(StorletRequest):
"""
The DockerStorletRequest class represents a request to be processed by the
storlet the request is derived from the Swift request and
essentially consists of:
1. A data stream to be processed
2. Metadata identifying the stream
"""
# TODO(takashi): Some of following parameters should be defined common
# parameters for StorletRequest
required_options = ['storlet_main', 'storlet_language', 'file_manager']
def __init__(self, storlet_id, params, user_metadata, data_iter=None,
data_fd=None, options=None):
"""
:param storlet_id: storlet id
:param params: execution parameters
:param user_metadata: user metadata
:param data_iter: an iterator to read data
:param data_fd: a file descriptor to read data
:param options: a dictionaly which stores gateway specific options.
:raises ValueError: when some of the required options (storlet_main
and file_manager) are missing
"""
super(DockerStorletRequest, self).__init__(
storlet_id, params, user_metadata, data_iter, data_fd,
options=options)
self.generate_log = self.options.get('generate_log', False)
self.storlet_main = self.options['storlet_main']
self.storlet_language = self.options['storlet_language']
self.storlet_language_version = \
self.options.get('storlet_language_version')
if self.options.get('storlet_dependency'):
self.dependencies = [
x.strip() for x
in self.options['storlet_dependency'].split(',')
if x.strip()]
else:
self.dependencies = []
self.file_manager = self.options['file_manager']
self.start = self.options.get('range_start')
self.end = self.options.get('range_end')
@property
def has_range(self):
"""
Whether the input range is given
"""
return self.start is not None and self.end is not None
class StorletGatewayDocker(StorletGatewayBase):
request_class = DockerStorletRequest
def __init__(self, conf, logger, scope):
"""
:param conf: a dict for gateway conf
:param logger: a logger instance
:param scope: scope name to identify the container
"""
super(StorletGatewayDocker, self).__init__(conf, logger, scope)
self.storlet_timeout = float(self.conf.get('storlet_timeout', 40))
self.paths = RunTimePaths(scope, conf)
@classmethod
def validate_storlet_registration(cls, params, name):
"""
Validate required parameters for storlet file
:param params: parameters related to the storlet file
:param name: name of the storlet file
:raises ValueError: if some of the required parameters are missing,
or some of the parameters are invalid
"""
mandatory = ['Language', 'Interface-Version', 'Object-Metadata',
'Main']
cls._check_mandatory_params(params, mandatory)
if params['Language'].lower() == 'java':
if '-' not in name or '.' not in name:
raise ValueError('Storlet name is incorrect')
elif params['Language'].lower() == 'python':
try:
version = float(params.get('Language-Version', 3))
except ValueError:
raise ValueError('Language-Version is invalid')
if int(version) != 3:
# TODO(kota_): more strict version check should be nice.
raise ValueError('Not supported version specified')
if name.endswith('.py'):
cls_name = params['Main']
if not cls_name.startswith(name[:-3] + '.'):
raise ValueError('Main class should be included in '
'storlet file')
if len(cls_name.split('.')) != 2:
raise ValueError('Submodule is currently not supported')
# TODO(takashi): Add support for sdist tar.gz
else:
raise ValueError('Storlet name is incorrect')
else:
raise ValueError('Unsupported Language')
dep = params.get('Dependency')
if dep:
deps = dep.split(',')
if name in deps:
raise ValueError('Using the same name for storlet and '
'dependency is not allowed')
if len(deps) != len(set(deps)):
raise ValueError('Duplicated name in dependencies')
@classmethod
def validate_dependency_registration(cls, params, name):
"""
Validate required parameters for dependency file
:param params: parameters related to the dependency file
:param name: name of the dependency file
:raises ValueError: if some of the required parameters are missing,
or some of the parameters are invalid
"""
mandatory = ['Dependency-Version']
cls._check_mandatory_params(params, mandatory)
perm = params.get('Dependency-Permissions')
if perm is not None:
try:
perm_int = int(perm, 8)
except ValueError:
raise ValueError('Dependency permission is incorrect')
if (perm_int & int('600', 8)) != int('600', 8):
raise ValueError('The owner should have rw permission')
@classmethod
def _check_mandatory_params(cls, params, mandatory):
"""
Ensure that we have all mandatory parameters in the given parameters
:param params: file parameters
:param mandatory: required parameters
:raises ValueError: if some of the required parameters are missing
"""
for md in mandatory:
if md not in params:
raise ValueError('Mandatory parameter is missing'
': {0}'.format(md))
def invocation_flow(self, sreq, extra_sources=None):
"""
Invoke the backend protocol with gateway
:param sreq: StorletRequest instance
:param extra_sources (WIP): A list of StorletRequest instance to gather
as extra resoureces to feed to storlet
container as data source
:return: StorletResponse instance
"""
run_time_sbox = RunTimeSandbox(self.scope, self.conf, self.logger)
docker_updated = self.update_docker_container_from_cache(sreq)
run_time_sbox.activate_storlet_daemon(sreq, docker_updated)
self._add_system_params(sreq)
slog_path = self.paths.get_host_slog_path(sreq.storlet_main)
storlet_pipe_path = \
self.paths.get_host_storlet_pipe(sreq.storlet_main)
sprotocol = StorletInvocationProtocol(sreq,
storlet_pipe_path,
slog_path,
self.storlet_timeout,
self.logger,
extra_sources=extra_sources)
sresp = sprotocol.communicate()
self._upload_storlet_logs(slog_path, sreq)
return sresp
def _add_system_params(self, sreq):
"""
Adds Storlet engine specific parameters to the invocation
currently, this consists only of the execution path of the
Storlet within the Docker container.
:params params: Request parameters
"""
sreq.params['storlet_execution_path'] = self. \
paths.get_sbox_storlet_dir(sreq.storlet_main)
def _upload_storlet_logs(self, slog_path, sreq):
"""
Upload storlet execution log as a swift object
:param slog_path: target path
:params sreq: DockerStorletRequest instance
"""
if sreq.generate_log:
with open(slog_path, 'rb') as logfile:
storlet_name = sreq.storlet_id.split('-')[0]
log_obj_name = '%s.log' % storlet_name
sreq.file_manager.put_log(log_obj_name, logfile)
def bring_from_cache(self, obj_name, sreq, is_storlet):
"""
Auxiliary function that:
(1) Brings from Swift obj_name, either this is in a
storlet or a storlet dependency.
(2) Copies from local cache into the Docker conrainer
If this is a Storlet then also validates that the cache is updated
with most recent copy of the Storlet compared to the copy residing in
Swift.
:params obj_name: name of the object
:params sreq: DockerStorletRequest instance
:params is_storlet: True if the object is a storlet object
False if the object is a dependency object
:returns: Whether the Docker container was updated with obj_name
"""
# Determine the cache we are to work with
# e.g. dependency or storlet
if is_storlet:
cache_dir = self.paths.host_storlet_cache_dir
get_func = sreq.file_manager.get_storlet
else:
cache_dir = self.paths.host_dependency_cache_dir
get_func = sreq.file_manager.get_dependency
if not os.path.exists(cache_dir):
os.makedirs(cache_dir, 0o700)
# cache_target_path is the actual object we need to deal with
# e.g. a concrete storlet or dependency we need to bring/update
cache_target_path = os.path.join(cache_dir, obj_name)
# Determine if we need to update the cache for cache_target_path
# We default for no
update_cache = False
# If it does not exist in cache, we obviously need to bring
if not os.path.isfile(cache_target_path):
update_cache = True
elif is_storlet:
# The cache_target_path exists, we test if it is up-to-date
# with the metadata we got.
# We mention that this is currently applicable for storlets
# only, and not for dependencies.
# This will change when we will head dependencies as well
fstat = os.stat(cache_target_path)
storlet_or_size = int(
sreq.options['storlet_content_length'].rstrip("L"))
storlet_or_time = float(sreq.options['storlet_x_timestamp'])
b_storlet_size_changed = fstat.st_size != storlet_or_size
b_storlet_file_updated = float(fstat.st_mtime) < storlet_or_time
if b_storlet_size_changed or b_storlet_file_updated:
update_cache = True
if update_cache:
# If the cache needs to be updated, then get on with it
# bring the object from storage
data_iter, perm = get_func(obj_name)
if perm:
perm = int(perm, 8) & 0o700
else:
perm = 0o600
# TODO(takashi): Do not directly write to target path
with open(cache_target_path, 'wb') as fn:
os.chmod(cache_target_path, perm)
for data in data_iter:
fn.write(data)
# The node's local cache is now updated.
# We now verify if we need to update the
# Docker container itself.
# The Docker container needs to be updated if:
# 1. The Docker container does not hold a copy of the object
# 2. The Docker container holds an older version of the object
update_docker = False
docker_storlet_path = \
self.paths.get_host_storlet_dir(sreq.storlet_main)
docker_target_path = os.path.join(docker_storlet_path, obj_name)
if not os.path.exists(docker_storlet_path):
os.makedirs(docker_storlet_path, 0o700)
update_docker = True
elif not os.path.isfile(docker_target_path):
update_docker = True
else:
fstat_cached = os.stat(cache_target_path)
fstat_docker = os.stat(docker_target_path)
if fstat_cached.st_size != fstat_docker.st_size:
update_docker = True
if fstat_cached.st_mtime < fstat_docker.st_mtime:
update_docker = True
if update_docker:
# need to copy from cache to docker
# copy2 also copies the permissions
shutil.copy2(cache_target_path, docker_target_path)
return update_docker
def update_docker_container_from_cache(self, sreq):
"""
Iterates over the storlet name and its dependencies appearing
in the invocation data and make sure they are brought to the
local cache, and from there to the Docker container.
Uses the bring_from_cache auxiliary function.
:params sreq: DockerStorletRequest instance
:returns: True if the Docker container was updated
"""
# where at the host side, reside the storlet containers
storlet_path = self.paths.host_storlet_base_dir
if not os.path.exists(storlet_path):
os.makedirs(storlet_path, 0o755)
# Iterate over storlet and dependencies, and make sure
# they are updated within the Docker container.
# return True if any of them wea actually
# updated within the Docker container
docker_updated = False
updated = self.bring_from_cache(sreq.storlet_id, sreq, True)
docker_updated = docker_updated or updated
for dep in sreq.dependencies:
updated = self.bring_from_cache(dep, sreq, False)
docker_updated = docker_updated or updated
return docker_updated

View File

@ -13,283 +13,17 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import errno
import os import os
import select
import stat
import sys
import time
import docker import docker
import docker.errors import docker.errors
from docker.types import Mount as DockerMount from docker.types import Mount as DockerMount
import eventlet from storlets.gateway.common.exceptions import StorletRuntimeException
import json from storlets.gateway.gateways.container.runtime import RunTimeSandbox
from contextlib import contextmanager
from storlets.sbus.client import SBusClient
from storlets.sbus.client.exceptions import SBusClientException
from storlets.sbus.datagram import SBusFileDescriptor
from storlets.sbus import file_description as sbus_fd
from storlets.gateway.common.exceptions import StorletRuntimeException, \
StorletTimeout
from storlets.gateway.common.logger import StorletLogger
from storlets.gateway.common.stob import StorletResponse
MAX_METADATA_SIZE = 4096
eventlet.monkey_patch() class DockerRunTimeSandbox(RunTimeSandbox):
"""---------------------------------------------------------------------------
Sandbox API
"""
class RunTimePaths(object):
"""
The Storlet Engine need to be access stuff located in many paths:
1. The various communication channels represented as pipes in the
filesystem
2. Directories where to place Storlets
3. Directories where to place logs
Communication channels
----------------------
The RunTimeSandbox communicates with the Sandbox via two types of pipes
1. factory pipe - defined per scope, used for communication with the
sandbox
for e.g. start/stop a storlet daemon
2. Storlet pipe - defined per scope and Storlet, used for communication
with a storlet daemon, e.g. to call the invoke API
Each pipe type has two paths:
1. A path that is inside the sandbox
2. A path that is outside of the sandbox or at the host side. As such
this path is prefixed by 'host_'
Thus, we have the following 4 paths of interest:
1. sandbox_factory_pipe_path
2. host_factory_pipe_path
3. sandbox_storlet_pipe_path
4. host_storlet_pipe_path
Our implementation uses the following path structure for the various pipes:
In the host, all pipes belonging to a given scope are prefixed by
<pipes_dir>/<scope>, where <pipes_dir> comes from the configuration
Thus:
host_factory_pipe_path is of the form <pipes_dir>/<scope>/factory_pipe
host_storlet_pipe_path is of the form <pipes_dir>/<scope>/<storlet_id>
In The sandbox side
sandbox_factory_pipe_path is of the form /mnt/channels/factory_pipe
sandbox_storlet_pipe_path is of the form /mnt/channels/<storlet_id>
Storlets Locations
------------------
The Storlet binaries are accessible from the sandbox using a mounted
directory.
This directory is called the storlet directories.
On the host side it is of the form <storlet_dir>/<scope>/<storlet_name>
On the sandbox side it is of the form /home/swift/<storlet_name>
<storlet_dir> comes from the configuration
<storlet_name> is the prefix of the jar.
Logs
----
Logs are located in paths of the form:
<log_dir>/<scope>/<storlet_name>.log
"""
def __init__(self, scope, conf):
"""
Construct RunTimePaths instance
:param scope: scope name to be used as container name
:param conf: gateway conf
"""
self.scope = scope
self.factory_pipe_name = 'factory_pipe'
self.sandbox_pipe_dir = '/mnt/channels'
self.sandbox_storlet_base_dir = '/home/swift'
self.host_root_dir = conf.get('host_root', '/var/lib/storlets')
self.host_pipe_root_dir = \
conf.get('pipes_dir',
os.path.join(self.host_root_dir, 'pipes', 'scopes'))
self.host_storlet_root_dir = \
conf.get('storlets_dir',
os.path.join(self.host_root_dir, 'storlets', 'scopes'))
self.host_log_root_dir = \
conf.get('log_dir',
os.path.join(self.host_root_dir, 'logs', 'scopes'))
self.host_cache_root_dir = \
conf.get('cache_dir',
os.path.join(self.host_root_dir, 'cache', 'scopes'))
self.host_restart_script_dir = \
conf.get('script_dir',
os.path.join(self.host_root_dir, 'scripts'))
self.host_storlet_native_lib_dir = '/usr/local/lib/storlets'
self.sandbox_storlet_native_lib_dir = '/usr/local/lib/storlets'
self.host_storlet_native_bin_dir = '/usr/local/libexec/storlets'
self.sandbox_storlet_native_bin_dir = '/usr/local/libexec/storlets'
@property
def host_pipe_dir(self):
return os.path.join(self.host_pipe_root_dir, self.scope)
def create_host_pipe_dir(self):
path = self.host_pipe_dir
if not os.path.exists(path):
os.makedirs(path)
# 0777 should be 0700 when we get user namespaces in Docker
os.chmod(path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
return path
@property
def host_factory_pipe(self):
return os.path.join(self.host_pipe_dir, self.factory_pipe_name)
@property
def sandbox_factory_pipe(self):
return os.path.join(self.sandbox_pipe_dir, self.factory_pipe_name)
def get_host_storlet_pipe(self, storlet_id):
return os.path.join(self.host_pipe_dir, storlet_id)
def get_sbox_storlet_pipe(self, storlet_id):
return os.path.join(self.sandbox_pipe_dir, storlet_id)
def get_sbox_storlet_dir(self, storlet_id):
return os.path.join(self.sandbox_storlet_base_dir, storlet_id)
@property
def host_storlet_base_dir(self):
return os.path.join(self.host_storlet_root_dir, self.scope)
def get_host_storlet_dir(self, storlet_id):
return os.path.join(self.host_storlet_base_dir, storlet_id)
def get_host_slog_path(self, storlet_id):
return os.path.join(
self.host_log_root_dir, self.scope, storlet_id,
'storlet_invoke.log')
@property
def host_storlet_cache_dir(self):
return os.path.join(self.host_cache_root_dir, self.scope, 'storlet')
@property
def host_dependency_cache_dir(self):
return os.path.join(self.host_cache_root_dir, self.scope, 'dependency')
"""---------------------------------------------------------------------------
Docker Stateful Container API
The RunTimeSandbox serve as an API between the Docker Gateway and
a reusable per scope sandbox
---------------------------------------------------------------------------"""
class RunTimeSandbox(object):
"""
The RunTimeSandbox represents a reusable per scope sandbox.
The sandbox is reusable in the sense that it can run several storlet
daemons.
The following methods are supported:
ping - pings the sandbox for liveness
wait - wait for the sandbox to be ready for processing commands
restart - restart the sandbox
start_storlet_daemon - start a daemon for a given storlet
stop_storlet_daemon - stop a daemon of a given storlet
get_storlet_daemon_status - test if a given storlet daemon is running
"""
def __init__(self, scope, conf, logger):
"""
:param scope: scope name to be used as container name
:param conf: gateway conf
:param logger: logger instance
"""
self.paths = RunTimePaths(scope, conf)
self.scope = scope
self.sandbox_ping_interval = \
float(conf.get('sandbox_ping_interval', 0.5))
self.sandbox_stop_timeout = \
float(conf.get('stop_linux_container_timeout', 1))
self.sandbox_wait_timeout = \
float(conf.get('restart_linux_container_timeout', 10))
self.container_image_namespace = \
conf.get('docker_repo', conf.get('container_image_namespace'))
self.container_image_name_prefix = 'tenant'
# TODO(add line in conf)
self.storlet_daemon_thread_pool_size = \
int(conf.get('storlet_daemon_thread_pool_size', 5))
self.storlet_daemon_factory_debug_level = \
conf.get('storlet_daemon_factory_debug_level', 'DEBUG')
self.storlet_daemon_debug_level = \
conf.get('storlet_daemon_debug_level', 'DEBUG')
# TODO(change logger's route if possible)
self.logger = logger
self.default_container_image_name = conf.get(
'default_docker_image_name',
conf.get('default_container_image_name', 'storlet_engine_image')
)
self.max_containers_per_node = \
int(conf.get('max_containers_per_node', 0))
self.container_cpu_period = int(conf.get('container_cpu_period', 0))
self.container_cpu_quota = int(conf.get('container_cpu_quota', 0))
self.container_mem_limit = conf.get('container_mem_limit', 0)
# NOTE(tkajinam): memory limit can be a string with unit like 1024m
try:
self.container_mem_limit = int(self.container_mem_limit)
except TypeError:
pass
self.container_cpuset_cpus = conf.get('container_cpuset_cpus')
self.container_cpuset_mems = conf.get('container_cpuset_mems')
self.container_pids_limit = int(conf.get('container_pids_limit', 0))
def ping(self):
"""
Ping to daemon factory process inside container
:returns: True when the daemon factory is responsive
False when the daemon factory is not responsive or it fails
to send command to the process
"""
pipe_path = self.paths.host_factory_pipe
client = SBusClient(pipe_path)
try:
resp = client.ping()
if not resp.status:
self.logger.error('Failed to ping to daemon factory: %s' %
resp.message)
return resp.status
except SBusClientException:
return False
def wait(self):
"""
Wait while scope's sandbox is starting
:raises StorletTimeout: the sandbox has not started in
sandbox_wait_timeout
"""
with StorletTimeout(self.sandbox_wait_timeout):
while not self.ping():
time.sleep(self.sandbox_ping_interval)
def _restart(self, container_image_name): def _restart(self, container_image_name):
""" """
@ -375,468 +109,3 @@ class RunTimeSandbox(object):
except docker.errors.APIError: except docker.errors.APIError:
self.logger.exception("Failed to manage docker containers") self.logger.exception("Failed to manage docker containers")
raise StorletRuntimeException("Docker runtime error") raise StorletRuntimeException("Docker runtime error")
def restart(self):
"""
Restarts the scope's sandbox
"""
self.paths.create_host_pipe_dir()
container_image_name = self.scope
try:
self._restart(container_image_name)
self.wait()
except StorletTimeout:
raise
except StorletRuntimeException:
# We were unable to start docker container from the tenant image.
# Let us try to start docker container from default image.
self.logger.exception("Failed to start docker container from "
"tenant image %s" % container_image_name)
self.logger.info("Trying to start docker container from default "
"image: %s" % self.default_container_image_name)
self._restart(self.default_container_image_name)
self.wait()
def start_storlet_daemon(
self, spath, storlet_id, language, language_version=None):
"""
Start SDaemon process in the scope's sandbox
"""
pipe_path = self.paths.host_factory_pipe
client = SBusClient(pipe_path)
try:
resp = client.start_daemon(
language.lower(), spath, storlet_id,
self.paths.get_sbox_storlet_pipe(storlet_id),
self.storlet_daemon_debug_level,
self.storlet_daemon_thread_pool_size,
language_version)
if not resp.status:
self.logger.error('Failed to start storlet daemon: %s' %
resp.message)
raise StorletRuntimeException('Daemon start failed')
except SBusClientException:
raise StorletRuntimeException('Daemon start failed')
def stop_storlet_daemon(self, storlet_id):
"""
Stop SDaemon process in the scope's sandbox
"""
pipe_path = self.paths.host_factory_pipe
client = SBusClient(pipe_path)
try:
resp = client.stop_daemon(storlet_id)
if not resp.status:
self.logger.error('Failed to stop storlet daemon: %s' %
resp.message)
raise StorletRuntimeException('Daemon stop failed')
except SBusClientException:
raise StorletRuntimeException('Daemon stop failed')
def get_storlet_daemon_status(self, storlet_id):
"""
Get the status of SDaemon process in the scope's sandbox
"""
pipe_path = self.paths.host_factory_pipe
client = SBusClient(pipe_path)
try:
resp = client.daemon_status(storlet_id)
if resp.status:
return 1
else:
self.logger.error('Failed to get status about storlet '
'daemon: %s' % resp.message)
return 0
except SBusClientException:
return -1
def _get_storlet_classpath(self, storlet_main, storlet_id, dependencies):
"""
Get classpath required to run storlet application
:param storlet_main: Main class name of the storlet
:param storlet_id: Name of the storlet file
:param dependencies: A list of dependency file
:returns: classpath string
"""
class_path = os.path.join(
self.paths.get_sbox_storlet_dir(storlet_main), storlet_id)
dep_path_list = \
[os.path.join(self.paths.get_sbox_storlet_dir(storlet_main), dep)
for dep in dependencies]
return class_path + ':' + ':'.join(dep_path_list)
def activate_storlet_daemon(self, sreq, cache_updated=True):
storlet_daemon_status = \
self.get_storlet_daemon_status(sreq.storlet_main)
if (storlet_daemon_status == -1):
# We failed to send a command to the factory.
# Best we can do is execute the container.
self.logger.debug('Failed to check the storlet daemon status. '
'Restart Docker container')
self.restart()
storlet_daemon_status = 0
if (cache_updated is True and storlet_daemon_status == 1):
# The cache was updated while the daemon is running we need to
# stop it.
self.logger.debug('The cache was updated, and the storlet daemon '
'is running. Stopping daemon')
try:
self.stop_storlet_daemon(sreq.storlet_main)
except StorletRuntimeException:
self.logger.warning('Failed to stop the storlet daemon. '
'Restart Docker container')
self.restart()
else:
self.logger.debug('Deamon stopped')
storlet_daemon_status = 0
if (storlet_daemon_status == 0):
self.logger.debug('Going to start the storlet daemon!')
# TODO(takashi): This is not needed for python application
classpath = self._get_storlet_classpath(
sreq.storlet_main, sreq.storlet_id, sreq.dependencies)
self.start_storlet_daemon(
classpath, sreq.storlet_main, sreq.storlet_language,
sreq.storlet_language_version)
self.logger.debug('Daemon started')
"""---------------------------------------------------------------------------
Storlet Daemon API
StorletInvocationProtocol
server as an API between the Docker Gateway and the Storlet Daemon which
runs inside the Docker container. These classes implement the Storlet execution
protocol
---------------------------------------------------------------------------"""
class StorletInvocationProtocol(object):
"""
StorletInvocationProtocol class
This class serves communictaion with a Docker container to run an
application
:param srequest: StorletRequest instance
:param storlet_pipe_path: path string to pipe
:param storlet_logger_path: path string to log file
:param timeout: integer of timeout for waiting the resp from container
:param logger: logger instance
:param extra_sources (WIP): a list of StorletRequest instances
which keep data_iter for adding extra source
as data stream
"""
def __init__(self, srequest, storlet_pipe_path, storlet_logger_path,
timeout, logger, extra_sources=None):
self.srequest = srequest
self.storlet_pipe_path = storlet_pipe_path
self.storlet_logger = StorletLogger(storlet_logger_path)
self.logger = logger
self.timeout = timeout
# local side file descriptors
self.data_read_fd = None
self.data_write_fd = None
self.metadata_read_fd = None
self.metadata_write_fd = None
self.task_id = None
self._input_data_read_fd = None
self._input_data_write_fd = None
self.extra_data_sources = []
extra_sources = extra_sources or []
for source in extra_sources:
if source.has_fd:
# TODO(kota_): it may be data_fd in the future.
raise Exception(
'extra_source no requires data_fd just data_iter')
self.extra_data_sources.append(
{'read_fd': None, 'write_fd': None,
'user_metadata': source.user_metadata,
'data_iter': source.data_iter})
@property
def input_data_read_fd(self):
"""
File descriptor to read the input body content
"""
if self.srequest.has_fd:
return self.srequest.data_fd
else:
return self._input_data_read_fd
@property
def remote_fds(self):
"""
A list of sbus file descriptors passed to remote side
"""
storlets_metadata = {}
if self.srequest.has_range:
storlets_metadata.update(
{'start': str(self.srequest.start),
'end': str(self.srequest.end)})
fds = [SBusFileDescriptor(sbus_fd.SBUS_FD_INPUT_OBJECT,
self.input_data_read_fd,
storage_metadata=self.srequest.user_metadata,
storlets_metadata=storlets_metadata),
SBusFileDescriptor(sbus_fd.SBUS_FD_OUTPUT_OBJECT,
self.data_write_fd),
SBusFileDescriptor(sbus_fd.SBUS_FD_OUTPUT_OBJECT_METADATA,
self.metadata_write_fd),
SBusFileDescriptor(sbus_fd.SBUS_FD_LOGGER,
self.storlet_logger.getfd())]
for source in self.extra_data_sources:
fd = SBusFileDescriptor(
sbus_fd.SBUS_FD_INPUT_OBJECT,
source['read_fd'],
storage_metadata=source['user_metadata'])
fds.append(fd)
return fds
@contextmanager
def _activate_invocation_descriptors(self):
"""
Contextmanager about file descriptors used in storlet invocation
NOTE: This context manager now only closes remote side fds,
so you should close local side fds
"""
self._prepare_invocation_descriptors()
try:
yield
finally:
self._close_remote_side_descriptors()
def _prepare_invocation_descriptors(self):
"""
Create all pipse used for Storlet execution
"""
if not self.srequest.has_fd:
self._input_data_read_fd, self._input_data_write_fd = os.pipe()
self.data_read_fd, self.data_write_fd = os.pipe()
self.metadata_read_fd, self.metadata_write_fd = os.pipe()
for source in self.extra_data_sources:
source['read_fd'], source['write_fd'] = os.pipe()
def _safe_close(self, fds):
"""
Make sure that all of the file descriptors get closed
:param fds: a list of file descriptors
"""
for fd in fds:
try:
os.close(fd)
except OSError as err:
if err.errno != errno.EBADF:
raise
# TODO(kota_): fd might be closed already, so if already
# closed, OSError will be raised. we need more refactor to
# keep clean the file descriptors.
pass
def _close_remote_side_descriptors(self):
"""
Close all of the container side descriptors
"""
fds = [self.data_write_fd, self.metadata_write_fd]
if not self.srequest.has_fd:
fds.append(self.input_data_read_fd)
fds.extend([source['read_fd'] for source in self.extra_data_sources])
for fd in fds:
os.close(fd)
def _close_local_side_descriptors(self):
"""
Close all of the host side descriptors
"""
fds = [self.data_read_fd, self.metadata_read_fd]
fds.extend([source['write_fd'] for source in self.extra_data_sources])
self._safe_close(fds)
def _cancel(self):
"""
Cancel on-going storlet execution
"""
client = SBusClient(self.storlet_pipe_path)
try:
resp = client.cancel(self.task_id)
if not resp.status:
raise StorletRuntimeException('Failed to cancel task')
except SBusClientException:
raise StorletRuntimeException('Failed to cancel task')
def _invoke(self):
"""
Send an execution command to the remote daemon factory
"""
with self.storlet_logger.activate(),\
self._activate_invocation_descriptors():
self._send_execute_command()
def _send_execute_command(self):
"""
Send execute command to the remote daemon factory to invoke storlet
execution
"""
client = SBusClient(self.storlet_pipe_path)
try:
resp = client.execute(self.srequest.params, self.remote_fds)
if not resp.status:
raise StorletRuntimeException("Failed to send execute command")
if not resp.task_id:
raise StorletRuntimeException("Missing task id")
else:
self.task_id = resp.task_id
except SBusClientException:
raise StorletRuntimeException("Failed to send execute command")
def _wait_for_read_with_timeout(self, fd):
"""
Wait while the read file descriptor gets ready
:param fd: File descriptor to read
:raises StorletTimeout: Exception raised when it times out to cancel
the existing task
:raises StorletRuntimeException: Exception raised when it fails to
cancel the existing task
"""
try:
with StorletTimeout(self.timeout):
r, w, e = select.select([fd], [], [])
except StorletTimeout:
exc_type, exc_value, exc_traceback = sys.exc_info()
# When there is a task already running, we should cancel it.
if self.task_id:
try:
self._cancel()
except StorletRuntimeException:
self.logger.warning(
'Task %s timed out, but failed to get canceled'
% self.task_id)
pass
if exc_value is None:
exc_value = exc_traceback
if exc_value.__traceback__ is not exc_traceback:
raise exc_value.with_traceback(exc_traceback)
raise exc_value
if fd not in r:
raise StorletRuntimeException('Read fd is not ready')
def _read_metadata(self):
"""
Read metadata in the storlet execution result from fd
:returns: a dict of metadata
"""
self._wait_for_read_with_timeout(self.metadata_read_fd)
flat_json = os.read(self.metadata_read_fd, MAX_METADATA_SIZE)
os.close(self.metadata_read_fd)
try:
return json.loads(flat_json)
except ValueError:
self.logger.exception('Failed to load metadata from json')
raise StorletRuntimeException('Got invalid format about metadata')
def _wait_for_write_with_timeout(self, fd):
"""
Wait while the write file descriptor gets ready
:param fd: File descriptor to write
:raises StorletTimeout: Exception raised when it times out to cancel
the existing task
:raises StorletRuntimeException: Exception raised when it fails to
cancel the existing task
"""
with StorletTimeout(self.timeout):
r, w, e = select.select([], [fd], [])
if fd not in w:
raise StorletRuntimeException('Write fd is not ready')
def _close_input_data_descriptors(self):
fds = [self._input_data_read_fd, self._input_data_write_fd]
self._safe_close(fds)
def communicate(self):
try:
self._invoke()
if not self.srequest.has_fd:
self._wait_for_write_with_timeout(self._input_data_write_fd)
# We do the writing in a different thread.
# Otherwise, we can run into the following deadlock
# 1. middleware writes to Storlet
# 2. Storlet reads and starts to write metadata and then data
# 3. middleware continues writing
# 4. Storlet continues writing and gets stuck as middleware
# is busy writing, but still not consuming the reader end
# of the Storlet writer.
eventlet.spawn_n(self._write_input_data,
self._input_data_write_fd,
self.srequest.data_iter)
for source in self.extra_data_sources:
# NOTE(kota_): not sure right now if using eventlet.spawn_n is
# right way. GreenPool is better? I don't get
# whole for the dead lock described in above.
self._wait_for_write_with_timeout(source['write_fd'])
eventlet.spawn_n(self._write_input_data,
source['write_fd'],
source['data_iter'])
out_md = self._read_metadata()
self._wait_for_read_with_timeout(self.data_read_fd)
return StorletResponse(out_md, data_fd=self.data_read_fd,
cancel=self._cancel)
except Exception:
self._close_local_side_descriptors()
if not self.srequest.has_fd:
self._close_input_data_descriptors()
raise
@contextmanager
def _open_writer(self, fd):
with os.fdopen(fd, 'wb') as writer:
yield writer
def _write_input_data(self, fd, data_iter):
try:
# double try/except block saving from unexpected errors
try:
with self._open_writer(fd) as writer:
for chunk in data_iter:
with StorletTimeout(self.timeout):
writer.write(chunk)
except (OSError, TypeError, ValueError):
self.logger.exception('fdopen failed')
except IOError:
# this will happen at sort of broken pipe while writer.write
self.logger.exception('IOError with writing fd %s' % fd)
except StorletTimeout:
self.logger.exception(
'Timeout (%s)s with writing fd %s' % (self.timeout, fd))
except Exception:
# _write_input_data is designed to run eventlet thread
# so that we should catch and suppress it here
self.logger.exception('Unexpected error at writing input data')

View File

@ -18,12 +18,12 @@ from storlets.gateway.common.stob import StorletRequest, StorletResponse
from storlets.gateway.gateways.base import StorletGatewayBase from storlets.gateway.gateways.base import StorletGatewayBase
class StorletGatewayStub(StorletGatewayBase): class StubStorletGateway(StorletGatewayBase):
request_class = StorletRequest request_class = StorletRequest
def __init__(self, conf, logger, scope): def __init__(self, conf, logger, scope):
super(StorletGatewayStub, self).__init__(conf, logger, scope) super(StubStorletGateway, self).__init__(conf, logger, scope)
self.logger = logger self.logger = logger
self.conf = conf self.conf = conf
self.scope = scope self.scope = scope

View File

@ -39,7 +39,7 @@ class TestCapabilities(StorletBaseFunctionalTest):
# TODO(eranr): take values from conf # TODO(eranr): take values from conf
self.assertEqual('dependency', options['storlet_dependency']) self.assertEqual('dependency', options['storlet_dependency'])
self.assertEqual('storlet', options['storlet_container']) self.assertEqual('storlet', options['storlet_container'])
self.assertEqual('StorletGatewayDocker', self.assertEqual('DockerStorletGateway',
options['storlet_gateway_class']) options['storlet_gateway_class'])

View File

@ -0,0 +1,561 @@
# Copyright (c) 2010-2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import eventlet
from io import BytesIO, StringIO
import json
from shutil import rmtree
from tempfile import mkdtemp
import unittest
from unittest import mock
from swift.common.swob import Request, Response
from swift.common.utils import FileLikeIter
from storlets.sbus.client import SBusResponse
from storlets.gateway.gateways.container.gateway import ContainerStorletRequest
from tests.unit import FakeLogger
from tests.unit.gateway.gateways import FakeFileManager
class TestContainerStorletRequest(unittest.TestCase):
def test_init(self):
# Java
storlet_id = 'Storlet-1.0.jar'
params = {'Param1': 'Value1', 'Param2': 'Value2'}
metadata = {'MetaKey1': 'MetaValue1', 'MetaKey2': 'MetaValue2'}
# with dependencies
options = {'storlet_main': 'org.openstack.storlet.Storlet',
'storlet_dependency': 'dep1,dep2',
'storlet_language': 'java',
'file_manager': FakeFileManager('storlet', 'dep')}
dsreq = ContainerStorletRequest(storlet_id, params, metadata,
iter(StringIO()), options=options)
self.assertEqual(metadata, dsreq.user_metadata)
self.assertEqual(params, dsreq.params)
self.assertEqual('Storlet-1.0.jar', dsreq.storlet_id)
self.assertEqual('org.openstack.storlet.Storlet', dsreq.storlet_main)
self.assertEqual(['dep1', 'dep2'], dsreq.dependencies)
self.assertEqual('java', dsreq.storlet_language)
self.assertIsNone(dsreq.storlet_language_version)
# without dependencies
options = {'storlet_main': 'org.openstack.storlet.Storlet',
'storlet_language': 'java',
'file_manager': FakeFileManager('storlet', 'dep')}
dsreq = ContainerStorletRequest(storlet_id, params, metadata,
iter(StringIO()), options=options)
self.assertEqual(metadata, dsreq.user_metadata)
self.assertEqual(params, dsreq.params)
self.assertEqual('Storlet-1.0.jar', dsreq.storlet_id)
self.assertEqual('org.openstack.storlet.Storlet', dsreq.storlet_main)
self.assertEqual([], dsreq.dependencies)
self.assertEqual('java', dsreq.storlet_language)
self.assertIsNone(dsreq.storlet_language_version)
# storlet_language is not given
options = {'storlet_main': 'org.openstack.storlet.Storlet',
'file_manager': FakeFileManager('storlet', 'dep')}
with self.assertRaises(ValueError):
ContainerStorletRequest(storlet_id, params, metadata,
iter(StringIO()), options=options)
# storlet_main is not given
options = {'storlet_language': 'java',
'file_manager': FakeFileManager('storlet', 'dep')}
with self.assertRaises(ValueError):
ContainerStorletRequest(storlet_id, params, metadata,
iter(StringIO()), options=options)
# file_manager is not given
options = {'storlet_main': 'org.openstack.storlet.Storlet',
'storlet_language': 'java'}
with self.assertRaises(ValueError):
ContainerStorletRequest(storlet_id, params, metadata,
iter(StringIO()), options=options)
# Python
storlet_id = 'storlet.py'
params = {'Param1': 'Value1', 'Param2': 'Value2'}
metadata = {'MetaKey1': 'MetaValue1', 'MetaKey2': 'MetaValue2'}
# without language version
options = {'storlet_main': 'storlet.Storlet',
'storlet_language': 'python',
'file_manager': FakeFileManager('storlet', 'dep')}
dsreq = ContainerStorletRequest(storlet_id, params, metadata,
iter(StringIO()), options=options)
self.assertEqual(metadata, dsreq.user_metadata)
self.assertEqual(params, dsreq.params)
self.assertEqual('storlet.py', dsreq.storlet_id)
self.assertEqual('storlet.Storlet', dsreq.storlet_main)
self.assertEqual([], dsreq.dependencies)
self.assertEqual('python', dsreq.storlet_language)
self.assertIsNone(dsreq.storlet_language_version)
# with language version
options = {'storlet_main': 'storlet.Storlet',
'storlet_language': 'python',
'storlet_language_version': '3.6',
'file_manager': FakeFileManager('storlet', 'dep')}
dsreq = ContainerStorletRequest(storlet_id, params, metadata,
iter(StringIO()), options=options)
self.assertEqual(metadata, dsreq.user_metadata)
self.assertEqual(params, dsreq.params)
self.assertEqual('storlet.py', dsreq.storlet_id)
self.assertEqual('storlet.Storlet', dsreq.storlet_main)
self.assertEqual([], dsreq.dependencies)
self.assertEqual('python', dsreq.storlet_language)
self.assertEqual('3.6', dsreq.storlet_language_version)
def test_init_with_range(self):
storlet_id = 'Storlet-1.0.jar'
params = {}
metadata = {}
options = {'storlet_main': 'org.openstack.storlet.Storlet',
'storlet_dependency': 'dep1,dep2',
'storlet_language': 'java',
'file_manager': FakeFileManager('storlet', 'dep'),
'range_start': 1,
'range_end': 6}
dsreq = ContainerStorletRequest(storlet_id, params, metadata,
None, 0, options=options)
self.assertEqual('Storlet-1.0.jar', dsreq.storlet_id)
self.assertEqual('org.openstack.storlet.Storlet', dsreq.storlet_main)
self.assertEqual(['dep1', 'dep2'], dsreq.dependencies)
self.assertEqual('java', dsreq.storlet_language)
self.assertIsNone(dsreq.storlet_language_version)
self.assertEqual(1, dsreq.start)
self.assertEqual(6, dsreq.end)
options = {'storlet_main': 'org.openstack.storlet.Storlet',
'storlet_dependency': 'dep1,dep2',
'storlet_language': 'java',
'file_manager': FakeFileManager('storlet', 'dep'),
'range_start': 0,
'range_end': 0}
dsreq = ContainerStorletRequest(storlet_id, params, metadata,
None, 0, options=options)
self.assertEqual('Storlet-1.0.jar', dsreq.storlet_id)
self.assertEqual('org.openstack.storlet.Storlet', dsreq.storlet_main)
self.assertEqual(['dep1', 'dep2'], dsreq.dependencies)
self.assertEqual('java', dsreq.storlet_language)
self.assertIsNone(dsreq.storlet_language_version)
self.assertEqual(0, dsreq.start)
self.assertEqual(0, dsreq.end)
def test_has_range(self):
storlet_id = 'Storlet-1.0.jar'
params = {}
metadata = {}
options = {'storlet_main': 'org.openstack.storlet.Storlet',
'storlet_dependency': 'dep1,dep2',
'storlet_language': 'java',
'file_manager': FakeFileManager('storlet', 'dep')}
dsreq = ContainerStorletRequest(storlet_id, params, metadata,
None, 0, options=options)
self.assertFalse(dsreq.has_range)
options = {'storlet_main': 'org.openstack.storlet.Storlet',
'storlet_dependency': 'dep1,dep2',
'storlet_language': 'java',
'file_manager': FakeFileManager('storlet', 'dep'),
'range_start': 1,
'range_end': 6}
dsreq = ContainerStorletRequest(storlet_id, params, metadata,
None, 0, options=options)
self.assertTrue(dsreq.has_range)
options = {'storlet_main': 'org.openstack.storlet.Storlet',
'storlet_dependency': 'dep1,dep2',
'storlet_language': 'java',
'file_manager': FakeFileManager('storlet', 'dep'),
'range_start': 0,
'range_end': 6}
dsreq = ContainerStorletRequest(storlet_id, params, metadata,
None, 0, options=options)
self.assertTrue(dsreq.has_range)
class ContainerGatewayTestMixin(object):
def setUp(self):
# TODO(takashi): take these values from config file
self.tempdir = mkdtemp()
self.sconf = {
'host_root': self.tempdir,
'swift_dir': self.tempdir,
'storlet_timeout': '9',
'storlet_container': 'storlet',
'storlet_dependency': 'dependency',
'reseller_prefix': 'AUTH'
}
self.logger = FakeLogger()
self.storlet_container = self.sconf['storlet_container']
self.storlet_dependency = self.sconf['storlet_dependency']
self.version = 'v1'
self.account = 'AUTH_account'
self.container = 'container'
self.obj = 'object'
self.sobj = 'storlet-1.0.jar'
self.gateway = self.gateway_class(
self.sconf, self.logger, self.account)
def tearDown(self):
rmtree(self.tempdir)
@property
def req_path(self):
return self._create_proxy_path(
self.version, self.account, self.container,
self.obj)
@property
def storlet_path(self):
return self._create_proxy_path(
self.version, self.account, self.storlet_container,
self.sobj)
def _create_proxy_path(self, version, account, container, obj):
return '/'.join(['', version, account, container, obj])
def test_check_mandatory_params(self):
params = {'keyA': 'valueA',
'keyB': 'valueB',
'keyC': 'valueC'}
# all mandatory headers are included
self.gateway_class._check_mandatory_params(
params, ['keyA', 'keyB'])
# some of mandatory headers are missing
with self.assertRaises(ValueError):
self.gateway_class._check_mandatory_params(
params, ['keyA', 'KeyD'])
def test_validate_storlet_registration_java(self):
# correct name and headers w/ dependency
obj = 'storlet-1.0.jar'
params = {'Language': 'java',
'Interface-Version': '1.0',
'Dependency': 'dep_file',
'Object-Metadata': 'no',
'Main': 'path.to.storlet.class'}
self.gateway_class.validate_storlet_registration(params, obj)
# correct name and headers w/o dependency
obj = 'storlet-1.0.jar'
params = {'Language': 'java',
'Interface-Version': '1.0',
'Object-Metadata': 'no',
'Main': 'path.to.storlet.class'}
self.gateway_class.validate_storlet_registration(params, obj)
# some header keys are missing
params = {'Language': 'java',
'Interface-Version': '1.0',
'Dependency': 'dep_file',
'Object-Metadata': 'no'}
with self.assertRaises(ValueError):
self.gateway_class.validate_storlet_registration(params, obj)
# wrong name
obj = 'storlet.jar'
params = {'Language': 'java',
'Interface-Version': '1.0',
'Dependency': 'dep_file',
'Object-Metadata': 'no',
'Main': 'path.to.storlet.class'}
with self.assertRaises(ValueError):
self.gateway_class.validate_storlet_registration(params, obj)
def test_validate_storlet_registration_python(self):
# correct name and headers w/ dependency
obj = 'storlet.py'
params = {'Language': 'python',
'Language-Version': '3.6',
'Interface-Version': '1.0',
'Dependency': 'dep_file',
'Object-Metadata': 'no',
'Main': 'storlet.Storlet'}
self.gateway_class.validate_storlet_registration(params, obj)
# wrong version
obj = 'storlet.py'
params = {'Language': 'python',
'Language-Version': '1.7',
'Interface-Version': '1.0',
'Dependency': 'dep_file',
'Object-Metadata': 'no',
'Main': 'storlet.Storlet'}
with self.assertRaises(ValueError):
self.gateway_class.validate_storlet_registration(params, obj)
# py2 is no more supported
obj = 'storlet.py'
params = {'Language': 'python',
'Language-Version': '2.7',
'Interface-Version': '1.0',
'Dependency': 'dep_file',
'Object-Metadata': 'no',
'Main': 'storlet.Storlet'}
with self.assertRaises(ValueError):
self.gateway_class.validate_storlet_registration(params, obj)
# wrong name
obj = 'storlet.pyfoo'
params = {'Language': 'python',
'Interface-Version': '1.0',
'Dependency': 'dep_file',
'Object-Metadata': 'no',
'Main': 'storlet.Storlet'}
with self.assertRaises(ValueError):
self.gateway_class.validate_storlet_registration(params, obj)
# wrong main class
obj = 'storlet.py'
params = {'Language': 'python',
'Interface-Version': '1.0',
'Dependency': 'dep_file',
'Object-Metadata': 'no',
'Main': 'another_storlet.Storlet'}
with self.assertRaises(ValueError):
self.gateway_class.validate_storlet_registration(params, obj)
obj = 'storlet.py'
params = {'Language': 'python',
'Interface-Version': '1.0',
'Dependency': 'dep_file',
'Object-Metadata': 'no',
'Main': 'storlet'}
with self.assertRaises(ValueError):
self.gateway_class.validate_storlet_registration(params, obj)
obj = 'storlet.py'
params = {'Language': 'python',
'Interface-Version': '1.0',
'Dependency': 'dep_file',
'Object-Metadata': 'no',
'Main': 'storlet.foo.Storlet'}
with self.assertRaises(ValueError):
self.gateway_class.validate_storlet_registration(params, obj)
def test_validate_storlet_registration_not_suppoeted(self):
# unsupported language
obj = 'storlet.foo'
params = {'Language': 'bar',
'Interface-Version': '1.0',
'Dependency': 'dep_file',
'Object-Metadata': 'no',
'Main': 'path.to.storlet.class'}
with self.assertRaises(ValueError):
self.gateway_class.validate_storlet_registration(params, obj)
# same name for storlet and dependency
obj = 'storlet-1.0.jar'
params = {'Language': 'java',
'Interface-Version': '1.0',
'Dependency': 'storlet-1.0.jar',
'Object-Metadata': 'no',
'Main': 'path.to.storlet.class'}
with self.assertRaises(ValueError):
self.gateway_class.validate_storlet_registration(params, obj)
# duplicated name in dependencies
obj = 'storlet-1.0.jar'
params = {'Language': 'java',
'Interface-Version': '1.0',
'Dependency': 'dep_file,dep_file',
'Object-Metadata': 'no',
'Main': 'path.to.storlet.class'}
with self.assertRaises(ValueError):
self.gateway_class.validate_storlet_registration(params, obj)
def test_validate_dependency_registration(self):
# w/o dependency parameter
obj = 'dep_file'
params = {'Dependency-Version': '1.0'}
self.gateway_class.validate_dependency_registration(params, obj)
# w/ correct dependency parameter
params = {
'Dependency-Permissions': '755',
'Dependency-Version': '1.0'}
self.gateway_class.validate_dependency_registration(params, obj)
# w/ wrong dependency parameter
params = {
'Dependency-Permissions': '400',
'Dependency-Version': '1.0'}
with self.assertRaises(ValueError):
self.gateway_class.validate_dependency_registration(params, obj)
# w/ invalid dependency parameter
params = {
'Dependency-Permissions': 'foo',
'Dependency-Version': '1.0'}
with self.assertRaises(ValueError):
self.gateway_class.validate_dependency_registration(params, obj)
params = {
'Dependency-Permissions': '888',
'Dependency-Version': '1.0'}
with self.assertRaises(ValueError):
self.gateway_class.validate_dependency_registration(params, obj)
def _test_invocation_flow(self, extra_sources=None):
extra_sources = extra_sources or []
options = {'generate_log': False,
'scope': 'AUTH_account',
'storlet_main': 'org.openstack.storlet.Storlet',
'storlet_dependency': 'dep1,dep2',
'storlet_language': 'java',
'file_manager': FakeFileManager('storlet', 'dep')}
st_req = ContainerStorletRequest(
storlet_id=self.sobj,
params={},
user_metadata={},
data_iter=iter('body'), options=options)
# TODO(kota_): need more efficient way for emuration of return value
# from SDaemon
value_generator = iter([
# first, we get metadata json
json.dumps({'metadata': 'return'}),
# then we get object data
'something', '',
])
def mock_read(fd, size):
try:
value = next(value_generator)
except StopIteration:
raise Exception('called more then expected')
# NOTE(takashi): Make sure that we return bytes in PY3
return value.encode('utf-8')
def mock_close(fd):
pass
called_fd_and_bodies = []
invocation_protocol = \
'storlets.gateway.gateways.container.runtime.' \
'StorletInvocationProtocol._write_input_data'
def mock_writer(self, fd, app_iter):
body = ''
for chunk in app_iter:
body += chunk
called_fd_and_bodies.append((fd, body))
# prepare nested mock patch
# SBus -> mock SBus.send() for container communication
# os.read -> mock reading the file descriptor from container
# select.select -> mock fd communication which can be readable
@mock.patch('storlets.gateway.gateways.container.runtime.SBusClient')
@mock.patch('storlets.gateway.gateways.container.runtime.os.read',
mock_read)
@mock.patch('storlets.gateway.gateways.container.runtime.os.close',
mock_close)
@mock.patch('storlets.gateway.gateways.container.runtime.select.'
'select',
lambda r, w, x, timeout=None: (r, w, x))
@mock.patch('storlets.gateway.common.stob.os.read', mock_read)
@mock.patch(invocation_protocol, mock_writer)
def test_invocation_flow(client):
client.ping.return_value = SBusResponse(True, 'OK')
client.stop_daemon.return_value = SBusResponse(True, 'OK')
client.start_daemon.return_value = SBusResponse(True, 'OK')
client.execute.return_value = SBusResponse(True, 'OK', 'someid')
sresp = self.gateway.invocation_flow(st_req, extra_sources)
eventlet.sleep(0.1)
file_like = FileLikeIter(sresp.data_iter)
self.assertEqual(b'something', file_like.read())
# I hate the decorator to return an instance but to track current
# implementation, we have to make a mock class for this. Need to fix.
class MockFileManager(object):
def get_storlet(self, req):
return BytesIO(b'mock'), None
def get_dependency(self, req):
return BytesIO(b'mock'), None
st_req.file_manager = MockFileManager()
test_invocation_flow()
# ensure st_req.app_iter is drawn
self.assertRaises(StopIteration, next, st_req.data_iter)
expected_mock_writer_calls = len(extra_sources) + 1
self.assertEqual(expected_mock_writer_calls,
len(called_fd_and_bodies))
self.assertEqual('body', called_fd_and_bodies[0][1])
return called_fd_and_bodies
def test_invocation_flow(self):
self._test_invocation_flow()
def test_invocation_flow_with_extra_sources(self):
options = {'generate_log': False,
'scope': 'AUTH_account',
'storlet_main': 'org.openstack.storlet.Storlet',
'storlet_dependency': 'dep1,dep2',
'storlet_language': 'java',
'file_manager': FakeFileManager('storlet', 'dep')}
data_sources = []
def generate_extra_st_request():
# This works similarly with build_storlet_request
# TODO(kota_): think of more generarl way w/o
# build_storlet_request
sw_req = Request.blank(
self.req_path, environ={'REQUEST_METHOD': 'GET'},
headers={'X-Run-Storlet': self.sobj})
sw_resp = Response(
app_iter=iter(['This is a response body']), status=200)
st_req = ContainerStorletRequest(
storlet_id=sw_req.headers['X-Run-Storlet'],
params=sw_req.params,
user_metadata={},
data_iter=sw_resp.app_iter, options=options)
data_sources.append(sw_resp.app_iter)
return st_req
extra_request = generate_extra_st_request()
mock_calls = self._test_invocation_flow(
extra_sources=[extra_request])
self.assertEqual('This is a response body', mock_calls[1][1])
# run all existing eventlet threads
for app_iter in data_sources:
# ensure all app_iters are drawn
self.assertRaises(StopIteration, next, app_iter)
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,411 @@
# Copyright (c) 2010-2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from contextlib import contextmanager
import errno
from io import StringIO
import os
from stat import ST_MODE
import tempfile
import unittest
from unittest import mock
from storlets.sbus.client import SBusResponse
from storlets.sbus.client.exceptions import SBusClientIOError
from storlets.gateway.common.exceptions import StorletRuntimeException, \
StorletTimeout
from storlets.gateway.gateways.container.gateway import ContainerStorletRequest
from storlets.gateway.gateways.container.runtime import \
RunTimePaths, StorletInvocationProtocol
from tests.unit import FakeLogger, with_tempdir
from tests.unit.gateway.gateways import FakeFileManager
@contextmanager
def _mock_os_pipe(bufs):
class FakeFd(object):
def __init__(self, rbuf=''):
self.rbuf = rbuf.encode('utf-8')
self.closed = False
def read(self, size):
size = min(len(self.rbuf), size)
ret = self.rbuf[:size]
self.rbuf = self.rbuf[size:]
return ret
def close(self):
if self.closed:
raise OSError(errno.EBADF, os.strerror(errno.EBADF))
self.closed = True
def fake_os_read(fd, size):
return fd.read(size)
def fake_os_close(fd):
fd.close()
pipes = [(FakeFd(buf), FakeFd()) for buf in bufs]
pipe_generator = iter(pipes)
def mock_os_pipe():
try:
return next(pipe_generator)
except StopIteration:
raise AssertionError('pipe called more than expected')
with mock.patch('storlets.gateway.gateways.container.runtime.os.pipe',
mock_os_pipe), \
mock.patch('storlets.gateway.gateways.container.runtime.os.read',
fake_os_read) as fake_os_read,\
mock.patch('storlets.gateway.gateways.container.runtime.os.close',
fake_os_close) as fake_os_close:
yield pipes
class TestRuntimePaths(unittest.TestCase):
def setUp(self):
self.scope = '0123456789abc'
self._initialize()
def _initialize(self):
# TODO(takashi): take these values from config file
base_dir = '/var/lib/storlets'
self.script_dir = os.path.join(base_dir, 'scripts')
self.pipes_dir = os.path.join(base_dir, 'pipes', 'scopes')
self.storlets_dir = os.path.join(base_dir, 'storlets', 'scopes')
self.log_dir = os.path.join(base_dir, 'logs', 'scopes')
self.cache_dir = os.path.join(base_dir, 'cache', 'scopes')
self.conf = {}
self.storlet_id = 'org.openstack.storlet.mystorlet'
self.paths = RunTimePaths(self.scope, self.conf)
def tearDown(self):
pass
def test_host_pipe_dir(self):
self.assertEqual(
os.path.join(self.pipes_dir, self.scope),
self.paths.host_pipe_dir)
def test_create_host_pipe_dir(self):
pipedir = self.paths.host_pipe_dir
# When the directory exists
with mock.patch('os.path.exists', return_value=True), \
mock.patch('os.makedirs') as m, \
mock.patch('os.chmod') as c:
self.assertEqual(os.path.join(self.pipes_dir, self.scope),
self.paths.create_host_pipe_dir())
self.assertEqual(0, m.call_count)
cargs, ckwargs = c.call_args
# Make sure about the target directory
self.assertEqual(cargs[0], pipedir)
# When the directory does not exist
with mock.patch('os.path.exists', return_value=False), \
mock.patch('os.makedirs') as m, \
mock.patch('os.chmod') as c:
self.assertEqual(os.path.join(self.pipes_dir, self.scope),
self.paths.create_host_pipe_dir())
self.assertEqual(1, m.call_count)
# Make sure about the target directory
margs, mkwargs = m.call_args
self.assertEqual(margs[0], pipedir)
cargs, ckwargs = c.call_args
self.assertEqual(cargs[0], pipedir)
def test_host_factory_pipe(self):
self.assertEqual(
self.paths.host_factory_pipe,
os.path.join(self.pipes_dir, self.scope, 'factory_pipe'))
def test_get_host_storlet_pipe(self):
self.assertEqual(
os.path.join(self.pipes_dir, self.scope, self.storlet_id),
self.paths.get_host_storlet_pipe(self.storlet_id))
def test_get_sbox_storlet_pipe(self):
self.assertEqual(
os.path.join('/mnt/channels', self.storlet_id),
self.paths.get_sbox_storlet_pipe(self.storlet_id))
def test_get_sbox_storlet_dir(self):
self.assertEqual(
os.path.join('/home/swift', self.storlet_id),
self.paths.get_sbox_storlet_dir(self.storlet_id))
def test_host_storlet_base_dir(self):
self.assertEqual(
self.paths.host_storlet_base_dir,
os.path.join(self.storlets_dir, self.scope))
def test_get_host_storlet_dir(self):
self.assertEqual(
os.path.join(self.storlets_dir, self.scope, self.storlet_id),
self.paths.get_host_storlet_dir(self.storlet_id))
def test_get_host_slog_path(self):
self.assertEqual(
os.path.join(self.log_dir, self.scope, self.storlet_id,
'storlet_invoke.log'),
self.paths.get_host_slog_path(self.storlet_id))
def test_host_storlet_cache_dir(self):
self.assertEqual(
os.path.join(self.cache_dir, self.scope, 'storlet'),
self.paths.host_storlet_cache_dir)
def test_host_dependency_cache_dir(self):
self.assertEqual(
os.path.join(self.cache_dir, self.scope, 'dependency'),
self.paths.host_dependency_cache_dir)
def test_runtime_paths_default(self):
# CHECK: docs says we need 4 dirs for communicate
# ====================================================================
# |1| host_factory_pipe_path | <pipes_dir>/<scope>/factory_pipe |
# ====================================================================
# |2| host_storlet_pipe_path | <pipes_dir>/<scope>/<storlet_id> |
# ====================================================================
# |3| sandbox_factory_pipe_path | /mnt/channels/factory_pipe |
# ====================================================================
# |4| sandbox_storlet_pipe_path | /mnt/channels/<storlet_id> |
# ====================================================================
#
# With this test, the scope value is "account" and the storlet_id is
# "Storlet-1.0.jar" (app name?)
# ok, let's check for these values
runtime_paths = RunTimePaths('account', {})
storlet_id = 'Storlet-1.0.jar'
# For pipe
self.assertEqual('/var/lib/storlets/pipes/scopes/account',
runtime_paths.host_pipe_dir)
# 1. host_factory_pipe_path <pipes_dir>/<scope>/factory_pipe
self.assertEqual(
'/var/lib/storlets/pipes/scopes/account/factory_pipe',
runtime_paths.host_factory_pipe)
# 2. host_storlet_pipe_path <pipes_dir>/<scope>/<storlet_id>
self.assertEqual(
'/var/lib/storlets/pipes/scopes/account/Storlet-1.0.jar',
runtime_paths.get_host_storlet_pipe(storlet_id))
# 3. Yes, right now, we don't have the path for #3 in Python
# 4. sandbox_storlet_pipe_path | /mnt/channels/<storlet_id>
self.assertEqual('/mnt/channels/Storlet-1.0.jar',
runtime_paths.get_sbox_storlet_pipe(storlet_id))
# This looks like for jar load?
self.assertEqual('/var/lib/storlets/storlets/scopes/account',
runtime_paths.host_storlet_base_dir)
self.assertEqual(
'/var/lib/storlets/storlets/scopes/account/Storlet-1.0.jar',
runtime_paths.get_host_storlet_dir(storlet_id))
# And this one is a mount point in sand box?
self.assertEqual('/home/swift/Storlet-1.0.jar',
runtime_paths.get_sbox_storlet_dir(storlet_id))
@with_tempdir
def test_create_host_pipe_dir_with_real_dir(self, temp_dir):
runtime_paths = RunTimePaths('account', {'host_root': temp_dir})
runtime_paths.create_host_pipe_dir()
path = runtime_paths.host_pipe_dir
self.assertTrue(os.path.exists(path))
self.assertTrue(os.path.isdir(path))
permission = oct(os.stat(path)[ST_MODE])[-3:]
# TODO(kota_): make sure if this is really acceptable
self.assertEqual('777', permission)
class TestRuntimePathsTempauth(TestRuntimePaths):
def setUp(self):
self.scope = 'test'
self._initialize()
class TestStorletInvocationProtocol(unittest.TestCase):
def setUp(self):
self.pipe_path = tempfile.mktemp()
self.log_file = tempfile.mktemp()
self.logger = FakeLogger()
self.storlet_id = 'Storlet-1.0.jar'
self.options = {'storlet_main': 'org.openstack.storlet.Storlet',
'storlet_dependency': 'dep1,dep2',
'storlet_language': 'java',
'file_manager': FakeFileManager('storlet', 'dep')}
storlet_request = ContainerStorletRequest(
self.storlet_id, {}, {}, iter(StringIO()), options=self.options)
self.protocol = StorletInvocationProtocol(
storlet_request, self.pipe_path, self.log_file, 1, self.logger)
def tearDown(self):
for path in [self.pipe_path, self.log_file]:
try:
os.unlink(path)
except OSError:
pass
def test_send_execute_command(self):
with mock.patch('storlets.gateway.gateways.container.runtime.'
'SBusClient.execute') as execute:
execute.return_value = SBusResponse(True, 'OK', 'someid')
with self.protocol.storlet_logger.activate():
self.protocol._send_execute_command()
self.assertEqual('someid', self.protocol.task_id)
with mock.patch('storlets.gateway.gateways.container.runtime.'
'SBusClient.execute') as execute:
execute.return_value = SBusResponse(True, 'OK')
with self.assertRaises(StorletRuntimeException):
with self.protocol.storlet_logger.activate():
self.protocol._send_execute_command()
with mock.patch('storlets.gateway.gateways.container.runtime.'
'SBusClient.execute') as execute:
execute.return_value = SBusResponse(False, 'NG', 'someid')
with self.assertRaises(StorletRuntimeException):
with self.protocol.storlet_logger.activate():
self.protocol._send_execute_command()
with mock.patch('storlets.gateway.gateways.container.runtime.'
'SBusClient.execute') as execute:
execute.side_effect = SBusClientIOError()
with self.assertRaises(StorletRuntimeException):
with self.protocol.storlet_logger.activate():
self.protocol._send_execute_command()
def test_invocation_protocol(self):
# os.pipe will be called 3 times
pipe_called = 3
with _mock_os_pipe([''] * pipe_called) as pipes:
with mock.patch.object(self.protocol,
'_wait_for_read_with_timeout'), \
mock.patch.object(self.protocol, '_send_execute_command'):
self.protocol._invoke()
self.assertEqual(pipe_called, len(pipes))
pipes = iter(pipes)
# data write is not directly closed
# data read is closed
input_data_read_fd, input_data_write_fd = next(pipes)
self.assertTrue(input_data_read_fd.closed)
self.assertFalse(input_data_write_fd.closed)
# data write is closed but data read is still open
data_read_fd, data_write_fd = next(pipes)
self.assertFalse(data_read_fd.closed)
self.assertTrue(data_write_fd.closed)
# metadata write fd is closed, metadata read fd is still open.
metadata_read_fd, metadata_write_fd = next(pipes)
self.assertFalse(metadata_read_fd.closed)
self.assertTrue(metadata_write_fd.closed)
# sanity
self.assertRaises(StopIteration, next, pipes)
def test_invocation_protocol_remote_fds(self):
# In default, we have 4 fds in remote_fds
storlet_request = ContainerStorletRequest(
self.storlet_id, {}, {}, iter(StringIO()), options=self.options)
protocol = StorletInvocationProtocol(
storlet_request, self.pipe_path, self.log_file, 1, self.logger)
with protocol.storlet_logger.activate():
self.assertEqual(4, len(protocol.remote_fds))
# extra_resources expands the remote_fds
storlet_request = ContainerStorletRequest(
self.storlet_id, {}, {}, iter(StringIO()), options=self.options)
protocol = StorletInvocationProtocol(
storlet_request, self.pipe_path, self.log_file, 1, self.logger,
extra_sources=[storlet_request])
with protocol.storlet_logger.activate():
self.assertEqual(5, len(protocol.remote_fds))
# 2 more extra_resources expands the remote_fds
storlet_request = ContainerStorletRequest(
self.storlet_id, {}, {}, iter(StringIO()), options=self.options)
protocol = StorletInvocationProtocol(
storlet_request, self.pipe_path, self.log_file, 1, self.logger,
extra_sources=[storlet_request] * 3)
with protocol.storlet_logger.activate():
self.assertEqual(7, len(protocol.remote_fds))
def test_open_writer_with_invalid_fd(self):
invalid_fds = (
(None, TypeError), (-1, ValueError), ('blah', TypeError))
for invalid_fd, expected_error in invalid_fds:
with self.assertRaises(expected_error):
with self.protocol._open_writer(invalid_fd):
pass
def _test_writer_with_exception(self, exception_cls):
pipes = [os.pipe()]
def raise_in_the_context():
with self.protocol._open_writer(pipes[0][1]):
raise exception_cls()
try:
# writer context doesn't suppress any exception
self.assertRaises(exception_cls, raise_in_the_context)
# since _open_writer closes the write fd, the os.close will fail as
# BadFileDescriptor
with self.assertRaises(OSError) as os_error:
os.close(pipes[0][1])
self.assertEqual(9, os_error.exception.errno)
finally:
for fd in pipes[0]:
try:
os.close(fd)
except OSError:
pass
def test_writer_raise_while_in_writer_context(self):
# basic storlet timeout
self._test_writer_with_exception(StorletTimeout)
# unexpected IOError
self._test_writer_with_exception(IOError)
# else
self._test_writer_with_exception(Exception)
class TestStorletInvocationProtocolPython(TestStorletInvocationProtocol):
def setUp(self):
self.pipe_path = tempfile.mktemp()
self.log_file = tempfile.mktemp()
self.logger = FakeLogger()
self.storlet_id = 'Storlet-1.0.py'
self.options = {'storlet_main': 'storlet.Storlet',
'storlet_dependency': 'dep1,dep2',
'storlet_language': 'python',
'language_version': '3.6',
'file_manager': FakeFileManager('storlet', 'dep')}
storlet_request = ContainerStorletRequest(
self.storlet_id, {}, {}, iter(StringIO()), options=self.options)
self.protocol = StorletInvocationProtocol(
storlet_request, self.pipe_path, self.log_file, 1, self.logger)
if __name__ == '__main__':
unittest.main()

View File

@ -13,554 +13,17 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import eventlet
from io import BytesIO, StringIO
import json
from shutil import rmtree
from tempfile import mkdtemp
import unittest import unittest
from unittest import mock
from swift.common.swob import Request, Response from storlets.gateway.gateways.docker.gateway import \
from swift.common.utils import FileLikeIter DockerStorletGateway
from tests.unit.gateway.gateways.container.test_gateway import \
ContainerGatewayTestMixin
from storlets.sbus.client import SBusResponse
from tests.unit import FakeLogger class TestStorletDockerGateway(ContainerGatewayTestMixin, unittest.TestCase):
from tests.unit.gateway.gateways import FakeFileManager
from storlets.gateway.gateways.docker.gateway import DockerStorletRequest, \
StorletGatewayDocker
gateway_class = DockerStorletGateway
class MockInternalClient(object):
def __init__(self):
pass
class TestDockerStorletRequest(unittest.TestCase):
def test_init(self):
# Java
storlet_id = 'Storlet-1.0.jar'
params = {'Param1': 'Value1', 'Param2': 'Value2'}
metadata = {'MetaKey1': 'MetaValue1', 'MetaKey2': 'MetaValue2'}
# with dependencies
options = {'storlet_main': 'org.openstack.storlet.Storlet',
'storlet_dependency': 'dep1,dep2',
'storlet_language': 'java',
'file_manager': FakeFileManager('storlet', 'dep')}
dsreq = DockerStorletRequest(storlet_id, params, metadata,
iter(StringIO()), options=options)
self.assertEqual(metadata, dsreq.user_metadata)
self.assertEqual(params, dsreq.params)
self.assertEqual('Storlet-1.0.jar', dsreq.storlet_id)
self.assertEqual('org.openstack.storlet.Storlet', dsreq.storlet_main)
self.assertEqual(['dep1', 'dep2'], dsreq.dependencies)
self.assertEqual('java', dsreq.storlet_language)
self.assertIsNone(dsreq.storlet_language_version)
# without dependencies
options = {'storlet_main': 'org.openstack.storlet.Storlet',
'storlet_language': 'java',
'file_manager': FakeFileManager('storlet', 'dep')}
dsreq = DockerStorletRequest(storlet_id, params, metadata,
iter(StringIO()), options=options)
self.assertEqual(metadata, dsreq.user_metadata)
self.assertEqual(params, dsreq.params)
self.assertEqual('Storlet-1.0.jar', dsreq.storlet_id)
self.assertEqual('org.openstack.storlet.Storlet', dsreq.storlet_main)
self.assertEqual([], dsreq.dependencies)
self.assertEqual('java', dsreq.storlet_language)
self.assertIsNone(dsreq.storlet_language_version)
# storlet_language is not given
options = {'storlet_main': 'org.openstack.storlet.Storlet',
'file_manager': FakeFileManager('storlet', 'dep')}
with self.assertRaises(ValueError):
DockerStorletRequest(storlet_id, params, metadata,
iter(StringIO()), options=options)
# storlet_main is not given
options = {'storlet_language': 'java',
'file_manager': FakeFileManager('storlet', 'dep')}
with self.assertRaises(ValueError):
DockerStorletRequest(storlet_id, params, metadata,
iter(StringIO()), options=options)
# file_manager is not given
options = {'storlet_main': 'org.openstack.storlet.Storlet',
'storlet_language': 'java'}
with self.assertRaises(ValueError):
DockerStorletRequest(storlet_id, params, metadata,
iter(StringIO()), options=options)
# Python
storlet_id = 'storlet.py'
params = {'Param1': 'Value1', 'Param2': 'Value2'}
metadata = {'MetaKey1': 'MetaValue1', 'MetaKey2': 'MetaValue2'}
# without language version
options = {'storlet_main': 'storlet.Storlet',
'storlet_language': 'python',
'file_manager': FakeFileManager('storlet', 'dep')}
dsreq = DockerStorletRequest(storlet_id, params, metadata,
iter(StringIO()), options=options)
self.assertEqual(metadata, dsreq.user_metadata)
self.assertEqual(params, dsreq.params)
self.assertEqual('storlet.py', dsreq.storlet_id)
self.assertEqual('storlet.Storlet', dsreq.storlet_main)
self.assertEqual([], dsreq.dependencies)
self.assertEqual('python', dsreq.storlet_language)
self.assertIsNone(dsreq.storlet_language_version)
# with language version
options = {'storlet_main': 'storlet.Storlet',
'storlet_language': 'python',
'storlet_language_version': '3.6',
'file_manager': FakeFileManager('storlet', 'dep')}
dsreq = DockerStorletRequest(storlet_id, params, metadata,
iter(StringIO()), options=options)
self.assertEqual(metadata, dsreq.user_metadata)
self.assertEqual(params, dsreq.params)
self.assertEqual('storlet.py', dsreq.storlet_id)
self.assertEqual('storlet.Storlet', dsreq.storlet_main)
self.assertEqual([], dsreq.dependencies)
self.assertEqual('python', dsreq.storlet_language)
self.assertEqual('3.6', dsreq.storlet_language_version)
def test_init_with_range(self):
storlet_id = 'Storlet-1.0.jar'
params = {}
metadata = {}
options = {'storlet_main': 'org.openstack.storlet.Storlet',
'storlet_dependency': 'dep1,dep2',
'storlet_language': 'java',
'file_manager': FakeFileManager('storlet', 'dep'),
'range_start': 1,
'range_end': 6}
dsreq = DockerStorletRequest(storlet_id, params, metadata,
None, 0, options=options)
self.assertEqual('Storlet-1.0.jar', dsreq.storlet_id)
self.assertEqual('org.openstack.storlet.Storlet', dsreq.storlet_main)
self.assertEqual(['dep1', 'dep2'], dsreq.dependencies)
self.assertEqual('java', dsreq.storlet_language)
self.assertIsNone(dsreq.storlet_language_version)
self.assertEqual(1, dsreq.start)
self.assertEqual(6, dsreq.end)
options = {'storlet_main': 'org.openstack.storlet.Storlet',
'storlet_dependency': 'dep1,dep2',
'storlet_language': 'java',
'file_manager': FakeFileManager('storlet', 'dep'),
'range_start': 0,
'range_end': 0}
dsreq = DockerStorletRequest(storlet_id, params, metadata,
None, 0, options=options)
self.assertEqual('Storlet-1.0.jar', dsreq.storlet_id)
self.assertEqual('org.openstack.storlet.Storlet', dsreq.storlet_main)
self.assertEqual(['dep1', 'dep2'], dsreq.dependencies)
self.assertEqual('java', dsreq.storlet_language)
self.assertIsNone(dsreq.storlet_language_version)
self.assertEqual(0, dsreq.start)
self.assertEqual(0, dsreq.end)
def test_has_range(self):
storlet_id = 'Storlet-1.0.jar'
params = {}
metadata = {}
options = {'storlet_main': 'org.openstack.storlet.Storlet',
'storlet_dependency': 'dep1,dep2',
'storlet_language': 'java',
'file_manager': FakeFileManager('storlet', 'dep')}
dsreq = DockerStorletRequest(storlet_id, params, metadata,
None, 0, options=options)
self.assertFalse(dsreq.has_range)
options = {'storlet_main': 'org.openstack.storlet.Storlet',
'storlet_dependency': 'dep1,dep2',
'storlet_language': 'java',
'file_manager': FakeFileManager('storlet', 'dep'),
'range_start': 1,
'range_end': 6}
dsreq = DockerStorletRequest(storlet_id, params, metadata,
None, 0, options=options)
self.assertTrue(dsreq.has_range)
options = {'storlet_main': 'org.openstack.storlet.Storlet',
'storlet_dependency': 'dep1,dep2',
'storlet_language': 'java',
'file_manager': FakeFileManager('storlet', 'dep'),
'range_start': 0,
'range_end': 6}
dsreq = DockerStorletRequest(storlet_id, params, metadata,
None, 0, options=options)
self.assertTrue(dsreq.has_range)
class TestStorletDockerGateway(unittest.TestCase):
def setUp(self):
# TODO(takashi): take these values from config file
self.tempdir = mkdtemp()
self.sconf = {
'host_root': self.tempdir,
'swift_dir': self.tempdir,
'storlet_timeout': '9',
'storlet_container': 'storlet',
'storlet_dependency': 'dependency',
'reseller_prefix': 'AUTH'
}
self.logger = FakeLogger()
self.storlet_container = self.sconf['storlet_container']
self.storlet_dependency = self.sconf['storlet_dependency']
self.version = 'v1'
self.account = 'AUTH_account'
self.container = 'container'
self.obj = 'object'
self.sobj = 'storlet-1.0.jar'
self.gateway = StorletGatewayDocker(
self.sconf, self.logger, self.account)
def tearDown(self):
rmtree(self.tempdir)
@property
def req_path(self):
return self._create_proxy_path(
self.version, self.account, self.container,
self.obj)
@property
def storlet_path(self):
return self._create_proxy_path(
self.version, self.account, self.storlet_container,
self.sobj)
def _create_proxy_path(self, version, account, container, obj):
return '/'.join(['', version, account, container, obj])
def test_check_mandatory_params(self):
params = {'keyA': 'valueA',
'keyB': 'valueB',
'keyC': 'valueC'}
# all mandatory headers are included
StorletGatewayDocker._check_mandatory_params(
params, ['keyA', 'keyB'])
# some of mandatory headers are missing
with self.assertRaises(ValueError):
StorletGatewayDocker._check_mandatory_params(
params, ['keyA', 'KeyD'])
def test_validate_storlet_registration_java(self):
# correct name and headers w/ dependency
obj = 'storlet-1.0.jar'
params = {'Language': 'java',
'Interface-Version': '1.0',
'Dependency': 'dep_file',
'Object-Metadata': 'no',
'Main': 'path.to.storlet.class'}
StorletGatewayDocker.validate_storlet_registration(params, obj)
# correct name and headers w/o dependency
obj = 'storlet-1.0.jar'
params = {'Language': 'java',
'Interface-Version': '1.0',
'Object-Metadata': 'no',
'Main': 'path.to.storlet.class'}
StorletGatewayDocker.validate_storlet_registration(params, obj)
# some header keys are missing
params = {'Language': 'java',
'Interface-Version': '1.0',
'Dependency': 'dep_file',
'Object-Metadata': 'no'}
with self.assertRaises(ValueError):
StorletGatewayDocker.validate_storlet_registration(params, obj)
# wrong name
obj = 'storlet.jar'
params = {'Language': 'java',
'Interface-Version': '1.0',
'Dependency': 'dep_file',
'Object-Metadata': 'no',
'Main': 'path.to.storlet.class'}
with self.assertRaises(ValueError):
StorletGatewayDocker.validate_storlet_registration(params, obj)
def test_validate_storlet_registration_python(self):
# correct name and headers w/ dependency
obj = 'storlet.py'
params = {'Language': 'python',
'Language-Version': '3.6',
'Interface-Version': '1.0',
'Dependency': 'dep_file',
'Object-Metadata': 'no',
'Main': 'storlet.Storlet'}
StorletGatewayDocker.validate_storlet_registration(params, obj)
# wrong version
obj = 'storlet.py'
params = {'Language': 'python',
'Language-Version': '1.7',
'Interface-Version': '1.0',
'Dependency': 'dep_file',
'Object-Metadata': 'no',
'Main': 'storlet.Storlet'}
with self.assertRaises(ValueError):
StorletGatewayDocker.validate_storlet_registration(params, obj)
# py2 is no more supported
obj = 'storlet.py'
params = {'Language': 'python',
'Language-Version': '2.7',
'Interface-Version': '1.0',
'Dependency': 'dep_file',
'Object-Metadata': 'no',
'Main': 'storlet.Storlet'}
with self.assertRaises(ValueError):
StorletGatewayDocker.validate_storlet_registration(params, obj)
# wrong name
obj = 'storlet.pyfoo'
params = {'Language': 'python',
'Interface-Version': '1.0',
'Dependency': 'dep_file',
'Object-Metadata': 'no',
'Main': 'storlet.Storlet'}
with self.assertRaises(ValueError):
StorletGatewayDocker.validate_storlet_registration(params, obj)
# wrong main class
obj = 'storlet.py'
params = {'Language': 'python',
'Interface-Version': '1.0',
'Dependency': 'dep_file',
'Object-Metadata': 'no',
'Main': 'another_storlet.Storlet'}
with self.assertRaises(ValueError):
StorletGatewayDocker.validate_storlet_registration(params, obj)
obj = 'storlet.py'
params = {'Language': 'python',
'Interface-Version': '1.0',
'Dependency': 'dep_file',
'Object-Metadata': 'no',
'Main': 'storlet'}
with self.assertRaises(ValueError):
StorletGatewayDocker.validate_storlet_registration(params, obj)
obj = 'storlet.py'
params = {'Language': 'python',
'Interface-Version': '1.0',
'Dependency': 'dep_file',
'Object-Metadata': 'no',
'Main': 'storlet.foo.Storlet'}
with self.assertRaises(ValueError):
StorletGatewayDocker.validate_storlet_registration(params, obj)
def test_validate_storlet_registration_not_suppoeted(self):
# unsupported language
obj = 'storlet.foo'
params = {'Language': 'bar',
'Interface-Version': '1.0',
'Dependency': 'dep_file',
'Object-Metadata': 'no',
'Main': 'path.to.storlet.class'}
with self.assertRaises(ValueError):
StorletGatewayDocker.validate_storlet_registration(params, obj)
# same name for storlet and dependency
obj = 'storlet-1.0.jar'
params = {'Language': 'java',
'Interface-Version': '1.0',
'Dependency': 'storlet-1.0.jar',
'Object-Metadata': 'no',
'Main': 'path.to.storlet.class'}
with self.assertRaises(ValueError):
StorletGatewayDocker.validate_storlet_registration(params, obj)
# duplicated name in dependencies
obj = 'storlet-1.0.jar'
params = {'Language': 'java',
'Interface-Version': '1.0',
'Dependency': 'dep_file,dep_file',
'Object-Metadata': 'no',
'Main': 'path.to.storlet.class'}
with self.assertRaises(ValueError):
StorletGatewayDocker.validate_storlet_registration(params, obj)
def test_validate_dependency_registration(self):
# w/o dependency parameter
obj = 'dep_file'
params = {'Dependency-Version': '1.0'}
StorletGatewayDocker.validate_dependency_registration(params, obj)
# w/ correct dependency parameter
params = {
'Dependency-Permissions': '755',
'Dependency-Version': '1.0'}
StorletGatewayDocker.validate_dependency_registration(params, obj)
# w/ wrong dependency parameter
params = {
'Dependency-Permissions': '400',
'Dependency-Version': '1.0'}
with self.assertRaises(ValueError):
StorletGatewayDocker.validate_dependency_registration(params, obj)
# w/ invalid dependency parameter
params = {
'Dependency-Permissions': 'foo',
'Dependency-Version': '1.0'}
with self.assertRaises(ValueError):
StorletGatewayDocker.validate_dependency_registration(params, obj)
params = {
'Dependency-Permissions': '888',
'Dependency-Version': '1.0'}
with self.assertRaises(ValueError):
StorletGatewayDocker.validate_dependency_registration(params, obj)
def _test_docker_gateway_communicate(self, extra_sources=None):
extra_sources = extra_sources or []
options = {'generate_log': False,
'scope': 'AUTH_account',
'storlet_main': 'org.openstack.storlet.Storlet',
'storlet_dependency': 'dep1,dep2',
'storlet_language': 'java',
'file_manager': FakeFileManager('storlet', 'dep')}
st_req = DockerStorletRequest(
storlet_id=self.sobj,
params={},
user_metadata={},
data_iter=iter('body'), options=options)
# TODO(kota_): need more efficient way for emuration of return value
# from SDaemon
value_generator = iter([
# first, we get metadata json
json.dumps({'metadata': 'return'}),
# then we get object data
'something', '',
])
def mock_read(fd, size):
try:
value = next(value_generator)
except StopIteration:
raise Exception('called more then expected')
# NOTE(takashi): Make sure that we return bytes in PY3
return value.encode('utf-8')
def mock_close(fd):
pass
called_fd_and_bodies = []
invocation_protocol = \
'storlets.gateway.gateways.docker.runtime.' \
'StorletInvocationProtocol._write_input_data'
def mock_writer(self, fd, app_iter):
body = ''
for chunk in app_iter:
body += chunk
called_fd_and_bodies.append((fd, body))
# prepare nested mock patch
# SBus -> mock SBus.send() for container communication
# os.read -> mock reading the file descriptor from container
# select.select -> mock fd communication which can be readable
@mock.patch('storlets.gateway.gateways.docker.runtime.SBusClient')
@mock.patch('storlets.gateway.gateways.docker.runtime.os.read',
mock_read)
@mock.patch('storlets.gateway.gateways.docker.runtime.os.close',
mock_close)
@mock.patch('storlets.gateway.gateways.docker.runtime.select.select',
lambda r, w, x, timeout=None: (r, w, x))
@mock.patch('storlets.gateway.common.stob.os.read', mock_read)
@mock.patch(invocation_protocol, mock_writer)
def test_invocation_flow(client):
client.ping.return_value = SBusResponse(True, 'OK')
client.stop_daemon.return_value = SBusResponse(True, 'OK')
client.start_daemon.return_value = SBusResponse(True, 'OK')
client.execute.return_value = SBusResponse(True, 'OK', 'someid')
sresp = self.gateway.invocation_flow(st_req, extra_sources)
eventlet.sleep(0.1)
file_like = FileLikeIter(sresp.data_iter)
self.assertEqual(b'something', file_like.read())
# I hate the decorator to return an instance but to track current
# implementation, we have to make a mock class for this. Need to fix.
class MockFileManager(object):
def get_storlet(self, req):
return BytesIO(b'mock'), None
def get_dependency(self, req):
return BytesIO(b'mock'), None
st_req.file_manager = MockFileManager()
test_invocation_flow()
# ensure st_req.app_iter is drawn
self.assertRaises(StopIteration, next, st_req.data_iter)
expected_mock_writer_calls = len(extra_sources) + 1
self.assertEqual(expected_mock_writer_calls,
len(called_fd_and_bodies))
self.assertEqual('body', called_fd_and_bodies[0][1])
return called_fd_and_bodies
def test_docker_gateway_communicate(self):
self._test_docker_gateway_communicate()
def test_docker_gateway_communicate_with_extra_sources(self):
options = {'generate_log': False,
'scope': 'AUTH_account',
'storlet_main': 'org.openstack.storlet.Storlet',
'storlet_dependency': 'dep1,dep2',
'storlet_language': 'java',
'file_manager': FakeFileManager('storlet', 'dep')}
data_sources = []
def generate_extra_st_request():
# This works similarly with build_storlet_request
# TODO(kota_): think of more generarl way w/o
# build_storlet_request
sw_req = Request.blank(
self.req_path, environ={'REQUEST_METHOD': 'GET'},
headers={'X-Run-Storlet': self.sobj})
sw_resp = Response(
app_iter=iter(['This is a response body']), status=200)
st_req = DockerStorletRequest(
storlet_id=sw_req.headers['X-Run-Storlet'],
params=sw_req.params,
user_metadata={},
data_iter=sw_resp.app_iter, options=options)
data_sources.append(sw_resp.app_iter)
return st_req
extra_request = generate_extra_st_request()
mock_calls = self._test_docker_gateway_communicate(
extra_sources=[extra_request])
self.assertEqual('This is a response body', mock_calls[1][1])
# run all existing eventlet threads
for app_iter in data_sources:
# ensure all app_iters are drawn
self.assertRaises(StopIteration, next, app_iter)
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -13,12 +13,6 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from contextlib import contextmanager
import errno
from io import StringIO
import os
from stat import ST_MODE
import tempfile
import unittest import unittest
from unittest import mock from unittest import mock
@ -31,266 +25,57 @@ from storlets.sbus.client.exceptions import SBusClientIOError, \
SBusClientMalformedResponse, SBusClientSendError SBusClientMalformedResponse, SBusClientSendError
from storlets.gateway.common.exceptions import StorletRuntimeException, \ from storlets.gateway.common.exceptions import StorletRuntimeException, \
StorletTimeout StorletTimeout
from storlets.gateway.gateways.docker.gateway import DockerStorletRequest from storlets.gateway.gateways.docker.runtime import DockerRunTimeSandbox
from storlets.gateway.gateways.docker.runtime import RunTimeSandbox, \ from tests.unit import FakeLogger
RunTimePaths, StorletInvocationProtocol
from tests.unit import FakeLogger, with_tempdir
from tests.unit.gateway.gateways import FakeFileManager
@contextmanager class TestDockerRunTimeSandbox(unittest.TestCase):
def _mock_os_pipe(bufs):
class FakeFd(object):
def __init__(self, rbuf=''):
self.rbuf = rbuf.encode('utf-8')
self.closed = False
def read(self, size):
size = min(len(self.rbuf), size)
ret = self.rbuf[:size]
self.rbuf = self.rbuf[size:]
return ret
def close(self):
if self.closed:
raise OSError(errno.EBADF, os.strerror(errno.EBADF))
self.closed = True
def fake_os_read(fd, size):
return fd.read(size)
def fake_os_close(fd):
fd.close()
pipes = [(FakeFd(buf), FakeFd()) for buf in bufs]
pipe_generator = iter(pipes)
def mock_os_pipe():
try:
return next(pipe_generator)
except StopIteration:
raise AssertionError('pipe called more than expected')
with mock.patch('storlets.gateway.gateways.docker.runtime.os.pipe',
mock_os_pipe), \
mock.patch('storlets.gateway.gateways.docker.runtime.os.read',
fake_os_read) as fake_os_read,\
mock.patch('storlets.gateway.gateways.docker.runtime.os.close',
fake_os_close) as fake_os_close:
yield pipes
class TestRuntimePaths(unittest.TestCase):
def setUp(self):
self.scope = '0123456789abc'
self._initialize()
def _initialize(self):
# TODO(takashi): take these values from config file
base_dir = '/var/lib/storlets'
self.script_dir = os.path.join(base_dir, 'scripts')
self.pipes_dir = os.path.join(base_dir, 'pipes', 'scopes')
self.storlets_dir = os.path.join(base_dir, 'storlets', 'scopes')
self.log_dir = os.path.join(base_dir, 'logs', 'scopes')
self.cache_dir = os.path.join(base_dir, 'cache', 'scopes')
self.conf = {}
self.storlet_id = 'org.openstack.storlet.mystorlet'
self.paths = RunTimePaths(self.scope, self.conf)
def tearDown(self):
pass
def test_host_pipe_dir(self):
self.assertEqual(
os.path.join(self.pipes_dir, self.scope),
self.paths.host_pipe_dir)
def test_create_host_pipe_dir(self):
pipedir = self.paths.host_pipe_dir
# When the directory exists
with mock.patch('os.path.exists', return_value=True), \
mock.patch('os.makedirs') as m, \
mock.patch('os.chmod') as c:
self.assertEqual(os.path.join(self.pipes_dir, self.scope),
self.paths.create_host_pipe_dir())
self.assertEqual(0, m.call_count)
cargs, ckwargs = c.call_args
# Make sure about the target directory
self.assertEqual(cargs[0], pipedir)
# When the directory does not exist
with mock.patch('os.path.exists', return_value=False), \
mock.patch('os.makedirs') as m, \
mock.patch('os.chmod') as c:
self.assertEqual(os.path.join(self.pipes_dir, self.scope),
self.paths.create_host_pipe_dir())
self.assertEqual(1, m.call_count)
# Make sure about the target directory
margs, mkwargs = m.call_args
self.assertEqual(margs[0], pipedir)
cargs, ckwargs = c.call_args
self.assertEqual(cargs[0], pipedir)
def test_host_factory_pipe(self):
self.assertEqual(
self.paths.host_factory_pipe,
os.path.join(self.pipes_dir, self.scope, 'factory_pipe'))
def test_get_host_storlet_pipe(self):
self.assertEqual(
os.path.join(self.pipes_dir, self.scope, self.storlet_id),
self.paths.get_host_storlet_pipe(self.storlet_id))
def test_get_sbox_storlet_pipe(self):
self.assertEqual(
os.path.join('/mnt/channels', self.storlet_id),
self.paths.get_sbox_storlet_pipe(self.storlet_id))
def test_get_sbox_storlet_dir(self):
self.assertEqual(
os.path.join('/home/swift', self.storlet_id),
self.paths.get_sbox_storlet_dir(self.storlet_id))
def test_host_storlet_base_dir(self):
self.assertEqual(
self.paths.host_storlet_base_dir,
os.path.join(self.storlets_dir, self.scope))
def test_get_host_storlet_dir(self):
self.assertEqual(
os.path.join(self.storlets_dir, self.scope, self.storlet_id),
self.paths.get_host_storlet_dir(self.storlet_id))
def test_get_host_slog_path(self):
self.assertEqual(
os.path.join(self.log_dir, self.scope, self.storlet_id,
'storlet_invoke.log'),
self.paths.get_host_slog_path(self.storlet_id))
def test_host_storlet_cache_dir(self):
self.assertEqual(
os.path.join(self.cache_dir, self.scope, 'storlet'),
self.paths.host_storlet_cache_dir)
def test_host_dependency_cache_dir(self):
self.assertEqual(
os.path.join(self.cache_dir, self.scope, 'dependency'),
self.paths.host_dependency_cache_dir)
def test_runtime_paths_default(self):
# CHECK: docs says we need 4 dirs for communicate
# ====================================================================
# |1| host_factory_pipe_path | <pipes_dir>/<scope>/factory_pipe |
# ====================================================================
# |2| host_storlet_pipe_path | <pipes_dir>/<scope>/<storlet_id> |
# ====================================================================
# |3| sandbox_factory_pipe_path | /mnt/channels/factory_pipe |
# ====================================================================
# |4| sandbox_storlet_pipe_path | /mnt/channels/<storlet_id> |
# ====================================================================
#
# With this test, the scope value is "account" and the storlet_id is
# "Storlet-1.0.jar" (app name?)
# ok, let's check for these values
runtime_paths = RunTimePaths('account', {})
storlet_id = 'Storlet-1.0.jar'
# For pipe
self.assertEqual('/var/lib/storlets/pipes/scopes/account',
runtime_paths.host_pipe_dir)
# 1. host_factory_pipe_path <pipes_dir>/<scope>/factory_pipe
self.assertEqual(
'/var/lib/storlets/pipes/scopes/account/factory_pipe',
runtime_paths.host_factory_pipe)
# 2. host_storlet_pipe_path <pipes_dir>/<scope>/<storlet_id>
self.assertEqual(
'/var/lib/storlets/pipes/scopes/account/Storlet-1.0.jar',
runtime_paths.get_host_storlet_pipe(storlet_id))
# 3. Yes, right now, we don't have the path for #3 in Python
# 4. sandbox_storlet_pipe_path | /mnt/channels/<storlet_id>
self.assertEqual('/mnt/channels/Storlet-1.0.jar',
runtime_paths.get_sbox_storlet_pipe(storlet_id))
# This looks like for jar load?
self.assertEqual('/var/lib/storlets/storlets/scopes/account',
runtime_paths.host_storlet_base_dir)
self.assertEqual(
'/var/lib/storlets/storlets/scopes/account/Storlet-1.0.jar',
runtime_paths.get_host_storlet_dir(storlet_id))
# And this one is a mount point in sand box?
self.assertEqual('/home/swift/Storlet-1.0.jar',
runtime_paths.get_sbox_storlet_dir(storlet_id))
@with_tempdir
def test_create_host_pipe_dir_with_real_dir(self, temp_dir):
runtime_paths = RunTimePaths('account', {'host_root': temp_dir})
runtime_paths.create_host_pipe_dir()
path = runtime_paths.host_pipe_dir
self.assertTrue(os.path.exists(path))
self.assertTrue(os.path.isdir(path))
permission = oct(os.stat(path)[ST_MODE])[-3:]
# TODO(kota_): make sure if this is really acceptable
self.assertEqual('777', permission)
class TestRuntimePathsTempauth(TestRuntimePaths):
def setUp(self):
self.scope = 'test'
self._initialize()
class TestRunTimeSandbox(unittest.TestCase):
def setUp(self): def setUp(self):
self.logger = FakeLogger() self.logger = FakeLogger()
# TODO(takashi): take these values from config file # TODO(takashi): take these values from config file
self.conf = {'container_image_namespace': 'localhost:5001', self.conf = {'container_image_namespace': 'localhost:5001',
'default_container_image_name': 'defaultimage'} 'default_container_image_name': 'defaultimage'}
self.scope = '0123456789abc' self.scope = '0123456789abc'
self.sbox = RunTimeSandbox(self.scope, self.conf, self.logger) self.sbox = DockerRunTimeSandbox(self.scope, self.conf, self.logger)
def test_ping(self): def test_ping(self):
with mock.patch('storlets.gateway.gateways.docker.runtime.' with mock.patch('storlets.gateway.gateways.container.runtime.'
'SBusClient.ping') as ping: 'SBusClient.ping') as ping:
ping.return_value = SBusResponse(True, 'OK') ping.return_value = SBusResponse(True, 'OK')
self.assertTrue(self.sbox.ping()) self.assertTrue(self.sbox.ping())
with mock.patch('storlets.gateway.gateways.docker.runtime.' with mock.patch('storlets.gateway.gateways.container.runtime.'
'SBusClient.ping') as ping: 'SBusClient.ping') as ping:
ping.return_value = SBusResponse(False, 'Error') ping.return_value = SBusResponse(False, 'Error')
self.assertFalse(self.sbox.ping()) self.assertFalse(self.sbox.ping())
with mock.patch('storlets.gateway.gateways.docker.runtime.' with mock.patch('storlets.gateway.gateways.container.runtime.'
'SBusClient.ping') as ping: 'SBusClient.ping') as ping:
ping.side_effect = SBusClientSendError() ping.side_effect = SBusClientSendError()
self.assertFalse(self.sbox.ping()) self.assertFalse(self.sbox.ping())
with mock.patch('storlets.gateway.gateways.docker.runtime.' with mock.patch('storlets.gateway.gateways.container.runtime.'
'SBusClient.ping') as ping: 'SBusClient.ping') as ping:
ping.side_effect = SBusClientMalformedResponse() ping.side_effect = SBusClientMalformedResponse()
self.assertFalse(self.sbox.ping()) self.assertFalse(self.sbox.ping())
with mock.patch('storlets.gateway.gateways.docker.runtime.' with mock.patch('storlets.gateway.gateways.container.runtime.'
'SBusClient.ping') as ping: 'SBusClient.ping') as ping:
ping.side_effect = SBusClientIOError() ping.side_effect = SBusClientIOError()
self.assertFalse(self.sbox.ping()) self.assertFalse(self.sbox.ping())
def test_wait(self): def test_wait(self):
with mock.patch('storlets.gateway.gateways.docker.runtime.' with mock.patch('storlets.gateway.gateways.container.runtime.'
'SBusClient.ping') as ping, \ 'SBusClient.ping') as ping, \
mock.patch('storlets.gateway.gateways.docker.runtime.' mock.patch('storlets.gateway.gateways.container.runtime.'
'time.sleep') as sleep: 'time.sleep') as sleep:
ping.return_value = SBusResponse(True, 'OK') ping.return_value = SBusResponse(True, 'OK')
self.sbox.wait() self.sbox.wait()
self.assertEqual(sleep.call_count, 0) self.assertEqual(sleep.call_count, 0)
with mock.patch('storlets.gateway.gateways.docker.runtime.' with mock.patch('storlets.gateway.gateways.container.runtime.'
'SBusClient.ping') as ping, \ 'SBusClient.ping') as ping, \
mock.patch('storlets.gateway.gateways.docker.runtime.' mock.patch('storlets.gateway.gateways.container.runtime.'
'time.sleep') as sleep: 'time.sleep') as sleep:
ping.side_effect = [SBusResponse(False, 'Error'), ping.side_effect = [SBusResponse(False, 'Error'),
SBusResponse(True, 'OK')] SBusResponse(True, 'OK')]
@ -433,24 +218,24 @@ class TestRunTimeSandbox(unittest.TestCase):
self.assertEqual(0, mock_containers.run.call_count) self.assertEqual(0, mock_containers.run.call_count)
def test_restart(self): def test_restart(self):
with mock.patch('storlets.gateway.gateways.docker.runtime.' with mock.patch('storlets.gateway.gateways.container.runtime.'
'RunTimePaths.create_host_pipe_dir') as pipe_dir, \ 'RunTimePaths.create_host_pipe_dir') as pipe_dir, \
mock.patch('storlets.gateway.gateways.docker.runtime.' mock.patch('storlets.gateway.gateways.docker.runtime.'
'RunTimeSandbox._restart') as _restart, \ 'DockerRunTimeSandbox._restart') as _restart, \
mock.patch('storlets.gateway.gateways.docker.runtime.' mock.patch('storlets.gateway.gateways.docker.runtime.'
'RunTimeSandbox.wait') as wait: 'DockerRunTimeSandbox.wait') as wait:
self.sbox.restart() self.sbox.restart()
self.assertEqual(1, pipe_dir.call_count) self.assertEqual(1, pipe_dir.call_count)
self.assertEqual(1, _restart.call_count) self.assertEqual(1, _restart.call_count)
self.assertEqual((self.scope,), _restart.call_args.args) self.assertEqual((self.scope,), _restart.call_args.args)
self.assertEqual(1, wait.call_count) self.assertEqual(1, wait.call_count)
with mock.patch('storlets.gateway.gateways.docker.runtime.' with mock.patch('storlets.gateway.gateways.container.runtime.'
'RunTimePaths.create_host_pipe_dir') as pipe_dir, \ 'RunTimePaths.create_host_pipe_dir') as pipe_dir, \
mock.patch('storlets.gateway.gateways.docker.runtime.' mock.patch('storlets.gateway.gateways.docker.runtime.'
'RunTimeSandbox._restart') as _restart, \ 'DockerRunTimeSandbox._restart') as _restart, \
mock.patch('storlets.gateway.gateways.docker.runtime.' mock.patch('storlets.gateway.gateways.docker.runtime.'
'RunTimeSandbox.wait') as wait: 'DockerRunTimeSandbox.wait') as wait:
_restart.side_effect = [StorletRuntimeException(), None] _restart.side_effect = [StorletRuntimeException(), None]
self.sbox.restart() self.sbox.restart()
self.assertEqual(1, pipe_dir.call_count) self.assertEqual(1, pipe_dir.call_count)
@ -461,12 +246,12 @@ class TestRunTimeSandbox(unittest.TestCase):
_restart.call_args_list[1].args) _restart.call_args_list[1].args)
self.assertEqual(1, wait.call_count) self.assertEqual(1, wait.call_count)
with mock.patch('storlets.gateway.gateways.docker.runtime.' with mock.patch('storlets.gateway.gateways.container.runtime.'
'RunTimePaths.create_host_pipe_dir') as pipe_dir, \ 'RunTimePaths.create_host_pipe_dir') as pipe_dir, \
mock.patch('storlets.gateway.gateways.docker.runtime.' mock.patch('storlets.gateway.gateways.docker.runtime.'
'RunTimeSandbox._restart') as _restart, \ 'DockerRunTimeSandbox._restart') as _restart, \
mock.patch('storlets.gateway.gateways.docker.runtime.' mock.patch('storlets.gateway.gateways.docker.runtime.'
'RunTimeSandbox.wait') as wait: 'DockerRunTimeSandbox.wait') as wait:
_restart.side_effect = StorletTimeout() _restart.side_effect = StorletTimeout()
with self.assertRaises(StorletRuntimeException): with self.assertRaises(StorletRuntimeException):
self.sbox.restart() self.sbox.restart()
@ -487,173 +272,5 @@ class TestRunTimeSandbox(unittest.TestCase):
dependencies),) dependencies),)
class TestStorletInvocationProtocol(unittest.TestCase):
def setUp(self):
self.pipe_path = tempfile.mktemp()
self.log_file = tempfile.mktemp()
self.logger = FakeLogger()
self.storlet_id = 'Storlet-1.0.jar'
self.options = {'storlet_main': 'org.openstack.storlet.Storlet',
'storlet_dependency': 'dep1,dep2',
'storlet_language': 'java',
'file_manager': FakeFileManager('storlet', 'dep')}
storlet_request = DockerStorletRequest(
self.storlet_id, {}, {}, iter(StringIO()), options=self.options)
self.protocol = StorletInvocationProtocol(
storlet_request, self.pipe_path, self.log_file, 1, self.logger)
def tearDown(self):
for path in [self.pipe_path, self.log_file]:
try:
os.unlink(path)
except OSError:
pass
def test_send_execute_command(self):
with mock.patch('storlets.gateway.gateways.docker.runtime.SBusClient.'
'execute') as execute:
execute.return_value = SBusResponse(True, 'OK', 'someid')
with self.protocol.storlet_logger.activate():
self.protocol._send_execute_command()
self.assertEqual('someid', self.protocol.task_id)
with mock.patch('storlets.gateway.gateways.docker.runtime.SBusClient.'
'execute') as execute:
execute.return_value = SBusResponse(True, 'OK')
with self.assertRaises(StorletRuntimeException):
with self.protocol.storlet_logger.activate():
self.protocol._send_execute_command()
with mock.patch('storlets.gateway.gateways.docker.runtime.SBusClient.'
'execute') as execute:
execute.return_value = SBusResponse(False, 'NG', 'someid')
with self.assertRaises(StorletRuntimeException):
with self.protocol.storlet_logger.activate():
self.protocol._send_execute_command()
with mock.patch('storlets.gateway.gateways.docker.runtime.SBusClient.'
'execute') as execute:
execute.side_effect = SBusClientIOError()
with self.assertRaises(StorletRuntimeException):
with self.protocol.storlet_logger.activate():
self.protocol._send_execute_command()
def test_invocation_protocol(self):
# os.pipe will be called 3 times
pipe_called = 3
with _mock_os_pipe([''] * pipe_called) as pipes:
with mock.patch.object(self.protocol,
'_wait_for_read_with_timeout'), \
mock.patch.object(self.protocol, '_send_execute_command'):
self.protocol._invoke()
self.assertEqual(pipe_called, len(pipes))
pipes = iter(pipes)
# data write is not directly closed
# data read is closed
input_data_read_fd, input_data_write_fd = next(pipes)
self.assertTrue(input_data_read_fd.closed)
self.assertFalse(input_data_write_fd.closed)
# data write is closed but data read is still open
data_read_fd, data_write_fd = next(pipes)
self.assertFalse(data_read_fd.closed)
self.assertTrue(data_write_fd.closed)
# metadata write fd is closed, metadata read fd is still open.
metadata_read_fd, metadata_write_fd = next(pipes)
self.assertFalse(metadata_read_fd.closed)
self.assertTrue(metadata_write_fd.closed)
# sanity
self.assertRaises(StopIteration, next, pipes)
def test_invocation_protocol_remote_fds(self):
# In default, we have 4 fds in remote_fds
storlet_request = DockerStorletRequest(
self.storlet_id, {}, {}, iter(StringIO()), options=self.options)
protocol = StorletInvocationProtocol(
storlet_request, self.pipe_path, self.log_file, 1, self.logger)
with protocol.storlet_logger.activate():
self.assertEqual(4, len(protocol.remote_fds))
# extra_resources expands the remote_fds
storlet_request = DockerStorletRequest(
self.storlet_id, {}, {}, iter(StringIO()), options=self.options)
protocol = StorletInvocationProtocol(
storlet_request, self.pipe_path, self.log_file, 1, self.logger,
extra_sources=[storlet_request])
with protocol.storlet_logger.activate():
self.assertEqual(5, len(protocol.remote_fds))
# 2 more extra_resources expands the remote_fds
storlet_request = DockerStorletRequest(
self.storlet_id, {}, {}, iter(StringIO()), options=self.options)
protocol = StorletInvocationProtocol(
storlet_request, self.pipe_path, self.log_file, 1, self.logger,
extra_sources=[storlet_request] * 3)
with protocol.storlet_logger.activate():
self.assertEqual(7, len(protocol.remote_fds))
def test_open_writer_with_invalid_fd(self):
invalid_fds = (
(None, TypeError), (-1, ValueError), ('blah', TypeError))
for invalid_fd, expected_error in invalid_fds:
with self.assertRaises(expected_error):
with self.protocol._open_writer(invalid_fd):
pass
def _test_writer_with_exception(self, exception_cls):
pipes = [os.pipe()]
def raise_in_the_context():
with self.protocol._open_writer(pipes[0][1]):
raise exception_cls()
try:
# writer context doesn't suppress any exception
self.assertRaises(exception_cls, raise_in_the_context)
# since _open_writer closes the write fd, the os.close will fail as
# BadFileDescriptor
with self.assertRaises(OSError) as os_error:
os.close(pipes[0][1])
self.assertEqual(9, os_error.exception.errno)
finally:
for fd in pipes[0]:
try:
os.close(fd)
except OSError:
pass
def test_writer_raise_while_in_writer_context(self):
# basic storlet timeout
self._test_writer_with_exception(StorletTimeout)
# unexpected IOError
self._test_writer_with_exception(IOError)
# else
self._test_writer_with_exception(Exception)
class TestStorletInvocationProtocolPython(TestStorletInvocationProtocol):
def setUp(self):
self.pipe_path = tempfile.mktemp()
self.log_file = tempfile.mktemp()
self.logger = FakeLogger()
self.storlet_id = 'Storlet-1.0.py'
self.options = {'storlet_main': 'storlet.Storlet',
'storlet_dependency': 'dep1,dep2',
'storlet_language': 'python',
'language_version': '3.6',
'file_manager': FakeFileManager('storlet', 'dep')}
storlet_request = DockerStorletRequest(
self.storlet_id, {}, {}, iter(StringIO()), options=self.options)
self.protocol = StorletInvocationProtocol(
storlet_request, self.pipe_path, self.log_file, 1, self.logger)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View File

@ -15,8 +15,8 @@
import unittest import unittest
from storlets.gateway.common.exceptions import StorletGatewayLoadError from storlets.gateway.common.exceptions import StorletGatewayLoadError
from storlets.gateway.loader import load_gateway from storlets.gateway.loader import load_gateway
from storlets.gateway.gateways.stub import StorletGatewayStub from storlets.gateway.gateways.stub import StubStorletGateway
from storlets.gateway.gateways.docker import StorletGatewayDocker from storlets.gateway.gateways.docker import DockerStorletGateway
class TestLoader(unittest.TestCase): class TestLoader(unittest.TestCase):
@ -26,11 +26,11 @@ class TestLoader(unittest.TestCase):
def test_load_gateway_entry_point(self): def test_load_gateway_entry_point(self):
# existing entry point # existing entry point
self.assertEqual( self.assertEqual(
StorletGatewayStub, StubStorletGateway,
load_gateway('stub')) load_gateway('stub'))
self.assertEqual( self.assertEqual(
StorletGatewayDocker, DockerStorletGateway,
load_gateway('docker')) load_gateway('docker'))
# If the given entry point does not exist # If the given entry point does not exist
@ -40,18 +40,18 @@ class TestLoader(unittest.TestCase):
def test_load_gateway_full_class_path(self): def test_load_gateway_full_class_path(self):
# If the given class path exists # If the given class path exists
self.assertEqual( self.assertEqual(
StorletGatewayStub, StubStorletGateway,
load_gateway('storlets.gateway.gateways.stub.StorletGatewayStub')) load_gateway('storlets.gateway.gateways.stub.StubStorletGateway'))
self.assertEqual( self.assertEqual(
StorletGatewayDocker, DockerStorletGateway,
load_gateway('storlets.gateway.gateways.docker.' load_gateway('storlets.gateway.gateways.docker.'
'StorletGatewayDocker')) 'DockerStorletGateway'))
# If module does not exist # If module does not exist
with self.assertRaises(StorletGatewayLoadError): with self.assertRaises(StorletGatewayLoadError):
load_gateway('storlets.gateway.gateways.another_stub.' load_gateway('storlets.gateway.gateways.another_stub.'
'StorletGatewayStub') 'StubStorletGateway')
# If class does not exist # If class does not exist
with self.assertRaises(StorletGatewayLoadError): with self.assertRaises(StorletGatewayLoadError):

View File

@ -16,7 +16,7 @@
import unittest import unittest
from unittest import mock from unittest import mock
from storlets.gateway.gateways.stub import StorletGatewayStub from storlets.gateway.gateways.stub import StubStorletGateway
from storlets.swift_middleware import storlet_handler from storlets.swift_middleware import storlet_handler
from tests.unit import FakeLogger from tests.unit import FakeLogger
@ -25,7 +25,7 @@ from tests.unit.swift_middleware import FakeApp
def create_handler_config(exec_server): def create_handler_config(exec_server):
return {'execution_server': exec_server, return {'execution_server': exec_server,
'gateway_module': StorletGatewayStub} 'gateway_module': StubStorletGateway}
class BaseTestStorletMiddleware(unittest.TestCase): class BaseTestStorletMiddleware(unittest.TestCase):