Merge "Add basic client implementation for sbus protocol"

This commit is contained in:
Zuul 2019-08-08 01:32:30 +00:00 committed by Gerrit Code Review
commit 905956ff81
15 changed files with 584 additions and 348 deletions

20
bin/sbus Normal file
View File

@ -0,0 +1,20 @@
#!/usr/bin/env python
# Copyright (c) 2016 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from storlets.sbus.client.cli import main
if __name__ == '__main__':
main(sys.argv)

View File

@ -299,11 +299,8 @@ install_storlets_code() {
sudo chown "$STORLETS_SWIFT_RUNTIME_USER":"$STORLETS_SWIFT_RUNTIME_GROUP" "$STORLETS_DOCKER_DEVICE"/scripts
sudo chmod 0755 "$STORLETS_DOCKER_DEVICE"/scripts
sudo cp scripts/restart_docker_container "$STORLETS_DOCKER_DEVICE"/scripts/
sudo cp scripts/send_halt_cmd_to_daemon_factory.py "$STORLETS_DOCKER_DEVICE"/scripts/
sudo chmod 04755 "$STORLETS_DOCKER_DEVICE"/scripts/restart_docker_container
sudo chown root:root "$STORLETS_DOCKER_DEVICE"/scripts/restart_docker_container
sudo chmod 04755 "$STORLETS_DOCKER_DEVICE"/scripts/send_halt_cmd_to_daemon_factory.py
sudo chown root:root "$STORLETS_DOCKER_DEVICE"/scripts/send_halt_cmd_to_daemon_factory.py
cd -
}

View File

@ -280,9 +280,6 @@ require root privileges.
cp $HOME/scripts/restart_docker_container .
sudo chown root:root restart_docker_container
sudo chmod 04755 restart_docker_container
cp $HOME/scripts/send_halt_cmd_to_daemon_factory.py .
sudo chown root:root send_halt_cmd_to_daemon_factory.py
sudo chmod 04755 send_halt_cmd_to_daemon_factory.py
The run time directory will be later populated by the middleware with:
#. storlets - Docker container mapped directories keeping storlet jars

View File

@ -1,58 +0,0 @@
# Copyright (c) 2015, 2016 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
from storlets.sbus import SBus
from storlets.sbus.datagram import FDMetadata, SBusServiceDatagram
from storlets.sbus.file_description import SBUS_FD_SERVICE_OUT
from storlets.sbus.command import SBUS_CMD_HALT
def print_usage(argv):
print(argv[0] + ' /path/to/daemon/factory_pipe')
print('Example:')
sys.stdout.write(argv[0] + ' ')
print('/home/docker_device/pipes/scopes/'
'AUTH_fb8b63c579054c48816ca8acd090b3d9/factory_pipe')
def main(argv):
if len(argv) < 2:
print_usage(argv)
return
daemon_factory_pipe_name = argv[1]
try:
fi, fo = os.pipe()
halt_dtg = SBusServiceDatagram(
SBUS_CMD_HALT,
[fo],
[FDMetadata(SBUS_FD_SERVICE_OUT).to_dict()])
n_status = SBus.send(daemon_factory_pipe_name, halt_dtg)
if n_status < 0:
print('Sending failed')
else:
print('Sending succeeded')
cmd_response = os.read(fi, 256)
print(cmd_response)
finally:
os.close(fi)
os.close(fo)
if __name__ == '__main__':
main(sys.argv)

View File

@ -26,6 +26,10 @@ skip_changelog = True
[files]
packages =
storlets
scripts =
bin/storlets-daemon
bin/storlets-daemon-factory
bin/sbus
[entry_points]
paste.filter_factory =

View File

@ -21,9 +21,9 @@ import subprocess
import time
from storlets.sbus import SBus
from storlets.sbus.datagram import FDMetadata, SBusServiceDatagram
from storlets.sbus.command import SBUS_CMD_HALT, SBUS_CMD_PING
from storlets.sbus.file_description import SBUS_FD_SERVICE_OUT
from storlets.sbus.client import SBusClient
from storlets.sbus.client.exceptions import SBusClientException, \
SBusClientSendError
from storlets.agent.common.server import command_handler, CommandSuccess, \
CommandFailure, SBusServer
from storlets.agent.common.utils import get_logger, DEFAULT_PY2, DEFAULT_PY3
@ -187,24 +187,19 @@ class StorletDaemonFactory(SBusServer):
storlet_pipe_name = self.storlet_name_to_pipe_name[storlet_name]
self.logger.debug('Send PING command to {0} via {1}'.
format(storlet_name, storlet_pipe_name))
read_fd, write_fd = os.pipe()
try:
dtg = SBusServiceDatagram(
SBUS_CMD_PING,
[write_fd],
[FDMetadata(SBUS_FD_SERVICE_OUT).to_dict()])
for i in range(self.NUM_OF_TRIES_PINGING_STARTING_DAEMON):
ret = SBus.send(storlet_pipe_name, dtg)
if ret >= 0:
resp = os.read(read_fd, 128)
if resp.startswith('True'):
return True
time.sleep(1)
else:
return False
finally:
os.close(read_fd)
os.close(write_fd)
client = SBusClient(storlet_pipe_name)
for i in range(self.NUM_OF_TRIES_PINGING_STARTING_DAEMON):
try:
resp = client.ping()
if resp.status:
return True
except SBusClientSendError:
pass
except SBusClientException:
self.logger.exception('Failed to send sbus command')
break
time.sleep(1)
return False
def process_start_daemon(self, daemon_language, storlet_path, storlet_name,
pool_size, uds_path, log_level,
@ -412,25 +407,20 @@ class StorletDaemonFactory(SBusServer):
self.logger.debug('Send HALT command to {0} via {1}'.
format(storlet_name, storlet_pipe_name))
read_fd, write_fd = os.pipe()
client = SBusClient(storlet_pipe_name)
try:
dtg = SBusServiceDatagram(
SBUS_CMD_HALT,
[write_fd],
[FDMetadata(SBUS_FD_SERVICE_OUT).to_dict()])
rc = SBus.send(storlet_pipe_name, dtg)
os.close(write_fd)
if rc < 0:
resp = client.halt()
if not resp.status:
self.logger.error('Failed to send sbus command: %s' %
resp.message)
raise SDaemonError(
'Failed to send halt command to the storlet daemon {0}'
.format(storlet_name))
resp = os.read(read_fd, 128)
if not resp.startswith('True'):
raise SDaemonError(
'Failed to send halt command to the storlet daemon {0}'
.format(storlet_name))
finally:
os.close(read_fd)
'Failed to send halt to {0}'.format(storlet_name))
except SBusClientException:
self.logger.exception('Failed to send sbus command')
raise SDaemonError(
'Failed to send halt command to the storlet daemon {0}'
.format(storlet_name))
try:
os.waitpid(dmn_pid, 0)

