From b6364df3e566322e3a9a438d340d62a097b72c16 Mon Sep 17 00:00:00 2001 From: Dan Prince Date: Thu, 14 Jul 2016 21:07:28 -0400 Subject: [PATCH] Add support for Zaqar websockets This patch adds a new zaqar.use_websockets option which if set to True in the config file will configure the Zaqar collector to use the websocket transport instead of wsgi. This can be more efficient where you want to avoid the continuous polling of o-c-c and instead just listen on the websocket that is subscribed to a queue. Like other collectors each iteration creates a new socket object. This allows us to use the normal re-exec logic in o-c-c and thus gives the option to re-configure the agent in the future to use other types of collectors. We could (optionally) look into a higher level option in the future that would allow o-c-c to avoid re-exec'ing and thus re-use the same websocket for multiple sets of metadata. Depends-On: Ia2a8deb599252d8308e44d595eb2bf443999aaad Change-Id: Id5c7ed590df776844b6c7961eb40f89206cd24e0 --- os_collect_config/tests/test_zaqar.py | 102 ++++++++++++++++++++++++-- os_collect_config/zaqar.py | 88 +++++++++++++++++++--- 2 files changed, 176 insertions(+), 14 deletions(-) diff --git a/os_collect_config/tests/test_zaqar.py b/os_collect_config/tests/test_zaqar.py index d9ce109..0753607 100644 --- a/os_collect_config/tests/test_zaqar.py +++ b/os_collect_config/tests/test_zaqar.py @@ -12,13 +12,18 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json + import fixtures from keystoneclient import discover as ks_discover import mock from oslo_config import cfg +from oslo_config import fixture as config_fixture import testtools from testtools import matchers from zaqarclient.queues.v1 import message +from zaqarclient import transport +from zaqarclient.transport import response from os_collect_config import collect from os_collect_config import exc @@ -34,6 +39,14 @@ class FakeKeystoneClient(test_heat.FakeKeystoneClient): return 'http://192.0.2.1:8888/' +class FakeKeystoneClientWebsocket(test_heat.FakeKeystoneClient): + + def url_for(self, service_type, endpoint_type): + self._test.assertEqual('messaging-websocket', service_type) + self._test.assertEqual('publicURL', endpoint_type) + return 'ws://127.0.0.1:9000/' + + class FakeZaqarClient(object): def __init__(self, testcase): @@ -50,6 +63,31 @@ class FakeZaqarClient(object): return FakeQueue() +class FakeZaqarWebsocketClient(object): + + def __init__(self, options, messages=None, testcase=None): + self._messages = messages + self._test = testcase + + def send(self, request): + self._test.assertEqual('ws://127.0.0.1:9000/', request.endpoint) + if request.operation == 'message_list': + body = json.loads(request.content) + self._test.assertEqual( + '4f3f46d3-09f1-42a7-8c13-f91a5457192c', body['queue_name']) + return response.Response(request, content=json.dumps(self._messages), + status_code=200) + + def recv(self): + return {'body': test_heat.META_DATA} + + def __enter__(self): + return self + + def __exit__(self, *exc): + pass + + class FakeQueue(object): def pop(self): @@ -87,11 +125,17 @@ class TestZaqar(testtools.TestCase): self.log = self.useFixture(fixtures.FakeLogger()) self.useFixture(fixtures.NestedTempfile()) collect.setup_conf() - cfg.CONF.zaqar.auth_url = 'http://192.0.2.1:5000/v3' - cfg.CONF.zaqar.user_id = '0123456789ABCDEF' - cfg.CONF.zaqar.password = 'FEDCBA9876543210' - cfg.CONF.zaqar.project_id = '9f6b09df-4d7f-4a33-8ec3-9924d8f46f10' - cfg.CONF.zaqar.queue_id = '4f3f46d3-09f1-42a7-8c13-f91a5457192c' + + conf = config_fixture.Config() + self.useFixture(conf) + conf.config(group='zaqar', use_websockets=False) + conf.config(group='zaqar', auth_url='http://192.0.2.1:5000/v3') + conf.config(group='zaqar', user_id='0123456789ABCDEF') + conf.config(group='zaqar', password='FEDCBA9876543210') + conf.config(group='zaqar', + project_id='9f6b09df-4d7f-4a33-8ec3-9924d8f46f10') + conf.config(group='zaqar', + queue_id='4f3f46d3-09f1-42a7-8c13-f91a5457192c') @mock.patch.object(ks_discover.Discover, '__init__') @mock.patch.object(ks_discover.Discover, 'url_for') @@ -176,3 +220,51 @@ class TestZaqar(testtools.TestCase): self.assertRaises( exc.ZaqarMetadataNotConfigured, zaqar_collect.collect) self.assertIn('No queue_id configured', self.log.output) + + @mock.patch.object(transport, 'get_transport_for') + @mock.patch.object(ks_discover.Discover, '__init__') + @mock.patch.object(ks_discover.Discover, 'url_for') + def test_collect_zaqar_websocket(self, mock_url_for, mock___init__, + mock_transport): + + mock___init__.return_value = None + mock_url_for.return_value = cfg.CONF.zaqar.auth_url + conf = config_fixture.Config() + self.useFixture(conf) + conf.config(group='zaqar', use_websockets=True) + messages = {'messages': [{'body': test_heat.META_DATA, 'id': 1}]} + ws = FakeZaqarWebsocketClient({}, messages=messages, testcase=self) + mock_transport.return_value = ws + zaqar_md = zaqar.Collector( + keystoneclient=FakeKeystoneClientWebsocket(self, cfg.CONF.zaqar) + ).collect() + self.assertThat(zaqar_md, matchers.IsInstance(list)) + self.assertEqual('zaqar', zaqar_md[0][0]) + zaqar_md = zaqar_md[0][1] + + for k in ('int1', 'strfoo', 'map_ab'): + self.assertIn(k, zaqar_md) + self.assertEqual(zaqar_md[k], test_heat.META_DATA[k]) + + @mock.patch.object(transport, 'get_transport_for') + @mock.patch.object(ks_discover.Discover, '__init__') + @mock.patch.object(ks_discover.Discover, 'url_for') + def test_collect_zaqar_websocket_recv(self, mock_url_for, mock___init__, + mock_transport): + mock___init__.return_value = None + mock_url_for.return_value = cfg.CONF.zaqar.auth_url + ws = FakeZaqarWebsocketClient({}, messages={}, testcase=self) + mock_transport.return_value = ws + conf = config_fixture.Config() + self.useFixture(conf) + conf.config(group='zaqar', use_websockets=True) + zaqar_md = zaqar.Collector( + keystoneclient=FakeKeystoneClientWebsocket(self, cfg.CONF.zaqar), + ).collect() + self.assertThat(zaqar_md, matchers.IsInstance(list)) + self.assertEqual('zaqar', zaqar_md[0][0]) + zaqar_md = zaqar_md[0][1] + + for k in ('int1', 'strfoo', 'map_ab'): + self.assertIn(k, zaqar_md) + self.assertEqual(zaqar_md[k], test_heat.META_DATA[k]) diff --git a/os_collect_config/zaqar.py b/os_collect_config/zaqar.py index fdf1f3d..e16f9ba 100644 --- a/os_collect_config/zaqar.py +++ b/os_collect_config/zaqar.py @@ -12,11 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json + from keystoneclient.v3 import client as keystoneclient from oslo_config import cfg from oslo_log import log import six from zaqarclient.queues.v1 import client as zaqarclient +from zaqarclient import transport +from zaqarclient.transport import request from os_collect_config import exc from os_collect_config import keystone @@ -36,6 +40,9 @@ opts = [ help='URL for API authentication'), cfg.StrOpt('queue-id', help='ID of the queue to be checked'), + cfg.BoolOpt('use-websockets', + default=False, + help='Use the websocket transport to connect to Zaqar.'), ] name = 'zaqar' @@ -44,10 +51,75 @@ class Collector(object): def __init__(self, keystoneclient=keystoneclient, zaqarclient=zaqarclient, - discover_class=None): + discover_class=None, + transport=transport): self.keystoneclient = keystoneclient self.zaqarclient = zaqarclient self.discover_class = discover_class + self.transport = transport + + def get_data_wsgi(self, ks, conf): + + endpoint = ks.service_catalog.url_for( + service_type='messaging', endpoint_type='publicURL') + logger.debug('Fetching metadata from %s' % endpoint) + zaqar = self.zaqarclient.Client(endpoint, conf=conf, version=1.1) + + queue = zaqar.queue(CONF.zaqar.queue_id) + r = six.next(queue.pop()) + return r.body + + def _create_req(self, endpoint, action, body): + return request.Request(endpoint, action, content=json.dumps(body)) + + def get_data_websocket(self, ks, conf): + + endpoint = ks.service_catalog.url_for( + service_type='messaging-websocket', endpoint_type='publicURL') + + logger.debug('Fetching metadata from %s' % endpoint) + + with self.transport.get_transport_for(endpoint, options=conf) as ws: + # create queue + req = self._create_req(endpoint, 'queue_create', + {'queue_name': CONF.zaqar.queue_id}) + ws.send(req) + # subscribe to queue messages + req = self._create_req(endpoint, 'subscription_create', + {'queue_name': CONF.zaqar.queue_id, + 'ttl': 10000}) + ws.send(req) + + # TODO(dprince) would be nice to use message_delete_many but + # websockets doesn't support parameters so we can't send 'pop'. + # This would allow us to avoid the 'message_delete' below. Example: + # req = self._create_req(endpoint, 'message_delete_many', + # {'queue_name': CONF.zaqar.queue_id, 'pop': 1}) + req = self._create_req(endpoint, 'message_list', + {'queue_name': CONF.zaqar.queue_id, + 'echo': True}) + resp = ws.send(req) + messages = json.loads(resp.content).get('messages', []) + + if len(messages) > 0: + # NOTE(dprince) In this case we are checking for queue + # messages that arrived before we subscribed. + logger.debug('Websocket message_list found...') + msg_0 = messages[0] + data = msg_0['body'] + req = self._create_req(endpoint, 'message_delete', + {'queue_name': CONF.zaqar.queue_id, + 'message_id': msg_0['id']}) + ws.send(req) + + else: + # NOTE(dprince) This will block until there is data available + # or the socket times out. Because we subscribe to the queue + # it will allow us to process data immediately. + logger.debug('websocket recv()') + data = ws.recv()['body'] + + return data def collect(self): if CONF.zaqar.auth_url is None: @@ -74,9 +146,7 @@ class Collector(object): project_id=CONF.zaqar.project_id, keystoneclient=self.keystoneclient, discover_class=self.discover_class).client - endpoint = ks.service_catalog.url_for( - service_type='messaging', endpoint_type='publicURL') - logger.debug('Fetching metadata from %s' % endpoint) + conf = { 'auth_opts': { 'backend': 'keystone', @@ -87,13 +157,13 @@ class Collector(object): } } - zaqar = self.zaqarclient.Client(endpoint, conf=conf, version=1.1) - - queue = zaqar.queue(CONF.zaqar.queue_id) - r = six.next(queue.pop()) + if CONF.zaqar.use_websockets: + data = self.get_data_websocket(ks, conf) + else: + data = self.get_data_wsgi(ks, conf) final_list = merger.merged_list_from_content( - r.body, cfg.CONF.deployment_key, name) + data, cfg.CONF.deployment_key, name) return final_list except Exception as e: