Browse Source

Merge "Wrap rpc server into oslo.service"

Zuul 5 months ago
parent
commit
e5d5bfa196

+ 2
- 0
devstack/plugin.sh View File

@@ -267,6 +267,8 @@ function configure_inspector {
267 267
     inspector_iniset iptables dnsmasq_interface $IRONIC_INSPECTOR_INTERFACE
268 268
     inspector_iniset database connection `database_connection_url ironic_inspector`
269 269
 
270
+    iniset_rpc_backend ironic-inspector $IRONIC_INSPECTOR_CONF_FILE
271
+
270 272
     if is_service_enabled swift; then
271 273
         configure_inspector_swift
272 274
     fi

+ 3
- 0
devstack/upgrade/upgrade.sh View File

@@ -45,6 +45,7 @@ source $TARGET_DEVSTACK_DIR/lib/neutron-legacy
45 45
 source $TARGET_DEVSTACK_DIR/lib/apache
46 46
 source $TARGET_DEVSTACK_DIR/lib/keystone
47 47
 source $TARGET_DEVSTACK_DIR/lib/database
48
+source $TARGET_DEVSTACK_DIR/lib/rpc_backend
48 49
 
49 50
 # Inspector relies on couple of Ironic variables
50 51
 source $TARGET_RELEASE_DIR/ironic/devstack/lib/ironic
@@ -84,6 +85,8 @@ $IRONIC_INSPECTOR_DBSYNC_BIN_FILE --config-file $IRONIC_INSPECTOR_CONF_FILE upgr
84 85
 # calls upgrade inspector for specific release
85 86
 upgrade_project ironic-inspector $RUN_DIR $BASE_DEVSTACK_BRANCH $TARGET_DEVSTACK_BRANCH
86 87
 
88
+# setup transport_url for rpc messaging
89
+iniset_rpc_backend ironic-inspector $IRONIC_INSPECTOR_CONF_FILE
87 90
 
88 91
 start_inspector
89 92
 if is_inspector_dhcp_required; then

+ 11
- 18
ironic_inspector/common/rpc.py View File

@@ -19,38 +19,31 @@ from oslo_messaging.rpc import dispatcher
19 19
 from ironic_inspector.conductor import manager
20 20
 
21 21
 CONF = cfg.CONF
22
-
23
-_SERVER = None
24 22
 TRANSPORT = None
25
-TOPIC = 'ironic-inspector-worker'
26
-SERVER_NAME = 'ironic-inspector-rpc-server'
27 23
 
28 24
 
29 25
 def get_transport():
30 26
     global TRANSPORT
31 27
 
32 28
     if TRANSPORT is None:
33
-        TRANSPORT = messaging.get_rpc_transport(CONF, url='fake://')
29
+        TRANSPORT = messaging.get_rpc_transport(CONF)
34 30
     return TRANSPORT
35 31
 
36 32
 
37 33
 def get_client():
38
-    target = messaging.Target(topic=TOPIC, server=SERVER_NAME,
34
+    """Get a RPC client instance."""
35
+    target = messaging.Target(topic=manager.MANAGER_TOPIC, server=CONF.host,
39 36
                               version='1.1')
40 37
     transport = get_transport()
41 38
     return messaging.RPCClient(transport, target)
42 39
 
43 40
 
44
-def get_server():
45
-    """Get the singleton RPC server."""
46
-    global _SERVER
41
+def get_server(endpoints):
42
+    """Get a RPC server instance."""
47 43
 
48
-    if _SERVER is None:
49
-        transport = get_transport()
50
-        target = messaging.Target(topic=TOPIC, server=SERVER_NAME,
51
-                                  version='1.1')
52
-        mgr = manager.ConductorManager()
53
-        _SERVER = messaging.get_rpc_server(
54
-            transport, target, [mgr], executor='eventlet',
55
-            access_policy=dispatcher.DefaultRPCAccessPolicy)
56
-    return _SERVER
44
+    transport = get_transport()
45
+    target = messaging.Target(topic=manager.MANAGER_TOPIC, server=CONF.host,
46
+                              version='1.1')
47
+    return messaging.get_rpc_server(
48
+        transport, target, endpoints, executor='eventlet',
49
+        access_policy=dispatcher.DefaultRPCAccessPolicy)

+ 62
- 0
ironic_inspector/common/rpc_service.py View File

@@ -0,0 +1,62 @@
1
+# Licensed under the Apache License, Version 2.0 (the "License");
2
+# you may not use this file except in compliance with the License.
3
+# You may obtain a copy of the License at
4
+#
5
+#    http://www.apache.org/licenses/LICENSE-2.0
6
+#
7
+# Unless required by applicable law or agreed to in writing, software
8
+# distributed under the License is distributed on an "AS IS" BASIS,
9
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
10
+# implied.
11
+# See the License for the specific language governing permissions and
12
+# limitations under the License.
13
+
14
+from oslo_config import cfg
15
+from oslo_log import log
16
+from oslo_service import service
17
+
18
+from ironic_inspector.common import rpc
19
+from ironic_inspector.conductor import manager
20
+
21
+CONF = cfg.CONF
22
+LOG = log.getLogger(__name__)
23
+
24
+SERVER_NAME = 'ironic-inspector-rpc-server'
25
+
26
+
27
+class RPCService(service.Service):
28
+
29
+    def __init__(self, host):
30
+        super(RPCService, self).__init__()
31
+        self.host = host
32
+        self.manager = manager.ConductorManager()
33
+        self.rpcserver = None
34
+
35
+    def start(self):
36
+        super(RPCService, self).start()
37
+        self.rpcserver = rpc.get_server([self.manager])
38
+        self.rpcserver.start()
39
+
40
+        self.manager.init_host()
41
+        LOG.info('Created RPC server for service %(service)s on host '
42
+                 '%(host)s.',
43
+                 {'service': manager.MANAGER_TOPIC, 'host': self.host})
44
+
45
+    def stop(self):
46
+        try:
47
+            self.rpcserver.stop()
48
+            self.rpcserver.wait()
49
+        except Exception as e:
50
+            LOG.exception('Service error occurred when stopping the '
51
+                          'RPC server. Error: %s', e)
52
+
53
+        try:
54
+            self.manager.del_host()
55
+        except Exception as e:
56
+            LOG.exception('Service error occurred when cleaning up '
57
+                          'the RPC manager. Error: %s', e)
58
+
59
+        super(RPCService, self).stop(graceful=True)
60
+        LOG.info('Stopped RPC server for service %(service)s on host '
61
+                 '%(host)s.',
62
+                 {'service': manager.MANAGER_TOPIC, 'host': self.host})

+ 108
- 0
ironic_inspector/conductor/manager.py View File

@@ -11,12 +11,30 @@
11 11
 # See the License for the specific language governing permissions and
12 12
 # limitations under the License.
13 13
 
14
+import sys
15
+import traceback as traceback_mod
16
+
17
+import eventlet
18
+from eventlet import semaphore
19
+from futurist import periodics
20
+from oslo_config import cfg
21
+from oslo_log import log
14 22
 import oslo_messaging as messaging
23
+from oslo_utils import reflection
15 24
 
25
+from ironic_inspector.common import ironic as ir_utils
26
+from ironic_inspector import db
16 27
 from ironic_inspector import introspect
28
+from ironic_inspector import node_cache
29
+from ironic_inspector.plugins import base as plugins_base
17 30
 from ironic_inspector import process
31
+from ironic_inspector.pxe_filter import base as pxe_filter
18 32
 from ironic_inspector import utils
19 33
 
34
+LOG = log.getLogger(__name__)
35
+CONF = cfg.CONF
36
+MANAGER_TOPIC = 'ironic-inspector-conductor'
37
+
20 38
 
21 39
 class ConductorManager(object):
22 40
     """ironic inspector conductor manager"""
@@ -24,6 +42,79 @@ class ConductorManager(object):
24 42
 
25 43
     target = messaging.Target(version=RPC_API_VERSION)
26 44
 
45
+    def __init__(self):
46
+        self._periodics_worker = None
47
+        self._shutting_down = semaphore.Semaphore()
48
+
49
+    def init_host(self):
50
+        """Initialize Worker host
51
+
52
+        Init db connection, load and validate processing
53
+        hooks, runs periodic tasks.
54
+
55
+        :returns None
56
+        """
57
+        if CONF.processing.store_data == 'none':
58
+            LOG.warning('Introspection data will not be stored. Change '
59
+                        '"[processing] store_data" option if this is not '
60
+                        'the desired behavior')
61
+        elif CONF.processing.store_data == 'swift':
62
+            LOG.info('Introspection data will be stored in Swift in the '
63
+                     'container %s', CONF.swift.container)
64
+
65
+        db.init()
66
+
67
+        try:
68
+            hooks = plugins_base.validate_processing_hooks()
69
+        except Exception as exc:
70
+            LOG.critical(str(exc))
71
+            sys.exit(1)
72
+        LOG.info('Enabled processing hooks: %s', [h.name for h in hooks])
73
+
74
+        driver = pxe_filter.driver()
75
+        driver.init_filter()
76
+
77
+        periodic_clean_up_ = periodics.periodic(
78
+            spacing=CONF.clean_up_period
79
+        )(periodic_clean_up)
80
+
81
+        self._periodics_worker = periodics.PeriodicWorker(
82
+            callables=[(driver.get_periodic_sync_task(), None, None),
83
+                       (periodic_clean_up_, None, None)],
84
+            executor_factory=periodics.ExistingExecutor(utils.executor()),
85
+            on_failure=self._periodics_watchdog)
86
+        utils.executor().submit(self._periodics_worker.start)
87
+
88
+    def del_host(self):
89
+
90
+        if not self._shutting_down.acquire(blocking=False):
91
+            LOG.warning('Attempted to shut down while already shutting down')
92
+            return
93
+
94
+        pxe_filter.driver().tear_down_filter()
95
+        if self._periodics_worker is not None:
96
+            try:
97
+                self._periodics_worker.stop()
98
+                self._periodics_worker.wait()
99
+            except Exception as e:
100
+                LOG.exception('Service error occurred when stopping '
101
+                              'periodic workers. Error: %s', e)
102
+            self._periodics_worker = None
103
+
104
+        if utils.executor().alive:
105
+            utils.executor().shutdown(wait=True)
106
+
107
+        self._shutting_down.release()
108
+        LOG.info('Shut down successfully')
109
+
110
+    def _periodics_watchdog(self, callable_, activity, spacing, exc_info,
111
+                            traceback=None):
112
+        LOG.exception("The periodic %(callable)s failed with: %(exception)s", {
113
+            'exception': ''.join(traceback_mod.format_exception(*exc_info)),
114
+            'callable': reflection.get_callable_name(callable_)})
115
+        # NOTE(milan): spawn new thread otherwise waiting would block
116
+        eventlet.spawn(self.del_host)
117
+
27 118
     @messaging.expected_exceptions(utils.Error)
28 119
     def do_introspection(self, context, node_id, token=None,
29 120
                          manage_boot=True):
@@ -36,3 +127,20 @@ class ConductorManager(object):
36 127
     @messaging.expected_exceptions(utils.Error)
37 128
     def do_reapply(self, context, node_id, token=None):
38 129
         process.reapply(node_id)
130
+
131
+
132
+def periodic_clean_up():  # pragma: no cover
133
+    try:
134
+        if node_cache.clean_up():
135
+            pxe_filter.driver().sync(ir_utils.get_client())
136
+        sync_with_ironic()
137
+    except Exception:
138
+        LOG.exception('Periodic clean up of node cache failed')
139
+
140
+
141
+def sync_with_ironic():
142
+    ironic = ir_utils.get_client()
143
+    # TODO(yuikotakada): pagination
144
+    ironic_nodes = ironic.node.list(limit=0)
145
+    ironic_node_uuids = {node.uuid for node in ironic_nodes}
146
+    node_cache.delete_nodes_not_in_list(ironic_node_uuids)

+ 10
- 0
ironic_inspector/conf/default.py View File

@@ -11,6 +11,8 @@
11 11
 # See the License for the specific language governing permissions and
12 12
 # limitations under the License.
13 13
 
14
+import socket
15
+
14 16
 from oslo_config import cfg
15 17
 
16 18
 from ironic_inspector.common.i18n import _
@@ -23,6 +25,14 @@ _OPTS = [
23 25
     cfg.PortOpt('listen_port',
24 26
                 default=5050,
25 27
                 help=_('Port to listen on.')),
28
+    cfg.StrOpt('host',
29
+               default=socket.getfqdn(),
30
+               sample_default='localhost',
31
+               help=_('Name of this node. This can be an opaque identifier. '
32
+                      'It is not necessarily a hostname, FQDN, or IP address. '
33
+                      'However, the node name must be valid within '
34
+                      'an AMQP key, and if using ZeroMQ, a valid '
35
+                      'hostname, FQDN, or IP address.')),
26 36
     cfg.StrOpt('auth_strategy',
27 37
                default='keystone',
28 38
                choices=('keystone', 'noauth'),

+ 1
- 0
ironic_inspector/test/functional.py View File

@@ -57,6 +57,7 @@ driver = noop
57 57
 debug = True
58 58
 introspection_delay = 0
59 59
 auth_strategy=noauth
60
+transport_url=fake://
60 61
 [database]
61 62
 connection = sqlite:///%(db_file)s
62 63
 [processing]

+ 203
- 0
ironic_inspector/test/unit/test_manager.py View File

@@ -11,6 +11,7 @@
11 11
 # See the License for the specific language governing permissions and
12 12
 # limitations under the License.
13 13
 
14
+import fixtures
14 15
 import mock
15 16
 import oslo_messaging as messaging
16 17
 
@@ -27,11 +28,213 @@ CONF = ironic_inspector.conf.CONF
27 28
 class BaseManagerTest(test_base.NodeTest):
28 29
     def setUp(self):
29 30
         super(BaseManagerTest, self).setUp()
31
+        self.mock_log = self.useFixture(fixtures.MockPatchObject(
32
+            manager, 'LOG')).mock
33
+        self.mock__shutting_down = (self.useFixture(fixtures.MockPatchObject(
34
+            manager.semaphore, 'Semaphore', autospec=True))
35
+            .mock.return_value)
36
+        self.mock__shutting_down.acquire.return_value = True
30 37
         self.manager = manager.ConductorManager()
31 38
         self.context = {}
32 39
         self.token = None
33 40
 
34 41
 
42
+class TestManagerInitHost(BaseManagerTest):
43
+    def setUp(self):
44
+        super(TestManagerInitHost, self).setUp()
45
+        self.mock_db_init = self.useFixture(fixtures.MockPatchObject(
46
+            manager.db, 'init')).mock
47
+        self.mock_validate_processing_hooks = self.useFixture(
48
+            fixtures.MockPatchObject(manager.plugins_base,
49
+                                     'validate_processing_hooks')).mock
50
+        self.mock_filter = self.useFixture(fixtures.MockPatchObject(
51
+            manager.pxe_filter, 'driver')).mock.return_value
52
+        self.mock_periodic = self.useFixture(fixtures.MockPatchObject(
53
+            manager.periodics, 'periodic')).mock
54
+        self.mock_PeriodicWorker = self.useFixture(fixtures.MockPatchObject(
55
+            manager.periodics, 'PeriodicWorker')).mock
56
+        self.mock_executor = self.useFixture(fixtures.MockPatchObject(
57
+            manager.utils, 'executor')).mock
58
+        self.mock_ExistingExecutor = self.useFixture(fixtures.MockPatchObject(
59
+            manager.periodics, 'ExistingExecutor')).mock
60
+        self.mock_exit = self.useFixture(fixtures.MockPatchObject(
61
+            manager.sys, 'exit')).mock
62
+
63
+    def assert_periodics(self):
64
+        outer_cleanup_decorator_call = mock.call(
65
+            spacing=CONF.clean_up_period)
66
+        self.mock_periodic.assert_has_calls([
67
+            outer_cleanup_decorator_call,
68
+            mock.call()(manager.periodic_clean_up)])
69
+
70
+        inner_decorator = self.mock_periodic.return_value
71
+        inner_cleanup_decorator_call = mock.call(
72
+            manager.periodic_clean_up)
73
+        inner_decorator.assert_has_calls([inner_cleanup_decorator_call])
74
+
75
+        self.mock_ExistingExecutor.assert_called_once_with(
76
+            self.mock_executor.return_value)
77
+
78
+        periodic_worker = self.mock_PeriodicWorker.return_value
79
+
80
+        periodic_sync = self.mock_filter.get_periodic_sync_task.return_value
81
+        callables = [(periodic_sync, None, None),
82
+                     (inner_decorator.return_value, None, None)]
83
+        self.mock_PeriodicWorker.assert_called_once_with(
84
+            callables=callables,
85
+            executor_factory=self.mock_ExistingExecutor.return_value,
86
+            on_failure=self.manager._periodics_watchdog)
87
+        self.assertIs(periodic_worker, self.manager._periodics_worker)
88
+
89
+        self.mock_executor.return_value.submit.assert_called_once_with(
90
+            self.manager._periodics_worker.start)
91
+
92
+    def test_no_introspection_data_store(self):
93
+        CONF.set_override('store_data', 'none', 'processing')
94
+        self.manager.init_host()
95
+        self.mock_log.warning.assert_called_once_with(
96
+            'Introspection data will not be stored. Change "[processing] '
97
+            'store_data" option if this is not the desired behavior')
98
+
99
+    def test_init_host(self):
100
+        self.manager.init_host()
101
+        self.mock_db_init.asset_called_once_with()
102
+        self.mock_validate_processing_hooks.assert_called_once_with()
103
+        self.mock_filter.init_filter.assert_called_once_with()
104
+        self.assert_periodics()
105
+
106
+    def test_init_host_validate_processing_hooks_exception(self):
107
+        class MyError(Exception):
108
+            pass
109
+
110
+        error = MyError('Oops!')
111
+        self.mock_validate_processing_hooks.side_effect = error
112
+
113
+        # NOTE(milan): have to stop executing the test case at this point to
114
+        # simulate a real sys.exit() call
115
+        self.mock_exit.side_effect = SystemExit('Stop!')
116
+        self.assertRaisesRegex(SystemExit, 'Stop!', self.manager.init_host)
117
+
118
+        self.mock_db_init.assert_called_once_with()
119
+        self.mock_log.critical.assert_called_once_with(str(error))
120
+        self.mock_exit.assert_called_once_with(1)
121
+        self.mock_filter.init_filter.assert_not_called()
122
+
123
+
124
+class TestManagerDelHost(BaseManagerTest):
125
+    def setUp(self):
126
+        super(TestManagerDelHost, self).setUp()
127
+        self.mock_filter = self.useFixture(fixtures.MockPatchObject(
128
+            manager.pxe_filter, 'driver')).mock.return_value
129
+        self.mock_executor = mock.Mock()
130
+        self.mock_executor.alive = True
131
+        self.mock_get_executor = self.useFixture(fixtures.MockPatchObject(
132
+            manager.utils, 'executor')).mock
133
+        self.mock_get_executor.return_value = self.mock_executor
134
+        self.mock__periodic_worker = self.useFixture(fixtures.MockPatchObject(
135
+            self.manager, '_periodics_worker')).mock
136
+        self.mock_exit = self.useFixture(fixtures.MockPatchObject(
137
+            manager.sys, 'exit')).mock
138
+
139
+    def test_del_host(self):
140
+        self.manager.del_host()
141
+
142
+        self.mock__shutting_down.acquire.assert_called_once_with(
143
+            blocking=False)
144
+        self.mock__periodic_worker.stop.assert_called_once_with()
145
+        self.mock__periodic_worker.wait.assert_called_once_with()
146
+        self.assertIsNone(self.manager._periodics_worker)
147
+        self.mock_executor.shutdown.assert_called_once_with(wait=True)
148
+        self.mock_filter.tear_down_filter.assert_called_once_with()
149
+        self.mock__shutting_down.release.assert_called_once_with()
150
+
151
+    def test_del_host_race(self):
152
+        self.mock__shutting_down.acquire.return_value = False
153
+
154
+        self.manager.del_host()
155
+
156
+        self.mock__shutting_down.acquire.assert_called_once_with(
157
+            blocking=False)
158
+        self.mock_log.warning.assert_called_once_with(
159
+            'Attempted to shut down while already shutting down')
160
+        self.mock__periodic_worker.stop.assert_not_called()
161
+        self.mock__periodic_worker.wait.assert_not_called()
162
+        self.assertIs(self.mock__periodic_worker,
163
+                      self.manager._periodics_worker)
164
+        self.mock_executor.shutdown.assert_not_called()
165
+        self.mock_filter.tear_down_filter.assert_not_called()
166
+        self.mock__shutting_down.release.assert_not_called()
167
+        self.mock_exit.assert_not_called()
168
+
169
+    def test_del_host_worker_exception(self):
170
+        class MyError(Exception):
171
+            pass
172
+
173
+        error = MyError('Oops!')
174
+        self.mock__periodic_worker.wait.side_effect = error
175
+
176
+        self.manager.del_host()
177
+
178
+        self.mock__shutting_down.acquire.assert_called_once_with(
179
+            blocking=False)
180
+        self.mock__periodic_worker.stop.assert_called_once_with()
181
+        self.mock__periodic_worker.wait.assert_called_once_with()
182
+        self.mock_log.exception.assert_called_once_with(
183
+            'Service error occurred when stopping periodic workers. Error: %s',
184
+            error)
185
+        self.assertIsNone(self.manager._periodics_worker)
186
+        self.mock_executor.shutdown.assert_called_once_with(wait=True)
187
+        self.mock_filter.tear_down_filter.assert_called_once_with()
188
+        self.mock__shutting_down.release.assert_called_once_with()
189
+
190
+    def test_del_host_no_worker(self):
191
+        self.manager._periodics_worker = None
192
+
193
+        self.manager.del_host()
194
+
195
+        self.mock__shutting_down.acquire.assert_called_once_with(
196
+            blocking=False)
197
+        self.mock__periodic_worker.stop.assert_not_called()
198
+        self.mock__periodic_worker.wait.assert_not_called()
199
+        self.assertIsNone(self.manager._periodics_worker)
200
+        self.mock_executor.shutdown.assert_called_once_with(wait=True)
201
+        self.mock_filter.tear_down_filter.assert_called_once_with()
202
+        self.mock__shutting_down.release.assert_called_once_with()
203
+
204
+    def test_del_host_stopped_executor(self):
205
+        self.mock_executor.alive = False
206
+
207
+        self.manager.del_host()
208
+
209
+        self.mock__shutting_down.acquire.assert_called_once_with(
210
+            blocking=False)
211
+        self.mock__periodic_worker.stop.assert_called_once_with()
212
+        self.mock__periodic_worker.wait.assert_called_once_with()
213
+        self.assertIsNone(self.manager._periodics_worker)
214
+        self.mock_executor.shutdown.assert_not_called()
215
+        self.mock_filter.tear_down_filter.assert_called_once_with()
216
+        self.mock__shutting_down.release.assert_called_once_with()
217
+
218
+
219
+class TestManagerPeriodicWatchDog(BaseManagerTest):
220
+    def setUp(self):
221
+        super(TestManagerPeriodicWatchDog, self).setUp()
222
+        self.mock_get_callable_name = self.useFixture(fixtures.MockPatchObject(
223
+            manager.reflection, 'get_callable_name')).mock
224
+        self.mock_spawn = self.useFixture(fixtures.MockPatchObject(
225
+            manager.eventlet, 'spawn')).mock
226
+
227
+    def test__periodics_watchdog(self):
228
+        error = RuntimeError('Oops!')
229
+
230
+        self.manager._periodics_watchdog(
231
+            callable_=None, activity=None, spacing=None,
232
+            exc_info=(None, error, None), traceback=None)
233
+
234
+        self.mock_get_callable_name.assert_called_once_with(None)
235
+        self.mock_spawn.assert_called_once_with(self.manager.del_host)
236
+
237
+
35 238
 class TestManagerIntrospect(BaseManagerTest):
36 239
     @mock.patch.object(introspect, 'introspect', autospec=True)
37 240
     def test_do_introspect(self, introspect_mock):

+ 3
- 211
ironic_inspector/test/unit/test_wsgi_service.py View File

@@ -20,11 +20,9 @@ import fixtures
20 20
 import mock
21 21
 from oslo_config import cfg
22 22
 
23
-from ironic_inspector.common import rpc
24 23
 from ironic_inspector.test import base as test_base
25 24
 from ironic_inspector import wsgi_service
26 25
 
27
-
28 26
 CONF = cfg.CONF
29 27
 
30 28
 
@@ -34,15 +32,9 @@ class BaseWSGITest(test_base.BaseTest):
34 32
         super(BaseWSGITest, self).setUp()
35 33
         self.app = self.useFixture(fixtures.MockPatchObject(
36 34
             wsgi_service.app, 'app', autospec=True)).mock
37
-        self.mock__shutting_down = (self.useFixture(fixtures.MockPatchObject(
38
-            wsgi_service.semaphore, 'Semaphore', autospec=True))
39
-            .mock.return_value)
40
-        self.mock__shutting_down.acquire.return_value = True
41 35
         self.mock_log = self.useFixture(fixtures.MockPatchObject(
42 36
             wsgi_service, 'LOG')).mock
43 37
         self.service = wsgi_service.WSGIService()
44
-        self.mock_rpc_server = self.useFixture(fixtures.MockPatchObject(
45
-            rpc, 'get_server')).mock
46 38
 
47 39
 
48 40
 class TestWSGIServiceInitMiddleware(BaseWSGITest):
@@ -73,118 +65,10 @@ class TestWSGIServiceInitMiddleware(BaseWSGITest):
73 65
             'Starting unauthenticated, please check configuration')
74 66
         self.mock_add_cors_middleware.assert_called_once_with(self.app)
75 67
 
76
-    def test_init_middleware_no_store(self):
77
-        CONF.set_override('store_data', 'none', 'processing')
78
-        self.service._init_middleware()
79
-
80
-        self.mock_add_auth_middleware.assert_called_once_with(self.app)
81
-        self.mock_log.warning.assert_called_once_with(
82
-            'Introspection data will not be stored. Change "[processing] '
83
-            'store_data" option if this is not the desired behavior')
84
-        self.mock_add_cors_middleware.assert_called_once_with(self.app)
85
-
86
-
87
-class TestWSGIServiceInitHost(BaseWSGITest):
88
-    def setUp(self):
89
-        super(TestWSGIServiceInitHost, self).setUp()
90
-        self.mock_db_init = self.useFixture(fixtures.MockPatchObject(
91
-            wsgi_service.db, 'init')).mock
92
-        self.mock_validate_processing_hooks = self.useFixture(
93
-            fixtures.MockPatchObject(wsgi_service.plugins_base,
94
-                                     'validate_processing_hooks')).mock
95
-        self.mock_filter = self.useFixture(fixtures.MockPatchObject(
96
-            wsgi_service.pxe_filter, 'driver')).mock.return_value
97
-        self.mock_periodic = self.useFixture(fixtures.MockPatchObject(
98
-            wsgi_service.periodics, 'periodic')).mock
99
-        self.mock_PeriodicWorker = self.useFixture(fixtures.MockPatchObject(
100
-            wsgi_service.periodics, 'PeriodicWorker')).mock
101
-        self.mock_executor = self.useFixture(fixtures.MockPatchObject(
102
-            wsgi_service.utils, 'executor')).mock
103
-        self.mock_ExistingExecutor = self.useFixture(fixtures.MockPatchObject(
104
-            wsgi_service.periodics, 'ExistingExecutor')).mock
105
-        self.mock_exit = self.useFixture(fixtures.MockPatchObject(
106
-            wsgi_service.sys, 'exit')).mock
107
-
108
-    def assert_periodics(self):
109
-        outer_cleanup_decorator_call = mock.call(
110
-            spacing=CONF.clean_up_period)
111
-        self.mock_periodic.assert_has_calls([
112
-            outer_cleanup_decorator_call,
113
-            mock.call()(wsgi_service.periodic_clean_up)])
114
-
115
-        inner_decorator = self.mock_periodic.return_value
116
-        inner_cleanup_decorator_call = mock.call(
117
-            wsgi_service.periodic_clean_up)
118
-        inner_decorator.assert_has_calls([inner_cleanup_decorator_call])
119
-
120
-        self.mock_ExistingExecutor.assert_called_once_with(
121
-            self.mock_executor.return_value)
122
-
123
-        periodic_worker = self.mock_PeriodicWorker.return_value
124
-
125
-        periodic_sync = self.mock_filter.get_periodic_sync_task.return_value
126
-        callables = [(periodic_sync, None, None),
127
-                     (inner_decorator.return_value, None, None)]
128
-        self.mock_PeriodicWorker.assert_called_once_with(
129
-            callables=callables,
130
-            executor_factory=self.mock_ExistingExecutor.return_value,
131
-            on_failure=self.service._periodics_watchdog)
132
-        self.assertIs(periodic_worker, self.service._periodics_worker)
133
-
134
-        self.mock_executor.return_value.submit.assert_called_once_with(
135
-            self.service._periodics_worker.start)
136
-
137
-    def test_init_host(self):
138
-        self.service._init_host()
139
-
140
-        self.mock_db_init.asset_called_once_with()
141
-        self.mock_validate_processing_hooks.assert_called_once_with()
142
-        self.mock_filter.init_filter.assert_called_once_with()
143
-        self.assert_periodics()
144
-
145
-    def test_init_host_validate_processing_hooks_exception(self):
146
-        class MyError(Exception):
147
-            pass
148
-
149
-        error = MyError('Oops!')
150
-        self.mock_validate_processing_hooks.side_effect = error
151
-
152
-        # NOTE(milan): have to stop executing the test case at this point to
153
-        # simulate a real sys.exit() call
154
-        self.mock_exit.side_effect = SystemExit('Stop!')
155
-        self.assertRaisesRegex(SystemExit, 'Stop!', self.service._init_host)
156
-
157
-        self.mock_db_init.assert_called_once_with()
158
-        self.mock_log.critical.assert_called_once_with(str(error))
159
-        self.mock_exit.assert_called_once_with(1)
160
-        self.mock_filter.init_filter.assert_not_called()
161
-
162
-
163
-class TestWSGIServicePeriodicWatchDog(BaseWSGITest):
164
-    def setUp(self):
165
-        super(TestWSGIServicePeriodicWatchDog, self).setUp()
166
-        self.mock_get_callable_name = self.useFixture(fixtures.MockPatchObject(
167
-            wsgi_service.reflection, 'get_callable_name')).mock
168
-        self.mock_spawn = self.useFixture(fixtures.MockPatchObject(
169
-            wsgi_service.eventlet, 'spawn')).mock
170
-
171
-    def test__periodics_watchdog(self):
172
-        error = RuntimeError('Oops!')
173
-
174
-        self.service._periodics_watchdog(
175
-            callable_=None, activity=None, spacing=None,
176
-            exc_info=(None, error, None), traceback=None)
177
-
178
-        self.mock_get_callable_name.assert_called_once_with(None)
179
-        self.mock_spawn.assert_called_once_with(self.service.shutdown,
180
-                                                error=str(error))
181
-
182 68
 
183 69
 class TestWSGIServiceRun(BaseWSGITest):
184 70
     def setUp(self):
185 71
         super(TestWSGIServiceRun, self).setUp()
186
-        self.mock__init_host = self.useFixture(fixtures.MockPatchObject(
187
-            self.service, '_init_host')).mock
188 72
         self.mock__init_middleware = self.useFixture(fixtures.MockPatchObject(
189 73
             self.service, '_init_middleware')).mock
190 74
         self.mock__create_ssl_context = self.useFixture(
@@ -201,9 +85,6 @@ class TestWSGIServiceRun(BaseWSGITest):
201 85
 
202 86
         self.mock__create_ssl_context.assert_called_once_with()
203 87
         self.mock__init_middleware.assert_called_once_with()
204
-        self.mock__init_host.assert_called_once_with()
205
-        self.mock_rpc_server.assert_called_once_with()
206
-        self.service.rpc_server.start.assert_called_once_with()
207 88
         self.app.run.assert_called_once_with(
208 89
             host=CONF.listen_address, port=CONF.listen_port,
209 90
             ssl_context=self.mock__create_ssl_context.return_value)
@@ -215,7 +96,6 @@ class TestWSGIServiceRun(BaseWSGITest):
215 96
         self.service.run()
216 97
         self.mock__create_ssl_context.assert_called_once_with()
217 98
         self.mock__init_middleware.assert_called_once_with()
218
-        self.mock__init_host.assert_called_once_with()
219 99
         self.app.run.assert_called_once_with(
220 100
             host=CONF.listen_address, port=CONF.listen_port)
221 101
         self.mock_shutdown.assert_called_once_with()
@@ -230,7 +110,6 @@ class TestWSGIServiceRun(BaseWSGITest):
230 110
 
231 111
         self.mock__create_ssl_context.assert_called_once_with()
232 112
         self.mock__init_middleware.assert_called_once_with()
233
-        self.mock__init_host.assert_called_once_with()
234 113
         self.app.run.assert_called_once_with(
235 114
             host=CONF.listen_address, port=CONF.listen_port,
236 115
             ssl_context=self.mock__create_ssl_context.return_value)
@@ -240,108 +119,21 @@ class TestWSGIServiceRun(BaseWSGITest):
240 119
 class TestWSGIServiceShutdown(BaseWSGITest):
241 120
     def setUp(self):
242 121
         super(TestWSGIServiceShutdown, self).setUp()
243
-        self.mock_filter = self.useFixture(fixtures.MockPatchObject(
244
-            wsgi_service.pxe_filter, 'driver')).mock.return_value
245
-        self.mock_executor = mock.Mock()
246
-        self.mock_executor.alive = True
247
-        self.mock_get_executor = self.useFixture(fixtures.MockPatchObject(
248
-            wsgi_service.utils, 'executor')).mock
249
-        self.mock_get_executor.return_value = self.mock_executor
250 122
         self.service = wsgi_service.WSGIService()
251
-        self.mock__periodic_worker = self.useFixture(fixtures.MockPatchObject(
252
-            self.service, '_periodics_worker')).mock
123
+        self.mock_rpc_service = mock.MagicMock()
124
+        self.service.rpc_service = self.mock_rpc_service
253 125
         self.mock_exit = self.useFixture(fixtures.MockPatchObject(
254 126
             wsgi_service.sys, 'exit')).mock
255
-        self.service.rpc_server = self.mock_rpc_server
256 127
 
257 128
     def test_shutdown(self):
258 129
         class MyError(Exception):
259 130
             pass
260
-
261 131
         error = MyError('Oops!')
262 132
 
263 133
         self.service.shutdown(error=error)
264
-
265
-        self.mock__shutting_down.acquire.assert_called_once_with(
266
-            blocking=False)
267
-        self.mock__periodic_worker.stop.assert_called_once_with()
268
-        self.mock__periodic_worker.wait.assert_called_once_with()
269
-        self.assertIsNone(self.service._periodics_worker)
270
-        self.mock_executor.shutdown.assert_called_once_with(wait=True)
271
-        self.mock_filter.tear_down_filter.assert_called_once_with()
272
-        self.mock__shutting_down.release.assert_called_once_with()
134
+        self.mock_rpc_service.stop.assert_called_once_with()
273 135
         self.mock_exit.assert_called_once_with(error)
274 136
 
275
-    def test_shutdown_race(self):
276
-        self.mock__shutting_down.acquire.return_value = False
277
-
278
-        self.service.shutdown()
279
-
280
-        self.mock__shutting_down.acquire.assert_called_once_with(
281
-            blocking=False)
282
-        self.mock_log.warning.assert_called_once_with(
283
-            'Attempted to shut down while already shutting down')
284
-        self.mock__periodic_worker.stop.assert_not_called()
285
-        self.mock__periodic_worker.wait.assert_not_called()
286
-        self.assertIs(self.mock__periodic_worker,
287
-                      self.service._periodics_worker)
288
-        self.mock_executor.shutdown.assert_not_called()
289
-        self.mock_filter.tear_down_filter.assert_not_called()
290
-        self.mock__shutting_down.release.assert_not_called()
291
-        self.mock_exit.assert_not_called()
292
-
293
-    def test_shutdown_worker_exception(self):
294
-        class MyError(Exception):
295
-            pass
296
-
297
-        error = MyError('Oops!')
298
-        self.mock__periodic_worker.wait.side_effect = error
299
-
300
-        self.service.shutdown()
301
-
302
-        self.mock__shutting_down.acquire.assert_called_once_with(
303
-            blocking=False)
304
-        self.mock__periodic_worker.stop.assert_called_once_with()
305
-        self.mock__periodic_worker.wait.assert_called_once_with()
306
-        self.mock_log.exception.assert_called_once_with(
307
-            'Service error occurred when stopping periodic workers. Error: %s',
308
-            error)
309
-        self.assertIsNone(self.service._periodics_worker)
310
-        self.mock_executor.shutdown.assert_called_once_with(wait=True)
311
-        self.mock_filter.tear_down_filter.assert_called_once_with()
312
-        self.mock__shutting_down.release.assert_called_once_with()
313
-        self.mock_exit.assert_called_once_with(None)
314
-
315
-    def test_shutdown_no_worker(self):
316
-        self.service._periodics_worker = None
317
-
318
-        self.service.shutdown()
319
-
320
-        self.mock__shutting_down.acquire.assert_called_once_with(
321
-            blocking=False)
322
-        self.mock__periodic_worker.stop.assert_not_called()
323
-        self.mock__periodic_worker.wait.assert_not_called()
324
-        self.assertIsNone(self.service._periodics_worker)
325
-        self.mock_executor.shutdown.assert_called_once_with(wait=True)
326
-        self.mock_filter.tear_down_filter.assert_called_once_with()
327
-        self.mock__shutting_down.release.assert_called_once_with()
328
-        self.mock_exit.assert_called_once_with(None)
329
-
330
-    def test_shutdown_stopped_executor(self):
331
-        self.mock_executor.alive = False
332
-
333
-        self.service.shutdown()
334
-
335
-        self.mock__shutting_down.acquire.assert_called_once_with(
336
-            blocking=False)
337
-        self.mock__periodic_worker.stop.assert_called_once_with()
338
-        self.mock__periodic_worker.wait.assert_called_once_with()
339
-        self.assertIsNone(self.service._periodics_worker)
340
-        self.mock_executor.shutdown.assert_not_called()
341
-        self.mock_filter.tear_down_filter.assert_called_once_with()
342
-        self.mock__shutting_down.release.assert_called_once_with()
343
-        self.mock_exit.assert_called_once_with(None)
344
-
345 137
 
346 138
 class TestCreateSSLContext(test_base.BaseTest):
347 139
     def setUp(self):

+ 8
- 109
ironic_inspector/wsgi_service.py View File

@@ -13,25 +13,16 @@
13 13
 import signal
14 14
 import ssl
15 15
 import sys
16
-import traceback as traceback_mod
17 16
 
18 17
 import eventlet
19
-from eventlet import semaphore
20
-from futurist import periodics
21 18
 from oslo_config import cfg
22 19
 from oslo_log import log
23
-from oslo_utils import reflection
20
+from oslo_service import service
24 21
 
25
-from ironic_inspector.common import ironic as ir_utils
26
-from ironic_inspector.common import rpc
27
-from ironic_inspector import db
22
+from ironic_inspector.common.rpc_service import RPCService
28 23
 from ironic_inspector import main as app
29
-from ironic_inspector import node_cache
30
-from ironic_inspector.plugins import base as plugins_base
31
-from ironic_inspector.pxe_filter import base as pxe_filter
32 24
 from ironic_inspector import utils
33 25
 
34
-
35 26
 LOG = log.getLogger(__name__)
36 27
 CONF = cfg.CONF
37 28
 
@@ -41,10 +32,9 @@ class WSGIService(object):
41 32
 
42 33
     def __init__(self):
43 34
         self.app = app.app
44
-        self._periodics_worker = None
45
-        self._shutting_down = semaphore.Semaphore()
46 35
         signal.signal(signal.SIGHUP, self._handle_sighup)
47 36
         signal.signal(signal.SIGTERM, self._handle_sigterm)
37
+        self.rpc_service = RPCService(CONF.host)
48 38
 
49 39
     def _init_middleware(self):
50 40
         """Initialize WSGI middleware.
@@ -57,15 +47,6 @@ class WSGIService(object):
57 47
         else:
58 48
             LOG.warning('Starting unauthenticated, please check'
59 49
                         ' configuration')
60
-
61
-        # TODO(aarefiev): move to WorkerService once we split service
62
-        if CONF.processing.store_data == 'none':
63
-            LOG.warning('Introspection data will not be stored. Change '
64
-                        '"[processing] store_data" option if this is not '
65
-                        'the desired behavior')
66
-        elif CONF.processing.store_data == 'swift':
67
-            LOG.info('Introspection data will be stored in Swift in the '
68
-                     'container %s', CONF.swift.container)
69 50
         utils.add_cors_middleware(self.app)
70 51
 
71 52
     def _create_ssl_context(self):
@@ -99,77 +80,13 @@ class WSGIService(object):
99 80
                             'settings: %s', exc)
100 81
         return context
101 82
 
102
-    # TODO(aarefiev): move init code to WorkerService
103
-    def _init_host(self):
104
-        """Initialize Worker host
105
-
106
-        Init db connection, load and validate processing
107
-        hooks, runs periodic tasks.
108
-
109
-        :returns None
110
-        """
111
-        db.init()
112
-
113
-        try:
114
-            hooks = plugins_base.validate_processing_hooks()
115
-        except Exception as exc:
116
-            LOG.critical(str(exc))
117
-            sys.exit(1)
118
-
119
-        LOG.info('Enabled processing hooks: %s', [h.name for h in hooks])
120
-
121
-        driver = pxe_filter.driver()
122
-        driver.init_filter()
123
-
124
-        periodic_clean_up_ = periodics.periodic(
125
-            spacing=CONF.clean_up_period
126
-        )(periodic_clean_up)
127
-
128
-        self._periodics_worker = periodics.PeriodicWorker(
129
-            callables=[(driver.get_periodic_sync_task(), None, None),
130
-                       (periodic_clean_up_, None, None)],
131
-            executor_factory=periodics.ExistingExecutor(utils.executor()),
132
-            on_failure=self._periodics_watchdog)
133
-        utils.executor().submit(self._periodics_worker.start)
134
-
135
-    def _periodics_watchdog(self, callable_, activity, spacing, exc_info,
136
-                            traceback=None):
137
-        LOG.exception("The periodic %(callable)s failed with: %(exception)s", {
138
-            'exception': ''.join(traceback_mod.format_exception(*exc_info)),
139
-            'callable': reflection.get_callable_name(callable_)})
140
-        # NOTE(milan): spawn new thread otherwise waiting would block
141
-        eventlet.spawn(self.shutdown, error=str(exc_info[1]))
142
-
143 83
     def shutdown(self, error=None):
144
-        """Stop serving API, clean up.
84
+        """Stop serving API.
145 85
 
146 86
         :returns: None
147 87
         """
148
-        # TODO(aarefiev): move shutdown code to WorkerService
149
-        if not self._shutting_down.acquire(blocking=False):
150
-            LOG.warning('Attempted to shut down while already shutting down')
151
-            return
152
-
153 88
         LOG.debug('Shutting down')
154
-
155
-        self.rpc_server.stop()
156
-
157
-        if self._periodics_worker is not None:
158
-            try:
159
-                self._periodics_worker.stop()
160
-                self._periodics_worker.wait()
161
-            except Exception as e:
162
-                LOG.exception('Service error occurred when stopping '
163
-                              'periodic workers. Error: %s', e)
164
-            self._periodics_worker = None
165
-
166
-        if utils.executor().alive:
167
-            utils.executor().shutdown(wait=True)
168
-
169
-        pxe_filter.driver().tear_down_filter()
170
-
171
-        self._shutting_down.release()
172
-        LOG.info('Shut down successfully')
89
+        self.rpc_service.stop()
173 90
         sys.exit(error)
174 91
 
175 92
     def run(self):
@@ -186,10 +103,9 @@ class WSGIService(object):
186 103
 
187 104
         self._init_middleware()
188 105
 
189
-        self._init_host()
190
-
191
-        self.rpc_server = rpc.get_server()
192
-        self.rpc_server.start()
106
+        LOG.info('Spawning RPC service')
107
+        service.launch(CONF, self.rpc_service,
108
+                       restart_method='mutate')
193 109
 
194 110
         try:
195 111
             self.app.run(**app_kwargs)
@@ -210,20 +126,3 @@ class WSGIService(object):
210 126
         # SIGTERM. Raising KeyboardIntrerrupt which won't be caught by any
211 127
         # 'except Exception' clauses.
212 128
         raise KeyboardInterrupt
213
-
214
-
215
-def periodic_clean_up():  # pragma: no cover
216
-    try:
217
-        if node_cache.clean_up():
218
-            pxe_filter.driver().sync(ir_utils.get_client())
219
-        sync_with_ironic()
220
-    except Exception:
221
-        LOG.exception('Periodic clean up of node cache failed')
222
-
223
-
224
-def sync_with_ironic():
225
-    ironic = ir_utils.get_client()
226
-    # TODO(yuikotakada): pagination
227
-    ironic_nodes = ironic.node.list(limit=0)
228
-    ironic_node_uuids = {node.uuid for node in ironic_nodes}
229
-    node_cache.delete_nodes_not_in_list(ironic_node_uuids)

+ 7
- 0
releasenotes/notes/rpc-backends-0e7405aa1c7723a0.yaml View File

@@ -0,0 +1,7 @@
1
+---
2
+upgrade:
3
+  - |
4
+    Adds rpc related configuration options for the communication between
5
+    ironic-inspector API and worker. It needs to be configured properly
6
+    during upgrade. Set ``[DEFAULT]transport_url`` to ``fake://`` if a
7
+    rpc backend is not available or not desired.

+ 1
- 0
tools/config-generator.conf View File

@@ -6,3 +6,4 @@ namespace = oslo.db
6 6
 namespace = oslo.log
7 7
 namespace = oslo.middleware.cors
8 8
 namespace = oslo.policy
9
+namespace = oslo.messaging

Loading…
Cancel
Save