From b4e6fa07d119a5020580592e6b021effaf16bb0b Mon Sep 17 00:00:00 2001
From: Julien Danjou <julien@danjou.info>
Date: Mon, 11 Feb 2013 16:38:06 +0100
Subject: [PATCH] transformer: add acculumator transformer

This adds a transformer accumulating counters until a threshold, and then
flushing them out.

This implements a solution to blueprint swift-batched-requests

Change-Id: Ic1f36138d8ee1e5705f2285987763fbff9de0184
Signed-off-by: Julien Danjou <julien@danjou.info>
---
 ceilometer/transformer/__init__.py    |  0
 ceilometer/transformer/accumulator.py | 43 +++++++++++++++++++++++
 setup.py                              |  1 +
 tests/test_pipeline.py                | 49 ++++++++++++---------------
 4 files changed, 66 insertions(+), 27 deletions(-)
 create mode 100644 ceilometer/transformer/__init__.py
 create mode 100644 ceilometer/transformer/accumulator.py

diff --git a/ceilometer/transformer/__init__.py b/ceilometer/transformer/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/ceilometer/transformer/accumulator.py b/ceilometer/transformer/accumulator.py
new file mode 100644
index 0000000000..dc6a6ad48f
--- /dev/null
+++ b/ceilometer/transformer/accumulator.py
@@ -0,0 +1,43 @@
+# -*- encoding: utf-8 -*-
+#
+# Copyright © 2013 Julien Danjou
+#
+# Author: Julien Danjou <julien@danjou.info>
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from ceilometer import plugin
+
+
+class TransformerAccumulator(plugin.TransformerBase):
+    """Transformer that accumulates counter until a threshold, and then flush
+    them out in the wild. """
+
+    def __init__(self, size=1, **kwargs):
+        if size >= 1:
+            self.counters = []
+        self.size = size
+        super(TransformerAccumulator, self).__init__(**kwargs)
+
+    def handle_sample(self, context, counter, source):
+        if self.size >= 1:
+            self.counters.append(counter)
+        else:
+            return counter
+
+    def flush(self, context, source):
+        if len(self.counters) >= self.size:
+            x = self.counters
+            self.counters = []
+            return x
+        return []
diff --git a/setup.py b/setup.py
index 254efc35d9..d84373b1cf 100755
--- a/setup.py
+++ b/setup.py
@@ -131,6 +131,7 @@ setuptools.setup(
     libvirt = ceilometer.compute.virt.libvirt.inspector:LibvirtInspector
 
     [ceilometer.transformer]
+    accumulator = ceilometer.transformer.accumulator:TransformerAccumulator
 
     [ceilometer.publisher]
     meter_publisher = ceilometer.publisher.meter_publish:MeterPublisher
diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py
index 331f5ea2b1..d0d95045be 100644
--- a/tests/test_pipeline.py
+++ b/tests/test_pipeline.py
@@ -22,6 +22,7 @@ from stevedore import extension
 
 from ceilometer import counter
 from ceilometer import plugin
+from ceilometer.transformer import accumulator
 from ceilometer.openstack.common import timeutils
 from ceilometer import pipeline
 from ceilometer.tests import base
@@ -42,7 +43,7 @@ class TestPipeline(base.TestCase):
             'update': self.TransformerClass,
             'except': self.TransformerClassException,
             'drop': self.TransformerClassDrop,
-            'cache': self.TransformerClassCache}
+            'cache': accumulator.TransformerAccumulator}
 
         if name in class_name_ext:
             return extension.Extension(name, None,
@@ -91,21 +92,6 @@ class TestPipeline(base.TestCase):
         def handle_sample(self, ctxt, counter, source):
             raise Exception()
 
-    class TransformerClassCache(object):
-        samples = []
-        caches = []
-
-        def __init__(self, drop=True):
-            self.__class__.caches = []
-
-        def handle_sample(self, ctxt, counter, source):
-            self.__class__.caches.append(counter)
-
-        def flush(self, ctxt, source):
-            x = self.__class__.caches
-            self.__class__.caches = []
-            return x
-
     def _create_publisher_manager(self, ext_name='test'):
         self.publisher_manager = dispatch.NameDispatchExtensionManager(
             'fake',
@@ -563,10 +549,13 @@ class TestPipeline(base.TestCase):
                         == 'b_update')
 
     def test_flush_pipeline_cache(self):
+        CACHE_SIZE = 10
         self.pipeline_cfg[0]['transformers'].extend([
             {
                 'name': 'cache',
-                'parameters': {}
+                'parameters': {
+                    'size': CACHE_SIZE,
+                }
             },
             {
                 'name': 'update',
@@ -581,20 +570,27 @@ class TestPipeline(base.TestCase):
         pipe = pipeline_manager.pipelines_for_counter('a')[0]
 
         pipe.publish_counter(None, self.test_counter, None)
-        self.assertTrue(len(self.TransformerClassCache.caches) == 1)
-        self.assertTrue(len(self.TransformerClass.samples) == 1)
         self.assertTrue(len(self.publisher.counters) == 0)
         pipe.flush(None, None)
-        self.assertTrue(len(self.publisher.counters) == 1)
-        self.assertTrue(len(self.TransformerClass.samples) == 2)
+        self.assertEqual(len(self.publisher.counters), 0)
+        pipe.publish_counter(None, self.test_counter, None)
+        pipe.flush(None, None)
+        self.assertEqual(len(self.publisher.counters), 0)
+        for i in range(CACHE_SIZE - 2):
+            pipe.publish_counter(None, self.test_counter, None)
+        pipe.flush(None, None)
+        self.assertEqual(len(self.publisher.counters), CACHE_SIZE)
         self.assertTrue(getattr(self.publisher.counters[0], 'name')
                         == 'a_update_new')
 
     def test_flush_pipeline_cache_multiple_counter(self):
+        CACHE_SIZE = 3
         self.pipeline_cfg[0]['transformers'].extend([
             {
                 'name': 'cache',
-                'parameters': {}
+                'parameters': {
+                    'size': CACHE_SIZE
+                }
             },
             {
                 'name': 'update',
@@ -611,12 +607,12 @@ class TestPipeline(base.TestCase):
         pipe.publish_counter(None, self.test_counter, None)
         self.test_counter = self.test_counter._replace(name='b')
         pipe.publish_counter(None, self.test_counter, None)
-        self.assertTrue(len(self.TransformerClassCache.caches) == 2)
-        self.assertTrue(len(self.TransformerClass.samples) == 2)
         self.assertTrue(len(self.publisher.counters) == 0)
         pipe.flush(None, None)
-        self.assertTrue(len(self.publisher.counters) == 2)
-        self.assertTrue(len(self.TransformerClass.samples) == 4)
+        self.assertEqual(len(self.publisher.counters), 0)
+        pipe.publish_counter(None, self.test_counter, None)
+        pipe.flush(None, None)
+        self.assertEqual(len(self.publisher.counters), CACHE_SIZE)
         self.assertTrue(getattr(self.publisher.counters[0], 'name')
                         == 'a_update_new')
         self.assertTrue(getattr(self.publisher.counters[1], 'name')
@@ -632,7 +628,6 @@ class TestPipeline(base.TestCase):
         pipe = pipeline_manager.pipelines_for_counter('a')[0]
 
         pipe.publish_counter(None, self.test_counter, None)
-        self.assertTrue(len(self.TransformerClassCache.caches) == 1)
         self.assertTrue(len(self.publisher.counters) == 0)
         pipe.flush(None, None)
         self.assertTrue(len(self.publisher.counters) == 1)