diff --git a/os_collect_config/tests/test_zaqar.py b/os_collect_config/tests/test_zaqar.py index d9ce109..0753607 100644 --- a/os_collect_config/tests/test_zaqar.py +++ b/os_collect_config/tests/test_zaqar.py @@ -12,13 +12,18 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json + import fixtures from keystoneclient import discover as ks_discover import mock from oslo_config import cfg +from oslo_config import fixture as config_fixture import testtools from testtools import matchers from zaqarclient.queues.v1 import message +from zaqarclient import transport +from zaqarclient.transport import response from os_collect_config import collect from os_collect_config import exc @@ -34,6 +39,14 @@ class FakeKeystoneClient(test_heat.FakeKeystoneClient): return 'http://192.0.2.1:8888/' +class FakeKeystoneClientWebsocket(test_heat.FakeKeystoneClient): + + def url_for(self, service_type, endpoint_type): + self._test.assertEqual('messaging-websocket', service_type) + self._test.assertEqual('publicURL', endpoint_type) + return 'ws://127.0.0.1:9000/' + + class FakeZaqarClient(object): def __init__(self, testcase): @@ -50,6 +63,31 @@ class FakeZaqarClient(object): return FakeQueue() +class FakeZaqarWebsocketClient(object): + + def __init__(self, options, messages=None, testcase=None): + self._messages = messages + self._test = testcase + + def send(self, request): + self._test.assertEqual('ws://127.0.0.1:9000/', request.endpoint) + if request.operation == 'message_list': + body = json.loads(request.content) + self._test.assertEqual( + '4f3f46d3-09f1-42a7-8c13-f91a5457192c', body['queue_name']) + return response.Response(request, content=json.dumps(self._messages), + status_code=200) + + def recv(self): + return {'body': test_heat.META_DATA} + + def __enter__(self): + return self + + def __exit__(self, *exc): + pass + + class FakeQueue(object): def pop(self): @@ -87,11 +125,17 @@ class TestZaqar(testtools.TestCase): self.log = self.useFixture(fixtures.FakeLogger()) self.useFixture(fixtures.NestedTempfile()) collect.setup_conf() - cfg.CONF.zaqar.auth_url = 'http://192.0.2.1:5000/v3' - cfg.CONF.zaqar.user_id = '0123456789ABCDEF' - cfg.CONF.zaqar.password = 'FEDCBA9876543210' - cfg.CONF.zaqar.project_id = '9f6b09df-4d7f-4a33-8ec3-9924d8f46f10' - cfg.CONF.zaqar.queue_id = '4f3f46d3-09f1-42a7-8c13-f91a5457192c' + + conf = config_fixture.Config() + self.useFixture(conf) + conf.config(group='zaqar', use_websockets=False) + conf.config(group='zaqar', auth_url='http://192.0.2.1:5000/v3') + conf.config(group='zaqar', user_id='0123456789ABCDEF') + conf.config(group='zaqar', password='FEDCBA9876543210') + conf.config(group='zaqar', + project_id='9f6b09df-4d7f-4a33-8ec3-9924d8f46f10') + conf.config(group='zaqar', + queue_id='4f3f46d3-09f1-42a7-8c13-f91a5457192c') @mock.patch.object(ks_discover.Discover, '__init__') @mock.patch.object(ks_discover.Discover, 'url_for') @@ -176,3 +220,51 @@ class TestZaqar(testtools.TestCase): self.assertRaises( exc.ZaqarMetadataNotConfigured, zaqar_collect.collect) self.assertIn('No queue_id configured', self.log.output) + + @mock.patch.object(transport, 'get_transport_for') + @mock.patch.object(ks_discover.Discover, '__init__') + @mock.patch.object(ks_discover.Discover, 'url_for') + def test_collect_zaqar_websocket(self, mock_url_for, mock___init__, + mock_transport): + + mock___init__.return_value = None + mock_url_for.return_value = cfg.CONF.zaqar.auth_url + conf = config_fixture.Config() + self.useFixture(conf) + conf.config(group='zaqar', use_websockets=True) + messages = {'messages': [{'body': test_heat.META_DATA, 'id': 1}]} + ws = FakeZaqarWebsocketClient({}, messages=messages, testcase=self) + mock_transport.return_value = ws + zaqar_md = zaqar.Collector( + keystoneclient=FakeKeystoneClientWebsocket(self, cfg.CONF.zaqar) + ).collect() + self.assertThat(zaqar_md, matchers.IsInstance(list)) + self.assertEqual('zaqar', zaqar_md[0][0]) + zaqar_md = zaqar_md[0][1] + + for k in ('int1', 'strfoo', 'map_ab'): + self.assertIn(k, zaqar_md) + self.assertEqual(zaqar_md[k], test_heat.META_DATA[k]) + + @mock.patch.object(transport, 'get_transport_for') + @mock.patch.object(ks_discover.Discover, '__init__') + @mock.patch.object(ks_discover.Discover, 'url_for') + def test_collect_zaqar_websocket_recv(self, mock_url_for, mock___init__, + mock_transport): + mock___init__.return_value = None + mock_url_for.return_value = cfg.CONF.zaqar.auth_url + ws = FakeZaqarWebsocketClient({}, messages={}, testcase=self) + mock_transport.return_value = ws + conf = config_fixture.Config() + self.useFixture(conf) + conf.config(group='zaqar', use_websockets=True) + zaqar_md = zaqar.Collector( + keystoneclient=FakeKeystoneClientWebsocket(self, cfg.CONF.zaqar), + ).collect() + self.assertThat(zaqar_md, matchers.IsInstance(list)) + self.assertEqual('zaqar', zaqar_md[0][0]) + zaqar_md = zaqar_md[0][1] + + for k in ('int1', 'strfoo', 'map_ab'): + self.assertIn(k, zaqar_md) + self.assertEqual(zaqar_md[k], test_heat.META_DATA[k]) diff --git a/os_collect_config/zaqar.py b/os_collect_config/zaqar.py index fdf1f3d..e16f9ba 100644 --- a/os_collect_config/zaqar.py +++ b/os_collect_config/zaqar.py @@ -12,11 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json + from keystoneclient.v3 import client as keystoneclient from oslo_config import cfg from oslo_log import log import six from zaqarclient.queues.v1 import client as zaqarclient +from zaqarclient import transport +from zaqarclient.transport import request from os_collect_config import exc from os_collect_config import keystone @@ -36,6 +40,9 @@ opts = [ help='URL for API authentication'), cfg.StrOpt('queue-id', help='ID of the queue to be checked'), + cfg.BoolOpt('use-websockets', + default=False, + help='Use the websocket transport to connect to Zaqar.'), ] name = 'zaqar' @@ -44,10 +51,75 @@ class Collector(object): def __init__(self, keystoneclient=keystoneclient, zaqarclient=zaqarclient, - discover_class=None): + discover_class=None, + transport=transport): self.keystoneclient = keystoneclient self.zaqarclient = zaqarclient self.discover_class = discover_class + self.transport = transport + + def get_data_wsgi(self, ks, conf): + + endpoint = ks.service_catalog.url_for( + service_type='messaging', endpoint_type='publicURL') + logger.debug('Fetching metadata from %s' % endpoint) + zaqar = self.zaqarclient.Client(endpoint, conf=conf, version=1.1) + + queue = zaqar.queue(CONF.zaqar.queue_id) + r = six.next(queue.pop()) + return r.body + + def _create_req(self, endpoint, action, body): + return request.Request(endpoint, action, content=json.dumps(body)) + + def get_data_websocket(self, ks, conf): + + endpoint = ks.service_catalog.url_for( + service_type='messaging-websocket', endpoint_type='publicURL') + + logger.debug('Fetching metadata from %s' % endpoint) + + with self.transport.get_transport_for(endpoint, options=conf) as ws: + # create queue + req = self._create_req(endpoint, 'queue_create', + {'queue_name': CONF.zaqar.queue_id}) + ws.send(req) + # subscribe to queue messages + req = self._create_req(endpoint, 'subscription_create', + {'queue_name': CONF.zaqar.queue_id, + 'ttl': 10000}) + ws.send(req) + + # TODO(dprince) would be nice to use message_delete_many but + # websockets doesn't support parameters so we can't send 'pop'. + # This would allow us to avoid the 'message_delete' below. Example: + # req = self._create_req(endpoint, 'message_delete_many', + # {'queue_name': CONF.zaqar.queue_id, 'pop': 1}) + req = self._create_req(endpoint, 'message_list', + {'queue_name': CONF.zaqar.queue_id, + 'echo': True}) + resp = ws.send(req) + messages = json.loads(resp.content).get('messages', []) + + if len(messages) > 0: + # NOTE(dprince) In this case we are checking for queue + # messages that arrived before we subscribed. + logger.debug('Websocket message_list found...') + msg_0 = messages[0] + data = msg_0['body'] + req = self._create_req(endpoint, 'message_delete', + {'queue_name': CONF.zaqar.queue_id, + 'message_id': msg_0['id']}) + ws.send(req) + + else: + # NOTE(dprince) This will block until there is data available + # or the socket times out. Because we subscribe to the queue + # it will allow us to process data immediately. + logger.debug('websocket recv()') + data = ws.recv()['body'] + + return data def collect(self): if CONF.zaqar.auth_url is None: @@ -74,9 +146,7 @@ class Collector(object): project_id=CONF.zaqar.project_id, keystoneclient=self.keystoneclient, discover_class=self.discover_class).client - endpoint = ks.service_catalog.url_for( - service_type='messaging', endpoint_type='publicURL') - logger.debug('Fetching metadata from %s' % endpoint) + conf = { 'auth_opts': { 'backend': 'keystone', @@ -87,13 +157,13 @@ class Collector(object): } } - zaqar = self.zaqarclient.Client(endpoint, conf=conf, version=1.1) - - queue = zaqar.queue(CONF.zaqar.queue_id) - r = six.next(queue.pop()) + if CONF.zaqar.use_websockets: + data = self.get_data_websocket(ks, conf) + else: + data = self.get_data_wsgi(ks, conf) final_list = merger.merged_list_from_content( - r.body, cfg.CONF.deployment_key, name) + data, cfg.CONF.deployment_key, name) return final_list except Exception as e: