
Import zmq driver code with minimal modifications

Modifications are:

  - use stdlib logging; no huge need for oslo logging here

  - stub out the _() function; we don't have any l10n infrastructure in
    the project and may never have

  - change imports to oslo.messaging.openstack.common and
    oslo.messaging._drivers as appropriate

Change-Id: I87b85b79a33dec65e51ed95fff90cc56042240c5
Mark McLoughlin committed 5 years ago
commit ff3a4155bf

oslo/messaging/_drivers/impl_zmq.py (+820, -0)

@@ -0,0 +1,820 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#    Copyright 2011 Cloudscaling Group, Inc
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import logging
+import os
+import pprint
+import re
+import socket
+import sys
+import types
+import uuid
+
+import eventlet
+import greenlet
+from oslo.config import cfg
+
+from oslo.messaging._drivers import common as rpc_common
+from oslo.messaging.openstack.common import excutils
+from oslo.messaging.openstack.common import importutils
+from oslo.messaging.openstack.common import jsonutils
+
+zmq = importutils.try_import('eventlet.green.zmq')
+
+# Aliases assigned for convenience; they are not modified.
+pformat = pprint.pformat
+Timeout = eventlet.timeout.Timeout
+LOG = logging.getLogger(__name__)
+RemoteError = rpc_common.RemoteError
+RPCException = rpc_common.RPCException
+
+# FIXME(markmc): remove this
+_ = lambda s: s
+
+zmq_opts = [
+    cfg.StrOpt('rpc_zmq_bind_address', default='*',
+               help='ZeroMQ bind address. Should be a wildcard (*), '
+                    'an ethernet interface, or IP. '
+                    'The "host" option should point or resolve to this '
+                    'address.'),
+
+    # The module.Class to use for matchmaking.
+    cfg.StrOpt(
+        'rpc_zmq_matchmaker',
+        default=('openstack.common.rpc.'
+                 'matchmaker.MatchMakerLocalhost'),
+        help='MatchMaker driver',
+    ),
+
+    # The following port is unassigned by IANA as of 2012-05-21
+    cfg.IntOpt('rpc_zmq_port', default=9501,
+               help='ZeroMQ receiver listening port'),
+
+    cfg.IntOpt('rpc_zmq_contexts', default=1,
+               help='Number of ZeroMQ contexts, defaults to 1'),
+
+    cfg.IntOpt('rpc_zmq_topic_backlog', default=None,
+               help='Maximum number of ingress messages to locally buffer '
+                    'per topic. Default is unlimited.'),
+
+    cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
+               help='Directory for holding IPC sockets'),
+
+    cfg.StrOpt('rpc_zmq_host', default=socket.gethostname(),
+               help='Name of this node. Must be a valid hostname, FQDN, or '
+                    'IP address. Must match "host" option, if running Nova.')
+]
+
+
+CONF = cfg.CONF
+CONF.register_opts(zmq_opts)
+
+ZMQ_CTX = None  # ZeroMQ Context, must be global.
+matchmaker = None  # memoized matchmaker object
+
+
+def _serialize(data):
+    """Serialization wrapper.
+
+    We prefer using JSON, but it cannot encode all types.
+    Error if a developer passes us bad data.
+    """
+    try:
+        return jsonutils.dumps(data, ensure_ascii=True)
+    except TypeError:
+        with excutils.save_and_reraise_exception():
+            LOG.error(_("JSON serialization failed."))
+
+
+def _deserialize(data):
+    """Deserialization wrapper."""
+    LOG.debug(_("Deserializing: %s"), data)
+    return jsonutils.loads(data)
+
+
+class ZmqSocket(object):
+    """A tiny wrapper around ZeroMQ.
+
+    Simplifies the send/recv protocol and connection management.
+    Can be used as a Context (supports the 'with' statement).
+    """
+
+    def __init__(self, addr, zmq_type, bind=True, subscribe=None):
+        self.sock = _get_ctxt().socket(zmq_type)
+        self.addr = addr
+        self.type = zmq_type
+        self.subscriptions = []
+
+        # Support failures on sending/receiving on wrong socket type.
+        self.can_recv = zmq_type in (zmq.PULL, zmq.SUB)
+        self.can_send = zmq_type in (zmq.PUSH, zmq.PUB)
+        self.can_sub = zmq_type in (zmq.SUB, )
+
+        # Support list, str, & None for subscribe arg (cast to list)
+        do_sub = {
+            list: subscribe,
+            str: [subscribe],
+            type(None): []
+        }[type(subscribe)]
+
+        for f in do_sub:
+            self.subscribe(f)
+
+        str_data = {'addr': addr, 'type': self.socket_s(),
+                    'subscribe': subscribe, 'bind': bind}
+
+        LOG.debug(_("Connecting to %(addr)s with %(type)s"), str_data)
+        LOG.debug(_("-> Subscribed to %(subscribe)s"), str_data)
+        LOG.debug(_("-> bind: %(bind)s"), str_data)
+
+        try:
+            if bind:
+                self.sock.bind(addr)
+            else:
+                self.sock.connect(addr)
+        except Exception:
+            raise RPCException(_("Could not open socket."))
+
+    def socket_s(self):
+        """Get socket type as string."""
+        t_enum = ('PUSH', 'PULL', 'PUB', 'SUB', 'REP', 'REQ', 'ROUTER',
+                  'DEALER')
+        return dict(map(lambda t: (getattr(zmq, t), t), t_enum))[self.type]
+
+    def subscribe(self, msg_filter):
+        """Subscribe."""
+        if not self.can_sub:
+            raise RPCException("Cannot subscribe on this socket.")
+        LOG.debug(_("Subscribing to %s"), msg_filter)
+
+        try:
+            self.sock.setsockopt(zmq.SUBSCRIBE, msg_filter)
+        except Exception:
+            return
+
+        self.subscriptions.append(msg_filter)
+
+    def unsubscribe(self, msg_filter):
+        """Unsubscribe."""
+        if msg_filter not in self.subscriptions:
+            return
+        self.sock.setsockopt(zmq.UNSUBSCRIBE, msg_filter)
+        self.subscriptions.remove(msg_filter)
+
+    def close(self):
+        if self.sock is None or self.sock.closed:
+            return
+
+        # We must unsubscribe, or we'll leak descriptors.
+        if self.subscriptions:
+            for f in self.subscriptions:
+                try:
+                    self.sock.setsockopt(zmq.UNSUBSCRIBE, f)
+                except Exception:
+                    pass
+            self.subscriptions = []
+
+        try:
+            # Default is to linger
+            self.sock.close()
+        except Exception:
+            # While this is a bad thing to happen,
+            # it would be much worse if some of the code calling this
+            # were to fail. For now, lets log, and later evaluate
+            # if we can safely raise here.
+            LOG.error("ZeroMQ socket could not be closed.")
+        self.sock = None
+
+    def recv(self, **kwargs):
+        if not self.can_recv:
+            raise RPCException(_("You cannot recv on this socket."))
+        return self.sock.recv_multipart(**kwargs)
+
+    def send(self, data, **kwargs):
+        if not self.can_send:
+            raise RPCException(_("You cannot send on this socket."))
+        self.sock.send_multipart(data, **kwargs)
+
+
+class ZmqClient(object):
+    """Client for ZMQ sockets."""
+
+    def __init__(self, addr):
+        self.outq = ZmqSocket(addr, zmq.PUSH, bind=False)
+
+    def cast(self, msg_id, topic, data, envelope):
+        msg_id = msg_id or 0
+
+        if not envelope:
+            self.outq.send(map(bytes,
+                           (msg_id, topic, 'cast', _serialize(data))))
+            return
+
+        rpc_envelope = rpc_common.serialize_msg(data[1], envelope)
+        zmq_msg = reduce(lambda x, y: x + y, rpc_envelope.items())
+        self.outq.send(map(bytes,
+                       (msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg))
+
+    def close(self):
+        self.outq.close()
+
+
+class RpcContext(rpc_common.CommonRpcContext):
+    """Context that supports replying to a rpc.call."""
+    def __init__(self, **kwargs):
+        self.replies = []
+        super(RpcContext, self).__init__(**kwargs)
+
+    def deepcopy(self):
+        values = self.to_dict()
+        values['replies'] = self.replies
+        return self.__class__(**values)
+
+    def reply(self, reply=None, failure=None, ending=False):
+        if ending:
+            return
+        self.replies.append(reply)
+
+    @classmethod
+    def marshal(self, ctx):
+        ctx_data = ctx.to_dict()
+        return _serialize(ctx_data)
+
+    @classmethod
+    def unmarshal(self, data):
+        return RpcContext.from_dict(_deserialize(data))
+
+
+class InternalContext(object):
+    """Used by ConsumerBase as a private context for - methods."""
+
+    def __init__(self, proxy):
+        self.proxy = proxy
+        self.msg_waiter = None
+
+    def _get_response(self, ctx, proxy, topic, data):
+        """Process a curried message and cast the result to topic."""
+        LOG.debug(_("Running func with context: %s"), ctx.to_dict())
+        data.setdefault('version', None)
+        data.setdefault('args', {})
+
+        try:
+            result = proxy.dispatch(
+                ctx, data['version'], data['method'],
+                data.get('namespace'), **data['args'])
+            return ConsumerBase.normalize_reply(result, ctx.replies)
+        except greenlet.GreenletExit:
+            # ignore these since they are just from shutdowns
+            pass
+        except rpc_common.ClientException as e:
+            LOG.debug(_("Expected exception during message handling (%s)") %
+                      e._exc_info[1])
+            return {'exc':
+                    rpc_common.serialize_remote_exception(e._exc_info,
+                                                          log_failure=False)}
+        except Exception:
+            LOG.error(_("Exception during message handling"))
+            return {'exc':
+                    rpc_common.serialize_remote_exception(sys.exc_info())}
+
+    def reply(self, ctx, proxy,
+              msg_id=None, context=None, topic=None, msg=None):
+        """Reply to a casted call."""
+        # NOTE(ewindisch): context kwarg exists for Grizzly compat.
+        #                  this may be able to be removed earlier than
+        #                  'I' if ConsumerBase.process were refactored.
+        if type(msg) is list:
+            payload = msg[-1]
+        else:
+            payload = msg
+
+        response = ConsumerBase.normalize_reply(
+            self._get_response(ctx, proxy, topic, payload),
+            ctx.replies)
+
+        LOG.debug(_("Sending reply"))
+        _multi_send(_cast, ctx, topic, {
+            'method': '-process_reply',
+            'args': {
+                'msg_id': msg_id,  # Include for Folsom compat.
+                'response': response
+            }
+        }, _msg_id=msg_id)
+
+
+class ConsumerBase(object):
+    """Base Consumer."""
+
+    def __init__(self):
+        self.private_ctx = InternalContext(None)
+
+    @classmethod
+    def normalize_reply(self, result, replies):
+        #TODO(ewindisch): re-evaluate and document this method.
+        if isinstance(result, types.GeneratorType):
+            return list(result)
+        elif replies:
+            return replies
+        else:
+            return [result]
+
+    def process(self, proxy, ctx, data):
+        data.setdefault('version', None)
+        data.setdefault('args', {})
+
+        # Methods whose names start with '-' are
+        # processed internally (they are not valid public method names).
+        method = data.get('method')
+        if not method:
+            LOG.error(_("RPC message did not include method."))
+            return
+
+        # Internal method
+        # uses internal context for safety.
+        if method == '-reply':
+            self.private_ctx.reply(ctx, proxy, **data['args'])
+            return
+
+        proxy.dispatch(ctx, data['version'],
+                       data['method'], data.get('namespace'), **data['args'])
+
+
+class ZmqBaseReactor(ConsumerBase):
+    """A consumer class implementing a centralized casting broker (PULL-PUSH).
+
+    Used for RoundRobin requests.
+    """
+
+    def __init__(self, conf):
+        super(ZmqBaseReactor, self).__init__()
+
+        self.proxies = {}
+        self.threads = []
+        self.sockets = []
+        self.subscribe = {}
+
+        self.pool = eventlet.greenpool.GreenPool(conf.rpc_thread_pool_size)
+
+    def register(self, proxy, in_addr, zmq_type_in,
+                 in_bind=True, subscribe=None):
+
+        LOG.info(_("Registering reactor"))
+
+        if zmq_type_in not in (zmq.PULL, zmq.SUB):
+            raise RPCException("Bad input socktype")
+
+        # Items push in.
+        inq = ZmqSocket(in_addr, zmq_type_in, bind=in_bind,
+                        subscribe=subscribe)
+
+        self.proxies[inq] = proxy
+        self.sockets.append(inq)
+
+        LOG.info(_("In reactor registered"))
+
+    def consume_in_thread(self):
+        def _consume(sock):
+            LOG.info(_("Consuming socket"))
+            while True:
+                self.consume(sock)
+
+        for k in self.proxies.keys():
+            self.threads.append(
+                self.pool.spawn(_consume, k)
+            )
+
+    def wait(self):
+        for t in self.threads:
+            t.wait()
+
+    def close(self):
+        for s in self.sockets:
+            s.close()
+
+        for t in self.threads:
+            t.kill()
+
+
+class ZmqProxy(ZmqBaseReactor):
+    """A consumer class implementing a topic-based proxy.
+
+    Forwards to IPC sockets.
+    """
+
+    def __init__(self, conf):
+        super(ZmqProxy, self).__init__(conf)
+        pathsep = set((os.path.sep or '', os.path.altsep or '', '/', '\\'))
+        self.badchars = re.compile(r'[%s]' % re.escape(''.join(pathsep)))
+
+        self.topic_proxy = {}
+
+    def consume(self, sock):
+        ipc_dir = CONF.rpc_zmq_ipc_dir
+
+        data = sock.recv(copy=False)
+        topic = data[1].bytes
+
+        if topic.startswith('fanout~'):
+            sock_type = zmq.PUB
+            topic = topic.split('.', 1)[0]
+        elif topic.startswith('zmq_replies'):
+            sock_type = zmq.PUB
+        else:
+            sock_type = zmq.PUSH
+
+        if topic not in self.topic_proxy:
+            def publisher(waiter):
+                LOG.info(_("Creating proxy for topic: %s"), topic)
+
+                try:
+                    # The topic is received over the network,
+                    # don't trust this input.
+                    if self.badchars.search(topic) is not None:
+                        emsg = _("Topic contained dangerous characters.")
+                        LOG.warn(emsg)
+                        raise RPCException(emsg)
+
+                    out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" %
+                                         (ipc_dir, topic),
+                                         sock_type, bind=True)
+                except RPCException:
+                    waiter.send_exception(*sys.exc_info())
+                    return
+
+                self.topic_proxy[topic] = eventlet.queue.LightQueue(
+                    CONF.rpc_zmq_topic_backlog)
+                self.sockets.append(out_sock)
+
+                # It takes some time for a pub socket to open,
+                # before we can have any faith in doing a send() to it.
+                if sock_type == zmq.PUB:
+                    eventlet.sleep(.5)
+
+                waiter.send(True)
+
+                while True:
+                    data = self.topic_proxy[topic].get()
+                    out_sock.send(data, copy=False)
+
+            wait_sock_creation = eventlet.event.Event()
+            eventlet.spawn(publisher, wait_sock_creation)
+
+            try:
+                wait_sock_creation.wait()
+            except RPCException:
+                LOG.error(_("Topic socket file creation failed."))
+                return
+
+        try:
+            self.topic_proxy[topic].put_nowait(data)
+        except eventlet.queue.Full:
+            LOG.error(_("Local per-topic backlog buffer full for topic "
+                        "%(topic)s. Dropping message.") % {'topic': topic})
+
+    def consume_in_thread(self):
+        """Runs the ZmqProxy service."""
+        ipc_dir = CONF.rpc_zmq_ipc_dir
+        consume_in = "tcp://%s:%s" % \
+            (CONF.rpc_zmq_bind_address,
+             CONF.rpc_zmq_port)
+        consumption_proxy = InternalContext(None)
+
+        try:
+            os.makedirs(ipc_dir)
+        except os.error:
+            if not os.path.isdir(ipc_dir):
+                with excutils.save_and_reraise_exception():
+                    LOG.error(_("Required IPC directory does not exist at"
+                                " %s") % (ipc_dir, ))
+        try:
+            self.register(consumption_proxy,
+                          consume_in,
+                          zmq.PULL)
+        except zmq.ZMQError:
+            if os.access(ipc_dir, os.X_OK):
+                with excutils.save_and_reraise_exception():
+                    LOG.error(_("Permission denied to IPC directory at"
+                                " %s") % (ipc_dir, ))
+            with excutils.save_and_reraise_exception():
+                LOG.error(_("Could not create ZeroMQ receiver daemon. "
+                            "Socket may already be in use."))
+
+        super(ZmqProxy, self).consume_in_thread()
+
+
+def unflatten_envelope(packenv):
+    """Unflattens the RPC envelope.
+
+    Takes a list and returns a dictionary.
+    i.e. [1,2,3,4] => {1: 2, 3: 4}
+    """
+    i = iter(packenv)
+    h = {}
+    try:
+        while True:
+            k = i.next()
+            h[k] = i.next()
+    except StopIteration:
+        return h
+
+
+class ZmqReactor(ZmqBaseReactor):
+    """A consumer class implementing a consumer for messages.
+
+    Can also be used as a 1:1 proxy
+    """
+
+    def __init__(self, conf):
+        super(ZmqReactor, self).__init__(conf)
+
+    def consume(self, sock):
+        #TODO(ewindisch): use zero-copy (i.e. references, not copying)
+        data = sock.recv()
+        LOG.debug(_("CONSUMER RECEIVED DATA: %s"), data)
+
+        proxy = self.proxies[sock]
+
+        if data[2] == 'cast':  # Legacy protocol
+            packenv = data[3]
+
+            ctx, msg = _deserialize(packenv)
+            request = rpc_common.deserialize_msg(msg)
+            ctx = RpcContext.unmarshal(ctx)
+        elif data[2] == 'impl_zmq_v2':
+            packenv = data[4:]
+
+            msg = unflatten_envelope(packenv)
+            request = rpc_common.deserialize_msg(msg)
+
+            # Unmarshal only after verifying the message.
+            ctx = RpcContext.unmarshal(data[3])
+        else:
+            LOG.error(_("ZMQ Envelope version unsupported or unknown."))
+            return
+
+        self.pool.spawn_n(self.process, proxy, ctx, request)
+
+
+class Connection(rpc_common.Connection):
+    """Manages connections and threads."""
+
+    def __init__(self, conf):
+        self.topics = []
+        self.reactor = ZmqReactor(conf)
+
+    def create_consumer(self, topic, proxy, fanout=False):
+        # Register with matchmaker.
+        _get_matchmaker().register(topic, CONF.rpc_zmq_host)
+
+        # Subscription scenarios
+        if fanout:
+            sock_type = zmq.SUB
+            subscribe = ('', fanout)[type(fanout) == str]
+            topic = 'fanout~' + topic.split('.', 1)[0]
+        else:
+            sock_type = zmq.PULL
+            subscribe = None
+            topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))
+
+        if topic in self.topics:
+            LOG.info(_("Skipping topic registration. Already registered."))
+            return
+
+        # Receive messages from (local) proxy
+        inaddr = "ipc://%s/zmq_topic_%s" % \
+            (CONF.rpc_zmq_ipc_dir, topic)
+
+        LOG.debug(_("Consumer is a zmq.%s"),
+                  ['PULL', 'SUB'][sock_type == zmq.SUB])
+
+        self.reactor.register(proxy, inaddr, sock_type,
+                              subscribe=subscribe, in_bind=False)
+        self.topics.append(topic)
+
+    def close(self):
+        _get_matchmaker().stop_heartbeat()
+        for topic in self.topics:
+            _get_matchmaker().unregister(topic, CONF.rpc_zmq_host)
+
+        self.reactor.close()
+        self.topics = []
+
+    def wait(self):
+        self.reactor.wait()
+
+    def consume_in_thread(self):
+        _get_matchmaker().start_heartbeat()
+        self.reactor.consume_in_thread()
+
+
+def _cast(addr, context, topic, msg, timeout=None, envelope=False,
+          _msg_id=None):
+    timeout_cast = timeout or CONF.rpc_cast_timeout
+    payload = [RpcContext.marshal(context), msg]
+
+    with Timeout(timeout_cast, exception=rpc_common.Timeout):
+        try:
+            conn = ZmqClient(addr)
+
+            # assumes cast can't return an exception
+            conn.cast(_msg_id, topic, payload, envelope)
+        except zmq.ZMQError:
+            raise RPCException("Cast failed. ZMQ Socket Exception")
+        finally:
+            if 'conn' in vars():
+                conn.close()
+
+
+def _call(addr, context, topic, msg, timeout=None,
+          envelope=False):
+    # timeout_response is how long we wait for a response
+    timeout = timeout or CONF.rpc_response_timeout
+
+    # The msg_id is used to track replies.
+    msg_id = uuid.uuid4().hex
+
+    # Replies always come into the reply service.
+    reply_topic = "zmq_replies.%s" % CONF.rpc_zmq_host
+
+    LOG.debug(_("Creating payload"))
+    # Curry the original request into a reply method.
+    mcontext = RpcContext.marshal(context)
+    payload = {
+        'method': '-reply',
+        'args': {
+            'msg_id': msg_id,
+            'topic': reply_topic,
+            # TODO(ewindisch): safe to remove mcontext in I.
+            'msg': [mcontext, msg]
+        }
+    }
+
+    LOG.debug(_("Creating queue socket for reply waiter"))
+
+    # Messages arriving async.
+    # TODO(ewindisch): have reply consumer with dynamic subscription mgmt
+    with Timeout(timeout, exception=rpc_common.Timeout):
+        try:
+            msg_waiter = ZmqSocket(
+                "ipc://%s/zmq_topic_zmq_replies.%s" %
+                (CONF.rpc_zmq_ipc_dir,
+                 CONF.rpc_zmq_host),
+                zmq.SUB, subscribe=msg_id, bind=False
+            )
+
+            LOG.debug(_("Sending cast"))
+            _cast(addr, context, topic, payload, envelope)
+
+            LOG.debug(_("Cast sent; Waiting reply"))
+            # Blocks until receives reply
+            msg = msg_waiter.recv()
+            LOG.debug(_("Received message: %s"), msg)
+            LOG.debug(_("Unpacking response"))
+
+            if msg[2] == 'cast':  # Legacy version
+                raw_msg = _deserialize(msg[-1])[-1]
+            elif msg[2] == 'impl_zmq_v2':
+                rpc_envelope = unflatten_envelope(msg[4:])
+                raw_msg = rpc_common.deserialize_msg(rpc_envelope)
+            else:
+                raise rpc_common.UnsupportedRpcEnvelopeVersion(
+                    _("Unsupported or unknown ZMQ envelope returned."))
+
+            responses = raw_msg['args']['response']
+        # ZMQError trumps the Timeout error.
+        except zmq.ZMQError:
+            raise RPCException("ZMQ Socket Error")
+        except (IndexError, KeyError):
+            raise RPCException(_("RPC Message Invalid."))
+        finally:
+            if 'msg_waiter' in vars():
+                msg_waiter.close()
+
+    # It seems we don't need to do all of the following,
+    # but perhaps it would be useful for multicall?
+    # One effect of this is that we're checking all
+    # responses for Exceptions.
+    for resp in responses:
+        if isinstance(resp, types.DictType) and 'exc' in resp:
+            raise rpc_common.deserialize_remote_exception(CONF, resp['exc'])
+
+    return responses[-1]
+
+
+def _multi_send(method, context, topic, msg, timeout=None,
+                envelope=False, _msg_id=None):
+    """Wraps the sending of messages.
+
+    Dispatches to the matchmaker and sends message to all relevant hosts.
+    """
+    conf = CONF
+    LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
+
+    queues = _get_matchmaker().queues(topic)
+    LOG.debug(_("Sending message(s) to: %s"), queues)
+
+    # Don't stack if we have no matchmaker results
+    if not queues:
+        LOG.warn(_("No matchmaker results. Not casting."))
+        # While not strictly a timeout, callers know how to handle
+        # this exception and a timeout isn't too big a lie.
+        raise rpc_common.Timeout(_("No match from matchmaker."))
+
+    # This supports brokerless fanout (addresses > 1)
+    for queue in queues:
+        (_topic, ip_addr) = queue
+        _addr = "tcp://%s:%s" % (ip_addr, conf.rpc_zmq_port)
+
+        if method.__name__ == '_cast':
+            eventlet.spawn_n(method, _addr, context,
+                             _topic, msg, timeout, envelope,
+                             _msg_id)
+            return
+        return method(_addr, context, _topic, msg, timeout,
+                      envelope)
+
+
+def create_connection(conf, new=True):
+    return Connection(conf)
+
+
+def multicall(conf, *args, **kwargs):
+    """Multiple calls."""
+    return _multi_send(_call, *args, **kwargs)
+
+
+def call(conf, *args, **kwargs):
+    """Send a message, expect a response."""
+    data = _multi_send(_call, *args, **kwargs)
+    return data[-1]
+
+
+def cast(conf, *args, **kwargs):
+    """Send a message expecting no reply."""
+    _multi_send(_cast, *args, **kwargs)
+
+
+def fanout_cast(conf, context, topic, msg, **kwargs):
+    """Send a message to all listening and expect no reply."""
+    # NOTE(ewindisch): fanout~ is used because it avoids splitting on '.'
+    # and acts as a non-subtle hint to the matchmaker and ZmqProxy.
+    _multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)
+
+
+def notify(conf, context, topic, msg, envelope):
+    """Send notification event.
+
+    Notifications are sent to topic-priority.
+    This differs from the AMQP drivers which send to topic.priority.
+    """
+    # NOTE(ewindisch): dot-priority in rpc notifier does not
+    # work with our assumptions.
+    topic = topic.replace('.', '-')
+    cast(conf, context, topic, msg, envelope=envelope)
+
+
+def cleanup():
+    """Clean up resources in use by implementation."""
+    global ZMQ_CTX
+    if ZMQ_CTX:
+        ZMQ_CTX.term()
+    ZMQ_CTX = None
+
+    global matchmaker
+    matchmaker = None
+
+
+def _get_ctxt():
+    if not zmq:
+        raise ImportError("Failed to import eventlet.green.zmq")
+
+    global ZMQ_CTX
+    if not ZMQ_CTX:
+        ZMQ_CTX = zmq.Context(CONF.rpc_zmq_contexts)
+    return ZMQ_CTX
+
+
+def _get_matchmaker(*args, **kwargs):
+    global matchmaker
+    if not matchmaker:
+        mm = CONF.rpc_zmq_matchmaker
+        if mm.endswith('matchmaker.MatchMakerRing'):
+            mm = mm.replace('matchmaker', 'matchmaker_ring')
+            LOG.warn(_('rpc_zmq_matchmaker = %(orig)s is deprecated; use'
+                       ' %(new)s instead') % dict(
+                     orig=CONF.rpc_zmq_matchmaker, new=mm))
+        matchmaker = importutils.import_object(mm, *args, **kwargs)
+    return matchmaker
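
For orientation (not part of the commit), here is a minimal sketch of the 'impl_zmq_v2' framing that ZmqClient.cast() and unflatten_envelope() implement between them: the serialized envelope dict is flattened into alternating key/value frames for send_multipart(), then paired back up on the receiving side. The envelope keys below are illustrative placeholders, not necessarily what rpc_common.serialize_msg() produces.

    # Python 2, matching the driver code above.
    envelope = {'oslo.version': '2.0', 'oslo.message': '{"method": "ping"}'}

    # Sender side (see ZmqClient.cast): dict -> flat key/value sequence.
    zmq_msg = reduce(lambda x, y: x + y, envelope.items())

    # Receiver side (see unflatten_envelope): pair frames back into a dict.
    def unflatten_envelope(packenv):
        i = iter(packenv)
        h = {}
        try:
            while True:
                k = i.next()
                h[k] = i.next()
        except StopIteration:
            return h

    assert unflatten_envelope(zmq_msg) == envelope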

oslo/messaging/_drivers/matchmaker.py (+324, -0)

@@ -0,0 +1,324 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#    Copyright 2011 Cloudscaling Group, Inc
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+"""
+The MatchMaker classes should accept a Topic or Fanout exchange key and
+return keys for direct exchanges, per (approximate) AMQP parlance.
+"""
+
+import contextlib
+import logging
+
+import eventlet
+from oslo.config import cfg
+
+# FIXME(markmc): remove this
+_ = lambda s: s
+
+matchmaker_opts = [
+    cfg.IntOpt('matchmaker_heartbeat_freq',
+               default=300,
+               help='Heartbeat frequency'),
+    cfg.IntOpt('matchmaker_heartbeat_ttl',
+               default=600,
+               help='Heartbeat time-to-live.'),
+]
+
+CONF = cfg.CONF
+CONF.register_opts(matchmaker_opts)
+LOG = logging.getLogger(__name__)
+contextmanager = contextlib.contextmanager
+
+
+class MatchMakerException(Exception):
+    """Signified a match could not be found."""
+    message = _("Match not found by MatchMaker.")
+
+
+class Exchange(object):
+    """Implements lookups.
+
+    Subclass this to support hashtables, dns, etc.
+    """
+    def __init__(self):
+        pass
+
+    def run(self, key):
+        raise NotImplementedError()
+
+
+class Binding(object):
+    """A binding on which to perform a lookup."""
+    def __init__(self):
+        pass
+
+    def test(self, key):
+        raise NotImplementedError()
+
+
+class MatchMakerBase(object):
+    """Match Maker Base Class.
+
+    Build off HeartbeatMatchMakerBase if building a heartbeat-capable
+    MatchMaker.
+    """
+    def __init__(self):
+        # Array of tuples. Index [2] toggles negation, [3] is last-if-true
+        self.bindings = []
+
+        self.no_heartbeat_msg = _('Matchmaker does not implement '
+                                  'registration or heartbeat.')
+
+    def register(self, key, host):
+        """Register a host on a backend.
+
+        Heartbeats, if applicable, may keepalive registration.
+        """
+        pass
+
+    def ack_alive(self, key, host):
+        """Acknowledge that a key.host is alive.
+
+        Used internally for updating heartbeats, but may also be used
+        publicly to acknowledge a system is alive (i.e. rpc message
+        successfully sent to host)
+        """
+        pass
+
+    def is_alive(self, topic, host):
+        """Checks if a host is alive."""
+        pass
+
+    def expire(self, topic, host):
+        """Explicitly expire a host's registration."""
+        pass
+
+    def send_heartbeats(self):
+        """Send all heartbeats.
+
+        Use start_heartbeat to spawn a heartbeat greenthread,
+        which loops this method.
+        """
+        pass
+
+    def unregister(self, key, host):
+        """Unregister a topic."""
+        pass
+
+    def start_heartbeat(self):
+        """Spawn heartbeat greenthread."""
+        pass
+
+    def stop_heartbeat(self):
+        """Destroys the heartbeat greenthread."""
+        pass
+
+    def add_binding(self, binding, rule, last=True):
+        self.bindings.append((binding, rule, False, last))
+
+    #NOTE(ewindisch): kept the following method in case we implement the
+    #                 underlying support.
+    #def add_negate_binding(self, binding, rule, last=True):
+    #    self.bindings.append((binding, rule, True, last))
+
+    def queues(self, key):
+        workers = []
+
+        # bit is for negate bindings - if we choose to implement it.
+        # last stops processing rules if this matches.
+        for (binding, exchange, bit, last) in self.bindings:
+            if binding.test(key):
+                workers.extend(exchange.run(key))
+
+                # Support last.
+                if last:
+                    return workers
+        return workers
+
+
+class HeartbeatMatchMakerBase(MatchMakerBase):
+    """Base for a heart-beat capable MatchMaker.
+
+    Provides common methods for registering, unregistering, and maintaining
+    heartbeats.
+    """
+    def __init__(self):
+        self.hosts = set()
+        self._heart = None
+        self.host_topic = {}
+
+        super(HeartbeatMatchMakerBase, self).__init__()
+
+    def send_heartbeats(self):
+        """Send all heartbeats.
+
+        Use start_heartbeat to spawn a heartbeat greenthread,
+        which loops this method.
+        """
+        for key, host in self.host_topic:
+            self.ack_alive(key, host)
+
+    def ack_alive(self, key, host):
+        """Acknowledge that a host.topic is alive.
+
+        Used internally for updating heartbeats, but may also be used
+        publicly to acknowledge a system is alive (i.e. rpc message
+        successfully sent to host)
+        """
+        raise NotImplementedError("Must implement ack_alive")
+
+    def backend_register(self, key, host):
+        """Implements registration logic.
+
+        Called by register(self,key,host)
+        """
+        raise NotImplementedError("Must implement backend_register")
+
+    def backend_unregister(self, key, key_host):
+        """Implements de-registration logic.
+
+        Called by unregister(self,key,host)
+        """
+        raise NotImplementedError("Must implement backend_unregister")
+
+    def register(self, key, host):
+        """Register a host on a backend.
+
+        Heartbeats, if applicable, may keepalive registration.
+        """
+        self.hosts.add(host)
+        self.host_topic[(key, host)] = host
+        key_host = '.'.join((key, host))
+
+        self.backend_register(key, key_host)
+
+        self.ack_alive(key, host)
+
+    def unregister(self, key, host):
+        """Unregister a topic."""
+        if (key, host) in self.host_topic:
+            del self.host_topic[(key, host)]
+
+        self.hosts.discard(host)
+        self.backend_unregister(key, '.'.join((key, host)))
+
+        LOG.info(_("Matchmaker unregistered: %(key)s, %(host)s"),
+                 {'key': key, 'host': host})
+
+    def start_heartbeat(self):
+        """Implementation of MatchMakerBase.start_heartbeat.
+
+        Launches greenthread looping send_heartbeats(),
+        yielding for CONF.matchmaker_heartbeat_freq seconds
+        between iterations.
+        """
+        if not self.hosts:
+            raise MatchMakerException(
+                _("Register before starting heartbeat."))
+
+        def do_heartbeat():
+            while True:
+                self.send_heartbeats()
+                eventlet.sleep(CONF.matchmaker_heartbeat_freq)
+
+        self._heart = eventlet.spawn(do_heartbeat)
+
+    def stop_heartbeat(self):
+        """Destroys the heartbeat greenthread."""
+        if self._heart:
+            self._heart.kill()
+
+
+class DirectBinding(Binding):
+    """Specifies a host in the key via a '.' character.
+
+    Although dots are used in the key, the behavior here is
+    that it maps directly to a host, thus direct.
+    """
+    def test(self, key):
+        return '.' in key
+
+
+class TopicBinding(Binding):
+    """Where a 'bare' key without dots.
+
+    AMQP generally considers topic exchanges to be those *with* dots,
+    but we deviate here in terminology as the behavior here matches
+    that of a topic exchange (whereas where there are dots, behavior
+    matches that of a direct exchange).
+    """
+    def test(self, key):
+        return '.' not in key
+
+
+class FanoutBinding(Binding):
+    """Match on fanout keys, where key starts with 'fanout.' string."""
+    def test(self, key):
+        return key.startswith('fanout~')
+
+
+class StubExchange(Exchange):
+    """Exchange that does nothing."""
+    def run(self, key):
+        return [(key, None)]
+
+
+class LocalhostExchange(Exchange):
+    """Exchange where all direct topics are local."""
+    def __init__(self, host='localhost'):
+        self.host = host
+        super(LocalhostExchange, self).__init__()
+
+    def run(self, key):
+        return [('.'.join((key.split('.')[0], self.host)), self.host)]
+
+
+class DirectExchange(Exchange):
+    """Exchange where all topic keys are split, sending to second half.
+
+    i.e. "compute.host" sends a message to "compute.host" running on "host"
+    """
+    def __init__(self):
+        super(DirectExchange, self).__init__()
+
+    def run(self, key):
+        e = key.split('.', 1)[1]
+        return [(key, e)]
+
+
+class MatchMakerLocalhost(MatchMakerBase):
+    """Match Maker where all bare topics resolve to localhost.
+
+    Useful for testing.
+    """
+    def __init__(self, host='localhost'):
+        super(MatchMakerLocalhost, self).__init__()
+        self.add_binding(FanoutBinding(), LocalhostExchange(host))
+        self.add_binding(DirectBinding(), DirectExchange())
+        self.add_binding(TopicBinding(), LocalhostExchange(host))
+
+
+class MatchMakerStub(MatchMakerBase):
+    """Match Maker where topics are untouched.
+
+    Useful for testing, or for AMQP/brokered queues.
+    Will not work where knowledge of hosts is required (i.e. zeromq).
+    """
+    def __init__(self):
+        super(MatchMakerStub, self).__init__()
+
+        self.add_binding(FanoutBinding(), StubExchange())
+        self.add_binding(DirectBinding(), StubExchange())
+        self.add_binding(TopicBinding(), StubExchange())
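
As a quick usage sketch (assuming the module import path added by this commit; 'scheduler' and 'host1' are made-up names), the three bindings registered by MatchMakerLocalhost resolve the three key shapes like so:

    from oslo.messaging._drivers import matchmaker

    mm = matchmaker.MatchMakerLocalhost()
    # TopicBinding: bare key -> this host.
    mm.queues('scheduler')         # [('scheduler.localhost', 'localhost')]
    # DirectBinding: dotted key -> the named host.
    mm.queues('scheduler.host1')   # [('scheduler.host1', 'host1')]
    # FanoutBinding: 'fanout~' prefix -> this host.
    mm.queues('fanout~scheduler')  # [('fanout~scheduler.localhost', 'localhost')]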

oslo/messaging/_drivers/matchmaker_redis.py (+146, -0)

@@ -0,0 +1,146 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#    Copyright 2013 Cloudscaling Group, Inc
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+"""
+The MatchMaker classes should accept a Topic or Fanout exchange key and
+return keys for direct exchanges, per (approximate) AMQP parlance.
+"""
+
+import logging
+
+from oslo.config import cfg
+
+from oslo.messaging._drivers import matchmaker as mm_common
+from oslo.messaging.openstack.common import importutils
+
+redis = importutils.try_import('redis')
+
+
+matchmaker_redis_opts = [
+    cfg.StrOpt('host',
+               default='127.0.0.1',
+               help='Host to locate redis'),
+    cfg.IntOpt('port',
+               default=6379,
+               help='Use this port to connect to redis host.'),
+    cfg.StrOpt('password',
+               default=None,
+               help='Password for Redis server. (optional)'),
+]
+
+CONF = cfg.CONF
+opt_group = cfg.OptGroup(name='matchmaker_redis',
+                         title='Options for Redis-based MatchMaker')
+CONF.register_group(opt_group)
+CONF.register_opts(matchmaker_redis_opts, opt_group)
+LOG = logging.getLogger(__name__)
+
+
+class RedisExchange(mm_common.Exchange):
+    def __init__(self, matchmaker):
+        self.matchmaker = matchmaker
+        self.redis = matchmaker.redis
+        super(RedisExchange, self).__init__()
+
+
+class RedisTopicExchange(RedisExchange):
+    """Exchange where all topic keys are split, sending to second half.
+
+    i.e. "compute.host" sends a message to "compute" running on "host"
+    """
+    def run(self, topic):
+        while True:
+            member_name = self.redis.srandmember(topic)
+
+            if not member_name:
+                # If this happens, there are no
+                # longer any members.
+                break
+
+            if not self.matchmaker.is_alive(topic, member_name):
+                continue
+
+            host = member_name.split('.', 1)[1]
+            return [(member_name, host)]
+        return []
+
+
+class RedisFanoutExchange(RedisExchange):
+    """Return a list of all hosts."""
+    def run(self, topic):
+        topic = topic.split('~', 1)[1]
+        hosts = self.redis.smembers(topic)
+        good_hosts = filter(
+            lambda host: self.matchmaker.is_alive(topic, host), hosts)
+
+        return [(x, x.split('.', 1)[1]) for x in good_hosts]
+
+
+class MatchMakerRedis(mm_common.HeartbeatMatchMakerBase):
+    """MatchMaker registering and looking-up hosts with a Redis server."""
+    def __init__(self):
+        super(MatchMakerRedis, self).__init__()
+
+        if not redis:
+            raise ImportError("Failed to import module redis.")
+
+        self.redis = redis.StrictRedis(
+            host=CONF.matchmaker_redis.host,
+            port=CONF.matchmaker_redis.port,
+            password=CONF.matchmaker_redis.password)
+
+        self.add_binding(mm_common.FanoutBinding(), RedisFanoutExchange(self))
+        self.add_binding(mm_common.DirectBinding(), mm_common.DirectExchange())
+        self.add_binding(mm_common.TopicBinding(), RedisTopicExchange(self))
+
+    def ack_alive(self, key, host):
+        topic = "%s.%s" % (key, host)
+        if not self.redis.expire(topic, CONF.matchmaker_heartbeat_ttl):
+            # If we could not update the expiration, the key
+            # might have been pruned. Re-register, creating a new
+            # key in Redis.
+            self.register(key, host)
+
+    def is_alive(self, topic, host):
+        if self.redis.ttl(host) == -1:
+            self.expire(topic, host)
+            return False
+        return True
+
+    def expire(self, topic, host):
+        with self.redis.pipeline() as pipe:
+            pipe.multi()
+            pipe.delete(host)
+            pipe.srem(topic, host)
+            pipe.execute()
+
+    def backend_register(self, key, key_host):
+        with self.redis.pipeline() as pipe:
+            pipe.multi()
+            pipe.sadd(key, key_host)
+
+            # No value is needed, we just
+            # care if it exists. Sets aren't viable
+            # because only keys can expire.
+            pipe.set(key_host, '')
+
+            pipe.execute()
+
+    def backend_unregister(self, key, key_host):
+        with self.redis.pipeline() as pipe:
+            pipe.multi()
+            pipe.srem(key, key_host)
+            pipe.delete(key_host)
+            pipe.execute()
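
For reference, a rough sketch (using redis-py directly; topic and host names are made up) of the Redis state MatchMakerRedis maintains: each topic is a set of 'topic.host' members, and each 'topic.host' also exists as a plain key whose TTL acts as the heartbeat.

    import redis

    r = redis.StrictRedis()
    # What register('scheduler', 'host1') plus ack_alive() amount to:
    r.sadd('scheduler', 'scheduler.host1')  # backend_register: set membership
    r.set('scheduler.host1', '')            # value unused; only existence matters
    r.expire('scheduler.host1', 600)        # ack_alive: matchmaker_heartbeat_ttl
    # Lookup path (RedisTopicExchange.run + is_alive):
    r.srandmember('scheduler')              # -> 'scheduler.host1'
    r.ttl('scheduler.host1')                # -1 is treated as dead -> expire()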

oslo/messaging/_drivers/matchmaker_ring.py (+109, -0)

@@ -0,0 +1,109 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#    Copyright 2011-2013 Cloudscaling Group, Inc
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+"""
+The MatchMaker classes should accept a Topic or Fanout exchange key and
+return keys for direct exchanges, per (approximate) AMQP parlance.
+"""
+
+import itertools
+import json
+import logging
+
+from oslo.config import cfg
+
+from oslo.messaging._drivers import matchmaker as mm
+
+# FIXME(markmc): remove this
+_ = lambda s: s
+
+matchmaker_opts = [
+    # Matchmaker ring file
+    cfg.StrOpt('ringfile',
+               deprecated_name='matchmaker_ringfile',
+               deprecated_group='DEFAULT',
+               default='/etc/oslo/matchmaker_ring.json',
+               help='Matchmaker ring file (JSON)'),
+]
+
+CONF = cfg.CONF
+CONF.register_opts(matchmaker_opts, 'matchmaker_ring')
+LOG = logging.getLogger(__name__)
+
+
+class RingExchange(mm.Exchange):
+    """Match Maker where hosts are loaded from a static JSON formatted file.
+
+    __init__ takes an optional ring dictionary argument; otherwise it
+    loads the ringfile from CONF.matchmaker_ring.ringfile.
+    """
+    def __init__(self, ring=None):
+        super(RingExchange, self).__init__()
+
+        if ring:
+            self.ring = ring
+        else:
+            fh = open(CONF.matchmaker_ring.ringfile, 'r')
+            self.ring = json.load(fh)
+            fh.close()
+
+        self.ring0 = {}
+        for k in self.ring.keys():
+            self.ring0[k] = itertools.cycle(self.ring[k])
+
+    def _ring_has(self, key):
+        return key in self.ring0
+
+
+class RoundRobinRingExchange(RingExchange):
+    """A Topic Exchange based on a hashmap."""
+    def __init__(self, ring=None):
+        super(RoundRobinRingExchange, self).__init__(ring)
+
+    def run(self, key):
+        if not self._ring_has(key):
+            LOG.warn(
+                _("No key defining hosts for topic '%s', "
+                  "see ringfile") % (key, )
+            )
+            return []
+        host = next(self.ring0[key])
+        return [(key + '.' + host, host)]
+
+
+class FanoutRingExchange(RingExchange):
+    """Fanout Exchange based on a hashmap."""
+    def __init__(self, ring=None):
+        super(FanoutRingExchange, self).__init__(ring)
+
+    def run(self, key):
+        # Assume starts with "fanout~", strip it for lookup.
+        nkey = key.split('fanout~')[1:][0]
+        if not self._ring_has(nkey):
+            LOG.warn(
+                _("No key defining hosts for topic '%s', "
+                  "see ringfile") % (nkey, )
+            )
+            return []
+        return map(lambda x: (key + '.' + x, x), self.ring[nkey])
+
+
+class MatchMakerRing(mm.MatchMakerBase):
+    """Match Maker where hosts are loaded from a static hashmap."""
+    def __init__(self, ring=None):
+        super(MatchMakerRing, self).__init__()
+        self.add_binding(mm.FanoutBinding(), FanoutRingExchange(ring))
+        self.add_binding(mm.DirectBinding(), mm.DirectExchange())
+        self.add_binding(mm.TopicBinding(), RoundRobinRingExchange(ring))
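
For reference, the ringfile consumed by RingExchange is a JSON object mapping bare topic names to host lists; a hypothetical /etc/oslo/matchmaker_ring.json might look like:

    {
        "scheduler": ["host1", "host2"],
        "conductor": ["host1"]
    }

With that ring, RoundRobinRingExchange cycles 'scheduler' casts between host1 and host2, while 'fanout~scheduler' expands to every listed host.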
