os-collect-config/os_collect_config/zaqar.py
Dan Prince b6364df3e5 Add support for Zaqar websockets
This patch adds a new zaqar.use_websockets option which if set to True
in the config file will configure the Zaqar collector to use
the websocket transport instead of wsgi. This can be more efficient
where you want to avoid the continuous polling of o-c-c and instead
just listen on the websocket that is subscribed to a queue.

Like other collectors each iteration creates a new socket object. This
allows us to use the normal re-exec logic in o-c-c and thus gives
the option to re-configure the agent in the future to use other types
of collectors. We could (optionally) look into a higher level option in
the future that would allow o-c-c to avoid re-exec'ing and thus re-use
the same websocket for multiple sets of metadata.

Depends-On: Ia2a8deb599252d8308e44d595eb2bf443999aaad
Change-Id: Id5c7ed590df776844b6c7961eb40f89206cd24e0
2016-11-30 21:12:47 -05:00

172 lines
6.4 KiB
Python

#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from keystoneclient.v3 import client as keystoneclient
from oslo_config import cfg
from oslo_log import log
import six
from zaqarclient.queues.v1 import client as zaqarclient
from zaqarclient import transport
from zaqarclient.transport import request
from os_collect_config import exc
from os_collect_config import keystone
from os_collect_config import merger
CONF = cfg.CONF
logger = log.getLogger(__name__)
opts = [
cfg.StrOpt('user-id',
help='User ID for API authentication'),
cfg.StrOpt('password',
help='Password for API authentication'),
cfg.StrOpt('project-id',
help='ID of project for API authentication'),
cfg.StrOpt('auth-url',
help='URL for API authentication'),
cfg.StrOpt('queue-id',
help='ID of the queue to be checked'),
cfg.BoolOpt('use-websockets',
default=False,
help='Use the websocket transport to connect to Zaqar.'),
]
name = 'zaqar'
class Collector(object):
def __init__(self,
keystoneclient=keystoneclient,
zaqarclient=zaqarclient,
discover_class=None,
transport=transport):
self.keystoneclient = keystoneclient
self.zaqarclient = zaqarclient
self.discover_class = discover_class
self.transport = transport
def get_data_wsgi(self, ks, conf):
endpoint = ks.service_catalog.url_for(
service_type='messaging', endpoint_type='publicURL')
logger.debug('Fetching metadata from %s' % endpoint)
zaqar = self.zaqarclient.Client(endpoint, conf=conf, version=1.1)
queue = zaqar.queue(CONF.zaqar.queue_id)
r = six.next(queue.pop())
return r.body
def _create_req(self, endpoint, action, body):
return request.Request(endpoint, action, content=json.dumps(body))
def get_data_websocket(self, ks, conf):
endpoint = ks.service_catalog.url_for(
service_type='messaging-websocket', endpoint_type='publicURL')
logger.debug('Fetching metadata from %s' % endpoint)
with self.transport.get_transport_for(endpoint, options=conf) as ws:
# create queue
req = self._create_req(endpoint, 'queue_create',
{'queue_name': CONF.zaqar.queue_id})
ws.send(req)
# subscribe to queue messages
req = self._create_req(endpoint, 'subscription_create',
{'queue_name': CONF.zaqar.queue_id,
'ttl': 10000})
ws.send(req)
# TODO(dprince) would be nice to use message_delete_many but
# websockets doesn't support parameters so we can't send 'pop'.
# This would allow us to avoid the 'message_delete' below. Example:
# req = self._create_req(endpoint, 'message_delete_many',
# {'queue_name': CONF.zaqar.queue_id, 'pop': 1})
req = self._create_req(endpoint, 'message_list',
{'queue_name': CONF.zaqar.queue_id,
'echo': True})
resp = ws.send(req)
messages = json.loads(resp.content).get('messages', [])
if len(messages) > 0:
# NOTE(dprince) In this case we are checking for queue
# messages that arrived before we subscribed.
logger.debug('Websocket message_list found...')
msg_0 = messages[0]
data = msg_0['body']
req = self._create_req(endpoint, 'message_delete',
{'queue_name': CONF.zaqar.queue_id,
'message_id': msg_0['id']})
ws.send(req)
else:
# NOTE(dprince) This will block until there is data available
# or the socket times out. Because we subscribe to the queue
# it will allow us to process data immediately.
logger.debug('websocket recv()')
data = ws.recv()['body']
return data
def collect(self):
if CONF.zaqar.auth_url is None:
logger.warn('No auth_url configured.')
raise exc.ZaqarMetadataNotConfigured()
if CONF.zaqar.password is None:
logger.warn('No password configured.')
raise exc.ZaqarMetadataNotConfigured()
if CONF.zaqar.project_id is None:
logger.warn('No project_id configured.')
raise exc.ZaqarMetadataNotConfigured()
if CONF.zaqar.user_id is None:
logger.warn('No user_id configured.')
raise exc.ZaqarMetadataNotConfigured()
if CONF.zaqar.queue_id is None:
logger.warn('No queue_id configured.')
raise exc.ZaqarMetadataNotConfigured()
try:
ks = keystone.Keystone(
auth_url=CONF.zaqar.auth_url,
user_id=CONF.zaqar.user_id,
password=CONF.zaqar.password,
project_id=CONF.zaqar.project_id,
keystoneclient=self.keystoneclient,
discover_class=self.discover_class).client
conf = {
'auth_opts': {
'backend': 'keystone',
'options': {
'os_auth_token': ks.auth_token,
'os_project_id': CONF.zaqar.project_id
}
}
}
if CONF.zaqar.use_websockets:
data = self.get_data_websocket(ks, conf)
else:
data = self.get_data_wsgi(ks, conf)
final_list = merger.merged_list_from_content(
data, cfg.CONF.deployment_key, name)
return final_list
except Exception as e:
logger.warn(str(e))
raise exc.ZaqarMetadataNotAvailable()