Merge "Add validation about fds passed over SBus"
This commit is contained in:
@@ -17,7 +17,8 @@ import os
|
|||||||
import sys
|
import sys
|
||||||
|
|
||||||
from storlets.sbus import SBus
|
from storlets.sbus import SBus
|
||||||
from storlets.sbus.datagram import SBusDatagram
|
from storlets.sbus.datagram import FDMetadata, SBusServiceDatagram
|
||||||
|
from storlets.sbus.file_description import SBUS_FD_SERVICE_OUT
|
||||||
from storlets.sbus.command import SBUS_CMD_HALT
|
from storlets.sbus.command import SBUS_CMD_HALT
|
||||||
|
|
||||||
|
|
||||||
@@ -37,8 +38,10 @@ def main(argv):
|
|||||||
daemon_factory_pipe_name = argv[1]
|
daemon_factory_pipe_name = argv[1]
|
||||||
try:
|
try:
|
||||||
fi, fo = os.pipe()
|
fi, fo = os.pipe()
|
||||||
halt_dtg = SBusDatagram.create_service_datagram(
|
halt_dtg = SBusServiceDatagram(
|
||||||
SBUS_CMD_HALT, fo)
|
SBUS_CMD_HALT,
|
||||||
|
[fo],
|
||||||
|
[FDMetadata(SBUS_FD_SERVICE_OUT).to_dict()])
|
||||||
n_status = SBus.send(daemon_factory_pipe_name, halt_dtg)
|
n_status = SBus.send(daemon_factory_pipe_name, halt_dtg)
|
||||||
if n_status < 0:
|
if n_status < 0:
|
||||||
print('Sending failed')
|
print('Sending failed')
|
||||||
|
@@ -23,8 +23,9 @@ import subprocess
|
|||||||
import time
|
import time
|
||||||
|
|
||||||
from storlets.sbus import SBus
|
from storlets.sbus import SBus
|
||||||
from storlets.sbus.datagram import SBusDatagram
|
from storlets.sbus.datagram import FDMetadata, SBusServiceDatagram
|
||||||
from storlets.sbus.command import SBUS_CMD_PREFIX, SBUS_CMD_HALT, SBUS_CMD_PING
|
from storlets.sbus.command import SBUS_CMD_PREFIX, SBUS_CMD_HALT, SBUS_CMD_PING
|
||||||
|
from storlets.sbus.file_description import SBUS_FD_SERVICE_OUT
|
||||||
|
|
||||||
|
|
||||||
EXIT_SUCCESS = 0
|
EXIT_SUCCESS = 0
|
||||||
@@ -229,8 +230,10 @@ class DaemonFactory(object):
|
|||||||
format(storlet_name, storlet_pipe_name))
|
format(storlet_name, storlet_pipe_name))
|
||||||
read_fd, write_fd = os.pipe()
|
read_fd, write_fd = os.pipe()
|
||||||
try:
|
try:
|
||||||
dtg = SBusDatagram.create_service_datagram(
|
dtg = SBusServiceDatagram(
|
||||||
SBUS_CMD_PING, write_fd)
|
SBUS_CMD_PING,
|
||||||
|
[write_fd],
|
||||||
|
[FDMetadata(SBUS_FD_SERVICE_OUT).to_dict()])
|
||||||
for i in range(self.NUM_OF_TRIES_PINGING_STARTING_DAEMON):
|
for i in range(self.NUM_OF_TRIES_PINGING_STARTING_DAEMON):
|
||||||
ret = SBus.send(storlet_pipe_name, dtg)
|
ret = SBus.send(storlet_pipe_name, dtg)
|
||||||
if ret >= 0:
|
if ret >= 0:
|
||||||
@@ -454,8 +457,10 @@ class DaemonFactory(object):
|
|||||||
|
|
||||||
read_fd, write_fd = os.pipe()
|
read_fd, write_fd = os.pipe()
|
||||||
try:
|
try:
|
||||||
dtg = SBusDatagram.create_service_datagram(
|
dtg = SBusServiceDatagram(
|
||||||
SBUS_CMD_HALT, write_fd)
|
SBUS_CMD_HALT,
|
||||||
|
[write_fd],
|
||||||
|
[FDMetadata(SBUS_FD_SERVICE_OUT).to_dict()])
|
||||||
rc = SBus.send(storlet_pipe_name, dtg)
|
rc = SBus.send(storlet_pipe_name, dtg)
|
||||||
os.close(write_fd)
|
os.close(write_fd)
|
||||||
if rc < 0:
|
if rc < 0:
|
||||||
|
@@ -27,7 +27,8 @@ import json
|
|||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
|
|
||||||
from storlets.sbus import SBus
|
from storlets.sbus import SBus
|
||||||
from storlets.sbus.datagram import FDMetadata, SBusDatagram
|
from storlets.sbus.datagram import FDMetadata, SBusServiceDatagram, \
|
||||||
|
SBusExecuteDatagram
|
||||||
from storlets.sbus import file_description as sbus_fd
|
from storlets.sbus import file_description as sbus_fd
|
||||||
from storlets.sbus import command as sbus_cmd
|
from storlets.sbus import command as sbus_cmd
|
||||||
from storlets.gateway.common.exceptions import StorletRuntimeException, \
|
from storlets.gateway.common.exceptions import StorletRuntimeException, \
|
||||||
@@ -262,9 +263,10 @@ class RunTimeSandbox(object):
|
|||||||
pipe_path = self.paths.host_factory_pipe()
|
pipe_path = self.paths.host_factory_pipe()
|
||||||
|
|
||||||
with _open_pipe() as (read_fd, write_fd):
|
with _open_pipe() as (read_fd, write_fd):
|
||||||
dtg = SBusDatagram.create_service_datagram(
|
dtg = SBusServiceDatagram(
|
||||||
sbus_cmd.SBUS_CMD_PING,
|
sbus_cmd.SBUS_CMD_PING,
|
||||||
write_fd)
|
[write_fd],
|
||||||
|
[FDMetadata(sbus_fd.SBUS_FD_SERVICE_OUT).to_dict()])
|
||||||
rc = SBus.send(pipe_path, dtg)
|
rc = SBus.send(pipe_path, dtg)
|
||||||
if (rc < 0):
|
if (rc < 0):
|
||||||
return -1
|
return -1
|
||||||
@@ -360,9 +362,10 @@ class RunTimeSandbox(object):
|
|||||||
'pool_size': self.storlet_daemon_thread_pool_size}
|
'pool_size': self.storlet_daemon_thread_pool_size}
|
||||||
|
|
||||||
with _open_pipe() as (read_fd, write_fd):
|
with _open_pipe() as (read_fd, write_fd):
|
||||||
dtg = SBusDatagram.create_service_datagram(
|
dtg = SBusServiceDatagram(
|
||||||
sbus_cmd.SBUS_CMD_START_DAEMON,
|
sbus_cmd.SBUS_CMD_START_DAEMON,
|
||||||
write_fd,
|
[write_fd],
|
||||||
|
[FDMetadata(sbus_fd.SBUS_FD_SERVICE_OUT).to_dict()],
|
||||||
prms)
|
prms)
|
||||||
|
|
||||||
pipe_path = self.paths.host_factory_pipe()
|
pipe_path = self.paths.host_factory_pipe()
|
||||||
@@ -384,9 +387,10 @@ class RunTimeSandbox(object):
|
|||||||
Stop SDaemon process in the scope's sandbox
|
Stop SDaemon process in the scope's sandbox
|
||||||
"""
|
"""
|
||||||
with _open_pipe() as (read_fd, write_fd):
|
with _open_pipe() as (read_fd, write_fd):
|
||||||
dtg = SBusDatagram.create_service_datagram(
|
dtg = SBusServiceDatagram(
|
||||||
sbus_cmd.SBUS_CMD_STOP_DAEMON,
|
sbus_cmd.SBUS_CMD_STOP_DAEMON,
|
||||||
write_fd,
|
[write_fd],
|
||||||
|
[FDMetadata(sbus_fd.SBUS_FD_SERVICE_OUT).to_dict()],
|
||||||
{'storlet_name': storlet_id})
|
{'storlet_name': storlet_id})
|
||||||
pipe_path = self.paths.host_factory_pipe()
|
pipe_path = self.paths.host_factory_pipe()
|
||||||
rc = SBus.send(pipe_path, dtg)
|
rc = SBus.send(pipe_path, dtg)
|
||||||
@@ -408,9 +412,10 @@ class RunTimeSandbox(object):
|
|||||||
Get the status of SDaemon process in the scope's sandbox
|
Get the status of SDaemon process in the scope's sandbox
|
||||||
"""
|
"""
|
||||||
with _open_pipe() as (read_fd, write_fd):
|
with _open_pipe() as (read_fd, write_fd):
|
||||||
dtg = SBusDatagram.create_service_datagram(
|
dtg = SBusServiceDatagram(
|
||||||
sbus_cmd.SBUS_CMD_DAEMON_STATUS,
|
sbus_cmd.SBUS_CMD_DAEMON_STATUS,
|
||||||
write_fd,
|
[write_fd],
|
||||||
|
[FDMetadata(sbus_fd.SBUS_FD_SERVICE_OUT).to_dict()],
|
||||||
{'storlet_name': storlet_id})
|
{'storlet_name': storlet_id})
|
||||||
pipe_path = self.paths.host_factory_pipe()
|
pipe_path = self.paths.host_factory_pipe()
|
||||||
rc = SBus.send(pipe_path, dtg)
|
rc = SBus.send(pipe_path, dtg)
|
||||||
@@ -682,9 +687,10 @@ class StorletInvocationProtocol(object):
|
|||||||
Cancel on-going storlet execution
|
Cancel on-going storlet execution
|
||||||
"""
|
"""
|
||||||
with _open_pipe() as (read_fd, write_fd):
|
with _open_pipe() as (read_fd, write_fd):
|
||||||
dtg = SBusDatagram.create_service_datagram(
|
dtg = SBusServiceDatagram(
|
||||||
sbus_cmd.SBUS_CMD_CANCEL,
|
sbus_cmd.SBUS_CMD_CANCEL,
|
||||||
write_fd,
|
[write_fd],
|
||||||
|
[FDMetadata(sbus_fd.SBUS_FD_SERVICE_OUT).to_dict()],
|
||||||
None,
|
None,
|
||||||
self.task_id)
|
self.task_id)
|
||||||
rc = SBus.send(self.storlet_pipe_path, dtg)
|
rc = SBus.send(self.storlet_pipe_path, dtg)
|
||||||
@@ -710,7 +716,7 @@ class StorletInvocationProtocol(object):
|
|||||||
Send execute command to the remote daemon factory to invoke storlet
|
Send execute command to the remote daemon factory to invoke storlet
|
||||||
execution
|
execution
|
||||||
"""
|
"""
|
||||||
dtg = SBusDatagram(
|
dtg = SBusExecuteDatagram(
|
||||||
sbus_cmd.SBUS_CMD_EXECUTE,
|
sbus_cmd.SBUS_CMD_EXECUTE,
|
||||||
self.remote_fds,
|
self.remote_fds,
|
||||||
self.remote_fds_metadata,
|
self.remote_fds_metadata,
|
||||||
|
@@ -14,7 +14,7 @@
|
|||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
from ctypes import c_char_p, c_int, CDLL, POINTER
|
from ctypes import c_char_p, c_int, CDLL, POINTER
|
||||||
from storlets.sbus.datagram import SBusDatagram
|
from storlets.sbus.datagram import build_datagram_from_raw_message
|
||||||
|
|
||||||
|
|
||||||
class SBus(object):
|
class SBus(object):
|
||||||
@@ -116,7 +116,7 @@ class SBus(object):
|
|||||||
str_params = str_params[0:n_params]
|
str_params = str_params[0:n_params]
|
||||||
|
|
||||||
# Construct actual result datagram
|
# Construct actual result datagram
|
||||||
return SBusDatagram.build_from_raw_message(
|
return build_datagram_from_raw_message(
|
||||||
h_files, str_metadata, str_params)
|
h_files, str_metadata, str_params)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
@@ -16,6 +16,7 @@ import copy
|
|||||||
import json
|
import json
|
||||||
|
|
||||||
from storlets.sbus import file_description as sbus_fd
|
from storlets.sbus import file_description as sbus_fd
|
||||||
|
from storlets.sbus.command import SBUS_CMD_EXECUTE
|
||||||
|
|
||||||
|
|
||||||
class FDMetadata(object):
|
class FDMetadata(object):
|
||||||
@@ -44,6 +45,10 @@ class SBusDatagram(object):
|
|||||||
The manager class for the datagram passed over sbus protocol
|
The manager class for the datagram passed over sbus protocol
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
# Each child Datagram should define what fd types are expected with
|
||||||
|
# list format
|
||||||
|
_required_fd_types = None
|
||||||
|
|
||||||
def __init__(self, command, fds, metadata, params=None, task_id=None):
|
def __init__(self, command, fds, metadata, params=None, task_id=None):
|
||||||
"""
|
"""
|
||||||
Create SBusDatagram instance
|
Create SBusDatagram instance
|
||||||
@@ -51,67 +56,25 @@ class SBusDatagram(object):
|
|||||||
:param command: A string encoding the command to send
|
:param command: A string encoding the command to send
|
||||||
:param fds: A list of file descriptors (integer) to pass with
|
:param fds: A list of file descriptors (integer) to pass with
|
||||||
the command
|
the command
|
||||||
:param md: A list of dictionaries, where the i'th dictionary is the
|
:param metadata: A list of dictionaries, where the i'th dictionary
|
||||||
metadata of the i'th fd.
|
is the metadata of the i'th fd.
|
||||||
:param params: A optional dictionary with parameters for the command
|
:param params: A optional dictionary with parameters for the command
|
||||||
execution
|
execution
|
||||||
:param task_id: An optional string task id. This is currently used for
|
:param task_id: An optional string task id. This is currently used for
|
||||||
cancel command
|
cancel command
|
||||||
"""
|
"""
|
||||||
|
if type(self) == SBusDatagram:
|
||||||
|
raise NotImplementedError(
|
||||||
|
'SBusDatagram class should not be initialized as bare')
|
||||||
self.command = command
|
self.command = command
|
||||||
if len(fds) != len(metadata):
|
self._check_fd_nums(fds, metadata)
|
||||||
raise ValueError('Length mismatch fds:%s metadata:%s' %
|
fd_types = [md['storlets']['type'] for md in metadata]
|
||||||
(len(fds), len(metadata)))
|
self._check_required_fd_types(fd_types)
|
||||||
self.fds = fds
|
self.fds = fds
|
||||||
self.metadata = metadata
|
self.metadata = metadata
|
||||||
self.params = params
|
self.params = params
|
||||||
self.task_id = task_id
|
self.task_id = task_id
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def create_service_datagram(cls, command, outfd, params=None,
|
|
||||||
task_id=None):
|
|
||||||
"""
|
|
||||||
Create datagram which only has one service out fd
|
|
||||||
|
|
||||||
This method is used to create datagram for some command type, which
|
|
||||||
only needs one service out fd. Currently we can use this function for
|
|
||||||
the following commands.
|
|
||||||
- SBUS_CMD_HALT
|
|
||||||
- SBUS_CMD_START_DAEMON
|
|
||||||
- SBUS_CMD_STOP_DAEMON
|
|
||||||
- SBUS_CMD_DAEMON_STATUS
|
|
||||||
- SBUS_CMD_STOP_DAEMONS
|
|
||||||
- SBUS_CMD_PING
|
|
||||||
- SBUS_CMD_CANCEL
|
|
||||||
|
|
||||||
:param command: command type
|
|
||||||
:param outfd: service out fd integer
|
|
||||||
:param params: A optional dictionary with parameters for the command
|
|
||||||
execution
|
|
||||||
:param task_id: An optional string task id
|
|
||||||
"""
|
|
||||||
# TODO(takashi): Maybe we can get rid of this function
|
|
||||||
md = [FDMetadata(sbus_fd.SBUS_FD_SERVICE_OUT).to_dict()]
|
|
||||||
fds = [outfd]
|
|
||||||
return cls(command, fds, md, params, task_id)
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def build_from_raw_message(cls, fds, str_md, str_cmd_params):
|
|
||||||
"""
|
|
||||||
Build SBusDatagram from raw message recieved over sbus
|
|
||||||
|
|
||||||
:param fds: A list of file descriptors (integer) to pass with
|
|
||||||
the command
|
|
||||||
:param str_md: json serialized metadata dict
|
|
||||||
:param str_cmd_params: json serialized command parameters dict
|
|
||||||
"""
|
|
||||||
metadata = json.loads(str_md)
|
|
||||||
cmd_params = json.loads(str_cmd_params)
|
|
||||||
command = cmd_params.get('command')
|
|
||||||
params = cmd_params.get('params')
|
|
||||||
task_id = cmd_params.get('task_id')
|
|
||||||
return cls(command, fds, metadata, params, task_id)
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def num_fds(self):
|
def num_fds(self):
|
||||||
return len(self.fds)
|
return len(self.fds)
|
||||||
@@ -133,30 +96,6 @@ class SBusDatagram(object):
|
|||||||
def serialized_metadata(self):
|
def serialized_metadata(self):
|
||||||
return json.dumps(self.metadata)
|
return json.dumps(self.metadata)
|
||||||
|
|
||||||
@property
|
|
||||||
def service_out_fd(self):
|
|
||||||
return self._find_fd(sbus_fd.SBUS_FD_SERVICE_OUT)
|
|
||||||
|
|
||||||
@property
|
|
||||||
def object_out_fds(self):
|
|
||||||
return self._find_fds(sbus_fd.SBUS_FD_OUTPUT_OBJECT)
|
|
||||||
|
|
||||||
@property
|
|
||||||
def object_metadata_out_fds(self):
|
|
||||||
return self._find_fds(sbus_fd.SBUS_FD_OUTPUT_OBJECT_METADATA)
|
|
||||||
|
|
||||||
@property
|
|
||||||
def task_id_out_fd(self):
|
|
||||||
return self._find_fd(sbus_fd.SBUS_FD_OUTPUT_TASK_ID)
|
|
||||||
|
|
||||||
@property
|
|
||||||
def logger_out_fd(self):
|
|
||||||
return self._find_fd(sbus_fd.SBUS_FD_LOGGER)
|
|
||||||
|
|
||||||
@property
|
|
||||||
def object_in_fds(self):
|
|
||||||
return self._find_fds(sbus_fd.SBUS_FD_INPUT_OBJECT)
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def object_in_metadata(self):
|
def object_in_metadata(self):
|
||||||
return [md['storage'] for md in self.metadata
|
return [md['storage'] for md in self.metadata
|
||||||
@@ -196,8 +135,119 @@ class SBusDatagram(object):
|
|||||||
# fd validation.
|
# fd validation.
|
||||||
return ret[0]
|
return ret[0]
|
||||||
|
|
||||||
|
def _check_fd_nums(self, fds, metadata):
|
||||||
|
if len(fds) != len(metadata):
|
||||||
|
raise ValueError('Length mismatch fds:%s metadata:%s'
|
||||||
|
% (len(fds), len(metadata)))
|
||||||
|
|
||||||
|
def _check_required_fd_types(self, given_fd_types):
|
||||||
|
if self._required_fd_types is None:
|
||||||
|
raise NotImplementedError(
|
||||||
|
'SBusDatagram class should define _required_fd_types')
|
||||||
|
# the first len(self._required_fd_types) types should be fit
|
||||||
|
# to the required list
|
||||||
|
if given_fd_types[:len(self._required_fd_types)] != \
|
||||||
|
self._required_fd_types:
|
||||||
|
raise ValueError('Fd type mismatch given_fd_types:%s \
|
||||||
|
required_fd_types:%s' %
|
||||||
|
(given_fd_types, self._required_fd_types))
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return 'num_fds=%s, md=%s, cmd_params=%s' % (
|
return 'num_fds=%s, md=%s, cmd_params=%s' % (
|
||||||
self.num_fds,
|
self.num_fds,
|
||||||
str(self.serialized_metadata),
|
str(self.serialized_metadata),
|
||||||
str(self.serialized_cmd_params))
|
str(self.serialized_cmd_params))
|
||||||
|
|
||||||
|
|
||||||
|
class SBusServiceDatagram(SBusDatagram):
|
||||||
|
"""
|
||||||
|
This class deals with datagram which only has one service out fd.
|
||||||
|
|
||||||
|
This class is used to create datagram for some command type, which
|
||||||
|
only needs one service out fd. Currently we can use this class for
|
||||||
|
the following commands.
|
||||||
|
- SBUS_CMD_HALT
|
||||||
|
- SBUS_CMD_START_DAEMON
|
||||||
|
- SBUS_CMD_STOP_DAEMON
|
||||||
|
- SBUS_CMD_DAEMON_STATUS
|
||||||
|
- SBUS_CMD_STOP_DAEMONS
|
||||||
|
- SBUS_CMD_PING
|
||||||
|
- SBUS_CMD_CANCEL
|
||||||
|
"""
|
||||||
|
_required_fd_types = [sbus_fd.SBUS_FD_SERVICE_OUT]
|
||||||
|
|
||||||
|
def __init__(self, command, fds, metadata, params=None, task_id=None):
|
||||||
|
super(SBusServiceDatagram, self).__init__(
|
||||||
|
command, fds, metadata, params, task_id)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def service_out_fd(self):
|
||||||
|
return self._find_fd(sbus_fd.SBUS_FD_SERVICE_OUT)
|
||||||
|
|
||||||
|
|
||||||
|
class SBusExecuteDatagram(SBusDatagram):
|
||||||
|
_required_fd_types = [sbus_fd.SBUS_FD_INPUT_OBJECT,
|
||||||
|
sbus_fd.SBUS_FD_OUTPUT_TASK_ID,
|
||||||
|
sbus_fd.SBUS_FD_OUTPUT_OBJECT,
|
||||||
|
sbus_fd.SBUS_FD_OUTPUT_OBJECT_METADATA,
|
||||||
|
sbus_fd.SBUS_FD_LOGGER]
|
||||||
|
|
||||||
|
def __init__(self, command, fds, metadata, params=None, task_id=None):
|
||||||
|
# TODO(kota_): the args command is not used in ExecuteDatagram
|
||||||
|
# but it could be worthful to taransparent init
|
||||||
|
# for other datagram classes.
|
||||||
|
# TODO(takashi): When we add extra output sources, we should
|
||||||
|
# consider how we can specify the number of the
|
||||||
|
# extra input/output sources, because currently
|
||||||
|
# this implementation is based on the idea that
|
||||||
|
# we only have extra input sources, which is
|
||||||
|
# added at the end of fd list
|
||||||
|
extra_fd_types = [
|
||||||
|
md['storlets']['type'] == sbus_fd.SBUS_FD_INPUT_OBJECT
|
||||||
|
for md in metadata[len(self._required_fd_types):]]
|
||||||
|
|
||||||
|
if not all(extra_fd_types):
|
||||||
|
raise ValueError(
|
||||||
|
'Extra data should be SBUS_FD_INPUT_OBJECT: %' % metadata)
|
||||||
|
|
||||||
|
super(SBusExecuteDatagram, self).__init__(
|
||||||
|
SBUS_CMD_EXECUTE, fds, metadata, params, task_id)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def object_out_fds(self):
|
||||||
|
return self._find_fds(sbus_fd.SBUS_FD_OUTPUT_OBJECT)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def object_metadata_out_fds(self):
|
||||||
|
return self._find_fds(sbus_fd.SBUS_FD_OUTPUT_OBJECT_METADATA)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def task_id_out_fd(self):
|
||||||
|
return self._find_fd(sbus_fd.SBUS_FD_OUTPUT_TASK_ID)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def logger_out_fd(self):
|
||||||
|
return self._find_fd(sbus_fd.SBUS_FD_LOGGER)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def object_in_fds(self):
|
||||||
|
return self._find_fds(sbus_fd.SBUS_FD_INPUT_OBJECT)
|
||||||
|
|
||||||
|
|
||||||
|
def build_datagram_from_raw_message(fds, str_md, str_cmd_params):
|
||||||
|
"""
|
||||||
|
Build SBusDatagram from raw message recieved over sbus
|
||||||
|
|
||||||
|
:param fds: A list of file descriptors (integer) to pass with
|
||||||
|
the command
|
||||||
|
:param str_md: json serialized metadata dict
|
||||||
|
:param str_cmd_params: json serialized command parameters dict
|
||||||
|
"""
|
||||||
|
metadata = json.loads(str_md)
|
||||||
|
cmd_params = json.loads(str_cmd_params)
|
||||||
|
command = cmd_params.get('command')
|
||||||
|
params = cmd_params.get('params')
|
||||||
|
task_id = cmd_params.get('task_id')
|
||||||
|
if command == SBUS_CMD_EXECUTE:
|
||||||
|
return SBusExecuteDatagram(command, fds, metadata, params, task_id)
|
||||||
|
return SBusServiceDatagram(command, fds, metadata, params, task_id)
|
||||||
|
@@ -17,7 +17,8 @@ import eventlet
|
|||||||
import mock
|
import mock
|
||||||
from storlet_daemon.daemon import (
|
from storlet_daemon.daemon import (
|
||||||
Daemon, EXIT_SUCCESS, StorletDaemonException)
|
Daemon, EXIT_SUCCESS, StorletDaemonException)
|
||||||
from sbus.datagram import SBusDatagram
|
from sbus.datagram import FDMetadata, SBusServiceDatagram
|
||||||
|
from sbus.file_description import SBUS_FD_SERVICE_OUT
|
||||||
import sbus.command
|
import sbus.command
|
||||||
|
|
||||||
from tests.unit.swift import FakeLogger
|
from tests.unit.swift import FakeLogger
|
||||||
@@ -99,11 +100,12 @@ class TestStorletDaemon(unittest.TestCase):
|
|||||||
fake_import.return_value = FakeModule()
|
fake_import.return_value = FakeModule()
|
||||||
daemon = Daemon(
|
daemon = Daemon(
|
||||||
'fakeModule.FakeClass', 'fake_path', self.logger, 16)
|
'fakeModule.FakeClass', 'fake_path', self.logger, 16)
|
||||||
|
metadata = [FDMetadata(SBUS_FD_SERVICE_OUT).to_dict()]
|
||||||
scenario = [
|
scenario = [
|
||||||
('create', 1),
|
('create', 1),
|
||||||
('listen', 1),
|
('listen', 1),
|
||||||
('receive', SBusDatagram(command=stop_command,
|
('receive', SBusServiceDatagram(command=stop_command, fds=[1],
|
||||||
fds=[], metadata=[],
|
metadata=metadata,
|
||||||
params=None, task_id=None)),
|
params=None, task_id=None)),
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@@ -35,8 +35,6 @@ from exceptions import AssertionError
|
|||||||
@contextmanager
|
@contextmanager
|
||||||
def _mock_sbus(send_status=0):
|
def _mock_sbus(send_status=0):
|
||||||
with mock.patch('storlets.gateway.gateways.docker.runtime.'
|
with mock.patch('storlets.gateway.gateways.docker.runtime.'
|
||||||
'SBusDatagram.create_service_datagram'), \
|
|
||||||
mock.patch('storlets.gateway.gateways.docker.runtime.'
|
|
||||||
'SBus.send') as fake_send:
|
'SBus.send') as fake_send:
|
||||||
fake_send.return_value = send_status
|
fake_send.return_value = send_status
|
||||||
yield
|
yield
|
||||||
|
@@ -16,7 +16,17 @@
|
|||||||
import json
|
import json
|
||||||
import unittest
|
import unittest
|
||||||
import storlets.sbus.file_description as sbus_fd
|
import storlets.sbus.file_description as sbus_fd
|
||||||
from storlets.sbus.datagram import FDMetadata, SBusDatagram
|
from storlets.sbus.datagram import FDMetadata, SBusDatagram, \
|
||||||
|
SBusServiceDatagram, SBusExecuteDatagram, build_datagram_from_raw_message
|
||||||
|
from storlets.sbus.command import SBUS_CMD_PING, SBUS_CMD_EXECUTE
|
||||||
|
|
||||||
|
ALL_FD_TYPES = [
|
||||||
|
sbus_fd.SBUS_FD_INPUT_OBJECT, sbus_fd.SBUS_FD_OUTPUT_OBJECT,
|
||||||
|
sbus_fd.SBUS_FD_OUTPUT_OBJECT_METADATA,
|
||||||
|
sbus_fd.SBUS_FD_OUTPUT_OBJECT_AND_METADATA,
|
||||||
|
sbus_fd.SBUS_FD_LOGGER, sbus_fd.SBUS_FD_OUTPUT_CONTAINER,
|
||||||
|
sbus_fd.SBUS_FD_OUTPUT_TASK_ID, sbus_fd.SBUS_FD_SERVICE_OUT,
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
class TestFDMetadata(unittest.TestCase):
|
class TestFDMetadata(unittest.TestCase):
|
||||||
@@ -44,28 +54,31 @@ class TestFDMetadata(unittest.TestCase):
|
|||||||
|
|
||||||
|
|
||||||
class TestSBusDatagram(unittest.TestCase):
|
class TestSBusDatagram(unittest.TestCase):
|
||||||
|
def test_check_required_fd_types_not_implemented(self):
|
||||||
|
# SBusDatagram designed not to be called independently
|
||||||
|
with self.assertRaises(NotImplementedError) as err:
|
||||||
|
SBusDatagram('', [], [])
|
||||||
|
self.assertEqual(
|
||||||
|
'SBusDatagram class should not be initialized as bare',
|
||||||
|
err.exception.message)
|
||||||
|
|
||||||
|
def test_invalid_child_class_definition(self):
|
||||||
|
# no definition for _requried_fd_types
|
||||||
|
class InvalidSBusDatagram(SBusDatagram):
|
||||||
|
pass
|
||||||
|
|
||||||
|
with self.assertRaises(NotImplementedError) as err:
|
||||||
|
InvalidSBusDatagram('', [], [])
|
||||||
|
self.assertEqual(
|
||||||
|
'SBusDatagram class should define _required_fd_types',
|
||||||
|
err.exception.message)
|
||||||
|
|
||||||
|
|
||||||
|
class SBusDatagramTestMixin(object):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.command = 'COMMAND'
|
|
||||||
self.types = [sbus_fd.SBUS_FD_SERVICE_OUT,
|
|
||||||
sbus_fd.SBUS_FD_OUTPUT_OBJECT,
|
|
||||||
sbus_fd.SBUS_FD_OUTPUT_OBJECT,
|
|
||||||
sbus_fd.SBUS_FD_OUTPUT_OBJECT_METADATA,
|
|
||||||
sbus_fd.SBUS_FD_OUTPUT_OBJECT_METADATA,
|
|
||||||
sbus_fd.SBUS_FD_OUTPUT_TASK_ID,
|
|
||||||
sbus_fd.SBUS_FD_LOGGER,
|
|
||||||
sbus_fd.SBUS_FD_INPUT_OBJECT,
|
|
||||||
sbus_fd.SBUS_FD_INPUT_OBJECT]
|
|
||||||
self.fds = []
|
|
||||||
self.metadata = []
|
|
||||||
for i in xrange(len(self.types)):
|
|
||||||
self.fds.append(i + 1)
|
|
||||||
self.metadata.append(
|
|
||||||
FDMetadata(self.types[i],
|
|
||||||
{'key%d' % i: 'value%d' % i},
|
|
||||||
{'skey%d' % i: 'svalue%d' % i}).to_dict())
|
|
||||||
self.params = {'param1': 'paramvalue1'}
|
self.params = {'param1': 'paramvalue1'}
|
||||||
self.task_id = 'id'
|
self.task_id = 'id'
|
||||||
self.dtg = SBusDatagram(self.command, self.fds, self.metadata,
|
self.dtg = self._test_class(self.command, self.fds, self.metadata,
|
||||||
self.params, self.task_id)
|
self.params, self.task_id)
|
||||||
|
|
||||||
def test_init(self):
|
def test_init(self):
|
||||||
@@ -92,74 +105,175 @@ class TestSBusDatagram(unittest.TestCase):
|
|||||||
res = {'command': self.command,
|
res = {'command': self.command,
|
||||||
'params': self.params,
|
'params': self.params,
|
||||||
'task_id': self.task_id}
|
'task_id': self.task_id}
|
||||||
self.assertEqual(res,
|
self.assertEqual(res, json.loads(self.dtg.serialized_cmd_params))
|
||||||
json.loads(self.dtg.serialized_cmd_params))
|
|
||||||
|
|
||||||
def test_create_service_datagram(self):
|
def test_check_fd_nums(self):
|
||||||
dtg = SBusDatagram.create_service_datagram(
|
with self.assertRaises(ValueError):
|
||||||
self.command, 1, self.params, self.task_id)
|
self.dtg._check_fd_nums([], self.metadata)
|
||||||
self.assertEqual(self.params, dtg.params)
|
|
||||||
self.assertEqual(self.command, dtg.command)
|
|
||||||
self.assertEqual(self.task_id, dtg.task_id)
|
|
||||||
self.assertEqual([1], dtg.fds)
|
|
||||||
self.assertEqual([{'storlets': {'type': sbus_fd.SBUS_FD_SERVICE_OUT},
|
|
||||||
'storage': {}}], dtg.metadata)
|
|
||||||
|
|
||||||
dtg = SBusDatagram.create_service_datagram(
|
|
||||||
self.command, 1)
|
|
||||||
self.assertIsNone(dtg.params)
|
|
||||||
self.assertEqual(self.command, dtg.command)
|
|
||||||
self.assertIsNone(dtg.task_id)
|
|
||||||
self.assertEqual([1], dtg.fds)
|
|
||||||
self.assertEqual([{'storlets': {'type': sbus_fd.SBUS_FD_SERVICE_OUT},
|
|
||||||
'storage': {}}], dtg.metadata)
|
|
||||||
|
|
||||||
def test_find_fds(self):
|
def test_find_fds(self):
|
||||||
self.assertEqual(
|
# prepare all fd types and then pop out in the loop below
|
||||||
[1], self.dtg._find_fds(sbus_fd.SBUS_FD_SERVICE_OUT))
|
not_in_fd_types = ALL_FD_TYPES[:]
|
||||||
self.assertEqual(
|
# N.B. fd should start from 1 (not 0), really?
|
||||||
[2, 3], self.dtg._find_fds(sbus_fd.SBUS_FD_OUTPUT_OBJECT))
|
for index, fd_type in enumerate(self.types, 1):
|
||||||
self.assertEqual(
|
found_fds = self.dtg._find_fds(fd_type)
|
||||||
[], self.dtg._find_fds('DUMMY_TYPE'))
|
# at least 1 fd should be found
|
||||||
|
self.assertTrue(found_fds)
|
||||||
|
# and the index is in the types
|
||||||
|
self.assertIn(index, found_fds)
|
||||||
|
|
||||||
|
if fd_type in not_in_fd_types:
|
||||||
|
# N.B. ALL_FD_TYPES should be unique list
|
||||||
|
not_in_fd_types.remove(fd_type)
|
||||||
|
|
||||||
|
# sanity, not a fd type results in []
|
||||||
|
self.assertEqual([], self.dtg._find_fds('DUMMY_TYPE'))
|
||||||
|
|
||||||
|
# sanity, no other types are found
|
||||||
|
for fd_type in not_in_fd_types:
|
||||||
|
self.assertEqual([], self.dtg._find_fds(fd_type))
|
||||||
|
|
||||||
def test_find_fd(self):
|
def test_find_fd(self):
|
||||||
self.assertEqual(
|
# prepare all fd types and then pop out in the loop below
|
||||||
1, self.dtg._find_fd(sbus_fd.SBUS_FD_SERVICE_OUT))
|
not_in_fd_types = ALL_FD_TYPES[:]
|
||||||
self.assertEqual(
|
# N.B. fd should start from 1 (not 0), really?
|
||||||
2, self.dtg._find_fd(sbus_fd.SBUS_FD_OUTPUT_OBJECT))
|
for index, fd_type in enumerate(self.types, 1):
|
||||||
self.assertIsNone(
|
found_fd = self.dtg._find_fd(fd_type)
|
||||||
self.dtg._find_fd('DUMMY_TYPE'))
|
# at least 1 fd should be found
|
||||||
|
self.assertEqual(index, found_fd)
|
||||||
|
|
||||||
|
if fd_type in not_in_fd_types:
|
||||||
|
# N.B. ALL_FD_TYPES should be unique list
|
||||||
|
not_in_fd_types.remove(fd_type)
|
||||||
|
|
||||||
|
# sanity, not a fd type results in None
|
||||||
|
self.assertIs(None, self.dtg._find_fd('DUMMY_TYPE'))
|
||||||
|
|
||||||
|
# sanity, no other types are found
|
||||||
|
for fd_type in not_in_fd_types:
|
||||||
|
self.assertIs(None, self.dtg._find_fd(fd_type))
|
||||||
|
|
||||||
|
|
||||||
|
class TestSBusServiceDatagram(SBusDatagramTestMixin, unittest.TestCase):
|
||||||
|
_test_class = SBusServiceDatagram
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.command = 'SBUS_CMD_TEST'
|
||||||
|
self.types = [sbus_fd.SBUS_FD_SERVICE_OUT]
|
||||||
|
self.fds = [1]
|
||||||
|
self.metadata = [FDMetadata(sbus_fd.SBUS_FD_SERVICE_OUT).to_dict()]
|
||||||
|
super(TestSBusServiceDatagram, self).setUp()
|
||||||
|
|
||||||
def test_service_out_fd(self):
|
def test_service_out_fd(self):
|
||||||
self.assertEqual(1, self.dtg.service_out_fd)
|
self.assertEqual(1, self.dtg.service_out_fd)
|
||||||
|
|
||||||
|
|
||||||
|
class TestSBusExecuteDatagram(SBusDatagramTestMixin, unittest.TestCase):
|
||||||
|
_test_class = SBusExecuteDatagram
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.command = SBUS_CMD_EXECUTE
|
||||||
|
self.types = [sbus_fd.SBUS_FD_INPUT_OBJECT,
|
||||||
|
sbus_fd.SBUS_FD_OUTPUT_TASK_ID,
|
||||||
|
sbus_fd.SBUS_FD_OUTPUT_OBJECT,
|
||||||
|
sbus_fd.SBUS_FD_OUTPUT_OBJECT_METADATA,
|
||||||
|
sbus_fd.SBUS_FD_LOGGER]
|
||||||
|
self.fds = [i + 1 for i in range(len(self.types))]
|
||||||
|
self.metadata = [FDMetadata(self.types[i],
|
||||||
|
{'key%d' % i: 'value%d' % i},
|
||||||
|
{'skey%d' % i: 'svalue%d' % i}).to_dict()
|
||||||
|
for i in range(len(self.types))]
|
||||||
|
super(TestSBusExecuteDatagram, self).setUp()
|
||||||
|
|
||||||
|
def test_init_extra_sources(self):
|
||||||
|
types = [sbus_fd.SBUS_FD_INPUT_OBJECT,
|
||||||
|
sbus_fd.SBUS_FD_OUTPUT_TASK_ID,
|
||||||
|
sbus_fd.SBUS_FD_OUTPUT_OBJECT,
|
||||||
|
sbus_fd.SBUS_FD_OUTPUT_OBJECT_METADATA,
|
||||||
|
sbus_fd.SBUS_FD_LOGGER,
|
||||||
|
sbus_fd.SBUS_FD_INPUT_OBJECT,
|
||||||
|
sbus_fd.SBUS_FD_INPUT_OBJECT,
|
||||||
|
sbus_fd.SBUS_FD_INPUT_OBJECT]
|
||||||
|
fds = [i + 1 for i in xrange(len(types))]
|
||||||
|
metadata = [FDMetadata(types[i],
|
||||||
|
{'key%d' % i: 'value%d' % i},
|
||||||
|
{'skey%d' % i: 'svalue%d' % i}).to_dict()
|
||||||
|
for i in xrange(len(types))]
|
||||||
|
dtg = self._test_class(
|
||||||
|
self.command, fds, metadata, self.params, self.task_id)
|
||||||
|
self.assertEqual(dtg.fds, fds)
|
||||||
|
self.assertEqual(dtg.metadata, metadata)
|
||||||
|
self.assertEqual(dtg.params, self.params)
|
||||||
|
self.assertEqual(dtg.task_id, self.task_id)
|
||||||
|
|
||||||
def test_object_out_fds(self):
|
def test_object_out_fds(self):
|
||||||
self.assertEqual([2, 3], self.dtg.object_out_fds)
|
self.assertEqual([3], self.dtg.object_out_fds)
|
||||||
|
|
||||||
def test_object_metadata_out_fds(self):
|
def test_object_metadata_out_fds(self):
|
||||||
self.assertEqual([4, 5], self.dtg.object_metadata_out_fds)
|
self.assertEqual([4], self.dtg.object_metadata_out_fds)
|
||||||
|
|
||||||
def test_task_id_out_fd(self):
|
def test_task_id_out_fd(self):
|
||||||
self.assertEqual(6, self.dtg.task_id_out_fd)
|
self.assertEqual(2, self.dtg.task_id_out_fd)
|
||||||
|
|
||||||
def test_logger_out_fd(self):
|
def test_logger_out_fd(self):
|
||||||
self.assertEqual(7, self.dtg.logger_out_fd)
|
self.assertEqual(5, self.dtg.logger_out_fd)
|
||||||
|
|
||||||
def test_object_in_fds(self):
|
def test_object_in_fds(self):
|
||||||
self.assertEqual([8, 9], self.dtg.object_in_fds)
|
self.assertEqual([1], self.dtg.object_in_fds)
|
||||||
|
|
||||||
def test_build_from_raw_message(self):
|
|
||||||
str_metadata = json.dumps(self.metadata)
|
|
||||||
str_cmd_params = json.dumps(self.dtg.cmd_params)
|
|
||||||
|
|
||||||
dtg = SBusDatagram.build_from_raw_message(
|
class TestBuildDatagramFromRawMessage(unittest.TestCase):
|
||||||
self.fds, str_metadata, str_cmd_params)
|
|
||||||
|
|
||||||
self.assertEqual(self.command, dtg.command)
|
def test_build_datagram_from_raw_message(self):
|
||||||
self.assertEqual(self.fds, dtg.fds)
|
# SBusServiceDatagram scenario
|
||||||
self.assertEqual(self.metadata, dtg.metadata)
|
command = SBUS_CMD_PING
|
||||||
self.assertEqual(self.params, dtg.params)
|
types = [sbus_fd.SBUS_FD_SERVICE_OUT]
|
||||||
self.assertEqual(self.task_id, dtg.task_id)
|
fds = [1]
|
||||||
|
metadata = [FDMetadata(sbus_fd.SBUS_FD_SERVICE_OUT).to_dict()]
|
||||||
|
params = {'param1': 'paramvalue1'}
|
||||||
|
task_id = 'id'
|
||||||
|
cmd_params = {'command': command, 'params': params, 'task_id': task_id}
|
||||||
|
|
||||||
|
str_metadata = json.dumps(metadata)
|
||||||
|
str_cmd_params = json.dumps(cmd_params)
|
||||||
|
dtg = build_datagram_from_raw_message(fds, str_metadata,
|
||||||
|
str_cmd_params)
|
||||||
|
|
||||||
|
self.assertEqual(command, dtg.command)
|
||||||
|
self.assertEqual(fds, dtg.fds)
|
||||||
|
self.assertEqual(metadata, dtg.metadata)
|
||||||
|
self.assertEqual(params, dtg.params)
|
||||||
|
self.assertEqual(task_id, dtg.task_id)
|
||||||
|
|
||||||
|
# SBusExecuteDatagram scenario
|
||||||
|
command = SBUS_CMD_EXECUTE
|
||||||
|
types = [sbus_fd.SBUS_FD_INPUT_OBJECT,
|
||||||
|
sbus_fd.SBUS_FD_OUTPUT_TASK_ID,
|
||||||
|
sbus_fd.SBUS_FD_OUTPUT_OBJECT,
|
||||||
|
sbus_fd.SBUS_FD_OUTPUT_OBJECT_METADATA,
|
||||||
|
sbus_fd.SBUS_FD_LOGGER,
|
||||||
|
sbus_fd.SBUS_FD_INPUT_OBJECT,
|
||||||
|
sbus_fd.SBUS_FD_INPUT_OBJECT,
|
||||||
|
sbus_fd.SBUS_FD_INPUT_OBJECT]
|
||||||
|
fds = [i + 1 for i in xrange(len(types))]
|
||||||
|
metadata = [FDMetadata(types[i],
|
||||||
|
{'key%d' % i: 'value%d' % i},
|
||||||
|
{'skey%d' % i: 'svalue%d' % i}).to_dict()
|
||||||
|
for i in xrange(len(types))]
|
||||||
|
params = {'param1': 'paramvalue1'}
|
||||||
|
task_id = 'id'
|
||||||
|
cmd_params = {'command': command, 'params': params, 'task_id': task_id}
|
||||||
|
|
||||||
|
str_metadata = json.dumps(metadata)
|
||||||
|
str_cmd_params = json.dumps(cmd_params)
|
||||||
|
dtg = build_datagram_from_raw_message(fds, str_metadata,
|
||||||
|
str_cmd_params)
|
||||||
|
|
||||||
|
self.assertEqual(command, dtg.command)
|
||||||
|
self.assertEqual(fds, dtg.fds)
|
||||||
|
self.assertEqual(metadata, dtg.metadata)
|
||||||
|
self.assertEqual(params, dtg.params)
|
||||||
|
self.assertEqual(task_id, dtg.task_id)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
Reference in New Issue
Block a user