OpenStack library for rootwrap
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

daemon.py 7.2KB


  1. # Copyright (c) 2014 Mirantis Inc.
  2. # All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  5. # not use this file except in compliance with the License. You may obtain
  6. # a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  12. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  13. # License for the specific language governing permissions and limitations
  14. # under the License.
  15. from __future__ import print_function
  16. import functools
  17. import logging
  18. from multiprocessing import managers
  19. import os
  20. import shutil
  21. import signal
  22. import six
  23. import stat
  24. import sys
  25. import tempfile
  26. import threading
  27. import time
  28. from oslo_rootwrap import cmd
  29. from oslo_rootwrap import jsonrpc
  30. from oslo_rootwrap import subprocess
  31. from oslo_rootwrap import wrapper
  32. LOG = logging.getLogger(__name__)
  33. # Since multiprocessing supports only pickle and xmlrpclib for serialization of
  34. # RPC requests and responses, we declare another 'jsonrpc' serializer
  35. managers.listener_client['jsonrpc'] = jsonrpc.JsonListener, jsonrpc.JsonClient
  36. class RootwrapClass(object):
  37. def __init__(self, config, filters):
  38. self.config = config
  39. self.filters = filters
  40. self.reset_timer()
  41. self.prepare_timer(config)
  42. def run_one_command(self, userargs, stdin=None):
  43. self.reset_timer()
  44. try:
  45. obj = wrapper.start_subprocess(
  46. self.filters, userargs,
  47. exec_dirs=self.config.exec_dirs,
  48. log=self.config.use_syslog,
  49. close_fds=True,
  50. stdin=subprocess.PIPE,
  51. stdout=subprocess.PIPE,
  52. stderr=subprocess.PIPE)
  53. except wrapper.FilterMatchNotExecutable:
  54. LOG.warning("Executable not found for: %s",
  55. ' '.join(userargs))
  56. return cmd.RC_NOEXECFOUND, "", ""
  57. except wrapper.NoFilterMatched:
  58. LOG.warning("Unauthorized command: %s (no filter matched)",
  59. ' '.join(userargs))
  60. return cmd.RC_UNAUTHORIZED, "", ""
  61. if six.PY3 and stdin is not None:
  62. stdin = os.fsencode(stdin)
  63. out, err = obj.communicate(stdin)
  64. if six.PY3:
  65. out = os.fsdecode(out)
  66. err = os.fsdecode(err)
  67. return obj.returncode, out, err
  68. @classmethod
  69. def reset_timer(cls):
  70. cls.last_called = time.time()
  71. @classmethod
  72. def cancel_timer(cls):
  73. try:
  74. cls.timeout.cancel()
  75. except RuntimeError:
  76. pass
  77. @classmethod
  78. def prepare_timer(cls, config=None):
  79. if config is not None:
  80. cls.daemon_timeout = config.daemon_timeout
  81. # Wait a bit longer to avoid rounding errors
  82. timeout = max(
  83. cls.last_called + cls.daemon_timeout - time.time(),
  84. 0) + 1
  85. if getattr(cls, 'timeout', None):
  86. # Another timer is already initialized
  87. return
  88. cls.timeout = threading.Timer(timeout, cls.handle_timeout)
  89. cls.timeout.start()
  90. @classmethod
  91. def handle_timeout(cls):
  92. if cls.last_called < time.time() - cls.daemon_timeout:
  93. cls.shutdown()
  94. cls.prepare_timer()
  95. @staticmethod
  96. def shutdown():
  97. # Suicide to force break of the main thread
  98. os.kill(os.getpid(), signal.SIGINT)
  99. def get_manager_class(config=None, filters=None):
  100. class RootwrapManager(managers.BaseManager):
  101. def __init__(self, address=None, authkey=None):
  102. # Force jsonrpc because neither pickle nor xmlrpclib is secure
  103. super(RootwrapManager, self).__init__(address, authkey,
  104. serializer='jsonrpc')
  105. if config is not None:
  106. partial_class = functools.partial(RootwrapClass, config, filters)
  107. RootwrapManager.register('rootwrap', partial_class)
  108. else:
  109. RootwrapManager.register('rootwrap')
  110. return RootwrapManager
  111. def daemon_start(config, filters):
  112. temp_dir = tempfile.mkdtemp(prefix='rootwrap-')
  113. LOG.debug("Created temporary directory %s", temp_dir)
  114. try:
  115. # allow everybody to find the socket
  116. rwxr_xr_x = (stat.S_IRWXU |
  117. stat.S_IRGRP | stat.S_IXGRP |
  118. stat.S_IROTH | stat.S_IXOTH)
  119. os.chmod(temp_dir, rwxr_xr_x)
  120. socket_path = os.path.join(temp_dir, "rootwrap.sock")
  121. LOG.debug("Will listen on socket %s", socket_path)
  122. manager_cls = get_manager_class(config, filters)
  123. manager = manager_cls(address=socket_path)
  124. server = manager.get_server()
  125. try:
  126. # allow everybody to connect to the socket
  127. rw_rw_rw_ = (stat.S_IRUSR | stat.S_IWUSR |
  128. stat.S_IRGRP | stat.S_IWGRP |
  129. stat.S_IROTH | stat.S_IWOTH)
  130. os.chmod(socket_path, rw_rw_rw_)
  131. try:
  132. # In Python 3 we have to use buffer to push in bytes directly
  133. stdout = sys.stdout.buffer
  134. except AttributeError:
  135. stdout = sys.stdout
  136. stdout.write(socket_path.encode('utf-8'))
  137. stdout.write(b'\n')
  138. stdout.write(bytes(server.authkey))
  139. sys.stdin.close()
  140. sys.stdout.close()
  141. sys.stderr.close()
  142. # Gracefully shutdown on INT or TERM signals
  143. stop = functools.partial(daemon_stop, server)
  144. signal.signal(signal.SIGTERM, stop)
  145. signal.signal(signal.SIGINT, stop)
  146. LOG.info("Starting rootwrap daemon main loop")
  147. server.serve_forever()
  148. finally:
  149. conn = server.listener
  150. # This will break accept() loop with EOFError if it was not in the
  151. # main thread (as in Python 3.x)
  152. conn.close()
  153. # Closing all currently connected client sockets for reading to
  154. # break worker threads blocked on recv()
  155. for cl_conn in conn.get_accepted():
  156. try:
  157. cl_conn.half_close()
  158. except Exception:
  159. # Most likely the socket have already been closed
  160. LOG.debug("Failed to close connection")
  161. RootwrapClass.cancel_timer()
  162. LOG.info("Waiting for all client threads to finish.")
  163. for thread in threading.enumerate():
  164. if thread.daemon:
  165. LOG.debug("Joining thread %s", thread)
  166. thread.join()
  167. finally:
  168. LOG.debug("Removing temporary directory %s", temp_dir)
  169. shutil.rmtree(temp_dir)
  170. def daemon_stop(server, signal, frame):
  171. LOG.info("Got signal %s. Shutting down server", signal)
  172. # Signals are caught in the main thread which means this handler will run
  173. # in the middle of serve_forever() loop. It will catch this exception and
  174. # properly return. Since all threads created by server_forever are
  175. # daemonic, we need to join them afterwards. In Python 3 we can just hit
  176. # stop_event instead.
  177. try:
  178. server.stop_event.set()
  179. except AttributeError:
  180. raise KeyboardInterrupt