mqtt_test.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. import argparse
  2. import sys
  3. import paho.mqtt.client as mqtt
  4. def publish_mqtt(host, port, username, password, topic, message, qos):
  5. client = mqtt.Client(client_id="mqtt-test-client", protocol=mqtt.MQTTv311)
  6. if username:
  7. client.username_pw_set(username, password)
  8. try:
  9. client.connect(host, port, 60)
  10. except Exception as e:
  11. print(f"MQTT connect error: {e}")
  12. return False
  13. info = client.publish(topic, payload=message, qos=qos)
  14. info.wait_for_publish()
  15. client.disconnect()
  16. return info.is_published()
  17. def publish_amqp(host, port, username, password, exchange, routing_key, message):
  18. try:
  19. import pika
  20. except Exception:
  21. print("pika not installed; cannot publish via AMQP")
  22. return False
  23. creds = pika.PlainCredentials(username, password) if username else None
  24. params = pika.ConnectionParameters(host=host, port=port, credentials=creds) if creds else pika.ConnectionParameters(host=host, port=port)
  25. try:
  26. conn = pika.BlockingConnection(params)
  27. ch = conn.channel()
  28. props = pika.BasicProperties(delivery_mode=2)
  29. ok = ch.basic_publish(exchange=exchange, routing_key=routing_key, body=message, properties=props)
  30. conn.close()
  31. return ok
  32. except Exception as e:
  33. print(f"AMQP publish error: {e}")
  34. return False
  35. def main():
  36. parser = argparse.ArgumentParser(description="Publish a test message to RabbitMQ via MQTT (and optional AMQP)")
  37. parser.add_argument("--host", default="192.168.1.243", help="MQTT broker host (default: Traefik LB)")
  38. parser.add_argument("--port", type=int, default=1883, help="MQTT broker port (default: 1883)")
  39. parser.add_argument("--user", default="user", help="username")
  40. parser.add_argument("--password", default="changeme", help="password")
  41. parser.add_argument("--topic", default="mqtt/test", help="MQTT topic to publish")
  42. parser.add_argument("--message", default="Hello from mqtt_test.py", help="Message payload")
  43. parser.add_argument("--qos", type=int, choices=[0,1,2], default=1, help="MQTT QoS (0/1/2)")
  44. parser.add_argument("--amqp", action="store_true", help="Also publish via AMQP to exchange 'mqtt.test' (routing_key 'mqtt/test')")
  45. parser.add_argument("--amqp-host", default="192.168.1.243", help="AMQP host (default: Traefik LB)")
  46. parser.add_argument("--amqp-port", type=int, default=5672, help="AMQP port (default: 5672)")
  47. args = parser.parse_args()
  48. print(f"Publishing MQTT -> {args.host}:{args.port} topic={args.topic} qos={args.qos}")
  49. ok = publish_mqtt(args.host, args.port, args.user, args.password, args.topic, args.message, args.qos)
  50. if ok:
  51. print("MQTT publish succeeded")
  52. else:
  53. print("MQTT publish failed")
  54. if args.amqp:
  55. print(f"Publishing AMQP -> {args.amqp_host}:{args.amqp_port} exchange=mqtt.test routing_key=mqtt/test")
  56. ok2 = publish_amqp(args.amqp_host, args.amqp_port, args.user, args.password, "mqtt.test", "mqtt/test", args.message)
  57. if ok2:
  58. print("AMQP publish succeeded")
  59. else:
  60. print("AMQP publish failed")
  61. sys.exit(0 if ok else 2)
  62. if __name__ == "__main__":
  63. main()
  64. import paho.mqtt.client as mqtt
  65. def main():
  66. print("starting up")
  67. client = mqtt.Client(client_id="my_client_id", protocol=mqtt.MQTTv311)
  68. client.connect("153.92.153.32", 1883)
  69. client.publish("mqtt/test", payload="Hello", qos=1) # QoS 1 or 2 for persistence
  70. print("Message published successfully.")
  71. if __name__ == "__main__":
  72. main()