Implements SI/IEC unit system conversion to bytes
The issues with utils.to_bytes are 1) Ignores measure of unit and treats b and B both as Byte 2) Cannot handle SI unit system which base is 1000 (IEC=1024) 3) The name "to_bytes" didn't do a good job representing what the function does. Disscussed here, https://review.openstack.org/#/c/32093/2/openstack/common/strutils.py This patch implements "string_to_bytes". The name comes from "string_to_bool" in cinder/utils.py. Its merits are, 1) Handles bit and byte units 2) Handles SI and IEC unit systems 3) Has a much more intuitive function name Closes-bug: #1189635 Closes-bug: #1193765 Change-Id: I5f1ce56f447179d1ef6ded7c956983151f5a8f18
This commit is contained in:
parent
a53c4dc1db
commit
bec3a5eb81
@ -79,7 +79,8 @@ class QemuImgInfo(object):
|
||||
return int(real_size.group(4))
|
||||
elif not unit_of_measure:
|
||||
return int(magnitude)
|
||||
return strutils.to_bytes('%s%s' % (magnitude, unit_of_measure))
|
||||
return strutils.string_to_bytes('%s%sB' % (magnitude, unit_of_measure),
|
||||
return_int=True)
|
||||
|
||||
def _extract_details(self, root_cmd, root_details, lines_after):
|
||||
real_details = root_details
|
||||
|
@ -17,6 +17,7 @@
|
||||
System-level utilities and helper functions.
|
||||
"""
|
||||
|
||||
import math
|
||||
import re
|
||||
import sys
|
||||
import unicodedata
|
||||
@ -26,16 +27,21 @@ import six
|
||||
from openstack.common.gettextutils import _
|
||||
|
||||
|
||||
# Used for looking up extensions of text
|
||||
# to their 'multiplied' byte amount
|
||||
BYTE_MULTIPLIERS = {
|
||||
'': 1,
|
||||
't': 1024 ** 4,
|
||||
'g': 1024 ** 3,
|
||||
'm': 1024 ** 2,
|
||||
'k': 1024,
|
||||
UNIT_PREFIX_EXPONENT = {
|
||||
'k': 1,
|
||||
'K': 1,
|
||||
'Ki': 1,
|
||||
'M': 2,
|
||||
'Mi': 2,
|
||||
'G': 3,
|
||||
'Gi': 3,
|
||||
'T': 4,
|
||||
'Ti': 4,
|
||||
}
|
||||
UNIT_SYSTEM_INFO = {
|
||||
'IEC': (1024, re.compile(r'(^[-+]?\d*\.?\d+)([KMGT]i?)?(b|bit|B)$')),
|
||||
'SI': (1000, re.compile(r'(^[-+]?\d*\.?\d+)([kMGT])?(b|bit|B)$')),
|
||||
}
|
||||
BYTE_REGEX = re.compile(r'(^-?\d+)(\D*)')
|
||||
|
||||
TRUE_STRINGS = ('1', 't', 'true', 'on', 'y', 'yes')
|
||||
FALSE_STRINGS = ('0', 'f', 'false', 'off', 'n', 'no')
|
||||
@ -167,34 +173,50 @@ def safe_encode(text, incoming=None,
|
||||
return text
|
||||
|
||||
|
||||
def to_bytes(text, default=0):
|
||||
"""Converts a string into an integer of bytes.
|
||||
def string_to_bytes(text, unit_system='IEC', return_int=False):
|
||||
"""Converts a string into an float representation of bytes.
|
||||
|
||||
Looks at the last characters of the text to determine
|
||||
what conversion is needed to turn the input text into a byte number.
|
||||
Supports "B, K(B), M(B), G(B), and T(B)". (case insensitive)
|
||||
The units supported for IEC ::
|
||||
|
||||
Kb(it), Kib(it), Mb(it), Mib(it), Gb(it), Gib(it), Tb(it), Tib(it)
|
||||
KB, KiB, MB, MiB, GB, GiB, TB, TiB
|
||||
|
||||
The units supported for SI ::
|
||||
|
||||
kb(it), Mb(it), Gb(it), Tb(it)
|
||||
kB, MB, GB, TB
|
||||
|
||||
Note that the SI unit system does not support capital letter 'K'
|
||||
|
||||
:param text: String input for bytes size conversion.
|
||||
:param default: Default return value when text is blank.
|
||||
:param unit_system: Unit system for byte size conversion.
|
||||
:param return_int: If True, returns integer representation of text
|
||||
in bytes. (default: decimal)
|
||||
:returns: Numerical representation of text in bytes.
|
||||
:raises ValueError: If text has an invalid value.
|
||||
|
||||
"""
|
||||
match = BYTE_REGEX.search(text)
|
||||
try:
|
||||
base, reg_ex = UNIT_SYSTEM_INFO[unit_system]
|
||||
except KeyError:
|
||||
msg = _('Invalid unit system: "%s"') % unit_system
|
||||
raise ValueError(msg)
|
||||
match = reg_ex.match(text)
|
||||
if match:
|
||||
magnitude = int(match.group(1))
|
||||
mult_key_org = match.group(2)
|
||||
if not mult_key_org:
|
||||
return magnitude
|
||||
elif text:
|
||||
msg = _('Invalid string format: %s') % text
|
||||
raise TypeError(msg)
|
||||
magnitude = float(match.group(1))
|
||||
unit_prefix = match.group(2)
|
||||
if match.group(3) in ['b', 'bit']:
|
||||
magnitude /= 8
|
||||
else:
|
||||
return default
|
||||
mult_key = mult_key_org.lower().replace('b', '', 1)
|
||||
multiplier = BYTE_MULTIPLIERS.get(mult_key)
|
||||
if multiplier is None:
|
||||
msg = _('Unknown byte multiplier: %s') % mult_key_org
|
||||
raise TypeError(msg)
|
||||
return magnitude * multiplier
|
||||
msg = _('Invalid string format: %s') % text
|
||||
raise ValueError(msg)
|
||||
if not unit_prefix:
|
||||
res = magnitude
|
||||
else:
|
||||
res = magnitude * pow(base, UNIT_PREFIX_EXPONENT[unit_prefix])
|
||||
if return_int:
|
||||
return int(math.ceil(res))
|
||||
return res
|
||||
|
||||
|
||||
def to_slug(value, incoming=None, errors="strict"):
|
||||
|
@ -13,4 +13,5 @@ pyzmq==2.2.0.1
|
||||
redis
|
||||
sphinx>=1.1.2,<1.2
|
||||
testrepository>=0.0.17
|
||||
testscenarios>=0.4
|
||||
testtools>=0.9.32
|
||||
|
@ -15,11 +15,17 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import math
|
||||
|
||||
import mock
|
||||
import six
|
||||
import testscenarios
|
||||
|
||||
from openstack.common import strutils
|
||||
from openstack.common import test
|
||||
from openstack.common import units
|
||||
|
||||
load_tests = testscenarios.load_tests_apply_scenarios
|
||||
|
||||
|
||||
class StrUtilsTest(test.BaseTestCase):
|
||||
@ -170,38 +176,6 @@ class StrUtilsTest(test.BaseTestCase):
|
||||
self.assertEqual('ni\xc3\xb1o', safe_encode('ni\xc3\xb1o',
|
||||
incoming='ascii'))
|
||||
|
||||
def test_string_conversions(self):
|
||||
working_examples = {
|
||||
'1024KB': 1048576,
|
||||
'1024TB': 1125899906842624,
|
||||
'1024K': 1048576,
|
||||
'1024T': 1125899906842624,
|
||||
'1TB': 1099511627776,
|
||||
'1T': 1099511627776,
|
||||
'1KB': 1024,
|
||||
'1K': 1024,
|
||||
'1B': 1,
|
||||
'1': 1,
|
||||
'1MB': 1048576,
|
||||
'7MB': 7340032,
|
||||
'0MB': 0,
|
||||
'0KB': 0,
|
||||
'0TB': 0,
|
||||
'': 0,
|
||||
}
|
||||
for (in_value, expected_value) in working_examples.items():
|
||||
b_value = strutils.to_bytes(in_value)
|
||||
self.assertEqual(expected_value, b_value)
|
||||
if in_value:
|
||||
in_value = "-" + in_value
|
||||
b_value = strutils.to_bytes(in_value)
|
||||
self.assertEqual(expected_value * -1, b_value)
|
||||
breaking_examples = [
|
||||
'junk1KB', '1023BBBB',
|
||||
]
|
||||
for v in breaking_examples:
|
||||
self.assertRaises(TypeError, strutils.to_bytes, v)
|
||||
|
||||
def test_slugify(self):
|
||||
to_slug = strutils.to_slug
|
||||
self.assertRaises(TypeError, to_slug, True)
|
||||
@ -216,3 +190,105 @@ class StrUtilsTest(test.BaseTestCase):
|
||||
self.assertEqual(six.u("perche"), to_slug("perch\xc3\xa9"))
|
||||
self.assertEqual(six.u("strange"),
|
||||
to_slug("\x80strange", errors="ignore"))
|
||||
|
||||
|
||||
class StringToBytesTest(test.BaseTestCase):
|
||||
|
||||
_unit_system = [
|
||||
('si', dict(unit_system='SI')),
|
||||
('iec', dict(unit_system='IEC')),
|
||||
('invalid_unit_system', dict(unit_system='KKK', assert_error=True)),
|
||||
]
|
||||
|
||||
_sign = [
|
||||
('no_sign', dict(sign='')),
|
||||
('positive', dict(sign='+')),
|
||||
('negative', dict(sign='-')),
|
||||
('invalid_sign', dict(sign='~', assert_error=True)),
|
||||
]
|
||||
|
||||
_magnitude = [
|
||||
('integer', dict(magnitude='79')),
|
||||
('decimal', dict(magnitude='7.9')),
|
||||
('decimal_point_start', dict(magnitude='.9')),
|
||||
('decimal_point_end', dict(magnitude='79.', assert_error=True)),
|
||||
('invalid_literal', dict(magnitude='7.9.9', assert_error=True)),
|
||||
('garbage_value', dict(magnitude='asdf', assert_error=True)),
|
||||
]
|
||||
|
||||
_unit_prefix = [
|
||||
('no_unit_prefix', dict(unit_prefix='')),
|
||||
('k', dict(unit_prefix='k')),
|
||||
('K', dict(unit_prefix='K')),
|
||||
('M', dict(unit_prefix='M')),
|
||||
('G', dict(unit_prefix='G')),
|
||||
('T', dict(unit_prefix='T')),
|
||||
('Ki', dict(unit_prefix='Ki')),
|
||||
('Mi', dict(unit_prefix='Mi')),
|
||||
('Gi', dict(unit_prefix='Gi')),
|
||||
('Ti', dict(unit_prefix='Ti')),
|
||||
('invalid_unit_prefix', dict(unit_prefix='B', assert_error=True)),
|
||||
]
|
||||
|
||||
_unit_suffix = [
|
||||
('b', dict(unit_suffix='b')),
|
||||
('bit', dict(unit_suffix='bit')),
|
||||
('B', dict(unit_suffix='B')),
|
||||
('invalid_unit_suffix', dict(unit_suffix='Kg', assert_error=True)),
|
||||
]
|
||||
|
||||
_return_int = [
|
||||
('return_dec', dict(return_int=False)),
|
||||
('return_int', dict(return_int=True)),
|
||||
]
|
||||
|
||||
@classmethod
|
||||
def generate_scenarios(cls):
|
||||
cls.scenarios = testscenarios.multiply_scenarios(cls._unit_system,
|
||||
cls._sign,
|
||||
cls._magnitude,
|
||||
cls._unit_prefix,
|
||||
cls._unit_suffix,
|
||||
cls._return_int)
|
||||
|
||||
def test_string_to_bytes(self):
|
||||
|
||||
def _get_quantity(sign, magnitude, unit_suffix):
|
||||
res = float('%s%s' % (sign, magnitude))
|
||||
if unit_suffix in ['b', 'bit']:
|
||||
res /= 8
|
||||
return res
|
||||
|
||||
def _get_constant(unit_prefix, unit_system):
|
||||
if not unit_prefix:
|
||||
return 1
|
||||
elif unit_system == 'SI':
|
||||
res = getattr(units, unit_prefix)
|
||||
elif unit_system == 'IEC':
|
||||
if unit_prefix.endswith('i'):
|
||||
res = getattr(units, unit_prefix)
|
||||
else:
|
||||
res = getattr(units, '%si' % unit_prefix)
|
||||
return res
|
||||
|
||||
text = ''.join([self.sign, self.magnitude, self.unit_prefix,
|
||||
self.unit_suffix])
|
||||
err_si = self.unit_system == 'SI' and (self.unit_prefix == 'K' or
|
||||
self.unit_prefix.endswith('i'))
|
||||
err_iec = self.unit_system == 'IEC' and self.unit_prefix == 'k'
|
||||
if getattr(self, 'assert_error', False) or err_si or err_iec:
|
||||
self.assertRaises(ValueError, strutils.string_to_bytes,
|
||||
text, unit_system=self.unit_system,
|
||||
return_int=self.return_int)
|
||||
return
|
||||
quantity = _get_quantity(self.sign, self.magnitude, self.unit_suffix)
|
||||
constant = _get_constant(self.unit_prefix, self.unit_system)
|
||||
expected = quantity * constant
|
||||
actual = strutils.string_to_bytes(text, unit_system=self.unit_system,
|
||||
return_int=self.return_int)
|
||||
if self.return_int:
|
||||
self.assertEqual(actual, int(math.ceil(expected)))
|
||||
else:
|
||||
self.assertAlmostEqual(actual, expected)
|
||||
|
||||
StringToBytesTest.generate_scenarios()
|
||||
|
Loading…
Reference in New Issue
Block a user