View File

@ -27,10 +27,11 @@ import json
from contextlib import contextmanager
from storlets.sbus import SBus
from storlets.sbus.datagram import FDMetadata, SBusServiceDatagram, \
SBusExecuteDatagram
from storlets.sbus.command import SBUS_CMD_EXECUTE
from storlets.sbus.datagram import FDMetadata, SBusExecuteDatagram
from storlets.sbus import file_description as sbus_fd
from storlets.sbus import command as sbus_cmd
from storlets.sbus.client import SBusClient
from storlets.sbus.client.exceptions import SBusClientException
from storlets.gateway.common.exceptions import StorletRuntimeException, \
StorletTimeout
from storlets.gateway.common.logger import StorletLogger
@ -246,21 +247,6 @@ class RunTimeSandbox(object):
# TODO(change logger's route if possible)
self.logger = logger
def _parse_sandbox_factory_answer(self, str_answer):
"""
Parse answer string received from container side
:param str_answer: answer string
:returns: (status, message)
"""
two_tokens = str_answer.split(':', 1)
if len(two_tokens) != 2:
self.logger.error('Got wrong format about answer over sbus: %s' %
str_answer)
raise StorletRuntimeException('Got wrong answer')
status = (two_tokens[0] == 'True')
return status, two_tokens[1]
def ping(self):
"""
Ping to daemon factory process inside container
@ -270,23 +256,17 @@ class RunTimeSandbox(object):
-1 when it fails to send command to the process
"""
pipe_path = self.paths.host_factory_pipe()
with _open_pipe() as (read_fd, write_fd):
dtg = SBusServiceDatagram(
sbus_cmd.SBUS_CMD_PING,
[write_fd],
[FDMetadata(sbus_fd.SBUS_FD_SERVICE_OUT).to_dict()])
rc = SBus.send(pipe_path, dtg)
if (rc < 0):
return -1
reply = os.read(read_fd, 10)
res, error_txt = self._parse_sandbox_factory_answer(reply)
if res is True:
return 1
self.logger.error('Failed to ping to daemon factory: %s' % error_txt)
return 0
client = SBusClient(pipe_path)
try:
resp = client.ping()
if resp.status:
return 1
else:
self.logger.error('Failed to ping to daemon factory: %s' %
resp.message)
return 0
except SBusClientException:
return -1
def wait(self):
"""
@ -386,89 +366,59 @@ class RunTimeSandbox(object):
self, spath, storlet_id, language, language_version=None):
"""
Start SDaemon process in the scope's sandbox
"""
prms = {'daemon_language': language.lower(),
'storlet_path': spath,
'storlet_name': storlet_id,
'uds_path': self.paths.sbox_storlet_pipe(storlet_id),
'log_level': self.storlet_daemon_debug_level,
'pool_size': self.storlet_daemon_thread_pool_size}
pipe_path = self.paths.host_factory_pipe()
client = SBusClient(pipe_path)
try:
resp = client.start_daemon(
language.lower(), spath, storlet_id,
self.paths.sbox_storlet_pipe(storlet_id),
self.storlet_daemon_debug_level,
self.storlet_daemon_thread_pool_size,
language_version)
if language_version:
prms.update({'daemon_language_version': language_version})
with _open_pipe() as (read_fd, write_fd):
dtg = SBusServiceDatagram(
sbus_cmd.SBUS_CMD_START_DAEMON,
[write_fd],
[FDMetadata(sbus_fd.SBUS_FD_SERVICE_OUT).to_dict()],
prms)
pipe_path = self.paths.host_factory_pipe()
rc = SBus.send(pipe_path, dtg)
# TODO(takashi): Why we should rond rc into -1?
if (rc < 0):
return -1
reply = os.read(read_fd, 10)
res, error_txt = self._parse_sandbox_factory_answer(reply)
if res is True:
return 1
self.logger.error('Failed to start storlet daemon: %s' % error_txt)
return 0
if resp.status:
return 1
else:
self.logger.error('Failed to start storlet daemon: %s' %
resp.message)
return 0
except SBusClientException:
return -1
def stop_storlet_daemon(self, storlet_id):
"""
Stop SDaemon process in the scope's sandbox
"""
with _open_pipe() as (read_fd, write_fd):
dtg = SBusServiceDatagram(
sbus_cmd.SBUS_CMD_STOP_DAEMON,
[write_fd],
[FDMetadata(sbus_fd.SBUS_FD_SERVICE_OUT).to_dict()],
{'storlet_name': storlet_id})
pipe_path = self.paths.host_factory_pipe()
rc = SBus.send(pipe_path, dtg)
if (rc < 0):
self.logger.info("Failed to send status command to %s %s" %
(self.scope, storlet_id))
return -1
reply = os.read(read_fd, 10)
res, error_txt = self._parse_sandbox_factory_answer(reply)
if res is True:
return 1
self.logger.error('Failed to stop storlet daemon: %s' % error_txt)
return 0
pipe_path = self.paths.host_factory_pipe()
client = SBusClient(pipe_path)
try:
resp = client.stop_daemon(storlet_id)
if resp.status:
return 1
else:
self.logger.error('Failed to stop storlet daemon: %s' %
resp.message)
return 0
except SBusClientException:
return -1
def get_storlet_daemon_status(self, storlet_id):
"""
Get the status of SDaemon process in the scope's sandbox
"""
with _open_pipe() as (read_fd, write_fd):
dtg = SBusServiceDatagram(
sbus_cmd.SBUS_CMD_DAEMON_STATUS,
[write_fd],
[FDMetadata(sbus_fd.SBUS_FD_SERVICE_OUT).to_dict()],
{'storlet_name': storlet_id})
pipe_path = self.paths.host_factory_pipe()
rc = SBus.send(pipe_path, dtg)
if (rc < 0):
self.logger.info("Failed to send status command to %s %s" %
(self.scope, storlet_id))
return -1
reply = os.read(read_fd, 10)
res, error_txt = self._parse_sandbox_factory_answer(reply)
if res is True:
return 1
self.logger.error('Failed to get status about storlet daemon: %s' %
error_txt)
return 0
pipe_path = self.paths.host_factory_pipe()
client = SBusClient(pipe_path)
try:
resp = client.daemon_status(storlet_id)
if resp.status:
return 1
else:
self.logger.error('Failed to get status about storlet '
'daemon: %s' % resp.message)
return 0
except SBusClientException:
return -1
def _get_storlet_classpath(self, storlet_main, storlet_id, dependencies):
"""
@ -715,18 +665,13 @@ class StorletInvocationProtocol(object):
"""
Cancel on-going storlet execution
"""
with _open_pipe() as (read_fd, write_fd):
dtg = SBusServiceDatagram(
sbus_cmd.SBUS_CMD_CANCEL,
[write_fd],
[FDMetadata(sbus_fd.SBUS_FD_SERVICE_OUT).to_dict()],
None,
self.task_id)
rc = SBus.send(self.storlet_pipe_path, dtg)
if (rc < 0):
client = SBusClient(self.storlet_pipe_path)
try:
resp = client.cancel(self.task_id)
if not resp.status:
raise StorletRuntimeException('Failed to cancel task')
# TODO(takashi): Check the response here
os.read(read_fd, 10)
except SBusClientException:
raise StorletRuntimeException('Failed to cancel task')
def _invoke(self):
"""
@ -746,7 +691,7 @@ class StorletInvocationProtocol(object):
execution
"""
dtg = SBusExecuteDatagram(
sbus_cmd.SBUS_CMD_EXECUTE,
SBUS_CMD_EXECUTE,
self.remote_fds,
self.remote_fds_metadata,
self.srequest.params)

