diff --git a/ceilometer/tests/pipeline_base.py b/ceilometer/tests/pipeline_base.py
index 4ff335ab6f..2999d219be 100644
--- a/ceilometer/tests/pipeline_base.py
+++ b/ceilometer/tests/pipeline_base.py
@@ -61,6 +61,7 @@ class BasePipelineTestCase(base.BaseTestCase):
             'unit_conversion': conversions.ScalingTransformer,
             'rate_of_change': conversions.RateOfChangeTransformer,
             'arithmetic': arithmetic.ArithmeticTransformer,
+            'delta': conversions.DeltaTransformer,
         }
 
         if name in class_name_ext:
@@ -1898,6 +1899,164 @@ class BasePipelineTestCase(base.BaseTestCase):
         publisher = pipeline_manager.pipelines[0].publishers[0]
         self.assertEqual(0, len(publisher.samples))
 
+    def _do_test_delta(self, data, expected, growth_only=False):
+        transformer_cfg = [
+            {
+                'name': 'delta',
+                'parameters': {
+                    'target': {'name': 'new_meter'},
+                    'growth_only': growth_only,
+                }
+            },
+        ]
+        self._set_pipeline_cfg('transformers', transformer_cfg)
+        self._set_pipeline_cfg('counters', ['cpu'])
+
+        pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
+                                                    self.transformer_manager)
+        pipe = pipeline_manager.pipelines[0]
+
+        pipe.publish_data(None, data)
+        pipe.flush(None)
+        publisher = pipeline_manager.pipelines[0].publishers[0]
+        self.assertEqual(expected, len(publisher.samples))
+        return publisher.samples
+
+    def test_delta_transformer(self):
+        samples = [
+            sample.Sample(
+                name='cpu',
+                type=sample.TYPE_CUMULATIVE,
+                volume=26,
+                unit='ns',
+                user_id='test_user',
+                project_id='test_proj',
+                resource_id='test_resource',
+                timestamp=timeutils.utcnow().isoformat(),
+                resource_metadata={'version': '1.0'}
+            ),
+            sample.Sample(
+                name='cpu',
+                type=sample.TYPE_CUMULATIVE,
+                volume=16,
+                unit='ns',
+                user_id='test_user',
+                project_id='test_proj',
+                resource_id='test_resource',
+                timestamp=timeutils.utcnow().isoformat(),
+                resource_metadata={'version': '2.0'}
+            ),
+            sample.Sample(
+                name='cpu',
+                type=sample.TYPE_CUMULATIVE,
+                volume=53,
+                unit='ns',
+                user_id='test_user_bis',
+                project_id='test_proj_bis',
+                resource_id='test_resource',
+                timestamp=timeutils.utcnow().isoformat(),
+                resource_metadata={'version': '1.0'}
+            ),
+        ]
+        deltas = self._do_test_delta(samples, 2)
+        self.assertEqual('new_meter', deltas[0].name)
+        self.assertEqual('delta', deltas[0].type)
+        self.assertEqual('ns', deltas[0].unit)
+        self.assertEqual({'version': '2.0'}, deltas[0].resource_metadata)
+        self.assertEqual(-10, deltas[0].volume)
+        self.assertEqual('new_meter', deltas[1].name)
+        self.assertEqual('delta', deltas[1].type)
+        self.assertEqual('ns', deltas[1].unit)
+        self.assertEqual({'version': '1.0'}, deltas[1].resource_metadata)
+        self.assertEqual(37, deltas[1].volume)
+
+    def test_delta_transformer_out_of_order(self):
+        samples = [
+            sample.Sample(
+                name='cpu',
+                type=sample.TYPE_CUMULATIVE,
+                volume=26,
+                unit='ns',
+                user_id='test_user',
+                project_id='test_proj',
+                resource_id='test_resource',
+                timestamp=timeutils.utcnow().isoformat(),
+                resource_metadata={'version': '1.0'}
+            ),
+            sample.Sample(
+                name='cpu',
+                type=sample.TYPE_CUMULATIVE,
+                volume=16,
+                unit='ns',
+                user_id='test_user',
+                project_id='test_proj',
+                resource_id='test_resource',
+                timestamp=((timeutils.utcnow() - datetime.timedelta(minutes=5))
+                           .isoformat()),
+                resource_metadata={'version': '2.0'}
+            ),
+            sample.Sample(
+                name='cpu',
+                type=sample.TYPE_CUMULATIVE,
+                volume=53,
+                unit='ns',
+                user_id='test_user_bis',
+                project_id='test_proj_bis',
+                resource_id='test_resource',
+                timestamp=timeutils.utcnow().isoformat(),
+                resource_metadata={'version': '1.0'}
+            ),
+        ]
+        deltas = self._do_test_delta(samples, 1)
+        self.assertEqual('new_meter', deltas[0].name)
+        self.assertEqual('delta', deltas[0].type)
+        self.assertEqual('ns', deltas[0].unit)
+        self.assertEqual({'version': '1.0'}, deltas[0].resource_metadata)
+        self.assertEqual(27, deltas[0].volume)
+
+    def test_delta_transformer_growth_only(self):
+        samples = [
+            sample.Sample(
+                name='cpu',
+                type=sample.TYPE_CUMULATIVE,
+                volume=26,
+                unit='ns',
+                user_id='test_user',
+                project_id='test_proj',
+                resource_id='test_resource',
+                timestamp=timeutils.utcnow().isoformat(),
+                resource_metadata={'version': '1.0'}
+            ),
+            sample.Sample(
+                name='cpu',
+                type=sample.TYPE_CUMULATIVE,
+                volume=16,
+                unit='ns',
+                user_id='test_user',
+                project_id='test_proj',
+                resource_id='test_resource',
+                timestamp=timeutils.utcnow().isoformat(),
+                resource_metadata={'version': '2.0'}
+            ),
+            sample.Sample(
+                name='cpu',
+                type=sample.TYPE_CUMULATIVE,
+                volume=53,
+                unit='ns',
+                user_id='test_user_bis',
+                project_id='test_proj_bis',
+                resource_id='test_resource',
+                timestamp=timeutils.utcnow().isoformat(),
+                resource_metadata={'version': '1.0'}
+            ),
+        ]
+        deltas = self._do_test_delta(samples, 1, True)
+        self.assertEqual('new_meter', deltas[0].name)
+        self.assertEqual('delta', deltas[0].type)
+        self.assertEqual('ns', deltas[0].unit)
+        self.assertEqual({'version': '1.0'}, deltas[0].resource_metadata)
+        self.assertEqual(37, deltas[0].volume)
+
     def test_unique_pipeline_names(self):
         self._dup_pipeline_name_cfg()
         self._exception_create_pipelinemanager()
