Partial implementation of contexts: a new feature to simplify common conversions

This commit implements contexts as a way to allow conversion between two unrelated
dimensions (e.g. time and length). contexts are specified by the source and
destination dimensions and the conversion function. These functions are used
in case

The context is implemented as a graph (not just isolated pairs) to enable
conversion from A -> C, if A -> B and B -> are defined.

Still to be done:
- a proper API to define contexts
- parsing of context in the defintion file
- default context
- parameterized contexts

See Issue #65
This commit is contained in:
Hernan Grecco
2013-09-16 21:24:46 -03:00
parent 1c41facfe7
commit 2c0d10a3a1
4 changed files with 382 additions and 5 deletions

132
pint/compat.py Normal file
View File

@@ -0,0 +1,132 @@
from collections import MutableMapping
from thread import get_ident
def _recursive_repr(fillvalue='...'):
'Decorator to make a repr function return fillvalue for a recursive call'
def decorating_function(user_function):
repr_running = set()
def wrapper(self):
key = id(self), get_ident()
if key in repr_running:
return fillvalue
repr_running.add(key)
try:
result = user_function(self)
finally:
repr_running.discard(key)
return result
# Can't use functools.wraps() here because of bootstrap issues
wrapper.__module__ = getattr(user_function, '__module__')
wrapper.__doc__ = getattr(user_function, '__doc__')
wrapper.__name__ = getattr(user_function, '__name__')
wrapper.__annotations__ = getattr(user_function, '__annotations__', {})
return wrapper
return decorating_function
class ChainMap(MutableMapping):
''' A ChainMap groups multiple dicts (or other mappings) together
to create a single, updateable view.
The underlying mappings are stored in a list. That list is public and can
accessed or updated using the *maps* attribute. There is no other state.
Lookups search the underlying mappings successively until a key is found.
In contrast, writes, updates, and deletions only operate on the first
mapping.
'''
def __init__(self, *maps):
'''Initialize a ChainMap by setting *maps* to the given mappings.
If no mappings are provided, a single empty dictionary is used.
'''
self.maps = list(maps) or [{}] # always at least one map
def __missing__(self, key):
raise KeyError(key)
def __getitem__(self, key):
for mapping in self.maps:
try:
return mapping[key] # can't use 'key in mapping' with defaultdict
except KeyError:
pass
return self.__missing__(key) # support subclasses that define __missing__
def get(self, key, default=None):
return self[key] if key in self else default
def __len__(self):
return len(set().union(*self.maps)) # reuses stored hash values if possible
def __iter__(self):
return iter(set().union(*self.maps))
def __contains__(self, key):
return any(key in m for m in self.maps)
def __bool__(self):
return any(self.maps)
@_recursive_repr()
def __repr__(self):
return '{0.__class__.__name__}({1})'.format(
self, ', '.join(map(repr, self.maps)))
@classmethod
def fromkeys(cls, iterable, *args):
'Create a ChainMap with a single dict created from the iterable.'
return cls(dict.fromkeys(iterable, *args))
def copy(self):
'New ChainMap or subclass with a new copy of maps[0] and refs to maps[1:]'
return self.__class__(self.maps[0].copy(), *self.maps[1:])
__copy__ = copy
def new_child(self, m=None): # like Django's Context.push()
'''
New ChainMap with a new map followed by all previous maps. If no
map is provided, an empty dict is used.
'''
if m is None:
m = {}
return self.__class__(m, *self.maps)
@property
def parents(self): # like Django's Context.pop()
'New ChainMap from maps[1:].'
return self.__class__(*self.maps[1:])
def __setitem__(self, key, value):
self.maps[0][key] = value
def __delitem__(self, key):
try:
del self.maps[0][key]
except KeyError:
raise KeyError('Key not found in the first mapping: {!r}'.format(key))
def popitem(self):
'Remove and return an item pair from maps[0]. Raise KeyError is maps[0] is empty.'
try:
return self.maps[0].popitem()
except KeyError:
raise KeyError('No keys found in the first mapping.')
def pop(self, key, *args):
'Remove *key* from maps[0] and return its value. Raise KeyError if *key* not in maps[0].'
try:
return self.maps[0].pop(key, *args)
except KeyError:
raise KeyError('Key not found in the first mapping: {!r}'.format(key))
def clear(self):
'Clear maps[0], leaving maps[1:] intact.'
self.maps[0].clear()

View File