View File

@ -0,0 +1,123 @@
# Copyright (c) 2016 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from storlets.sbus import SBus
from storlets.sbus.command import SBUS_CMD_CANCEL, SBUS_CMD_DAEMON_STATUS, \
SBUS_CMD_HALT, SBUS_CMD_PING, SBUS_CMD_START_DAEMON, \
SBUS_CMD_STOP_DAEMON, SBUS_CMD_STOP_DAEMONS
from storlets.sbus.datagram import FDMetadata, SBusServiceDatagram
from storlets.sbus.file_description import SBUS_FD_SERVICE_OUT
from storlets.sbus.client.exceptions import SBusClientIOError, \
SBusClientMalformedResponse, SBusClientSendError
class SBusResponse(object):
def __init__(self, status, message):
"""
Construct SBusResponse class
:param status: Whether the server succeed to process the given request
:param message: Messages to describe the process result
"""
self.status = status
self.message = message
class SBusClient(object):
def __init__(self, socket_path, chunk_size=16):
self.socket_path = socket_path
self.chunk_size = chunk_size
def _parse_response(self, str_response):
"""
Parse response string recieved from container side
:param str_response: response string
:returns: SBusResponse instance
"""
two_tokens = str_response.split(':', 1)
if len(two_tokens) != 2:
raise SBusClientMalformedResponse('Got malformed response')
status = (two_tokens[0].lower() == 'true')
message = two_tokens[1]
return SBusResponse(status, message)
def _request(self, command, params=None, task_id=None):
read_fd, write_fd = os.pipe()
try:
try:
datagram = SBusServiceDatagram(
command, [write_fd],
[FDMetadata(SBUS_FD_SERVICE_OUT).to_dict()],
params, task_id)
rc = SBus.send(self.socket_path, datagram)
if rc < 0:
raise SBusClientSendError(
'Faild to send command(%s) to socket %s' %
(datagram.command, self.socket_path))
finally:
# We already sent the write fd to remote, so should close it
# in local side before reading response
os.close(write_fd)
reply = ''
while True:
try:
buf = os.read(read_fd, self.chunk_size)
except IOError:
raise SBusClientIOError('Failed to read data from read '
'pipe')
if not buf:
break
reply = reply + buf
finally:
os.close(read_fd)
return self._parse_response(reply)
def execute(self, *args, **kwargs):
# TODO(takashi): implement this
raise NotImplementedError('Execute command is not supported yet')
def ping(self):
return self._request(SBUS_CMD_PING)
def start_daemon(self, language, storlet_path, storlet_id,
uds_path, log_level, pool_size,
language_version):
params = {'daemon_language': language, 'storlet_path': storlet_path,
'storlet_name': storlet_id, 'uds_path': uds_path,
'log_level': log_level, 'pool_size': pool_size}
if language_version:
params['daemon_language_version'] = language_version
return self._request(SBUS_CMD_START_DAEMON, params)
def stop_daemon(self, storlet_name):
return self._request(SBUS_CMD_STOP_DAEMON,
{'storlet_name': storlet_name})
def stop_daemons(self):
return self._request(SBUS_CMD_STOP_DAEMONS)
def halt(self):
return self._request(SBUS_CMD_HALT)
def daemon_status(self, storlet_name):
return self._request(SBUS_CMD_DAEMON_STATUS,
{'storlet_name': storlet_name})
def cancel(self, task_id):
return self._request(SBUS_CMD_CANCEL, task_id=task_id)

