
This patch did: 1. Add two commands: "set/get metadata" in API v2. 2. As v1.1 and v2 don't contain the queue.exist() function, and the queue exist check has been done in server side. We should only use it in v1.0. 3. As v1.1 doesn't support PATCH in zaqar server side, we should not allow set metadata in v1.1. DocImpact Closes-bug: #1554326 Change-Id: I01b523ece09e87689516ecccf0c2c7795db46bb7
287 lines
9.3 KiB
Python
287 lines
9.3 KiB
Python
# Copyright (c) 2013 Red Hat, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""
|
|
This module defines a lower level API for queues' v2. This level of the
|
|
API is responsible for packing up the final request, sending it to the server
|
|
and handling asynchronous requests.
|
|
|
|
Functions present in this module assume that:
|
|
|
|
1. The transport instance is ready to `send` the
|
|
request to the server.
|
|
|
|
2. Transport instance holds the conf instance to use for this
|
|
request.
|
|
"""
|
|
|
|
import datetime
|
|
import json
|
|
|
|
from oslo_utils import timeutils
|
|
|
|
from zaqarclient.queues.v1 import core
|
|
|
|
queue_create = core.queue_create
|
|
queue_exists = core.queue_exists
|
|
queue_get = core.queue_get
|
|
queue_get_metadata = core.queue_get_metadata
|
|
queue_set_metadata = core.queue_set_metadata
|
|
queue_get_stats = core.queue_get_stats
|
|
queue_delete = core.queue_delete
|
|
queue_list = core.queue_list
|
|
message_get = core.message_get
|
|
message_list = core.message_list
|
|
message_post = core.message_post
|
|
message_delete = core.message_delete
|
|
message_delete_many = core.message_delete_many
|
|
pool_get = core.pool_get
|
|
pool_create = core.pool_create
|
|
pool_delete = core.pool_delete
|
|
pool_update = core.pool_update
|
|
pool_list = core.pool_list
|
|
flavor_get = core.flavor_get
|
|
flavor_create = core.flavor_create
|
|
flavor_delete = core.flavor_delete
|
|
flavor_update = core.flavor_update
|
|
flavor_list = core.flavor_list
|
|
claim_create = core.claim_create
|
|
claim_get = core.claim_get
|
|
claim_update = core.claim_update
|
|
claim_delete = core.claim_delete
|
|
|
|
|
|
def queue_update(transport, request, name, metadata, callback=None):
|
|
"""Updates a queue's metadata using PATCH for API v2
|
|
|
|
:param transport: Transport instance to use
|
|
:type transport: `transport.base.Transport`
|
|
:param request: Request instance ready to be sent.
|
|
:type request: `transport.request.Request`
|
|
:param name: Queue reference name.
|
|
:type name: `six.text_type`
|
|
:param metadata: Queue's metadata object.
|
|
:type metadata: `list`
|
|
:param callback: Optional callable to use as callback.
|
|
If specified, this request will be sent asynchronously.
|
|
(IGNORED UNTIL ASYNC SUPPORT IS COMPLETE)
|
|
:type callback: Callable object.
|
|
"""
|
|
|
|
request.operation = 'queue_update'
|
|
request.params['queue_name'] = name
|
|
request.content = json.dumps(metadata)
|
|
|
|
resp = transport.send(request)
|
|
return resp.deserialized_content
|
|
|
|
|
|
def signed_url_create(transport, request, queue_name, paths=None,
|
|
ttl_seconds=None, project_id=None, methods=None):
|
|
"""Creates a signed URL given a queue name
|
|
|
|
:param transport: Transport instance to use
|
|
:type transport: `transport.base.Transport`
|
|
:param request: Request instance ready to be sent.
|
|
:type request: `transport.request.Request`
|
|
:param queue_name: name of Queue for the URL to access
|
|
:type name: `six.text_type`
|
|
:param paths: Allowed actions. Options: messages, subscriptions, claims
|
|
:type name: list
|
|
:param ttl_seconds: Seconds the URL will be valid for, default 86400
|
|
:type name: int
|
|
:param project_id: defaults to None
|
|
:type name: `six.text_type`
|
|
:param methods: HTTP methods to allow, defaults to ["GET"]
|
|
:type name: `list`
|
|
"""
|
|
|
|
request.operation = 'signed_url_create'
|
|
request.params['queue_name'] = queue_name
|
|
|
|
body = {}
|
|
if ttl_seconds is not None:
|
|
expiry = (timeutils.utcnow() + datetime.timedelta(seconds=ttl_seconds))
|
|
body['expires'] = expiry.isoformat()
|
|
|
|
if project_id is not None:
|
|
body['project_id'] = project_id
|
|
|
|
if paths is not None:
|
|
body['paths'] = paths
|
|
|
|
if methods is not None:
|
|
body['methods'] = methods
|
|
|
|
request.content = json.dumps(body)
|
|
|
|
resp = transport.send(request)
|
|
return resp.deserialized_content
|
|
|
|
|
|
def subscription_create(transport, request, queue_name, subscription_data):
|
|
"""Creates a new subscription against the `queue_name`
|
|
|
|
|
|
:param transport: Transport instance to use
|
|
:type transport: `transport.base.Transport`
|
|
:param request: Request instance ready to be sent.
|
|
:type request: `transport.request.Request`
|
|
:param queue_name: Queue reference name.
|
|
:type queue_name: `six.text_type`
|
|
:param subscription_data: Subscription's properties, i.e: subscriber,
|
|
ttl, options.
|
|
:type subscription_data: `dict`
|
|
"""
|
|
|
|
request.operation = 'subscription_create'
|
|
request.params['queue_name'] = queue_name
|
|
request.content = json.dumps(subscription_data)
|
|
resp = transport.send(request)
|
|
|
|
return resp.deserialized_content
|
|
|
|
|
|
def subscription_get(transport, request, queue_name, subscription_id):
|
|
"""Gets a particular subscription data
|
|
|
|
:param transport: Transport instance to use
|
|
:type transport: `transport.base.Transport`
|
|
:param request: Request instance ready to be sent.
|
|
:type request: `transport.request.Request`
|
|
:param queue_name: Queue reference name.
|
|
:type queue_name: `six.text_type`
|
|
:param subscription_id: ID of subscription.
|
|
:type subscription_id: `six.text_type`
|
|
|
|
"""
|
|
|
|
request.operation = 'subscription_get'
|
|
request.params['queue_name'] = queue_name
|
|
request.params['subscription_id'] = subscription_id
|
|
|
|
resp = transport.send(request)
|
|
return resp.deserialized_content
|
|
|
|
|
|
def subscription_update(transport, request, queue_name, subscription_id,
|
|
subscription_data):
|
|
"""Updates the subscription
|
|
|
|
:param transport: Transport instance to use
|
|
:type transport: `transport.base.Transport`
|
|
:param request: Request instance ready to be sent.
|
|
:type request: `transport.request.Request`
|
|
:param queue_name: Queue reference name.
|
|
:type queue_name: `six.text_type`
|
|
:param subscription_id: ID of subscription.
|
|
:type subscription_id: `six.text_type`
|
|
:param subscription_data: Subscription's properties, i.e: subscriber,
|
|
ttl, options.
|
|
:type subscription_data: `dict`
|
|
"""
|
|
|
|
request.operation = 'subscription_update'
|
|
request.params['queue_name'] = queue_name
|
|
request.params['subscription_id'] = subscription_id
|
|
request.content = json.dumps(subscription_data)
|
|
|
|
resp = transport.send(request)
|
|
return resp.deserialized_content
|
|
|
|
|
|
def subscription_delete(transport, request, queue_name, subscription_id):
|
|
"""Deletes the subscription
|
|
|
|
:param transport: Transport instance to use
|
|
:type transport: `transport.base.Transport`
|
|
:param request: Request instance ready to be sent.
|
|
:type request: `transport.request.Request`
|
|
:param queue_name: Queue reference name.
|
|
:type queue_name: `six.text_type`
|
|
:param subscription_id: ID of subscription.
|
|
:type subscription_id: `six.text_type`
|
|
"""
|
|
|
|
request.operation = 'subscription_delete'
|
|
request.params['queue_name'] = queue_name
|
|
request.params['subscription_id'] = subscription_id
|
|
transport.send(request)
|
|
|
|
|
|
def subscription_list(transport, request, queue_name, **kwargs):
|
|
"""Gets a list of subscriptions
|
|
|
|
:param transport: Transport instance to use
|
|
:type transport: `transport.base.Transport`
|
|
:param request: Request instance ready to be sent.
|
|
:type request: `transport.request.Request`
|
|
:param queue_name: Queue reference name.
|
|
:type queue_name: `six.text_type`
|
|
:param kwargs: Optional arguments for this operation.
|
|
- marker: Where to start getting subscriptions from.
|
|
- limit: Maximum number of subscriptions to get.
|
|
"""
|
|
|
|
request.operation = 'subscription_list'
|
|
request.params['queue_name'] = queue_name
|
|
request.params.update(kwargs)
|
|
|
|
resp = transport.send(request)
|
|
|
|
if not resp.content:
|
|
return {'links': [], 'subscriptions': []}
|
|
|
|
return resp.deserialized_content
|
|
|
|
|
|
def ping(transport, request, callback=None):
|
|
"""Check the health of web head for load balancing
|
|
|
|
:param transport: Transport instance to use
|
|
:type transport: `transport.base.Transport`
|
|
:param request: Request instance ready to be sent.
|
|
:type request: `transport.request.Request`
|
|
:param callback: Optional callable to use as callback.
|
|
If specified, this request will be sent asynchronously.
|
|
(IGNORED UNTIL ASYNC SUPPORT IS COMPLETE)
|
|
:type callback: Callable object.
|
|
"""
|
|
|
|
request.operation = 'ping'
|
|
try:
|
|
transport.send(request)
|
|
return True
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def health(transport, request, callback=None):
|
|
"""Get detailed health status of Zaqar server
|
|
|
|
:param transport: Transport instance to use
|
|
:type transport: `transport.base.Transport`
|
|
:param request: Request instance ready to be sent.
|
|
:type request: `transport.request.Request`
|
|
:param callback: Optional callable to use as callback.
|
|
If specified, this request will be sent asynchronously.
|
|
(IGNORED UNTIL ASYNC SUPPORT IS COMPLETE)
|
|
:type callback: Callable object.
|
|
"""
|
|
|
|
request.operation = 'health'
|
|
resp = transport.send(request)
|
|
return resp.deserialized_content
|