-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathtest_mqtt_connection.py
More file actions
148 lines (128 loc) · 5.32 KB
/
test_mqtt_connection.py
File metadata and controls
148 lines (128 loc) · 5.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Test MQTT connection with paho-mqtt 2.1.0 compatibility
Supports testing local Docker MQTT and public HiveMQ broker
"""
import time
import logging
import sys
import argparse
# Root logging setup: INFO and above, each record prefixed with a timestamp
# and its level name.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Module-scoped logger used by everything in this script.
logger = logging.getLogger(__name__)
# Human-readable text for the integer CONNACK codes handled by on_connect.
_CONNECT_ERRORS = {
    1: "Incorrect protocol version",
    2: "Invalid client identifier",
    3: "Server unavailable",
    4: "Bad username or password",
    5: "Not authorized",
}

# Human-readable text for the integer codes handled by on_disconnect.
_DISCONNECT_ERRORS = {
    1: "Incorrect protocol version",
    4: "Bad username or password",
    5: "Not authorized",
}


def _paho_version(mqtt):
    """Return the installed paho-mqtt version string, or a placeholder."""
    version = getattr(mqtt, "__version__", None)
    if version is not None:
        return version
    try:
        import paho.mqtt as paho_mqtt
        return paho_mqtt.__version__
    except (ImportError, AttributeError):
        return "2.x (unknown exact version)"


def _explain_reason(reason_code, known, fallback):
    """Return ``(numeric_code, description)`` for a callback reason code.

    Plain integers are looked up in *known* (falling back to *fallback*).
    Objects exposing a ``value`` attribute -- paho's ``ReasonCode`` as passed
    by callback API v2 -- are reported through their own ``str()`` text, so
    they are never used as dictionary keys.
    """
    numeric = getattr(reason_code, "value", None)
    if numeric is None:
        return reason_code, known.get(reason_code, fallback)
    return numeric, str(reason_code)


def _new_client(mqtt, client_id):
    """Create a paho client, preferring callback API v2 and degrading to 1.x."""
    try:
        logger.info("Trying paho-mqtt 2.x API (VERSION2)...")
        return mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id=client_id)
    except (TypeError, AttributeError):
        logger.info("Falling back to paho-mqtt VERSION1 API...")
    try:
        return mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, client_id=client_id)
    except (TypeError, AttributeError):
        logger.info("Falling back to paho-mqtt 1.x API...")
    return mqtt.Client(client_id=client_id)


def _shutdown(client):
    """Disconnect *client* and stop its network thread; never raises on disconnect."""
    try:
        client.disconnect()
    except Exception as e:  # cleanup must not replace the test result
        logger.warning(f"Error while disconnecting: {e}")
    client.loop_stop()


def test_mqtt(broker_address='localhost', port=1883, username=None, password=None,
              timeout=10.0, client_id="test-client"):
    """Check whether an MQTT broker accepts a connection.

    Args:
        broker_address: Host name or IP address of the broker.
        port: TCP port of the broker.
        username: Optional user name; when falsy the connection is anonymous.
        password: Optional password, only used together with ``username``.
        timeout: Seconds to wait for the broker's answer after connecting.
        client_id: MQTT client identifier presented to the broker.

    Returns:
        True if the broker acknowledged the connection with reason code 0,
        False if paho-mqtt is unavailable, the broker rejected the
        connection, no answer arrived within ``timeout``, or an exception
        was raised while connecting.
    """
    import threading  # local import: only needed for the wait below

    try:
        import paho.mqtt.client as mqtt
    except ImportError as e:
        logger.error(f"paho-mqtt not installed: {e}")
        return False
    except Exception as e:
        logger.exception(f"Error importing paho-mqtt: {e}")
        return False
    logger.info(f"paho-mqtt version: {_paho_version(mqtt)}")

    connected = False
    # Set as soon as the broker has answered (accept OR reject), so a rejected
    # connection is reported immediately instead of after the full timeout.
    answered = threading.Event()

    def on_connect(client, userdata, flags=None, rc=None, properties=None):
        """Handle connection - compatible with both VERSION1 and VERSION2"""
        nonlocal connected
        reason_code = rc if rc is not None else flags
        if reason_code == 0:
            logger.info(f"✓ Successfully connected to {broker_address}:{port}")
            connected = True
        else:
            code, error_msg = _explain_reason(
                reason_code, _CONNECT_ERRORS, f"Unknown error code {reason_code}"
            )
            logger.error(f"✗ Connection failed with code {code}: {error_msg}")
        answered.set()

    def on_disconnect(client, userdata, flags=None, rc=None, properties=None):
        """Handle disconnection"""
        # With the 3-argument v1 signature the code arrives in ``flags``.
        reason_code = rc if rc is not None else flags
        if reason_code != 0:
            code, error_msg = _explain_reason(
                reason_code, _DISCONNECT_ERRORS, "Unspecified error"
            )
            logger.error(f"Unexpected disconnection (code {code}: {error_msg})")

    logger.info("=" * 60)
    logger.info(f"Testing MQTT connection to {broker_address}:{port}")
    if username:
        logger.info(f"Credentials: {username}:{'*' * len(password or '')}")
    else:
        logger.info("Using anonymous connection")
    logger.info("=" * 60)

    client = _new_client(mqtt, client_id)
    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    if username:
        client.username_pw_set(username, password or "")

    loop_running = False
    try:
        logger.info(f"Connecting to {broker_address}:{port}...")
        client.connect(broker_address, port, keepalive=60)
        client.loop_start()
        loop_running = True

        if not answered.wait(timeout):
            logger.error(f"✗ Connection timeout (waited {timeout:g} seconds)")
            return False
        if connected:
            logger.info("✓ Connection test PASSED")
            return True
        # Rejected by the broker; on_connect already logged the reason.
        return False
    except Exception as e:
        logger.exception(f"✗ Exception during connection: {e}")
        return False
    finally:
        # Release the socket and the network thread on every exit path.
        if loop_running:
            _shutdown(client)
if __name__ == "__main__":
    # Command-line entry point: parse the options, run one connection test,
    # and exit with status 0 on success or 1 on failure.
    cli = argparse.ArgumentParser(description='Test MQTT broker connection')
    cli.add_argument('--broker', default='localhost', help='Broker address (default: localhost)')
    cli.add_argument('--port', type=int, default=1883, help='Broker port (default: 1883)')
    cli.add_argument('--username', help='Username for authentication')
    cli.add_argument('--password', help='Password for authentication')
    cli.add_argument('--public', action='store_true', help='Test with public HiveMQ broker instead')
    opts = cli.parse_args()

    if not opts.public:
        target = (opts.broker, opts.port, opts.username, opts.password)
    else:
        # --public takes precedence over any broker/port/credential options.
        target = ('broker.hivemq.com', 1883, None, None)
        logger.info("Using public HiveMQ broker (broker.hivemq.com:1883)")

    passed = test_mqtt(*target)
    sys.exit(0 if passed else 1)