Listen for events and forward to external security scanning services.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

processing.go 3.6KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. package main
  2. import (
  3. "fmt"
  4. "log"
  5. "net"
  6. "time"
  7. "github.com/streadway/amqp"
  8. )
  9. func processWaitingEvent(delivery amqp.Delivery, openstackActions OpenStackActioner) (Event, error) {
  10. // executes when an event is waiting
  11. event, err := ParseEvent(delivery.Body)
  12. if err != nil {
  13. return Event{}, fmt.Errorf("Failed to parse event due to error: %s", err)
  14. }
  15. if event.Processor == nil {
  16. if !Debug {
  17. return Event{}, nil
  18. }
  19. return Event{}, fmt.Errorf("Ignoring event type %s", event.EventData.EventType)
  20. }
  21. if Debug {
  22. log.Printf("Processing event type %s\n", event.EventData.EventType)
  23. }
  24. err = event.Processor.FillExtraData(&event, openstackActions)
  25. if err != nil {
  26. return Event{}, fmt.Errorf("Error fetching extra data: %s", err)
  27. }
  28. return event, nil
  29. }
  30. func logEvents(events []Event, logger SyslogActioner, qualys QualysActioner) {
  31. var ipAddresses []string
  32. var qualysIPAddresses []string
  33. if Debug {
  34. log.Println("Timer Expired")
  35. }
  36. // De-dupe IP addresses and get them into a single struct
  37. dedupIPAddresses := make(map[string]struct{})
  38. for _, event := range events {
  39. for _, IPs := range event.IPs {
  40. for _, IP := range IPs {
  41. if _, ok := dedupIPAddresses[IP]; !ok {
  42. ipAddresses = append(ipAddresses, IP)
  43. }
  44. dedupIPAddresses[IP] = struct{}{}
  45. }
  46. }
  47. }
  48. // Disregard the scan if no targets have been found
  49. if len(ipAddresses) == 0 {
  50. if Debug {
  51. log.Println("Nothing to scan, skipping...")
  52. }
  53. return
  54. }
  55. // Remove IPv6 addresses
  56. if qualys.DropIPv6() {
  57. for ipAddressIndex := range ipAddresses {
  58. testIPAddress := ipAddresses[ipAddressIndex]
  59. if net.ParseIP(testIPAddress).To4() != nil {
  60. qualysIPAddresses = append(qualysIPAddresses, testIPAddress)
  61. } else {
  62. log.Println("Disregarded IPv6 address", testIPAddress)
  63. }
  64. }
  65. }
  66. // Execute Qualys scan
  67. log.Println("Qualys Scan Starting")
  68. scanID, scanError := qualys.InitiateScan(qualysIPAddresses)
  69. log.Printf("Qualys Scan Complete: scan ID='%s'; scan_error='%v'", scanID, scanError)
  70. // Iterate through entries and format the logs
  71. log.Printf("Processing %d events\n", len(events))
  72. for _, event := range events {
  73. event.QualysScanID = scanID
  74. if scanError != nil {
  75. event.QualysScanError = scanError.Error()
  76. }
  77. event.LogLines, _ = event.Processor.FormatLogs(&event, qualysIPAddresses)
  78. // Output the logs
  79. log.Printf("Processing %d loglines\n", len(event.LogLines))
  80. for lineToLog := range event.LogLines {
  81. logger.Info(event.LogLines[lineToLog])
  82. }
  83. }
  84. }
  85. func mainLoop(batchInterval time.Duration, deliveries <-chan amqp.Delivery, amqpNotifyError chan *amqp.Error, openstackActions OpenStackActioner, logger SyslogActioner, qualys QualysActioner) {
  86. var events []Event
  87. ticker := time.NewTicker(batchInterval)
  88. amqpReconnectTimer := time.NewTimer(1)
  89. for {
  90. select {
  91. case e := <-deliveries:
  92. event, err := processWaitingEvent(e, openstackActions)
  93. if err != nil {
  94. log.Printf("Event skipped: %s\n", err)
  95. continue
  96. }
  97. events = append(events, event)
  98. case <-ticker.C:
  99. logEvents(events, logger, qualys)
  100. events = nil
  101. case err := <-amqpNotifyError:
  102. // Reinitialize AMQP on connection error
  103. log.Printf("AMQP connection error: %s\n", err)
  104. amqpReconnectTimer = time.NewTimer(time.Second * 30)
  105. case <-amqpReconnectTimer.C:
  106. var err error
  107. amqpBus := new(AmqpActions)
  108. amqpBus.Options = AmqpOptions{
  109. RabbitURI: rabbitURI,
  110. }
  111. deliveries, amqpNotifyError, err = amqpBus.Connect()
  112. if err != nil {
  113. log.Printf("AMQP retry connection error: %s\n", err)
  114. amqpReconnectTimer = time.NewTimer(time.Second * 30)
  115. } else {
  116. log.Printf("AMQP reconnected\n")
  117. }
  118. }
  119. }
  120. }