| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121 |
- #!/usr/bin/env python3
- """
- Publish a test MQTT message to the RabbitMQ MQTT plugin.
- Usage:
- ./test_mqtt.py --host rabbitmq.irdi.eu --topic test/topic --message hello
- The script returns exit code 0 on success, non-zero on failure.
- """
- import argparse
- import socket
- import time
- import sys
- import paho.mqtt.client as mqtt
- import socket
- try:
- import pika
- except Exception:
- pika = None
- def main():
- p = argparse.ArgumentParser(description="MQTT publish test")
- p.add_argument("--host", default="rabbitmq.irdi.eu", help="MQTT host")
- p.add_argument("--port", type=int, default=1883, help="MQTT port")
- p.add_argument("--topic", default="test/topic", help="Topic to publish to")
- p.add_argument("--message", default="hello from test", help="Message payload")
- p.add_argument("--timeout", type=int, default=10, help="Connect timeout seconds")
- p.add_argument("--username", default=None, help="Username for MQTT (optional)")
- p.add_argument("--password", default=None, help="Password for MQTT (optional)")
- p.add_argument("--exchange", default=None, help="AMQP exchange to publish to (optional)")
- p.add_argument("--routing-key", default=None, help="AMQP routing key (optional)")
- p.add_argument("--amqp", action="store_true", help="Publish via AMQP instead of MQTT")
- args = p.parse_args()
- socket.setdefaulttimeout(args.timeout)
- client = mqtt.Client()
- if args.username:
- client.username_pw_set(args.username, args.password)
- connected = False
- def on_connect(client, userdata, flags, rc):
- nonlocal connected
- connected = (rc == 0)
- print(f"on_connect rc={rc}, success={connected}")
- def on_publish(client, userdata, mid):
- print(f"on_publish mid={mid}")
- client.on_connect = on_connect
- client.on_publish = on_publish
- # If AMQP mode requested, publish via pika to given exchange/routing key
- if args.amqp or args.exchange:
- if pika is None:
- print("pika is not installed; install requirements.txt to enable AMQP mode")
- return 5
- amqp_host = args.host
- amqp_port = args.port if args.port else 5672
- try:
- credentials = None
- if args.username:
- credentials = pika.PlainCredentials(args.username, args.password or "")
- params = pika.ConnectionParameters(host=amqp_host, port=amqp_port, credentials=credentials)
- conn = pika.BlockingConnection(params)
- ch = conn.channel()
- exchange = args.exchange or 'amq.topic'
- routing_key = args.routing_key or args.topic
- ch.basic_publish(
- exchange=exchange,
- routing_key=routing_key,
- body=args.message,
- properties=pika.BasicProperties(delivery_mode=2)
- )
- conn.close()
- print(f"AMQP message published to exchange='{exchange}' routing_key='{routing_key}'")
- return 0
- except Exception as e:
- print("AMQP publish failed:", e)
- return 6
- # MQTT mode (default)
- try:
- client.connect(args.host, args.port, keepalive=60)
- except Exception as e:
- print("Connection failed:", e)
- return 2
- client.loop_start()
- # wait for on_connect (with timeout)
- waited = 0
- while not connected and waited < args.timeout:
- time.sleep(0.2)
- waited += 0.2
- if not connected:
- print("Failed to connect within timeout")
- client.loop_stop()
- return 3
- try:
- result, mid = client.publish(args.topic, args.message, qos=1)
- print("publish result:", result, "mid:", mid)
- except Exception as e:
- print("Publish failed:", e)
- client.loop_stop()
- return 4
- time.sleep(1)
- client.loop_stop()
- client.disconnect()
- print("Message sent")
- return 0
- if __name__ == '__main__':
- sys.exit(main())
|