osel/amqp.go

93 lines
2.5 KiB
Go

package main
/*
amqp - This file includes all of the logic necessary to interact with the amqp
library. This is extrapolated out so that a AmqpInterface interface can be
passed to functions. Doing this allows testing by mock classes to be created
that can be passed to functions.
Since this is a wrapper around the amqp library, this does not need testing.
*/
import (
"fmt"
"log"
"github.com/streadway/amqp"
)
// AmqpActioner is an interface for an AmqpActions class. Having
// this as an interface allows us to pass in a dummy class for testing that
// just returns mocked data.
type AmqpActioner interface {
Connect() (<-chan amqp.Delivery, error)
}
// AmqpActions is a class that handles all interactions directly with Amqp.
// See the comment on AmqpActioner for rationale.
type AmqpActions struct {
Incoming *<-chan amqp.Delivery
Options AmqpOptions
AmqpConnection *amqp.Connection
AmqpChannel *amqp.Channel
NotifyError chan *amqp.Error
}
// AmqpOptions is a class to convey all of the configurable options for the
// AmqpActions class.
type AmqpOptions struct {
RabbitURI string
}
// Connect initiates the initial connection to the AMQP.
func (s *AmqpActions) Connect() (<-chan amqp.Delivery, chan *amqp.Error, error) {
var err error
s.AmqpConnection, err = amqp.Dial(s.Options.RabbitURI)
if err != nil {
return nil, nil, fmt.Errorf("Failed to connect to RabbitMQ: %s", err)
}
s.NotifyError = s.AmqpConnection.NotifyClose(make(chan *amqp.Error)) //error channel
s.AmqpChannel, err = s.AmqpConnection.Channel()
if err != nil {
return nil, nil, fmt.Errorf("Failed to open a channel: %s", err)
}
amqpQueue, err := s.AmqpChannel.QueueDeclare(
"notifications.info", // name
false, // durable
false, // delete when usused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return nil, nil, fmt.Errorf("Failed to declare a queue: %s", err)
}
amqpIncoming, err := s.AmqpChannel.Consume(
amqpQueue.Name, // queue
"osel", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return nil, nil, fmt.Errorf("Failed to register a consumer: %s", err)
}
s.Incoming = &amqpIncoming
return amqpIncoming, s.NotifyError, nil
}
// Close closes connections
func (s AmqpActions) Close() {
log.Println("Closing AMQP connection")
s.AmqpConnection.Close()
s.AmqpChannel.Close()
}