OpenStack common (oslo.utils) has a wrapper for generating UUIDs. We should use that function to generate UUIDs for consistency. Change-Id: I2c31de4566fd4ca1f4c9a0df4403538b00621859
		
			
				
	
	
		
			139 lines
		
	
	
		
			4.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			139 lines
		
	
	
		
			4.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
#   Copyright 2016 Red Hat, Inc.
 | 
						|
#
 | 
						|
#   Licensed under the Apache License, Version 2.0 (the "License"); you may
 | 
						|
#   not use this file except in compliance with the License. You may obtain
 | 
						|
#   a copy of the License at
 | 
						|
#
 | 
						|
#        http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
#
 | 
						|
#   Unless required by applicable law or agreed to in writing, software
 | 
						|
#   distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 | 
						|
#   WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 | 
						|
#   License for the specific language governing permissions and limitations
 | 
						|
#   under the License.
 | 
						|
#
 | 
						|
import json
 | 
						|
 | 
						|
from oslo_log import log as logging
 | 
						|
from oslo_utils import importutils
 | 
						|
from oslo_utils import uuidutils
 | 
						|
 | 
						|
from zaqarclient.transport import base
 | 
						|
from zaqarclient.transport import request
 | 
						|
from zaqarclient.transport import response
 | 
						|
 | 
						|
# Optional dependency: loaded through importutils.try_import rather than a
# plain import statement.
# NOTE(review): try_import presumably yields None when the websocket package
# is not installed -- confirm against the oslo.utils documentation.
websocket = importutils.try_import('websocket')
 | 
						|
 | 
						|
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
 | 
						|
 | 
						|
 | 
						|
class WebsocketTransport(base.Transport):
    """Zaqar websocket transport.

    *NOTE:* Zaqar's websocket interface does not yet appear to work
    well with parameters. Until it does, the websocket transport may not
    integrate with all of zaqarclient's higher-level requests. Even so,
    websockets are quite usable today, and using the transport via the
    lower-level APIs in zaqarclient works quite nicely. Example::

        conf = {
            'auth_opts': {
                'backend': 'keystone',
                'options': {
                    'os_auth_token': ks.auth_token,
                    'os_project_id': CONF.zaqar.project_id
                }
            }
        }

        endpoint = 'ws://172.19.0.3:9000'

        with transport.get_transport_for(endpoint, options=conf) as ws:
            req = request.Request(endpoint, 'queue_create',
                                  content=json.dumps({'queue_name': 'foo'}))
            resp = ws.send(req)

    """

    def __init__(self, options):
        """Store the credentials used to authenticate the websocket.

        :param options: Transport options. Must contain
                        ``options['auth_opts']['options']`` with an
                        ``os_auth_token`` entry and, optionally,
                        ``os_project_id`` or ``project_id``.
        :type options: dict
        :raises KeyError: if ``auth_opts``, its ``options`` or
                          ``os_auth_token`` is missing.
        """
        super(WebsocketTransport, self).__init__(options)
        option = options['auth_opts']['options']
        # TODO(wangxiyuan): To keep backwards compatibility, we leave
        # "os_project_id" here. Remove it in the next release.
        self._project_id = option.get('os_project_id',
                                      option.get('project_id'))
        self._token = option['os_auth_token']
        # Both are populated lazily by _init_client() on the first send().
        self._websocket_client_id = None
        self._ws = None

    def _init_client(self, endpoint):
        """Initialize a websocket transport client.

        Generates a fresh client id, opens the connection and sends an
        ``authenticate`` request carrying the stored token.

        :param endpoint: The websocket endpoint. Example: ws://127.0.0.1:9000/.
                         Required.
        :type endpoint: string
        """
        self._websocket_client_id = uuidutils.generate_uuid()

        LOG.debug('Instantiating messaging websocket client: %s', endpoint)
        self._ws = self._create_connection(endpoint)

        auth_req = request.Request(endpoint, 'authenticate',
                                   headers={'X-Auth-Token': self._token})
        try:
            self.send(auth_req)
        except Exception:
            # Do not keep an open but unauthenticated socket cached in
            # self._ws: later send() calls would see it, skip authentication
            # and reuse it. Close it so the next send() starts from scratch.
            self.cleanup()
            raise

    def _create_connection(self, endpoint):
        """Open and return a websocket connection to ``endpoint``.

        Kept as a separate method so the connection factory can be replaced.
        NOTE(review): ``websocket`` comes from ``importutils.try_import`` and
        may be unavailable if the websocket package is not installed --
        confirm how callers are expected to handle that.
        """
        return websocket.create_connection(endpoint)

    def send(self, request):
        """Send ``request`` over the websocket and return the reply.

        The connection is opened (and authenticated) on first use. The
        request is serialized as a JSON message with ``action``, ``headers``
        and, when the request has content, ``body``.

        :param request: Request to send. Its ``endpoint``, ``operation``,
                        ``headers``, ``content`` and ``params`` attributes
                        are read; ``content`` must be a JSON document.
        :returns: A ``response.Response`` built from the received message.
        :raises: The exception class registered in ``self.http_to_zaqar``
                 for the returned status code, when there is one.
        """
        if not self._ws:
            self._init_client(request.endpoint)

        # Copy so the caller's header dict is not mutated.
        headers = request.headers.copy()
        headers.update({
            'Client-ID': self._websocket_client_id,
            'X-Project-ID': self._project_id
        })

        msg = {'action': request.operation, 'headers': headers}
        if request.content:
            msg['body'] = json.loads(request.content)
        # NOTE(dprince): Zaqar websockets do not yet seem to support params?!
        # Users of this protocol will need to send everything in the body.
        if request.params:
            LOG.warning('Websocket transport does not yet support params.')
        self._ws.send(json.dumps(msg))
        ret = self.recv()

        resp = response.Response(request, json.dumps(ret.get('body', '')),
                                 headers=ret['headers'],
                                 status_code=int(ret['headers']['status']))

        if resp.status_code in self.http_to_zaqar:
            kwargs = {}
            try:
                error_body = json.loads(resp.content)
                kwargs['title'] = 'Websocket Transport Error'
                kwargs['description'] = error_body['error']
            except Exception:
                # Best effort: if the body has no usable 'error' entry, hand
                # the raw content to the exception instead.
                kwargs['text'] = resp.content
            raise self.http_to_zaqar[resp.status_code](**kwargs)

        return resp

    def recv(self):
        """Receive one message from the websocket and decode it from JSON."""
        return json.loads(self._ws.recv())

    def cleanup(self):
        """Close the websocket, if one is open, and forget it."""
        if self._ws:
            try:
                self._ws.close()
            finally:
                # Drop the reference even if close() fails, so a broken
                # socket is never reused by a later send().
                self._ws = None

    def __enter__(self):
        """Return self to allow usage as a context manager"""
        return self

    def __exit__(self, *exc):
        """Call cleanup when exiting the context manager"""
        self.cleanup()
 |