| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081 |
- import argparse
- import sys
- import paho.mqtt.client as mqtt
def publish_mqtt(host, port, username, password, topic, message, qos):
    """Publish a single message to an MQTT broker and wait for completion.

    Args:
        host: Broker hostname or IP address.
        port: Broker TCP port.
        username: Username for authentication; when falsy no credentials
            are sent.
        password: Password used together with ``username``.
        topic: MQTT topic to publish to.
        message: Payload to publish.
        qos: MQTT quality-of-service level passed to ``publish``.

    Returns:
        True when paho reports the message as published, False when the
        connection or the publish failed (the error is printed).

    Note:
        The wait for the broker acknowledgement has no timeout.
    """
    client = mqtt.Client(client_id="mqtt-test-client", protocol=mqtt.MQTTv311)
    if username:
        client.username_pw_set(username, password)

    try:
        client.connect(host, port, 60)
    except Exception as e:
        print(f"MQTT connect error: {e}")
        return False

    # The background network loop is what reads CONNACK / PUBACK / PUBCOMP
    # from the socket. Without it a QoS 1 or 2 publish is never marked as
    # complete and wait_for_publish() would block forever.
    client.loop_start()
    try:
        info = client.publish(topic, payload=message, qos=qos)
        info.wait_for_publish()
        return info.is_published()
    except (ValueError, RuntimeError) as e:
        # paho raises these when the message could not be queued or sent.
        print(f"MQTT publish error: {e}")
        return False
    finally:
        # Always release the socket and stop the loop thread, even on error.
        client.disconnect()
        client.loop_stop()
def publish_amqp(host, port, username, password, exchange, routing_key, message):
    """Publish a single persistent message to an AMQP exchange using pika.

    Args:
        host: AMQP broker hostname or IP address.
        port: AMQP broker TCP port.
        username: Username for authentication; when falsy pika's default
            connection credentials are used.
        password: Password used together with ``username``.
        exchange: Name of the exchange to publish to.
        routing_key: Routing key for the message.
        message: Message body.

    Returns:
        True when the publish call completed without error, False when
        pika is not installed or connecting/publishing failed (the error
        is printed).
    """
    try:
        import pika
    except ImportError:
        print("pika not installed; cannot publish via AMQP")
        return False

    conn_kwargs = {"host": host, "port": port}
    if username:
        conn_kwargs["credentials"] = pika.PlainCredentials(username, password)
    params = pika.ConnectionParameters(**conn_kwargs)

    try:
        conn = pika.BlockingConnection(params)
        try:
            ch = conn.channel()
            # delivery_mode=2 marks the message as persistent.
            props = pika.BasicProperties(delivery_mode=2)
            result = ch.basic_publish(
                exchange=exchange,
                routing_key=routing_key,
                body=message,
                properties=props,
            )
        finally:
            # Close the connection even if channel()/basic_publish() raised.
            if conn.is_open:
                conn.close()
    except Exception as e:
        print(f"AMQP publish error: {e}")
        return False

    # pika >= 1.0 returns None from basic_publish and signals failure by
    # raising; older releases returned a bool. Only an explicit False is
    # treated as failure so both behaviours are handled.
    return result is not False
def main():
    """Parse command-line options, publish the test message, and exit.

    The message is always published over MQTT; with ``--amqp`` it is also
    published over AMQP to exchange ``mqtt.test`` with routing key
    ``mqtt/test``. The process exit status reflects the MQTT publish only:
    0 when it succeeded, 2 otherwise. The AMQP outcome is printed but does
    not affect the exit status.
    """
    cli = argparse.ArgumentParser(
        description="Publish a test message to RabbitMQ via MQTT (and optional AMQP)"
    )

    # MQTT connection and message options.
    cli.add_argument("--host", default="192.168.1.243", help="MQTT broker host (default: Traefik LB)")
    cli.add_argument("--port", type=int, default=1883, help="MQTT broker port (default: 1883)")
    cli.add_argument("--user", default="user", help="username")
    cli.add_argument("--password", default="changeme", help="password")
    cli.add_argument("--topic", default="mqtt/test", help="MQTT topic to publish")
    cli.add_argument("--message", default="Hello from mqtt_test.py", help="Message payload")
    cli.add_argument("--qos", type=int, choices=[0, 1, 2], default=1, help="MQTT QoS (0/1/2)")

    # Optional AMQP publish; it reuses --user / --password / --message.
    cli.add_argument(
        "--amqp",
        action="store_true",
        help="Also publish via AMQP to exchange 'mqtt.test' (routing_key 'mqtt/test')",
    )
    cli.add_argument("--amqp-host", default="192.168.1.243", help="AMQP host (default: Traefik LB)")
    cli.add_argument("--amqp-port", type=int, default=5672, help="AMQP port (default: 5672)")

    opts = cli.parse_args()

    print(f"Publishing MQTT -> {opts.host}:{opts.port} topic={opts.topic} qos={opts.qos}")
    mqtt_ok = publish_mqtt(
        opts.host,
        opts.port,
        opts.user,
        opts.password,
        opts.topic,
        opts.message,
        opts.qos,
    )
    print("MQTT publish succeeded" if mqtt_ok else "MQTT publish failed")

    if opts.amqp:
        print(f"Publishing AMQP -> {opts.amqp_host}:{opts.amqp_port} exchange=mqtt.test routing_key=mqtt/test")
        amqp_ok = publish_amqp(
            opts.amqp_host,
            opts.amqp_port,
            opts.user,
            opts.password,
            "mqtt.test",
            "mqtt/test",
            opts.message,
        )
        print("AMQP publish succeeded" if amqp_ok else "AMQP publish failed")

    # Only the MQTT result decides the exit status.
    exit_status = 0 if mqtt_ok else 2
    sys.exit(exit_status)


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
- import paho.mqtt.client as mqtt
def main():
    """Connect to the hard-coded broker and publish one QoS 1 test message.

    Prints a start-up line, publishes "Hello" to topic ``mqtt/test`` and
    waits for the publish to complete before reporting the outcome.
    Connection and publish errors raised by paho propagate to the caller.
    """
    print("starting up")
    client = mqtt.Client(client_id="my_client_id", protocol=mqtt.MQTTv311)
    client.connect("153.92.153.32", 1883)

    # The network loop must run for the QoS 1 acknowledgement to be read;
    # without it the script would exit before delivery is confirmed.
    client.loop_start()
    try:
        info = client.publish("mqtt/test", payload="Hello", qos=1)  # QoS 1 or 2 for persistence
        info.wait_for_publish()
        published = info.is_published()
    finally:
        # Send DISCONNECT and stop the loop thread even if publishing failed.
        client.disconnect()
        client.loop_stop()

    if published:
        print("Message published successfully.")
    else:
        print("Message was not published.")


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|