Break up messages to avoid max message size

Messages posted back to a zaqar queue by the ansible-playbook action
could easily exceed the max message size for the queue. Instead of
posting a single message each time, break it up based on the max message
size and post a separate message for each.

Change-Id: If51416120b690c28cff4e909799246c16cd96fc0
Closes-Bug: #1737026
This commit is contained in:
James Slagle 2017-12-07 14:47:22 -05:00
parent e41a73c01a
commit 680050dcbe
3 changed files with 89 additions and 4 deletions

View File

@ -0,0 +1,6 @@
---
fixes:
- Messages posted back to a zaqar queue by the ansible-playbook action could
easily exceed the max message size for the queue. Instead of posting a
single message each time, break it up based on the max message size and
post a separate message for each.

View File

@ -264,6 +264,8 @@ class AnsiblePlaybookAction(base.TripleOAction):
self.execution_id = self._kwargs_for_run.pop('execution_id', None)
self._work_dir = self._kwargs_for_run.pop(
'work_dir', None)
self.max_message_size = self._kwargs_for_run.pop(
'max_message_size', 1048576)
@property
def work_dir(self):
@ -346,14 +348,32 @@ class AnsiblePlaybookAction(base.TripleOAction):
self._ssh_private_key = path
return path
def format_message(self, lines):
def format_message(self, message):
return {
'body': {
'payload': {
'message': ''.join(lines),
'message': message,
'status': 'RUNNING',
'execution': {'id': self.execution_id}}}}
def post_message(self, queue, message):
"""Posts message to queue
Breaks the message up it up by maximum message size if needed.
"""
start = 0
# We use 90% of the max message size to account for any overhead,
# plus the wrapped dict structure from format_message
message_size = int(self.max_message_size * 0.9)
while True:
end = start + message_size
message_part = message[start:end]
start = end
if not message_part:
return
queue.post(self.format_message(message_part))
def run(self, context):
if 0 < self.verbosity < 6:
verbosity_option = '-' + ('v' * self.verbosity)
@ -463,10 +483,10 @@ class AnsiblePlaybookAction(base.TripleOAction):
lines.append(line)
stdout.append(line)
if time.time() - start > 30:
queue.post(self.format_message(lines))
self.post_message(queue, ''.join(lines))
lines = []
start = time.time()
queue.post(self.format_message(lines))
self.post_message(queue, ''.join(lines))
process.stdout.close()
returncode = process.wait()
# TODO(d0ugal): This bit isn't ideal - as we redirect stderr to

View File

@ -16,7 +16,9 @@
import json
import mock
import os
import random
from six.moves import configparser
import string
import tempfile
from oslo_concurrency import processutils
@ -78,6 +80,7 @@ class AnsiblePlaybookActionTest(base.TestCase):
self.extra_vars = {"var1": True, "var2": 0}
self.verbosity = 1
self.ctx = mock.MagicMock()
self.max_message_size = 1024
@mock.patch("tripleo_common.actions.ansible.write_default_ansible_cfg")
@mock.patch("oslo_concurrency.processutils.execute")
@ -108,6 +111,62 @@ class AnsiblePlaybookActionTest(base.TestCase):
env_variables=env, cwd=action.work_dir,
log_errors=processutils.LogErrors.ALL)
@mock.patch("tripleo_common.actions.ansible.write_default_ansible_cfg")
@mock.patch("oslo_concurrency.processutils.execute")
def test_post_message(self, mock_execute, mock_write_cfg):
action = ansible.AnsiblePlaybookAction(
playbook=self.playbook, limit_hosts=self.limit_hosts,
remote_user=self.remote_user, become=self.become,
become_user=self.become_user, extra_vars=self.extra_vars,
verbosity=self.verbosity,
max_message_size=self.max_message_size)
ansible_config_path = os.path.join(action.work_dir, 'ansible.cfg')
mock_write_cfg.return_value = ansible_config_path
message_size = int(self.max_message_size * 0.9)
# Message equal to max_message_size
queue = mock.Mock()
message = ''.join([string.ascii_letters[int(random.random() * 26)]
for x in range(1024)])
action.post_message(queue, message)
self.assertEqual(queue.post.call_count, 2)
self.assertEqual(
queue.post.call_args_list[0],
mock.call(action.format_message(message[:message_size])))
self.assertEqual(
queue.post.call_args_list[1],
mock.call(action.format_message(message[message_size:])))
# Message less than max_message_size
queue = mock.Mock()
message = ''.join([string.ascii_letters[int(random.random() * 26)]
for x in range(512)])
action.post_message(queue, message)
self.assertEqual(queue.post.call_count, 1)
self.assertEqual(
queue.post.call_args_list[0],
mock.call(action.format_message(message)))
# Message double max_message_size
queue = mock.Mock()
message = ''.join([string.ascii_letters[int(random.random() * 26)]
for x in range(2048)])
action.post_message(queue, message)
self.assertEqual(queue.post.call_count, 3)
self.assertEqual(
queue.post.call_args_list[0],
mock.call(action.format_message(message[:message_size])))
self.assertEqual(
queue.post.call_args_list[1],
mock.call(action.format_message(
message[message_size:message_size * 2])))
self.assertEqual(
queue.post.call_args_list[2],
mock.call(action.format_message(
message[message_size * 2:2048])))
class CopyConfigFileTest(base.TestCase):