View File

@ -0,0 +1,63 @@
# Copyright (c) 2016 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from sys import exit
from storlets.sbus.client import SBusClient
from storlets.sbus.client.exceptions import SBusClientException
EXIT_SUCCESS = 0
EXIT_ERROR = 1
def main(argv):
# TODO(takashi): Add more detailed help message
if len(argv) < 3:
print('sbus <command> <pipe_path>')
exit(EXIT_ERROR)
command = argv[1]
pipe_path = argv[2]
if not os.path.exists(pipe_path):
print('ERROR: Pipe file %s does not exist' % pipe_path)
exit(EXIT_ERROR)
client = SBusClient(pipe_path)
try:
handler = getattr(client, command)
# TODO(takashi): Currently this only works for ping or halt.
# We need to pass more parameters like storlet_name
# to implement the other command types.
resp = handler()
except (AttributeError, NotImplementedError):
print('ERROR: Command %s is not supported' % command)
exit(EXIT_ERROR)
except SBusClientException as err:
print('ERROR: Failed to send sbus command %s to %s: %s'
% (command, pipe_path, err))
exit(EXIT_ERROR)
except Exception as err:
print('ERROR: Unknown error: %s' % err)
exit(EXIT_ERROR)
print('Response: %s: %s' % (resp.status, resp.message))
if resp.status:
print('OK')
exit(EXIT_SUCCESS)
else:
print('ERROR: Got error response')
exit(EXIT_ERROR)

View File

@ -0,0 +1,30 @@
# Copyright (c) 2015, 2016 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class SBusClientException(Exception):
pass
class SBusClientIOError(SBusClientException, IOError):
pass
class SBusClientSendError(SBusClientException):
pass
class SBusClientMalformedResponse(SBusClientException):
pass

View File