@@ -0,0 +1,144 @@
# -*- coding: utf-8 -*-
from __future__ import division, unicode_literals, print_function, absolute_import
import unittest
from pint import UnitRegistry
from pint.unit import UnitsContainer, _freeze
def add_ctxs(ureg):
a, b = _freeze(UnitsContainer({'[length]': 1})), _freeze(UnitsContainer({'[time]': -1}))
d = {}
d[(a, b)] = lambda x: ureg.speed_of_light / x
d[(b, a)] = lambda x: ureg.speed_of_light / x
ureg._contexts['sp'] = d
a, b = _freeze(UnitsContainer({'[length]': 1})), _freeze(UnitsContainer({'[current]': -1}))
d = {}
d[(a, b)] = lambda x: 1 / x
d[(b, a)] = lambda x: 1 / x
ureg._contexts['ab'] = d
class TestContexts(unittest.TestCase):
def test_known_context(self):
ureg = UnitRegistry()
add_ctxs(ureg)
with ureg.context('sp'):
self.assertTrue(ureg._active_ctx)
self.assertTrue(ureg._active_ctx_graph)
self.assertFalse(ureg._active_ctx)
self.assertFalse(ureg._active_ctx_graph)
def test_known_nested_context(self):
ureg = UnitRegistry()
add_ctxs(ureg)
with ureg.context('sp'):
x = dict(ureg._active_ctx)
y = dict(ureg._active_ctx_graph)
self.assertTrue(ureg._active_ctx)
self.assertTrue(ureg._active_ctx_graph)
with ureg.context('ab'):
self.assertTrue(ureg._active_ctx)
self.assertTrue(ureg._active_ctx_graph)
self.assertNotEqual(x, ureg._active_ctx)
self.assertNotEqual(y, ureg._active_ctx_graph)
self.assertEqual(x, ureg._active_ctx)
self.assertEqual(y, ureg._active_ctx_graph)
self.assertFalse(ureg._active_ctx)
self.assertFalse(ureg._active_ctx_graph)
def test_unknown_context(self):
ureg = UnitRegistry()
add_ctxs(ureg)
try:
with ureg.context('la'):
pass
except KeyError as e:
value = True
except Exception as e:
value = False
self.assertTrue(value)
self.assertFalse(ureg._active_ctx)
self.assertFalse(ureg._active_ctx_graph)
def test_unknown_nested_context(self):
ureg = UnitRegistry()
add_ctxs(ureg)
with ureg.context('sp'):
x = dict(ureg._active_ctx)
y = dict(ureg._active_ctx_graph)
try:
with ureg.context('la'):
pass
except KeyError as e:
value = True
except Exception as e:
value = False
self.assertTrue(value)
self.assertEqual(x, ureg._active_ctx)
self.assertEqual(y, ureg._active_ctx_graph)
self.assertFalse(ureg._active_ctx)
self.assertFalse(ureg._active_ctx_graph)
def test_one_context(self):
ureg = UnitRegistry()
add_ctxs(ureg)
q = 500 * ureg.meter
s = (ureg.speed_of_light / q).to('Hz')
self.assertRaises(ValueError, q.to, 'Hz')
with ureg.context('sp'):
self.assertEqual(q.to('Hz'), s)
self.assertRaises(ValueError, q.to, 'Hz')
def test_multiple_context(self):
ureg = UnitRegistry()
add_ctxs(ureg)
q = 500 * ureg.meter
s = (ureg.speed_of_light / q).to('Hz')
self.assertRaises(ValueError, q.to, 'Hz')
with ureg.context('sp', 'ab'):
self.assertEqual(q.to('Hz'), s)
self.assertRaises(ValueError, q.to, 'Hz')
def test_nested_context(self):
ureg = UnitRegistry()
add_ctxs(ureg)
q = 500 * ureg.meter
s = (ureg.speed_of_light / q).to('Hz')
self.assertRaises(ValueError, q.to, 'Hz')
with ureg.context('sp'):
self.assertEqual(q.to('Hz'), s)
with ureg.context('ab'):
self.assertEqual(q.to('Hz'), s)
self.assertEqual(q.to('Hz'), s)
with ureg.context('ab'):
self.assertRaises(ValueError, q.to, 'Hz')
with ureg.context('sp'):
self.assertEqual(q.to('Hz'), s)
self.assertRaises(ValueError, q.to, 'Hz')

View File

