Add encoding of rows and qualifiers in impl_hbase
Currently, impl_hbase dumps only cell values, so an exception is raised if a row or qualifier contains a non-ASCII symbol. This patch set adds encoding/decoding for rows and qualifiers. It uses encoding instead of dumping to preserve backward compatibility of rows and cell keys in existing tables. Change-Id: Id238d38b4ce1f91d9e5e760856e27c5f6eb0685c Closes-bug: #1350826
This commit is contained in:
parent
942fb02a85
commit
87a5114342
|
@ -364,9 +364,10 @@ def deserialize_entry(entry, get_raw_meta=True):
|
||||||
metadata_flattened = {}
|
metadata_flattened = {}
|
||||||
for k, v in entry.items():
|
for k, v in entry.items():
|
||||||
if k.startswith('f:s_'):
|
if k.startswith('f:s_'):
|
||||||
sources.append(k[4:])
|
sources.append(decode_unicode(k[4:]))
|
||||||
elif k.startswith('f:r_metadata.'):
|
elif k.startswith('f:r_metadata.'):
|
||||||
metadata_flattened[k[len('f:r_metadata.'):]] = load(v)
|
qualifier = decode_unicode(k[len('f:r_metadata.'):])
|
||||||
|
metadata_flattened[qualifier] = load(v)
|
||||||
elif k.startswith("f:m_"):
|
elif k.startswith("f:m_"):
|
||||||
meter = ([unquote(i) for i in k[4:].split(':')], load(v))
|
meter = ([unquote(i) for i in k[4:].split(':')], load(v))
|
||||||
meters.append(meter)
|
meters.append(meter)
|
||||||
|
@ -402,17 +403,20 @@ def serialize_entry(data=None, **kwargs):
|
||||||
# To make insertion safe we need to store all meters and sources in
|
# To make insertion safe we need to store all meters and sources in
|
||||||
# a separate cell. For this purpose s_ and m_ prefixes are
|
# a separate cell. For this purpose s_ and m_ prefixes are
|
||||||
# introduced.
|
# introduced.
|
||||||
result['f:s_%s' % v] = dump('1')
|
qualifier = encode_unicode('f:s_%s' % v)
|
||||||
|
result[qualifier] = dump('1')
|
||||||
elif k == 'meter':
|
elif k == 'meter':
|
||||||
for meter, ts in v.items():
|
for meter, ts in v.items():
|
||||||
result['f:m_%s' % meter] = dump(ts)
|
qualifier = encode_unicode('f:m_%s' % meter)
|
||||||
|
result[qualifier] = dump(ts)
|
||||||
elif k == 'resource_metadata':
|
elif k == 'resource_metadata':
|
||||||
# keep raw metadata as well as flattened to provide
|
# keep raw metadata as well as flattened to provide
|
||||||
# capability with API v2. It will be flattened in another
|
# capability with API v2. It will be flattened in another
|
||||||
# way on API level. But we need flattened too for quick filtering.
|
# way on API level. But we need flattened too for quick filtering.
|
||||||
flattened_meta = dump_metadata(v)
|
flattened_meta = dump_metadata(v)
|
||||||
for key, m in flattened_meta.items():
|
for key, m in flattened_meta.items():
|
||||||
result['f:r_metadata.' + key] = dump(m)
|
metadata_qualifier = encode_unicode('f:r_metadata.' + key)
|
||||||
|
result[metadata_qualifier] = dump(m)
|
||||||
result['f:resource_metadata'] = dump(v)
|
result['f:resource_metadata'] = dump(v)
|
||||||
else:
|
else:
|
||||||
result['f:' + quote(k, ':')] = dump(v)
|
result['f:' + quote(k, ':')] = dump(v)
|
||||||
|
@ -434,6 +438,14 @@ def load(data):
|
||||||
return json.loads(data, object_hook=object_hook)
|
return json.loads(data, object_hook=object_hook)
|
||||||
|
|
||||||
|
|
||||||
|
def encode_unicode(data):
    """Encode unicode text to UTF-8 bytes; return any other value unchanged.

    HBase rows and qualifiers must be byte strings, so text (py2 ``unicode``,
    py3 ``str``) is encoded while bytes and non-string values pass through.
    """
    if isinstance(data, six.text_type):
        return data.encode('utf-8')
    return data
