diff --git a/oslo_privsep/daemon.py b/oslo_privsep/daemon.py index 38e7cb9..94b9a7c 100644 --- a/oslo_privsep/daemon.py +++ b/oslo_privsep/daemon.py @@ -43,6 +43,7 @@ The privsep daemon exits when the communication channel is closed, ''' +from concurrent import futures import enum import errno import io @@ -64,6 +65,7 @@ import eventlet from oslo_config import cfg from oslo_log import log as logging from oslo_utils import importutils +import six from oslo_privsep._i18n import _ from oslo_privsep import capabilities @@ -352,6 +354,9 @@ class Daemon(object): self.user = context.conf.user self.group = context.conf.group self.caps = set(context.conf.capabilities) + self.thread_pool = futures.ThreadPoolExecutor( + context.conf.thread_pool_size) + self.communication_error = None def run(self): """Run request loop. Sets up environment, then calls loop()""" @@ -413,22 +418,75 @@ class Daemon(object): 'inh': fmt_caps(inh), }) - def _process_cmd(self, cmd, *args): + def _process_cmd(self, msgid, cmd, *args): + """Executes the requested command in an execution thread. + + This executes a call within a thread executor and returns the results + of the execution. + + :param msgid: The message identifier. + :param cmd: The `Message` type indicating the command type. + :param args: The function, args, and kwargs if a Message.CALL type. + :return: A tuple of the return status, optional call output, and + optional error information. + """ if cmd == Message.PING: return (Message.PONG.value,) - elif cmd == Message.CALL: + try: + if cmd != Message.CALL: + raise ProtocolError(_('Unknown privsep cmd: %s') % cmd) + + # Extract the callable and arguments name, f_args, f_kwargs = args func = importutils.import_class(name) - if not self.context.is_entrypoint(func): msg = _('Invalid privsep function: %s not exported') % name raise NameError(msg) ret = func(*f_args, **f_kwargs) return (Message.RET.value, ret) + except Exception as e: + LOG.debug( + 'privsep: Exception during request[%(msgid)s]: ' + '%(err)s', {'msgid': msgid, 'err': e}, exc_info=True) + cls = e.__class__ + cls_name = '%s.%s' % (cls.__module__, cls.__name__) + return (Message.ERR.value, cls_name, e.args) - raise ProtocolError(_('Unknown privsep cmd: %s') % cmd) + def _create_done_callback(self, msgid): + """Creates a future callback to receive command execution results. + + :param msgid: The message identifier. + :return: A future reply callback. + """ + channel = self.channel + + def _call_back(result): + """Future execution callback. + + :param result: The `future` execution and its results. + """ + try: + reply = result.result() + LOG.debug('privsep: reply[%(msgid)s]: %(reply)s', + {'msgid': msgid, 'reply': reply}) + channel.send((msgid, reply)) + except IOError: + self.communication_error = sys.exc_info() + except Exception as e: + LOG.debug( + 'privsep: Exception during request[%(msgid)s]: ' + '%(err)s', {'msgid': msgid, 'err': e}, exc_info=True) + cls = e.__class__ + cls_name = '%s.%s' % (cls.__module__, cls.__name__) + reply = (Message.ERR.value, cls_name, e.args) + try: + channel.send((msgid, reply)) + except IOError: + self.communication_error = sys.exc_info() + + return _call_back def loop(self): """Main body of daemon request loop""" @@ -439,27 +497,16 @@ class Daemon(object): self.context.set_client_mode(False) for msgid, msg in self.channel: - LOG.debug('privsep: request[%(msgid)s]: %(req)s', - {'msgid': msgid, 'req': msg}) - try: - reply = self._process_cmd(*msg) - except Exception as e: - LOG.debug( - 'privsep: Exception during request[%(msgid)s]: %(err)s', - {'msgid': msgid, 'err': e}, exc_info=True) - cls = e.__class__ - cls_name = '%s.%s' % (cls.__module__, cls.__name__) - reply = (Message.ERR.value, cls_name, e.args) - - try: - LOG.debug('privsep: reply[%(msgid)s]: %(reply)s', - {'msgid': msgid, 'reply': reply}) - self.channel.send((msgid, reply)) - except IOError as e: - if e.errno == errno.EPIPE: + error = self.communication_error + if error: + if error[1].errno == errno.EPIPE: # Write stream closed, exit loop break - raise + six.reraise(error) + + # Submit the command for execution + future = self.thread_pool.submit(self._process_cmd, msgid, *msg) + future.add_done_callback(self._create_done_callback(msgid)) LOG.debug('Socket closed, shutting down privsep daemon') diff --git a/oslo_privsep/priv_context.py b/oslo_privsep/priv_context.py index 1036701..1b44861 100644 --- a/oslo_privsep/priv_context.py +++ b/oslo_privsep/priv_context.py @@ -16,6 +16,7 @@ import enum import functools import logging +import multiprocessing import shlex import sys @@ -48,6 +49,12 @@ OPTS = [ type=types.List(CapNameOrInt), default=[], help=_('List of Linux capabilities retained by the privsep ' 'daemon.')), + cfg.IntOpt('thread_pool_size', + min=1, + help=_("The number of threads available for privsep to " + "concurrently run processes. Defaults to the number of " + "CPU cores in the system."), + default=multiprocessing.cpu_count()), cfg.StrOpt('helper_command', help=_('Command to invoke to start the privsep daemon if ' 'not using the "fork" method. ' diff --git a/oslo_privsep/tests/test_daemon.py b/oslo_privsep/tests/test_daemon.py index ba8ed92..4ad7666 100644 --- a/oslo_privsep/tests/test_daemon.py +++ b/oslo_privsep/tests/test_daemon.py @@ -149,6 +149,7 @@ class DaemonTest(base.BaseTestCase): context = mock.NonCallableMock() context.conf.user = 42 context.conf.group = 84 + context.conf.thread_pool_size = 10 context.conf.capabilities = [ capabilities.CAP_SYS_ADMIN, capabilities.CAP_NET_ADMIN] diff --git a/releasenotes/notes/add_thread_pool_size-a54e6f27ab019f96.yaml b/releasenotes/notes/add_thread_pool_size-a54e6f27ab019f96.yaml new file mode 100644 index 0000000..92fa9db --- /dev/null +++ b/releasenotes/notes/add_thread_pool_size-a54e6f27ab019f96.yaml @@ -0,0 +1,7 @@ +--- +features: + - | + Privsep now uses multithreading to allow concurrency in executing + privileged commands. The number of concurrent threads defaults to the + available CPU cores, but can be adjusted by the new ``thread_pool_size`` + config option.