Fix xml request doesn't work with unicode

Fix bug 1160704

* httplib2 doesn't work with unicode url and params. So encode all unicode
to utf-8 before request.
* Fix xml serializer doesn't work with unicode
* Decode command argument to unicode in main function
* Exception's message maybe include unicode, decode message to unicode
before logging or print.
* Sync the changing of serializer/deserilizer's code with quantum server code
  https://review.openstack.org/#/c/25482/
  https://review.openstack.org/#/c/25046/
  https://review.openstack.org/#/c/21410/
* Enable xml test

Change-Id: Ib140e34d54cc916e2ea172e4bad9e4a77388723a
This commit is contained in:
He Jie Xu 2013-04-01 13:27:28 +08:00
parent 00935b8c1f
commit 4421ab10b8
23 changed files with 481 additions and 65 deletions

@ -1,7 +1,7 @@
[DEFAULT]
# The list of modules to copy from openstack-common
modules=exception,gettextutils,jsonutils,setup,timeutils
modules=exception,gettextutils,jsonutils,setup,strutils,timeutils
# The base module to hold the copy of openstack.common
base=quantumclient

@ -122,6 +122,8 @@ class HTTPClient(httplib2.Http):
if 'body' in kwargs:
kargs['body'] = kwargs['body']
args = utils.safe_encode_list(args)
kargs = utils.safe_encode_dict(kargs)
utils.http_log_req(_logger, args, kargs)
resp, body = self.request(*args, **kargs)
utils.http_log_resp(_logger, resp, body)

@ -23,6 +23,7 @@ TYPE_XMLNS = "xmlns:quantum"
TYPE_ATTR = "quantum:type"
VIRTUAL_ROOT_KEY = "_v_root"
ATOM_NAMESPACE = "http://www.w3.org/2005/Atom"
ATOM_XMLNS = "xmlns:atom"
ATOM_LINK_NOTATION = "{%s}link" % ATOM_NAMESPACE
TYPE_BOOL = "bool"

@ -58,7 +58,9 @@ class JSONDictSerializer(DictSerializer):
"""Default JSON request body serialization"""
def default(self, data):
return jsonutils.dumps(data)
def sanitizer(obj):
return unicode(obj)
return jsonutils.dumps(data, default=sanitizer)
class XMLDictSerializer(DictSerializer):
@ -78,21 +80,33 @@ class XMLDictSerializer(DictSerializer):
self.xmlns = xmlns
def default(self, data):
# We expect data to contain a single key which is the XML root or
# non root
"""
:param data: expect data to contain a single key as XML root, or
contain another '*_links' key as atom links. Other
case will use 'VIRTUAL_ROOT_KEY' as XML root.
"""
try:
key_len = data and len(data.keys()) or 0
if (key_len == 1):
root_key = data.keys()[0]
root_value = data[root_key]
else:
links = None
has_atom = False
if data is None:
root_key = constants.VIRTUAL_ROOT_KEY
root_value = data
root_value = None
else:
link_keys = [k for k in data.iterkeys() or []
if k.endswith('_links')]
if link_keys:
links = data.pop(link_keys[0], None)
has_atom = True
root_key = (len(data) == 1 and
data.keys()[0] or constants.VIRTUAL_ROOT_KEY)
root_value = data.get(root_key, data)
doc = etree.Element("_temp_root")
used_prefixes = []
self._to_xml_node(doc, self.metadata, root_key,
root_value, used_prefixes)
return self.to_xml_string(list(doc)[0], used_prefixes)
if links:
self._create_link_nodes(list(doc)[0], links)
return self.to_xml_string(list(doc)[0], used_prefixes, has_atom)
except AttributeError as e:
LOG.exception(str(e))
return ''
@ -115,7 +129,7 @@ class XMLDictSerializer(DictSerializer):
node.set('xmlns', self.xmlns)
node.set(constants.TYPE_XMLNS, self.xmlns)
if has_atom:
node.set('xmlns:atom', "http://www.w3.org/2005/Atom")
node.set(constants.ATOM_XMLNS, constants.ATOM_NAMESPACE)
node.set(constants.XSI_NIL_ATTR, constants.XSI_NAMESPACE)
ext_ns = self.metadata.get(constants.EXT_NS, {})
for prefix in used_prefixes:
@ -179,19 +193,17 @@ class XMLDictSerializer(DictSerializer):
LOG.debug(_("Data %(data)s type is %(type)s"),
{'data': data,
'type': type(data)})
result.text = str(data)
if isinstance(data, str):
result.text = unicode(data, 'utf-8')
else:
result.text = unicode(data)
return result
def _create_link_nodes(self, xml_doc, links):
link_nodes = []
for link in links:
link_node = xml_doc.createElement('atom:link')
link_node = etree.SubElement(xml_doc, 'atom:link')
link_node.set('rel', link['rel'])
link_node.set('href', link['href'])
if 'type' in link:
link_node.set('type', link['type'])
link_nodes.append(link_node)
return link_nodes
class TextDeserializer(ActionDispatcher):
@ -329,7 +341,7 @@ class XMLDeserializer(TextDeserializer):
attr == constants.XSI_ATTR or
attr == constants.TYPE_ATTR):
continue
result[self._get_key(attr)] = node.get[attr]
result[self._get_key(attr)] = node.get(attr)
children = list(node)
for child in children:
result[self._get_key(child.tag)] = self._from_xml_node(

@ -27,6 +27,7 @@ import os
import sys
from quantumclient.common import exceptions
from quantumclient.openstack.common import strutils
def env(*vars, **kwargs):
@ -165,6 +166,7 @@ def http_log_req(_logger, args, kwargs):
if 'body' in kwargs and kwargs['body']:
string_parts.append(" -d '%s'" % (kwargs['body']))
string_parts = safe_encode_list(string_parts)
_logger.debug("\nREQ: %s\n" % "".join(string_parts))
@ -172,3 +174,24 @@ def http_log_resp(_logger, resp, body):
if not _logger.isEnabledFor(logging.DEBUG):
return
_logger.debug("RESP:%s %s\n", resp, body)
def _safe_encode_without_obj(data):
if isinstance(data, basestring):
return strutils.safe_encode(data)
return data
def safe_encode_list(data):
return map(_safe_encode_without_obj, data)
def safe_encode_dict(data):
def _encode_item((k, v)):
if isinstance(v, list):
return (k, safe_encode_list(v))
elif isinstance(v, dict):
return (k, safe_encode_dict(v))
return (k, _safe_encode_without_obj(v))
return dict(map(_encode_item, data.items()))

@ -0,0 +1,133 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
System-level utilities and helper functions.
"""
import logging
import sys
LOG = logging.getLogger(__name__)
def int_from_bool_as_string(subject):
"""
Interpret a string as a boolean and return either 1 or 0.
Any string value in:
('True', 'true', 'On', 'on', '1')
is interpreted as a boolean True.
Useful for JSON-decoded stuff and config file parsing
"""
return bool_from_string(subject) and 1 or 0
def bool_from_string(subject):
"""
Interpret a string as a boolean.
Any string value in:
('True', 'true', 'On', 'on', 'Yes', 'yes', '1')
is interpreted as a boolean True.
Useful for JSON-decoded stuff and config file parsing
"""
if isinstance(subject, bool):
return subject
if isinstance(subject, basestring):
if subject.strip().lower() in ('true', 'on', 'yes', '1'):
return True
return False
def safe_decode(text, incoming=None, errors='strict'):
"""
Decodes incoming str using `incoming` if they're
not already unicode.
:param incoming: Text's current encoding
:param errors: Errors handling policy. See here for valid
values http://docs.python.org/2/library/codecs.html
:returns: text or a unicode `incoming` encoded
representation of it.
:raises TypeError: If text is not an isntance of basestring
"""
if not isinstance(text, basestring):
raise TypeError("%s can't be decoded" % type(text))
if isinstance(text, unicode):
return text
if not incoming:
incoming = (sys.stdin.encoding or
sys.getdefaultencoding())
try:
return text.decode(incoming, errors)
except UnicodeDecodeError:
# Note(flaper87) If we get here, it means that
# sys.stdin.encoding / sys.getdefaultencoding
# didn't return a suitable encoding to decode
# text. This happens mostly when global LANG
# var is not set correctly and there's no
# default encoding. In this case, most likely
# python will use ASCII or ANSI encoders as
# default encodings but they won't be capable
# of decoding non-ASCII characters.
#
# Also, UTF-8 is being used since it's an ASCII
# extension.
return text.decode('utf-8', errors)
def safe_encode(text, incoming=None,
encoding='utf-8', errors='strict'):
"""
Encodes incoming str/unicode using `encoding`. If
incoming is not specified, text is expected to
be encoded with current python's default encoding.
(`sys.getdefaultencoding`)
:param incoming: Text's current encoding
:param encoding: Expected encoding for text (Default UTF-8)
:param errors: Errors handling policy. See here for valid
values http://docs.python.org/2/library/codecs.html
:returns: text or a bytestring `encoding` encoded
representation of it.
:raises TypeError: If text is not an isntance of basestring
"""
if not isinstance(text, basestring):
raise TypeError("%s can't be encoded" % type(text))
if not incoming:
incoming = (sys.stdin.encoding or
sys.getdefaultencoding())
if isinstance(text, unicode):
return text.encode(encoding, errors)
elif text and encoding != incoming:
# Decode text before encoding it with `encoding`
text = safe_decode(text, incoming, errors)
return text.encode(encoding, errors)
return text

@ -31,6 +31,7 @@ from cliff.commandmanager import CommandManager
from quantumclient.common import clientmanager
from quantumclient.common import exceptions as exc
from quantumclient.common import utils
from quantumclient.openstack.common import strutils
VERSION = '2.0'
@ -476,10 +477,10 @@ class QuantumShell(App):
self.initialize_app(remainder)
except Exception as err:
if self.options.debug:
self.log.exception(err)
self.log.exception(unicode(err))
raise
else:
self.log.error(err)
self.log.error(unicode(err))
return 1
result = 1
if self.interactive_mode:
@ -506,16 +507,16 @@ class QuantumShell(App):
return run_command(cmd, cmd_parser, sub_argv)
except Exception as err:
if self.options.debug:
self.log.exception(err)
self.log.exception(unicode(err))
else:
self.log.error(err)
self.log.error(unicode(err))
try:
self.clean_up(cmd, result, err)
except Exception as err2:
if self.options.debug:
self.log.exception(err2)
self.log.exception(unicode(err2))
else:
self.log.error('Could not clean up: %s', err2)
self.log.error('Could not clean up: %s', unicode(err2))
if self.options.debug:
raise
else:
@ -523,9 +524,9 @@ class QuantumShell(App):
self.clean_up(cmd, result, None)
except Exception as err3:
if self.options.debug:
self.log.exception(err3)
self.log.exception(unicode(err3))
else:
self.log.error('Could not clean up: %s', err3)
self.log.error('Could not clean up: %s', unicode(err3))
return result
def authenticate_user(self):
@ -608,7 +609,7 @@ class QuantumShell(App):
def clean_up(self, cmd, result, err):
self.log.debug('clean_up %s', cmd.__class__.__name__)
if err:
self.log.debug('got an error: %s', err)
self.log.debug('got an error: %s', unicode(err))
def configure_logging(self):
"""Create logging handlers for any log output.
@ -637,11 +638,12 @@ class QuantumShell(App):
def main(argv=sys.argv[1:]):
gettext.install('quantumclient', unicode=1)
try:
return QuantumShell(QUANTUM_API_VERSION).run(argv)
return QuantumShell(QUANTUM_API_VERSION).run(map(strutils.safe_decode,
argv))
except exc.QuantumClientException:
return 1
except Exception as e:
print e
print unicode(e)
return 1

@ -0,0 +1,45 @@
# Copyright 2013 OpenStack LLC.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
import testtools
from quantumclient.common import utils
class UtilsTest(testtools.TestCase):
def test_safe_encode_list(self):
o = object()
unicode_text = u'\u7f51\u7edc'
l = ['abc', unicode_text, unicode_text.encode('utf-8'), o]
expected = ['abc', unicode_text.encode('utf-8'),
unicode_text.encode('utf-8'), o]
self.assertEqual(utils.safe_encode_list(l), expected)
def test_safe_encode_dict(self):
o = object()
unicode_text = u'\u7f51\u7edc'
d = {'test1': unicode_text,
'test2': [unicode_text, o],
'test3': o,
'test4': {'test5': unicode_text},
'test6': unicode_text.encode('utf-8')}
expected = {'test1': unicode_text.encode('utf-8'),
'test2': [unicode_text.encode('utf-8'), o],
'test3': o,
'test4': {'test5': unicode_text.encode('utf-8')},
'test6': unicode_text.encode('utf-8')}
self.assertEqual(utils.safe_encode_dict(d), expected)

@ -26,6 +26,7 @@ from quantumclient.common import _
from quantumclient.common import constants
from quantumclient.common import exceptions
from quantumclient.common import serializer
from quantumclient.common import utils
_logger = logging.getLogger(__name__)
@ -897,6 +898,7 @@ class Client(object):
action += ".%s" % self.format
action = self.action_prefix + action
if type(params) is dict and params:
params = utils.safe_encode_dict(params)
action += '?' + urllib.urlencode(params, doseq=1)
if body:
body = self.serialize(body)

@ -25,7 +25,7 @@ from quantumclient.quantum.v2_0.lb import healthmonitor
from tests.unit import test_cli20
class CLITestV20LbHealthmonitor(test_cli20.CLITestV20Base):
class CLITestV20LbHealthmonitorJSON(test_cli20.CLITestV20Base):
def test_create_healthmonitor_with_mandatory_params(self):
"""lb-healthmonitor-create with mandatory params only"""
resource = 'health_monitor'
@ -213,3 +213,7 @@ class CLITestV20LbHealthmonitor(test_cli20.CLITestV20Base):
cmd.run(parsed_args)
self.mox.VerifyAll()
self.mox.UnsetStubs()
class CLITestV20LbHealthmonitorXML(CLITestV20LbHealthmonitorJSON):
format = 'xml'

@ -23,7 +23,10 @@ from quantumclient.quantum.v2_0.lb import member
from tests.unit import test_cli20
class CLITestV20LbMember(test_cli20.CLITestV20Base):
class CLITestV20LbMemberJSON(test_cli20.CLITestV20Base):
def setUp(self):
super(CLITestV20LbMemberJSON, self).setUp(plurals={'tags': 'tag'})
def test_create_member(self):
"""lb-member-create with mandatory params only"""
resource = 'member'
@ -125,3 +128,7 @@ class CLITestV20LbMember(test_cli20.CLITestV20Base):
my_id = 'my-id'
args = [my_id]
self._test_delete_resource(resource, cmd, my_id, args)
class CLITestV20LbMemberXML(CLITestV20LbMemberJSON):
format = 'xml'

@ -25,7 +25,7 @@ from quantumclient.quantum.v2_0.lb import pool
from tests.unit import test_cli20
class CLITestV20LbPool(test_cli20.CLITestV20Base):
class CLITestV20LbPoolJSON(test_cli20.CLITestV20Base):
def test_create_pool_with_mandatory_params(self):
"""lb-pool-create with mandatory params only"""
@ -165,3 +165,7 @@ class CLITestV20LbPool(test_cli20.CLITestV20Base):
_str = self.fake_stdout.make_string()
self.assertTrue('bytes_in' in _str)
self.assertTrue('bytes_out' in _str)
class CLITestV20LbPoolXML(CLITestV20LbPoolJSON):
format = 'xml'

@ -23,7 +23,10 @@ from quantumclient.quantum.v2_0.lb import vip
from tests.unit import test_cli20
class CLITestV20LbVip(test_cli20.CLITestV20Base):
class CLITestV20LbVipJSON(test_cli20.CLITestV20Base):
def setUp(self):
super(CLITestV20LbVipJSON, self).setUp(plurals={'tags': 'tag'})
def test_create_vip_with_mandatory_params(self):
"""lb-vip-create with all mandatory params"""
resource = 'vip'
@ -205,3 +208,7 @@ class CLITestV20LbVip(test_cli20.CLITestV20Base):
my_id = 'my-id'
args = [my_id]
self._test_delete_resource(resource, cmd, my_id, args)
class CLITestV20LbVipXML(CLITestV20LbVipJSON):
format = 'xml'

@ -15,12 +15,15 @@
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
import urllib
import fixtures
import mox
from mox import Comparator
from mox import ContainsKeyValue
import testtools
from quantumclient.common import constants
from quantumclient import shell
from quantumclient.v2_0.client import Client
@ -56,8 +59,8 @@ class MyApp(object):
self.stdout = _stdout
def end_url(path, query=None):
_url_str = ENDURL + "/v" + API_VERSION + path + "." + FORMAT
def end_url(path, query=None, format=FORMAT):
_url_str = ENDURL + "/v" + API_VERSION + path + "." + format
return query and _url_str + "?" + query or _url_str
@ -73,12 +76,12 @@ class MyUrlComparator(Comparator):
if self.client and self.client.format != FORMAT:
lhs_parts = self.lhs.split("?", 1)
if len(lhs_parts) == 2:
lhs = ("%s%s?%s" % (lhs_parts[0][:-4],
self.client.format,
lhs_parts[1]))
lhs = ("%s.%s?%s" % (lhs_parts[0][:-4],
self.client.format,
lhs_parts[1]))
else:
lhs = ("%s%s" % (lhs_parts[0][:-4],
self.client.format))
lhs = ("%s.%s" % (lhs_parts[0][:-4],
self.client.format))
return lhs
return self.lhs
@ -133,27 +136,47 @@ class MyComparator(Comparator):
return self._com(self.lhs, rhs)
def __repr__(self):
if self.client:
return self.client.serialize(self.lhs)
return str(self.lhs)
class CLITestV20Base(testtools.TestCase):
format = 'json'
test_id = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'
def _find_resourceid(self, client, resource, name_or_id):
return name_or_id
def setUp(self):
def _get_attr_metadata(self):
return self.metadata
Client.EXTED_PLURALS.update(constants.PLURALS)
Client.EXTED_PLURALS.update({'tags': 'tag'})
return {'plurals': Client.EXTED_PLURALS,
'xmlns': constants.XML_NS_V20,
constants.EXT_NS: {'prefix': 'http://xxxx.yy.com'}}
def setUp(self, plurals={}):
"""Prepare the test environment"""
super(CLITestV20Base, self).setUp()
Client.EXTED_PLURALS.update(constants.PLURALS)
Client.EXTED_PLURALS.update(plurals)
self.metadata = {'plurals': Client.EXTED_PLURALS,
'xmlns': constants.XML_NS_V20,
constants.EXT_NS: {'prefix':
'http://xxxx.yy.com'}}
self.mox = mox.Mox()
self.endurl = ENDURL
self.client = Client(token=TOKEN, endpoint_url=self.endurl)
self.fake_stdout = FakeStdout()
self.useFixture(fixtures.MonkeyPatch('sys.stdout', self.fake_stdout))
self.useFixture(fixtures.MonkeyPatch(
'quantumclient.quantum.v2_0.find_resourceid_by_name_or_id',
self._find_resourceid))
self.useFixture(fixtures.MonkeyPatch(
'quantumclient.v2_0.client.Client.get_attr_metadata',
self._get_attr_metadata))
self.client = Client(token=TOKEN, endpoint_url=self.endurl)
def _test_create_resource(self, resource, cmd,
name, myid, args,
@ -186,15 +209,17 @@ class CLITestV20Base(testtools.TestCase):
{'id': myid}, }
if name:
ress[resource].update({'name': name})
self.client.format = self.format
resstr = self.client.serialize(ress)
# url method body
path = getattr(self.client, resource + "s_path")
self.client.httpclient.request(
end_url(path), 'POST',
end_url(path, format=self.format), 'POST',
body=MyComparator(body, self.client),
headers=ContainsKeyValue('X-Auth-Token',
TOKEN)).AndReturn((MyResp(200),
resstr))
args.extend(['--request-format', self.format])
self.mox.ReplayAll()
cmd_parser = cmd.get_parser('create_' + resource)
shell.run_command(cmd, cmd_parser, args)
@ -210,14 +235,16 @@ class CLITestV20Base(testtools.TestCase):
self.mox.StubOutWithMock(cmd, "get_client")
self.mox.StubOutWithMock(self.client.httpclient, "request")
cmd.get_client().MultipleTimes().AndReturn(self.client)
self.client.format = self.format
resstr = self.client.serialize(resources_out)
path = getattr(self.client, resources_collection + "_path")
self.client.httpclient.request(
end_url(path), 'GET',
end_url(path, format=self.format), 'GET',
body=None,
headers=ContainsKeyValue('X-Auth-Token',
TOKEN)).AndReturn((MyResp(200), resstr))
args.extend(['--request-format', self.format])
self.mox.ReplayAll()
cmd_parser = cmd.get_parser("list_" + resources_collection)
shell.run_command(cmd, cmd_parser, args)
@ -232,10 +259,12 @@ class CLITestV20Base(testtools.TestCase):
cmd.get_client().MultipleTimes().AndReturn(self.client)
reses = {resources: [{'id': 'myid1', },
{'id': 'myid2', }, ], }
self.client.format = self.format
resstr = self.client.serialize(reses)
# url method body
query = ""
args = detail and ['-D', ] or []
args.extend(['--request-format', self.format])
if fields_1:
for field in fields_1:
args.append('--fields')
@ -245,11 +274,13 @@ class CLITestV20Base(testtools.TestCase):
args.append('--')
args.append("--tag")
for tag in tags:
args.append(tag)
if isinstance(tag, unicode):
tag = urllib.quote(tag.encode('utf-8'))
if query:
query += "&tag=" + tag
else:
query = "tag=" + tag
args.append(tag)
if (not tags) and fields_2:
args.append('--')
if fields_2:
@ -292,7 +323,9 @@ class CLITestV20Base(testtools.TestCase):
query += 'sort_dir=%s' % dir
path = getattr(self.client, resources + "_path")
self.client.httpclient.request(
MyUrlComparator(end_url(path, query), self.client), 'GET',
MyUrlComparator(end_url(path, query, format=self.format),
self.client),
'GET',
body=None,
headers=ContainsKeyValue('X-Auth-Token',
TOKEN)).AndReturn((MyResp(200), resstr))
@ -316,23 +349,25 @@ class CLITestV20Base(testtools.TestCase):
'rel': 'next'}]}
reses2 = {resources: [{'id': 'myid3', },
{'id': 'myid4', }]}
self.client.format = self.format
resstr1 = self.client.serialize(reses1)
resstr2 = self.client.serialize(reses2)
self.client.httpclient.request(
end_url(path, ""), 'GET',
end_url(path, "", format=self.format), 'GET',
body=None,
headers=ContainsKeyValue('X-Auth-Token',
TOKEN)).AndReturn((MyResp(200), resstr1))
self.client.httpclient.request(
end_url(path, fake_query), 'GET',
end_url(path, fake_query, format=self.format), 'GET',
body=None,
headers=ContainsKeyValue('X-Auth-Token',
TOKEN)).AndReturn((MyResp(200), resstr2))
self.mox.ReplayAll()
cmd_parser = cmd.get_parser("list_" + resources)
parsed_args = cmd_parser.parse_args("")
cmd.run(parsed_args)
args = ['--request-format', self.format]
shell.run_command(cmd, cmd_parser, args)
#parsed_args = cmd_parser.parse_args("")
#cmd.run(parsed_args)
self.mox.VerifyAll()
self.mox.UnsetStubs()
@ -343,10 +378,13 @@ class CLITestV20Base(testtools.TestCase):
body = {resource: extrafields}
path = getattr(self.client, resource + "_path")
self.client.httpclient.request(
MyUrlComparator(end_url(path % myid), self.client), 'PUT',
MyUrlComparator(end_url(path % myid, format=self.format),
self.client),
'PUT',
body=MyComparator(body, self.client),
headers=ContainsKeyValue('X-Auth-Token',
TOKEN)).AndReturn((MyResp(204), None))
args.extend(['--request-format', self.format])
self.mox.ReplayAll()
cmd_parser = cmd.get_parser("update_" + resource)
shell.run_command(cmd, cmd_parser, args)
@ -363,13 +401,15 @@ class CLITestV20Base(testtools.TestCase):
expected_res = {resource:
{'id': myid,
'name': 'myname', }, }
self.client.format = self.format
resstr = self.client.serialize(expected_res)
path = getattr(self.client, resource + "_path")
self.client.httpclient.request(
end_url(path % myid, query), 'GET',
end_url(path % myid, query, format=self.format), 'GET',
body=None,
headers=ContainsKeyValue('X-Auth-Token',
TOKEN)).AndReturn((MyResp(200), resstr))
args.extend(['--request-format', self.format])
self.mox.ReplayAll()
cmd_parser = cmd.get_parser("show_" + resource)
shell.run_command(cmd, cmd_parser, args)
@ -385,10 +425,11 @@ class CLITestV20Base(testtools.TestCase):
cmd.get_client().MultipleTimes().AndReturn(self.client)
path = getattr(self.client, resource + "_path")
self.client.httpclient.request(
end_url(path % myid), 'DELETE',
end_url(path % myid, format=self.format), 'DELETE',
body=None,
headers=ContainsKeyValue('X-Auth-Token',
TOKEN)).AndReturn((MyResp(204), None))
args.extend(['--request-format', self.format])
self.mox.ReplayAll()
cmd_parser = cmd.get_parser("delete_" + resource)
shell.run_command(cmd, cmd_parser, args)
@ -405,10 +446,11 @@ class CLITestV20Base(testtools.TestCase):
path = getattr(self.client, resource + "_path")
path_action = '%s/%s' % (myid, action)
self.client.httpclient.request(
end_url(path % path_action), 'PUT',
end_url(path % path_action, format=self.format), 'PUT',
body=MyComparator(body, self.client),
headers=ContainsKeyValue('X-Auth-Token',
TOKEN)).AndReturn((MyResp(204), None))
args.extend(['--request-format', self.format])
self.mox.ReplayAll()
cmd_parser = cmd.get_parser("delete_" + resource)
shell.run_command(cmd, cmd_parser, args)
@ -416,3 +458,43 @@ class CLITestV20Base(testtools.TestCase):
self.mox.UnsetStubs()
_str = self.fake_stdout.make_string()
self.assertTrue(myid in _str)
class ClientV2UnicodeTestJson(CLITestV20Base):
def test_do_request(self):
self.client.format = self.format
self.mox.StubOutWithMock(self.client.httpclient, "request")
unicode_text = u'\u7f51\u7edc'
# url with unicode
action = u'/test'
expected_action = action.encode('utf-8')
# query string with unicode
params = {'test': unicode_text}
expect_query = urllib.urlencode({'test':
unicode_text.encode('utf-8')})
# request body with unicode
body = params
expect_body = self.client.serialize(body)
# headers with unicode
self.client.httpclient.auth_token = unicode_text
expected_auth_token = unicode_text.encode('utf-8')
self.client.httpclient.request(
end_url(expected_action, query=expect_query, format=self.format),
'PUT', body=expect_body,
headers=ContainsKeyValue(
'X-Auth-Token',
expected_auth_token)).AndReturn((MyResp(200), expect_body))
self.mox.ReplayAll()
res_body = self.client.do_request('PUT', action, body=body,
params=params)
self.mox.VerifyAll()
self.mox.UnsetStubs()
# test response with unicode
self.assertEqual(res_body, body)
class ClientV2UnicodeTestXML(ClientV2UnicodeTestJson):
format = 'xml'

