Merge "Use SBusClient for execute command"

Authored by Zuul on 2020-04-15 01:38:43 +00:00, committed by Gerrit Code Review
commit ddf82cf59c
19 changed files with 347 additions and 283 deletions
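
For orientation, a minimal sketch of the flow this merge switches to (the helper name send_execute is invented here purely for illustration and is not part of the change): the gateway side now builds an SBusClient for the storlet daemon's pipe and sends the execute command through it, reading the task id from the JSON response instead of from the removed SBUS_FD_OUTPUT_TASK_ID pipe, as the storlets.gateway.gateways.docker.runtime and storlets.sbus.client hunks below show.

    from storlets.sbus.client import SBusClient
    from storlets.sbus.client.exceptions import SBusClientException

    def send_execute(pipe_path, params, remote_fds):
        # Send the execute command over the daemon's pipe; the task id now
        # arrives in the JSON response rather than over a dedicated
        # SBUS_FD_OUTPUT_TASK_ID file descriptor.
        client = SBusClient(pipe_path)
        try:
            resp = client.execute(params, remote_fds)
        except SBusClientException:
            raise RuntimeError('Failed to send execute command')
        if not resp.status:
            raise RuntimeError('Execute command failed: %s' % resp.message)
        if not resp.task_id:
            raise RuntimeError('Missing task id')
        return resp.task_id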

View File

@ -36,10 +36,13 @@ public abstract class SAbstractTask {
this.logger = logger;
}
protected boolean respond(OutputStream ostream, boolean status, String message) {
protected boolean respond(OutputStream ostream, boolean status, String message, String taskid) {
JSONObject obj = new JSONObject();
obj.put("status", status);
obj.put("message", message);
if ( taskid != null ) {
obj.put("task_id", taskid);
}
boolean bStatus = true;
try {
ostream.write(obj.toJSONString().getBytes());

View File

@ -63,7 +63,7 @@ public class SCancelTask extends SAbstractTask {
respMessage = new String("Task id " + this.taskId_
+ "is not found");
}
return respond(this.sOut_, respStatus, respMessage);
return respond(this.sOut_, respStatus, respMessage, null);
}
}
/* ============================== END OF FILE =============================== */

View File

@ -21,6 +21,7 @@ import java.io.OutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.UUID;
import org.slf4j.LoggerFactory;
import ch.qos.logback.classic.Logger;
@ -71,7 +72,9 @@ public class SExecutionManager {
public String submitTask(final SExecutionTask sTask) {
Future futureTask = threadPool_.submit(sTask);
String taskId = futureTask.toString().split("@")[1];
UUID uuid = UUID.randomUUID();
String taskId = uuid.toString().substring(8);
synchronized (this.taskIdToTask_) {
this.taskIdToTask_.put(taskId, futureTask);

View File

@ -22,8 +22,8 @@ import org.slf4j.Logger;
import org.openstack.storlet.common.*;
import org.openstack.storlet.daemon.SExecutionManager;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.io.IOException;
import java.io.OutputStream;
@ -36,26 +36,26 @@ import java.io.OutputStream;
public class SExecutionTask extends SAbstractTask implements Runnable {
private StorletLogger storletLogger_ = null;
private IStorlet storlet_ = null;
private OutputStream sOut_ = null;
private ArrayList<StorletInputStream> inStreams_ = null;
private ArrayList<StorletOutputStream> outStreams_ = null;
private HashMap<String, String> executionParams_ = null;
private OutputStream taskIdOut_ = null;
private String taskId_ = null;
private SExecutionManager sExecManager_ = null;
public SExecutionTask(IStorlet storlet,
public SExecutionTask(IStorlet storlet, OutputStream sOut,
ArrayList<StorletInputStream> instreams,
ArrayList<StorletOutputStream> outstreams, OutputStream taskIdOut,
ArrayList<StorletOutputStream> outstreams,
HashMap<String, String> executionParams,
StorletLogger storletLogger, Logger logger,
SExecutionManager sExecManager) {
super(logger);
this.storlet_ = storlet;
this.sOut_ = sOut;
this.inStreams_ = instreams;
this.outStreams_ = outstreams;
this.executionParams_ = executionParams;
this.storletLogger_ = storletLogger;
this.taskIdOut_ = taskIdOut;
this.sExecManager_ = sExecManager;
}
@ -92,20 +92,7 @@ public class SExecutionTask extends SAbstractTask implements Runnable {
public boolean exec() {
boolean bStatus = true;
this.taskId_ = this.sExecManager_.submitTask((SExecutionTask) this);
try {
this.taskIdOut_.write(this.taskId_.getBytes());
} catch (IOException e) {
e.printStackTrace();
bStatus = false;
} finally {
try{
this.taskIdOut_.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return bStatus;
return respond(this.sOut_, true, new String("OK"), this.taskId_);
}
@Override

View File

@ -46,7 +46,7 @@ public class SHaltTask extends SAbstractTask {
* */
@Override
public boolean exec() {
respond(this.sOut_, true, new String("OK"));
respond(this.sOut_, true, new String("OK"), null);
return false;
}
}

View File

@ -46,7 +46,7 @@ public class SPingTask extends SAbstractTask {
* */
@Override
public boolean exec() {
return respond(this.sOut_, true, new String("OK"));
return respond(this.sOut_, true, new String("OK"), null);
}
}
/* ============================== END OF FILE =============================== */

View File

@ -86,14 +86,14 @@ public class STaskFactory {
int nFiles = dtg.getNFiles();
HashMap<String, HashMap<String, String>>[] FilesMD = dtg.getFilesMetadata();
this.logger_.trace("StorletTask: Got " + nFiles + " fds");
OutputStream taskIdOut = null;
OutputStream sOut = null;
for (int i = 0; i < nFiles; ++i) {
HashMap<String, String> storletsMetadata = FilesMD[i].get("storlets");
HashMap<String, String> storageMetadata = FilesMD[i].get("storage");
FileDescriptor fd = dtg.getFiles()[i];
String strFDtype = storletsMetadata.get("type");
if (strFDtype.equals("SBUS_FD_OUTPUT_TASK_ID")) {
taskIdOut = new FileOutputStream(fd);
if (strFDtype.equals("SBUS_FD_SERVICE_OUT")) {
sOut = new FileOutputStream(fd);
} else if (strFDtype.equals("SBUS_FD_INPUT_OBJECT")) {
this.logger_.trace("createStorletTask: fd " + i
+ " is of type SBUS_FD_INPUT_OBJECT");
@ -147,7 +147,7 @@ public class STaskFactory {
this.logger_.error("createStorletTask: fd " + i
+ " is of unknown type " + strFDtype);
}
return new SExecutionTask(storlet_, inStreams, outStreams, taskIdOut,
return new SExecutionTask(storlet_, sOut, inStreams, outStreams,
dtg.getExecParams(), storletLogger, logger_, sExecManager);
}

View File

@ -28,13 +28,14 @@ class CommandResponse(Exception):
The result of command execution
"""
def __init__(self, status, message, iterable=True):
def __init__(self, status, message, iterable=True, task_id=None):
"""
Construct CommandResponse instance
:param status: task status
:param message: message to be returned and logged
:param iterable: whether we can keep the SDaemon process running
:param task_id: ID assigned to the requested task
"""
self.status = status
self.message = message
@ -43,9 +44,14 @@ class CommandResponse(Exception):
# exit or not as a result of processing the command
self.iterable = iterable
self.task_id = task_id
@property
def report_message(self):
return json.dumps({'status': self.status, 'message': self.message})
rsp = {'status': self.status, 'message': self.message}
if self.task_id:
rsp['task_id'] = self.task_id
return json.dumps(rsp)
CommandSuccess = partial(CommandResponse, True)
@ -119,16 +125,9 @@ class SBusServer(object):
self.logger.info('Command:%s Response:%s' %
(command, resp.report_message))
try:
outfd = dtg.service_out_fd
with os.fdopen(outfd, 'wb') as outfile:
self._respond(outfile, resp)
except AttributeError:
# TODO(takashi): Currently we return response via service out fd
# only for service commands, but to be more
# consistent, we should do the same for execute
# command
pass
outfd = dtg.service_out_fd
with os.fdopen(outfd, 'wb') as outfile:
self._respond(outfile, resp)
return resp.iterable
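
As a hedged illustration of the wire-level effect (the task_id value below is made up), an execute command now gets a single JSON object written back over SBUS_FD_SERVICE_OUT, the same channel the service commands already use:

    import json

    # Response shape produced by CommandResponse.report_message (Python) and
    # SAbstractTask.respond (Java) for a successful execute command.
    print(json.dumps({'status': True, 'message': 'OK', 'task_id': '1a2b3c4d'}))
    # {"status": true, "message": "OK", "task_id": "1a2b3c4d"}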

View File

@ -147,19 +147,38 @@ class StorletDaemon(SBusServer):
else:
return StorletInputFile(in_md, in_fd)
def _safe_close(self, fileno):
try:
os.close(fileno)
except OSError as e:
if e.errno != errno.EBADF:
raise
pass
@command_handler
def execute(self, dtg):
task_id_out_fd = dtg.task_id_out_fd
task_id = str(uuid.uuid4())[:8]
while len(self.task_id_to_pid) >= self.pool_size:
self._wait_child_process()
self.logger.debug('Returning task_id: %s ' % task_id)
with os.fdopen(task_id_out_fd, 'wb') as outfile:
outfile.write(task_id.encode("utf-8"))
pid = os.fork()
if pid:
self.logger.debug('Create a subprocess %d for task %s' %
(pid, task_id))
self.task_id_to_pid[task_id] = pid
for fd in dtg.invocation_fds:
# We do not use invocation fds in main process, so close them
self._safe_close(fd)
else:
self._safe_close(dtg.service_out_fd)
self._execute(dtg)
sys.exit()
return CommandSuccess('OK', task_id=task_id)
def _execute(self, dtg):
storlet_md = dtg.object_in_storlet_metadata
params = dtg.params
in_md = dtg.object_in_metadata
@ -168,50 +187,28 @@ class StorletDaemon(SBusServer):
out_fds = dtg.object_out_fds
logger_fd = dtg.logger_out_fd
pid = os.fork()
if pid:
self.logger.debug('Create a subprocess %d for task %s' %
(pid, task_id))
self.task_id_to_pid[task_id] = pid
self.logger.debug('Start storlet invocation')
self.logger.debug(
'in_fds:%s in_md:%s out_md_fds:%s out_fds:%s logger_fd: %s'
% (in_fds, in_md, out_md_fds, out_fds, logger_fd))
for fd in dtg.fds:
# We do not use fds in main process, so close them
try:
os.close(fd)
except OSError as e:
if e.errno != errno.EBADF:
raise
pass
else:
try:
self.logger.debug('Start storlet invocation')
in_files = [self._create_input_file(st_md, md, in_fd)
for st_md, md, in_fd in zip(storlet_md, in_md, in_fds)]
self.logger.debug('in_fds:%s in_md:%s out_md_fds:%s out_fds:%s'
' logger_fd: %s'
% (in_fds, in_md, out_md_fds, out_fds,
logger_fd))
out_files = [StorletOutputFile(out_md_fd, out_fd)
for out_md_fd, out_fd in zip(out_md_fds, out_fds)]
in_files = [self._create_input_file(st_md, md, in_fd)
for st_md, md, in_fd
in zip(storlet_md, in_md, in_fds)]
out_files = [StorletOutputFile(out_md_fd, out_fd)
for out_md_fd, out_fd
in zip(out_md_fds, out_fds)]
self.logger.debug('Start storlet execution')
with StorletLogger(self.storlet_name, logger_fd) as slogger:
handler = self.storlet_cls(slogger)
handler(in_files, out_files, params)
self.logger.debug('Completed')
except Exception:
self.logger.exception('Error in storlet invocation')
finally:
# Make sure that all fds are closed
self._safe_close_files(in_files)
self._safe_close_files(out_files)
sys.exit()
return CommandSuccess('OK')
try:
self.logger.debug('Start storlet execution')
with StorletLogger(self.storlet_name, logger_fd) as slogger:
handler = self.storlet_cls(slogger)
handler(in_files, out_files, params)
self.logger.debug('Completed')
except Exception:
self.logger.exception('Error in storlet invocation')
finally:
# Make sure that all fds are closed
self._safe_close_files(in_files + out_files)
@command_handler
def cancel(self, dtg):

View File

@ -26,12 +26,10 @@ import eventlet
import json
from contextlib import contextmanager
from storlets.sbus import SBus
from storlets.sbus.command import SBUS_CMD_EXECUTE
from storlets.sbus.datagram import SBusFileDescriptor, SBusExecuteDatagram
from storlets.sbus import file_description as sbus_fd
from storlets.sbus.client import SBusClient
from storlets.sbus.client.exceptions import SBusClientException
from storlets.sbus.datagram import SBusFileDescriptor
from storlets.sbus import file_description as sbus_fd
from storlets.gateway.common.exceptions import StorletRuntimeException, \
StorletTimeout
from storlets.gateway.common.logger import StorletLogger
@ -505,8 +503,6 @@ class StorletInvocationProtocol(object):
self.data_write_fd = None
self.metadata_read_fd = None
self.metadata_write_fd = None
self.taskid_read_fd = None
self.taskid_write_fd = None
self.task_id = None
self._input_data_read_fd = None
self._input_data_write_fd = None
@ -548,8 +544,6 @@ class StorletInvocationProtocol(object):
self.input_data_read_fd,
storage_metadata=self.srequest.user_metadata,
storlets_metadata=storlets_metadata),
SBusFileDescriptor(sbus_fd.SBUS_FD_OUTPUT_TASK_ID,
self.taskid_write_fd),
SBusFileDescriptor(sbus_fd.SBUS_FD_OUTPUT_OBJECT,
self.data_write_fd),
SBusFileDescriptor(sbus_fd.SBUS_FD_OUTPUT_OBJECT_METADATA,
@ -587,7 +581,6 @@ class StorletInvocationProtocol(object):
if not self.srequest.has_fd:
self._input_data_read_fd, self._input_data_write_fd = os.pipe()
self.data_read_fd, self.data_write_fd = os.pipe()
self.taskid_read_fd, self.taskid_write_fd = os.pipe()
self.metadata_read_fd, self.metadata_write_fd = os.pipe()
for source in self.extra_data_sources:
@ -614,8 +607,7 @@ class StorletInvocationProtocol(object):
"""
Close all of the container side descriptors
"""
fds = [self.data_write_fd, self.metadata_write_fd,
self.taskid_write_fd]
fds = [self.data_write_fd, self.metadata_write_fd]
if not self.srequest.has_fd:
fds.append(self.input_data_read_fd)
fds.extend([source['read_fd'] for source in self.extra_data_sources])
@ -626,8 +618,7 @@ class StorletInvocationProtocol(object):
"""
Close all of the host side descriptors
"""
fds = [self.data_read_fd, self.metadata_read_fd,
self.taskid_read_fd]
fds = [self.data_read_fd, self.metadata_read_fd]
fds.extend([source['write_fd'] for source in self.extra_data_sources])
self._safe_close(fds)
@ -650,25 +641,23 @@ class StorletInvocationProtocol(object):
with self.storlet_logger.activate(),\
self._activate_invocation_descriptors():
self._send_execute_command()
self._wait_for_read_with_timeout(self.taskid_read_fd)
# TODO(kota_): need an assertion for task_id format
self.task_id = os.read(self.taskid_read_fd, 10)
if not isinstance(self.task_id, str):
self.task_id = self.task_id.decode('utf-8')
os.close(self.taskid_read_fd)
def _send_execute_command(self):
"""
Send execute command to the remote daemon factory to invoke storlet
execution
"""
dtg = SBusExecuteDatagram(
SBUS_CMD_EXECUTE,
self.remote_fds,
self.srequest.params)
rc = SBus.send(self.storlet_pipe_path, dtg)
client = SBusClient(self.storlet_pipe_path)
try:
resp = client.execute(self.srequest.params, self.remote_fds)
if not resp.status:
raise StorletRuntimeException("Failed to send execute command")
if (rc < 0):
if not resp.task_id:
raise StorletRuntimeException("Missing task id")
else:
self.task_id = resp.task_id
except SBusClientException:
raise StorletRuntimeException("Failed to send execute command")
def _wait_for_read_with_timeout(self, fd):

View File

@ -15,25 +15,25 @@
import json
import os
from storlets.sbus import SBus
from storlets.sbus.command import SBUS_CMD_CANCEL, SBUS_CMD_DAEMON_STATUS, \
SBUS_CMD_HALT, SBUS_CMD_PING, SBUS_CMD_START_DAEMON, \
SBUS_CMD_STOP_DAEMON, SBUS_CMD_STOP_DAEMONS
from storlets.sbus.datagram import SBusFileDescriptor, SBusServiceDatagram
from storlets.sbus import command as sbus_cmd
from storlets.sbus.datagram import SBusFileDescriptor, build_datagram
from storlets.sbus.file_description import SBUS_FD_SERVICE_OUT
from storlets.sbus.client.exceptions import SBusClientIOError, \
SBusClientMalformedResponse, SBusClientSendError
class SBusResponse(object):
def __init__(self, status, message):
def __init__(self, status, message, task_id=None):
"""
Construct SBusResponse class
:param status: Whether the server succeeded in processing the given request
:param message: Messages to describe the process result
:param task_id: ID assigned to each task
"""
self.status = status
self.message = message
self.task_id = task_id
class SBusClient(object):
@ -54,16 +54,22 @@ class SBusClient(object):
message = resp['message']
except (ValueError, KeyError):
raise SBusClientMalformedResponse('Got malformed response')
return SBusResponse(status, message)
def _request(self, command, params=None, task_id=None):
# NOTE(takashi): task_id is currently used only in EXECUTE command, so
# we don't fail here even if the given response doesn't
# have task_id.
task_id = resp.get('task_id')
return SBusResponse(status, message, task_id)
def _request(self, command, params=None, task_id=None, extra_fds=None):
read_fd, write_fd = os.pipe()
try:
try:
datagram = SBusServiceDatagram(
command,
[SBusFileDescriptor(SBUS_FD_SERVICE_OUT, write_fd)],
params, task_id)
sfds = \
[SBusFileDescriptor(SBUS_FD_SERVICE_OUT, write_fd)] + \
(extra_fds or [])
datagram = build_datagram(command, sfds, params, task_id)
rc = SBus.send(self.socket_path, datagram)
if rc < 0:
raise SBusClientSendError(
@ -79,8 +85,8 @@ class SBusClient(object):
try:
buf = os.read(read_fd, self.chunk_size)
except IOError:
raise SBusClientIOError('Failed to read data from read '
'pipe')
raise SBusClientIOError(
'Failed to read data from read pipe')
if not buf:
break
reply = reply + buf
@ -92,12 +98,13 @@ class SBusClient(object):
return self._parse_response(reply)
def execute(self, *args, **kwargs):
# TODO(takashi): implement this
raise NotImplementedError('Execute command is not supported yet')
def execute(self, invocation_params, invocation_fds):
return self._request(sbus_cmd.SBUS_CMD_EXECUTE,
params=invocation_params,
extra_fds=invocation_fds)
def ping(self):
return self._request(SBUS_CMD_PING)
return self._request(sbus_cmd.SBUS_CMD_PING)
def start_daemon(self, language, storlet_path, storlet_id,
uds_path, log_level, pool_size,
@ -108,21 +115,21 @@ class SBusClient(object):
if language_version:
params['daemon_language_version'] = language_version
return self._request(SBUS_CMD_START_DAEMON, params)
return self._request(sbus_cmd.SBUS_CMD_START_DAEMON, params)
def stop_daemon(self, storlet_name):
return self._request(SBUS_CMD_STOP_DAEMON,
return self._request(sbus_cmd.SBUS_CMD_STOP_DAEMON,
{'storlet_name': storlet_name})
def stop_daemons(self):
return self._request(SBUS_CMD_STOP_DAEMONS)
return self._request(sbus_cmd.SBUS_CMD_STOP_DAEMONS)
def halt(self):
return self._request(SBUS_CMD_HALT)
return self._request(sbus_cmd.SBUS_CMD_HALT)
def daemon_status(self, storlet_name):
return self._request(SBUS_CMD_DAEMON_STATUS,
return self._request(sbus_cmd.SBUS_CMD_DAEMON_STATUS,
{'storlet_name': storlet_name})
def cancel(self, task_id):
return self._request(SBUS_CMD_CANCEL, task_id=task_id)
return self._request(sbus_cmd.SBUS_CMD_CANCEL, task_id=task_id)

View File

@ -15,8 +15,8 @@
import copy
import json
from storlets.sbus import command as sbus_cmd
from storlets.sbus import file_description as sbus_fd
from storlets.sbus.command import SBUS_CMD_EXECUTE
class SBusFileDescriptor(object):
@ -39,11 +39,10 @@ class SBusFileDescriptor(object):
'storage': self.storage_metadata}
@classmethod
def from_metadata_dict(cls, metadict):
def from_fileno_and_metadata_dict(cls, fileno, metadict):
_metadict = copy.deepcopy(metadict)
storlets_metadata = _metadict['storlets']
storage_metadata = _metadict['storage']
fileno = _metadict['fileno']
fdtype = storlets_metadata.pop('type')
return cls(fdtype, fileno, storlets_metadata, storage_metadata)
@ -55,7 +54,7 @@ class SBusDatagram(object):
# Each child Datagram should define what fd types are expected with
# list format
_required_fd_types = None
_required_fdtypes = None
def __init__(self, command, sfds, params=None, task_id=None):
"""
@ -72,8 +71,8 @@ class SBusDatagram(object):
raise NotImplementedError(
'SBusDatagram class should not be initialized as bare')
self.command = command
fd_types = [sfd.fdtype for sfd in sfds]
self._check_required_fd_types(fd_types)
fdtypes = [sfd.fdtype for sfd in sfds]
self._check_required_fdtypes(fdtypes)
self.sfds = sfds
self.params = params
self.task_id = task_id
@ -145,17 +144,17 @@ class SBusDatagram(object):
# fd validation.
return ret[0]
def _check_required_fd_types(self, given_fd_types):
if self._required_fd_types is None:
def _check_required_fdtypes(self, given_fdtypes):
if self._required_fdtypes is None:
raise NotImplementedError(
'SBusDatagram class should define _required_fd_types')
# the first len(self._required_fd_types) types should be fit
'SBusDatagram class should define _required_fdtypes')
# the first len(self._required_fdtypes) types should be fit
# to the required list
if given_fd_types[:len(self._required_fd_types)] != \
self._required_fd_types:
raise ValueError('Fd type mismatch given_fd_types:%s \
required_fd_types:%s' %
(given_fd_types, self._required_fd_types))
if given_fdtypes[:len(self._required_fdtypes)] != \
self._required_fdtypes:
raise ValueError('Fd type mismatch given_fdtypes:%s \
required_fdtypes:%s' %
(given_fdtypes, self._required_fdtypes))
def __str__(self):
return 'num_fds=%s, md=%s, cmd_params=%s' % (
@ -179,7 +178,7 @@ class SBusServiceDatagram(SBusDatagram):
- SBUS_CMD_PING
- SBUS_CMD_CANCEL
"""
_required_fd_types = [sbus_fd.SBUS_FD_SERVICE_OUT]
_required_fdtypes = [sbus_fd.SBUS_FD_SERVICE_OUT]
def __init__(self, command, sfds, params=None, task_id=None):
super(SBusServiceDatagram, self).__init__(
@ -191,11 +190,11 @@ class SBusServiceDatagram(SBusDatagram):
class SBusExecuteDatagram(SBusDatagram):
_required_fd_types = [sbus_fd.SBUS_FD_INPUT_OBJECT,
sbus_fd.SBUS_FD_OUTPUT_TASK_ID,
sbus_fd.SBUS_FD_OUTPUT_OBJECT,
sbus_fd.SBUS_FD_OUTPUT_OBJECT_METADATA,
sbus_fd.SBUS_FD_LOGGER]
_required_fdtypes = [sbus_fd.SBUS_FD_SERVICE_OUT,
sbus_fd.SBUS_FD_INPUT_OBJECT,
sbus_fd.SBUS_FD_OUTPUT_OBJECT,
sbus_fd.SBUS_FD_OUTPUT_OBJECT_METADATA,
sbus_fd.SBUS_FD_LOGGER]
def __init__(self, command, sfds, params=None, task_id=None):
# TODO(kota_): the args command is not used in ExecuteDatagram
@ -207,15 +206,23 @@ class SBusExecuteDatagram(SBusDatagram):
# this implementation is based on the idea that
# we only have extra input sources, which is
# added at the end of fd list
extra_fd_types = [sfd.fdtype for sfd in
sfds[len(self._required_fd_types):]]
extra_fdtypes = [sfd.fdtype for sfd in
sfds[len(self._required_fdtypes):]]
if [t for t in extra_fd_types if t != sbus_fd.SBUS_FD_INPUT_OBJECT]:
if [t for t in extra_fdtypes if t != sbus_fd.SBUS_FD_INPUT_OBJECT]:
raise ValueError(
'Extra data should be SBUS_FD_INPUT_OBJECT')
super(SBusExecuteDatagram, self).__init__(
SBUS_CMD_EXECUTE, sfds, params, task_id)
sbus_cmd.SBUS_CMD_EXECUTE, sfds, params, task_id)
@property
def invocation_fds(self):
return [sfd.fileno for sfd in self.sfds[1:]]
@property
def service_out_fd(self):
return self._find_fd(sbus_fd.SBUS_FD_SERVICE_OUT)
@property
def object_out_fds(self):
@ -225,10 +232,6 @@ class SBusExecuteDatagram(SBusDatagram):
def object_metadata_out_fds(self):
return self._find_fds(sbus_fd.SBUS_FD_OUTPUT_OBJECT_METADATA)
@property
def task_id_out_fd(self):
return self._find_fd(sbus_fd.SBUS_FD_OUTPUT_TASK_ID)
@property
def logger_out_fd(self):
return self._find_fd(sbus_fd.SBUS_FD_LOGGER)
@ -238,6 +241,15 @@ class SBusExecuteDatagram(SBusDatagram):
return self._find_fds(sbus_fd.SBUS_FD_INPUT_OBJECT)
def build_datagram(command, sfds, params, task_id):
if command == sbus_cmd.SBUS_CMD_EXECUTE:
dtg_cls = SBusExecuteDatagram
else:
dtg_cls = SBusServiceDatagram
return dtg_cls(command, sfds, params, task_id)
def build_datagram_from_raw_message(fds, str_md, str_cmd_params):
"""
Build SBusDatagram from raw message received over sbus
@ -256,11 +268,6 @@ def build_datagram_from_raw_message(fds, str_md, str_cmd_params):
if len(fds) != len(metadata):
raise ValueError('Length mismatch fds: %d != md %d' %
(len(fds), len(metadata)))
sfds = []
for fileno, md in zip(fds, metadata):
md['fileno'] = fileno
sfds.append(SBusFileDescriptor.from_metadata_dict(md))
if command == SBUS_CMD_EXECUTE:
return SBusExecuteDatagram(command, sfds, params, task_id)
return SBusServiceDatagram(command, sfds, params, task_id)
sfds = [SBusFileDescriptor.from_fileno_and_metadata_dict(fileno, md)
for (fileno, md) in zip(fds, metadata)]
return build_datagram(command, sfds, params, task_id)
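
A quick usage sketch of the new build_datagram helper added above (the values are illustrative, and the assert is only there to show the dispatch):

    from storlets.sbus import command as sbus_cmd
    from storlets.sbus import file_description as sbus_fd
    from storlets.sbus.datagram import (SBusFileDescriptor,
                                        SBusServiceDatagram, build_datagram)

    # Service commands such as PING carry only the service-out fd, so the
    # helper returns an SBusServiceDatagram.
    sfds = [SBusFileDescriptor(sbus_fd.SBUS_FD_SERVICE_OUT, 1)]
    dtg = build_datagram(sbus_cmd.SBUS_CMD_PING, sfds, params=None,
                         task_id=None)
    assert isinstance(dtg, SBusServiceDatagram)
    # For SBUS_CMD_EXECUTE the same helper returns an SBusExecuteDatagram,
    # whose required fd list now starts with SBUS_FD_SERVICE_OUT in place of
    # the removed SBUS_FD_OUTPUT_TASK_ID.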

View File

@ -19,5 +19,4 @@ SBUS_FD_OUTPUT_OBJECT_METADATA = 'SBUS_FD_OUTPUT_OBJECT_METADATA'
SBUS_FD_OUTPUT_OBJECT_AND_METADATA = 'SBUS_FD_OUTPUT_OBJECT_AND_METADATA'
SBUS_FD_LOGGER = 'SBUS_FD_LOGGER'
SBUS_FD_OUTPUT_CONTAINER = 'SBUS_FD_OUTPUT_CONTAINER'
SBUS_FD_OUTPUT_TASK_ID = 'SBUS_FD_OUTPUT_TASK_ID'
SBUS_FD_SERVICE_OUT = 'SBUS_FD_SERVICE_OUT'

View File

@ -36,13 +36,6 @@ def with_tempdir(f):
return wrapped
class MockSBus(object):
@classmethod
def send(self, path, datagram):
# return success code
return 0
class FakeLogger(object):
def __init__(self, *args, **kwargs):
self._log_lines = defaultdict(list)

View File

@ -30,17 +30,29 @@ class TestCommandResponse(unittest.TestCase):
self.assertTrue(resp.status)
self.assertEqual('ok', resp.message)
self.assertTrue(resp.iterable)
self.assertIsNone(resp.task_id)
resp = CommandResponse(False, 'error', False)
self.assertFalse(resp.status)
self.assertEqual('error', resp.message)
self.assertFalse(resp.iterable)
self.assertIsNone(resp.task_id)
resp = CommandResponse(True, 'ok', task_id='foo')
self.assertTrue(resp.status)
self.assertEqual('ok', resp.message)
self.assertTrue(resp.iterable)
self.assertEqual('foo', resp.task_id)
def test_report_message(self):
resp = CommandResponse(True, 'msg', True)
self.assertEqual({'status': True, 'message': 'msg'},
json.loads(resp.report_message))
resp = CommandResponse(True, 'msg', True, 'foo')
self.assertEqual({'status': True, 'message': 'msg', 'task_id': 'foo'},
json.loads(resp.report_message))
class TestCommandSuccess(unittest.TestCase):
def test_init(self):

View File

@ -33,7 +33,6 @@ from tests.unit import FakeLogger
from tests.unit.gateway.gateways import FakeFileManager
from storlets.gateway.gateways.docker.gateway import DockerStorletRequest, \
StorletGatewayDocker
from tests.unit import MockSBus
class MockInternalClient(object):
@ -461,11 +460,9 @@ use = egg:swift#catch_errors
# TODO(kota_): need a more efficient way for enumeration of return value
# from SDaemon
value_generator = iter([
# Forth is return value for invoking as task_id
'This is task id',
# Fifth is for getting meta
# first, we get metadata json
json.dumps({'metadata': 'return'}),
# At last return body and EOF
# then we get object data
'something', '',
])
@ -496,7 +493,6 @@ use = egg:swift#catch_errors
# os.read -> mock reading the file descriptor from container
# select.select -> mock fd communication which can be readable
@mock.patch('storlets.gateway.gateways.docker.runtime.SBusClient')
@mock.patch('storlets.gateway.gateways.docker.runtime.SBus', MockSBus)
@mock.patch('storlets.gateway.gateways.docker.runtime.os.read',
mock_read)
@mock.patch('storlets.gateway.gateways.docker.runtime.os.close',
@ -509,6 +505,8 @@ use = egg:swift#catch_errors
client.ping.return_value = SBusResponse(True, 'OK')
client.stop_daemon.return_value = SBusResponse(True, 'OK')
client.start_daemon.return_value = SBusResponse(True, 'OK')
client.execute.return_value = SBusResponse(True, 'OK', 'someid')
sresp = self.gateway.invocation_flow(st_req, extra_sources)
eventlet.sleep(0.1)
file_like = FileLikeIter(sresp.data_iter)

View File

@ -34,14 +34,6 @@ from tests.unit import FakeLogger, with_tempdir
from tests.unit.gateway.gateways import FakeFileManager
@contextmanager
def _mock_sbus(send_status=0):
with mock.patch('storlets.gateway.gateways.docker.runtime.'
'SBus.send') as fake_send:
fake_send.return_value = send_status
yield
@contextmanager
def _mock_os_pipe(bufs):
class FakeFd(object):
@ -418,13 +410,39 @@ class TestStorletInvocationProtocol(unittest.TestCase):
except OSError:
pass
def test_invocation_protocol(self):
# os.pipe will be called 4 times
pipe_called = 4
def test_send_execute_command(self):
with mock.patch('storlets.gateway.gateways.docker.runtime.SBusClient.'
'execute') as execute:
execute.return_value = SBusResponse(True, 'OK', 'someid')
self.protocol._send_execute_command()
self.assertEqual('someid', self.protocol.task_id)
with _mock_sbus(0), _mock_os_pipe([''] * pipe_called) as pipes:
with mock.patch.object(
self.protocol, '_wait_for_read_with_timeout'):
with mock.patch('storlets.gateway.gateways.docker.runtime.SBusClient.'
'execute') as execute:
execute.return_value = SBusResponse(True, 'OK')
with self.assertRaises(StorletRuntimeException):
self.protocol._send_execute_command()
with mock.patch('storlets.gateway.gateways.docker.runtime.SBusClient.'
'execute') as execute:
execute.return_value = SBusResponse(False, 'NG', 'someid')
with self.assertRaises(StorletRuntimeException):
self.protocol._send_execute_command()
with mock.patch('storlets.gateway.gateways.docker.runtime.SBusClient.'
'execute') as execute:
execute.side_effect = SBusClientIOError()
with self.assertRaises(StorletRuntimeException):
self.protocol._send_execute_command()
def test_invocation_protocol(self):
# os.pipe will be called 3 times
pipe_called = 3
with _mock_os_pipe([''] * pipe_called) as pipes:
with mock.patch.object(self.protocol,
'_wait_for_read_with_timeout'), \
mock.patch.object(self.protocol, '_send_execute_command'):
self.protocol._invoke()
self.assertEqual(pipe_called, len(pipes))
@ -441,11 +459,6 @@ class TestStorletInvocationProtocol(unittest.TestCase):
self.assertFalse(data_read_fd.closed)
self.assertTrue(data_write_fd.closed)
# both execution str fds are closed
execution_read_fd, execution_write_fd = next(pipes)
self.assertTrue(execution_read_fd.closed)
self.assertTrue(execution_write_fd.closed)
# metadata write fd is closed, metadata read fd is still open.
metadata_read_fd, metadata_write_fd = next(pipes)
self.assertFalse(metadata_read_fd.closed)
@ -455,12 +468,12 @@ class TestStorletInvocationProtocol(unittest.TestCase):
self.assertRaises(StopIteration, next, pipes)
def test_invocation_protocol_remote_fds(self):
# In default, we have 5 fds in remote_fds
# In default, we have 4 fds in remote_fds
storlet_request = DockerStorletRequest(
self.storlet_id, {}, {}, iter(StringIO()), options=self.options)
protocol = StorletInvocationProtocol(
storlet_request, self.pipe_path, self.log_file, 1, self.logger)
self.assertEqual(5, len(protocol.remote_fds))
self.assertEqual(4, len(protocol.remote_fds))
# extra_resources expands the remote_fds
storlet_request = DockerStorletRequest(
@ -468,7 +481,7 @@ class TestStorletInvocationProtocol(unittest.TestCase):
protocol = StorletInvocationProtocol(
storlet_request, self.pipe_path, self.log_file, 1, self.logger,
extra_sources=[storlet_request])
self.assertEqual(6, len(protocol.remote_fds))
self.assertEqual(5, len(protocol.remote_fds))
# 2 more extra_resources expands the remote_fds
storlet_request = DockerStorletRequest(
@ -476,7 +489,7 @@ class TestStorletInvocationProtocol(unittest.TestCase):
protocol = StorletInvocationProtocol(
storlet_request, self.pipe_path, self.log_file, 1, self.logger,
extra_sources=[storlet_request] * 3)
self.assertEqual(8, len(protocol.remote_fds))
self.assertEqual(7, len(protocol.remote_fds))
def test_open_writer_with_invalid_fd(self):
invalid_fds = (

View File

@ -81,16 +81,26 @@ class TestSBusClient(unittest.TestCase):
resp = self.client._parse_response(raw_resp)
self.assertTrue(resp.status)
self.assertEqual('OK', resp.message)
self.assertIsNone(resp.task_id)
raw_resp = json.dumps({'status': True, 'message': 'OK',
'task_id': 'SOMEID'})
resp = self.client._parse_response(raw_resp)
self.assertTrue(resp.status)
self.assertEqual('OK', resp.message)
self.assertEqual('SOMEID', resp.task_id)
raw_resp = json.dumps({'status': False, 'message': 'ERROR'})
resp = self.client._parse_response(raw_resp)
self.assertFalse(resp.status)
self.assertEqual('ERROR', resp.message)
self.assertIsNone(resp.task_id)
raw_resp = json.dumps({'status': True, 'message': 'Sample:Message'})
resp = self.client._parse_response(raw_resp)
self.assertTrue(resp.status)
self.assertEqual('Sample:Message', resp.message)
self.assertIsNone(resp.task_id)
with self.assertRaises(SBusClientMalformedResponse):
self.client._parse_response('Foo')

View File

@ -17,15 +17,16 @@ import json
import unittest
import storlets.sbus.file_description as sbus_fd
from storlets.sbus.datagram import SBusFileDescriptor, SBusDatagram, \
SBusServiceDatagram, SBusExecuteDatagram, build_datagram_from_raw_message
SBusServiceDatagram, SBusExecuteDatagram, build_datagram, \
build_datagram_from_raw_message
from storlets.sbus.command import SBUS_CMD_PING, SBUS_CMD_EXECUTE
ALL_FD_TYPES = [
ALL_FDTYPES = [
sbus_fd.SBUS_FD_INPUT_OBJECT, sbus_fd.SBUS_FD_OUTPUT_OBJECT,
sbus_fd.SBUS_FD_OUTPUT_OBJECT_METADATA,
sbus_fd.SBUS_FD_OUTPUT_OBJECT_AND_METADATA,
sbus_fd.SBUS_FD_LOGGER, sbus_fd.SBUS_FD_OUTPUT_CONTAINER,
sbus_fd.SBUS_FD_OUTPUT_TASK_ID, sbus_fd.SBUS_FD_SERVICE_OUT,
sbus_fd.SBUS_FD_SERVICE_OUT,
]
@ -39,11 +40,11 @@ class TestSBusFileDescriptor(unittest.TestCase):
'storage': {'storage_key': 'storage_value'}},
fd.metadata)
def test_from_metadata_dict(self):
fd = SBusFileDescriptor.from_metadata_dict(
def test_from_fileno_and_metadata_dict(self):
fd = SBusFileDescriptor.from_fileno_and_metadata_dict(
1,
{'storlets': {'type': 'MYTYPE', 'storlets_key': 'storlets_value'},
'storage': {'storage_key': 'storage_value'},
'fileno': 1})
'storage': {'storage_key': 'storage_value'}})
self.assertEqual(1, fd.fileno)
self.assertEqual('MYTYPE', fd.fdtype)
self.assertEqual({'storlets_key': 'storlets_value'},
@ -53,7 +54,7 @@ class TestSBusFileDescriptor(unittest.TestCase):
class TestSBusDatagram(unittest.TestCase):
def test_check_required_fd_types_not_implemented(self):
def test_check_required_fdtypes_not_implemented(self):
# SBusDatagram designed not to be called independently
with self.assertRaises(NotImplementedError) as err:
SBusDatagram('', [], [])
@ -62,14 +63,14 @@ class TestSBusDatagram(unittest.TestCase):
err.exception.args[0])
def test_invalid_child_class_definition(self):
# no definition for _required_fd_types
# no definition for _required_fdtypes
class InvalidSBusDatagram(SBusDatagram):
pass
with self.assertRaises(NotImplementedError) as err:
InvalidSBusDatagram('', [], [])
self.assertEqual(
'SBusDatagram class should define _required_fd_types',
'SBusDatagram class should define _required_fdtypes',
err.exception.args[0])
@ -87,7 +88,7 @@ class SBusDatagramTestMixin(object):
self.assertEqual(self.task_id, self.dtg.task_id)
def test_num_fds(self):
self.assertEqual(len(self.types), self.dtg.num_fds)
self.assertEqual(len(self.fdtypes), self.dtg.num_fds)
def test_cmd_params(self):
self.assertEqual({'command': self.command,
@ -101,61 +102,61 @@ class SBusDatagramTestMixin(object):
'task_id': self.task_id}
self.assertEqual(res, json.loads(self.dtg.serialized_cmd_params))
def test_check_required_fd_types_mismatch(self):
invalid_types = (
def test_check_required_fdtypes_mismatch(self):
invalid_fdtypes_list = (
[], # empty list
['Invalid'] + self.types, # invalid type inserted at the first
['Invalid'] + self.fdtypes, # invalid type inserted at the first
# TODO(kota_): we may want *strict* check (not only checking first
# N items).
)
for invalid_type in invalid_types:
for invalid_fdtypes in invalid_fdtypes_list:
with self.assertRaises(ValueError) as cm:
self.dtg._check_required_fd_types(invalid_type)
self.dtg._check_required_fdtypes(invalid_fdtypes)
self.assertTrue(cm.exception.args[0].startswith(
'Fd type mismatch given_fd_types'))
'Fd type mismatch given_fdtypes'))
def test_find_fds(self):
# prepare all fd types and then pop out in the loop below
not_in_fd_types = ALL_FD_TYPES[:]
not_in_fdtypes = ALL_FDTYPES[:]
# N.B. fd should start from 1 (not 0), really?
for index, fd_type in enumerate(self.types, 1):
found_fds = self.dtg._find_fds(fd_type)
for index, fdtype in enumerate(self.fdtypes, 1):
found_fds = self.dtg._find_fds(fdtype)
# at least 1 fd should be found
self.assertTrue(found_fds)
# and the index is in the types
self.assertIn(index, found_fds)
if fd_type in not_in_fd_types:
# N.B. ALL_FD_TYPES should be unique list
not_in_fd_types.remove(fd_type)
if fdtype in not_in_fdtypes:
# N.B. ALL_FDTYPES should be unique list
not_in_fdtypes.remove(fdtype)
# sanity, not a fd type results in []
self.assertEqual([], self.dtg._find_fds('DUMMY_TYPE'))
# sanity, no other types are found
for fd_type in not_in_fd_types:
self.assertEqual([], self.dtg._find_fds(fd_type))
for fdtype in not_in_fdtypes:
self.assertEqual([], self.dtg._find_fds(fdtype))
def test_find_fd(self):
# prepare all fd types and then pop out in the loop below
not_in_fd_types = ALL_FD_TYPES[:]
not_in_fdtypes = ALL_FDTYPES[:]
# N.B. fd should start from 1 (not 0), really?
for index, fd_type in enumerate(self.types, 1):
found_fd = self.dtg._find_fd(fd_type)
for index, fdtype in enumerate(self.fdtypes, 1):
found_fd = self.dtg._find_fd(fdtype)
# at least 1 fd should be found
self.assertEqual(index, found_fd)
if fd_type in not_in_fd_types:
# N.B. ALL_FD_TYPES should be unique list
not_in_fd_types.remove(fd_type)
if fdtype in not_in_fdtypes:
# N.B. ALL_FDTYPES should be unique list
not_in_fdtypes.remove(fdtype)
# sanity, not a fd type results in None
self.assertIsNone(self.dtg._find_fd('DUMMY_TYPE'))
# sanity, no other types are found
for fd_type in not_in_fd_types:
self.assertIsNone(self.dtg._find_fd(fd_type))
for fdtype in not_in_fdtypes:
self.assertIsNone(self.dtg._find_fd(fdtype))
class TestSBusServiceDatagram(SBusDatagramTestMixin, unittest.TestCase):
@ -163,7 +164,7 @@ class TestSBusServiceDatagram(SBusDatagramTestMixin, unittest.TestCase):
def setUp(self):
self.command = 'SBUS_CMD_TEST'
self.types = [sbus_fd.SBUS_FD_SERVICE_OUT]
self.fdtypes = [sbus_fd.SBUS_FD_SERVICE_OUT]
self.sfds = [SBusFileDescriptor(sbus_fd.SBUS_FD_SERVICE_OUT, 1)]
super(TestSBusServiceDatagram, self).setUp()
@ -176,67 +177,111 @@ class TestSBusExecuteDatagram(SBusDatagramTestMixin, unittest.TestCase):
def setUp(self):
self.command = SBUS_CMD_EXECUTE
self.types = [sbus_fd.SBUS_FD_INPUT_OBJECT,
sbus_fd.SBUS_FD_OUTPUT_TASK_ID,
sbus_fd.SBUS_FD_OUTPUT_OBJECT,
sbus_fd.SBUS_FD_OUTPUT_OBJECT_METADATA,
sbus_fd.SBUS_FD_LOGGER]
self.fdtypes = [sbus_fd.SBUS_FD_SERVICE_OUT,
sbus_fd.SBUS_FD_INPUT_OBJECT,
sbus_fd.SBUS_FD_OUTPUT_OBJECT,
sbus_fd.SBUS_FD_OUTPUT_OBJECT_METADATA,
sbus_fd.SBUS_FD_LOGGER]
self.sfds = [SBusFileDescriptor(
self.types[i], i + 1,
fdtype, i + 1,
{'key%d' % i: 'value%d' % i},
{'skey%d' % i: 'svalue%d' % i})
for i in range(len(self.types))]
for i, fdtype in enumerate(self.fdtypes)]
super(TestSBusExecuteDatagram, self).setUp()
def test_init_extra_sources(self):
types = [sbus_fd.SBUS_FD_INPUT_OBJECT,
sbus_fd.SBUS_FD_OUTPUT_TASK_ID,
sbus_fd.SBUS_FD_OUTPUT_OBJECT,
sbus_fd.SBUS_FD_OUTPUT_OBJECT_METADATA,
sbus_fd.SBUS_FD_LOGGER,
sbus_fd.SBUS_FD_INPUT_OBJECT,
sbus_fd.SBUS_FD_INPUT_OBJECT,
sbus_fd.SBUS_FD_INPUT_OBJECT]
fds = [SBusFileDescriptor(types[i], i + 1,
fdtypes = [sbus_fd.SBUS_FD_SERVICE_OUT,
sbus_fd.SBUS_FD_INPUT_OBJECT,
sbus_fd.SBUS_FD_OUTPUT_OBJECT,
sbus_fd.SBUS_FD_OUTPUT_OBJECT_METADATA,
sbus_fd.SBUS_FD_LOGGER,
sbus_fd.SBUS_FD_INPUT_OBJECT,
sbus_fd.SBUS_FD_INPUT_OBJECT,
sbus_fd.SBUS_FD_INPUT_OBJECT]
fds = [SBusFileDescriptor(fdtype, i + 1,
{'key%d' % i: 'value%d' % i},
{'skey%d' % i: 'svalue%d' % i})
for i in range(len(types))]
for i, fdtype in enumerate(fdtypes)]
dtg = self._test_class(
self.command, fds, self.params, self.task_id)
self.assertEqual(types, [sfd.fdtype for sfd in dtg.sfds])
self.assertEqual(fdtypes, [sfd.fdtype for sfd in dtg.sfds])
self.assertEqual(self.params, dtg.params)
self.assertEqual(self.task_id, dtg.task_id)
def test_service_out_fd(self):
self.assertEqual(1, self.dtg.service_out_fd)
def test_invocation_fds(self):
self.assertEqual([2, 3, 4, 5], self.dtg.invocation_fds)
def test_object_out_fds(self):
self.assertEqual([3], self.dtg.object_out_fds)
def test_object_metadata_out_fds(self):
self.assertEqual([4], self.dtg.object_metadata_out_fds)
def test_task_id_out_fd(self):
self.assertEqual(2, self.dtg.task_id_out_fd)
def test_logger_out_fd(self):
self.assertEqual(5, self.dtg.logger_out_fd)
def test_object_in_fds(self):
self.assertEqual([1], self.dtg.object_in_fds)
self.assertEqual([2], self.dtg.object_in_fds)
def test_check_required_fd_types_reverse_order_failed(self):
types = self.types[:]
types.reverse() # reverse order
def test_check_required_fdtypes_reverse_order_failed(self):
fdtypes = self.fdtypes[:]
fdtypes.reverse() # reverse order
with self.assertRaises(ValueError) as cm:
self.dtg._check_required_fd_types(types)
self.dtg._check_required_fdtypes(fdtypes)
self.assertTrue(
cm.exception.args[0].startswith('Fd type mismatch given_fd_types'))
cm.exception.args[0].startswith('Fd type mismatch given_fdtypes'))
class TestBuildDatagramFromRawMessage(unittest.TestCase):
class TestBuildDatagram(unittest.TestCase):
def test_build_datagram(self):
# SBusServiceDatagram scenario
command = SBUS_CMD_PING
fdtypes = [sbus_fd.SBUS_FD_SERVICE_OUT]
fds = [SBusFileDescriptor(sbus_fd.SBUS_FD_SERVICE_OUT, 1)]
params = {'param1': 'paramvalue1'}
task_id = 'id'
dtg = build_datagram(command, fds, params, task_id)
self.assertIsInstance(dtg, SBusServiceDatagram)
self.assertEqual(command, dtg.command)
self.assertEqual(fdtypes, [sfd.fdtype for sfd in dtg.sfds])
self.assertEqual(params, dtg.params)
self.assertEqual(task_id, dtg.task_id)
# SBusExecuteDatagram scenario
command = SBUS_CMD_EXECUTE
fdtypes = [sbus_fd.SBUS_FD_SERVICE_OUT,
sbus_fd.SBUS_FD_INPUT_OBJECT,
sbus_fd.SBUS_FD_OUTPUT_OBJECT,
sbus_fd.SBUS_FD_OUTPUT_OBJECT_METADATA,
sbus_fd.SBUS_FD_LOGGER,
sbus_fd.SBUS_FD_INPUT_OBJECT,
sbus_fd.SBUS_FD_INPUT_OBJECT,
sbus_fd.SBUS_FD_INPUT_OBJECT]
fds = [SBusFileDescriptor(fdtype, i + 1,
{'key%d' % i: 'value%d' % i},
{'skey%d' % i: 'svalue%d' % i})
for i, fdtype in enumerate(fdtypes)]
params = {'param1': 'paramvalue1'}
task_id = 'id'
dtg = build_datagram(command, fds, params, task_id)
self.assertIsInstance(dtg, SBusExecuteDatagram)
self.assertEqual(command, dtg.command)
self.assertEqual(fdtypes, [sfd.fdtype for sfd in dtg.sfds])
self.assertEqual(params, dtg.params)
self.assertEqual(task_id, dtg.task_id)
def test_build_datagram_from_raw_message(self):
# SBusServiceDatagram scenario
command = SBUS_CMD_PING
types = [sbus_fd.SBUS_FD_SERVICE_OUT]
fdtypes = [sbus_fd.SBUS_FD_SERVICE_OUT]
fds = [SBusFileDescriptor(sbus_fd.SBUS_FD_SERVICE_OUT, 1)]
params = {'param1': 'paramvalue1'}
task_id = 'id'
@ -247,25 +292,26 @@ class TestBuildDatagramFromRawMessage(unittest.TestCase):
dtg = build_datagram_from_raw_message(fds, str_metadata,
str_cmd_params)
self.assertIsInstance(dtg, SBusServiceDatagram)
self.assertEqual(command, dtg.command)
self.assertEqual(types, [sfd.fdtype for sfd in dtg.sfds])
self.assertEqual(fdtypes, [sfd.fdtype for sfd in dtg.sfds])
self.assertEqual(params, dtg.params)
self.assertEqual(task_id, dtg.task_id)
# SBusExecuteDatagram scenario
command = SBUS_CMD_EXECUTE
types = [sbus_fd.SBUS_FD_INPUT_OBJECT,
sbus_fd.SBUS_FD_OUTPUT_TASK_ID,
sbus_fd.SBUS_FD_OUTPUT_OBJECT,
sbus_fd.SBUS_FD_OUTPUT_OBJECT_METADATA,
sbus_fd.SBUS_FD_LOGGER,
sbus_fd.SBUS_FD_INPUT_OBJECT,
sbus_fd.SBUS_FD_INPUT_OBJECT,
sbus_fd.SBUS_FD_INPUT_OBJECT]
fds = [SBusFileDescriptor(types[i], i + 1,
fdtypes = [sbus_fd.SBUS_FD_SERVICE_OUT,
sbus_fd.SBUS_FD_INPUT_OBJECT,
sbus_fd.SBUS_FD_OUTPUT_OBJECT,
sbus_fd.SBUS_FD_OUTPUT_OBJECT_METADATA,
sbus_fd.SBUS_FD_LOGGER,
sbus_fd.SBUS_FD_INPUT_OBJECT,
sbus_fd.SBUS_FD_INPUT_OBJECT,
sbus_fd.SBUS_FD_INPUT_OBJECT]
fds = [SBusFileDescriptor(fdtype, i + 1,
{'key%d' % i: 'value%d' % i},
{'skey%d' % i: 'svalue%d' % i})
for i in range(len(types))]
for i, fdtype in enumerate(fdtypes)]
params = {'param1': 'paramvalue1'}
task_id = 'id'
cmd_params = {'command': command, 'params': params, 'task_id': task_id}
@ -275,8 +321,9 @@ class TestBuildDatagramFromRawMessage(unittest.TestCase):
dtg = build_datagram_from_raw_message(fds, str_metadata,
str_cmd_params)
self.assertIsInstance(dtg, SBusExecuteDatagram)
self.assertEqual(command, dtg.command)
self.assertEqual(types, [sfd.fdtype for sfd in dtg.sfds])
self.assertEqual(fdtypes, [sfd.fdtype for sfd in dtg.sfds])
self.assertEqual(params, dtg.params)
self.assertEqual(task_id, dtg.task_id)