ingestion.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. package main
  import (
	"context"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"log"
	"os"

	amqp "github.com/rabbitmq/amqp091-go"
	"github.com/tealeg/xlsx"
)
  // failOnError exits the process via log.Fatalf ("msg: err") when err is non-nil and does nothing otherwise.
func failOnError(err error, msg string) {
	if err == nil {
		return
	}
	log.Fatalf("%s: %v", msg, err)
}
  16. func main() {
  17. var (
  18. filePath = flag.String("file", "data.xlsx", "path to xlsx file")
  19. amqpURL = flag.String("amqp", "amqp://admin:ward-tootled-wick-intrados@localhost:5672/", "AMQP URL")
  20. queueName = flag.String("queue", "ingestion", "RabbitMQ queue name")
  21. sheetName = flag.String("sheet", "", "Sheet name (default: first sheet)")
  22. headerRow = flag.Bool("header", true, "treat first row as header keys")
  23. )
  24. flag.Parse()
  25. if _, err := os.Stat(*filePath); os.IsNotExist(err) {
  26. log.Fatalf("file not found: %s", *filePath)
  27. }
  28. xlFile, err := xlsx.OpenFile(*filePath)
  29. failOnError(err, "opening xlsx")
  30. var sh *xlsx.Sheet
  31. if *sheetName == "" {
  32. if len(xlFile.Sheets) == 0 {
  33. log.Fatalf("no sheets in workbook")
  34. }
  35. sh = xlFile.Sheets[0]
  36. } else {
  37. for _, s := range xlFile.Sheets {
  38. if s.Name == *sheetName {
  39. sh = s
  40. break
  41. }
  42. }
  43. if sh == nil {
  44. log.Fatalf("sheet %s not found", *sheetName)
  45. }
  46. }
  47. if len(sh.Rows) == 0 {
  48. log.Fatalf("sheet %s is empty", sh.Name)
  49. }
  50. // convert rows to [][]string
  51. rows := make([][]string, 0, len(sh.Rows))
  52. for _, r := range sh.Rows {
  53. cols := make([]string, 0, len(r.Cells))
  54. for _, c := range r.Cells {
  55. cols = append(cols, c.String())
  56. }
  57. rows = append(rows, cols)
  58. }
  59. conn, err := amqp.Dial(*amqpURL)
  60. failOnError(err, "connecting to amqp")
  61. defer conn.Close()
  62. ch, err := conn.Channel()
  63. failOnError(err, "opening channel")
  64. defer ch.Close()
  65. q, err := ch.QueueDeclare(
  66. *queueName,
  67. true,
  68. false,
  69. false,
  70. false,
  71. nil,
  72. )
  73. failOnError(err, "declaring queue")
  74. var headers []string
  75. start := 0
  76. if *headerRow {
  77. headers = rows[0]
  78. start = 1
  79. }
  80. sent := 0
  81. for i := start; i < len(rows); i++ {
  82. row := rows[i]
  83. var body []byte
  84. if *headerRow {
  85. m := make(map[string]string, len(headers))
  86. for j, h := range headers {
  87. val := ""
  88. if j < len(row) {
  89. val = row[j]
  90. }
  91. m[h] = val
  92. }
  93. body, err = json.Marshal(m)
  94. if err != nil {
  95. log.Printf("skipping row %d: marshal error: %v", i+1, err)
  96. continue
  97. }
  98. } else {
  99. body, err = json.Marshal(row)
  100. if err != nil {
  101. log.Printf("skipping row %d: marshal error: %v", i+1, err)
  102. continue
  103. }
  104. }
  105. err = ch.Publish(
  106. "", // exchange
  107. q.Name, // routing key = queue
  108. false,
  109. false,
  110. amqp.Publishing{
  111. ContentType: "application/json",
  112. Body: body,
  113. },
  114. )
  115. if err != nil {
  116. log.Printf("failed to publish row %d: %v", i+1, err)
  117. continue
  118. }
  119. sent++
  120. }
  121. fmt.Printf("sent %d messages from %s to queue %s\n", sent, *filePath, q.Name)
  122. }