Listen for events and forward to external security scanning services.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

amqp.go 2.5KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package main
  2. /*
  3. amqp - This file includes all of the logic necessary to interact with the amqp
  4. library. This is extrapolated out so that a AmqpInterface interface can be
  5. passed to functions. Doing this allows testing by mock classes to be created
  6. that can be passed to functions.
  7. Since this is a wrapper around the amqp library, this does not need testing.
  8. */
  9. import (
  10. "fmt"
  11. "log"
  12. "github.com/streadway/amqp"
  13. )
  14. // AmqpActioner is an interface for an AmqpActions class. Having
  15. // this as an interface allows us to pass in a dummy class for testing that
  16. // just returns mocked data.
  17. type AmqpActioner interface {
  18. Connect() (<-chan amqp.Delivery, error)
  19. }
  20. // AmqpActions is a class that handles all interactions directly with Amqp.
  21. // See the comment on AmqpActioner for rationale.
  22. type AmqpActions struct {
  23. Incoming *<-chan amqp.Delivery
  24. Options AmqpOptions
  25. AmqpConnection *amqp.Connection
  26. AmqpChannel *amqp.Channel
  27. NotifyError chan *amqp.Error
  28. }
  29. // AmqpOptions is a class to convey all of the configurable options for the
  30. // AmqpActions class.
  31. type AmqpOptions struct {
  32. RabbitURI string
  33. }
  34. // Connect initiates the initial connection to the AMQP.
  35. func (s *AmqpActions) Connect() (<-chan amqp.Delivery, chan *amqp.Error, error) {
  36. var err error
  37. s.AmqpConnection, err = amqp.Dial(s.Options.RabbitURI)
  38. if err != nil {
  39. return nil, nil, fmt.Errorf("Failed to connect to RabbitMQ: %s", err)
  40. }
  41. s.NotifyError = s.AmqpConnection.NotifyClose(make(chan *amqp.Error)) //error channel
  42. s.AmqpChannel, err = s.AmqpConnection.Channel()
  43. if err != nil {
  44. return nil, nil, fmt.Errorf("Failed to open a channel: %s", err)
  45. }
  46. amqpQueue, err := s.AmqpChannel.QueueDeclare(
  47. "notifications.info", // name
  48. false, // durable
  49. false, // delete when usused
  50. false, // exclusive
  51. false, // no-wait
  52. nil, // arguments
  53. )
  54. if err != nil {
  55. return nil, nil, fmt.Errorf("Failed to declare a queue: %s", err)
  56. }
  57. amqpIncoming, err := s.AmqpChannel.Consume(
  58. amqpQueue.Name, // queue
  59. "osel", // consumer
  60. true, // auto-ack
  61. false, // exclusive
  62. false, // no-local
  63. false, // no-wait
  64. nil, // args
  65. )
  66. if err != nil {
  67. return nil, nil, fmt.Errorf("Failed to register a consumer: %s", err)
  68. }
  69. s.Incoming = &amqpIncoming
  70. return amqpIncoming, s.NotifyError, nil
  71. }
  72. // Close closes connections
  73. func (s AmqpActions) Close() {
  74. log.Println("Closing AMQP connection")
  75. s.AmqpConnection.Close()
  76. s.AmqpChannel.Close()
  77. }