@ -12,12 +12,15 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from contextlib import contextmanager
import errno
import mock
import unittest
from storlets.sbus import command as sbus_cmd
from storlets.sbus.client import SBusResponse
from storlets.sbus.client.exceptions import SBusClientSendError
from storlets.agent.daemon_factory.server import SDaemonError, \
StorletDaemonFactory
from storlets.agent.common.utils import DEFAULT_PY2, DEFAULT_PY3
@ -35,7 +38,12 @@ class TestStorletDaemonFactory(unittest.TestCase):
base_path = 'storlets.agent.daemon_factory.server'
kill_path = base_path + '.os.kill'
waitpid_path = base_path + '.os.waitpid'
sbus_path = base_path + '.SBus'
@contextmanager
def _mock_sbus_client(self, method):
sbusclient_path = self.base_path + '.SBusClient'
with mock.patch('.'.join([sbusclient_path, method])) as _method:
yield _method
def setUp(self):
self.logger = FakeLogger()
@ -110,13 +118,11 @@ class TestStorletDaemonFactory(unittest.TestCase):
with mock.patch(self.base_path + '.subprocess.Popen') as popen, \
mock.patch(self.base_path + '.time.sleep'), \
mock.patch(self.waitpid_path) as waitpid, \
mock.patch(self.sbus_path + '.send') as send, \
mock.patch(self.base_path + '.os.read') as read:
self._mock_sbus_client('ping') as ping:
popen.side_effect = [FakePopenObject(1000),
FakePopenObject(1001)]
waitpid.return_value = 0, 0
send.return_value = 0
read.return_value = 'True: OK'
ping.return_value = SBusResponse(True, 'OK')
self.dfactory.spawn_subprocess(
['arg0', 'argv1', 'argv2'],
{'envk0': 'envv0'}, 'storleta')
@ -127,13 +133,11 @@ class TestStorletDaemonFactory(unittest.TestCase):
with mock.patch(self.base_path + '.subprocess.Popen') as popen, \
mock.patch(self.base_path + '.time.sleep'), \
mock.patch(self.waitpid_path) as waitpid, \
mock.patch(self.sbus_path + '.send') as send, \
mock.patch(self.base_path + '.os.read') as read:
self._mock_sbus_client('ping') as ping:
popen.side_effect = [FakePopenObject(1000),
FakePopenObject(1001)]
waitpid.return_value = 0, 0
send.return_value = 0
read.return_value = 'False: NG'
ping.return_value = SBusResponse(False, 'NG')
with self.assertRaises(SDaemonError):
self.dfactory.spawn_subprocess(
['arg0', 'argv1', 'argv2'],
@ -165,40 +169,32 @@ class TestStorletDaemonFactory(unittest.TestCase):
self.dfactory.storlet_name_to_pipe_name = \
{'storleta': 'path/to/uds/a'}
with mock.patch(self.sbus_path + '.send') as send, \
mock.patch(self.base_path + '.time.sleep'), \
mock.patch(self.base_path + '.os.read') as read:
send.side_effect = [-1, 0]
read.return_value = 'True: OK'
with self._mock_sbus_client('ping') as ping, \
mock.patch(self.base_path + '.time.sleep'):
ping.return_value = SBusResponse(True, 'OK')
self.assertTrue(
self.dfactory.wait_for_daemon_to_initialize('storleta'))
self.assertEqual(2, send.call_count)
self.assertEqual(1, read.call_count)
self.assertEqual(1, ping.call_count)
with mock.patch(self.sbus_path + '.send') as send, \
mock.patch(self.base_path + '.time.sleep'), \
mock.patch(self.base_path + '.os.read') as read:
send.return_value = 0
read.return_value = 'False: NG'
with self._mock_sbus_client('ping') as ping, \
mock.patch(self.base_path + '.time.sleep'):
ping.return_value = SBusResponse(False, 'NG')
self.assertFalse(
self.dfactory.wait_for_daemon_to_initialize('storleta'))
self.assertEqual(
self.dfactory.NUM_OF_TRIES_PINGING_STARTING_DAEMON,
send.call_count)
self.assertEqual(
self.dfactory.NUM_OF_TRIES_PINGING_STARTING_DAEMON,
read.call_count)
ping.call_count)
self.dfactory.storlet_name_to_pipe_name = \
{'storleta': 'path/to/uds/a', 'storletb': 'path/to/uds/b'}
with mock.patch(self.sbus_path + '.send') as send, \
with self._mock_sbus_client('ping') as ping, \
mock.patch(self.base_path + '.time.sleep'):
send.return_value = -1
ping.side_effect = SBusClientSendError()
self.assertFalse(
self.dfactory.wait_for_daemon_to_initialize('storleta'))
self.assertEqual(
self.dfactory.NUM_OF_TRIES_PINGING_STARTING_DAEMON,
send.call_count)
ping.call_count)
def test_process_start_daemon(self):
# Not running
@ -213,13 +209,11 @@ class TestStorletDaemonFactory(unittest.TestCase):
with mock.patch(self.base_path + '.subprocess.Popen') as popen, \
mock.patch(self.base_path + '.time.sleep'), \
mock.patch(self.waitpid_path) as waitpid, \
mock.patch(self.sbus_path + '.send') as send, \
mock.patch(self.base_path + '.os.read') as read:
self._mock_sbus_client('ping') as ping:
popen.side_effect = [FakePopenObject(1000),
FakePopenObject(1001)]
waitpid.return_value = 0, 0
send.return_value = 0
read.return_value = 'True: OK'
ping.return_value = SBusResponse(True, 'OK')
self.assertTrue(self.dfactory.process_start_daemon(
'java', 'path/to/storlet/a', 'storleta', 1, 'path/to/uds/a',
'TRACE'))
@ -404,11 +398,9 @@ class TestStorletDaemonFactory(unittest.TestCase):
{'storleta': 1000, 'storletb': 1001}
self.dfactory.storlet_name_to_pipe_name = \
{'storleta': 'path/to/uds/a', 'storletb': 'path/to/uds/b'}
with mock.patch(self.sbus_path + '.send') as send, \
mock.patch(self.base_path + '.os.read') as read, \
with self._mock_sbus_client('halt') as halt, \
mock.patch(self.waitpid_path):
send.return_value = 0
read.return_value = 'True: OK'
halt.return_value = SBusResponse(True, 'OK')
terminated = self.dfactory.shutdown_all_processes()
self.assertEqual(2, len(terminated))
self.assertIn('storleta', terminated)
@ -421,17 +413,15 @@ class TestStorletDaemonFactory(unittest.TestCase):
{'storleta': 1000, 'storletb': 1001}
self.dfactory.storlet_name_to_pipe_name = \
{'storleta': 'patha', 'storletb': 'pathb'}
with mock.patch(self.sbus_path + '.send') as send, \
mock.patch(self.base_path + '.os.read') as read, \
with self._mock_sbus_client('halt') as halt, \
mock.patch(self.waitpid_path) as waitpid:
send.return_value = -1
read.return_value = 'True: OK'
halt.side_effect = SBusClientSendError()
exc_pattern = '^Failed to shutdown some storlet daemons: .*'
with self.assertRaisesRegexp(SDaemonError, exc_pattern) as e:
self.dfactory.shutdown_all_processes()
self.assertIn('storleta', str(e.exception))
self.assertIn('storletb', str(e.exception))
self.assertEqual(2, send.call_count)
self.assertEqual(2, halt.call_count)
self.assertEqual(0, waitpid.call_count)
self.assertEqual({'storleta': 1000, 'storletb': 1001},
self.dfactory.storlet_name_to_pid)
@ -441,16 +431,14 @@ class TestStorletDaemonFactory(unittest.TestCase):
{'storleta': 1000, 'storletb': 1001}
self.dfactory.storlet_name_to_pipe_name = \
{'storleta': 'patha', 'storletb': 'pathb'}
with mock.patch(self.sbus_path + '.send') as send, \
mock.patch(self.base_path + '.os.read') as read, \
with self._mock_sbus_client('halt') as halt, \
mock.patch(self.waitpid_path) as waitpid:
send.return_value = -1
read.return_value = 'True: OK'
halt.side_effect = SBusClientSendError()
exc_pattern = ('^Failed to send halt command to the storlet '
'daemon storlet[a-b]$')
with self.assertRaisesRegexp(SDaemonError, exc_pattern):
self.dfactory.shutdown_all_processes(False)
self.assertEqual(1, send.call_count)
self.assertEqual(1, halt.call_count)
self.assertEqual(0, waitpid.call_count)
self.assertEqual({'storleta': 1000, 'storletb': 1001},
self.dfactory.storlet_name_to_pid)
@ -461,11 +449,9 @@ class TestStorletDaemonFactory(unittest.TestCase):
{'storleta': 1000, 'storletb': 1001}
self.dfactory.storlet_name_to_pipe_name = \
{'storleta': 'path/to/uds/a', 'storletb': 'path/to/uds/b'}
with mock.patch(self.sbus_path + '.send') as send, \
mock.patch(self.base_path + '.os.read') as read, \
with self._mock_sbus_client('halt') as halt, \
mock.patch(self.waitpid_path):
send.return_value = 0
read.return_value = 'True: OK'
halt.return_value = SBusResponse(True, 'OK')
self.dfactory.shutdown_process('storleta')
self.assertEqual({'storletb': 1001},
self.dfactory.storlet_name_to_pid)
@ -475,11 +461,9 @@ class TestStorletDaemonFactory(unittest.TestCase):
{'storleta': 1000, 'storletb': 1001}
self.dfactory.storlet_name_to_pipe_name = \
{'storleta': 'path/to/uds/a', 'storletb': 'path/to/uds/b'}
with mock.patch(self.sbus_path + '.send') as send, \
mock.patch(self.base_path + '.os.read') as read, \
with self._mock_sbus_client('halt') as halt, \
mock.patch(self.waitpid_path) as waitpid:
send.return_value = -1
read.return_value = 'True: OK'
halt.side_effect = SBusClientSendError()
with self.assertRaises(SDaemonError):
self.dfactory.shutdown_process('storleta')
self.assertEqual(0, waitpid.call_count)
@ -491,11 +475,9 @@ class TestStorletDaemonFactory(unittest.TestCase):
{'storleta': 1000, 'storletb': 1001}
self.dfactory.storlet_name_to_pipe_name = \
{'storleta': 'path/to/uds/a', 'storletb': 'path/to/uds/b'}
with mock.patch(self.sbus_path + '.send') as send, \
mock.patch(self.base_path + '.os.read') as read, \
with self._mock_sbus_client('halt') as halt, \
mock.patch(self.waitpid_path) as waitpid:
send.return_value = 0
read.return_value = 'True: OK'
halt.return_value = SBusResponse(True, 'OK')
waitpid.side_effect = OSError()
with self.assertRaises(SDaemonError):
self.dfactory.shutdown_process('storleta')
@ -529,13 +511,13 @@ class TestStorletDaemonFactory(unittest.TestCase):
with mock.patch(self.base_path + '.subprocess.Popen') as popen, \
mock.patch(self.base_path + '.time.sleep'), \
mock.patch(self.waitpid_path) as waitpid, \
mock.patch(self.sbus_path + '.send') as send, \
mock.patch(self.base_path + '.os.read') as read:
self._mock_sbus_client('ping') as ping, \
self._mock_sbus_client('start_daemon') as start_daemon:
popen.side_effect = [FakePopenObject(1000),
FakePopenObject(1001)]
waitpid.return_value = 0, 0
send.return_value = 0
read.return_value = 'True: OK'
ping.return_value = SBusResponse(True, 'OK')
start_daemon.return_value = SBusResponse(True, 'OK')
ret = self.dfactory.start_daemon(DummyDatagram(prms))
self.assertTrue(ret.status)
self.assertEqual('OK', ret.message)
@ -621,11 +603,9 @@ class TestStorletDaemonFactory(unittest.TestCase):
{'storleta': 1000, 'storletb': 1001}
self.dfactory.storlet_name_to_pipe_name = \
{'storleta': 'path/to/uds/a', 'storletb': 'path/to/uds/b'}
with mock.patch(self.sbus_path + '.send') as send, \
mock.patch(self.base_path + '.os.read') as read, \
with self._mock_sbus_client('halt') as halt, \
mock.patch(self.waitpid_path):
send.return_value = 0
read.return_value = 'True: OK'
halt.return_value = SBusResponse(True, 'OK')
resp = self.dfactory.halt(DummyDatagram())
self.assertTrue(resp.status)
self.assertIn('storleta: terminated', resp.message)

