b6364df3e5
This patch adds a new zaqar.use_websockets option which, if set to True in the config file, will configure the Zaqar collector to use the websocket transport instead of wsgi. This can be more efficient where you want to avoid the continuous polling of o-c-c and instead just listen on the websocket that is subscribed to a queue. Like other collectors, each iteration creates a new socket object. This allows us to use the normal re-exec logic in o-c-c and thus gives us the option to re-configure the agent in the future to use other types of collectors. We could (optionally) look into a higher-level option in the future that would allow o-c-c to avoid re-exec'ing and thus re-use the same websocket for multiple sets of metadata. Depends-On: Ia2a8deb599252d8308e44d595eb2bf443999aaad Change-Id: Id5c7ed590df776844b6c7961eb40f89206cd24e0
172 lines
6.4 KiB
Python
172 lines
6.4 KiB
Python
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import json
|
|
|
|
from keystoneclient.v3 import client as keystoneclient
|
|
from oslo_config import cfg
|
|
from oslo_log import log
|
|
import six
|
|
from zaqarclient.queues.v1 import client as zaqarclient
|
|
from zaqarclient import transport
|
|
from zaqarclient.transport import request
|
|
|
|
from os_collect_config import exc
|
|
from os_collect_config import keystone
|
|
from os_collect_config import merger
|
|
|
|
# Process-wide oslo.config registry and this module's logger.
CONF = cfg.CONF
logger = log.getLogger(__name__)

# Options consumed by the collector below as ``CONF.zaqar.<name>`` (oslo.config
# exposes dashed option names with underscores).
# NOTE(review): the registration of these options under the ``zaqar`` group is
# not visible in this module -- presumably done by the caller; confirm.
opts = [
    cfg.StrOpt('user-id',
               help='User ID for API authentication'),
    # secret=True keeps the value from being printed in clear text when
    # oslo.config dumps option values to the log.
    cfg.StrOpt('password',
               secret=True,
               help='Password for API authentication'),
    cfg.StrOpt('project-id',
               help='ID of project for API authentication'),
    cfg.StrOpt('auth-url',
               help='URL for API authentication'),
    cfg.StrOpt('queue-id',
               help='ID of the queue to be checked'),
    cfg.BoolOpt('use-websockets',
                default=False,
                help='Use the websocket transport to connect to Zaqar.'),
]