@ -28,7 +28,7 @@ from tests.unit.test_cli20 import CLITestV20Base
from tests.unit.test_cli20 import MyApp
class CLITestV20FloatingIps(CLITestV20Base):
class CLITestV20FloatingIpsJSON(CLITestV20Base):
def test_create_floatingip(self):
"""Create floatingip: fip1."""
resource = 'floatingip'
@ -139,3 +139,7 @@ class CLITestV20FloatingIps(CLITestV20Base):
self._test_update_resource(resource, cmd, 'myid',
args, {"port_id": "portid"}
)
class CLITestV20FloatingIpsXML(CLITestV20FloatingIpsJSON):
format = 'xml'

@ -33,7 +33,10 @@ from tests.unit.test_cli20 import CLITestV20Base
from tests.unit.test_cli20 import MyApp
class CLITestV20Network(CLITestV20Base):
class CLITestV20NetworkJSON(CLITestV20Base):
def setUp(self):
super(CLITestV20NetworkJSON, self).setUp(plurals={'tags': 'tag'})
def test_create_network(self):
"""Create net: myname."""
resource = 'network'
@ -46,6 +49,18 @@ class CLITestV20Network(CLITestV20Base):
self._test_create_resource(resource, cmd, name, myid, args,
position_names, position_values)
def test_create_network_with_unicode(self):
"""Create net: u'\u7f51\u7edc'."""
resource = 'network'
cmd = CreateNetwork(MyApp(sys.stdout), None)
name = u'\u7f51\u7edc'
myid = 'myid'
args = [name, ]
position_names = ['name', ]
position_values = [name, ]
self._test_create_resource(resource, cmd, name, myid, args,
position_names, position_values)
def test_create_network_tenant(self):
"""Create net: --tenant_id tenantid myname."""
resource = 'network'
@ -179,6 +194,11 @@ class CLITestV20Network(CLITestV20Base):
cmd = ListNetwork(MyApp(sys.stdout), None)
self._test_list_networks(cmd, tags=['a', 'b'])
def test_list_nets_tags_with_unicode(self):
"""List nets: -- --tags u'\u7f51\u7edc'."""
cmd = ListNetwork(MyApp(sys.stdout), None)
self._test_list_networks(cmd, tags=[u'\u7f51\u7edc'])
def test_list_nets_detail_tags(self):
"""List nets: -D -- --tags a b."""
cmd = ListNetwork(MyApp(sys.stdout), None)
@ -426,6 +446,17 @@ class CLITestV20Network(CLITestV20Base):
{'name': 'myname', 'tags': ['a', 'b'], }
)
def test_update_network_with_unicode(self):
"""Update net: myid --name u'\u7f51\u7edc' --tags a b."""
resource = 'network'
cmd = UpdateNetwork(MyApp(sys.stdout), None)
self._test_update_resource(resource, cmd, 'myid',
['myid', '--name', u'\u7f51\u7edc',
'--tags', 'a', 'b'],
{'name': u'\u7f51\u7edc',
'tags': ['a', 'b'], }
)
def test_show_network(self):
"""Show net: --fields id --fields name myid."""
resource = 'network'
@ -441,3 +472,7 @@ class CLITestV20Network(CLITestV20Base):
myid = 'myid'
args = [myid]
self._test_delete_resource(resource, cmd, myid, args)
class CLITestV20NetworkXML(CLITestV20NetworkJSON):
format = 'xml'