diff --git a/ceilometer/tests/unit/test_decoupled_pipeline.py b/ceilometer/tests/unit/test_decoupled_pipeline.py
index 640856a9ec..ac9a6d1d2f 100644
--- a/ceilometer/tests/unit/test_decoupled_pipeline.py
+++ b/ceilometer/tests/unit/test_decoupled_pipeline.py
@@ -229,42 +229,42 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
     def test_rate_of_change_boilerplate_disk_read_cfg(self):
         meters = ('disk.read.bytes', 'disk.read.requests')
         units = ('B', 'request')
-        self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(2,
+        self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3,
                                                                  meters,
                                                                  units)
 
     def test_rate_of_change_boilerplate_disk_write_cfg(self):
         meters = ('disk.write.bytes', 'disk.write.requests')
         units = ('B', 'request')
-        self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(2,
+        self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3,
                                                                  meters,
                                                                  units)
 
     def test_rate_of_change_boilerplate_network_incoming_cfg(self):
         meters = ('network.incoming.bytes', 'network.incoming.packets')
         units = ('B', 'packet')
-        self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3,
+        self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(4,
                                                                  meters,
                                                                  units)
 
     def test_rate_of_change_boilerplate_per_disk_device_read_cfg(self):
         meters = ('disk.device.read.bytes', 'disk.device.read.requests')
         units = ('B', 'request')
-        self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(2,
+        self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3,
                                                                  meters,
                                                                  units)
 
     def test_rate_of_change_boilerplate_per_disk_device_write_cfg(self):
         meters = ('disk.device.write.bytes', 'disk.device.write.requests')
         units = ('B', 'request')
-        self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(2,
+        self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3,
                                                                  meters,
                                                                  units)
 
     def test_rate_of_change_boilerplate_network_outgoing_cfg(self):
         meters = ('network.outgoing.bytes', 'network.outgoing.packets')
         units = ('B', 'packet')
-        self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3,
+        self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(4,
                                                                  meters,
                                                                  units)
 
diff --git a/ceilometer/transformer/conversions.py b/ceilometer/transformer/conversions.py
index 44e6f54c24..a9b25dfaec 100644
--- a/ceilometer/transformer/conversions.py
+++ b/ceilometer/transformer/conversions.py
@@ -20,15 +20,15 @@ from oslo_log import log
 from oslo_utils import timeutils
 import six
 
-from ceilometer.i18n import _
+from ceilometer.i18n import _, _LW
 from ceilometer import sample
 from ceilometer import transformer
 
 LOG = log.getLogger(__name__)
 
 
-class ScalingTransformer(transformer.TransformerBase):
-    """Transformer to apply a scaling conversion."""
+class BaseConversionTransformer(transformer.TransformerBase):
+    """Transformer to derive conversion."""
 
     grouping_keys = ['resource_id']
 
@@ -44,22 +44,7 @@ class ScalingTransformer(transformer.TransformerBase):
         target = target or {}
         self.source = source
         self.target = target
-        self.scale = target.get('scale')
-        LOG.debug('scaling conversion transformer with source:'
-                  ' %(source)s target: %(target)s:', {'source': source,
-                                                      'target': target})
-        super(ScalingTransformer, self).__init__(**kwargs)
-
-    def _scale(self, s):
-        """Apply the scaling factor.
-
-        Either a straight multiplicative factor or else a string to be eval'd.
-        """
-        ns = transformer.Namespace(s.as_dict())
-
-        scale = self.scale
-        return ((eval(scale, {}, ns) if isinstance(scale, six.string_types)
-                 else s.volume * scale) if scale else s.volume)
+        super(BaseConversionTransformer, self).__init__(**kwargs)
 
     def _map(self, s, attr):
         """Apply the name or unit mapping if configured."""
