diff --git a/compass/db/api/cluster.py b/compass/db/api/cluster.py index 9fc00402..c3e4e044 100644 --- a/compass/db/api/cluster.py +++ b/compass/db/api/cluster.py @@ -480,7 +480,10 @@ def del_cluster( for clusterhost in cluster.clusterhosts ], delete_underlying_host - ) + ), + queue=user.email, + exchange=user.email, + routing_key=user.email ) return { 'status': 'delete action is sent', @@ -1183,7 +1186,10 @@ def _del_cluster_host( ( user.email, clusterhost.cluster_id, clusterhost.host_id, delete_underlying_host - ) + ), + queue=user.email, + exchange=user.email, + routing_key=user.email ) return { 'status': 'delete action sent', @@ -1854,7 +1860,10 @@ def deploy_cluster( ( user.email, cluster_id, [clusterhost.host_id for clusterhost in clusterhosts] - ) + ), + queue=user.email, + exchange=user.email, + routing_key=user.email ) return { 'status': 'deploy action sent', @@ -1918,7 +1927,10 @@ def redeploy_cluster( 'compass.tasks.redeploy_cluster', ( user.email, cluster_id - ) + ), + queue=user.email, + exchange=user.email, + routing_key=user.email ) return { 'status': 'redeploy action sent', @@ -1945,7 +1957,10 @@ def patch_cluster(cluster_id, user=None, session=None, **kwargs): 'compass.tasks.patch_cluster', ( user.email, cluster_id, - ) + ), + queue=user.email, + exchange=user.email, + routing_key=user.email ) return { 'status': 'patch action sent', @@ -2046,7 +2061,7 @@ def update_cluster_host_state( def _update_clusterhost_state( clusterhost, from_database_only=False, - session=None, **kwargs + session=None, user=None, **kwargs ): """Update clusterhost state. @@ -2093,7 +2108,10 @@ def _update_clusterhost_state( ( clusterhost.cluster_id, clusterhost.host_id, cluster_ready, host_ready - ) + ), + queue=user.email, + exchange=user.email, + routing_key=user.email ) status = '%s: cluster ready %s host ready %s' % ( clusterhost.name, cluster_ready, host_ready @@ -2126,7 +2144,7 @@ def update_cluster_host_state_internal( ) return _update_clusterhost_state( clusterhost, from_database_only=from_database_only, - session=session, **kwargs + session=session, users=user, **kwargs ) @@ -2169,7 +2187,7 @@ def update_clusterhost_state_internal( clusterhost = _get_clusterhost(clusterhost_id, session=session) return _update_clusterhost_state( clusterhost, from_database_only=from_database_only, - session=session, **kwargs + session=session, user=user, **kwargs ) @@ -2243,7 +2261,10 @@ def update_cluster_state_internal( from compass.tasks import client as celery_client celery_client.celery.send_task( 'compass.tasks.cluster_installed', - (clusterhost.cluster_id, clusterhost_ready) + (clusterhost.cluster_id, clusterhost_ready), + queue=user.email, + exchange=user.email, + routing_key=user.email ) status = '%s installed action set clusterhost ready %s' % ( cluster.name, clusterhost_ready diff --git a/compass/db/api/health_check_report.py b/compass/db/api/health_check_report.py index 562008c9..aaea7a7c 100644 --- a/compass/db/api/health_check_report.py +++ b/compass/db/api/health_check_report.py @@ -179,7 +179,10 @@ def start_check_cluster_health(cluster_id, send_report_url, from compass.tasks import client as celery_client celery_client.celery.send_task( 'compass.tasks.cluster_health', - (cluster.id, send_report_url, user.email) + (cluster.id, send_report_url, user.email), + queue=user.email, + exchange=user.email, + routing_key=user.email ) return { "cluster_id": cluster.id, diff --git a/compass/db/api/host.py b/compass/db/api/host.py index c82f3f8c..5b079037 100644 --- a/compass/db/api/host.py +++ b/compass/db/api/host.py @@ -419,7 +419,10 @@ def del_host( 'compass.tasks.delete_host', ( user.email, host.id, cluster_ids - ) + ), + queue=user.email, + exchange=user.email, + routing_key=user.email ) return { 'status': 'delete action sent', @@ -918,7 +921,10 @@ def update_host_state_internal( ( host.id, clusterhosts_ready, clusters_os_ready - ) + ), + queue=user.email, + exchange=user.email, + routing_key=user.email ) status = '%s: clusterhosts ready %s clusters os ready %s' % ( host.name, clusterhosts_ready, clusters_os_ready @@ -1013,7 +1019,10 @@ def poweron_host( check_host_validated(host) celery_client.celery.send_task( 'compass.tasks.poweron_host', - (host.id,) + (host.id,), + queue=user.email, + exchange=user.email, + routing_key=user.email ) return { 'status': 'poweron %s action sent' % host.name, @@ -1039,7 +1048,10 @@ def poweroff_host( check_host_validated(host) celery_client.celery.send_task( 'compass.tasks.poweroff_host', - (host.id,) + (host.id,), + queue=user.email, + exchange=user.email, + routing_key=user.email ) return { 'status': 'poweroff %s action sent' % host.name, @@ -1065,7 +1077,10 @@ def reset_host( check_host_validated(host) celery_client.celery.send_task( 'compass.tasks.reset_host', - (host.id,) + (host.id,), + queue=user.email, + exchange=user.email, + routing_key=user.email ) return { 'status': 'reset %s action sent' % host.name,