Merge "Distributed serialization implementation"

Jenkins · 2 years ago · parent commit 1c3ef48a2e

nailgun/nailgun/consts.py  +4 -0

@@ -532,3 +532,7 @@ DPDK_DRIVER_IN_SRIOV_CASE = 'vfio-pci'
 DEFAULT_MTU = 1500
 
 SIZE_OF_VLAN_TAG = 4
+
+SERIALIZATION_POLICY = Enum(
+    'distributed'
+)
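For orientation: Enum here is nailgun's own helper defined earlier in consts.py, which (as used throughout the file) yields a namedtuple whose attribute names equal their string values, so SERIALIZATION_POLICY.distributed == 'distributed'. A minimal sketch of that behaviour and of how the new constant gets compared against cluster settings (the simplified Enum below is an assumption, not the exact helper):

import collections


def Enum(*values, **kwargs):
    # Simplified stand-in for nailgun's Enum helper: a namedtuple whose
    # attribute names and values are the same strings.
    names = kwargs.get('names', values)
    return collections.namedtuple('Enum', names)(*values)


SERIALIZATION_POLICY = Enum('distributed')

# The LCM layer compares the policy stored in the cluster's 'common'
# settings against this constant (hypothetical minimal context data).
common = {'serialization_policy': 'distributed'}
assert common.get('serialization_policy') == SERIALIZATION_POLICY.distributed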

nailgun/nailgun/fixtures/openstack.yaml  +49 -0

@@ -1114,6 +1114,55 @@
             group: "security"
             weight: 20
             type: "radio"
+          serialization_policy:
+            value: "default"
+            values:
+              - data: "default"
+                label: "Default serialization"
+                description: "Run serialization on the master node only"
+              - data: "distributed"
+                label: "Distributed serialization"
+                description: "Run serialization on the master and environment nodes. Worker nodes are selected only from the environment for which serialization is being performed."
+            label: "Serialization policy"
+            group: "general"
+            weight: 30
+            type: "radio"
+          ds_use_discover:
+            group: "general"
+            label: "Use discovered nodes as workers for serialization"
+            type: "checkbox"
+            value: true
+            weight: 31
+            restrictions:
+              - condition: "settings:common.serialization_policy.value != 'distributed'"
+                action: "hide"
+          ds_use_provisioned:
+            group: "general"
+            label: "Use provisioned nodes as workers for serialization"
+            type: "checkbox"
+            value: true
+            weight: 32
+            restrictions:
+              - condition: "settings:common.serialization_policy.value != 'distributed'"
+                action: "hide"
+          ds_use_error:
+            group: "general"
+            label: "Use nodes in error state as workers for serialization"
+            type: "checkbox"
+            value: true
+            weight: 33
+            restrictions:
+              - condition: "settings:common.serialization_policy.value != 'distributed'"
+                action: "hide"
+          ds_use_ready:
+            group: "general"
+            label: "Use ready nodes as workers for serialization"
+            type: "checkbox"
+            value: false
+            weight: 34
+            restrictions:
+              - condition: "settings:common.serialization_policy.value != 'distributed'"
+                action: "hide"
         public_network_assignment:
           metadata:
             weight: 10
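These attributes surface later as plain keys in the 'common' section of the LCM transaction context, which is how DistributedProcessingPolicy reads them (the unit tests in test_lcm_transaction_serializer.py build context data in exactly that flattened form). A small sketch of the mapping, using a hypothetical attributes snapshot:

# Hypothetical snapshot of the cluster's editable attributes after the
# operator enables distributed serialization in the UI.
editable = {
    'common': {
        'serialization_policy': {'value': 'distributed'},
        'ds_use_discover': {'value': True},
        'ds_use_provisioned': {'value': True},
        'ds_use_error': {'value': True},
        'ds_use_ready': {'value': False},
    }
}

# In the transaction context the values are flattened, so the policy code
# reads plain keys from context.new['common'].
common = dict((k, v['value']) for k, v in editable['common'].items())
assert common['serialization_policy'] == 'distributed'
assert common['ds_use_ready'] is False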

nailgun/nailgun/lcm/task_serializer.py  +12 -3

@@ -110,6 +110,8 @@ class Context(object):
         return evaluate
 
     def get_formatter_context(self, node_id):
+        # TODO(akislitsky) remove formatter context from the
+        # tasks serialization workflow
         data = self._transaction.get_new_data(node_id)
         return {
             'CLUSTER_ID': data.get('cluster', {}).get('id'),
@@ -147,9 +149,14 @@ class DeploymentTaskSerializer(object):
         :return: the result
         """
 
-    def serialize(self, node_id):
-        """Serialize task in expected by orchestrator format.
+    def serialize(self, node_id, formatter_context=None):
+        """Serialize the task into the format expected by the orchestrator
 
+        If serialization is performed on a remote worker,
+        the formatter_context parameter should be passed with values
+        from the master node settings.
+
+        :param formatter_context: formatter context
         :param node_id: the target node_id
         """
 
@@ -157,10 +164,12 @@ class DeploymentTaskSerializer(object):
             "serialize task %s for node %s",
             self.task_template['id'], node_id
         )
+        formatter_context = formatter_context \
+            or self.context.get_formatter_context(node_id)
        task = utils.traverse(
            self.task_template,
            utils.text_format_safe,
-            self.context.get_formatter_context(node_id),
+            formatter_context,
            {
                'yaql_exp': self.context.get_yaql_interpreter(
                    node_id, self.task_template['id'])
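The fallback keeps the behaviour unchanged on the master node while letting a remote worker inject a precomputed context. A self-contained sketch of just that pattern (the names below are illustrative stand-ins, not nailgun classes):

class FakeContext(object):
    """Illustrative stand-in for lcm.task_serializer.Context."""

    def get_formatter_context(self, node_id):
        return {'CLUSTER_ID': 42, 'NODE_ID': node_id}


def serialize(context, node_id, formatter_context=None):
    # Master node: no context passed, so compute it locally.
    # Remote worker: use the precomputed dict shipped with the job.
    formatter_context = formatter_context \
        or context.get_formatter_context(node_id)
    return formatter_context


ctx = FakeContext()
assert serialize(ctx, '1') == {'CLUSTER_ID': 42, 'NODE_ID': '1'}
assert serialize(ctx, '1', {'MN_IP': '10.0.0.1'}) == {'MN_IP': '10.0.0.1'}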

nailgun/nailgun/lcm/transaction_serializer.py  +313 -4

@@ -14,13 +14,21 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-from distutils.version import StrictVersion
+import datetime
 import multiprocessing
+import os
+from Queue import Queue
+import shutil
+import tempfile
 
+import distributed
+from distutils.version import StrictVersion
 import six
+import toolz
 
 from nailgun import consts
 from nailgun import errors
+from nailgun.lcm.task_serializer import Context
 from nailgun.lcm.task_serializer import TasksSerializersFactory
 from nailgun.logger import logger
 from nailgun.settings import settings
@@ -128,7 +136,308 @@ class MultiProcessingConcurrencyPolicy(object):
             pool.join()
 
 
-def get_concurrency_policy():
+def _distributed_serialize_tasks_for_node(formatter_contexts_idx,
+                                          node_and_tasks, scattered_data):
+    """Remote serialization call for DistributedProcessingPolicy
+
+    The code of this function is copied to the workers and executed
+    there, thus all required imports are included inside the function.
+
+    :param formatter_contexts_idx: dict of formatter contexts keyed by
+    node_id
+    :param node_and_tasks: list of (node_id, task_data) tuples
+    :param scattered_data: future object that points to the data copied
+    to the workers
+    :return: list of ((node_id, serialized), error) tuples
+    """
+
+    try:
+        factory = TasksSerializersFactory(scattered_data['context'])
+
+        # Restoring settings
+        settings.config = scattered_data['settings_config']
+        for k in formatter_contexts_idx:
+            formatter_contexts_idx[k]['SETTINGS'] = settings
+
+    except Exception as e:
+        logger.exception("Distributed serialization failed")
+        return [((None, None), e)]
+
+    result = []
+
+    for node_and_task in node_and_tasks:
+
+        node_id = None
+        try:
+            node_id, task = node_and_task
+
+            logger.debug("Starting distributed node %s task %s serialization",
+                         node_id, task['id'])
+
+            formatter_context = formatter_contexts_idx[node_id]
+
+            serializer = factory.create_serializer(task)
+            serialized = serializer.serialize(
+                node_id, formatter_context=formatter_context)
+
+            logger.debug("Distributed node %s task %s serialization "
+                         "result: %s", node_id, task['id'], serialized)
+
+            result.append(((node_id, serialized), None))
+        except Exception as e:
+            logger.exception("Distributed serialization failed")
+            result.append(((node_id, None), e))
+            break
+
+    logger.debug("Processed tasks count: %s", len(result))
+    return result
+
+
+class DistributedProcessingPolicy(object):
+
+    def __init__(self):
+        self.sent_jobs = Queue()
+        self.sent_jobs_count = 0
+
+    def _consume_jobs(self, chunk_size=None):
+        """Consumes jobs
+
+        If chunk_size is set, the function consumes the specified number
+        of finished jobs, or fewer if the sent_jobs queue becomes empty.
+        If chunk_size is None, the function consumes jobs until the
+        sent_jobs queue becomes empty.
+        Jobs with statuses Cancelled, Abandoned or Terminated will be
+        resent and their ids added to the sent_jobs queue.
+
+        :param chunk_size: size of the chunk to consume
+        :return: generator over job results
+        """
+        logger.debug("Consuming jobs started")
+
+        jobs_to_consume = []
+        while not self.sent_jobs.empty():
+            job = self.sent_jobs.get()
+            jobs_to_consume.append(job)
+
+            if chunk_size is not None:
+                chunk_size -= 1
+                if chunk_size <= 0:
+                    break
+
+        for ready_job in distributed.as_completed(jobs_to_consume):
+            results = ready_job.result()
+            self.sent_jobs_count -= 1
+
+            for result in results:
+                (node_id, serialized), exc = result
+                logger.debug("Got ready task for node %s, serialized: %s, "
+                             "error: %s", node_id, serialized, exc)
+                if exc is not None:
+                    raise exc
+                yield node_id, serialized
+
+        logger.debug("Consuming jobs finished")
+
+    def _get_formatter_context(self, task_context, formatter_contexts_idx,
+                               node_id):
+        try:
+            return formatter_contexts_idx[node_id]
+        except KeyError:
+            pass
+
+        logger.debug("Calculating formatter context for node %s", node_id)
+        formatter_context = task_context.get_formatter_context(
+            node_id)
+        # The settings file is already sent to the workers
+        formatter_context.pop('SETTINGS', None)
+        formatter_contexts_idx[node_id] = formatter_context
+
+        return formatter_context
+
+    def _upload_nailgun_code(self, job_cluster):
+        """Creates a zip of the current nailgun code and uploads it to workers
+
+        TODO(akislitsky): add workers scope when it is implemented
+        in the distributed library
+
+        :param job_cluster: distributed.Client
+        """
+        logger.debug("Compressing nailgun code")
+        file_dir = os.path.dirname(__file__)
+        nailgun_root_dir = os.path.realpath(os.path.join(file_dir, '..', '..'))
+        archive = os.path.join(tempfile.gettempdir(), 'nailgun')
+        result = shutil.make_archive(archive, 'zip', nailgun_root_dir,
+                                     'nailgun')
+        logger.debug("Nailgun code saved to: %s", result)
+
+        logger.debug("Uploading nailgun archive %s to workers", result)
+        job_cluster.upload_file(result)
+
+    def _scatter_data(self, job_cluster, context, workers):
+        logger.debug("Scattering data to workers started")
+        shared_data = {'context': context, 'settings_config': settings.config}
+        scattered = job_cluster.scatter(shared_data, broadcast=True,
+                                        workers=workers)
+        # Wait until the data is scattered to the workers
+        distributed.wait(scattered.values())
+        logger.debug("Scattering data to workers finished")
+
+        return scattered
+
+    def _get_allowed_nodes_statuses(self, context):
+        """Extracts node statuses that allow distributed serialization"""
+        common = context.new.get('common', {})
+        cluster = common.get('cluster', {})
+        logger.debug("Getting allowed nodes statuses to use as serialization "
+                     "workers for cluster %s", cluster.get('id'))
+        check_fields = {
+            'ds_use_ready': consts.NODE_STATUSES.ready,
+            'ds_use_provisioned': consts.NODE_STATUSES.provisioned,
+            'ds_use_discover': consts.NODE_STATUSES.discover,
+            'ds_use_error': consts.NODE_STATUSES.error
+        }
+        statuses = set()
+        for field, node_status in check_fields.items():
+            if common.get(field):
+                statuses.add(node_status)
+
+        logger.debug("Allowed nodes statuses to use as serialization workers "
+                     "for cluster %s are: %s", cluster.get('id'), statuses)
+        return statuses
+
+    def _get_allowed_nodes_ips(self, context):
+        """Filters online nodes from the cluster by their status
+
+        In the cluster settings we select the node statuses allowed for
+        use in distributed serialization. Nodes are then filtered
+        according to the selected statuses.
+
+        :param context: TransactionContext
+        :return: set of allowed nodes ips
+        """
+        ips = set()
+        allowed_statuses = self._get_allowed_nodes_statuses(context)
+        for node in six.itervalues(context.new.get('nodes', {})):
+            if node.get('status') in allowed_statuses:
+                ips.add(node.get('ip'))
+        ips.add(settings.MASTER_IP)
+        return ips
+
+    def _get_allowed_workers(self, job_cluster, allowed_ips):
+        """Calculates workers addresses for distributed serialization
+
+        Only workers placed on the allowed nodes are selected
+        for the serialization.
+
+        :param job_cluster: distributed.Client
+        :param allowed_ips: ips of the nodes allowed for serialization
+        :return: list of worker addresses in 'ip:port' format
+        """
+        logger.debug("Getting allowed workers")
+        workers = {}
+
+        # A worker has an address like tcp://ip:port
+        info = job_cluster.scheduler_info()
+        for worker_addr in six.iterkeys(info['workers']):
+            ip_port = worker_addr.split('//')[1]
+            ip = ip_port.split(':')[0]
+            if ip not in allowed_ips:
+                continue
+            try:
+                pool = workers[ip]
+                pool.add(ip_port)
+            except KeyError:
+                workers[ip] = set([ip_port])
+
+        return list(toolz.itertoolz.concat(six.itervalues(workers)))
+
+    def execute(self, context, _, tasks):
+        """Executes task serialization on distributed nodes
+
+        :param context: the transaction context
+        :param _: serializers factory
+        :param tasks: the tasks to serialize
+        :return: sequence of serialized tasks
+        """
+        logger.debug("Performing distributed tasks processing")
+        sched_address = '{0}:{1}'.format(settings.MASTER_IP,
+                                         settings.LCM_DS_JOB_SHEDULER_PORT)
+        job_cluster = distributed.Client(sched_address)
+
+        allowed_ips = self._get_allowed_nodes_ips(context)
+        workers = self._get_allowed_workers(job_cluster, allowed_ips)
+        logger.debug("Allowed workers list for serialization: %s", workers)
+        workers_ips = set([ip_port.split(':')[0] for ip_port in workers])
+        logger.debug("Allowed workers ips list for serialization: %s",
+                     workers_ips)
+
+        task_context = Context(context)
+        formatter_contexts_idx = {}
+        workers_num = len(workers)
+        max_jobs_in_queue = workers_num * settings.LCM_DS_NODE_LOAD_COEFF
+        logger.debug("Max jobs allowed in queue: %s", max_jobs_in_queue)
+
+        start = datetime.datetime.utcnow()
+        tasks_count = 0
+
+        try:
+            self._upload_nailgun_code(job_cluster)
+            scattered = self._scatter_data(job_cluster, context, workers)
+
+            for tasks_chunk in toolz.partition_all(
+                    settings.LCM_DS_TASKS_PER_JOB, tasks):
+
+                formatter_contexts_for_tasks = {}
+
+                # Collecting the required contexts for the tasks
+                for task in tasks_chunk:
+                    node_id, task_data = task
+                    formatter_context = self._get_formatter_context(
+                        task_context, formatter_contexts_idx, node_id)
+                    if node_id not in formatter_contexts_for_tasks:
+                        formatter_contexts_for_tasks[node_id] = \
+                            formatter_context
+
+                logger.debug("Submitting job for tasks chunk: %s", tasks_chunk)
+                job = job_cluster.submit(
+                    _distributed_serialize_tasks_for_node,
+                    formatter_contexts_for_tasks,
+                    tasks_chunk,
+                    scattered,
+                    workers=workers_ips
+                )
+
+                self.sent_jobs.put(job)
+                self.sent_jobs_count += 1
+
+                # We limit the max number of jobs by the number of nodes
+                # which are used in the serialization
+                if self.sent_jobs_count >= max_jobs_in_queue:
+                    for result in self._consume_jobs(chunk_size=workers_num):
+                        tasks_count += 1
+                        yield result
+
+            # No tasks are left, but there are still unconsumed jobs
+            for result in self._consume_jobs():
+                tasks_count += 1
+                yield result
+        finally:
+            end = datetime.datetime.utcnow()
+            logger.debug("Distributed tasks processing finished. "
+                         "Total time: %s. Tasks processed: %s",
+                         end - start, tasks_count)
+            job_cluster.shutdown()
+
+
+def is_distributed_processing_enabled(context):
+    common = context.new.get('common', {})
+    return common.get('serialization_policy') == \
+        consts.SERIALIZATION_POLICY.distributed
+
+
+def get_processing_policy(context):
+    if is_distributed_processing_enabled(context):
+        return DistributedProcessingPolicy()
     cpu_num = settings.LCM_SERIALIZERS_CONCURRENCY_FACTOR
     if not cpu_num:
         try:
@@ -162,7 +471,7 @@ class TransactionSerializer(object):
         # ids of nodes in this group and how many nodes in this group can fail
         # and deployment will not be interrupted
         self.fault_tolerance_groups = []
-        self.concurrency_policy = get_concurrency_policy()
+        self.processing_policy = get_processing_policy(context)
 
     @classmethod
     def serialize(cls, context, tasks, resolver):
@@ -216,7 +525,7 @@ class TransactionSerializer(object):
         :param tasks: the deployment tasks
         :return the mapping tasks per node
         """
-        serialized = self.concurrency_policy.execute(
+        serialized = self.processing_policy.execute(
             self.context,
             self.serializer_factory_class,
             self.expand_tasks(tasks)
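Under the hood this is the standard dask distributed scatter/submit/as_completed pattern: shared read-only data (the transaction context and settings) is scattered once to the allowed workers, jobs are submitted in chunks of LCM_DS_TASKS_PER_JOB tasks pinned to those workers, and results are drained as they complete so that at most workers_num * LCM_DS_NODE_LOAD_COEFF jobs are ever in flight. A minimal self-contained sketch of that pattern (the scheduler address and data here are illustrative, not Fuel's actual wiring):

import distributed
import toolz

# Hypothetical local scheduler; in Fuel this would be
# MASTER_IP:LCM_DS_JOB_SHEDULER_PORT on the master node.
client = distributed.Client('127.0.0.1:8002')

# Scatter shared read-only data once to every worker; scattering a dict
# returns a dict of futures that can be passed to submit() later.
shared = client.scatter({'settings_config': {'key': 'value'}}, broadcast=True)
distributed.wait(list(shared.values()))


def work(chunk, shared_data):
    # Runs remotely; dask resolves the scattered futures in the arguments,
    # so shared_data arrives as the plain dict scattered above.
    return [(item, sorted(shared_data)) for item in chunk]


# Submit jobs in fixed-size chunks and drain them as they complete.
jobs = [client.submit(work, chunk, shared)
        for chunk in toolz.partition_all(100, range(1000))]
for job in distributed.as_completed(jobs):
    for item, keys in job.result():
        pass  # consume results here, as _consume_jobs does

client.shutdown()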

nailgun/nailgun/orchestrator/deployment_serializers.py  +1 -0

@@ -221,6 +221,7 @@ class DeploymentMultinodeSerializer(object):
             'role': role,
             'vms_conf': node.vms_conf,
             'fail_if_error': role in self.critical_roles,
+            'ip': node.ip,
             # TODO(eli): need to remove, requried for the fake thread only
             'online': node.online,
         }

nailgun/nailgun/settings.py  +18 -4

@@ -38,7 +38,21 @@ class NailgunSettings(object):
         if test_config:
             settings_files.append(test_config)
 
-        self.config = {}
+        # If settings.yaml doesn't exist we should still have a default
+        # config structure. Nailgun without settings is used
+        # when the source code is distributed to the workers for
+        # distributed serialization
+        self.config = {
+            'VERSION': {},
+            'DATABASE': {
+                'engine': 'postgresql',
+                'name': '',
+                'host': '',
+                'port': '0',
+                'user': '',
+                'passwd': ''
+            }
+        }
         for sf in settings_files:
             try:
                 logger.debug("Trying to read config file %s" % sf)
@@ -47,9 +61,9 @@ class NailgunSettings(object):
                 logger.error("Error while reading config file %s: %s" %
                              (sf, str(e)))
 
-        self.config['VERSION']['api'] = self.config['API']
+        self.config['VERSION']['api'] = self.config.get('API')
         self.config['VERSION']['feature_groups'] = \
-            self.config['FEATURE_GROUPS']
+            self.config.get('FEATURE_GROUPS')
 
         fuel_release = self.get_file_content(consts.FUEL_RELEASE_FILE)
         if fuel_release:
@@ -61,7 +75,7 @@ class NailgunSettings(object):
             self.config['VERSION']['openstack_version'] = \
                 fuel_openstack_version
 
-        if int(self.config.get("DEVELOPMENT")):
+        if int(self.config.get("DEVELOPMENT", 0)):
             logger.info("DEVELOPMENT MODE ON:")
             here = os.path.abspath(
                 os.path.join(os.path.dirname(__file__), '..')

nailgun/nailgun/settings.yaml  +9 -0

@@ -177,6 +177,15 @@ YAQL_MEMORY_QUOTA: 104857600
 
 LCM_CHECK_TASK_VERSION: False
 
+# Coefficient for calculating the max jobs queue length. If the jobs number
+# reaches len(nodes) * load_coeff, we stop generating and start consuming jobs.
+LCM_DS_NODE_LOAD_COEFF: 2
+# Port of the dask-scheduler on the master node
+LCM_DS_JOB_SHEDULER_PORT: 8002
+# Size of the tasks chunk sent to a distributed worker
+LCM_DS_TASKS_PER_JOB: 100
+
+
 DPDK_MAX_CPUS_PER_NIC: 4
 
 TRUNCATE_LOG_ENTRIES: 100
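Taken together, these three knobs bound how much work is in flight. A quick worked example with illustrative numbers (not recommendations):

workers_num = 5 * 2          # say, 5 environment nodes with 2 dask workers each
LCM_DS_NODE_LOAD_COEFF = 2   # from settings.yaml above
LCM_DS_TASKS_PER_JOB = 100   # from settings.yaml above

# The policy stops submitting and starts consuming once this many jobs
# are in flight (see DistributedProcessingPolicy.execute).
max_jobs_in_queue = workers_num * LCM_DS_NODE_LOAD_COEFF        # 20 jobs

# Upper bound on tasks buffered across the cluster at any moment.
max_tasks_in_flight = max_jobs_in_queue * LCM_DS_TASKS_PER_JOB  # 2000 tasks
print(max_jobs_in_queue, max_tasks_in_flight)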

nailgun/nailgun/statistics/fuel_statistics/installation_info.py  +10 -0

@@ -56,6 +56,16 @@ class InstallationInfo(object):
                       'propagate_task_deploy', None),
         WhiteListRule(('common', 'security_groups', 'value'),
                       'security_groups', None),
+        WhiteListRule(('common', 'serialization_policy', 'value'),
+                      'serialization_policy', None),
+        WhiteListRule(('common', 'ds_use_discover', 'value'),
+                      'ds_use_discover', None),
+        WhiteListRule(('common', 'ds_use_provisioned', 'value'),
+                      'ds_use_provisioned', None),
+        WhiteListRule(('common', 'ds_use_ready', 'value'),
+                      'ds_use_ready', None),
+        WhiteListRule(('common', 'ds_use_error', 'value'),
+                      'ds_use_error', None),
         WhiteListRule(('corosync', 'verified', 'value'),
                       'corosync_verified', None),
 

nailgun/nailgun/test/integration/test_cluster_changes_handler.py  +3 -0

@@ -187,6 +187,7 @@ class TestHandlers(BaseIntegrationTest):
                     'fail_if_error': is_critical,
                     'vms_conf': [],
                     'fqdn': 'node-%d.%s' % (node.id, settings.DNS_DOMAIN),
+                    'ip': node.ip,
 
                     'network_data': {
                         'eth1': {
@@ -603,6 +604,7 @@ class TestHandlers(BaseIntegrationTest):
                     'online': node.online,
                     'fail_if_error': is_critical,
                     'fqdn': 'node-%d.%s' % (node.id, settings.DNS_DOMAIN),
+                    'ip': node.ip,
                     'priority': 100,
                     'vms_conf': [],
                     'network_scheme': {
@@ -1096,6 +1098,7 @@ class TestHandlers(BaseIntegrationTest):
                     'fail_if_error': is_critical,
                     'fqdn': 'node-%d.%s' % (node.id, settings.DNS_DOMAIN),
                     'priority': 100,
+                    'ip': node.ip,
                     'vms_conf': [],
 
                     'network_scheme': {

nailgun/nailgun/test/unit/test_lcm_transaction_serializer.py  +348 -3

@@ -14,20 +14,24 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import copy
+import exceptions
 import mock
 import multiprocessing.dummy
 
 from nailgun import consts
 from nailgun import errors
 from nailgun import lcm
+from nailgun.lcm import TransactionContext
+from nailgun.settings import settings
+from nailgun.test.base import BaseTestCase
 from nailgun.utils.resolvers import TagResolver
 
-from nailgun.test.base import BaseUnitTest
 
-
-class TestTransactionSerializer(BaseUnitTest):
+class TestTransactionSerializer(BaseTestCase):
     @classmethod
     def setUpClass(cls):
+        super(TestTransactionSerializer, cls).setUpClass()
         cls.tasks = [
             {
                 'id': 'task1', 'roles': ['controller'],
@@ -462,3 +466,344 @@ class TestTransactionSerializer(BaseUnitTest):
             9,
             lcm.TransactionSerializer.calculate_fault_tolerance('-1 ', 10)
         )
+
+    def _get_context_for_distributed_serialization(self):
+        new = copy.deepcopy(self.context.new)
+        new['common']['serialization_policy'] = \
+            consts.SERIALIZATION_POLICY.distributed
+        return TransactionContext(new)
+
+    @mock.patch('nailgun.lcm.transaction_serializer.distributed.wait')
+    @mock.patch('nailgun.lcm.transaction_serializer.distributed.as_completed')
+    def test_distributed_serialization(self, as_completed, _):
+        context = self._get_context_for_distributed_serialization()
+
+        with mock.patch(
+                'nailgun.lcm.transaction_serializer.distributed.Client'
+        ) as job_cluster:
+            job = mock.Mock()
+            job.result.return_value = [
+                (('1', {"id": "task1", "type": "skipped"}), None)
+            ]
+
+            submit = mock.Mock()
+            submit.return_value = job
+
+            as_completed.return_value = [job]
+
+            job_cluster.return_value.submit = submit
+            job_cluster.return_value.scheduler_info.return_value = \
+                {'workers': {'tcp://worker': {}}}
+
+            lcm.TransactionSerializer.serialize(
+                context, self.tasks, self.resolver)
+            self.assertTrue(submit.called)
+            # 4 controller tasks + 1 compute + 1 cinder
+            self.assertEqual(6, submit.call_count)
+
+    @mock.patch('nailgun.lcm.transaction_serializer.distributed.wait')
+    @mock.patch('nailgun.lcm.transaction_serializer.distributed.as_completed')
+    @mock.patch('nailgun.lcm.transaction_serializer.'
+                'DistributedProcessingPolicy._get_formatter_context')
+    def test_distributed_serialization_workers_scope(self, formatter_context,
+                                                     as_completed, _):
+        context = self._get_context_for_distributed_serialization()
+
+        node_id = '1'
+        task = {
+            'id': 'task1', 'roles': ['controller'],
+            'type': 'puppet', 'version': '2.0.0'
+        }
+
+        with mock.patch(
+                'nailgun.lcm.transaction_serializer.distributed.Client'
+        ) as job_cluster:
+
+            # Mocking job processing
+            job = mock.Mock()
+            job.result.return_value = [((node_id, task), None)]
+
+            submit = mock.Mock()
+            submit.return_value = job
+
+            as_completed.return_value = [job]
+
+            scatter = mock.Mock()
+            job_cluster.return_value.scatter = scatter
+
+            job_cluster.return_value.scatter.return_value = {}
+            job_cluster.return_value.submit = submit
+
+            formatter_context.return_value = {node_id: {}}
+
+            # Configuring available workers
+            job_cluster.return_value.scheduler_info.return_value = \
+                {
+                    'workers': {
+                        'tcp://{0}'.format(settings.MASTER_IP): {},
+                        'tcp://192.168.0.1:33334': {},
+                        'tcp://127.0.0.2:33335': {},
+                    }
+                }
+
+            # Performing serialization
+            lcm.TransactionSerializer.serialize(
+                context, [task], self.resolver
+            )
+
+            # Checking that data is scattered only to the expected workers
+            scatter.assert_called_once()
+            scatter.assert_called_with(
+                {'context': context, 'settings_config': settings.config},
+                broadcast=True,
+                workers=[settings.MASTER_IP]
+            )
+
+            # Checking that the job is submitted only to the expected workers
+            submit.assert_called_once()
+            serializer = lcm.transaction_serializer
+            submit.assert_called_with(
+                serializer._distributed_serialize_tasks_for_node,
+                {node_id: formatter_context()},
+                ((node_id, task),),
+                job_cluster().scatter(),
+                workers=set([settings.MASTER_IP])
+            )
+
+    def test_distributed_serialization_get_allowed_nodes_ips(self):
+        policy = lcm.transaction_serializer.DistributedProcessingPolicy()
+
+        context_data = {
+            'common': {
+                'serialization_policy':
+                    consts.SERIALIZATION_POLICY.distributed,
+                'ds_use_error': True,
+                'ds_use_provisioned': True,
+                'ds_use_discover': True,
+                'ds_use_ready': False
+            },
+            'nodes': {
+                '1': {'status': consts.NODE_STATUSES.error,
+                      'ip': '10.20.0.3'},
+                '2': {'status': consts.NODE_STATUSES.provisioned,
+                      'ip': '10.20.0.4'},
+                '3': {'status': consts.NODE_STATUSES.discover,
+                      'ip': '10.20.0.5'},
+                '4': {'status': consts.NODE_STATUSES.ready,
+                      'ip': '10.20.0.6'},
+            }
+        }
+
+        actual = policy._get_allowed_nodes_ips(
+            TransactionContext(context_data))
+        self.assertItemsEqual(
+            [settings.MASTER_IP, '10.20.0.3', '10.20.0.4', '10.20.0.5'],
+            actual
+        )
+
+    def test_distributed_serialization_get_allowed_nodes_statuses(self):
+        policy = lcm.transaction_serializer.DistributedProcessingPolicy()
+        context_data = {}
+        actual = policy._get_allowed_nodes_statuses(
+            TransactionContext(context_data))
+        self.assertItemsEqual([], actual)
+
+        context_data['common'] = {
+            'ds_use_discover': False,
+            'ds_use_provisioned': False,
+            'ds_use_error': False,
+            'ds_use_ready': False
+        }
+        actual = policy._get_allowed_nodes_statuses(
+            TransactionContext(context_data))
+        self.assertItemsEqual([], actual)
+
+        context_data['common']['ds_use_discover'] = True
+        actual = policy._get_allowed_nodes_statuses(
+            TransactionContext(context_data))
+        expected = [consts.NODE_STATUSES.discover]
+        self.assertItemsEqual(expected, actual)
+
+        context_data['common']['ds_use_provisioned'] = True
+        actual = policy._get_allowed_nodes_statuses(
+            TransactionContext(context_data))
+        expected = [consts.NODE_STATUSES.discover,
+                    consts.NODE_STATUSES.provisioned]
+        self.assertItemsEqual(expected, actual)
+
+        context_data['common']['ds_use_error'] = True
+        actual = policy._get_allowed_nodes_statuses(
+            TransactionContext(context_data))
+        expected = [consts.NODE_STATUSES.discover,
+                    consts.NODE_STATUSES.provisioned,
+                    consts.NODE_STATUSES.error]
+        self.assertItemsEqual(expected, actual)
+
+        context_data['common']['ds_use_ready'] = True
+        actual = policy._get_allowed_nodes_statuses(
+            TransactionContext(context_data))
+        expected = [consts.NODE_STATUSES.discover,
+                    consts.NODE_STATUSES.provisioned,
+                    consts.NODE_STATUSES.error,
+                    consts.NODE_STATUSES.ready]
+        self.assertItemsEqual(expected, actual)
+
+    def test_distributed_serialization_get_allowed_workers(self):
+        policy = lcm.transaction_serializer.DistributedProcessingPolicy()
+
+        with mock.patch(
+                'nailgun.lcm.transaction_serializer.distributed.Client'
+        ) as job_cluster:
+            job_cluster.scheduler_info.return_value = \
+                {'workers': {
+                    'tcp://10.20.0.2:1': {},
+                    'tcp://10.20.0.2:2': {},
+                    'tcp://10.20.0.3:1': {},
+                    'tcp://10.20.0.3:2': {},
+                    'tcp://10.20.0.3:3': {},
+                    'tcp://10.20.0.4:1': {},
+                    'tcp://10.20.0.5:1': {}
+                }}
+            allowed_ips = set(['10.20.0.2', '10.20.0.3', '10.20.0.5'])
+
+            expected = ['10.20.0.2:1', '10.20.0.2:2', '10.20.0.3:1',
+                        '10.20.0.3:2', '10.20.0.3:3', '10.20.0.5:1']
+            actual = policy._get_allowed_workers(job_cluster, allowed_ips)
+            self.assertItemsEqual(expected, actual)
+
+    def test_distributed_serialization_serialize_task(self):
+        task = {
+            'id': 'task1', 'roles': ['controller'],
+            'type': 'puppet', 'version': '2.0.0',
+            'parameters': {
+                'master_ip': '{MN_IP}',
+                'host': {'yaql_exp': '$.public_ssl.hostname'},
+                'attr': {'yaql_exp': '$node.attributes.a_str'}
+            }
+        }
+
+        formatter_contexts_idx = {
+            '1': {'MN_IP': '10.0.0.1'},
+            '2': {}
+        }
+        scattered_data = {
+            'settings_config': settings.config,
+            'context': self.context
+        }
+
+        serializer = lcm.transaction_serializer
+        actual = serializer._distributed_serialize_tasks_for_node(
+            formatter_contexts_idx, [('1', task), ('2', task)], scattered_data)
+
+        expected = [
+            (
+                (
+                    '1',
+                    {
+                        'id': 'task1',
+                        'type': 'puppet',
+                        'parameters': {
+                            'cwd': '/',
+                            'master_ip': '10.0.0.1',
+                            'host': 'localhost',
+                            'attr': 'text1'
+                        },
+                        'fail_on_error': True
+                    }
+                ),
+                None
+            ),
+            (
+                (
+                    '2',
+                    {
+                        'id': 'task1',
+                        'type': 'puppet',
+                        'parameters': {
+                            'cwd': '/',
+                            'master_ip': '{MN_IP}',
+                            'host': 'localhost',
+                            'attr': 'text2'
+                        },
+                        'fail_on_error': True
+                    }
+                ),
+                None
+            )
+        ]
+
+        self.assertItemsEqual(expected, actual)
+
+    def test_distributed_serialization_serialize_task_failure(self):
+        task = {
+            'id': 'task1', 'roles': ['controller'],
+            'type': 'puppet', 'version': '2.0.0',
+            'parameters': {
+                'fake': {'yaql_exp': '$.some.fake_param'}
+            }
+        }
+
+        formatter_contexts_idx = {'2': {}}
+        scattered_data = {
+            'settings_config': settings.config,
+            'context': self.context
+        }
+
+        serializer = lcm.transaction_serializer
+        result = serializer._distributed_serialize_tasks_for_node(
+            formatter_contexts_idx, [('2', task)], scattered_data)
+        (_, __), err = result[0]
+        self.assertIsInstance(err, exceptions.KeyError)
+
+
+class TestConcurrencyPolicy(BaseTestCase):
+
+    @mock.patch(
+        'nailgun.lcm.transaction_serializer.multiprocessing.cpu_count',
+        return_value=1
+    )
+    def test_one_cpu(self, cpu_count):
+        policy = lcm.transaction_serializer.get_processing_policy(
+            lcm.TransactionContext({}))
+        self.assertIsInstance(
+            policy,
+            lcm.transaction_serializer.SingleWorkerConcurrencyPolicy
+        )
+        self.assertTrue(cpu_count.called)
+
+    @mock.patch(
+        'nailgun.lcm.transaction_serializer.multiprocessing.cpu_count',
+        return_value=0
+    )
+    def test_zero_cpu(self, cpu_count):
+        policy = lcm.transaction_serializer.get_processing_policy(
+            lcm.TransactionContext({}))
+        self.assertIsInstance(
+            policy,
+            lcm.transaction_serializer.SingleWorkerConcurrencyPolicy
+        )
+        self.assertTrue(cpu_count.called)
+
+    @mock.patch(
+        'nailgun.lcm.transaction_serializer.multiprocessing.cpu_count',
+        side_effect=NotImplementedError
+    )
+    def test_cpu_count_not_implemented(self, cpu_count):
+        policy = lcm.transaction_serializer.get_processing_policy(
+            lcm.TransactionContext({}))
+        self.assertIsInstance(
+            policy,
+            lcm.transaction_serializer.SingleWorkerConcurrencyPolicy
+        )
+        self.assertTrue(cpu_count.called)
+
+    def test_distributed_serialization_enabled_in_cluster(self):
+        context_data = {'common': {
+            'serialization_policy': consts.SERIALIZATION_POLICY.distributed
+        }}
+        policy = lcm.transaction_serializer.get_processing_policy(
+            lcm.TransactionContext(context_data))
+        self.assertIsInstance(
+            policy,
+            lcm.transaction_serializer.DistributedProcessingPolicy
+        )

nailgun/requirements.txt  +2 -0

@@ -47,3 +47,5 @@ stevedore>=1.5.0
 # See: https://bugs.launchpad.net/fuel/+bug/1519727
 setuptools<=18.5
 yaql>=1.0.0
+# Distributed nodes serialization
+distributed==1.16.0
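Note that 'distributed' is only the client/worker library: to actually exercise the policy, a dask scheduler has to listen on the master node at LCM_DS_JOB_SHEDULER_PORT, and a dask worker process has to run on every node that may be picked for serialization. With distributed 1.16 that would look like 'dask-scheduler --port 8002' on the master and 'dask-worker <master_ip>:8002' on each candidate node; the exact service wiring is outside this change, so treat these commands as an assumption rather than part of the commit.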