@@ -74,6 +59,93 @@ class ScalingTransformer(transformer.TransformerBase):
                     pass
         return mapped or self.target.get(attr, getattr(s, attr))
 
+
+class DeltaTransformer(BaseConversionTransformer):
+    """Transformer based on the delta of a sample volume."""
+
+    def __init__(self, target=None, growth_only=False, **kwargs):
+        """Initialize transformer with configured parameters.
+
+        :param growth_only: capture only positive deltas
+        """
+        super(DeltaTransformer, self).__init__(target=target, **kwargs)
+        self.growth_only = growth_only
+        self.cache = {}
+
+    def handle_sample(self, context, s):
+        """Handle a sample, converting if necessary."""
+        key = s.name + s.resource_id
+        prev = self.cache.get(key)
+        timestamp = timeutils.parse_isotime(s.timestamp)
+        self.cache[key] = (s.volume, timestamp)
+
+        if prev:
+            prev_volume = prev[0]
+            prev_timestamp = prev[1]
+            time_delta = timeutils.delta_seconds(prev_timestamp, timestamp)
+            # disallow violations of the arrow of time
+            if time_delta < 0:
+                LOG.warn(_LW('Dropping out of time order sample: %s'), (s,))
+                # Reset the cache to the newer sample.
+                self.cache[key] = prev
+                return None
+            volume_delta = s.volume - prev_volume
+            if self.growth_only and volume_delta < 0:
+                LOG.warn(_LW('Negative delta detected, dropping value'))
+                s = None
+            else:
+                s = self._convert(s, volume_delta)
+                LOG.debug('Converted to: %s', s)
+        else:
+            LOG.warn(_LW('Dropping sample with no predecessor: %s'),
+                     (s,))
+            s = None
+        return s
+
+    def _convert(self, s, delta):
+        """Transform the appropriate sample fields."""
+        return sample.Sample(
+            name=self._map(s, 'name'),
+            unit=s.unit,
+            type=sample.TYPE_DELTA,
+            volume=delta,
+            user_id=s.user_id,
+            project_id=s.project_id,
+            resource_id=s.resource_id,
+            timestamp=s.timestamp,
+            resource_metadata=s.resource_metadata
+        )
+
+
+class ScalingTransformer(BaseConversionTransformer):
+    """Transformer to apply a scaling conversion."""
+
+    def __init__(self, source=None, target=None, **kwargs):
+        """Initialize transformer with configured parameters.
+
+        :param source: dict containing source sample unit
+        :param target: dict containing target sample name, type,
+                       unit and scaling factor (a missing value
+                       connotes no change)
+        """
+        super(ScalingTransformer, self).__init__(source=source, target=target,
+                                                 **kwargs)
+        self.scale = self.target.get('scale')
+        LOG.debug('scaling conversion transformer with source:'
+                  ' %(source)s target: %(target)s:', {'source': self.source,
+                                                      'target': self.target})
+
+    def _scale(self, s):
+        """Apply the scaling factor.
+
+        Either a straight multiplicative factor or else a string to be eval'd.
+        """
+        ns = transformer.Namespace(s.as_dict())
+
+        scale = self.scale
+        return ((eval(scale, {}, ns) if isinstance(scale, six.string_types)
+                 else s.volume * scale) if scale else s.volume)
+
     def _convert(self, s, growth=1):
         """Transform the appropriate sample fields."""
         return sample.Sample(
diff --git a/etc/ceilometer/pipeline.yaml b/etc/ceilometer/pipeline.yaml
index 12b45f2e08..a5bd5148e7 100644
--- a/etc/ceilometer/pipeline.yaml
+++ b/etc/ceilometer/pipeline.yaml
@@ -12,6 +12,7 @@ sources:
           - "cpu"
       sinks:
           - cpu_sink
+          - cpu_delta_sink
     - name: disk_source
       interval: 600
       meters:
@@ -50,6 +51,15 @@ sinks:
                     scale: "100.0 / (10**9 * (resource_metadata.cpu_number or 1))"
       publishers:
           - notifier://
+    - name: cpu_delta_sink
+      transformers:
+          - name: "delta"
+            parameters:
+                target:
+                    name: "cpu.delta"
+                growth_only: True
+      publishers:
+          - notifier://
     - name: disk_sink
       transformers:
           - name: "rate_of_change"
diff --git a/setup.cfg b/setup.cfg
index df3d9351a2..e775f70c92 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -220,6 +220,7 @@ ceilometer.hardware.inspectors =
 
 ceilometer.transformer =
     accumulator = ceilometer.transformer.accumulator:TransformerAccumulator
+    delta = ceilometer.transformer.conversions:DeltaTransformer
     unit_conversion = ceilometer.transformer.conversions:ScalingTransformer
     rate_of_change = ceilometer.transformer.conversions:RateOfChangeTransformer
     aggregator = ceilometer.transformer.conversions:AggregatorTransformer