@@ -17,16 +17,20 @@ import math
import itertools
import functools
import pkg_resources
from decimal import Decimal
from collections import defaultdict
from contextlib import contextmanager
from io import open
from numbers import Number
from tokenize import untokenize, NUMBER, STRING, NAME, OP
from .compat import ChainMap
from .util import (formatter, logger, NUMERIC_TYPES, pi_theorem, solve_dependencies,
ParserHelper, string_types, ptok, string_preprocessor)
from .util import find_shortest_path
from decimal import Decimal
class UndefinedUnitError(ValueError):
"""Raised when the units are not defined in the unit registry.
@@ -167,6 +171,12 @@ def _is_dim(name):
return name.startswith('[') and name.endswith(']')
def _freeze(d):
"""Return a hashable view of dict.
"""
return frozenset(d.items())
class PrefixDefinition(Definition):
"""Definition of a prefix.
"""
@@ -408,7 +418,22 @@ class UnitRegistry(object):
#: Map suffix name (string) to canonical , and unit alias to canonical unit name
self._suffixes = {'': None, 's': ''}
#: In the context of a multiplication of units, interpret
# A context defines transformation rules between base dimensions (e.g. time and length).
# Transformations are stored in a dict with:
# - key: tuple with source and destination dimensions represented
# as set of UnitContainer.items()
# - value: conversion function taking a single value.
#: Map context name (string) or abbreviation to context.
self._contexts = {}
#: Stores active contexts.
self._active_ctx = ChainMap()
#: Store a graph representation of the context.
self._active_ctx_graph = None
#: When performing a multiplication of units, interpret
#: non-multiplicative units as their *delta* counterparts.
self.default_to_delta = default_to_delta
@@ -433,6 +458,52 @@ class UnitRegistry(object):
'parse_units', 'parse_expression', 'pi_theorem',
'convert', 'get_base_units']
@contextmanager
def context(self, *names):
"""Used as a context manager, this function enables to activate a context
which is removed after usage.
:param names: name of the context.
Multiple contexts can be called in single call or nested::
>>> with ureg.context('one', 'two'):
... pass
>>> with ureg.context('one'):
... with ureg.context('two'):
... pass
"""
# For each name, we first find the corresponding context.
ctxs = tuple(self._contexts[name] for name in names)
# And then add them to the active context.
for ctx in ctxs:
self._active_ctx = self._active_ctx.new_child(ctx)
# The graph representing connections between dimensions is rebuilt
# from the connections (edges) stored in the context.
self._active_ctx_graph = defaultdict(list)
for fr_, to_ in self._active_ctx.keys():
self._active_ctx_graph[fr_].append(to_)
try:
# After adding the context and rebuilding the graph, the registry
# is ready to use.
yield self
finally:
# Upon leaving the with statement,
# the added contexts are removed from the active one.
for _ in names:
self._active_ctx = self._active_ctx.parents
# The graph representing connections between dimensions is rebuilt
# from the connections (edges) remaining in the context.
self._active_ctx_graph = defaultdict(list)
for fr_, to_ in self._active_ctx.keys():
self._active_ctx_graph[fr_].append(to_)
def define(self, definition):
"""Add unit to the registry.
"""
@@ -649,6 +720,22 @@ class UnitRegistry(object):
dst = ParserHelper.from_string(dst)
if src == dst:
return value
src_dim = self.get_dimensionality(src)
dst_dim = self.get_dimensionality(dst)
# If there is an active context, we look for a path connecting source and
# destination dimensionality. If it exists, we transform the source value
# by applying sequentially each transformation of the path.
if self._active_ctx:
path = find_shortest_path(self._active_ctx_graph, _freeze(src_dim), _freeze(dst_dim))
if path:
src = self.Quantity(value, src)
for a, b in zip(path[:-1], path[1:]):
src = self._active_ctx[(a, b)](src)
value, src = src.magnitude, src.units
if len(src) == 1:
src_unit, src_value = list(src.items())[0]
src_unit = self._units[src_unit]
@@ -668,9 +755,7 @@ class UnitRegistry(object):
factor, units = self.get_base_units(src / dst)
if len(units):
raise DimensionalityError(src, dst,
self.get_dimensionality(src),
self.get_dimensionality(dst))
raise DimensionalityError(src, dst, src_dim, dst_dim)
# factor is type float and if our magintude is type Decimal then
# must first convert to Decimal before we can '*' the values

View File

@@ -273,6 +273,22 @@ def solve_dependencies(dependencies):
return r
def find_shortest_path(graph, start, end, path=[]):
path = path + [start]
if start == end:
return path
if not graph.has_key(start):
return None
shortest = None
for node in graph[start]:
if node not in path:
newpath = find_shortest_path(graph, node, end, path)
if newpath:
if not shortest or len(newpath) < len(shortest):
shortest = newpath
return shortest
class ParserHelper(dict):
"""The ParserHelper stores in place the product of variables and
their respective exponent and implements the corresponding operations.