From 1ee42cc3ff585abd370c2e7974951c93f2816394 Mon Sep 17 00:00:00 2001
From: Zhenguo Niu <Niu.ZGlinux@gmail.com>
Date: Mon, 25 Jul 2016 19:07:27 +0800
Subject: [PATCH] Parallel erase disk devices

Currently we erase the disks one by one, which takes a long
time to finish, this patch adds support to the IPA so that
it can erase disks in parallel if told so.

Story: 1546949
Task: 11548
Co-Authored-By: yuan liang <leetpy2@gmail.com>
Co-Authored-By: Kaifeng Wang <kaifeng.w@gmail.com>

Change-Id: If5cfb6ec000a654d07103c4b378d4c135249e238
---
 ironic_python_agent/hardware.py               | 25 ++++++--
 .../tests/unit/test_hardware.py               | 64 ++++++++++++++++++-
 ...l-erase-disk-devices-09ea33d5443aead0.yaml | 11 ++++
 3 files changed, 95 insertions(+), 5 deletions(-)
 create mode 100644 releasenotes/notes/parallel-erase-disk-devices-09ea33d5443aead0.yaml

diff --git a/ironic_python_agent/hardware.py b/ironic_python_agent/hardware.py
index 45414c6a1..f758c4c1a 100644
--- a/ironic_python_agent/hardware.py
+++ b/ironic_python_agent/hardware.py
@@ -16,6 +16,7 @@ import abc
 import binascii
 import functools
 import json
+from multiprocessing.pool import ThreadPool
 import os
 import shlex
 import time
@@ -355,7 +356,7 @@ class HardwareManager(object):
         """Attempt to erase a block device.
 
         Implementations should detect the type of device and erase it in the
-        most appropriate way possible.  Generic implementations should support
+        most appropriate way possible. Generic implementations should support
         common erase mechanisms such as ATA secure erase, or multi-pass random
         writes. Operators with more specific needs should override this method
         in order to detect and handle "interesting" cases, or delegate to the
@@ -367,6 +368,9 @@ class HardwareManager(object):
         parent class. Upstream submissions of common functionality are
         encouraged.
 
+        This interface could be called concurrently to speed up erasure, as
+        such, it should be implemented in a thread-safe way.
+
         :param node: Ironic node object
         :param block_device: a BlockDevice indicating a device to be erased.
         :raises IncompatibleHardwareMethodError: when there is no known way to
@@ -390,10 +394,23 @@ class HardwareManager(object):
         """
         erase_results = {}
         block_devices = self.list_block_devices()
+        if not len(block_devices):
+            return {}
+
+        info = node.get('driver_internal_info', {})
+        max_pool_size = info.get('disk_erasure_concurrency', 1)
+
+        thread_pool = ThreadPool(min(max_pool_size, len(block_devices)))
         for block_device in block_devices:
-            result = dispatch_to_managers(
-                'erase_block_device', node=node, block_device=block_device)
-            erase_results[block_device.name] = result
+            params = {'node': node, 'block_device': block_device}
+            erase_results[block_device.name] = thread_pool.apply_async(
+                dispatch_to_managers, ('erase_block_device',), params)
+        thread_pool.close()
+        thread_pool.join()
+
+        for device_name, result in erase_results.items():
+            erase_results[device_name] = result.get()
+
         return erase_results
 
     def wait_for_disks(self):
diff --git a/ironic_python_agent/tests/unit/test_hardware.py b/ironic_python_agent/tests/unit/test_hardware.py
index 3e914783f..316493d5e 100644
--- a/ironic_python_agent/tests/unit/test_hardware.py
+++ b/ironic_python_agent/tests/unit/test_hardware.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 import binascii
+import multiprocessing
 import os
 import time
 
@@ -23,6 +24,7 @@ from oslo_concurrency import processutils
 from oslo_config import cfg
 from oslo_utils import units
 import pyudev
+import six
 from stevedore import extension
 
 from ironic_python_agent import errors
@@ -1275,7 +1277,7 @@ class TestGenericHardwareManager(base.IronicAgentTest):
         mocked_listdir.assert_has_calls(expected_calls)
 
     @mock.patch.object(hardware, 'dispatch_to_managers', autospec=True)
