Add basic client implementation for sbus protocol
This patch introduces basic implementation of the client to communicate with remote process using sbus, to gather all codes which deeply depend on the sbus protocol into the one place. This patch also introduces sbus CLI, which enables operators to send sbus command like halt easily. Change-Id: Ie1721c016b10b6562b131eb39e3dc84b1d8d473f
This commit is contained in:
parent
dbf7da58b2
commit
a21a14f13a
20
bin/sbus
Normal file
20
bin/sbus
Normal file
@ -0,0 +1,20 @@
|
||||
#!/usr/bin/env python
|
||||
# Copyright (c) 2016 OpenStack Foundation.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import sys
|
||||
from storlets.sbus.client.cli import main
|
||||
|
||||
if __name__ == '__main__':
|
||||
main(sys.argv)
|
@ -299,11 +299,8 @@ install_storlets_code() {
|
||||
sudo chown "$STORLETS_SWIFT_RUNTIME_USER":"$STORLETS_SWIFT_RUNTIME_GROUP" "$STORLETS_DOCKER_DEVICE"/scripts
|
||||
sudo chmod 0755 "$STORLETS_DOCKER_DEVICE"/scripts
|
||||
sudo cp scripts/restart_docker_container "$STORLETS_DOCKER_DEVICE"/scripts/
|
||||
sudo cp scripts/send_halt_cmd_to_daemon_factory.py "$STORLETS_DOCKER_DEVICE"/scripts/
|
||||
sudo chmod 04755 "$STORLETS_DOCKER_DEVICE"/scripts/restart_docker_container
|
||||
sudo chown root:root "$STORLETS_DOCKER_DEVICE"/scripts/restart_docker_container
|
||||
sudo chmod 04755 "$STORLETS_DOCKER_DEVICE"/scripts/send_halt_cmd_to_daemon_factory.py
|
||||
sudo chown root:root "$STORLETS_DOCKER_DEVICE"/scripts/send_halt_cmd_to_daemon_factory.py
|
||||
|
||||
cd -
|
||||
}
|
||||
|
@ -280,9 +280,6 @@ require root privileges.
|
||||
cp $HOME/scripts/restart_docker_container .
|
||||
sudo chown root:root restart_docker_container
|
||||
sudo chmod 04755 restart_docker_container
|
||||
cp $HOME/scripts/send_halt_cmd_to_daemon_factory.py .
|
||||
sudo chown root:root send_halt_cmd_to_daemon_factory.py
|
||||
sudo chmod 04755 send_halt_cmd_to_daemon_factory.py
|
||||
|
||||
The run time directory will be later populated by the middleware with:
|
||||
#. storlets - Docker container mapped directories keeping storlet jars
|
||||
|
@ -1,58 +0,0 @@
|
||||
# Copyright (c) 2015, 2016 OpenStack Foundation.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
from storlets.sbus import SBus
|
||||
from storlets.sbus.datagram import FDMetadata, SBusServiceDatagram
|
||||
from storlets.sbus.file_description import SBUS_FD_SERVICE_OUT
|
||||
from storlets.sbus.command import SBUS_CMD_HALT
|
||||
|
||||
|
||||
def print_usage(argv):
|
||||
print(argv[0] + ' /path/to/daemon/factory_pipe')
|
||||
print('Example:')
|
||||
sys.stdout.write(argv[0] + ' ')
|
||||
print('/home/docker_device/pipes/scopes/'
|
||||
'AUTH_fb8b63c579054c48816ca8acd090b3d9/factory_pipe')
|
||||
|
||||
|
||||
def main(argv):
|
||||
if len(argv) < 2:
|
||||
print_usage(argv)
|
||||
return
|
||||
|
||||
daemon_factory_pipe_name = argv[1]
|
||||
try:
|
||||
fi, fo = os.pipe()
|
||||
halt_dtg = SBusServiceDatagram(
|
||||
SBUS_CMD_HALT,
|
||||
[fo],
|
||||
[FDMetadata(SBUS_FD_SERVICE_OUT).to_dict()])
|
||||
n_status = SBus.send(daemon_factory_pipe_name, halt_dtg)
|
||||
if n_status < 0:
|
||||
print('Sending failed')
|
||||
else:
|
||||
print('Sending succeeded')
|
||||
cmd_response = os.read(fi, 256)
|
||||
print(cmd_response)
|
||||
finally:
|
||||
os.close(fi)
|
||||
os.close(fo)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main(sys.argv)
|
@ -23,6 +23,10 @@ skip_changelog = True
|
||||
[files]
|
||||
packages =
|
||||
storlets
|
||||
scripts =
|
||||
bin/storlets-daemon
|
||||
bin/storlets-daemon-factory
|
||||
bin/sbus
|
||||
|
||||
[entry_points]
|
||||
paste.filter_factory =
|
||||
|
@ -21,9 +21,9 @@ import subprocess
|
||||
import time
|
||||
|
||||
from storlets.sbus import SBus
|
||||
from storlets.sbus.datagram import FDMetadata, SBusServiceDatagram
|
||||
from storlets.sbus.command import SBUS_CMD_HALT, SBUS_CMD_PING
|
||||
from storlets.sbus.file_description import SBUS_FD_SERVICE_OUT
|
||||
from storlets.sbus.client import SBusClient
|
||||
from storlets.sbus.client.exceptions import SBusClientException, \
|
||||
SBusClientSendError
|
||||
from storlets.agent.common.server import command_handler, CommandSuccess, \
|
||||
CommandFailure, SBusServer
|
||||
from storlets.agent.common.utils import get_logger, DEFAULT_PY2, DEFAULT_PY3
|
||||
@ -187,24 +187,19 @@ class StorletDaemonFactory(SBusServer):
|
||||
storlet_pipe_name = self.storlet_name_to_pipe_name[storlet_name]
|
||||
self.logger.debug('Send PING command to {0} via {1}'.
|
||||
format(storlet_name, storlet_pipe_name))
|
||||
read_fd, write_fd = os.pipe()
|
||||
try:
|
||||
dtg = SBusServiceDatagram(
|
||||
SBUS_CMD_PING,
|
||||
[write_fd],
|
||||
[FDMetadata(SBUS_FD_SERVICE_OUT).to_dict()])
|
||||
for i in range(self.NUM_OF_TRIES_PINGING_STARTING_DAEMON):
|
||||
ret = SBus.send(storlet_pipe_name, dtg)
|
||||
if ret >= 0:
|
||||
resp = os.read(read_fd, 128)
|
||||
if resp.startswith('True'):
|
||||
return True
|
||||
time.sleep(1)
|
||||
else:
|
||||
return False
|
||||
finally:
|
||||
os.close(read_fd)
|
||||
os.close(write_fd)
|
||||
client = SBusClient(storlet_pipe_name)
|
||||
for i in range(self.NUM_OF_TRIES_PINGING_STARTING_DAEMON):
|
||||
try:
|
||||
resp = client.ping()
|
||||
if resp.status:
|
||||
return True
|
||||
except SBusClientSendError:
|
||||
pass
|
||||
except SBusClientException:
|
||||
self.logger.exception('Failed to send sbus command')
|
||||
break
|
||||
time.sleep(1)
|
||||
return False
|
||||
|
||||
def process_start_daemon(self, daemon_language, storlet_path, storlet_name,
|
||||
pool_size, uds_path, log_level,
|
||||
@ -412,25 +407,20 @@ class StorletDaemonFactory(SBusServer):
|
||||
self.logger.debug('Send HALT command to {0} via {1}'.
|
||||
format(storlet_name, storlet_pipe_name))
|
||||
|
||||
read_fd, write_fd = os.pipe()
|
||||
client = SBusClient(storlet_pipe_name)
|
||||
try:
|
||||
dtg = SBusServiceDatagram(
|
||||
SBUS_CMD_HALT,
|
||||
[write_fd],
|
||||
[FDMetadata(SBUS_FD_SERVICE_OUT).to_dict()])
|
||||
rc = SBus.send(storlet_pipe_name, dtg)
|
||||
os.close(write_fd)
|
||||
if rc < 0:
|
||||
resp = client.halt()
|
||||
if not resp.status:
|
||||
self.logger.error('Failed to send sbus command: %s' %
|
||||
resp.message)
|
||||
raise SDaemonError(
|
||||
'Failed to send halt command to the storlet daemon {0}'
|
||||
.format(storlet_name))
|
||||
resp = os.read(read_fd, 128)
|
||||
if not resp.startswith('True'):
|
||||
raise SDaemonError(
|
||||
'Failed to send halt command to the storlet daemon {0}'
|
||||
.format(storlet_name))
|
||||
finally:
|
||||
os.close(read_fd)
|
||||
'Failed to send halt to {0}'.format(storlet_name))
|
||||
|
||||
except SBusClientException:
|
||||
self.logger.exception('Failed to send sbus command')
|
||||
raise SDaemonError(
|
||||
'Failed to send halt command to the storlet daemon {0}'
|
||||
.format(storlet_name))
|
||||
|
||||
try:
|
||||
os.waitpid(dmn_pid, 0)
|
||||
|
@ -27,10 +27,11 @@ import json
|
||||
from contextlib import contextmanager
|
||||
|
||||
from storlets.sbus import SBus
|
||||
from storlets.sbus.datagram import FDMetadata, SBusServiceDatagram, \
|
||||
SBusExecuteDatagram
|
||||
from storlets.sbus.command import SBUS_CMD_EXECUTE
|
||||
from storlets.sbus.datagram import FDMetadata, SBusExecuteDatagram
|
||||
from storlets.sbus import file_description as sbus_fd
|
||||
from storlets.sbus import command as sbus_cmd
|
||||
from storlets.sbus.client import SBusClient
|
||||
from storlets.sbus.client.exceptions import SBusClientException
|
||||
from storlets.gateway.common.exceptions import StorletRuntimeException, \
|
||||
StorletTimeout
|
||||
from storlets.gateway.common.logger import StorletLogger
|
||||
@ -246,21 +247,6 @@ class RunTimeSandbox(object):
|
||||
# TODO(change logger's route if possible)
|
||||
self.logger = logger
|
||||
|
||||
def _parse_sandbox_factory_answer(self, str_answer):
|
||||
"""
|
||||
Parse answer string received from container side
|
||||
|
||||
:param str_answer: answer string
|
||||
:returns: (status, message)
|
||||
"""
|
||||
two_tokens = str_answer.split(':', 1)
|
||||
if len(two_tokens) != 2:
|
||||
self.logger.error('Got wrong format about answer over sbus: %s' %
|
||||
str_answer)
|
||||
raise StorletRuntimeException('Got wrong answer')
|
||||
status = (two_tokens[0] == 'True')
|
||||
return status, two_tokens[1]
|
||||
|
||||
def ping(self):
|
||||
"""
|
||||
Ping to daemon factory process inside container
|
||||
@ -270,23 +256,17 @@ class RunTimeSandbox(object):
|
||||
-1 when it fails to send command to the process
|
||||
"""
|
||||
pipe_path = self.paths.host_factory_pipe()
|
||||
|
||||
with _open_pipe() as (read_fd, write_fd):
|
||||
dtg = SBusServiceDatagram(
|
||||
sbus_cmd.SBUS_CMD_PING,
|
||||
[write_fd],
|
||||
[FDMetadata(sbus_fd.SBUS_FD_SERVICE_OUT).to_dict()])
|
||||
rc = SBus.send(pipe_path, dtg)
|
||||
if (rc < 0):
|
||||
return -1
|
||||
|
||||
reply = os.read(read_fd, 10)
|
||||
|
||||
res, error_txt = self._parse_sandbox_factory_answer(reply)
|
||||
if res is True:
|
||||
return 1
|
||||
self.logger.error('Failed to ping to daemon factory: %s' % error_txt)
|
||||
return 0
|
||||
client = SBusClient(pipe_path)
|
||||
try:
|
||||
resp = client.ping()
|
||||
if resp.status:
|
||||
return 1
|
||||
else:
|
||||
self.logger.error('Failed to ping to daemon factory: %s' %
|
||||
resp.message)
|
||||
return 0
|
||||
except SBusClientException:
|
||||
return -1
|
||||
|
||||
def wait(self):
|
||||
"""
|
||||
@ -374,89 +354,59 @@ class RunTimeSandbox(object):
|
||||
self, spath, storlet_id, language, language_version=None):
|
||||
"""
|
||||
Start SDaemon process in the scope's sandbox
|
||||
|
||||
"""
|
||||
prms = {'daemon_language': language.lower(),
|
||||
'storlet_path': spath,
|
||||
'storlet_name': storlet_id,
|
||||
'uds_path': self.paths.sbox_storlet_pipe(storlet_id),
|
||||
'log_level': self.storlet_daemon_debug_level,
|
||||
'pool_size': self.storlet_daemon_thread_pool_size}
|
||||
pipe_path = self.paths.host_factory_pipe()
|
||||
client = SBusClient(pipe_path)
|
||||
try:
|
||||
resp = client.start_daemon(
|
||||
language.lower(), spath, storlet_id,
|
||||
self.paths.sbox_storlet_pipe(storlet_id),
|
||||
self.storlet_daemon_debug_level,
|
||||
self.storlet_daemon_thread_pool_size,
|
||||
language_version)
|
||||
|
||||
if language_version:
|
||||
prms.update({'daemon_language_version': language_version})
|
||||
|
||||
with _open_pipe() as (read_fd, write_fd):
|
||||
dtg = SBusServiceDatagram(
|
||||
sbus_cmd.SBUS_CMD_START_DAEMON,
|
||||
[write_fd],
|
||||
[FDMetadata(sbus_fd.SBUS_FD_SERVICE_OUT).to_dict()],
|
||||
prms)
|
||||
|
||||
pipe_path = self.paths.host_factory_pipe()
|
||||
rc = SBus.send(pipe_path, dtg)
|
||||
# TODO(takashi): Why we should rond rc into -1?
|
||||
if (rc < 0):
|
||||
return -1
|
||||
|
||||
reply = os.read(read_fd, 10)
|
||||
|
||||
res, error_txt = self._parse_sandbox_factory_answer(reply)
|
||||
if res is True:
|
||||
return 1
|
||||
self.logger.error('Failed to start storlet daemon: %s' % error_txt)
|
||||
return 0
|
||||
if resp.status:
|
||||
return 1
|
||||
else:
|
||||
self.logger.error('Failed to start storlet daemon: %s' %
|
||||
resp.message)
|
||||
return 0
|
||||
except SBusClientException:
|
||||
return -1
|
||||
|
||||
def stop_storlet_daemon(self, storlet_id):
|
||||
"""
|
||||
Stop SDaemon process in the scope's sandbox
|
||||
"""
|
||||
with _open_pipe() as (read_fd, write_fd):
|
||||
dtg = SBusServiceDatagram(
|
||||
sbus_cmd.SBUS_CMD_STOP_DAEMON,
|
||||
[write_fd],
|
||||
[FDMetadata(sbus_fd.SBUS_FD_SERVICE_OUT).to_dict()],
|
||||
{'storlet_name': storlet_id})
|
||||
pipe_path = self.paths.host_factory_pipe()
|
||||
rc = SBus.send(pipe_path, dtg)
|
||||
if (rc < 0):
|
||||
self.logger.info("Failed to send status command to %s %s" %
|
||||
(self.scope, storlet_id))
|
||||
return -1
|
||||
|
||||
reply = os.read(read_fd, 10)
|
||||
|
||||
res, error_txt = self._parse_sandbox_factory_answer(reply)
|
||||
if res is True:
|
||||
return 1
|
||||
self.logger.error('Failed to stop storlet daemon: %s' % error_txt)
|
||||
return 0
|
||||
pipe_path = self.paths.host_factory_pipe()
|
||||
client = SBusClient(pipe_path)
|
||||
try:
|
||||
resp = client.stop_daemon(storlet_id)
|
||||
if resp.status:
|
||||
return 1
|
||||
else:
|
||||
self.logger.error('Failed to stop storlet daemon: %s' %
|
||||
resp.message)
|
||||
return 0
|
||||
except SBusClientException:
|
||||
return -1
|
||||
|
||||
def get_storlet_daemon_status(self, storlet_id):
|
||||
"""
|
||||
Get the status of SDaemon process in the scope's sandbox
|
||||
"""
|
||||
with _open_pipe() as (read_fd, write_fd):
|
||||
dtg = SBusServiceDatagram(
|
||||
sbus_cmd.SBUS_CMD_DAEMON_STATUS,
|
||||
[write_fd],
|
||||
[FDMetadata(sbus_fd.SBUS_FD_SERVICE_OUT).to_dict()],
|
||||
{'storlet_name': storlet_id})
|
||||
pipe_path = self.paths.host_factory_pipe()
|
||||
rc = SBus.send(pipe_path, dtg)
|
||||
if (rc < 0):
|
||||
self.logger.info("Failed to send status command to %s %s" %
|
||||
(self.scope, storlet_id))
|
||||
return -1
|
||||
|
||||
reply = os.read(read_fd, 10)
|
||||
|
||||
res, error_txt = self._parse_sandbox_factory_answer(reply)
|
||||
if res is True:
|
||||
return 1
|
||||
self.logger.error('Failed to get status about storlet daemon: %s' %
|
||||
error_txt)
|
||||
return 0
|
||||
pipe_path = self.paths.host_factory_pipe()
|
||||
client = SBusClient(pipe_path)
|
||||
try:
|
||||
resp = client.daemon_status(storlet_id)
|
||||
if resp.status:
|
||||
return 1
|
||||
else:
|
||||
self.logger.error('Failed to get status about storlet '
|
||||
'daemon: %s' % resp.message)
|
||||
return 0
|
||||
except SBusClientException:
|
||||
return -1
|
||||
|
||||
def _get_storlet_classpath(self, storlet_main, storlet_id, dependencies):
|
||||
"""
|
||||
@ -711,18 +661,13 @@ class StorletInvocationProtocol(object):
|
||||
"""
|
||||
Cancel on-going storlet execution
|
||||
"""
|
||||
with _open_pipe() as (read_fd, write_fd):
|
||||
dtg = SBusServiceDatagram(
|
||||
sbus_cmd.SBUS_CMD_CANCEL,
|
||||
[write_fd],
|
||||
[FDMetadata(sbus_fd.SBUS_FD_SERVICE_OUT).to_dict()],
|
||||
None,
|
||||
self.task_id)
|
||||
rc = SBus.send(self.storlet_pipe_path, dtg)
|
||||
if (rc < 0):
|
||||
client = SBusClient(self.storlet_pipe_path)
|
||||
try:
|
||||
resp = client.cancel(self.task_id)
|
||||
if not resp.status:
|
||||
raise StorletRuntimeException('Failed to cancel task')
|
||||
# TODO(takashi): Check the response here
|
||||
os.read(read_fd, 10)
|
||||
except SBusClientException:
|
||||
raise StorletRuntimeException('Failed to cancel task')
|
||||
|
||||
def _invoke(self):
|
||||
"""
|
||||
@ -742,7 +687,7 @@ class StorletInvocationProtocol(object):
|
||||
execution
|
||||
"""
|
||||
dtg = SBusExecuteDatagram(
|
||||
sbus_cmd.SBUS_CMD_EXECUTE,
|
||||
SBUS_CMD_EXECUTE,
|
||||
self.remote_fds,
|
||||
self.remote_fds_metadata,
|
||||
self.srequest.params)
|
||||
|
123
storlets/sbus/client/__init__.py
Normal file
123
storlets/sbus/client/__init__.py
Normal file
@ -0,0 +1,123 @@
|
||||
# Copyright (c) 2016 OpenStack Foundation.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import os
|
||||
from storlets.sbus import SBus
|
||||
from storlets.sbus.command import SBUS_CMD_CANCEL, SBUS_CMD_DAEMON_STATUS, \
|
||||
SBUS_CMD_HALT, SBUS_CMD_PING, SBUS_CMD_START_DAEMON, \
|
||||
SBUS_CMD_STOP_DAEMON, SBUS_CMD_STOP_DAEMONS
|
||||
from storlets.sbus.datagram import FDMetadata, SBusServiceDatagram
|
||||
from storlets.sbus.file_description import SBUS_FD_SERVICE_OUT
|
||||
from storlets.sbus.client.exceptions import SBusClientIOError, \
|
||||
SBusClientMalformedResponse, SBusClientSendError
|
||||
|
||||
|
||||
class SBusResponse(object):
|
||||
def __init__(self, status, message):
|
||||
"""
|
||||
Construct SBusResponse class
|
||||
|
||||
:param status: Whether the server succeed to process the given request
|
||||
:param message: Messages to describe the process result
|
||||
"""
|
||||
self.status = status
|
||||
self.message = message
|
||||
|
||||
|
||||
class SBusClient(object):
|
||||
def __init__(self, socket_path, chunk_size=16):
|
||||
self.socket_path = socket_path
|
||||
self.chunk_size = chunk_size
|
||||
|
||||
def _parse_response(self, str_response):
|
||||
"""
|
||||
Parse response string recieved from container side
|
||||
|
||||
:param str_response: response string
|
||||
:returns: SBusResponse instance
|
||||
"""
|
||||
two_tokens = str_response.split(':', 1)
|
||||
if len(two_tokens) != 2:
|
||||
raise SBusClientMalformedResponse('Got malformed response')
|
||||
status = (two_tokens[0].lower() == 'true')
|
||||
message = two_tokens[1]
|
||||
return SBusResponse(status, message)
|
||||
|
||||
def _request(self, command, params=None, task_id=None):
|
||||
read_fd, write_fd = os.pipe()
|
||||
try:
|
||||
try:
|
||||
datagram = SBusServiceDatagram(
|
||||
command, [write_fd],
|
||||
[FDMetadata(SBUS_FD_SERVICE_OUT).to_dict()],
|
||||
params, task_id)
|
||||
rc = SBus.send(self.socket_path, datagram)
|
||||
if rc < 0:
|
||||
raise SBusClientSendError(
|
||||
'Faild to send command(%s) to socket %s' %
|
||||
(datagram.command, self.socket_path))
|
||||
finally:
|
||||
# We already sent the write fd to remote, so should close it
|
||||
# in local side before reading response
|
||||
os.close(write_fd)
|
||||
|
||||
reply = ''
|
||||
while True:
|
||||
try:
|
||||
buf = os.read(read_fd, self.chunk_size)
|
||||
except IOError:
|
||||
raise SBusClientIOError('Failed to read data from read '
|
||||
'pipe')
|
||||
if not buf:
|
||||
break
|
||||
reply = reply + buf
|
||||
finally:
|
||||
os.close(read_fd)
|
||||
|
||||
return self._parse_response(reply)
|
||||
|
||||
def execute(self, *args, **kwargs):
|
||||
# TODO(takashi): implement this
|
||||
raise NotImplementedError('Execute command is not supported yet')
|
||||
|
||||
def ping(self):
|
||||
return self._request(SBUS_CMD_PING)
|
||||
|
||||
def start_daemon(self, language, storlet_path, storlet_id,
|
||||
uds_path, log_level, pool_size,
|
||||
language_version):
|
||||
params = {'daemon_language': language, 'storlet_path': storlet_path,
|
||||
'storlet_name': storlet_id, 'uds_path': uds_path,
|
||||
'log_level': log_level, 'pool_size': pool_size}
|
||||
if language_version:
|
||||
params['daemon_language_version'] = language_version
|
||||
|
||||
return self._request(SBUS_CMD_START_DAEMON, params)
|
||||
|
||||
def stop_daemon(self, storlet_name):
|
||||
return self._request(SBUS_CMD_STOP_DAEMON,
|
||||
{'storlet_name': storlet_name})
|
||||
|
||||
def stop_daemons(self):
|
||||
return self._request(SBUS_CMD_STOP_DAEMONS)
|
||||
|
||||
def halt(self):
|
||||
return self._request(SBUS_CMD_HALT)
|
||||
|
||||
def daemon_status(self, storlet_name):
|
||||
return self._request(SBUS_CMD_DAEMON_STATUS,
|
||||
{'storlet_name': storlet_name})
|
||||
|
||||
def cancel(self, task_id):
|
||||
return self._request(SBUS_CMD_CANCEL, task_id=task_id)
|
63
storlets/sbus/client/cli.py
Normal file
63
storlets/sbus/client/cli.py
Normal file
@ -0,0 +1,63 @@
|
||||
# Copyright (c) 2016 OpenStack Foundation.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import os
|
||||
from sys import exit
|
||||
from storlets.sbus.client import SBusClient
|
||||
from storlets.sbus.client.exceptions import SBusClientException
|
||||
|
||||
EXIT_SUCCESS = 0
|
||||
EXIT_ERROR = 1
|
||||
|
||||
|
||||
def main(argv):
|
||||
# TODO(takashi): Add more detailed help message
|
||||
|
||||
if len(argv) < 3:
|
||||
print('sbus <command> <pipe_path>')
|
||||
exit(EXIT_ERROR)
|
||||
|
||||
command = argv[1]
|
||||
pipe_path = argv[2]
|
||||
|
||||
if not os.path.exists(pipe_path):
|
||||
print('ERROR: Pipe file %s does not exist' % pipe_path)
|
||||
exit(EXIT_ERROR)
|
||||
|
||||
client = SBusClient(pipe_path)
|
||||
try:
|
||||
handler = getattr(client, command)
|
||||
|
||||
# TODO(takashi): Currently this only works for ping or halt.
|
||||
# We need to pass more parameters like storlet_name
|
||||
# to implement the other command types.
|
||||
resp = handler()
|
||||
except (AttributeError, NotImplementedError):
|
||||
print('ERROR: Command %s is not supported' % command)
|
||||
exit(EXIT_ERROR)
|
||||
except SBusClientException as err:
|
||||
print('ERROR: Failed to send sbus command %s to %s: %s'
|
||||
% (command, pipe_path, err))
|
||||
exit(EXIT_ERROR)
|
||||
except Exception as err:
|
||||
print('ERROR: Unknown error: %s' % err)
|
||||
exit(EXIT_ERROR)
|
||||
|
||||
print('Response: %s: %s' % (resp.status, resp.message))
|
||||
if resp.status:
|
||||
print('OK')
|
||||
exit(EXIT_SUCCESS)
|
||||
else:
|
||||
print('ERROR: Got error response')
|
||||
exit(EXIT_ERROR)
|
30
storlets/sbus/client/exceptions.py
Normal file
30
storlets/sbus/client/exceptions.py
Normal file
@ -0,0 +1,30 @@
|
||||
# Copyright (c) 2015, 2016 OpenStack Foundation.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
class SBusClientException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class SBusClientIOError(SBusClientException, IOError):
|
||||
pass
|
||||
|
||||
|
||||
class SBusClientSendError(SBusClientException):
|
||||
pass
|
||||
|
||||
|
||||
class SBusClientMalformedResponse(SBusClientException):
|
||||
pass
|
@ -12,12 +12,15 @@
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from contextlib import contextmanager
|
||||
import errno
|
||||
import mock
|
||||
import unittest
|
||||
|
||||
from storlets.sbus import command as sbus_cmd
|
||||
from storlets.sbus.client import SBusResponse
|
||||
from storlets.sbus.client.exceptions import SBusClientSendError
|
||||
|
||||
from storlets.agent.daemon_factory.server import SDaemonError, \
|
||||
StorletDaemonFactory
|
||||
from storlets.agent.common.utils import DEFAULT_PY2, DEFAULT_PY3
|
||||
@ -35,7 +38,12 @@ class TestStorletDaemonFactory(unittest.TestCase):
|
||||
base_path = 'storlets.agent.daemon_factory.server'
|
||||
kill_path = base_path + '.os.kill'
|
||||
waitpid_path = base_path + '.os.waitpid'
|
||||
sbus_path = base_path + '.SBus'
|
||||
|
||||
@contextmanager
|
||||
def _mock_sbus_client(self, method):
|
||||
sbusclient_path = self.base_path + '.SBusClient'
|
||||
with mock.patch('.'.join([sbusclient_path, method])) as _method:
|
||||
yield _method
|
||||
|
||||
def setUp(self):
|
||||
self.logger = FakeLogger()
|
||||
@ -110,13 +118,11 @@ class TestStorletDaemonFactory(unittest.TestCase):
|
||||
with mock.patch(self.base_path + '.subprocess.Popen') as popen, \
|
||||
mock.patch(self.base_path + '.time.sleep'), \
|
||||
mock.patch(self.waitpid_path) as waitpid, \
|
||||
mock.patch(self.sbus_path + '.send') as send, \
|
||||
mock.patch(self.base_path + '.os.read') as read:
|
||||
self._mock_sbus_client('ping') as ping:
|
||||
popen.side_effect = [FakePopenObject(1000),
|
||||
FakePopenObject(1001)]
|
||||
waitpid.return_value = 0, 0
|
||||
send.return_value = 0
|
||||
read.return_value = 'True: OK'
|
||||
ping.return_value = SBusResponse(True, 'OK')
|
||||
self.dfactory.spawn_subprocess(
|
||||
['arg0', 'argv1', 'argv2'],
|
||||
{'envk0': 'envv0'}, 'storleta')
|
||||
@ -127,13 +133,11 @@ class TestStorletDaemonFactory(unittest.TestCase):
|
||||
with mock.patch(self.base_path + '.subprocess.Popen') as popen, \
|
||||
mock.patch(self.base_path + '.time.sleep'), \
|
||||
mock.patch(self.waitpid_path) as waitpid, \
|
||||
mock.patch(self.sbus_path + '.send') as send, \
|
||||
mock.patch(self.base_path + '.os.read') as read:
|
||||
self._mock_sbus_client('ping') as ping:
|
||||
popen.side_effect = [FakePopenObject(1000),
|
||||
FakePopenObject(1001)]
|
||||
waitpid.return_value = 0, 0
|
||||
send.return_value = 0
|
||||
read.return_value = 'False: NG'
|
||||
ping.return_value = SBusResponse(False, 'NG')
|
||||
with self.assertRaises(SDaemonError):
|
||||
self.dfactory.spawn_subprocess(
|
||||
['arg0', 'argv1', 'argv2'],
|
||||
@ -165,40 +169,32 @@ class TestStorletDaemonFactory(unittest.TestCase):
|
||||
self.dfactory.storlet_name_to_pipe_name = \
|
||||
{'storleta': 'path/to/uds/a'}
|
||||
|
||||
with mock.patch(self.sbus_path + '.send') as send, \
|
||||
mock.patch(self.base_path + '.time.sleep'), \
|
||||
mock.patch(self.base_path + '.os.read') as read:
|
||||
send.side_effect = [-1, 0]
|
||||
read.return_value = 'True: OK'
|
||||
with self._mock_sbus_client('ping') as ping, \
|
||||
mock.patch(self.base_path + '.time.sleep'):
|
||||
ping.return_value = SBusResponse(True, 'OK')
|
||||
self.assertTrue(
|
||||
self.dfactory.wait_for_daemon_to_initialize('storleta'))
|
||||
self.assertEqual(2, send.call_count)
|
||||
self.assertEqual(1, read.call_count)
|
||||
self.assertEqual(1, ping.call_count)
|
||||
|
||||
with mock.patch(self.sbus_path + '.send') as send, \
|
||||
mock.patch(self.base_path + '.time.sleep'), \
|
||||
mock.patch(self.base_path + '.os.read') as read:
|
||||
send.return_value = 0
|
||||
read.return_value = 'False: NG'
|
||||
with self._mock_sbus_client('ping') as ping, \
|
||||
mock.patch(self.base_path + '.time.sleep'):
|
||||
ping.return_value = SBusResponse(False, 'NG')
|
||||
self.assertFalse(
|
||||
self.dfactory.wait_for_daemon_to_initialize('storleta'))
|
||||
self.assertEqual(
|
||||
self.dfactory.NUM_OF_TRIES_PINGING_STARTING_DAEMON,
|
||||
send.call_count)
|
||||
self.assertEqual(
|
||||
self.dfactory.NUM_OF_TRIES_PINGING_STARTING_DAEMON,
|
||||
read.call_count)
|
||||
ping.call_count)
|
||||
|
||||
self.dfactory.storlet_name_to_pipe_name = \
|
||||
{'storleta': 'path/to/uds/a', 'storletb': 'path/to/uds/b'}
|
||||
with mock.patch(self.sbus_path + '.send') as send, \
|
||||
with self._mock_sbus_client('ping') as ping, \
|
||||
mock.patch(self.base_path + '.time.sleep'):
|
||||
send.return_value = -1
|
||||
ping.side_effect = SBusClientSendError()
|
||||
self.assertFalse(
|
||||
self.dfactory.wait_for_daemon_to_initialize('storleta'))
|
||||
self.assertEqual(
|
||||
self.dfactory.NUM_OF_TRIES_PINGING_STARTING_DAEMON,
|
||||
send.call_count)
|
||||
ping.call_count)
|
||||
|
||||
def test_process_start_daemon(self):
|
||||
# Not running
|
||||
@ -213,13 +209,11 @@ class TestStorletDaemonFactory(unittest.TestCase):
|
||||
with mock.patch(self.base_path + '.subprocess.Popen') as popen, \
|
||||
mock.patch(self.base_path + '.time.sleep'), \
|
||||
mock.patch(self.waitpid_path) as waitpid, \
|
||||
mock.patch(self.sbus_path + '.send') as send, \
|
||||
mock.patch(self.base_path + '.os.read') as read:
|
||||
self._mock_sbus_client('ping') as ping:
|
||||
popen.side_effect = [FakePopenObject(1000),
|
||||
FakePopenObject(1001)]
|
||||
waitpid.return_value = 0, 0
|
||||
send.return_value = 0
|
||||
read.return_value = 'True: OK'
|
||||
ping.return_value = SBusResponse(True, 'OK')
|
||||
self.assertTrue(self.dfactory.process_start_daemon(
|
||||
'java', 'path/to/storlet/a', 'storleta', 1, 'path/to/uds/a',
|
||||
'TRACE'))
|
||||
@ -404,11 +398,9 @@ class TestStorletDaemonFactory(unittest.TestCase):
|
||||
{'storleta': 1000, 'storletb': 1001}
|
||||
self.dfactory.storlet_name_to_pipe_name = \
|
||||
{'storleta': 'path/to/uds/a', 'storletb': 'path/to/uds/b'}
|
||||
with mock.patch(self.sbus_path + '.send') as send, \
|
||||
mock.patch(self.base_path + '.os.read') as read, \
|
||||
with self._mock_sbus_client('halt') as halt, \
|
||||
mock.patch(self.waitpid_path):
|
||||
send.return_value = 0
|
||||
read.return_value = 'True: OK'
|
||||
halt.return_value = SBusResponse(True, 'OK')
|
||||
terminated = self.dfactory.shutdown_all_processes()
|
||||
self.assertEqual(2, len(terminated))
|
||||
self.assertIn('storleta', terminated)
|
||||
@ -421,17 +413,15 @@ class TestStorletDaemonFactory(unittest.TestCase):
|
||||
{'storleta': 1000, 'storletb': 1001}
|
||||
self.dfactory.storlet_name_to_pipe_name = \
|
||||
{'storleta': 'patha', 'storletb': 'pathb'}
|
||||
with mock.patch(self.sbus_path + '.send') as send, \
|
||||
mock.patch(self.base_path + '.os.read') as read, \
|
||||
with self._mock_sbus_client('halt') as halt, \
|
||||
mock.patch(self.waitpid_path) as waitpid:
|
||||
send.return_value = -1
|
||||
read.return_value = 'True: OK'
|
||||
halt.side_effect = SBusClientSendError()
|
||||
exc_pattern = '^Failed to shutdown some storlet daemons: .*'
|
||||
with self.assertRaisesRegexp(SDaemonError, exc_pattern) as e:
|
||||
self.dfactory.shutdown_all_processes()
|
||||
self.assertIn('storleta', str(e.exception))
|
||||
self.assertIn('storletb', str(e.exception))
|
||||
self.assertEqual(2, send.call_count)
|
||||
self.assertEqual(2, halt.call_count)
|
||||
self.assertEqual(0, waitpid.call_count)
|
||||
self.assertEqual({'storleta': 1000, 'storletb': 1001},
|
||||
self.dfactory.storlet_name_to_pid)
|
||||
@ -441,16 +431,14 @@ class TestStorletDaemonFactory(unittest.TestCase):
|
||||
{'storleta': 1000, 'storletb': 1001}
|
||||
self.dfactory.storlet_name_to_pipe_name = \
|
||||
{'storleta': 'patha', 'storletb': 'pathb'}
|
||||
with mock.patch(self.sbus_path + '.send') as send, \
|
||||
mock.patch(self.base_path + '.os.read') as read, \
|
||||
with self._mock_sbus_client('halt') as halt, \
|
||||
mock.patch(self.waitpid_path) as waitpid:
|
||||
send.return_value = -1
|
||||
read.return_value = 'True: OK'
|
||||
halt.side_effect = SBusClientSendError()
|
||||
exc_pattern = ('^Failed to send halt command to the storlet '
|
||||
'daemon storlet[a-b]$')
|
||||
with self.assertRaisesRegexp(SDaemonError, exc_pattern):
|
||||
self.dfactory.shutdown_all_processes(False)
|
||||
self.assertEqual(1, send.call_count)
|
||||
self.assertEqual(1, halt.call_count)
|
||||
self.assertEqual(0, waitpid.call_count)
|
||||
self.assertEqual({'storleta': 1000, 'storletb': 1001},
|
||||
self.dfactory.storlet_name_to_pid)
|
||||
@ -461,11 +449,9 @@ class TestStorletDaemonFactory(unittest.TestCase):
|
||||
{'storleta': 1000, 'storletb': 1001}
|
||||
self.dfactory.storlet_name_to_pipe_name = \
|
||||
{'storleta': 'path/to/uds/a', 'storletb': 'path/to/uds/b'}
|
||||
with mock.patch(self.sbus_path + '.send') as send, \
|
||||
mock.patch(self.base_path + '.os.read') as read, \
|
||||
with self._mock_sbus_client('halt') as halt, \
|
||||
mock.patch(self.waitpid_path):
|
||||
send.return_value = 0
|
||||
read.return_value = 'True: OK'
|
||||
halt.return_value = SBusResponse(True, 'OK')
|
||||
self.dfactory.shutdown_process('storleta')
|
||||
self.assertEqual({'storletb': 1001},
|
||||
self.dfactory.storlet_name_to_pid)
|
||||
@ -475,11 +461,9 @@ class TestStorletDaemonFactory(unittest.TestCase):
|
||||
{'storleta': 1000, 'storletb': 1001}
|
||||
self.dfactory.storlet_name_to_pipe_name = \
|
||||
{'storleta': 'path/to/uds/a', 'storletb': 'path/to/uds/b'}
|
||||
with mock.patch(self.sbus_path + '.send') as send, \
|
||||
mock.patch(self.base_path + '.os.read') as read, \
|
||||
with self._mock_sbus_client('halt') as halt, \
|
||||
mock.patch(self.waitpid_path) as waitpid:
|
||||
send.return_value = -1
|
||||
read.return_value = 'True: OK'
|
||||
halt.side_effect = SBusClientSendError()
|
||||
with self.assertRaises(SDaemonError):
|
||||
self.dfactory.shutdown_process('storleta')
|
||||
self.assertEqual(0, waitpid.call_count)
|
||||
@ -491,11 +475,9 @@ class TestStorletDaemonFactory(unittest.TestCase):
|
||||
{'storleta': 1000, 'storletb': 1001}
|
||||
self.dfactory.storlet_name_to_pipe_name = \
|
||||
{'storleta': 'path/to/uds/a', 'storletb': 'path/to/uds/b'}
|
||||
with mock.patch(self.sbus_path + '.send') as send, \
|
||||
mock.patch(self.base_path + '.os.read') as read, \
|
||||
with self._mock_sbus_client('halt') as halt, \
|
||||
mock.patch(self.waitpid_path) as waitpid:
|
||||
send.return_value = 0
|
||||
read.return_value = 'True: OK'
|
||||
halt.return_value = SBusResponse(True, 'OK')
|
||||
waitpid.side_effect = OSError()
|
||||
with self.assertRaises(SDaemonError):
|
||||
self.dfactory.shutdown_process('storleta')
|
||||
@ -529,13 +511,13 @@ class TestStorletDaemonFactory(unittest.TestCase):
|
||||
with mock.patch(self.base_path + '.subprocess.Popen') as popen, \
|
||||
mock.patch(self.base_path + '.time.sleep'), \
|
||||
mock.patch(self.waitpid_path) as waitpid, \
|
||||
mock.patch(self.sbus_path + '.send') as send, \
|
||||
mock.patch(self.base_path + '.os.read') as read:
|
||||
self._mock_sbus_client('ping') as ping, \
|
||||
self._mock_sbus_client('start_daemon') as start_daemon:
|
||||
popen.side_effect = [FakePopenObject(1000),
|
||||
FakePopenObject(1001)]
|
||||
waitpid.return_value = 0, 0
|
||||
send.return_value = 0
|
||||
read.return_value = 'True: OK'
|
||||
ping.return_value = SBusResponse(True, 'OK')
|
||||
start_daemon.return_value = SBusResponse(True, 'OK')
|
||||
ret = self.dfactory.start_daemon(DummyDatagram(prms))
|
||||
self.assertTrue(ret.status)
|
||||
self.assertEqual('OK', ret.message)
|
||||
@ -621,11 +603,9 @@ class TestStorletDaemonFactory(unittest.TestCase):
|
||||
{'storleta': 1000, 'storletb': 1001}
|
||||
self.dfactory.storlet_name_to_pipe_name = \
|
||||
{'storleta': 'path/to/uds/a', 'storletb': 'path/to/uds/b'}
|
||||
with mock.patch(self.sbus_path + '.send') as send, \
|
||||
mock.patch(self.base_path + '.os.read') as read, \
|
||||
with self._mock_sbus_client('halt') as halt, \
|
||||
mock.patch(self.waitpid_path):
|
||||
send.return_value = 0
|
||||
read.return_value = 'True: OK'
|
||||
halt.return_value = SBusResponse(True, 'OK')
|
||||
resp = self.dfactory.halt(DummyDatagram())
|
||||
self.assertTrue(resp.status)
|
||||
self.assertIn('storleta: terminated', resp.message)
|
||||
|
@ -13,23 +13,28 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import unittest
|
||||
import os
|
||||
import os.path
|
||||
from shutil import rmtree
|
||||
from tempfile import mkdtemp
|
||||
import eventlet
|
||||
import json
|
||||
import six
|
||||
from six import StringIO
|
||||
|
||||
import mock
|
||||
import unittest
|
||||
|
||||
from swift.common.swob import Request, Response
|
||||
from swift.common.utils import FileLikeIter
|
||||
|
||||
from storlets.sbus.client import SBusResponse
|
||||
|
||||
from tests.unit import FakeLogger
|
||||
from tests.unit.gateway.gateways import FakeFileManager
|
||||
from storlets.gateway.gateways.docker.gateway import DockerStorletRequest, \
|
||||
StorletGatewayDocker
|
||||
from tests.unit import MockSBus
|
||||
import os
|
||||
import os.path
|
||||
from tempfile import mkdtemp
|
||||
from shutil import rmtree
|
||||
import mock
|
||||
import json
|
||||
import eventlet
|
||||
|
||||
|
||||
class MockInternalClient(object):
|
||||
@ -412,12 +417,6 @@ use = egg:swift#catch_errors
|
||||
# TODO(kota_): need more efficient way for emuration of return value
|
||||
# from SDaemon
|
||||
value_generator = iter([
|
||||
# Firt is for confirmation for SDaemon running
|
||||
'True: daemon running confirmation',
|
||||
# Second is stop SDaemon in activation
|
||||
'True: stop daemon',
|
||||
# Third is start SDaemon again in activation
|
||||
'True: start daemon',
|
||||
# Forth is return value for invoking as task_id
|
||||
'This is task id',
|
||||
# Fifth is for getting meta
|
||||
@ -451,6 +450,7 @@ use = egg:swift#catch_errors
|
||||
# SBus -> mock SBus.send() for container communication
|
||||
# os.read -> mock reading the file descriptor from container
|
||||
# select.slect -> mock fd communication which can be readable
|
||||
@mock.patch('storlets.gateway.gateways.docker.runtime.SBusClient')
|
||||
@mock.patch('storlets.gateway.gateways.docker.runtime.SBus', MockSBus)
|
||||
@mock.patch('storlets.gateway.gateways.docker.runtime.os.read',
|
||||
mock_read)
|
||||
@ -461,7 +461,10 @@ use = egg:swift#catch_errors
|
||||
@mock.patch('storlets.gateway.common.stob.os.read',
|
||||
mock_read)
|
||||
@mock.patch(invocation_protocol, mock_writer)
|
||||
def test_invocation_flow():
|
||||
def test_invocation_flow(client):
|
||||
client.ping.return_value = SBusResponse(True, 'OK')
|
||||
client.stop_daemon.return_value = SBusResponse(True, 'OK')
|
||||
client.start_daemon.return_value = SBusResponse(True, 'OK')
|
||||
sresp = self.gateway.invocation_flow(st_req, extra_sources)
|
||||
eventlet.sleep(0.1)
|
||||
file_like = FileLikeIter(sresp.data_iter)
|
||||
|
@ -22,6 +22,9 @@ from contextlib import contextmanager
|
||||
from six import StringIO
|
||||
from stat import ST_MODE
|
||||
|
||||
from storlets.sbus.client import SBusResponse
|
||||
from storlets.sbus.client.exceptions import SBusClientIOError, \
|
||||
SBusClientMalformedResponse, SBusClientSendError
|
||||
from storlets.gateway.common.exceptions import StorletRuntimeException, \
|
||||
StorletTimeout
|
||||
from storlets.gateway.gateways.docker.gateway import DockerStorletRequest
|
||||
@ -263,62 +266,49 @@ class TestRunTimeSandbox(unittest.TestCase):
|
||||
self.scope = '0123456789abc'
|
||||
self.sbox = RunTimeSandbox(self.scope, self.conf, self.logger)
|
||||
|
||||
def tearDown(self):
|
||||
pass
|
||||
|
||||
def test_parse_sandbox_factory_answer(self):
|
||||
status, msg = self.sbox._parse_sandbox_factory_answer('True:message')
|
||||
self.assertTrue(status)
|
||||
self.assertEqual('message', msg)
|
||||
|
||||
status, msg = self.sbox._parse_sandbox_factory_answer('False:message')
|
||||
self.assertFalse(status)
|
||||
self.assertEqual('message', msg)
|
||||
|
||||
with self.assertRaises(StorletRuntimeException):
|
||||
self.sbox._parse_sandbox_factory_answer('Foo')
|
||||
|
||||
def _check_all_pipese_closed(self, pipes):
|
||||
for _pipe in pipes:
|
||||
self.assertTrue(_pipe[0].closed)
|
||||
self.assertTrue(_pipe[1].closed)
|
||||
|
||||
def test_ping(self):
|
||||
with _mock_os_pipe(['True:OK']) as pipes, _mock_sbus(0):
|
||||
self.assertEqual(1, self.sbox.ping())
|
||||
self._check_all_pipese_closed(pipes)
|
||||
with mock.patch('storlets.gateway.gateways.docker.runtime.'
|
||||
'SBusClient.ping') as ping:
|
||||
ping.return_value = SBusResponse(True, 'OK')
|
||||
self.assertEqual(self.sbox.ping(), 1)
|
||||
|
||||
with _mock_os_pipe(['False:ERROR']) as pipes, _mock_sbus(-1):
|
||||
self.assertEqual(-1, self.sbox.ping())
|
||||
self._check_all_pipese_closed(pipes)
|
||||
with mock.patch('storlets.gateway.gateways.docker.runtime.'
|
||||
'SBusClient.ping') as ping:
|
||||
ping.return_value = SBusResponse(False, 'Error')
|
||||
self.assertEqual(self.sbox.ping(), 0)
|
||||
|
||||
with _mock_os_pipe(['Foo']) as pipes, _mock_sbus(0):
|
||||
with self.assertRaises(StorletRuntimeException):
|
||||
self.sbox.ping()
|
||||
self._check_all_pipese_closed(pipes)
|
||||
with mock.patch('storlets.gateway.gateways.docker.runtime.'
|
||||
'SBusClient.ping') as ping:
|
||||
ping.side_effect = SBusClientSendError()
|
||||
self.assertEqual(self.sbox.ping(), -1)
|
||||
|
||||
with mock.patch('storlets.gateway.gateways.docker.runtime.'
|
||||
'SBusClient.ping') as ping:
|
||||
ping.side_effect = SBusClientMalformedResponse()
|
||||
self.assertEqual(self.sbox.ping(), -1)
|
||||
|
||||
with mock.patch('storlets.gateway.gateways.docker.runtime.'
|
||||
'SBusClient.ping') as ping:
|
||||
ping.side_effect = SBusClientIOError()
|
||||
self.assertEqual(self.sbox.ping(), -1)
|
||||
|
||||
def test_wait(self):
|
||||
with _mock_os_pipe(['True:OK']) as pipes, _mock_sbus(0), \
|
||||
with mock.patch('storlets.gateway.gateways.docker.runtime.'
|
||||
'SBusClient.ping') as ping, \
|
||||
mock.patch('storlets.gateway.gateways.docker.runtime.'
|
||||
'time.sleep') as _s:
|
||||
'time.sleep') as sleep:
|
||||
ping.return_value = SBusResponse(True, 'OK')
|
||||
self.sbox.wait()
|
||||
self.assertEqual(0, _s.call_count)
|
||||
self._check_all_pipese_closed(pipes)
|
||||
self.assertEqual(sleep.call_count, 0)
|
||||
|
||||
with _mock_os_pipe(['False:ERROR', 'True:OK']) as pipes, \
|
||||
_mock_sbus(0), \
|
||||
with mock.patch('storlets.gateway.gateways.docker.runtime.'
|
||||
'SBusClient.ping') as ping, \
|
||||
mock.patch('storlets.gateway.gateways.docker.runtime.'
|
||||
'time.sleep') as _s:
|
||||
'time.sleep') as sleep:
|
||||
ping.side_effect = [SBusResponse(False, 'Error'),
|
||||
SBusResponse(True, 'OK')]
|
||||
self.sbox.wait()
|
||||
self.assertEqual(1, _s.call_count)
|
||||
self._check_all_pipese_closed(pipes)
|
||||
|
||||
with _mock_os_pipe(['Foo']) as pipes, _mock_sbus(0), \
|
||||
mock.patch('storlets.gateway.gateways.docker.runtime.'
|
||||
'time.sleep') as _s:
|
||||
with self.assertRaises(StorletRuntimeException):
|
||||
self.sbox.wait()
|
||||
self._check_all_pipese_closed(pipes)
|
||||
self.assertEqual(sleep.call_count, 1)
|
||||
|
||||
# TODO(takashi): should test timeout case
|
||||
|
||||
|
0
tests/unit/sbus/client/__init__.py
Normal file
0
tests/unit/sbus/client/__init__.py
Normal file
152
tests/unit/sbus/client/test_client.py
Normal file
152
tests/unit/sbus/client/test_client.py
Normal file
@ -0,0 +1,152 @@
|
||||
# Copyright (c) 2015-2016 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import mock
|
||||
import os
|
||||
import unittest
|
||||
import errno
|
||||
from contextlib import contextmanager
|
||||
|
||||
from storlets.sbus.client.exceptions import SBusClientSendError, \
|
||||
SBusClientMalformedResponse
|
||||
from storlets.sbus.client import SBusClient
|
||||
|
||||
|
||||
@contextmanager
|
||||
def _mock_sbus(send_status=0):
|
||||
with mock.patch('storlets.sbus.client.SBus.send') as fake_send:
|
||||
fake_send.return_value = send_status
|
||||
yield
|
||||
|
||||
|
||||
@contextmanager
|
||||
def _mock_os_pipe(bufs):
|
||||
class FakeFd(object):
|
||||
def __init__(self, rbuf=''):
|
||||
self.rbuf = rbuf
|
||||
self.closed = False
|
||||
|
||||
def read(self, size):
|
||||
size = min(len(self.rbuf), size)
|
||||
ret = self.rbuf[:size]
|
||||
self.rbuf = self.rbuf[size:]
|
||||
return ret
|
||||
|
||||
def close(self):
|
||||
if self.closed:
|
||||
raise OSError(errno.EBADF, os.strerror(errno.EBADF))
|
||||
self.closed = True
|
||||
|
||||
def fake_os_read(fd, size):
|
||||
return fd.read(size)
|
||||
|
||||
def fake_os_close(fd):
|
||||
fd.close()
|
||||
|
||||
pipes = [(FakeFd(buf), FakeFd()) for buf in bufs]
|
||||
pipe_generator = iter(pipes)
|
||||
|
||||
def mock_os_pipe():
|
||||
try:
|
||||
return next(pipe_generator)
|
||||
except StopIteration:
|
||||
raise AssertionError('pipe called more than expected')
|
||||
|
||||
with mock.patch('storlets.sbus.client.os.pipe', mock_os_pipe), \
|
||||
mock.patch('storlets.sbus.client.os.read', fake_os_read), \
|
||||
mock.patch('storlets.sbus.client.os.close', fake_os_close):
|
||||
yield pipes
|
||||
|
||||
|
||||
class TestSBusClient(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.pipe_path = 'pipe_path'
|
||||
self.client = SBusClient(self.pipe_path, 4)
|
||||
|
||||
def test_parse_response(self):
|
||||
resp = self.client._parse_response('True:OK')
|
||||
self.assertTrue(resp.status)
|
||||
self.assertEqual('OK', resp.message)
|
||||
|
||||
resp = self.client._parse_response('False:NG')
|
||||
self.assertFalse(resp.status)
|
||||
self.assertEqual('NG', resp.message)
|
||||
|
||||
resp = self.client._parse_response('True:Sample:Message')
|
||||
self.assertTrue(resp.status)
|
||||
self.assertEqual('Sample:Message', resp.message)
|
||||
|
||||
with self.assertRaises(SBusClientMalformedResponse):
|
||||
resp = self.client._parse_response('Foo')
|
||||
|
||||
def _check_all_pipes_closed(self, pipes):
|
||||
# Make sure that pipes are not empty
|
||||
self.assertGreater(len(pipes), 0)
|
||||
|
||||
for _pipe in pipes:
|
||||
self.assertTrue(_pipe[0].closed)
|
||||
self.assertTrue(_pipe[1].closed)
|
||||
|
||||
def _test_service_request(self, method, *args, **kwargs):
|
||||
with _mock_os_pipe(['True:OK']) as pipes, _mock_sbus(0):
|
||||
resp = method(*args, **kwargs)
|
||||
self.assertTrue(resp.status)
|
||||
self.assertEqual('OK', resp.message)
|
||||
self._check_all_pipes_closed(pipes)
|
||||
|
||||
with _mock_os_pipe(['False:ERROR']) as pipes, _mock_sbus(0):
|
||||
resp = method(*args, **kwargs)
|
||||
self.assertFalse(resp.status)
|
||||
self.assertEqual('ERROR', resp.message)
|
||||
self._check_all_pipes_closed(pipes)
|
||||
|
||||
with _mock_os_pipe(['True:OK']) as pipes, _mock_sbus(-1):
|
||||
with self.assertRaises(SBusClientSendError):
|
||||
method(*args, **kwargs)
|
||||
self._check_all_pipes_closed(pipes)
|
||||
|
||||
# TODO(takashi): Add IOError case
|
||||
|
||||
with _mock_os_pipe(['Foo']) as pipes, _mock_sbus(0):
|
||||
with self.assertRaises(SBusClientMalformedResponse):
|
||||
method(*args, **kwargs)
|
||||
self._check_all_pipes_closed(pipes)
|
||||
|
||||
def test_ping(self):
|
||||
self._test_service_request(self.client.ping)
|
||||
|
||||
def test_start_daemon(self):
|
||||
self._test_service_request(
|
||||
self.client.start_daemon, 'java', 'path/to/storlet',
|
||||
'storleta', 'path/to/uds', 'INFO', '10', '11')
|
||||
|
||||
def test_stop_daemon(self):
|
||||
self._test_service_request(self.client.stop_daemon, 'storleta')
|
||||
|
||||
def test_stop_daemons(self):
|
||||
self._test_service_request(self.client.stop_daemons)
|
||||
|
||||
def test_halt(self):
|
||||
self._test_service_request(self.client.halt)
|
||||
|
||||
def test_daemon_status(self):
|
||||
self._test_service_request(self.client.daemon_status, 'storleta')
|
||||
|
||||
def test_cancel(self):
|
||||
self._test_service_request(self.client.cancel, 'taskid')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
Loading…
Reference in New Issue
Block a user