Use HMAC.hexdigest to avoid non-ascii characters for package data

hmac.compare_digest() method in python 3 doesn't suppport non-ASCII
characters, so that we need to use HMAC.hexdigest() to avoid non-ASCII
bytes.

For backward compability, we still need to support HMAC created by
using HMAC.digest() in the old amphorae.

Change-Id: I1061e855d0ce06d91a26d217291008197e4a708b
Depends-On: https://review.openstack.org/#/c/567688/
Story: 2002125
Task: 19807
This commit is contained in:
Lingxian Kong 2018-05-31 11:39:18 +12:00
parent ac4c6abe11
commit fdc96d9d9e
3 changed files with 57 additions and 13 deletions

View File

@ -27,6 +27,7 @@ LOG = logging.getLogger(__name__)
hash_algo = hashlib.sha256
hash_len = 32
hex_hash_len = 64
def to_hex(byte_array):
@ -45,21 +46,40 @@ def decode_obj(binary_array):
return obj
def wrap_envelope(obj, key):
def wrap_envelope(obj, key, hex=True):
payload = encode_obj(obj)
hmc = get_hmac(payload, key)
hmc = get_hmac(payload, key, hex=hex)
envelope = payload + hmc
return envelope
def unwrap_envelope(envelope, key):
payload = envelope[:-hash_len]
expected_hmc = envelope[-hash_len:]
calculated_hmc = get_hmac(payload, key)
"""A backward-compatible way to get data.
We may still receive package from amphorae that are using digest() instead
of hexdigest()
"""
try:
return get_payload(envelope, key, hex=True)
except Exception:
return get_payload(envelope, key, hex=False)
def get_payload(envelope, key, hex=True):
len = hex_hash_len if hex else hash_len
payload = envelope[:-len]
expected_hmc = envelope[-len:]
calculated_hmc = get_hmac(payload, key, hex=hex)
if not secretutils.constant_time_compare(expected_hmc, calculated_hmc):
LOG.warning('calculated hmac: %(s1)s not equal to msg hmac: '
'%(s2)s dropping packet', {'s1': to_hex(calculated_hmc),
's2': to_hex(expected_hmc)})
LOG.warning(
'calculated hmac(hex=%(hex)s): %(s1)s not equal to msg hmac: '
'%(s2)s dropping packet',
{
'hex': hex,
's1': to_hex(calculated_hmc),
's2': to_hex(expected_hmc)
}
)
fmt = 'calculated hmac: {0} not equal to msg hmac: {1} dropping packet'
raise exceptions.InvalidHMACException(fmt.format(
to_hex(calculated_hmc), to_hex(expected_hmc)))
@ -67,6 +87,11 @@ def unwrap_envelope(envelope, key):
return obj
def get_hmac(payload, key):
def get_hmac(payload, key, hex=True):
"""Get digest for the payload.
The hex param is for backward compatibility, so the package data sent from
the existing amphorae can still be checked in the previous approach.
"""
hmc = hmac.new(key.encode("utf-8"), payload, hashlib.sha256)
return hmc.digest()
return hmc.hexdigest().encode("utf-8") if hex else hmc.digest()

View File

@ -37,3 +37,20 @@ class TestEnvelope(base.TestCase):
args = (envelope, 'samplekey?')
self.assertRaises(exceptions.InvalidHMACException,
status_message.unwrap_envelope, *args)
def test_message_hmac_compatibility(self):
seq = 42
statusMsg = {'seq': seq,
'status': 'OK',
'id': str(uuid.uuid4())}
envelope = status_message.wrap_envelope(statusMsg, 'samplekey1',
hex=False)
obj = status_message.unwrap_envelope(envelope, 'samplekey1')
self.assertEqual('OK', obj['status'])
self.assertEqual(seq, obj['seq'])
args = (envelope, 'samplekey?')
self.assertRaises(exceptions.InvalidHMACException,
status_message.unwrap_envelope, *args)

View File

@ -28,9 +28,11 @@ IP_PORT = ['192.0.2.10:5555', '192.0.2.10:5555']
KEY = 'TEST'
PORT = random.randrange(1, 9000)
SAMPLE_MSG = {'testkey': 'TEST'}
SAMPLE_MSG_BIN = binascii.unhexlify('78daab562a492d2ec94ead54b252500a710d0e5'
'1aa050041b506245806e5c1971e79951818394e'
'a6e71ad989ff950945f9573f4ab6f83e25db8ed7')
SAMPLE_MSG_BIN = binascii.unhexlify('78daab562a492d2ec94ead54b252500a710d0e51a'
'a050041b506243538303665356331393731653739'
'39353138313833393465613665373161643938396'
'66639353039343566393537336634616236663833'
'653235646238656437')
class TestHealthSender(base.TestCase):