View File

@ -13,23 +13,28 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import os
import os.path
from shutil import rmtree
from tempfile import mkdtemp
import eventlet
import json
import six
from six import StringIO
import mock
import unittest
from swift.common.swob import Request, Response
from swift.common.utils import FileLikeIter
from storlets.sbus.client import SBusResponse
from tests.unit import FakeLogger
from tests.unit.gateway.gateways import FakeFileManager
from storlets.gateway.gateways.docker.gateway import DockerStorletRequest, \
StorletGatewayDocker
from tests.unit import MockSBus
import os
import os.path
from tempfile import mkdtemp
from shutil import rmtree
import mock
import json
import eventlet
class MockInternalClient(object):
@ -461,12 +466,6 @@ use = egg:swift#catch_errors
# TODO(kota_): need more efficient way for emuration of return value
# from SDaemon
value_generator = iter([
# Firt is for confirmation for SDaemon running
'True: daemon running confirmation',
# Second is stop SDaemon in activation
'True: stop daemon',
# Third is start SDaemon again in activation
'True: start daemon',
# Forth is return value for invoking as task_id
'This is task id',
# Fifth is for getting meta
@ -500,6 +499,7 @@ use = egg:swift#catch_errors
# SBus -> mock SBus.send() for container communication
# os.read -> mock reading the file descriptor from container
# select.slect -> mock fd communication which can be readable
@mock.patch('storlets.gateway.gateways.docker.runtime.SBusClient')
@mock.patch('storlets.gateway.gateways.docker.runtime.SBus', MockSBus)
@mock.patch('storlets.gateway.gateways.docker.runtime.os.read',
mock_read)
@ -510,7 +510,10 @@ use = egg:swift#catch_errors
@mock.patch('storlets.gateway.common.stob.os.read',
mock_read)
@mock.patch(invocation_protocol, mock_writer)
def test_invocation_flow():
def test_invocation_flow(client):
client.ping.return_value = SBusResponse(True, 'OK')
client.stop_daemon.return_value = SBusResponse(True, 'OK')
client.start_daemon.return_value = SBusResponse(True, 'OK')
sresp = self.gateway.invocation_flow(st_req, extra_sources)
eventlet.sleep(0.1)
file_like = FileLikeIter(sresp.data_iter)

View File

@ -22,6 +22,9 @@ from contextlib import contextmanager
from six import StringIO
from stat import ST_MODE
from storlets.sbus.client import SBusResponse
from storlets.sbus.client.exceptions import SBusClientIOError, \
SBusClientMalformedResponse, SBusClientSendError
from storlets.gateway.common.exceptions import StorletRuntimeException, \
StorletTimeout
from storlets.gateway.gateways.docker.gateway import DockerStorletRequest
@ -263,62 +266,49 @@ class TestRunTimeSandbox(unittest.TestCase):
self.scope = '0123456789abc'
self.sbox = RunTimeSandbox(self.scope, self.conf, self.logger)
def tearDown(self):
pass
def test_parse_sandbox_factory_answer(self):
status, msg = self.sbox._parse_sandbox_factory_answer('True:message')
self.assertTrue(status)
self.assertEqual('message', msg)
status, msg = self.sbox._parse_sandbox_factory_answer('False:message')
self.assertFalse(status)
self.assertEqual('message', msg)
with self.assertRaises(StorletRuntimeException):
self.sbox._parse_sandbox_factory_answer('Foo')
def _check_all_pipese_closed(self, pipes):
for _pipe in pipes:
self.assertTrue(_pipe[0].closed)
self.assertTrue(_pipe[1].closed)
def test_ping(self):
with _mock_os_pipe(['True:OK']) as pipes, _mock_sbus(0):
self.assertEqual(1, self.sbox.ping())
self._check_all_pipese_closed(pipes)
with mock.patch('storlets.gateway.gateways.docker.runtime.'
'SBusClient.ping') as ping:
ping.return_value = SBusResponse(True, 'OK')
self.assertEqual(self.sbox.ping(), 1)
with _mock_os_pipe(['False:ERROR']) as pipes, _mock_sbus(-1):
self.assertEqual(-1, self.sbox.ping())
self._check_all_pipese_closed(pipes)
with mock.patch('storlets.gateway.gateways.docker.runtime.'
'SBusClient.ping') as ping:
ping.return_value = SBusResponse(False, 'Error')
self.assertEqual(self.sbox.ping(), 0)
with _mock_os_pipe(['Foo']) as pipes, _mock_sbus(0):
with self.assertRaises(StorletRuntimeException):
self.sbox.ping()
self._check_all_pipese_closed(pipes)
with mock.patch('storlets.gateway.gateways.docker.runtime.'
'SBusClient.ping') as ping:
ping.side_effect = SBusClientSendError()
self.assertEqual(self.sbox.ping(), -1)
with mock.patch('storlets.gateway.gateways.docker.runtime.'
'SBusClient.ping') as ping:
ping.side_effect = SBusClientMalformedResponse()
self.assertEqual(self.sbox.ping(), -1)
with mock.patch('storlets.gateway.gateways.docker.runtime.'
'SBusClient.ping') as ping:
ping.side_effect = SBusClientIOError()
self.assertEqual(self.sbox.ping(), -1)
def test_wait(self):
with _mock_os_pipe(['True:OK']) as pipes, _mock_sbus(0), \
with mock.patch('storlets.gateway.gateways.docker.runtime.'
'SBusClient.ping') as ping, \
mock.patch('storlets.gateway.gateways.docker.runtime.'
'time.sleep') as _s:
'time.sleep') as sleep:
ping.return_value = SBusResponse(True, 'OK')
self.sbox.wait()
self.assertEqual(0, _s.call_count)
self._check_all_pipese_closed(pipes)
self.assertEqual(sleep.call_count, 0)
with _mock_os_pipe(['False:ERROR', 'True:OK']) as pipes, \
_mock_sbus(0), \
with mock.patch('storlets.gateway.gateways.docker.runtime.'
'SBusClient.ping') as ping, \
mock.patch('storlets.gateway.gateways.docker.runtime.'
'time.sleep') as _s:
'time.sleep') as sleep:
ping.side_effect = [SBusResponse(False, 'Error'),
SBusResponse(True, 'OK')]
self.sbox.wait()
self.assertEqual(1, _s.call_count)
self._check_all_pipese_closed(pipes)
with _mock_os_pipe(['Foo']) as pipes, _mock_sbus(0), \
mock.patch('storlets.gateway.gateways.docker.runtime.'
'time.sleep') as _s:
with self.assertRaises(StorletRuntimeException):
self.sbox.wait()
self._check_all_pipese_closed(pipes)
self.assertEqual(sleep.call_count, 1)
# TODO(takashi): should test timeout case

