Separate tasks for users by sending them to different queues.

Change-Id: I3a3931571a8614b7c5385d7eeda8879610971573
This commit is contained in:
Xicheng Chang 2016-06-27 20:18:30 -04:00
parent d00035ca0b
commit 70425d96c2
3 changed files with 55 additions and 16 deletions

View File

@ -480,7 +480,10 @@ def del_cluster(
for clusterhost in cluster.clusterhosts
],
delete_underlying_host
)
),
queue=user.email,
exchange=user.email,
routing_key=user.email
)
return {
'status': 'delete action is sent',
@ -1183,7 +1186,10 @@ def _del_cluster_host(
(
user.email, clusterhost.cluster_id, clusterhost.host_id,
delete_underlying_host
)
),
queue=user.email,
exchange=user.email,
routing_key=user.email
)
return {
'status': 'delete action sent',
@ -1854,7 +1860,10 @@ def deploy_cluster(
(
user.email, cluster_id,
[clusterhost.host_id for clusterhost in clusterhosts]
)
),
queue=user.email,
exchange=user.email,
routing_key=user.email
)
return {
'status': 'deploy action sent',
@ -1918,7 +1927,10 @@ def redeploy_cluster(
'compass.tasks.redeploy_cluster',
(
user.email, cluster_id
)
),
queue=user.email,
exchange=user.email,
routing_key=user.email
)
return {
'status': 'redeploy action sent',
@ -1945,7 +1957,10 @@ def patch_cluster(cluster_id, user=None, session=None, **kwargs):
'compass.tasks.patch_cluster',
(
user.email, cluster_id,
)
),
queue=user.email,
exchange=user.email,
routing_key=user.email
)
return {
'status': 'patch action sent',
@ -2046,7 +2061,7 @@ def update_cluster_host_state(
def _update_clusterhost_state(
clusterhost, from_database_only=False,
session=None, **kwargs
session=None, user=None, **kwargs
):
"""Update clusterhost state.
@ -2093,7 +2108,10 @@ def _update_clusterhost_state(
(
clusterhost.cluster_id, clusterhost.host_id,
cluster_ready, host_ready
)
),
queue=user.email,
exchange=user.email,
routing_key=user.email
)
status = '%s: cluster ready %s host ready %s' % (
clusterhost.name, cluster_ready, host_ready
@ -2126,7 +2144,7 @@ def update_cluster_host_state_internal(
)
return _update_clusterhost_state(
clusterhost, from_database_only=from_database_only,
session=session, **kwargs
session=session, users=user, **kwargs
)
@ -2169,7 +2187,7 @@ def update_clusterhost_state_internal(
clusterhost = _get_clusterhost(clusterhost_id, session=session)
return _update_clusterhost_state(
clusterhost, from_database_only=from_database_only,
session=session, **kwargs
session=session, user=user, **kwargs
)
@ -2243,7 +2261,10 @@ def update_cluster_state_internal(
from compass.tasks import client as celery_client
celery_client.celery.send_task(
'compass.tasks.cluster_installed',
(clusterhost.cluster_id, clusterhost_ready)
(clusterhost.cluster_id, clusterhost_ready),
queue=user.email,
exchange=user.email,
routing_key=user.email
)
status = '%s installed action set clusterhost ready %s' % (
cluster.name, clusterhost_ready

View File

@ -179,7 +179,10 @@ def start_check_cluster_health(cluster_id, send_report_url,
from compass.tasks import client as celery_client
celery_client.celery.send_task(
'compass.tasks.cluster_health',
(cluster.id, send_report_url, user.email)
(cluster.id, send_report_url, user.email),
queue=user.email,
exchange=user.email,
routing_key=user.email
)
return {
"cluster_id": cluster.id,

View File

@ -419,7 +419,10 @@ def del_host(
'compass.tasks.delete_host',
(
user.email, host.id, cluster_ids
)
),
queue=user.email,
exchange=user.email,
routing_key=user.email
)
return {
'status': 'delete action sent',
@ -918,7 +921,10 @@ def update_host_state_internal(
(
host.id, clusterhosts_ready,
clusters_os_ready
)
),
queue=user.email,
exchange=user.email,
routing_key=user.email
)
status = '%s: clusterhosts ready %s clusters os ready %s' % (
host.name, clusterhosts_ready, clusters_os_ready
@ -1013,7 +1019,10 @@ def poweron_host(
check_host_validated(host)
celery_client.celery.send_task(
'compass.tasks.poweron_host',
(host.id,)
(host.id,),
queue=user.email,
exchange=user.email,
routing_key=user.email
)
return {
'status': 'poweron %s action sent' % host.name,
@ -1039,7 +1048,10 @@ def poweroff_host(
check_host_validated(host)
celery_client.celery.send_task(
'compass.tasks.poweroff_host',
(host.id,)
(host.id,),
queue=user.email,
exchange=user.email,
routing_key=user.email
)
return {
'status': 'poweroff %s action sent' % host.name,
@ -1065,7 +1077,10 @@ def reset_host(
check_host_validated(host)
celery_client.celery.send_task(
'compass.tasks.reset_host',
(host.id,)
(host.id,),
queue=user.email,
exchange=user.email,
routing_key=user.email
)
return {
'status': 'reset %s action sent' % host.name,