VM-side agent for Murano
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

RabbitMqClient.cs 4.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Configuration;
  4. using System.Linq;
  5. using System.Net;
  6. using System.Net.Security;
  7. using System.Security.Authentication;
  8. using System.Security.Cryptography.X509Certificates;
  9. using System.Text;
  10. using System.Threading.Tasks;
  11. using NLog;
  12. using RabbitMQ.Client;
  13. namespace Mirantis.Murano.WindowsAgent
  14. {
  15. class RabbitMqClient : IDisposable
  16. {
  17. private static readonly Logger Log = LogManager.GetCurrentClassLogger();
  18. private static readonly ConnectionFactory connectionFactory;
  19. private IConnection currentConnecton;
  20. static RabbitMqClient()
  21. {
  22. var ssl = new SslOption {
  23. Enabled = bool.Parse(ConfigurationManager.AppSettings["rabbitmq.ssl"] ?? "false"),
  24. Version = SslProtocols.Default,
  25. AcceptablePolicyErrors = bool.Parse(ConfigurationManager.AppSettings["rabbitmq.allowInvalidCA"] ?? "true") ?
  26. SslPolicyErrors.RemoteCertificateNameMismatch : SslPolicyErrors.None
  27. };
  28. var sslServerName = ConfigurationManager.AppSettings["rabbitmq.sslServerName"] ?? "";
  29. ssl.ServerName = sslServerName;
  30. if (String.IsNullOrWhiteSpace(sslServerName))
  31. {
  32. ssl.AcceptablePolicyErrors |= SslPolicyErrors.RemoteCertificateNameMismatch;
  33. }
  34. connectionFactory = new ConnectionFactory {
  35. HostName = ConfigurationManager.AppSettings["rabbitmq.host"] ?? "localhost",
  36. UserName = ConfigurationManager.AppSettings["rabbitmq.user"] ?? "guest",
  37. Password = ConfigurationManager.AppSettings["rabbitmq.password"] ??"guest",
  38. Protocol = Protocols.DefaultProtocol,
  39. VirtualHost = ConfigurationManager.AppSettings["rabbitmq.vhost"] ?? "/",
  40. Port = int.Parse(ConfigurationManager.AppSettings["rabbitmq.port"] ?? "5672"),
  41. RequestedHeartbeat = 10,
  42. Ssl = ssl
  43. };
  44. }
  45. public RabbitMqClient()
  46. {
  47. }
  48. public MqMessage GetMessage()
  49. {
  50. var queueName = ConfigurationManager.AppSettings["rabbitmq.inputQueue"] ?? Dns.GetHostName().ToLower();
  51. try
  52. {
  53. IConnection connection = null;
  54. lock (this)
  55. {
  56. connection = this.currentConnecton = this.currentConnecton ?? connectionFactory.CreateConnection();
  57. }
  58. var session = connection.CreateModel();
  59. session.BasicQos(0, 1, false);
  60. //session.QueueDeclare(queueName, true, false, false, null);
  61. var consumer = new QueueingBasicConsumer(session);
  62. var consumeTag = session.BasicConsume(queueName, false, consumer);
  63. var e = (RabbitMQ.Client.Events.BasicDeliverEventArgs) consumer.Queue.Dequeue();
  64. Action ackFunc = delegate {
  65. session.BasicAck(e.DeliveryTag, false);
  66. session.BasicCancel(consumeTag);
  67. session.Close();
  68. };
  69. return new MqMessage(ackFunc) {
  70. Body = Encoding.UTF8.GetString(e.Body),
  71. Id = e.BasicProperties.MessageId
  72. };
  73. }
  74. catch (Exception exception)
  75. {
  76. Dispose();
  77. throw;
  78. }
  79. }
  80. public void SendResult(MqMessage message)
  81. {
  82. var exchangeName = ConfigurationManager.AppSettings["rabbitmq.resultExchange"] ?? "";
  83. var resultRoutingKey = ConfigurationManager.AppSettings["rabbitmq.resultRoutingKey"] ?? "-execution-results";
  84. bool durable = bool.Parse(ConfigurationManager.AppSettings["rabbitmq.durableMessages"] ?? "true");
  85. try
  86. {
  87. IConnection connection = null;
  88. lock (this)
  89. {
  90. connection = this.currentConnecton = this.currentConnecton ?? connectionFactory.CreateConnection();
  91. }
  92. var session = connection.CreateModel();
  93. /*if (!string.IsNullOrEmpty(resultQueue))
  94. {
  95. //session.QueueDeclare(resultQueue, true, false, false, null);
  96. if (!string.IsNullOrEmpty(exchangeName))
  97. {
  98. session.ExchangeBind(exchangeName, resultQueue, resultQueue);
  99. }
  100. }*/
  101. var basicProperties = session.CreateBasicProperties();
  102. basicProperties.SetPersistent(durable);
  103. basicProperties.MessageId = message.Id;
  104. basicProperties.ContentType = "application/json";
  105. session.BasicPublish(exchangeName, resultRoutingKey, basicProperties, Encoding.UTF8.GetBytes(message.Body));
  106. session.Close();
  107. }
  108. catch (Exception)
  109. {
  110. Dispose();
  111. throw;
  112. }
  113. }
  114. public void Dispose()
  115. {
  116. lock (this)
  117. {
  118. try
  119. {
  120. if (this.currentConnecton != null)
  121. {
  122. this.currentConnecton.Close();
  123. }
  124. }
  125. catch
  126. {
  127. }
  128. finally
  129. {
  130. this.currentConnecton = null;
  131. }
  132. }
  133. }
  134. }
  135. }