|
||||||
|
|
||||||
|
|
||||||
|
def decode_unicode(data):
    """Decode UTF-8 byte strings to unicode text; return other values unchanged.

    Rows and qualifiers read back from HBase are byte strings; decoding them
    restores the original unicode text produced by ``encode_unicode``.

    NOTE: the guard checks ``bytes`` rather than ``six.string_types``:
    ``six.string_types`` also matches already-decoded text, which would make
    ``.decode('utf-8')`` raise AttributeError on Python 3 (str has no decode)
    and UnicodeEncodeError on Python 2 for non-ASCII unicode input (implicit
    ASCII round-trip). On Python 2 ``bytes is str``, so byte-string behavior
    is unchanged.
    """
    return data.decode('utf-8') if isinstance(data, bytes) else data
|
||||||
|
|
||||||
|
|
||||||
# We don't want to have tzinfo in decoded json.This object_hook is
|
# We don't want to have tzinfo in decoded json.This object_hook is
|
||||||
# overwritten json_util.object_hook for $date
|
# overwritten json_util.object_hook for $date
|
||||||
def object_hook(dct):
|
def object_hook(dct):
|
||||||
|
|
|
@ -171,7 +171,8 @@ class Connection(hbase_base.Connection, base.Connection):
|
||||||
# automatically 'on the top'. It is needed to keep metadata
|
# automatically 'on the top'. It is needed to keep metadata
|
||||||
# up-to-date: metadata from newest samples is considered as actual.
|
# up-to-date: metadata from newest samples is considered as actual.
|
||||||
ts = int(time.mktime(data['timestamp'].timetuple()) * 1000)
|
ts = int(time.mktime(data['timestamp'].timetuple()) * 1000)
|
||||||
resource_table.put(data['resource_id'], resource, ts)
|
resource_table.put(hbase_utils.encode_unicode(data['resource_id']),
|
||||||
|
resource, ts)
|
||||||
|
|
||||||
# Rowkey consists of reversed timestamp, meter and a
|
# Rowkey consists of reversed timestamp, meter and a
|
||||||
# message signature for purposes of uniqueness
|
# message signature for purposes of uniqueness
|
||||||
|
@ -216,6 +217,7 @@ class Connection(hbase_base.Connection, base.Connection):
|
||||||
for resource_id, data in resource_table.scan(filter=q):
|
for resource_id, data in resource_table.scan(filter=q):
|
||||||
f_res, sources, meters, md = hbase_utils.deserialize_entry(
|
f_res, sources, meters, md = hbase_utils.deserialize_entry(
|
||||||
data)
|
data)
|
||||||
|
resource_id = hbase_utils.encode_unicode(resource_id)
|
||||||
# Unfortunately happybase doesn't keep ordered result from
|
# Unfortunately happybase doesn't keep ordered result from
|
||||||
# HBase. So that's why it's needed to find min and max
|
# HBase. So that's why it's needed to find min and max
|
||||||
# manually
|
# manually
|
||||||
|
|
|
@ -3227,3 +3227,24 @@ class MongoTimeToLiveTest(DBTestBase, tests_db.MixinTestsWithBackendScenarios):
|
||||||
['resource_ttl']['expireAfterSeconds'])
|
['resource_ttl']['expireAfterSeconds'])
|
||||||
self.assertEqual(15, self.conn.db.meter.index_information()
|
self.assertEqual(15, self.conn.db.meter.index_information()
|
||||||
['meter_ttl']['expireAfterSeconds'])
|
['meter_ttl']['expireAfterSeconds'])
|
||||||
|
|
||||||
|
|
||||||
|
class TestRecordUnicodeSamples(DBTestBase,
                               tests_db.MixinTestsWithBackendScenarios):
    """Verify that samples with non-ASCII names and metadata round-trip."""

    def prepare_data(self):
        """Store a single sample carrying unicode meter name and metadata."""
        sample = self.create_and_store_sample(
            name=u'meter.accent\xe9\u0437',
            metadata={u"metadata_key\xe9\u0437": "test",
                      u"metadata_key": u"test\xe9\u0437"},
        )
        self.msgs = [sample]

    def test_unicode_sample(self):
        """The stored unicode sample is retrieved intact."""
        sample_filter = storage.SampleFilter()
        results = list(self.conn.get_samples(sample_filter))
        self.assertEqual(1, len(results))
        expected = self.msgs[0]
        actual = results[0].as_dict()
        self.assertEqual(expected['counter_name'], actual['counter_name'])
        self.assertEqual(expected['resource_metadata'],
                         actual['resource_metadata'])
|
||||||
|
|
Loading…
Reference in New Issue