// main.go

package main

import (
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"log"
	"os"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)
  10. type Transaction map[string]interface{}
  11. func main() {
  12. var (
  13. amqpURL = flag.String("amqp", "amqp://admin:ward-tootled-wick-intrados@localhost:5672/", "AMQP URL")
  14. queueName = flag.String("queue", "ingestion", "RabbitMQ queue to consume")
  15. persistF = flag.String("persist-file", os.Getenv("PERSIST_FILE"), "file to append persisted transactions")
  16. )
  17. flag.Parse()
  18. // set PERSIST_FILE from flag if provided
  19. if *persistF != "" {
  20. os.Setenv("PERSIST_FILE", *persistF)
  21. }
  22. // start AMQP consumer (blocking)
  23. for {
  24. err := consumeAMQP(*amqpURL, *queueName)
  25. if err != nil {
  26. log.Printf("AMQP consumer error: %v; retrying in 5s", err)
  27. time.Sleep(5 * time.Second)
  28. continue
  29. }
  30. break
  31. }
  32. }
  33. func consumeAMQP(amqpURL, queue string) error {
  34. conn, err := amqp.Dial(amqpURL)
  35. if err != nil {
  36. return err
  37. }
  38. defer conn.Close()
  39. ch, err := conn.Channel()
  40. if err != nil {
  41. return err
  42. }
  43. defer ch.Close()
  44. _, err = ch.QueueDeclare(
  45. queue,
  46. true,
  47. false,
  48. false,
  49. false,
  50. nil,
  51. )
  52. if err != nil {
  53. return err
  54. }
  55. msgs, err := ch.Consume(
  56. queue,
  57. "",
  58. false, // autoAck false
  59. false,
  60. false,
  61. false,
  62. nil,
  63. )
  64. if err != nil {
  65. return err
  66. }
  67. log.Printf("AMQP consumer started on queue %s", queue)
  68. for d := range msgs {
  69. var t Transaction
  70. if err := json.Unmarshal(d.Body, &t); err != nil {
  71. log.Printf("failed to unmarshal message: %v", err)
  72. d.Nack(false, false)
  73. continue
  74. }
  75. // persist same as HTTP handler: append to file if set
  76. if path := os.Getenv("PERSIST_FILE"); path != "" {
  77. f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
  78. if err != nil {
  79. log.Printf("failed to open persist file: %v", err)
  80. } else {
  81. f.Write(d.Body)
  82. f.Write([]byte("\n"))
  83. f.Close()
  84. }
  85. }
  86. d.Ack(false)
  87. }
  88. return nil
  89. }