View File

View File

@ -0,0 +1,152 @@
# Copyright (c) 2015-2016 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import os
import unittest
import errno
from contextlib import contextmanager
from storlets.sbus.client.exceptions import SBusClientSendError, \
SBusClientMalformedResponse
from storlets.sbus.client import SBusClient
@contextmanager
def _mock_sbus(send_status=0):
with mock.patch('storlets.sbus.client.SBus.send') as fake_send:
fake_send.return_value = send_status
yield
@contextmanager
def _mock_os_pipe(bufs):
class FakeFd(object):
def __init__(self, rbuf=''):
self.rbuf = rbuf
self.closed = False
def read(self, size):
size = min(len(self.rbuf), size)
ret = self.rbuf[:size]
self.rbuf = self.rbuf[size:]
return ret
def close(self):
if self.closed:
raise OSError(errno.EBADF, os.strerror(errno.EBADF))
self.closed = True
def fake_os_read(fd, size):
return fd.read(size)
def fake_os_close(fd):
fd.close()
pipes = [(FakeFd(buf), FakeFd()) for buf in bufs]
pipe_generator = iter(pipes)
def mock_os_pipe():
try:
return next(pipe_generator)
except StopIteration:
raise AssertionError('pipe called more than expected')
with mock.patch('storlets.sbus.client.os.pipe', mock_os_pipe), \
mock.patch('storlets.sbus.client.os.read', fake_os_read), \
mock.patch('storlets.sbus.client.os.close', fake_os_close):
yield pipes
class TestSBusClient(unittest.TestCase):
def setUp(self):
self.pipe_path = 'pipe_path'
self.client = SBusClient(self.pipe_path, 4)
def test_parse_response(self):
resp = self.client._parse_response('True:OK')
self.assertTrue(resp.status)
self.assertEqual('OK', resp.message)
resp = self.client._parse_response('False:NG')
self.assertFalse(resp.status)
self.assertEqual('NG', resp.message)
resp = self.client._parse_response('True:Sample:Message')
self.assertTrue(resp.status)
self.assertEqual('Sample:Message', resp.message)
with self.assertRaises(SBusClientMalformedResponse):
resp = self.client._parse_response('Foo')
def _check_all_pipes_closed(self, pipes):
# Make sure that pipes are not empty
self.assertGreater(len(pipes), 0)
for _pipe in pipes:
self.assertTrue(_pipe[0].closed)
self.assertTrue(_pipe[1].closed)
def _test_service_request(self, method, *args, **kwargs):
with _mock_os_pipe(['True:OK']) as pipes, _mock_sbus(0):
resp = method(*args, **kwargs)
self.assertTrue(resp.status)
self.assertEqual('OK', resp.message)
self._check_all_pipes_closed(pipes)
with _mock_os_pipe(['False:ERROR']) as pipes, _mock_sbus(0):
resp = method(*args, **kwargs)
self.assertFalse(resp.status)
self.assertEqual('ERROR', resp.message)
self._check_all_pipes_closed(pipes)
with _mock_os_pipe(['True:OK']) as pipes, _mock_sbus(-1):
with self.assertRaises(SBusClientSendError):
method(*args, **kwargs)
self._check_all_pipes_closed(pipes)
# TODO(takashi): Add IOError case
with _mock_os_pipe(['Foo']) as pipes, _mock_sbus(0):
with self.assertRaises(SBusClientMalformedResponse):
method(*args, **kwargs)
self._check_all_pipes_closed(pipes)
def test_ping(self):
self._test_service_request(self.client.ping)
def test_start_daemon(self):
self._test_service_request(
self.client.start_daemon, 'java', 'path/to/storlet',
'storleta', 'path/to/uds', 'INFO', '10', '11')
def test_stop_daemon(self):
self._test_service_request(self.client.stop_daemon, 'storleta')
def test_stop_daemons(self):
self._test_service_request(self.client.stop_daemons)
def test_halt(self):
self._test_service_request(self.client.halt)
def test_daemon_status(self):
self._test_service_request(self.client.daemon_status, 'storleta')
def test_cancel(self):
self._test_service_request(self.client.cancel, 'taskid')
if __name__ == '__main__':
unittest.main()