Browse Source

Merge "Use threads to process target function"

tags/1.31.0
Zuul 6 months ago
parent
commit
7b31740c09

+ 70
- 23
oslo_privsep/daemon.py View File

@@ -43,6 +43,7 @@ The privsep daemon exits when the communication channel is closed,
43 43
 
44 44
 '''
45 45
 
46
+from concurrent import futures
46 47
 import enum
47 48
 import errno
48 49
 import io
@@ -64,6 +65,7 @@ import eventlet
64 65
 from oslo_config import cfg
65 66
 from oslo_log import log as logging
66 67
 from oslo_utils import importutils
68
+import six
67 69
 
68 70
 from oslo_privsep._i18n import _
69 71
 from oslo_privsep import capabilities
@@ -352,6 +354,9 @@ class Daemon(object):
352 354
         self.user = context.conf.user
353 355
         self.group = context.conf.group
354 356
         self.caps = set(context.conf.capabilities)
357
+        self.thread_pool = futures.ThreadPoolExecutor(
358
+            context.conf.thread_pool_size)
359
+        self.communication_error = None
355 360
 
356 361
     def run(self):
357 362
         """Run request loop. Sets up environment, then calls loop()"""
@@ -413,22 +418,75 @@ class Daemon(object):
413 418
                 'inh': fmt_caps(inh),
414 419
             })
415 420
 
416
-    def _process_cmd(self, cmd, *args):
421
+    def _process_cmd(self, msgid, cmd, *args):
422
+        """Executes the requested command in an execution thread.
423
+
424
+        This executes a call within a thread executor and returns the results
425
+        of the execution.
426
+
427
+        :param msgid: The message identifier.
428
+        :param cmd: The `Message` type indicating the command type.
429
+        :param args: The function, args, and kwargs if a Message.CALL type.
430
+        :return: A tuple of the return status, optional call output, and
431
+                 optional error information.
432
+        """
417 433
         if cmd == Message.PING:
418 434
             return (Message.PONG.value,)
419 435
 
420
-        elif cmd == Message.CALL:
436
+        try:
437
+            if cmd != Message.CALL:
438
+                raise ProtocolError(_('Unknown privsep cmd: %s') % cmd)
439
+
440
+            # Extract the callable and arguments
421 441
             name, f_args, f_kwargs = args
422 442
             func = importutils.import_class(name)
423
-
424 443
             if not self.context.is_entrypoint(func):
425 444
                 msg = _('Invalid privsep function: %s not exported') % name
426 445
                 raise NameError(msg)
427 446
 
428 447
             ret = func(*f_args, **f_kwargs)
429 448
             return (Message.RET.value, ret)
449
+        except Exception as e:
450
+            LOG.debug(
451
+                'privsep: Exception during request[%(msgid)s]: '
452
+                '%(err)s', {'msgid': msgid, 'err': e}, exc_info=True)
453
+            cls = e.__class__
454
+            cls_name = '%s.%s' % (cls.__module__, cls.__name__)
455
+            return (Message.ERR.value, cls_name, e.args)
456
+
457
+    def _create_done_callback(self, msgid):
458
+        """Creates a future callback to receive command execution results.
459
+
460
+        :param msgid: The message identifier.
461
+        :return: A future reply callback.
462
+        """
463
+        channel = self.channel
464
+
465
+        def _call_back(result):
466
+            """Future execution callback.
467
+
468
+            :param result: The `future` execution and its results.
469
+            """
470
+            try:
471
+                reply = result.result()
472
+                LOG.debug('privsep: reply[%(msgid)s]: %(reply)s',
473
+                          {'msgid': msgid, 'reply': reply})
474
+                channel.send((msgid, reply))
475
+            except IOError:
476
+                self.communication_error = sys.exc_info()
477
+            except Exception as e:
478
+                LOG.debug(
479
+                    'privsep: Exception during request[%(msgid)s]: '
480
+                    '%(err)s', {'msgid': msgid, 'err': e}, exc_info=True)
481
+                cls = e.__class__
482
+                cls_name = '%s.%s' % (cls.__module__, cls.__name__)
483
+                reply = (Message.ERR.value, cls_name, e.args)
484
+                try:
485
+                    channel.send((msgid, reply))
486
+                except IOError:
487
+                    self.communication_error = sys.exc_info()
430 488
 
431
-        raise ProtocolError(_('Unknown privsep cmd: %s') % cmd)
489
+        return _call_back
432 490
 
433 491
     def loop(self):
434 492
         """Main body of daemon request loop"""
@@ -439,27 +497,16 @@ class Daemon(object):
439 497
         self.context.set_client_mode(False)
440 498
 
441 499
         for msgid, msg in self.channel:
442
-            LOG.debug('privsep: request[%(msgid)s]: %(req)s',
443
-                      {'msgid': msgid, 'req': msg})
444
-            try:
445
-                reply = self._process_cmd(*msg)
446
-            except Exception as e:
447
-                LOG.debug(
448
-                    'privsep: Exception during request[%(msgid)s]: %(err)s',
449
-                    {'msgid': msgid, 'err': e}, exc_info=True)
450
-                cls = e.__class__
451
-                cls_name = '%s.%s' % (cls.__module__, cls.__name__)
452
-                reply = (Message.ERR.value, cls_name, e.args)
453
-
454
-            try:
455
-                LOG.debug('privsep: reply[%(msgid)s]: %(reply)s',
456
-                          {'msgid': msgid, 'reply': reply})
457
-                self.channel.send((msgid, reply))
458
-            except IOError as e:
459
-                if e.errno == errno.EPIPE:
500
+            error = self.communication_error
501
+            if error:
502
+                if error[1].errno == errno.EPIPE:
460 503
                     # Write stream closed, exit loop
461 504
                     break
462
-                raise
505
+                six.reraise(error)
506
+
507
+            # Submit the command for execution
508
+            future = self.thread_pool.submit(self._process_cmd, msgid, *msg)
509
+            future.add_done_callback(self._create_done_callback(msgid))
463 510
 
464 511
         LOG.debug('Socket closed, shutting down privsep daemon')
465 512
 

+ 7
- 0
oslo_privsep/priv_context.py View File

@@ -16,6 +16,7 @@
16 16
 import enum
17 17
 import functools
18 18
 import logging
19
+import multiprocessing
19 20
 import shlex
20 21
 import sys
21 22
 
@@ -48,6 +49,12 @@ OPTS = [
48 49
             type=types.List(CapNameOrInt), default=[],
49 50
             help=_('List of Linux capabilities retained by the privsep '
50 51
                    'daemon.')),
52
+    cfg.IntOpt('thread_pool_size',
53
+               min=1,
54
+               help=_("The number of threads available for privsep to "
55
+                      "concurrently run processes. Defaults to the number of "
56
+                      "CPU cores in the system."),
57
+               default=multiprocessing.cpu_count()),
51 58
     cfg.StrOpt('helper_command',
52 59
                help=_('Command to invoke to start the privsep daemon if '
53 60
                       'not using the "fork" method. '

+ 1
- 0
oslo_privsep/tests/test_daemon.py View File

@@ -149,6 +149,7 @@ class DaemonTest(base.BaseTestCase):
149 149
         context = mock.NonCallableMock()
150 150
         context.conf.user = 42
151 151
         context.conf.group = 84
152
+        context.conf.thread_pool_size = 10
152 153
         context.conf.capabilities = [
153 154
             capabilities.CAP_SYS_ADMIN, capabilities.CAP_NET_ADMIN]
154 155
 

+ 7
- 0
releasenotes/notes/add_thread_pool_size-a54e6f27ab019f96.yaml View File

@@ -0,0 +1,7 @@
1
+---
2
+features:
3
+  - |
4
+    Privsep now uses multithreading to allow concurrency in executing
5
+    privileged commands. The number of concurrent threads defaults to the
6
+    available CPU cores, but can be adjusted by the new ``thread_pool_size``
7
+    config option.

Loading…
Cancel
Save