-    def test_erase_devices(self, mocked_dispatch):
+    def test_erase_devices_no_parallel_by_default(self, mocked_dispatch):
         mocked_dispatch.return_value = 'erased device'
 
         self.hardware.list_block_devices = mock.Mock()
@@ -1290,6 +1292,66 @@ class TestGenericHardwareManager(base.IronicAgentTest):
 
         self.assertEqual(expected, result)
 
+    @mock.patch('multiprocessing.pool.ThreadPool.apply_async', autospec=True)
+    @mock.patch.object(hardware, 'dispatch_to_managers', autospec=True)
+    def test_erase_devices_concurrency(self, mocked_dispatch, mocked_async):
+        internal_info = self.node['driver_internal_info']
+        internal_info['disk_erasure_concurrency'] = 10
+        mocked_dispatch.return_value = 'erased device'
+
+        if six.PY3:
+            apply_result = multiprocessing.pool.ApplyResult({}, None, None)
+        else:
+            apply_result = multiprocessing.pool.ApplyResult({}, None)
+        apply_result._success = True
+        apply_result._ready = True
+        apply_result.get = lambda: 'erased device'
+        mocked_async.return_value = apply_result
+
+        self.hardware.list_block_devices = mock.Mock()
+        self.hardware.list_block_devices.return_value = [
+            hardware.BlockDevice('/dev/sdj', 'big', 1073741824, True),
+            hardware.BlockDevice('/dev/hdaa', 'small', 65535, False),
+        ]
+
+        expected = {'/dev/hdaa': 'erased device', '/dev/sdj': 'erased device'}
+
+        result = self.hardware.erase_devices(self.node, [])
+
+        self.assertTrue(mocked_async.called)
+        self.assertEqual(expected, result)
+
+    @mock.patch.object(hardware, 'ThreadPool', autospec=True)
+    def test_erase_devices_concurrency_pool_size(self, mocked_pool):
+        self.hardware.list_block_devices = mock.Mock()
+        self.hardware.list_block_devices.return_value = [
+            hardware.BlockDevice('/dev/sdj', 'big', 1073741824, True),
+            hardware.BlockDevice('/dev/hdaa', 'small', 65535, False),
+        ]
+
+        # Test pool size 10 with 2 disks
+        internal_info = self.node['driver_internal_info']
+        internal_info['disk_erasure_concurrency'] = 10
+
+        self.hardware.erase_devices(self.node, [])
+        mocked_pool.assert_called_with(2)
+
+        # Test default pool size with 2 disks
+        internal_info = self.node['driver_internal_info']
+        del internal_info['disk_erasure_concurrency']
+
+        self.hardware.erase_devices(self.node, [])
+        mocked_pool.assert_called_with(1)
+
+    @mock.patch.object(hardware, 'dispatch_to_managers', autospec=True)
+    def test_erase_devices_without_disk(self, mocked_dispatch):
+        self.hardware.list_block_devices = mock.Mock()
+        self.hardware.list_block_devices.return_value = []
+
+        expected = {}
+        result = self.hardware.erase_devices({}, [])
+        self.assertEqual(expected, result)
+
     @mock.patch.object(utils, 'execute', autospec=True)
     def test_erase_block_device_ata_success(self, mocked_execute):
         mocked_execute.side_effect = [
diff --git a/releasenotes/notes/parallel-erase-disk-devices-09ea33d5443aead0.yaml b/releasenotes/notes/parallel-erase-disk-devices-09ea33d5443aead0.yaml
new file mode 100644
index 000000000..0f04ecb04
--- /dev/null
+++ b/releasenotes/notes/parallel-erase-disk-devices-09ea33d5443aead0.yaml
@@ -0,0 +1,11 @@
+---
+features:
+  - |
+    Support parallel disk device erasure. This is controlled by the
+    ``driver_internal_info['agent_enable_parallel_erasure']`` passed
+    by ironic.
+other:
+  - |
+    The ``HardwareManager.erase_block_device`` interface could be called
+    concurrently to support the feature of parallel disk device erasure,
+    it should be implemented in a thread-safe way.
\ No newline at end of file