@ -22,7 +22,11 @@ from quantumclient.quantum.v2_0 import nvp_qos_queue as qos
from tests.unit import test_cli20
class CLITestV20NvpQosQueue(test_cli20.CLITestV20Base):
class CLITestV20NvpQosQueueJSON(test_cli20.CLITestV20Base):
def setUp(self):
super(CLITestV20NvpQosQueueJSON, self).setUp(
plurals={'qos_queues': 'qos_queue'})
def test_create_qos_queue(self):
"""Create a qos queue."""
resource = 'qos_queue'
@ -78,3 +82,7 @@ class CLITestV20NvpQosQueue(test_cli20.CLITestV20Base):
myid = 'myid'
args = [myid]
self._test_delete_resource(resource, cmd, myid, args)
class CLITestV20NvpQosQueueXML(CLITestV20NvpQosQueueJSON):
format = 'xml'

@ -22,10 +22,15 @@ from tests.unit.test_cli20 import CLITestV20Base
from tests.unit.test_cli20 import MyApp
class CLITestV20NetworkGateway(CLITestV20Base):
class CLITestV20NetworkGatewayJSON(CLITestV20Base):
resource = "network_gateway"
def setUp(self):
super(CLITestV20NetworkGatewayJSON, self).setUp(
plurals={'devices': 'device',
'network_gateways': 'network_gateway'})
def test_create_gateway(self):
cmd = nvpnetworkgateway.CreateNetworkGateway(MyApp(sys.stdout), None)
name = 'gw-test'
@ -105,3 +110,7 @@ class CLITestV20NetworkGateway(CLITestV20Base):
{'network_id': 'net_id',
'segmentation_type': 'edi',
'segmentation_id': '7'})
class CLITestV20NetworkGatewayXML(CLITestV20NetworkGatewayJSON):
format = 'xml'

