Add queue name to notifications

Currently notifications from Zaqar are very consise and simply represent
messages. Example:
{'body': 'somemessage', 'ttl': 60}

But subscribers are likely to want queue information inside
notifications, if they subscribe to multiple queues.

The only option for subscribers to know from which queue each
notification was sent is to listen on many ports, and make each port
correspond to each particular queue, but it's very inconvenient.

This patch adds queue name to each notification to make notifications
look like this:
{'body': 'somemessage', 'ttl': 60, 'queue_name': 'somequeue'}

Since we only add a new key to JSON dictionary, this patch is backward
compatible.

This patch adds new test case "test_proper_notification_data".
It also modifies "test_mailto" and "test_webhook" test cases, so they
consider undeterministic behavior of "json.dumps" function, which
often serializes dictionaries in different order, making test assertions
sometimes fail.

Closes-Bug: 1535811
Change-Id: Ied5fa24fd973043eda643e09aac1d6c0f1500892
This commit is contained in:
Eva Balycheva 2016-02-01 09:30:25 +03:00
parent 7e3931afb2
commit 1a0e8e719a
3 changed files with 80 additions and 15 deletions

View File

@ -36,6 +36,10 @@ class MailtoTask(object):
for message in messages:
p = subprocess.Popen(conf.notification.smtp_command.split(' '),
stdin=subprocess.PIPE)
# NOTE(Eva-i): Unfortunately this will add 'queue_name' key to
# our original messages(dicts) which will be later consumed in
# the storage controller. It seems safe though.
message['queue_name'] = subscription['source']
msg = text.MIMEText(json.dumps(message))
msg["to"] = subscriber.path
msg["from"] = subscription['options'].get('from', '')

View File

@ -25,6 +25,10 @@ class WebhookTask(object):
def execute(self, subscription, messages, **kwargs):
try:
for msg in messages:
# NOTE(Eva-i): Unfortunately this will add 'queue_name' key to
# our original messages(dicts) which will be later consumed in
# the storage controller. It seems safe though.
msg['queue_name'] = subscription['source']
requests.post(subscription['subscriber'],
data=json.dumps(msg),
headers={'Content-Type': 'application/json'})

View File