# Collector name; handed to merger.merged_list_from_content() by
# Collector.collect().
name = 'zaqar'
|
|
|
|
|
|
class Collector(object):
    """Fetch deployment metadata from a Zaqar queue.

    Depending on ``CONF.zaqar.use_websockets`` the data is read either through
    the Zaqar client's queue API (``get_data_wsgi``) or through a websocket
    transport (``get_data_websocket``). A new client/transport is created on
    every call; nothing is cached on the instance besides the injected
    modules.
    """

    # Options that must be set before a collection is attempted, in the order
    # in which they are validated (and therefore reported).
    _REQUIRED_OPTS = ('auth_url', 'password', 'project_id', 'user_id',
                      'queue_id')

    def __init__(self,
                 keystoneclient=keystoneclient,
                 zaqarclient=zaqarclient,
                 discover_class=None,
                 transport=transport):
        """Store the client modules used to talk to Keystone and Zaqar.

        All arguments default to the modules imported by this file; they are
        parameters so that callers can substitute their own implementations.

        :param keystoneclient: forwarded to ``keystone.Keystone``.
        :param zaqarclient: object providing ``Client(endpoint, conf=...,
            version=...)``.
        :param discover_class: forwarded to ``keystone.Keystone``.
        :param transport: object providing ``get_transport_for(endpoint,
            options=...)``.
        """
        self.keystoneclient = keystoneclient
        self.zaqarclient = zaqarclient
        self.discover_class = discover_class
        self.transport = transport

    def get_data_wsgi(self, ks, conf):
        """Pop one message from the configured queue via the Zaqar client.

        :param ks: Keystone client; its service catalog is used to look up
            the public ``messaging`` endpoint.
        :param conf: configuration dict passed to the Zaqar client.
        :returns: the ``body`` of the popped message.
        :raises StopIteration: if ``queue.pop()`` yields nothing.
        """
        endpoint = ks.service_catalog.url_for(
            service_type='messaging', endpoint_type='publicURL')
        logger.debug('Fetching metadata from %s', endpoint)
        zaqar = self.zaqarclient.Client(endpoint, conf=conf, version=1.1)

        queue = zaqar.queue(CONF.zaqar.queue_id)
        message = six.next(queue.pop())
        return message.body

    def _create_req(self, endpoint, action, body):
        """Build a Zaqar transport request whose content is ``body`` as JSON."""
        return request.Request(endpoint, action, content=json.dumps(body))

    def get_data_websocket(self, ks, conf):
        """Read one message from the configured queue over a websocket.

        The queue is created and subscribed to, then listed. If the listing
        already contains messages, the first one is returned and deleted;
        otherwise the call waits on ``ws.recv()`` for the next message.

        :param ks: Keystone client; its service catalog is used to look up
            the public ``messaging-websocket`` endpoint.
        :param conf: options dict passed to the transport.
        :returns: the ``body`` of the message that was read.
        """
        endpoint = ks.service_catalog.url_for(
            service_type='messaging-websocket', endpoint_type='publicURL')

        logger.debug('Fetching metadata from %s', endpoint)

        queue_name = CONF.zaqar.queue_id

        with self.transport.get_transport_for(endpoint, options=conf) as ws:
            # create queue (the response is not inspected)
            req = self._create_req(endpoint, 'queue_create',
                                   {'queue_name': queue_name})
            ws.send(req)
            # subscribe to queue messages (the response is not inspected)
            req = self._create_req(endpoint, 'subscription_create',
                                   {'queue_name': queue_name,
                                    'ttl': 10000})
            ws.send(req)

            # TODO(dprince) would be nice to use message_delete_many but
            # websockets doesn't support parameters so we can't send 'pop'.
            # This would allow us to avoid the 'message_delete' below. Example:
            # req = self._create_req(endpoint, 'message_delete_many',
            #     {'queue_name': CONF.zaqar.queue_id, 'pop': 1})
            req = self._create_req(endpoint, 'message_list',
                                   {'queue_name': queue_name,
                                    'echo': True})
            resp = ws.send(req)
            messages = json.loads(resp.content).get('messages', [])

            if messages:
                # NOTE(dprince) In this case we are checking for queue
                # messages that arrived before we subscribed.
                logger.debug('Websocket message_list found...')
                first = messages[0]
                data = first['body']
                # Without 'pop' support the message has to be removed
                # explicitly once it has been read.
                req = self._create_req(endpoint, 'message_delete',
                                       {'queue_name': queue_name,
                                        'message_id': first['id']})
                ws.send(req)
            else:
                # NOTE(dprince) This will block until there is data available
                # or the socket times out. Because we subscribe to the queue
                # it will allow us to process data immediately.
                logger.debug('websocket recv()')
                data = ws.recv()['body']

        return data

    def collect(self):
        """Authenticate, fetch the queue data and return the merged list.

        :returns: the result of ``merger.merged_list_from_content`` for the
            fetched data.
        :raises exc.ZaqarMetadataNotConfigured: if any of the required
            ``CONF.zaqar`` options is unset (``None``).
        :raises exc.ZaqarMetadataNotAvailable: if anything fails while
            authenticating, fetching or merging; the original error is
            logged as a warning.
        """
        for opt in self._REQUIRED_OPTS:
            if getattr(CONF.zaqar, opt) is None:
                logger.warning('No %s configured.', opt)
                raise exc.ZaqarMetadataNotConfigured()

        try:
            ks = keystone.Keystone(
                auth_url=CONF.zaqar.auth_url,
                user_id=CONF.zaqar.user_id,
                password=CONF.zaqar.password,
                project_id=CONF.zaqar.project_id,
                keystoneclient=self.keystoneclient,
                discover_class=self.discover_class).client

            # Hand the Keystone token to the Zaqar client/transport.
            conf = {
                'auth_opts': {
                    'backend': 'keystone',
                    'options': {
                        'os_auth_token': ks.auth_token,
                        'os_project_id': CONF.zaqar.project_id
                    }
                }
            }

            if CONF.zaqar.use_websockets:
                data = self.get_data_websocket(ks, conf)
            else:
                data = self.get_data_wsgi(ks, conf)

            return merger.merged_list_from_content(
                data, cfg.CONF.deployment_key, name)

        except Exception as e:
            # Collector boundary: every failure is reported to the caller as
            # "metadata not available" after logging the cause.
            logger.warning(str(e))
            raise exc.ZaqarMetadataNotAvailable()
|