@ -31,7 +31,9 @@ from tests.unit.test_cli20 import CLITestV20Base
from tests.unit.test_cli20 import MyApp
class CLITestV20Port(CLITestV20Base):
class CLITestV20PortJSON(CLITestV20Base):
def setUp(self):
super(CLITestV20PortJSON, self).setUp(plurals={'tags': 'tag'})
def test_create_port(self):
"""Create port: netid."""
@ -304,3 +306,7 @@ class CLITestV20Port(CLITestV20Base):
myid = 'myid'
args = [myid]
self._test_delete_resource(resource, cmd, myid, args)
class CLITestV20PortXML(CLITestV20PortJSON):
format = 'xml'

@ -31,7 +31,7 @@ from tests.unit.test_cli20 import CLITestV20Base
from tests.unit.test_cli20 import MyApp
class CLITestV20Router(CLITestV20Base):
class CLITestV20RouterJSON(CLITestV20Base):
def test_create_router(self):
"""Create router: router1."""
resource = 'router'
@ -170,3 +170,7 @@ class CLITestV20Router(CLITestV20Base):
self._test_update_resource(resource, cmd, 'externalid',
args, {"external_gateway_info": {}}
)
class CLITestV20RouterXML(CLITestV20RouterJSON):
format = 'xml'