@ -37,11 +37,28 @@ class NotifierTest(testing.TestBase):
"total_bytes": "99614720"}
}
]
# NOTE(Eva-i): NotifiedDriver adds "queue_name" key to each
# message (dictionary), so final notifications look like this
self.notifications = [{"ttl": 300,
"body": {"event": "BackupStarted",
"backup_id":
"c378813c-3f0b-11e2-ad92"},
"queue_name": "fake_queue"
},
{"body": {"event": "BackupProgress",
"current_bytes": "0",
"total_bytes": "99614720"},
"queue_name": "fake_queue"
}
]
def test_webhook(self):
subscription = [{'subscriber': 'http://trigger_me'},
{'subscriber': 'http://call_me'},
{'subscriber': 'http://ping_me'}]
subscription = [{'subscriber': 'http://trigger_me',
'source': 'fake_queue'},
{'subscriber': 'http://call_me',
'source': 'fake_queue'},
{'subscriber': 'http://ping_me',
'source': 'fake_queue'}]
ctlr = mock.MagicMock()
ctlr.list = mock.Mock(return_value=iter([subscription]))
driver = notifier.NotifierDriver(subscription_controller=ctlr)
@ -50,24 +67,33 @@ class NotifierTest(testing.TestBase):
driver.post('fake_queue', self.messages, self.client_id,
self.project)
driver.executor.shutdown()
# Let's deserialize "data" from JSON string to dict in each mock
# call, so we can do dict comparisons. JSON string comparisons
# often fail, because dict keys can be serialized in different
# order inside the string.
for call in mock_post.call_args_list:
call[1]['data'] = json.loads(call[1]['data'])
# These are not real calls. In real calls each "data" argument is
# serialized by json.dumps. But we made a substitution before,
# so it will work.
mock_post.assert_has_calls([
mock.call(subscription[0]['subscriber'],
data=json.dumps(self.messages[0]),
data=self.notifications[0],
headers=headers),
mock.call(subscription[1]['subscriber'],
data=json.dumps(self.messages[0]),
data=self.notifications[0],
headers=headers),
mock.call(subscription[2]['subscriber'],
data=json.dumps(self.messages[0]),
data=self.notifications[0],
headers=headers),
mock.call(subscription[0]['subscriber'],
data=json.dumps(self.messages[1]),
data=self.notifications[1],
headers=headers),
mock.call(subscription[1]['subscriber'],
data=json.dumps(self.messages[1]),
data=self.notifications[1],
headers=headers),
mock.call(subscription[2]['subscriber'],
data=json.dumps(self.messages[1]),
data=self.notifications[1],
headers=headers),
], any_order=True)
self.assertEqual(6, len(mock_post.mock_calls))
@ -75,9 +101,11 @@ class NotifierTest(testing.TestBase):
@mock.patch('subprocess.Popen')
def test_mailto(self, mock_popen):
subscription = [{'subscriber': 'mailto:aaa@example.com',
'source': 'fake_queue',
'options': {'subject': 'Hello',
'from': 'zaqar@example.com'}},
{'subscriber': 'mailto:bbb@example.com',
'source': 'fake_queue',
'options': {'subject': 'Hello',
'from': 'zaqar@example.com'}}]
ctlr = mock.MagicMock()
@ -87,19 +115,18 @@ class NotifierTest(testing.TestBase):
msg = ('Content-Type: text/plain; charset="us-ascii"\n'
'MIME-Version: 1.0\nContent-Transfer-Encoding: 7bit\nto:'
' %(to)s\nfrom: %(from)s\nsubject: %(subject)s\n\n%(body)s')
mail1 = msg % {'to': subscription[0]['subscriber'][7:],
'from': 'zaqar@example.com', 'subject': 'Hello',
'body': json.dumps(self.messages[0])}
'body': json.dumps(self.notifications[0])}
mail2 = msg % {'to': subscription[0]['subscriber'][7:],
'from': 'zaqar@example.com', 'subject': 'Hello',
'body': json.dumps(self.messages[1])}
'body': json.dumps(self.notifications[1])}
mail3 = msg % {'to': subscription[1]['subscriber'][7:],
'from': 'zaqar@example.com', 'subject': 'Hello',
'body': json.dumps(self.messages[0])}
'body': json.dumps(self.notifications[0])}
mail4 = msg % {'to': subscription[1]['subscriber'][7:],
'from': 'zaqar@example.com', 'subject': 'Hello',
'body': json.dumps(self.messages[1])}
'body': json.dumps(self.notifications[1])}
def _communicate(msg):
called.add(msg)
@ -112,7 +139,23 @@ class NotifierTest(testing.TestBase):
driver.executor.shutdown()
self.assertEqual(4, len(called))
self.assertEqual({mail1, mail2, mail3, mail4}, called)
# Let's deserialize "body" from JSON string to dict and then serialize
# it back to JSON, but sorted, allowing us make comparisons.
mails = {mail1, mail2, mail3, mail4}
mail_options = []
mail_bodies = []
for mail in mails:
options, body = mail.split('\n\n')
mail_options.append(options)
mail_bodies.append(json.dumps(json.loads(body), sort_keys=True))
called_options = []
called_bodies = []
for call in called:
options, body = call.split('\n\n')
called_options.append(options)
called_bodies.append(json.dumps(json.loads(body), sort_keys=True))
self.assertEqual(sorted(mail_options), sorted(called_options))
self.assertEqual(sorted(mail_bodies), sorted(called_bodies))
def test_post_no_subscriber(self):
ctlr = mock.MagicMock()
@ -123,3 +166,17 @@ class NotifierTest(testing.TestBase):
self.project)
driver.executor.shutdown()
self.assertEqual(0, mock_post.call_count)
def test_proper_notification_data(self):
subscription = [{'subscriber': 'http://trigger_me',
'source': 'fake_queue'}]
ctlr = mock.MagicMock()
ctlr.list = mock.Mock(return_value=iter([subscription]))
driver = notifier.NotifierDriver(subscription_controller=ctlr)
with mock.patch('requests.post') as mock_post:
driver.post('fake_queue', self.messages, self.client_id,
self.project)
driver.executor.shutdown()
self.assertEqual(2, mock_post.call_count)
self.assertEqual(self.notifications[1],
json.loads(mock_post.call_args[1]['data']))