diff --git a/nailgun/nailgun/consts.py b/nailgun/nailgun/consts.py index 3d3b1ae18b..4c790cc992 100644 --- a/nailgun/nailgun/consts.py +++ b/nailgun/nailgun/consts.py @@ -532,3 +532,7 @@ DPDK_DRIVER_IN_SRIOV_CASE = 'vfio-pci' DEFAULT_MTU = 1500 SIZE_OF_VLAN_TAG = 4 + +SERIALIZATION_POLICY = Enum( + 'distributed' + ) diff --git a/nailgun/nailgun/fixtures/openstack.yaml b/nailgun/nailgun/fixtures/openstack.yaml index 8b4d763841..5557710d0b 100644 --- a/nailgun/nailgun/fixtures/openstack.yaml +++ b/nailgun/nailgun/fixtures/openstack.yaml @@ -1114,6 +1114,55 @@ group: "security" weight: 20 type: "radio" + serialization_policy: + value: "default" + values: + - data: "default" + label: "Default serialization" + description: "Run serialization on the master node only" + - data: "distributed" + label: "Distributed serialization" + description: "Run serialization on the master and environment nodes. Nodes for serialization are selected only from the environment for which serialization is performed." + label: "Serialization policy" + group: "general" + weight: 30 + type: "radio" + ds_use_discover: + group: "general" + label: "Use discovered nodes as workers for serialization" + type: "checkbox" + value: true + weight: 31 + restrictions: + - condition: "settings:common.serialization_policy.value != 'distributed'" + action: "hide" + ds_use_provisioned: + group: "general" + label: "Use provisioned nodes as workers for serialization" + type: "checkbox" + value: true + weight: 32 + restrictions: + - condition: "settings:common.serialization_policy.value != 'distributed'" + action: "hide" + ds_use_error: + group: "general" + label: "Use nodes in error state as workers for serialization" + type: "checkbox" + value: true + weight: 33 + restrictions: + - condition: "settings:common.serialization_policy.value != 'distributed'" + action: "hide" + ds_use_ready: + group: "general" + label: "Use ready nodes as workers for serialization" + type: "checkbox" + value: false + weight: 34 + restrictions: + - condition: "settings:common.serialization_policy.value != 'distributed'" + action: "hide" public_network_assignment: metadata: weight: 10 diff --git a/nailgun/nailgun/lcm/task_serializer.py b/nailgun/nailgun/lcm/task_serializer.py index c00411542a..0d70f3695e 100644 --- a/nailgun/nailgun/lcm/task_serializer.py +++ b/nailgun/nailgun/lcm/task_serializer.py @@ -110,6 +110,8 @@ class Context(object): return evaluate def get_formatter_context(self, node_id): + # TODO(akislitsky) remove formatter context from the + # tasks serialization workflow data = self._transaction.get_new_data(node_id) return { 'CLUSTER_ID': data.get('cluster', {}).get('id'), @@ -147,9 +149,14 @@ class DeploymentTaskSerializer(object): :return: the result """ - def serialize(self, node_id): - """Serialize task in expected by orchestrator format.
+ def serialize(self, node_id, formatter_context=None): + """Serialize task in the format expected by the orchestrator + If serialization is performed on a remote worker, + the formatter_context parameter should be passed with values + from the master node settings + + :param formatter_context: formatter context :param node_id: the target node_id """ @@ -157,10 +164,12 @@ "serialize task %s for node %s", self.task_template['id'], node_id ) + formatter_context = formatter_context \ + or self.context.get_formatter_context(node_id) task = utils.traverse( self.task_template, utils.text_format_safe, - self.context.get_formatter_context(node_id), + formatter_context, { 'yaql_exp': self.context.get_yaql_interpreter( node_id, self.task_template['id']) diff --git a/nailgun/nailgun/lcm/transaction_serializer.py b/nailgun/nailgun/lcm/transaction_serializer.py index c02146b6bb..32953c1555 100644 --- a/nailgun/nailgun/lcm/transaction_serializer.py +++ b/nailgun/nailgun/lcm/transaction_serializer.py @@ -14,13 +14,21 @@ # License for the specific language governing permissions and limitations # under the License. -from distutils.version import StrictVersion +import datetime import multiprocessing +import os +from Queue import Queue +import shutil +import tempfile +import distributed +from distutils.version import StrictVersion import six +import toolz from nailgun import consts from nailgun import errors +from nailgun.lcm.task_serializer import Context from nailgun.lcm.task_serializer import TasksSerializersFactory from nailgun.logger import logger from nailgun.settings import settings @@ -128,7 +136,308 @@ class MultiProcessingConcurrencyPolicy(object): pool.join() -def get_concurrency_policy(): +def _distributed_serialize_tasks_for_node(formatter_contexts_idx, + node_and_tasks, scattered_data): + """Remote serialization call for DistributedProcessingPolicy + + The code of this function is copied to the workers and executed there, + so all required imports are included inside the function.
+ + :param formatter_contexts_idx: dict of formatter contexts with node_id + value as key + :param node_and_tasks: list of node_id, task_data tuples + :param scattered_data: future object that points to data copied to + workers + :return: [(node_id, serialized), error] + """ + + try: + factory = TasksSerializersFactory(scattered_data['context']) + + # Restoring settings + settings.config = scattered_data['settings_config'] + for k in formatter_contexts_idx: + formatter_contexts_idx[k]['SETTINGS'] = settings + + except Exception as e: + logger.exception("Distributed serialization failed") + return [((None, None), e)] + + result = [] + + for node_and_task in node_and_tasks: + + node_id = None + try: + node_id, task = node_and_task + + logger.debug("Starting distributed node %s task %s serialization", + node_id, task['id']) + + formatter_context = formatter_contexts_idx[node_id] + + serializer = factory.create_serializer(task) + serialized = serializer.serialize( + node_id, formatter_context=formatter_context) + + logger.debug("Distributed node %s task %s serialization " + "result: %s", node_id, task['id'], serialized) + + result.append(((node_id, serialized), None)) + except Exception as e: + logger.exception("Distributed serialization failed") + result.append(((node_id, None), e)) + break + + logger.debug("Processed tasks count: %s", len(result)) + return result + + +class DistributedProcessingPolicy(object): + + def __init__(self): + self.sent_jobs = Queue() + self.sent_jobs_count = 0 + + def _consume_jobs(self, chunk_size=None): + """Consumes jobs + + If chunk_size is set, the function consumes the specified number + of finished jobs, or fewer if the sent_jobs queue becomes empty. + If chunk_size is None, the function consumes jobs until + the sent_jobs queue becomes empty.
+ Jobs with the statuses Cancelled, Abandoned or Terminated will be + resent and their ids added to the sent_jobs queue + + :param chunk_size: size of the consuming chunk + :return: generator over job results + """ + logger.debug("Consuming jobs started") + + jobs_to_consume = [] + while not self.sent_jobs.empty(): + job = self.sent_jobs.get() + jobs_to_consume.append(job) + + if chunk_size is not None: + chunk_size -= 1 + if chunk_size <= 0: + break + + for ready_job in distributed.as_completed(jobs_to_consume): + results = ready_job.result() + self.sent_jobs_count -= 1 + + for result in results: + (node_id, serialized), exc = result + logger.debug("Got ready task for node %s, serialized: %s, " + "error: %s", node_id, serialized, exc) + if exc is not None: + raise exc + yield node_id, serialized + + logger.debug("Consuming jobs finished") + + def _get_formatter_context(self, task_context, formatter_contexts_idx, + node_id): + try: + return formatter_contexts_idx[node_id] + except KeyError: + pass + + logger.debug("Calculating formatter context for node %s", node_id) + formatter_context = task_context.get_formatter_context( + node_id) + # Settings file is already sent to the workers + formatter_context.pop('SETTINGS', None) + formatter_contexts_idx[node_id] = formatter_context + + return formatter_context + + def _upload_nailgun_code(self, job_cluster): + """Creates a zip of the current nailgun code and uploads it to workers + + TODO(akislitsky): add workers scope when it is implemented + in the distributed library + + :param job_cluster: distributed.Client + """ + logger.debug("Compressing nailgun code") + file_dir = os.path.dirname(__file__) + nailgun_root_dir = os.path.realpath(os.path.join(file_dir, '..', '..')) + archive = os.path.join(tempfile.gettempdir(), 'nailgun') + result = shutil.make_archive(archive, 'zip', nailgun_root_dir, + 'nailgun') + logger.debug("Nailgun code saved to: %s", result) + + logger.debug("Uploading nailgun archive %s to workers", result) + job_cluster.upload_file(result) + + def _scatter_data(self, job_cluster, context, workers): + logger.debug("Scattering data to workers started") + shared_data = {'context': context, 'settings_config': settings.config} + scattered = job_cluster.scatter(shared_data, broadcast=True, + workers=workers) + # Wait until data is scattered to the workers + distributed.wait(scattered.values()) + logger.debug("Scattering data to workers finished") + + return scattered + + def _get_allowed_nodes_statuses(self, context): + """Extracts node statuses that allow distributed serialization""" + common = context.new.get('common', {}) + cluster = common.get('cluster', {}) + logger.debug("Getting allowed nodes statuses to use as serialization " + "workers for cluster %s", cluster.get('id')) + check_fields = { + 'ds_use_ready': consts.NODE_STATUSES.ready, + 'ds_use_provisioned': consts.NODE_STATUSES.provisioned, + 'ds_use_discover': consts.NODE_STATUSES.discover, + 'ds_use_error': consts.NODE_STATUSES.error + } + statuses = set() + for field, node_status in check_fields.items(): + if common.get(field): + statuses.add(node_status) + + logger.debug("Allowed nodes statuses to use as serialization workers " + "for cluster %s are: %s", cluster.get('id'), statuses) + return statuses + + def _get_allowed_nodes_ips(self, context): + """Filters online nodes from the cluster by their status + + In the cluster settings we select the node statuses allowed for + use in distributed serialization. Nodes are then filtered + according to the selected statuses.
+ + :param context: TransactionContext + :return: set of allowed nodes ips + """ + ips = set() + allowed_statuses = self._get_allowed_nodes_statuses(context) + for node in six.itervalues(context.new.get('nodes', {})): + if node.get('status') in allowed_statuses: + ips.add(node.get('ip')) + ips.add(settings.MASTER_IP) + return ips + + def _get_allowed_workers(self, job_cluster, allowed_ips): + """Calculates worker addresses for distributed serialization + + Only workers placed on the allowed nodes must be selected + for serialization. + + :param job_cluster: distributed.Client + :param allowed_ips: ips of the nodes allowed for serialization + :return: list of worker addresses in the format 'ip:port' + """ + logger.debug("Getting allowed workers") + workers = {} + + # Worker has an address like tcp://ip:port + info = job_cluster.scheduler_info() + for worker_addr in six.iterkeys(info['workers']): + ip_port = worker_addr.split('//')[1] + ip = ip_port.split(':')[0] + if ip not in allowed_ips: + continue + try: + pool = workers[ip] + pool.add(ip_port) + except KeyError: + workers[ip] = set([ip_port]) + + return list(toolz.itertoolz.concat(six.itervalues(workers))) + + def execute(self, context, _, tasks): + """Executes task serialization on distributed nodes + + :param context: the transaction context + :param _: serializers factory + :param tasks: the tasks to serialize + :return: sequence of serialized tasks + """ + logger.debug("Performing distributed tasks processing") + sched_address = '{0}:{1}'.format(settings.MASTER_IP, + settings.LCM_DS_JOB_SHEDULER_PORT) + job_cluster = distributed.Client(sched_address) + + allowed_ips = self._get_allowed_nodes_ips(context) + workers = self._get_allowed_workers(job_cluster, allowed_ips) + logger.debug("Allowed workers list for serialization: %s", workers) + workers_ips = set([ip_port.split(':')[0] for ip_port in workers]) + logger.debug("Allowed workers ips list for serialization: %s", + workers_ips) + + task_context = Context(context) + formatter_contexts_idx = {} + workers_num = len(workers) + max_jobs_in_queue = workers_num * settings.LCM_DS_NODE_LOAD_COEFF + logger.debug("Max jobs allowed in queue: %s", max_jobs_in_queue) + + start = datetime.datetime.utcnow() + tasks_count = 0 + + try: + self._upload_nailgun_code(job_cluster) + scattered = self._scatter_data(job_cluster, context, workers) + + for tasks_chunk in toolz.partition_all( + settings.LCM_DS_TASKS_PER_JOB, tasks): + + formatter_contexts_for_tasks = {} + + # Collecting required contexts for tasks + for task in tasks_chunk: + node_id, task_data = task + formatter_context = self._get_formatter_context( + task_context, formatter_contexts_idx, node_id) + if node_id not in formatter_contexts_for_tasks: + formatter_contexts_for_tasks[node_id] = \ + formatter_context + + logger.debug("Submitting job for tasks chunk: %s", tasks_chunk) + job = job_cluster.submit( + _distributed_serialize_tasks_for_node, + formatter_contexts_for_tasks, + tasks_chunk, + scattered, + workers=workers_ips + ) + + self.sent_jobs.put(job) + self.sent_jobs_count += 1 + + # We limit the max number of jobs in the queue by the number + # of nodes which are used in the serialization + if self.sent_jobs_count >= max_jobs_in_queue: + for result in self._consume_jobs(chunk_size=workers_num): + tasks_count += 1 + yield result + + # We have no more tasks but still have unconsumed jobs + for result in self._consume_jobs(): + tasks_count += 1 + yield result + finally: + end = datetime.datetime.utcnow() + logger.debug("Distributed tasks processing
finished. " + "Total time: %s. Tasks processed: %s", + end - start, tasks_count) + job_cluster.shutdown() + + +def is_distributed_processing_enabled(context): + common = context.new.get('common', {}) + return common.get('serialization_policy') == \ + consts.SERIALIZATION_POLICY.distributed + + +def get_processing_policy(context): + if is_distributed_processing_enabled(context): + return DistributedProcessingPolicy() cpu_num = settings.LCM_SERIALIZERS_CONCURRENCY_FACTOR if not cpu_num: try: @@ -162,7 +471,7 @@ class TransactionSerializer(object): # ids of nodes in this group and how many nodes in this group can fail # and deployment will not be interrupted self.fault_tolerance_groups = [] - self.concurrency_policy = get_concurrency_policy() + self.processing_policy = get_processing_policy(context) @classmethod def serialize(cls, context, tasks, resolver): @@ -216,7 +525,7 @@ class TransactionSerializer(object): :param tasks: the deployment tasks :return the mapping tasks per node """ - serialized = self.concurrency_policy.execute( + serialized = self.processing_policy.execute( self.context, self.serializer_factory_class, self.expand_tasks(tasks) diff --git a/nailgun/nailgun/orchestrator/deployment_serializers.py b/nailgun/nailgun/orchestrator/deployment_serializers.py index d9bf3b6c86..eb40f5a7bd 100644 --- a/nailgun/nailgun/orchestrator/deployment_serializers.py +++ b/nailgun/nailgun/orchestrator/deployment_serializers.py @@ -221,6 +221,7 @@ class DeploymentMultinodeSerializer(object): 'role': role, 'vms_conf': node.vms_conf, 'fail_if_error': role in self.critical_roles, + 'ip': node.ip, # TODO(eli): need to remove, requried for the fake thread only 'online': node.online, } diff --git a/nailgun/nailgun/settings.py b/nailgun/nailgun/settings.py index eee08c94fc..1f9222b9c0 100644 --- a/nailgun/nailgun/settings.py +++ b/nailgun/nailgun/settings.py @@ -38,7 +38,21 @@ class NailgunSettings(object): if test_config: settings_files.append(test_config) - self.config = {} + # If settings.yaml doesn't exist we should have a default + # config structure. Nailgun without settings is used + # when we distribute the source code to the workers for + # distributed serialization + self.config = { + 'VERSION': {}, + 'DATABASE': { + 'engine': 'postgresql', + 'name': '', + 'host': '', + 'port': '0', + 'user': '', + 'passwd': '' + } + } for sf in settings_files: try: logger.debug("Trying to read config file %s" % sf) @@ -47,9 +61,9 @@ logger.error("Error while reading config file %s: %s" % (sf, str(e))) - self.config['VERSION']['api'] = self.config['API'] + self.config['VERSION']['api'] = self.config.get('API') self.config['VERSION']['feature_groups'] = \ - self.config['FEATURE_GROUPS'] + self.config.get('FEATURE_GROUPS') fuel_release = self.get_file_content(consts.FUEL_RELEASE_FILE) if fuel_release: @@ -61,7 +75,7 @@ self.config['VERSION']['openstack_version'] = \ fuel_openstack_version - if int(self.config.get("DEVELOPMENT")): + if int(self.config.get("DEVELOPMENT", 0)): logger.info("DEVELOPMENT MODE ON:") here = os.path.abspath( os.path.join(os.path.dirname(__file__), '..') diff --git a/nailgun/nailgun/settings.yaml b/nailgun/nailgun/settings.yaml index 92b35181ee..c2faaa818a 100644 --- a/nailgun/nailgun/settings.yaml +++ b/nailgun/nailgun/settings.yaml @@ -177,6 +177,15 @@ YAQL_MEMORY_QUOTA: 104857600 LCM_CHECK_TASK_VERSION: False +# Coefficient for calculating the max jobs queue length. If the number of jobs reaches +# len(nodes) * load_coef we stop generating and start consuming jobs. +LCM_DS_NODE_LOAD_COEFF: 2 +# Port of the dask-scheduler on the master node +LCM_DS_JOB_SHEDULER_PORT: 8002 +# Size of the tasks chunk sent to a distributed worker +LCM_DS_TASKS_PER_JOB: 100 + + DPDK_MAX_CPUS_PER_NIC: 4 TRUNCATE_LOG_ENTRIES: 100 diff --git a/nailgun/nailgun/statistics/fuel_statistics/installation_info.py b/nailgun/nailgun/statistics/fuel_statistics/installation_info.py index 482d8c54e4..d024cddd0b 100644 --- a/nailgun/nailgun/statistics/fuel_statistics/installation_info.py +++ b/nailgun/nailgun/statistics/fuel_statistics/installation_info.py @@ -56,6 +56,16 @@ class InstallationInfo(object): 'propagate_task_deploy', None), WhiteListRule(('common', 'security_groups', 'value'), 'security_groups', None), + WhiteListRule(('common', 'serialization_policy', 'value'), + 'serialization_policy', None), + WhiteListRule(('common', 'ds_use_discover', 'value'), + 'ds_use_discover', None), + WhiteListRule(('common', 'ds_use_provisioned', 'value'), + 'ds_use_provisioned', None), + WhiteListRule(('common', 'ds_use_ready', 'value'), + 'ds_use_ready', None), + WhiteListRule(('common', 'ds_use_error', 'value'), + 'ds_use_error', None), WhiteListRule(('corosync', 'verified', 'value'), 'corosync_verified', None), diff --git a/nailgun/nailgun/test/integration/test_cluster_changes_handler.py b/nailgun/nailgun/test/integration/test_cluster_changes_handler.py index a8661ab816..b8f4e9829a 100644 --- a/nailgun/nailgun/test/integration/test_cluster_changes_handler.py +++ b/nailgun/nailgun/test/integration/test_cluster_changes_handler.py @@ -187,6 +187,7 @@ class TestHandlers(BaseIntegrationTest): 'fail_if_error': is_critical, 'vms_conf': [], 'fqdn': 'node-%d.%s' % (node.id, settings.DNS_DOMAIN), + 'ip': node.ip, 'network_data': { 'eth1': { @@ -603,6 +604,7 @@ class TestHandlers(BaseIntegrationTest): 'online': node.online, 'fail_if_error': is_critical, 'fqdn': 'node-%d.%s' % (node.id, settings.DNS_DOMAIN), + 'ip': node.ip, 'priority': 100, 'vms_conf': [], 'network_scheme': { @@ -1096,6 +1098,7 @@ class TestHandlers(BaseIntegrationTest): 'fail_if_error': is_critical, 'fqdn': 'node-%d.%s' % (node.id, settings.DNS_DOMAIN), 'priority': 100, + 'ip': node.ip, 'vms_conf': [], 'network_scheme': { diff --git a/nailgun/nailgun/test/unit/test_lcm_transaction_serializer.py b/nailgun/nailgun/test/unit/test_lcm_transaction_serializer.py index 6450e19e0d..9751d556a0 100644 --- a/nailgun/nailgun/test/unit/test_lcm_transaction_serializer.py +++ b/nailgun/nailgun/test/unit/test_lcm_transaction_serializer.py @@ -14,20 +14,24 @@ # License for the specific language governing permissions and limitations # under the License.
+import copy +import exceptions import mock import multiprocessing.dummy from nailgun import consts from nailgun import errors from nailgun import lcm +from nailgun.lcm import TransactionContext +from nailgun.settings import settings +from nailgun.test.base import BaseTestCase from nailgun.utils.resolvers import TagResolver -from nailgun.test.base import BaseUnitTest - -class TestTransactionSerializer(BaseUnitTest): +class TestTransactionSerializer(BaseTestCase): @classmethod def setUpClass(cls): + super(TestTransactionSerializer, cls).setUpClass() cls.tasks = [ { 'id': 'task1', 'roles': ['controller'], @@ -462,3 +466,344 @@ class TestTransactionSerializer(BaseUnitTest): 9, lcm.TransactionSerializer.calculate_fault_tolerance('-1 ', 10) ) + + def _get_context_for_distributed_serialization(self): + new = copy.deepcopy(self.context.new) + new['common']['serialization_policy'] = \ + consts.SERIALIZATION_POLICY.distributed + return TransactionContext(new) + + @mock.patch('nailgun.lcm.transaction_serializer.distributed.wait') + @mock.patch('nailgun.lcm.transaction_serializer.distributed.as_completed') + def test_distributed_serialization(self, _, as_completed): + context = self._get_context_for_distributed_serialization() + + with mock.patch( + 'nailgun.lcm.transaction_serializer.distributed.Client' + ) as job_cluster: + job = mock.Mock() + job.result.return_value = [ + (('1', {"id": "task1", "type": "skipped"}), None) + ] + + submit = mock.Mock() + submit.return_value = job + + as_completed.return_value = [job] + + job_cluster.return_value.submit = submit + job_cluster.return_value.scheduler_info.return_value = \ + {'workers': {'tcp://worker': {}}} + + lcm.TransactionSerializer.serialize( + context, self.tasks, self.resolver) + self.assertTrue(submit.called) + # 4 controller task + 1 compute + 1 cinder + self.assertTrue(6, submit.call_count) + + @mock.patch('nailgun.lcm.transaction_serializer.distributed.wait') + @mock.patch('nailgun.lcm.transaction_serializer.distributed.as_completed') + @mock.patch('nailgun.lcm.transaction_serializer.' 
+ 'DistributedProcessingPolicy._get_formatter_context') + def test_distributed_serialization_workers_scope(self, formatter_context, + as_completed, _): + context = self._get_context_for_distributed_serialization() + + node_id = '1' + task = { + 'id': 'task1', 'roles': ['controller'], + 'type': 'puppet', 'version': '2.0.0' + } + + with mock.patch( + 'nailgun.lcm.transaction_serializer.distributed.Client' + ) as job_cluster: + + # Mocking job processing + job = mock.Mock() + job.result.return_value = [((node_id, task), None)] + + submit = mock.Mock() + submit.return_value = job + + as_completed.return_value = [job] + + scatter = mock.Mock() + job_cluster.return_value.scatter = scatter + + job_cluster.return_value.scatter.return_value = {} + job_cluster.return_value.submit = submit + + formatter_context.return_value = {node_id: {}} + + # Configuring available workers + job_cluster.return_value.scheduler_info.return_value = \ + { + 'workers': { + 'tcp://{0}'.format(settings.MASTER_IP): {}, + 'tcp://192.168.0.1:33334': {}, + 'tcp://127.0.0.2:33335': {}, + } + } + + # Performing serialization + lcm.TransactionSerializer.serialize( + context, [task], self.resolver + ) + + # Checking data is scattered only to expected workers + scatter.assert_called_once() + scatter.assert_called_with( + {'context': context, 'settings_config': settings.config}, + broadcast=True, + workers=[settings.MASTER_IP] + ) + + # Checking submit job only to expected workers + submit.assert_called_once() + serializer = lcm.transaction_serializer + submit.assert_called_with( + serializer._distributed_serialize_tasks_for_node, + {node_id: formatter_context()}, + ((node_id, task),), + job_cluster().scatter(), + workers=set([settings.MASTER_IP]) + ) + + def test_distributed_serialization_get_allowed_nodes_ips(self): + policy = lcm.transaction_serializer.DistributedProcessingPolicy() + + context_data = { + 'common': { + 'serialization_policy': + consts.SERIALIZATION_POLICY.distributed, + 'ds_use_error': True, + 'ds_use_provisioned': True, + 'ds_use_discover': True, + 'ds_use_ready': False + }, + 'nodes': { + '1': {'status': consts.NODE_STATUSES.error, + 'ip': '10.20.0.3'}, + '2': {'status': consts.NODE_STATUSES.provisioned, + 'ip': '10.20.0.4'}, + '3': {'status': consts.NODE_STATUSES.discover, + 'ip': '10.20.0.5'}, + '4': {'status': consts.NODE_STATUSES.ready, + 'ip': '10.20.0.6'}, + } + } + + actual = policy._get_allowed_nodes_ips( + TransactionContext(context_data)) + self.assertItemsEqual( + [settings.MASTER_IP, '10.20.0.3', '10.20.0.4', '10.20.0.5'], + actual + ) + + def test_distributed_serialization_get_allowed_nodes_statuses(self): + policy = lcm.transaction_serializer.DistributedProcessingPolicy() + context_data = {} + actual = policy._get_allowed_nodes_statuses( + TransactionContext(context_data)) + self.assertItemsEqual([], actual) + + context_data['common'] = { + 'ds_use_discover': False, + 'ds_use_provisioned': False, + 'ds_use_error': False, + 'ds_use_ready': False + } + actual = policy._get_allowed_nodes_statuses( + TransactionContext(context_data)) + self.assertItemsEqual([], actual) + + context_data['common']['ds_use_discover'] = True + actual = policy._get_allowed_nodes_statuses( + TransactionContext(context_data)) + expected = [consts.NODE_STATUSES.discover] + self.assertItemsEqual(expected, actual) + + context_data['common']['ds_use_provisioned'] = True + actual = policy._get_allowed_nodes_statuses( + TransactionContext(context_data)) + expected = [consts.NODE_STATUSES.discover, + 
consts.NODE_STATUSES.provisioned] + self.assertItemsEqual(expected, actual) + + context_data['common']['ds_use_error'] = True + actual = policy._get_allowed_nodes_statuses( + TransactionContext(context_data)) + expected = [consts.NODE_STATUSES.discover, + consts.NODE_STATUSES.provisioned, + consts.NODE_STATUSES.error] + self.assertItemsEqual(expected, actual) + + context_data['common']['ds_use_ready'] = True + actual = policy._get_allowed_nodes_statuses( + TransactionContext(context_data)) + expected = [consts.NODE_STATUSES.discover, + consts.NODE_STATUSES.provisioned, + consts.NODE_STATUSES.error, + consts.NODE_STATUSES.ready] + self.assertItemsEqual(expected, actual) + + def test_distributed_serialization_get_allowed_workers(self): + policy = lcm.transaction_serializer.DistributedProcessingPolicy() + + with mock.patch( + 'nailgun.lcm.transaction_serializer.distributed.Client' + ) as job_cluster: + job_cluster.scheduler_info.return_value = \ + {'workers': { + 'tcp://10.20.0.2:1': {}, + 'tcp://10.20.0.2:2': {}, + 'tcp://10.20.0.3:1': {}, + 'tcp://10.20.0.3:2': {}, + 'tcp://10.20.0.3:3': {}, + 'tcp://10.20.0.4:1': {}, + 'tcp://10.20.0.5:1': {} + }} + allowed_ips = set(['10.20.0.2', '10.20.0.3', '10.20.0.5']) + + expected = ['10.20.0.2:1', '10.20.0.2:2', '10.20.0.3:1', + '10.20.0.3:2', '10.20.0.3:3', '10.20.0.5:1'] + actual = policy._get_allowed_workers(job_cluster, allowed_ips) + self.assertItemsEqual(expected, actual) + + def test_distributed_serialization_serialize_task(self): + task = { + 'id': 'task1', 'roles': ['controller'], + 'type': 'puppet', 'version': '2.0.0', + 'parameters': { + 'master_ip': '{MN_IP}', + 'host': {'yaql_exp': '$.public_ssl.hostname'}, + 'attr': {'yaql_exp': '$node.attributes.a_str'} + } + } + + formatter_contexts_idx = { + '1': {'MN_IP': '10.0.0.1'}, + '2': {} + } + scattered_data = { + 'settings_config': settings.config, + 'context': self.context + } + + serializer = lcm.transaction_serializer + actual = serializer._distributed_serialize_tasks_for_node( + formatter_contexts_idx, [('1', task), ('2', task)], scattered_data) + + expected = [ + ( + ( + '1', + { + 'id': 'task1', + 'type': 'puppet', + 'parameters': { + 'cwd': '/', + 'master_ip': '10.0.0.1', + 'host': 'localhost', + 'attr': 'text1' + }, + 'fail_on_error': True + } + ), + None + ), + ( + ( + '2', + { + 'id': 'task1', + 'type': 'puppet', + 'parameters': { + 'cwd': '/', + 'master_ip': '{MN_IP}', + 'host': 'localhost', + 'attr': 'text2' + }, + 'fail_on_error': True + } + ), + None + ) + ] + + self.assertItemsEqual(expected, actual) + + def test_distributed_serialization_serialize_task_failure(self): + task = { + 'id': 'task1', 'roles': ['controller'], + 'type': 'puppet', 'version': '2.0.0', + 'parameters': { + 'fake': {'yaql_exp': '$.some.fake_param'} + } + } + + formatter_contexts_idx = {'2': {}} + scattered_data = { + 'settings_config': settings.config, + 'context': self.context + } + + serializer = lcm.transaction_serializer + result = serializer._distributed_serialize_tasks_for_node( + formatter_contexts_idx, [('2', task)], scattered_data) + (_, __), err = result[0] + self.assertIsInstance(err, exceptions.KeyError) + + +class TestConcurrencyPolicy(BaseTestCase): + + @mock.patch( + 'nailgun.lcm.transaction_serializer.multiprocessing.cpu_count', + return_value=1 + ) + def test_one_cpu(self, cpu_count): + policy = lcm.transaction_serializer.get_processing_policy( + lcm.TransactionContext({})) + self.assertIsInstance( + policy, + lcm.transaction_serializer.SingleWorkerConcurrencyPolicy + ) + 
self.assertTrue(cpu_count.called) + + @mock.patch( + 'nailgun.lcm.transaction_serializer.multiprocessing.cpu_count', + return_value=0 + ) + def test_zero_cpu(self, cpu_count): + policy = lcm.transaction_serializer.get_processing_policy( + lcm.TransactionContext({})) + self.assertIsInstance( + policy, + lcm.transaction_serializer.SingleWorkerConcurrencyPolicy + ) + self.assertTrue(cpu_count.called) + + @mock.patch( + 'nailgun.lcm.transaction_serializer.multiprocessing.cpu_count', + side_effect=NotImplementedError + ) + def test_cpu_count_not_implemented(self, cpu_count): + policy = lcm.transaction_serializer.get_processing_policy( + lcm.TransactionContext({})) + self.assertIsInstance( + policy, + lcm.transaction_serializer.SingleWorkerConcurrencyPolicy + ) + self.assertTrue(cpu_count.called) + + def test_distributed_serialization_enabled_in_cluster(self): + context_data = {'common': { + 'serialization_policy': consts.SERIALIZATION_POLICY.distributed + }} + policy = lcm.transaction_serializer.get_processing_policy( + lcm.TransactionContext(context_data)) + self.assertIsInstance( + policy, + lcm.transaction_serializer.DistributedProcessingPolicy + ) diff --git a/nailgun/requirements.txt b/nailgun/requirements.txt index 96bed25904..c702e8aab1 100644 --- a/nailgun/requirements.txt +++ b/nailgun/requirements.txt @@ -47,3 +47,5 @@ stevedore>=1.5.0 # See: https://bugs.launchpad.net/fuel/+bug/1519727 setuptools<=18.5 yaql>=1.0.0 +# Distributed nodes serialization +distributed==1.16.0
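Editorial note (not part of the patch): the sketch below illustrates, under stated assumptions, the dask.distributed scatter/submit/as_completed pattern that DistributedProcessingPolicy builds on. The scheduler address, the serialize_chunk function and its payload are hypothetical stand-ins for _distributed_serialize_tasks_for_node, and the script assumes a dask-scheduler with workers is already running, as the LCM_DS_JOB_SHEDULER_PORT setting provides on the master node.

from __future__ import print_function

import distributed


def serialize_chunk(shared, chunk):
    # Hypothetical stand-in for _distributed_serialize_tasks_for_node:
    # runs on a worker and returns one result per (node_id, task_id) pair.
    return [(node_id, {'id': task_id, 'master_ip': shared['master_ip']})
            for node_id, task_id in chunk]


def main():
    # Assumed address of a running dask-scheduler with connected workers.
    client = distributed.Client('127.0.0.1:8002')
    try:
        # Copy shared data to every worker once instead of with each job;
        # the real policy also restricts this with workers=<allowed addresses>.
        shared = client.scatter({'master_ip': '10.20.0.2'}, broadcast=True)
        # Submit one job per chunk of (node_id, task_id) pairs.
        futures = [
            client.submit(serialize_chunk, shared, chunk)
            for chunk in ([('1', 'task1'), ('2', 'task1')], [('3', 'task2')])
        ]
        # Consume results as the workers finish them.
        for future in distributed.as_completed(futures):
            for node_id, serialized in future.result():
                print(node_id, serialized)
    finally:
        client.shutdown()


if __name__ == '__main__':
    main()

In the change itself the payload is a chunk of tasks plus per-node formatter contexts, and both scatter() and submit() are constrained to the worker addresses derived from the node statuses allowed in the cluster settings.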