[kafka] Use notification priority
Notification priority actually points to an endpoint method to be called and couldn't be ignored by the driver. Also reduced exchange because it shouldn't take part in final topic resolution. Closes-Bug #1546081 Change-Id: I4a7c367d91c2fefc6830f0ab3cdcbc0b605127b0
This commit is contained in:
parent
a95035c264
commit
71450fac34
@ -61,15 +61,17 @@ def pack_context_with_message(ctxt, msg):
|
||||
return {'message': msg, 'context': context_d}
|
||||
|
||||
|
||||
def target_to_topic(target):
|
||||
def target_to_topic(target, priority=None):
|
||||
"""Convert target into topic string
|
||||
|
||||
:param target: Message destination target
|
||||
:type target: oslo_messaging.Target
|
||||
:param priority: Notification priority
|
||||
:type priority: string
|
||||
"""
|
||||
if target.exchange is None:
|
||||
if not priority:
|
||||
return target.topic
|
||||
return "%s_%s" % (target.exchange, target.topic)
|
||||
return target.topic + '.' + priority
|
||||
|
||||
|
||||
class Connection(object):
|
||||
@ -351,7 +353,7 @@ class KafkaDriver(base.BaseDriver):
|
||||
conn = self._get_connection(purpose=PURPOSE_LISTEN)
|
||||
topics = []
|
||||
for target, priority in targets_and_priorities:
|
||||
topics.append(target_to_topic(target))
|
||||
topics.append(target_to_topic(target, priority))
|
||||
|
||||
conn.declare_topic_consumer(topics, pool)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user