@ -24,7 +24,7 @@ from quantumclient.quantum.v2_0 import securitygroup
from tests.unit import test_cli20
class CLITestV20SecurityGroups(test_cli20.CLITestV20Base):
class CLITestV20SecurityGroupsJSON(test_cli20.CLITestV20Base):
def test_create_security_group(self):
"""Create security group: webservers."""
resource = 'security_group'
@ -304,3 +304,7 @@ class CLITestV20SecurityGroups(test_cli20.CLITestV20Base):
args = '-F id -F security_group -F remote_group'.split()
self._test_list_security_group_rules_extend(args=args,
query_field=True)
class CLITestV20SecurityGroupsXML(CLITestV20SecurityGroupsJSON):
format = 'xml'

@ -26,7 +26,9 @@ from tests.unit.test_cli20 import CLITestV20Base
from tests.unit.test_cli20 import MyApp
class CLITestV20Subnet(CLITestV20Base):
class CLITestV20SubnetJSON(CLITestV20Base):
def setUp(self):
super(CLITestV20SubnetJSON, self).setUp(plurals={'tags': 'tag'})
def test_create_subnet(self):
"""Create subnet: --gateway gateway netid cidr."""
@ -399,3 +401,7 @@ class CLITestV20Subnet(CLITestV20Base):
myid = 'myid'
args = [myid]
self._test_delete_resource(resource, cmd, myid, args)
class CLITestV20SubnetXML(CLITestV20SubnetJSON):
format = 'xml'

