Browse Source

Add logic to stop repair scripts

When the watchdog detects that repair script(s) have been killed, get
a list of scripts to nuke and pass it to stop_react_scripts. Then,
get each script's routing key(s), and send a message from a special
user to any queue listening on those keys.

Modified an example repair script to show how it could be killed,
but need a more concrete way than that. For now, messages from
'repair_killer' will raise the RepairStopException, which will
stop react scripts.

Modified the example engine cfg to have some details about the
kombu connection to use.

Implements blueprint kill-repair-scripts
Change-Id: I67e15e9b9ebb5d36c5cb0e01995bc95f7a73b3dd
Pranesh Pandurangan 4 years ago
parent
commit
5b647e5f79

+ 2
- 0
entropy/__main__.py View File

@@ -54,6 +54,8 @@ def _add_to_list(engine, script_type, script_name, **script_args):
54 54
         }
55 55
         backend.add_script(script_type, data)
56 56
         return True
57
+    except KeyError:
58
+        LOG.exception("No %s script called %s", script_type, script_name)
57 59
     except Exception:
58 60
         LOG.exception("Could not register %s script %s", script_type,
59 61
                       script_name)

+ 54
- 1
entropy/engine.py View File

@@ -24,7 +24,10 @@ import tempfile
24 24
 
25 25
 from concurrent import futures as cf
26 26
 import croniter
27
+from kombu import BrokerConnection
28
+from kombu.common import maybe_declare
27 29
 from kombu import Exchange
30
+from kombu.pools import producers
28 31
 from kombu import Queue
29 32
 import pause
30 33
 import six
@@ -84,6 +87,14 @@ class Engine(object):
84 87
         # State related variables
85 88
         self._state = states.ENABLED
86 89
 
90
+        # Variables for mq.
91
+        self._mq_args = {
92
+            'mq_user': cfg_data['mq_user'],
93
+            'mq_password': cfg_data['mq_password'],
94
+            'mq_host': cfg_data['mq_host'],
95
+            'mq_port': cfg_data['mq_port']
96
+        }
97
+
87 98
         LOG.info('Created engine obj %s', self.name)
88 99
 
89 100
     # TODO(praneshp): Move to utils?
@@ -249,7 +260,10 @@ class Engine(object):
249 260
                     repairs_to_delete.append(repair)
250 261
         LOG.info('Will add new repairs: %s', new_repairs)
251 262
         LOG.info('Will nuke repairs: %s', repairs_to_delete)
252
-        self.futures.extend(self.start_react_scripts(new_repairs))
263
+        if new_repairs:
264
+            self.futures.extend(self.start_react_scripts(new_repairs))
265
+        if repairs_to_delete:
266
+            self.stop_react_scripts(repairs_to_delete)
253 267
 
254 268
     def start_watchdog(self):
255 269
         LOG.debug('Watchdog mapping is: ', self._watchdog_event_fn)
@@ -286,6 +300,45 @@ class Engine(object):
286 300
         repairs = self._backend_driver.get_repairs()
287 301
         return repairs
288 302
 
303
+    def stop_react_scripts(self, repairs_to_stop):
304
+        # current react scripts
305
+        LOG.info("Currently running react scripts: %s", self._repairs)
306
+        for repair in repairs_to_stop:
307
+            self.stop_react(repair)
308
+        # react scripts at the end
309
+        LOG.info("Currently running react scripts: %s", self._repairs)
310
+
311
+    def stop_react(self, repair):
312
+        LOG.info("Stopping react script %s", repair)
313
+        # Get what the keywords are
314
+        routing_key = self._known_routing_keys[repair]
315
+        # remove the repair script from our known set.
316
+        self._known_routing_keys.pop(repair)
317
+        # put out a special message, repair script will see that and die.
318
+        self._send_killer_message(routing_key)
319
+        LOG.info("Stopped react script %s", repair)
320
+
321
+    def _send_killer_message(self, routing_key):
322
+        # NOTE(praneshp): routing_key is a list
323
+        # TODO(praneshp): we'll figure out a way to do this better.
324
+        connection = BrokerConnection('amqp://%(mq_user)s:%(mq_password)s@'
325
+                                      '%(mq_host)s:%(mq_port)s//' %
326
+                                      self._mq_args)
327
+        message = {'From': 'repair_killer',
328
+                   'Date': str(datetime.datetime.now().isoformat())}
329
+
330
+        with producers[connection].acquire(block=True) as producer:
331
+            try:
332
+                maybe_declare(self.entropy_exchange, producer.channel)
333
+                for rk in routing_key:
334
+                    producer.publish(message,
335
+                                     exchange=self.entropy_exchange,
336
+                                     routing_key=rk,
337
+                                     serializer='json')
338
+                    LOG.debug("React killer published message")
339
+            except Exception:
340
+                LOG.exception("React killer could not send message")
341
+
289 342
     def start_react_scripts(self, repairs):
290 343
         futures = []
291 344
         if repairs:

+ 4
- 0
entropy/examples/cfg/test.cfg View File

@@ -6,3 +6,7 @@ test:
6 6
     serializer_schedule: "*/2 * * * *"
7 7
     engine_timeout: "25"
8 8
     backend: file
9
+    mq_host: "localhost"
10
+    mq_port: "5672"
11
+    mq_user: "guest"
12
+    mq_password: "guest"

+ 5
- 1
entropy/examples/repair/react.py View File

@@ -17,6 +17,8 @@ import logging
17 17
 from kombu import BrokerConnection
18 18
 from kombu.mixins import ConsumerMixin
19 19
 
20
+from entropy import exceptions
21
+
20 22
 LOG = logging.getLogger(__name__)
21 23
 
22 24
 
@@ -33,6 +35,8 @@ class SomeConsumer(ConsumerMixin):
33 35
     def on_message(self, body, message):
34 36
         LOG.warning("React script %s received message: %r", self.name, body)
35 37
         message.ack()
38
+        if body['From'] == 'repair_killer':
39
+            raise exceptions.RepairStopException
36 40
         return
37 41
 
38 42
 
@@ -42,7 +46,7 @@ def receive_message(**kwargs):
42 46
     with connection as conn:
43 47
         try:
44 48
             SomeConsumer(conn, **kwargs).run()
45
-        except KeyboardInterrupt:
49
+        except (KeyboardInterrupt, exceptions.RepairStopException):
46 50
             LOG.warning('Quitting %s' % __name__)
47 51
 
48 52
 

+ 4
- 0
entropy/exceptions.py View File

@@ -49,3 +49,7 @@ class NoEnginesException(EntropyException):
49 49
 
50 50
 class SerializerException(EntropyException):
51 51
     """Exception raised when the serializer fails."""
52
+
53
+
54
+class RepairStopException(EntropyException):
55
+    """Exception raised when repair scripts should be stopped."""

Loading…
Cancel
Save