Thomas Herve 432abd47ec Use pop for checking messages with websocket
I1c94ad82c3b20cc331045c19d4a9987a701b081b fixed the issue in Zaqar where
the pop argument was wrongly named. We can now use it in the zaqar
collector to retrieve messages over websocket.

Change-Id: I0ce894b0eef7018a37db9ef9d1b43d8d20629bf0
2017-01-09 10:58:23 +01:00

164 lines
5.9 KiB
Python

#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from keystoneclient.v3 import client as keystoneclient
from oslo_config import cfg
from oslo_log import log
import six
from zaqarclient.queues.v1 import client as zaqarclient
from zaqarclient import transport
from zaqarclient.transport import request
from os_collect_config import exc
from os_collect_config import keystone
from os_collect_config import merger
CONF = cfg.CONF
logger = log.getLogger(__name__)
opts = [
cfg.StrOpt('user-id',
help='User ID for API authentication'),
cfg.StrOpt('password',
help='Password for API authentication'),
cfg.StrOpt('project-id',
help='ID of project for API authentication'),
cfg.StrOpt('auth-url',
help='URL for API authentication'),
cfg.StrOpt('queue-id',
help='ID of the queue to be checked'),
cfg.BoolOpt('use-websockets',
default=False,
help='Use the websocket transport to connect to Zaqar.'),
]
name = 'zaqar'
class Collector(object):
def __init__(self,
keystoneclient=keystoneclient,
zaqarclient=zaqarclient,
discover_class=None,
transport=transport):
self.keystoneclient = keystoneclient
self.zaqarclient = zaqarclient
self.discover_class = discover_class
self.transport = transport
def get_data_wsgi(self, ks, conf):
endpoint = ks.service_catalog.url_for(
service_type='messaging', endpoint_type='publicURL')
logger.debug('Fetching metadata from %s' % endpoint)
zaqar = self.zaqarclient.Client(endpoint, conf=conf, version=1.1)
queue = zaqar.queue(CONF.zaqar.queue_id)
r = six.next(queue.pop())
return r.body
def _create_req(self, endpoint, action, body):
return request.Request(endpoint, action, content=json.dumps(body))
def get_data_websocket(self, ks, conf):
endpoint = ks.service_catalog.url_for(
service_type='messaging-websocket', endpoint_type='publicURL')
logger.debug('Fetching metadata from %s' % endpoint)
with self.transport.get_transport_for(endpoint, options=conf) as ws:
# create queue
req = self._create_req(endpoint, 'queue_create',
{'queue_name': CONF.zaqar.queue_id})
ws.send(req)
# subscribe to queue messages
req = self._create_req(endpoint, 'subscription_create',
{'queue_name': CONF.zaqar.queue_id,
'ttl': 10000})
ws.send(req)
# check for pre-existing messages
req = self._create_req(endpoint, 'message_delete_many',
{'queue_name': CONF.zaqar.queue_id,
'pop': 1})
resp = ws.send(req)
messages = json.loads(resp.content).get('messages', [])
if len(messages) > 0:
# NOTE(dprince) In this case we are checking for queue
# messages that arrived before we subscribed.
logger.debug('Websocket message found...')
msg_0 = messages[0]
data = msg_0['body']
else:
# NOTE(dprince) This will block until there is data available
# or the socket times out. Because we subscribe to the queue
# it will allow us to process data immediately.
logger.debug('websocket recv()')
data = ws.recv()['body']
return data
def collect(self):
if CONF.zaqar.auth_url is None:
logger.warn('No auth_url configured.')
raise exc.ZaqarMetadataNotConfigured()
if CONF.zaqar.password is None:
logger.warn('No password configured.')
raise exc.ZaqarMetadataNotConfigured()
if CONF.zaqar.project_id is None:
logger.warn('No project_id configured.')
raise exc.ZaqarMetadataNotConfigured()
if CONF.zaqar.user_id is None:
logger.warn('No user_id configured.')
raise exc.ZaqarMetadataNotConfigured()
if CONF.zaqar.queue_id is None:
logger.warn('No queue_id configured.')
raise exc.ZaqarMetadataNotConfigured()
try:
ks = keystone.Keystone(
auth_url=CONF.zaqar.auth_url,
user_id=CONF.zaqar.user_id,
password=CONF.zaqar.password,
project_id=CONF.zaqar.project_id,
keystoneclient=self.keystoneclient,
discover_class=self.discover_class).client
conf = {
'auth_opts': {
'backend': 'keystone',
'options': {
'os_auth_token': ks.auth_token,
'os_project_id': CONF.zaqar.project_id
}
}
}
if CONF.zaqar.use_websockets:
data = self.get_data_websocket(ks, conf)
else:
data = self.get_data_wsgi(ks, conf)
final_list = merger.merged_list_from_content(
data, cfg.CONF.deployment_key, name)
return final_list
except Exception as e:
logger.warn(str(e))
raise exc.ZaqarMetadataNotAvailable()