@ -21,6 +21,7 @@ import re
import sys
import fixtures
import mox
import testtools
from testtools import matchers
@ -58,6 +59,7 @@ class ShellTest(testtools.TestCase):
# Patch os.environ to avoid required auth info.
def setUp(self):
super(ShellTest, self).setUp()
self.mox = mox.Mox()
for var in self.FAKE_ENV:
self.useFixture(
fixtures.EnvironmentVariable(
@ -127,3 +129,17 @@ class ShellTest(testtools.TestCase):
quant_shell = openstack_shell.QuantumShell('2.0')
result = quant_shell.build_option_parser('descr', '2.0')
self.assertEqual(True, isinstance(result, argparse.ArgumentParser))
def test_main_with_unicode(self):
self.mox.StubOutClassWithMocks(openstack_shell, 'QuantumShell')
qshell_mock = openstack_shell.QuantumShell('2.0')
#self.mox.StubOutWithMock(qshell_mock, 'run')
unicode_text = u'\u7f51\u7edc'
argv = ['net-list', unicode_text, unicode_text.encode('utf-8')]
qshell_mock.run([u'net-list', unicode_text,
unicode_text]).AndReturn(0)
self.mox.ReplayAll()
ret = openstack_shell.main(argv=argv)
self.mox.VerifyAll()
self.mox.UnsetStubs()
self.assertEqual(ret, 0)