test_mqtt.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. #!/usr/bin/env python3
  2. """
  3. Publish a test MQTT message to the RabbitMQ MQTT plugin.
  4. Usage:
  5. ./test_mqtt.py --host rabbitmq.irdi.eu --topic test/topic --message hello
  6. The script returns exit code 0 on success, non-zero on failure.
  7. """
  8. import argparse
  9. import socket
  10. import time
  11. import sys
  12. import paho.mqtt.client as mqtt
  13. import socket
  14. try:
  15. import pika
  16. except Exception:
  17. pika = None
  18. def main():
  19. p = argparse.ArgumentParser(description="MQTT publish test")
  20. p.add_argument("--host", default="rabbitmq.irdi.eu", help="MQTT host")
  21. p.add_argument("--port", type=int, default=1883, help="MQTT port")
  22. p.add_argument("--topic", default="test/topic", help="Topic to publish to")
  23. p.add_argument("--message", default="hello from test", help="Message payload")
  24. p.add_argument("--timeout", type=int, default=10, help="Connect timeout seconds")
  25. p.add_argument("--username", default=None, help="Username for MQTT (optional)")
  26. p.add_argument("--password", default=None, help="Password for MQTT (optional)")
  27. p.add_argument("--exchange", default=None, help="AMQP exchange to publish to (optional)")
  28. p.add_argument("--routing-key", default=None, help="AMQP routing key (optional)")
  29. p.add_argument("--amqp", action="store_true", help="Publish via AMQP instead of MQTT")
  30. args = p.parse_args()
  31. socket.setdefaulttimeout(args.timeout)
  32. client = mqtt.Client()
  33. if args.username:
  34. client.username_pw_set(args.username, args.password)
  35. connected = False
  36. def on_connect(client, userdata, flags, rc):
  37. nonlocal connected
  38. connected = (rc == 0)
  39. print(f"on_connect rc={rc}, success={connected}")
  40. def on_publish(client, userdata, mid):
  41. print(f"on_publish mid={mid}")
  42. client.on_connect = on_connect
  43. client.on_publish = on_publish
  44. # If AMQP mode requested, publish via pika to given exchange/routing key
  45. if args.amqp or args.exchange:
  46. if pika is None:
  47. print("pika is not installed; install requirements.txt to enable AMQP mode")
  48. return 5
  49. amqp_host = args.host
  50. amqp_port = args.port if args.port else 5672
  51. try:
  52. credentials = None
  53. if args.username:
  54. credentials = pika.PlainCredentials(args.username, args.password or "")
  55. params = pika.ConnectionParameters(host=amqp_host, port=amqp_port, credentials=credentials)
  56. conn = pika.BlockingConnection(params)
  57. ch = conn.channel()
  58. exchange = args.exchange or 'amq.topic'
  59. routing_key = args.routing_key or args.topic
  60. ch.basic_publish(
  61. exchange=exchange,
  62. routing_key=routing_key,
  63. body=args.message,
  64. properties=pika.BasicProperties(delivery_mode=2)
  65. )
  66. conn.close()
  67. print(f"AMQP message published to exchange='{exchange}' routing_key='{routing_key}'")
  68. return 0
  69. except Exception as e:
  70. print("AMQP publish failed:", e)
  71. return 6
  72. # MQTT mode (default)
  73. try:
  74. client.connect(args.host, args.port, keepalive=60)
  75. except Exception as e:
  76. print("Connection failed:", e)
  77. return 2
  78. client.loop_start()
  79. # wait for on_connect (with timeout)
  80. waited = 0
  81. while not connected and waited < args.timeout:
  82. time.sleep(0.2)
  83. waited += 0.2
  84. if not connected:
  85. print("Failed to connect within timeout")
  86. client.loop_stop()
  87. return 3
  88. try:
  89. result, mid = client.publish(args.topic, args.message, qos=1)
  90. print("publish result:", result, "mid:", mid)
  91. except Exception as e:
  92. print("Publish failed:", e)
  93. client.loop_stop()
  94. return 4
  95. time.sleep(1)
  96. client.loop_stop()
  97. client.disconnect()
  98. print("Message sent")
  99. return 0
  100. if __name__ == '__main__':
  101. sys.exit(main())