93 lines
2.5 KiB
Go
93 lines
2.5 KiB
Go
package main
|
|
|
|
/*
|
|
|
|
amqp - This file includes all of the logic necessary to interact with the amqp
|
|
library. This is extrapolated out so that a AmqpInterface interface can be
|
|
passed to functions. Doing this allows testing by mock classes to be created
|
|
that can be passed to functions.
|
|
|
|
Since this is a wrapper around the amqp library, this does not need testing.
|
|
|
|
*/
|
|
|
|
import (
|
|
"fmt"
|
|
"log"
|
|
|
|
"github.com/streadway/amqp"
|
|
)
|
|
|
|
// AmqpActioner is an interface for an AmqpActions class. Having
|
|
// this as an interface allows us to pass in a dummy class for testing that
|
|
// just returns mocked data.
|
|
type AmqpActioner interface {
|
|
Connect() (<-chan amqp.Delivery, error)
|
|
}
|
|
|
|
// AmqpActions is a class that handles all interactions directly with Amqp.
|
|
// See the comment on AmqpActioner for rationale.
|
|
type AmqpActions struct {
|
|
Incoming *<-chan amqp.Delivery
|
|
Options AmqpOptions
|
|
AmqpConnection *amqp.Connection
|
|
AmqpChannel *amqp.Channel
|
|
NotifyError chan *amqp.Error
|
|
}
|
|
|
|
// AmqpOptions is a class to convey all of the configurable options for the
|
|
// AmqpActions class.
|
|
type AmqpOptions struct {
|
|
RabbitURI string
|
|
}
|
|
|
|
// Connect initiates the initial connection to the AMQP.
|
|
func (s *AmqpActions) Connect() (<-chan amqp.Delivery, chan *amqp.Error, error) {
|
|
var err error
|
|
|
|
s.AmqpConnection, err = amqp.Dial(s.Options.RabbitURI)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("Failed to connect to RabbitMQ: %s", err)
|
|
}
|
|
s.NotifyError = s.AmqpConnection.NotifyClose(make(chan *amqp.Error)) //error channel
|
|
|
|
s.AmqpChannel, err = s.AmqpConnection.Channel()
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("Failed to open a channel: %s", err)
|
|
}
|
|
|
|
amqpQueue, err := s.AmqpChannel.QueueDeclare(
|
|
"notifications.info", // name
|
|
false, // durable
|
|
false, // delete when usused
|
|
false, // exclusive
|
|
false, // no-wait
|
|
nil, // arguments
|
|
)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("Failed to declare a queue: %s", err)
|
|
}
|
|
|
|
amqpIncoming, err := s.AmqpChannel.Consume(
|
|
amqpQueue.Name, // queue
|
|
"osel", // consumer
|
|
true, // auto-ack
|
|
false, // exclusive
|
|
false, // no-local
|
|
false, // no-wait
|
|
nil, // args
|
|
)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("Failed to register a consumer: %s", err)
|
|
}
|
|
s.Incoming = &amqpIncoming
|
|
return amqpIncoming, s.NotifyError, nil
|
|
}
|
|
|
|
// Close closes connections
|
|
func (s AmqpActions) Close() {
|
|
log.Println("Closing AMQP connection")
|
|
s.AmqpConnection.Close()
|
|
s.AmqpChannel.Close()
|
|
}
|