Use SBusClient for execute command

This patch is the final refactoring work to have client library
for sbus communication, and port current implementaion about
execute command from runtime to SBusClient.

This patch makes execute command rely on the same communication
as the other commands, which happens on SERVICE_OUT_FD, and remove
TASK_ID_OUT_FD which is used only for returning task id.
This allows us to avoid special handling for execute command.

Change-Id: I00dc7b3004512624f9aaf120420f910e451cac89
This commit is contained in:
Takashi Kajinami 2019-07-20 23:54:52 +09:00
parent 821570021b
commit 826be15b6a
19 changed files with 347 additions and 283 deletions

View File

@ -36,10 +36,13 @@ public abstract class SAbstractTask {
this.logger = logger;
}
protected boolean respond(OutputStream ostream, boolean status, String message) {
protected boolean respond(OutputStream ostream, boolean status, String message, String taskid) {
JSONObject obj = new JSONObject();
obj.put("status", status);
obj.put("message", message);
if ( taskid != null ) {
obj.put("task_id", taskid);
}
boolean bStatus = true;
try {
ostream.write(obj.toJSONString().getBytes());

View File

@ -63,7 +63,7 @@ public class SCancelTask extends SAbstractTask {
respMessage = new String("Task id " + this.taskId_
+ "is not found");
}
return respond(this.sOut_, respStatus, respMessage);
return respond(this.sOut_, respStatus, respMessage, null);
}
}
/* ============================== END OF FILE =============================== */

View File

@ -21,6 +21,7 @@ import java.io.OutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.UUID;
import org.slf4j.LoggerFactory;
import ch.qos.logback.classic.Logger;
@ -71,7 +72,9 @@ public class SExecutionManager {
public String submitTask(final SExecutionTask sTask) {
Future futureTask = threadPool_.submit(sTask);
String taskId = futureTask.toString().split("@")[1];
UUID uuid = UUID.randomUUID();
String taskId = uuid.toString().substring(8);
synchronized (this.taskIdToTask_) {
this.taskIdToTask_.put(taskId, futureTask);

View File

@ -22,8 +22,8 @@ import org.slf4j.Logger;
import org.openstack.storlet.common.*;
import org.openstack.storlet.daemon.SExecutionManager;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.io.IOException;
import java.io.OutputStream;
@ -36,26 +36,26 @@ import java.io.OutputStream;
public class SExecutionTask extends SAbstractTask implements Runnable {
private StorletLogger storletLogger_ = null;
private IStorlet storlet_ = null;
private OutputStream sOut_ = null;
private ArrayList<StorletInputStream> inStreams_ = null;
private ArrayList<StorletOutputStream> outStreams_ = null;
private HashMap<String, String> executionParams_ = null;
private OutputStream taskIdOut_ = null;
private String taskId_ = null;
private SExecutionManager sExecManager_ = null;
public SExecutionTask(IStorlet storlet,
public SExecutionTask(IStorlet storlet, OutputStream sOut,
ArrayList<StorletInputStream> instreams,
ArrayList<StorletOutputStream> outstreams, OutputStream taskIdOut,
ArrayList<StorletOutputStream> outstreams,
HashMap<String, String> executionParams,
StorletLogger storletLogger, Logger logger,
SExecutionManager sExecManager) {
super(logger);
this.storlet_ = storlet;
this.sOut_ = sOut;
this.inStreams_ = instreams;
this.outStreams_ = outstreams;
this.executionParams_ = executionParams;
this.storletLogger_ = storletLogger;
this.taskIdOut_ = taskIdOut;
this.sExecManager_ = sExecManager;
}
@ -92,20 +92,7 @@ public class SExecutionTask extends SAbstractTask implements Runnable {
public boolean exec() {
boolean bStatus = true;
this.taskId_ = this.sExecManager_.submitTask((SExecutionTask) this);
try {
this.taskIdOut_.write(this.taskId_.getBytes());
} catch (IOException e) {
e.printStackTrace();
bStatus = false;
} finally {
try{
this.taskIdOut_.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return bStatus;
return respond(this.sOut_, true, new String("OK"), this.taskId_);
}
@Override

View File

@ -46,7 +46,7 @@ public class SHaltTask extends SAbstractTask {
* */
@Override
public boolean exec() {
respond(this.sOut_, true, new String("OK"));
respond(this.sOut_, true, new String("OK"), null);
return false;
}
}

View File

@ -46,7 +46,7 @@ public class SPingTask extends SAbstractTask {
* */
@Override
public boolean exec() {
return respond(this.sOut_, true, new String("OK"));
return respond(this.sOut_, true, new String("OK"), null);
}
}
/* ============================== END OF FILE =============================== */

View File

@ -86,14 +86,14 @@ public class STaskFactory {
int nFiles = dtg.getNFiles();
HashMap<String, HashMap<String, String>>[] FilesMD = dtg.getFilesMetadata();
this.logger_.trace("StorletTask: Got " + nFiles + " fds");
OutputStream taskIdOut = null;
OutputStream sOut = null;
for (int i = 0; i < nFiles; ++i) {
HashMap<String, String> storletsMetadata = FilesMD[i].get("storlets");
HashMap<String, String> storageMetadata = FilesMD[i].get("storage");
FileDescriptor fd = dtg.getFiles()[i];
String strFDtype = storletsMetadata.get("type");
if (strFDtype.equals("SBUS_FD_OUTPUT_TASK_ID")) {
taskIdOut = new FileOutputStream(fd);
if (strFDtype.equals("SBUS_FD_SERVICE_OUT")) {
sOut = new FileOutputStream(fd);
} else if (strFDtype.equals("SBUS_FD_INPUT_OBJECT")) {
this.logger_.trace("createStorletTask: fd " + i
+ " is of type SBUS_FD_INPUT_OBJECT");
@ -147,7 +147,7 @@ public class STaskFactory {
this.logger_.error("createStorletTask: fd " + i
+ " is of unknown type " + strFDtype);
}
return new SExecutionTask(storlet_, inStreams, outStreams, taskIdOut,
return new SExecutionTask(storlet_, sOut, inStreams, outStreams,
dtg.getExecParams(), storletLogger, logger_, sExecManager);
}

View File

@ -28,13 +28,14 @@ class CommandResponse(Exception):
The result of command execution
"""
def __init__(self, status, message, iterable=True):
def __init__(self, status, message, iterable=True, task_id=None):
"""
Constract CommandResponse instance
:param status: task status
:param message: message to be returned and logged
:param iterable: wheter we can keep SDaemon process running
:param task_id: ID assigned to the requested task
"""
self.status = status
self.message = message
@ -43,9 +44,14 @@ class CommandResponse(Exception):
# exit or not as a result of processing the command
self.iterable = iterable
self.task_id = task_id
@property
def report_message(self):
return json.dumps({'status': self.status, 'message': self.message})
rsp = {'status': self.status, 'message': self.message}
if self.task_id:
rsp['task_id'] = self.task_id
return json.dumps(rsp)
CommandSuccess = partial(CommandResponse, True)
@ -119,16 +125,9 @@ class SBusServer(object):
self.logger.info('Command:%s Response:%s' %
(command, resp.report_message))
try:
outfd = dtg.service_out_fd
with os.fdopen(outfd, 'wb') as outfile:
self._respond(outfile, resp)
except AttributeError:
# TODO(takashi): Currently we return response via service out fd
# only for service commands, but to be more
# consistent, we should do the same for execute
# command
pass
outfd = dtg.service_out_fd
with os.fdopen(outfd, 'wb') as outfile:
self._respond(outfile, resp)
return resp.iterable

View File

@ -147,19 +147,38 @@ class StorletDaemon(SBusServer):
else:
return StorletInputFile(in_md, in_fd)
def _safe_close(self, fileno):
try:
os.close(fileno)
except OSError as e:
if e.errno != errno.EBADF:
raise
pass
@command_handler
def execute(self, dtg):
task_id_out_fd = dtg.task_id_out_fd
task_id = str(uuid.uuid4())[:8]
while len(self.task_id_to_pid) >= self.pool_size:
self._wait_child_process()
self.logger.debug('Returning task_id: %s ' % task_id)
with os.fdopen(task_id_out_fd, 'wb') as outfile:
outfile.write(task_id.encode("utf-8"))
pid = os.fork()
if pid:
self.logger.debug('Create a subprocess %d for task %s' %
(pid, task_id))
self.task_id_to_pid[task_id] = pid
for fd in dtg.invocation_fds:
# We do not use invocation fds in main process, so close them
self._safe_close(fd)
else:
self._safe_close(dtg.service_out_fd)
self._execute(dtg)
sys.exit()
return CommandSuccess('OK', task_id=task_id)
def _execute(self, dtg):
storlet_md = dtg.object_in_storlet_metadata
params = dtg.params
in_md = dtg.object_in_metadata
@ -168,50 +187,28 @@ class StorletDaemon(SBusServer):
out_fds = dtg.object_out_fds
logger_fd = dtg.logger_out_fd
pid = os.fork()
if pid:
self.logger.debug('Create a subprocess %d for task %s' %
(pid, task_id))
self.task_id_to_pid[task_id] = pid
self.logger.debug('Start storlet invocation')
self.logger.debug(
'in_fds:%s in_md:%s out_md_fds:%s out_fds:%s logger_fd: %s'
% (in_fds, in_md, out_md_fds, out_fds, logger_fd))
for fd in dtg.fds:
# We do not use fds in main process, so close them
try:
os.close(fd)
except OSError as e:
if e.errno != errno.EBADF:
raise
pass
else:
try:
self.logger.debug('Start storlet invocation')
in_files = [self._create_input_file(st_md, md, in_fd)
for st_md, md, in_fd in zip(storlet_md, in_md, in_fds)]
self.logger.debug('in_fds:%s in_md:%s out_md_fds:%s out_fds:%s'
' logger_fd: %s'
% (in_fds, in_md, out_md_fds, out_fds,
logger_fd))
out_files = [StorletOutputFile(out_md_fd, out_fd)
for out_md_fd, out_fd in zip(out_md_fds, out_fds)]
in_files = [self._create_input_file(st_md, md, in_fd)
for st_md, md, in_fd
in zip(storlet_md, in_md, in_fds)]
out_files = [StorletOutputFile(out_md_fd, out_fd)
for out_md_fd, out_fd
in zip(out_md_fds, out_fds)]
self.logger.debug('Start storlet execution')
with StorletLogger(self.storlet_name, logger_fd) as slogger:
handler = self.storlet_cls(slogger)
handler(in_files, out_files, params)
self.logger.debug('Completed')
except Exception:
self.logger.exception('Error in storlet invocation')
finally:
# Make sure that all fds are closed
self._safe_close_files(in_files)
self._safe_close_files(out_files)
sys.exit()
return CommandSuccess('OK')
try:
self.logger.debug('Start storlet execution')
with StorletLogger(self.storlet_name, logger_fd) as slogger:
handler = self.storlet_cls(slogger)
handler(in_files, out_files, params)
self.logger.debug('Completed')
except Exception:
self.logger.exception('Error in storlet invocation')
finally:
# Make sure that all fds are closed
self._safe_close_files(in_files + out_files)
@command_handler
def cancel(self, dtg):

View File

@ -26,12 +26,10 @@ import eventlet
import json
from contextlib import contextmanager
from storlets.sbus import SBus
from storlets.sbus.command import SBUS_CMD_EXECUTE
from storlets.sbus.datagram import SBusFileDescriptor, SBusExecuteDatagram
from storlets.sbus import file_description as sbus_fd
from storlets.sbus.client import SBusClient
from storlets.sbus.client.exceptions import SBusClientException
from storlets.sbus.datagram import SBusFileDescriptor
from storlets.sbus import file_description as sbus_fd
from storlets.gateway.common.exceptions import StorletRuntimeException, \
StorletTimeout
from storlets.gateway.common.logger import StorletLogger
@ -503,8 +501,6 @@ class StorletInvocationProtocol(object):
self.data_write_fd = None
self.metadata_read_fd = None
self.metadata_write_fd = None
self.taskid_read_fd = None
self.taskid_write_fd = None
self.task_id = None
self._input_data_read_fd = None
self._input_data_write_fd = None
@ -546,8 +542,6 @@ class StorletInvocationProtocol(object):
self.input_data_read_fd,
storage_metadata=self.srequest.user_metadata,
storlets_metadata=storlets_metadata),
SBusFileDescriptor(sbus_fd.SBUS_FD_OUTPUT_TASK_ID,
self.taskid_write_fd),
SBusFileDescriptor(sbus_fd.SBUS_FD_OUTPUT_OBJECT,
self.data_write_fd),
SBusFileDescriptor(sbus_fd.SBUS_FD_OUTPUT_OBJECT_METADATA,
@ -585,7 +579,6 @@ class StorletInvocationProtocol(object):
if not self.srequest.has_fd:
self._input_data_read_fd, self._input_data_write_fd = os.pipe()
self.data_read_fd, self.data_write_fd = os.pipe()
self.taskid_read_fd, self.taskid_write_fd = os.pipe()
self.metadata_read_fd, self.metadata_write_fd = os.pipe()
for source in self.extra_data_sources:
@ -612,8 +605,7 @@ class StorletInvocationProtocol(object):
"""
Close all of the container side descriptors
"""
fds = [self.data_write_fd, self.metadata_write_fd,
self.taskid_write_fd]
fds = [self.data_write_fd, self.metadata_write_fd]
if not self.srequest.has_fd:
fds.append(self.input_data_read_fd)
fds.extend([source['read_fd'] for source in self.extra_data_sources])
@ -624,8 +616,7 @@ class StorletInvocationProtocol(object):
"""
Close all of the host side descriptors
"""
fds = [self.data_read_fd, self.metadata_read_fd,
self.taskid_read_fd]
fds = [self.data_read_fd, self.metadata_read_fd]
fds.extend([source['write_fd'] for source in self.extra_data_sources])
self._safe_close(fds)
@ -648,25 +639,23 @@ class StorletInvocationProtocol(object):
with self.storlet_logger.activate(),\
self._activate_invocation_descriptors():
self._send_execute_command()
self._wait_for_read_with_timeout(self.taskid_read_fd)
# TODO(kota_): need an assertion for task_id format
self.task_id = os.read(self.taskid_read_fd, 10)
if not isinstance(self.task_id, str):
self.task_id = self.task_id.decode('utf-8')
os.close(self.taskid_read_fd)
def _send_execute_command(self):
"""
Send execute command to the remote daemon factory to invoke storlet
execution
"""
dtg = SBusExecuteDatagram(
SBUS_CMD_EXECUTE,
self.remote_fds,
self.srequest.params)
rc = SBus.send(self.storlet_pipe_path, dtg)
client = SBusClient(self.storlet_pipe_path)
try:
resp = client.execute(self.srequest.params, self.remote_fds)
if not resp.status:
raise StorletRuntimeException("Failed to send execute command")
if (rc < 0):
if not resp.task_id:
raise StorletRuntimeException("Missing task id")
else:
self.task_id = resp.task_id
except SBusClientException:
raise StorletRuntimeException("Failed to send execute command")
def _wait_for_read_with_timeout(self, fd):

View File

@ -15,25 +15,25 @@
import json
import os
from storlets.sbus import SBus
from storlets.sbus.command import SBUS_CMD_CANCEL, SBUS_CMD_DAEMON_STATUS, \
SBUS_CMD_HALT, SBUS_CMD_PING, SBUS_CMD_START_DAEMON, \
SBUS_CMD_STOP_DAEMON, SBUS_CMD_STOP_DAEMONS
from storlets.sbus.datagram import SBusFileDescriptor, SBusServiceDatagram
from storlets.sbus import command as sbus_cmd
from storlets.sbus.datagram import SBusFileDescriptor, build_datagram
from storlets.sbus.file_description import SBUS_FD_SERVICE_OUT
from storlets.sbus.client.exceptions import SBusClientIOError, \
SBusClientMalformedResponse, SBusClientSendError
class SBusResponse(object):
def __init__(self, status, message):
def __init__(self, status, message, task_id=None):
"""
Construct SBusResponse class
:param status: Whether the server succeed to process the given request
:param message: Messages to describe the process result
:param task_id: Id assigned to each tasks
"""
self.status = status
self.message = message
self.task_id = task_id
class SBusClient(object):
@ -54,16 +54,22 @@ class SBusClient(object):
message = resp['message']
except (ValueError, KeyError):
raise SBusClientMalformedResponse('Got malformed response')
return SBusResponse(status, message)
def _request(self, command, params=None, task_id=None):
# NOTE(takashi): task_id is currently used only in EXECUTE command, so
# we don't fail here even if the given response doesn't
# have task_id.
task_id = resp.get('task_id')
return SBusResponse(status, message, task_id)
def _request(self, command, params=None, task_id=None, extra_fds=None):
read_fd, write_fd = os.pipe()
try:
try:
datagram = SBusServiceDatagram(
command,
[SBusFileDescriptor(SBUS_FD_SERVICE_OUT, write_fd)],
params, task_id)
sfds = \
[SBusFileDescriptor(SBUS_FD_SERVICE_OUT, write_fd)] + \
(extra_fds or [])
datagram = build_datagram(command, sfds, params, task_id)
rc = SBus.send(self.socket_path, datagram)
if rc < 0:
raise SBusClientSendError(
@ -79,8 +85,8 @@ class SBusClient(object):
try:
buf = os.read(read_fd, self.chunk_size)
except IOError:
raise SBusClientIOError('Failed to read data from read '
'pipe')
raise SBusClientIOError(
'Failed to read data from read pipe')
if not buf:
break
reply = reply + buf
@ -92,12 +98,13 @@ class SBusClient(object):
return self._parse_response(reply)
def execute(self, *args, **kwargs):
# TODO(takashi): implement this
raise NotImplementedError('Execute command is not supported yet')
def execute(self, invocation_params, invocation_fds):
return self._request(sbus_cmd.SBUS_CMD_EXECUTE,
params=invocation_params,
extra_fds=invocation_fds)
def ping(self):
return self._request(SBUS_CMD_PING)
return self._request(sbus_cmd.SBUS_CMD_PING)
def start_daemon(self, language, storlet_path, storlet_id,
uds_path, log_level, pool_size,
@ -108,21 +115,21 @@ class SBusClient(object):
if language_version:
params['daemon_language_version'] = language_version
return self._request(SBUS_CMD_START_DAEMON, params)
return self._request(sbus_cmd.SBUS_CMD_START_DAEMON, params)
def stop_daemon(self, storlet_name):
return self._request(SBUS_CMD_STOP_DAEMON,
return self._request(sbus_cmd.SBUS_CMD_STOP_DAEMON,
{'storlet_name': storlet_name})
def stop_daemons(self):
return self._request(SBUS_CMD_STOP_DAEMONS)
return self._request(sbus_cmd.SBUS_CMD_STOP_DAEMONS)
def halt(self):
return self._request(SBUS_CMD_HALT)
return self._request(sbus_cmd.SBUS_CMD_HALT)
def daemon_status(self, storlet_name):
return self._request(SBUS_CMD_DAEMON_STATUS,
return self._request(sbus_cmd.SBUS_CMD_DAEMON_STATUS,
{'storlet_name': storlet_name})
def cancel(self, task_id):
return self._request(SBUS_CMD_CANCEL, task_id=task_id)
return self._request(sbus_cmd.SBUS_CMD_CANCEL, task_id=task_id)

View File

@ -15,8 +15,8 @@
import copy
import json
from storlets.sbus import command as sbus_cmd
from storlets.sbus import file_description as sbus_fd
from storlets.sbus.command import SBUS_CMD_EXECUTE
class SBusFileDescriptor(object):
@ -39,11 +39,10 @@ class SBusFileDescriptor(object):
'storage': self.storage_metadata}
@classmethod
def from_metadata_dict(cls, metadict):
def from_fileno_and_metadata_dict(cls, fileno, metadict):
_metadict = copy.deepcopy(metadict)
storlets_metadata = _metadict['storlets']
storage_metadata = _metadict['storage']
fileno = _metadict['fileno']
fdtype = storlets_metadata.pop('type')
return cls(fdtype, fileno, storlets_metadata, storage_metadata)
@ -55,7 +54,7 @@ class SBusDatagram(object):
# Each child Datagram should define what fd types are expected with
# list format
_required_fd_types = None
_required_fdtypes = None
def __init__(self, command, sfds, params=None, task_id=None):
"""
@ -72,8 +71,8 @@ class SBusDatagram(object):
raise NotImplementedError(
'SBusDatagram class should not be initialized as bare')
self.command = command
fd_types = [sfd.fdtype for sfd in sfds]
self._check_required_fd_types(fd_types)
fdtypes = [sfd.fdtype for sfd in sfds]
self._check_required_fdtypes(fdtypes)
self.sfds = sfds
self.params = params
self.task_id = task_id
@ -145,17 +144,17 @@ class SBusDatagram(object):
# fd validation.
return ret[0]
def _check_required_fd_types(self, given_fd_types):
if self._required_fd_types is None:
def _check_required_fdtypes(self, given_fdtypes):
if self._required_fdtypes is None:
raise NotImplementedError(
'SBusDatagram class should define _required_fd_types')
# the first len(self._required_fd_types) types should be fit
'SBusDatagram class should define _required_fdtypes')
# the first len(self._required_fdtypes) types should be fit
# to the required list
if given_fd_types[:len(self._required_fd_types)] != \
self._required_fd_types:
raise ValueError('Fd type mismatch given_fd_types:%s \
required_fd_types:%s' %
(given_fd_types, self._required_fd_types))
if given_fdtypes[:len(self._required_fdtypes)] != \
self._required_fdtypes:
raise ValueError('Fd type mismatch given_fdtypes:%s \
required_fdtypes:%s' %
(given_fdtypes, self._required_fdtypes))
def __str__(self):
return 'num_fds=%s, md=%s, cmd_params=%s' % (
@ -179,7 +178,7 @@ class SBusServiceDatagram(SBusDatagram):
- SBUS_CMD_PING
- SBUS_CMD_CANCEL
"""
_required_fd_types = [sbus_fd.SBUS_FD_SERVICE_OUT]
_required_fdtypes = [sbus_fd.SBUS_FD_SERVICE_OUT]
def __init__(self, command, sfds, params=None, task_id=None):
super(SBusServiceDatagram, self).__init__(
@ -191,11 +190,11 @@ class SBusServiceDatagram(SBusDatagram):
class SBusExecuteDatagram(SBusDatagram):
_required_fd_types = [sbus_fd.SBUS_FD_INPUT_OBJECT,
sbus_fd.SBUS_FD_OUTPUT_TASK_ID,
sbus_fd.SBUS_FD_OUTPUT_OBJECT,
sbus_fd.SBUS_FD_OUTPUT_OBJECT_METADATA,
sbus_fd.SBUS_FD_LOGGER]
_required_fdtypes = [sbus_fd.SBUS_FD_SERVICE_OUT,
sbus_fd.SBUS_FD_INPUT_OBJECT,
sbus_fd.SBUS_FD_OUTPUT_OBJECT,
sbus_fd.SBUS_FD_OUTPUT_OBJECT_METADATA,
sbus_fd.SBUS_FD_LOGGER]
def __init__(self, command, sfds, params=None, task_id=None):
# TODO(kota_): the args command is not used in ExecuteDatagram
@ -207,15 +206,23 @@ class SBusExecuteDatagram(SBusDatagram):
# this implementation is based on the idea that
# we only have extra input sources, which is
# added at the end of fd list
extra_fd_types = [sfd.fdtype for sfd in
sfds[len(self._required_fd_types):]]
extra_fdtypes = [sfd.fdtype for sfd in
sfds[len(self._required_fdtypes):]]
if [t for t in extra_fd_types if t != sbus_fd.SBUS_FD_INPUT_OBJECT]:
if [t for t in extra_fdtypes if t != sbus_fd.SBUS_FD_INPUT_OBJECT]:
raise ValueError(
'Extra data should be SBUS_FD_INPUT_OBJECT')
super(SBusExecuteDatagram, self).__init__(
SBUS_CMD_EXECUTE, sfds, params, task_id)
sbus_cmd.SBUS_CMD_EXECUTE, sfds, params, task_id)
@property
def invocation_fds(self):
return [sfd.fileno for sfd in self.sfds[1:]]
@property
def service_out_fd(self):
return self._find_fd(sbus_fd.SBUS_FD_SERVICE_OUT)
@property
def object_out_fds(self):
@ -225,10 +232,6 @@ class SBusExecuteDatagram(SBusDatagram):
def object_metadata_out_fds(self):
return self._find_fds(sbus_fd.SBUS_FD_OUTPUT_OBJECT_METADATA)
@property
def task_id_out_fd(self):
return self._find_fd(sbus_fd.SBUS_FD_OUTPUT_TASK_ID)
@property
def logger_out_fd(self):
return self._find_fd(sbus_fd.SBUS_FD_LOGGER)
@ -238,6 +241,15 @@ class SBusExecuteDatagram(SBusDatagram):
return self._find_fds(sbus_fd.SBUS_FD_INPUT_OBJECT)
def build_datagram(command, sfds, params, task_id):
if command == sbus_cmd.SBUS_CMD_EXECUTE:
dtg_cls = SBusExecuteDatagram
else:
dtg_cls = SBusServiceDatagram
return dtg_cls(command, sfds, params, task_id)
def build_datagram_from_raw_message(fds, str_md, str_cmd_params):
"""
Build SBusDatagram from raw message received over sbus
@ -256,11 +268,6 @@ def build_datagram_from_raw_message(fds, str_md, str_cmd_params):
if len(fds) != len(metadata):
raise ValueError('Length mismatch fds: %d != md %d' %
(len(fds), len(metadata)))
sfds = []
for fileno, md in zip(fds, metadata):
md['fileno'] = fileno
sfds.append(SBusFileDescriptor.from_metadata_dict(md))
if command == SBUS_CMD_EXECUTE:
return SBusExecuteDatagram(command, sfds, params, task_id)
return SBusServiceDatagram(command, sfds, params, task_id)
sfds = [SBusFileDescriptor.from_fileno_and_metadata_dict(fileno, md)
for (fileno, md) in zip(fds, metadata)]
return build_datagram(command, sfds, params, task_id)

View File

@ -19,5 +19,4 @@ SBUS_FD_OUTPUT_OBJECT_METADATA = 'SBUS_FD_OUTPUT_OBJECT_METADATA'
SBUS_FD_OUTPUT_OBJECT_AND_METADATA = 'SBUS_FD_OUTPUT_OBJECT_AND_METADATA'
SBUS_FD_LOGGER = 'SBUS_FD_LOGGER'
SBUS_FD_OUTPUT_CONTAINER = 'SBUS_FD_OUTPUT_CONTAINER'
SBUS_FD_OUTPUT_TASK_ID = 'SBUS_FD_OUTPUT_TASK_ID'
SBUS_FD_SERVICE_OUT = 'SBUS_FD_SERVICE_OUT'

View File

@ -36,13 +36,6 @@ def with_tempdir(f):
return wrapped
class MockSBus(object):
@classmethod
def send(self, path, datagram):
# return success code
return 0
class FakeLogger(object):
def __init__(self, *args, **kwargs):
self._log_lines = defaultdict(list)

View File

@ -30,17 +30,29 @@ class TestCommandResponse(unittest.TestCase):
self.assertTrue(resp.status)
self.assertEqual('ok', resp.message)
self.assertTrue(resp.iterable)
self.assertIsNone(resp.task_id)
resp = CommandResponse(False, 'error', False)
self.assertFalse(resp.status)
self.assertEqual('error', resp.message)
self.assertFalse(resp.iterable)
self.assertIsNone(resp.task_id)
resp = CommandResponse(True, 'ok', task_id='foo')
self.assertTrue(resp.status)
self.assertEqual('ok', resp.message)
self.assertTrue(resp.iterable)
self.assertEqual('foo', resp.task_id)
def test_report_message(self):
resp = CommandResponse(True, 'msg', True)
self.assertEqual({'status': True, 'message': 'msg'},
json.loads(resp.report_message))
resp = CommandResponse(True, 'msg', True, 'foo')
self.assertEqual({'status': True, 'message': 'msg', 'task_id': 'foo'},
json.loads(resp.report_message))
class TestCommandSuccess(unittest.TestCase):
def test_init(self):

View File

@ -33,7 +33,6 @@ from tests.unit import FakeLogger
from tests.unit.gateway.gateways import FakeFileManager
from storlets.gateway.gateways.docker.gateway import DockerStorletRequest, \
StorletGatewayDocker
from tests.unit import MockSBus
class MockInternalClient(object):
@ -461,11 +460,9 @@ use = egg:swift#catch_errors
# TODO(kota_): need more efficient way for emuration of return value
# from SDaemon
value_generator = iter([
# Forth is return value for invoking as task_id
'This is task id',
# Fifth is for getting meta
# first, we get metadata json
json.dumps({'metadata': 'return'}),
# At last return body and EOF
# then we get object data
'something', '',
])
@ -496,7 +493,6 @@ use = egg:swift#catch_errors
# os.read -> mock reading the file descriptor from container
# select.slect -> mock fd communication which can be readable
@mock.patch('storlets.gateway.gateways.docker.runtime.SBusClient')
@mock.patch('storlets.gateway.gateways.docker.runtime.SBus', MockSBus)
@mock.patch('storlets.gateway.gateways.docker.runtime.os.read',
mock_read)
@mock.patch('storlets.gateway.gateways.docker.runtime.os.close',
@ -509,6 +505,8 @@ use = egg:swift#catch_errors
client.ping.return_value = SBusResponse(True, 'OK')
client.stop_daemon.return_value = SBusResponse(True, 'OK')
client.start_daemon.return_value = SBusResponse(True, 'OK')
client.execute.return_value = SBusResponse(True, 'OK', 'someid')
sresp = self.gateway.invocation_flow(st_req, extra_sources)
eventlet.sleep(0.1)
file_like = FileLikeIter(sresp.data_iter)

View File

@ -34,14 +34,6 @@ from tests.unit import FakeLogger, with_tempdir
from tests.unit.gateway.gateways import FakeFileManager
@contextmanager
def _mock_sbus(send_status=0):
with mock.patch('storlets.gateway.gateways.docker.runtime.'
'SBus.send') as fake_send:
fake_send.return_value = send_status
yield
@contextmanager
def _mock_os_pipe(bufs):
class FakeFd(object):
@ -418,13 +410,39 @@ class TestStorletInvocationProtocol(unittest.TestCase):
except OSError:
pass
def test_invocation_protocol(self):
# os.pipe will be called 4 times
pipe_called = 4
def test_send_execute_command(self):
with mock.patch('storlets.gateway.gateways.docker.runtime.SBusClient.'
'execute') as execute:
execute.return_value = SBusResponse(True, 'OK', 'someid')
self.protocol._send_execute_command()
self.assertEqual('someid', self.protocol.task_id)
with _mock_sbus(0), _mock_os_pipe([''] * pipe_called) as pipes:
with mock.patch.object(
self.protocol, '_wait_for_read_with_timeout'):
with mock.patch('storlets.gateway.gateways.docker.runtime.SBusClient.'
'execute') as execute:
execute.return_value = SBusResponse(True, 'OK')
with self.assertRaises(StorletRuntimeException):
self.protocol._send_execute_command()
with mock.patch('storlets.gateway.gateways.docker.runtime.SBusClient.'
'execute') as execute:
execute.return_value = SBusResponse(False, 'NG', 'someid')
with self.assertRaises(StorletRuntimeException):
self.protocol._send_execute_command()
with mock.patch('storlets.gateway.gateways.docker.runtime.SBusClient.'
'execute') as execute:
execute.side_effect = SBusClientIOError()
with self.assertRaises(StorletRuntimeException):
self.protocol._send_execute_command()
def test_invocation_protocol(self):
# os.pipe will be called 3 times
pipe_called = 3
with _mock_os_pipe([''] * pipe_called) as pipes:
with mock.patch.object(self.protocol,
'_wait_for_read_with_timeout'), \
mock.patch.object(self.protocol, '_send_execute_command'):
self.protocol._invoke()
self.assertEqual(pipe_called, len(pipes))
@ -441,11 +459,6 @@ class TestStorletInvocationProtocol(unittest.TestCase):
self.assertFalse(data_read_fd.closed)
self.assertTrue(data_write_fd.closed)
# both execution str fds are closed
execution_read_fd, execution_write_fd = next(pipes)
self.assertTrue(execution_read_fd.closed)
self.assertTrue(execution_write_fd.closed)
# metadata write fd is closed, metadata read fd is still open.
metadata_read_fd, metadata_write_fd = next(pipes)
self.assertFalse(metadata_read_fd.closed)
@ -455,12 +468,12 @@ class TestStorletInvocationProtocol(unittest.TestCase):
self.assertRaises(StopIteration, next, pipes)
def test_invocation_protocol_remote_fds(self):
# In default, we have 5 fds in remote_fds
# In default, we have 4 fds in remote_fds
storlet_request = DockerStorletRequest(
self.storlet_id, {}, {}, iter(StringIO()), options=self.options)
protocol = StorletInvocationProtocol(
storlet_request, self.pipe_path, self.log_file, 1, self.logger)
self.assertEqual(5, len(protocol.remote_fds))
self.assertEqual(4, len(protocol.remote_fds))
# extra_resources expands the remote_fds
storlet_request = DockerStorletRequest(
@ -468,7 +481,7 @@ class TestStorletInvocationProtocol(unittest.TestCase):
protocol = StorletInvocationProtocol(
storlet_request, self.pipe_path, self.log_file, 1, self.logger,
extra_sources=[storlet_request])
self.assertEqual(6, len(protocol.remote_fds))
self.assertEqual(5, len(protocol.remote_fds))
# 2 more extra_resources expands the remote_fds
storlet_request = DockerStorletRequest(
@ -476,7 +489,7 @@ class TestStorletInvocationProtocol(unittest.TestCase):
protocol = StorletInvocationProtocol(
storlet_request, self.pipe_path, self.log_file, 1, self.logger,
extra_sources=[storlet_request] * 3)
self.assertEqual(8, len(protocol.remote_fds))
self.assertEqual(7, len(protocol.remote_fds))
def test_open_writer_with_invalid_fd(self):
invalid_fds = (

View File

@ -81,16 +81,26 @@ class TestSBusClient(unittest.TestCase):
resp = self.client._parse_response(raw_resp)
self.assertTrue(resp.status)
self.assertEqual('OK', resp.message)
self.assertIsNone(resp.task_id)
raw_resp = json.dumps({'status': True, 'message': 'OK',
'task_id': 'SOMEID'})
resp = self.client._parse_response(raw_resp)
self.assertTrue(resp.status)
self.assertEqual('OK', resp.message)
self.assertEqual('SOMEID', resp.task_id)
raw_resp = json.dumps({'status': False, 'message': 'ERROR'})
resp = self.client._parse_response(raw_resp)
self.assertFalse(resp.status)
self.assertEqual('ERROR', resp.message)
self.assertIsNone(resp.task_id)
raw_resp = json.dumps({'status': True, 'message': 'Sample:Message'})
resp = self.client._parse_response(raw_resp)
self.assertTrue(resp.status)
self.assertEqual('Sample:Message', resp.message)
self.assertIsNone(resp.task_id)
with self.assertRaises(SBusClientMalformedResponse):
self.client._parse_response('Foo')

View File

@ -17,15 +17,16 @@ import json
import unittest
import storlets.sbus.file_description as sbus_fd
from storlets.sbus.datagram import SBusFileDescriptor, SBusDatagram, \
SBusServiceDatagram, SBusExecuteDatagram, build_datagram_from_raw_message
SBusServiceDatagram, SBusExecuteDatagram, build_datagram, \
build_datagram_from_raw_message
from storlets.sbus.command import SBUS_CMD_PING, SBUS_CMD_EXECUTE
ALL_FD_TYPES = [
ALL_FDTYPES = [
sbus_fd.SBUS_FD_INPUT_OBJECT, sbus_fd.SBUS_FD_OUTPUT_OBJECT,
sbus_fd.SBUS_FD_OUTPUT_OBJECT_METADATA,
sbus_fd.SBUS_FD_OUTPUT_OBJECT_AND_METADATA,
sbus_fd.SBUS_FD_LOGGER, sbus_fd.SBUS_FD_OUTPUT_CONTAINER,
sbus_fd.SBUS_FD_OUTPUT_TASK_ID, sbus_fd.SBUS_FD_SERVICE_OUT,
sbus_fd.SBUS_FD_SERVICE_OUT,
]
@ -39,11 +40,11 @@ class TestSBusFileDescriptor(unittest.TestCase):
'storage': {'storage_key': 'storage_value'}},
fd.metadata)
def test_from_metadata_dict(self):
fd = SBusFileDescriptor.from_metadata_dict(
def test_from_fileno_and_metadata_dict(self):
fd = SBusFileDescriptor.from_fileno_and_metadata_dict(
1,
{'storlets': {'type': 'MYTYPE', 'storlets_key': 'storlets_value'},
'storage': {'storage_key': 'storage_value'},
'fileno': 1})
'storage': {'storage_key': 'storage_value'}})
self.assertEqual(1, fd.fileno)
self.assertEqual('MYTYPE', fd.fdtype)
self.assertEqual({'storlets_key': 'storlets_value'},
@ -53,7 +54,7 @@ class TestSBusFileDescriptor(unittest.TestCase):
class TestSBusDatagram(unittest.TestCase):
def test_check_required_fd_types_not_implemented(self):
def test_check_required_fdtypes_not_implemented(self):
# SBusDatagram designed not to be called independently
with self.assertRaises(NotImplementedError) as err:
SBusDatagram('', [], [])
@ -62,14 +63,14 @@ class TestSBusDatagram(unittest.TestCase):
err.exception.args[0])
def test_invalid_child_class_definition(self):
# no definition for _required_fd_types
# no definition for _required_fdtypes
class InvalidSBusDatagram(SBusDatagram):
pass
with self.assertRaises(NotImplementedError) as err:
InvalidSBusDatagram('', [], [])
self.assertEqual(
'SBusDatagram class should define _required_fd_types',
'SBusDatagram class should define _required_fdtypes',
err.exception.args[0])
@ -87,7 +88,7 @@ class SBusDatagramTestMixin(object):
self.assertEqual(self.task_id, self.dtg.task_id)
def test_num_fds(self):
self.assertEqual(len(self.types), self.dtg.num_fds)
self.assertEqual(len(self.fdtypes), self.dtg.num_fds)
def test_cmd_params(self):
self.assertEqual({'command': self.command,
@ -101,61 +102,61 @@ class SBusDatagramTestMixin(object):
'task_id': self.task_id}
self.assertEqual(res, json.loads(self.dtg.serialized_cmd_params))
def test_check_required_fd_types_mismatch(self):
invalid_types = (
def test_check_required_fdtypes_mismatch(self):
invalid_fdtypes_list = (
[], # empty list
['Invalid'] + self.types, # invalid type inserted at the first
['Invalid'] + self.fdtypes, # invalid type inserted at the first
# TODO(kota_): we may want *strict* check (not only checking first
# N items.
)
for invalid_type in invalid_types:
for invalid_fdtypes in invalid_fdtypes_list:
with self.assertRaises(ValueError) as cm:
self.dtg._check_required_fd_types(invalid_type)
self.dtg._check_required_fdtypes(invalid_fdtypes)
self.assertTrue(cm.exception.args[0].startswith(
'Fd type mismatch given_fd_types'))
'Fd type mismatch given_fdtypes'))
def test_find_fds(self):
# prepare all fd types and then pop out in the loop below
not_in_fd_types = ALL_FD_TYPES[:]
not_in_fdtypes = ALL_FDTYPES[:]
# N.B. fd should start from 1 (not 0), really?
for index, fd_type in enumerate(self.types, 1):
found_fds = self.dtg._find_fds(fd_type)
for index, fdtype in enumerate(self.fdtypes, 1):
found_fds = self.dtg._find_fds(fdtype)
# at least 1 fd should be found
self.assertTrue(found_fds)
# and the index is in the types
self.assertIn(index, found_fds)
if fd_type in not_in_fd_types:
# N.B. ALL_FD_TYPES should be unique list
not_in_fd_types.remove(fd_type)
if fdtype in not_in_fdtypes:
# N.B. ALL_FDTYPES should be unique list
not_in_fdtypes.remove(fdtype)
# sanity, not a fd type results in []
self.assertEqual([], self.dtg._find_fds('DUMMY_TYPE'))
# sanity, no other types are found
for fd_type in not_in_fd_types:
self.assertEqual([], self.dtg._find_fds(fd_type))
for fdtype in not_in_fdtypes:
self.assertEqual([], self.dtg._find_fds(fdtype))
def test_find_fd(self):
# prepare all fd types and then pop out in the loop below
not_in_fd_types = ALL_FD_TYPES[:]
not_in_fdtypes = ALL_FDTYPES[:]
# N.B. fd should start from 1 (not 0), really?
for index, fd_type in enumerate(self.types, 1):
found_fd = self.dtg._find_fd(fd_type)
for index, fdtype in enumerate(self.fdtypes, 1):
found_fd = self.dtg._find_fd(fdtype)
# at least 1 fd should be found
self.assertEqual(index, found_fd)
if fd_type in not_in_fd_types:
# N.B. ALL_FD_TYPES should be unique list
not_in_fd_types.remove(fd_type)
if fdtype in not_in_fdtypes:
# N.B. ALL_FDTYPES should be unique list
not_in_fdtypes.remove(fdtype)
# sanity, not a fd type results in None
self.assertIsNone(self.dtg._find_fd('DUMMY_TYPE'))
# sanity, no other types are found
for fd_type in not_in_fd_types:
self.assertIsNone(self.dtg._find_fd(fd_type))
for fdtype in not_in_fdtypes:
self.assertIsNone(self.dtg._find_fd(fdtype))
class TestSBusServiceDatagram(SBusDatagramTestMixin, unittest.TestCase):
@ -163,7 +164,7 @@ class TestSBusServiceDatagram(SBusDatagramTestMixin, unittest.TestCase):
def setUp(self):
self.command = 'SBUS_CMD_TEST'
self.types = [sbus_fd.SBUS_FD_SERVICE_OUT]
self.fdtypes = [sbus_fd.SBUS_FD_SERVICE_OUT]
self.sfds = [SBusFileDescriptor(sbus_fd.SBUS_FD_SERVICE_OUT, 1)]
super(TestSBusServiceDatagram, self).setUp()
@ -176,67 +177,111 @@ class TestSBusExecuteDatagram(SBusDatagramTestMixin, unittest.TestCase):
def setUp(self):
self.command = SBUS_CMD_EXECUTE
self.types = [sbus_fd.SBUS_FD_INPUT_OBJECT,
sbus_fd.SBUS_FD_OUTPUT_TASK_ID,
sbus_fd.SBUS_FD_OUTPUT_OBJECT,
sbus_fd.SBUS_FD_OUTPUT_OBJECT_METADATA,
sbus_fd.SBUS_FD_LOGGER]
self.fdtypes = [sbus_fd.SBUS_FD_SERVICE_OUT,
sbus_fd.SBUS_FD_INPUT_OBJECT,
sbus_fd.SBUS_FD_OUTPUT_OBJECT,
sbus_fd.SBUS_FD_OUTPUT_OBJECT_METADATA,
sbus_fd.SBUS_FD_LOGGER]
self.sfds = [SBusFileDescriptor(
self.types[i], i + 1,
fdtype, i + 1,
{'key%d' % i: 'value%d' % i},
{'skey%d' % i: 'svalue%d' % i})
for i in range(len(self.types))]
for i, fdtype in enumerate(self.fdtypes)]
super(TestSBusExecuteDatagram, self).setUp()
def test_init_extra_sources(self):
types = [sbus_fd.SBUS_FD_INPUT_OBJECT,
sbus_fd.SBUS_FD_OUTPUT_TASK_ID,
sbus_fd.SBUS_FD_OUTPUT_OBJECT,
sbus_fd.SBUS_FD_OUTPUT_OBJECT_METADATA,
sbus_fd.SBUS_FD_LOGGER,
sbus_fd.SBUS_FD_INPUT_OBJECT,
sbus_fd.SBUS_FD_INPUT_OBJECT,
sbus_fd.SBUS_FD_INPUT_OBJECT]
fds = [SBusFileDescriptor(types[i], i + 1,
fdtypes = [sbus_fd.SBUS_FD_SERVICE_OUT,
sbus_fd.SBUS_FD_INPUT_OBJECT,
sbus_fd.SBUS_FD_OUTPUT_OBJECT,
sbus_fd.SBUS_FD_OUTPUT_OBJECT_METADATA,
sbus_fd.SBUS_FD_LOGGER,
sbus_fd.SBUS_FD_INPUT_OBJECT,
sbus_fd.SBUS_FD_INPUT_OBJECT,
sbus_fd.SBUS_FD_INPUT_OBJECT]
fds = [SBusFileDescriptor(fdtype, i + 1,
{'key%d' % i: 'value%d' % i},
{'skey%d' % i: 'svalue%d' % i})
for i in range(len(types))]
for i, fdtype in enumerate(fdtypes)]
dtg = self._test_class(
self.command, fds, self.params, self.task_id)
self.assertEqual(types, [sfd.fdtype for sfd in dtg.sfds])
self.assertEqual(fdtypes, [sfd.fdtype for sfd in dtg.sfds])
self.assertEqual(self.params, dtg.params)
self.assertEqual(self.task_id, dtg.task_id)
def test_service_out_fd(self):
self.assertEqual(1, self.dtg.service_out_fd)
def test_invocation_fds(self):
self.assertEqual([2, 3, 4, 5], self.dtg.invocation_fds)
def test_object_out_fds(self):
self.assertEqual([3], self.dtg.object_out_fds)
def test_object_metadata_out_fds(self):
self.assertEqual([4], self.dtg.object_metadata_out_fds)
def test_task_id_out_fd(self):
self.assertEqual(2, self.dtg.task_id_out_fd)
def test_logger_out_fd(self):
self.assertEqual(5, self.dtg.logger_out_fd)
def test_object_in_fds(self):
self.assertEqual([1], self.dtg.object_in_fds)
self.assertEqual([2], self.dtg.object_in_fds)
def test_check_required_fd_types_reverse_order_failed(self):
types = self.types[:]
types.reverse() # reverse order
def test_check_required_fdtypes_reverse_order_failed(self):
fdtypes = self.fdtypes[:]
fdtypes.reverse() # reverse order
with self.assertRaises(ValueError) as cm:
self.dtg._check_required_fd_types(types)
self.dtg._check_required_fdtypes(fdtypes)
self.assertTrue(
cm.exception.args[0].startswith('Fd type mismatch given_fd_types'))
cm.exception.args[0].startswith('Fd type mismatch given_fdtypes'))
class TestBuildDatagramFromRawMessage(unittest.TestCase):
class TestBuildDatagram(unittest.TestCase):
def test_build_datagram(self):
# SBusServiceDatagram scenario
command = SBUS_CMD_PING
fdtypes = [sbus_fd.SBUS_FD_SERVICE_OUT]
fds = [SBusFileDescriptor(sbus_fd.SBUS_FD_SERVICE_OUT, 1)]
params = {'param1': 'paramvalue1'}
task_id = 'id'
dtg = build_datagram(command, fds, params, task_id)
self.assertIsInstance(dtg, SBusServiceDatagram)
self.assertEqual(command, dtg.command)
self.assertEqual(fdtypes, [sfd.fdtype for sfd in dtg.sfds])
self.assertEqual(params, dtg.params)
self.assertEqual(task_id, dtg.task_id)
# SBusExecuteDatagram scenario
command = SBUS_CMD_EXECUTE
fdtypes = [sbus_fd.SBUS_FD_SERVICE_OUT,
sbus_fd.SBUS_FD_INPUT_OBJECT,
sbus_fd.SBUS_FD_OUTPUT_OBJECT,
sbus_fd.SBUS_FD_OUTPUT_OBJECT_METADATA,
sbus_fd.SBUS_FD_LOGGER,
sbus_fd.SBUS_FD_INPUT_OBJECT,
sbus_fd.SBUS_FD_INPUT_OBJECT,
sbus_fd.SBUS_FD_INPUT_OBJECT]
fds = [SBusFileDescriptor(fdtype, i + 1,
{'key%d' % i: 'value%d' % i},
{'skey%d' % i: 'svalue%d' % i})
for i, fdtype in enumerate(fdtypes)]
params = {'param1': 'paramvalue1'}
task_id = 'id'
dtg = build_datagram(command, fds, params, task_id)
self.assertIsInstance(dtg, SBusExecuteDatagram)
self.assertEqual(command, dtg.command)
self.assertEqual(fdtypes, [sfd.fdtype for sfd in dtg.sfds])
self.assertEqual(params, dtg.params)
self.assertEqual(task_id, dtg.task_id)
def test_build_datagram_from_raw_message(self):
# SBusServiceDatagram scenario
command = SBUS_CMD_PING
types = [sbus_fd.SBUS_FD_SERVICE_OUT]
fdtypes = [sbus_fd.SBUS_FD_SERVICE_OUT]
fds = [SBusFileDescriptor(sbus_fd.SBUS_FD_SERVICE_OUT, 1)]
params = {'param1': 'paramvalue1'}
task_id = 'id'
@ -247,25 +292,26 @@ class TestBuildDatagramFromRawMessage(unittest.TestCase):
dtg = build_datagram_from_raw_message(fds, str_metadata,
str_cmd_params)
self.assertIsInstance(dtg, SBusServiceDatagram)
self.assertEqual(command, dtg.command)
self.assertEqual(types, [sfd.fdtype for sfd in dtg.sfds])
self.assertEqual(fdtypes, [sfd.fdtype for sfd in dtg.sfds])
self.assertEqual(params, dtg.params)
self.assertEqual(task_id, dtg.task_id)
# SBusExecuteDatagram scenario
command = SBUS_CMD_EXECUTE
types = [sbus_fd.SBUS_FD_INPUT_OBJECT,
sbus_fd.SBUS_FD_OUTPUT_TASK_ID,
sbus_fd.SBUS_FD_OUTPUT_OBJECT,
sbus_fd.SBUS_FD_OUTPUT_OBJECT_METADATA,
sbus_fd.SBUS_FD_LOGGER,
sbus_fd.SBUS_FD_INPUT_OBJECT,
sbus_fd.SBUS_FD_INPUT_OBJECT,
sbus_fd.SBUS_FD_INPUT_OBJECT]
fds = [SBusFileDescriptor(types[i], i + 1,
fdtypes = [sbus_fd.SBUS_FD_SERVICE_OUT,
sbus_fd.SBUS_FD_INPUT_OBJECT,
sbus_fd.SBUS_FD_OUTPUT_OBJECT,
sbus_fd.SBUS_FD_OUTPUT_OBJECT_METADATA,
sbus_fd.SBUS_FD_LOGGER,
sbus_fd.SBUS_FD_INPUT_OBJECT,
sbus_fd.SBUS_FD_INPUT_OBJECT,
sbus_fd.SBUS_FD_INPUT_OBJECT]
fds = [SBusFileDescriptor(fdtype, i + 1,
{'key%d' % i: 'value%d' % i},
{'skey%d' % i: 'svalue%d' % i})
for i in range(len(types))]
for i, fdtype in enumerate(fdtypes)]
params = {'param1': 'paramvalue1'}
task_id = 'id'
cmd_params = {'command': command, 'params': params, 'task_id': task_id}
@ -275,8 +321,9 @@ class TestBuildDatagramFromRawMessage(unittest.TestCase):
dtg = build_datagram_from_raw_message(fds, str_metadata,
str_cmd_params)
self.assertIsInstance(dtg, SBusExecuteDatagram)
self.assertEqual(command, dtg.command)
self.assertEqual(types, [sfd.fdtype for sfd in dtg.sfds])
self.assertEqual(fdtypes, [sfd.fdtype for sfd in dtg.sfds])
self.assertEqual(params, dtg.params)
self.assertEqual(task_id, dtg.task_id)