-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_simple.py
More file actions
260 lines (190 loc) · 6.37 KB
/
test_simple.py
File metadata and controls
260 lines (190 loc) · 6.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
#!/usr/bin/env python3
"""
Simple test to verify basic functionality.
This creates a publisher and subscriber in the same process to test basic operations.
"""
import time
import threading
from gz_transport_py import Node
# Simple message type for testing
class TestMsg:
    """Minimal text message used by the tests in this file.

    It carries a single string in ``data`` and exposes
    ``SerializeToString`` / ``ParseFromString`` plus a
    ``DESCRIPTOR.full_name`` attribute.
    NOTE(review): presumably this mirrors the message interface that
    ``gz_transport_py`` expects -- confirm against the library.
    """

    class DESCRIPTOR:
        """Holds the type name reported for this message."""

        full_name = "TestMsg"

    def __init__(self, text=""):
        """Store ``text`` as the message payload."""
        self.data = text

    def SerializeToString(self):
        """Return the payload encoded as UTF-8 bytes."""
        payload = self.data
        return payload.encode("utf-8")

    def ParseFromString(self, data):
        """Replace the payload with ``data`` decoded as UTF-8."""
        decoded = data.decode("utf-8")
        self.data = decoded
def test_basic_pubsub():
    """Test basic publish/subscribe functionality.

    Subscribes on one node, advertises on a second node, publishes five
    ``TestMsg`` messages on ``/test_topic`` and counts how many reach the
    subscriber callback.

    Returns:
        bool: True when at least four of the five messages were received,
        False otherwise.
    """
    num_messages = 5
    # Allow some loss due to timing.
    min_received = 4

    print("=" * 50)
    print("TEST: Basic Pub/Sub")
    print("=" * 50)

    received_messages = []

    def callback(msg):
        received_messages.append(msg.data)
        print(f" ✓ Received: {msg.data}")

    # Create subscriber node
    print("\n1. Creating subscriber node...")
    sub_node = Node(verbose=False)
    pub_node = None
    try:
        sub_node.subscribe(TestMsg, "/test_topic", callback)
        print(" ✓ Subscriber ready")

        # Wait for discovery to initialize
        time.sleep(0.5)

        # Create publisher node
        print("\n2. Creating publisher node...")
        pub_node = Node(verbose=False)
        pub = pub_node.advertise("/test_topic", TestMsg)
        print(" ✓ Publisher ready")

        # Wait for discovery
        time.sleep(1.0)

        # Publish messages
        print("\n3. Publishing messages...")
        for i in range(num_messages):
            msg = TestMsg(f"Message {i}")
            pub.publish(msg)
            print(f" → Sent: Message {i}")
            time.sleep(0.2)

        # Wait for messages to arrive
        time.sleep(0.5)

        # Check results
        print("\n4. Checking results...")
        print(f" Messages sent: {num_messages}")
        print(f" Messages received: {len(received_messages)}")
    finally:
        # Shut the nodes down even when a step above raised, so a failing
        # test does not leave them running for the rest of the suite.
        print("\n5. Cleaning up...")
        try:
            if pub_node is not None:
                pub_node.shutdown()
        finally:
            sub_node.shutdown()
        print(" ✓ Cleanup complete")

    # Verify
    if len(received_messages) >= min_received:
        print("\n✅ TEST PASSED")
        return True

    print(
        f"\n❌ TEST FAILED: Only received "
        f"{len(received_messages)}/{num_messages} messages"
    )
    return False
def test_multiple_topics():
    """Test multiple topics simultaneously.

    Uses a single node that both subscribes to and advertises ``/topic1``
    and ``/topic2``, publishes three messages on each, and counts the
    messages delivered to each topic's callback.

    Returns:
        bool: True when each topic received at least two of its three
        messages, False otherwise.
    """
    num_messages = 3
    min_received = 2

    print("\n" + "=" * 50)
    print("TEST: Multiple Topics")
    print("=" * 50)

    topic1_msgs = []
    topic2_msgs = []

    def callback1(msg):
        topic1_msgs.append(msg.data)

    def callback2(msg):
        topic2_msgs.append(msg.data)

    print("\n1. Setting up nodes...")
    node = Node(verbose=False)
    try:
        # Subscribe to two topics
        node.subscribe(TestMsg, "/topic1", callback1)
        node.subscribe(TestMsg, "/topic2", callback2)

        # Advertise two topics
        pub1 = node.advertise("/topic1", TestMsg)
        pub2 = node.advertise("/topic2", TestMsg)

        time.sleep(0.5)

        print("\n2. Publishing to multiple topics...")
        for i in range(num_messages):
            pub1.publish(TestMsg(f"Topic1-{i}"))
            pub2.publish(TestMsg(f"Topic2-{i}"))
            time.sleep(0.1)

        # Give in-flight messages time to reach the callbacks.
        time.sleep(0.5)

        print("\n3. Results:")
        print(f" Topic1 received: {len(topic1_msgs)} messages")
        print(f" Topic2 received: {len(topic2_msgs)} messages")
    finally:
        # Always release the node, including when a step above raised.
        node.shutdown()

    if len(topic1_msgs) >= min_received and len(topic2_msgs) >= min_received:
        print("\n✅ TEST PASSED")
        return True

    print("\n❌ TEST FAILED")
    return False
def test_topic_discovery():
    """Test topic discovery.

    Advertises ``/discovery_test_1`` on one node and ``/discovery_test_2``
    on another, then checks that both names appear in the first node's
    ``topic_list()``.

    Returns:
        bool: True when both topics are listed, False otherwise.
    """
    print("\n" + "=" * 50)
    print("TEST: Topic Discovery")
    print("=" * 50)

    print("\n1. Creating nodes with topics...")
    node1 = Node(verbose=False)
    node2 = None
    try:
        node2 = Node(verbose=False)

        # The publisher handles are not used again but stay bound until the
        # function returns.
        # NOTE(review): whether an advertisement's lifetime is tied to its
        # handle is not visible here -- confirm before removing these names.
        pub1 = node1.advertise("/discovery_test_1", TestMsg)
        pub2 = node2.advertise("/discovery_test_2", TestMsg)

        # Wait for discovery
        time.sleep(1.0)

        print("\n2. Discovering topics...")
        topics = node1.topic_list()

        print(f"\n3. Found topics: {topics}")

        # Check if our topics are discovered
        has_topic1 = "/discovery_test_1" in topics
        has_topic2 = "/discovery_test_2" in topics

        print(f" Found /discovery_test_1: {has_topic1}")
        print(f" Found /discovery_test_2: {has_topic2}")
    finally:
        # Shut both nodes down even when a step above raised; node1 first,
        # matching the original order.
        try:
            node1.shutdown()
        finally:
            if node2 is not None:
                node2.shutdown()

    if has_topic1 and has_topic2:
        print("\n✅ TEST PASSED")
        return True

    print("\n❌ TEST FAILED: Not all topics discovered")
    return False
def test_unadvertise():
    """Test unadvertising topics.

    Advertises ``/temp_topic``, records ``advertised_topics()``, calls
    ``unadvertise("/temp_topic")`` and records ``advertised_topics()``
    again.

    Returns:
        bool: True when the node reported at least one advertised topic
        before unadvertising and none afterwards, False otherwise.
    """
    print("\n" + "=" * 50)
    print("TEST: Unadvertise")
    print("=" * 50)

    print("\n1. Creating publisher...")
    node = Node(verbose=False)
    try:
        # Handle is unused but stays bound while the test runs.
        # NOTE(review): confirm whether dropping it would end the
        # advertisement before removing this name.
        pub = node.advertise("/temp_topic", TestMsg)
        time.sleep(0.5)

        print("\n2. Checking advertised topics...")
        topics_before = node.advertised_topics()
        print(f" Advertised: {topics_before}")

        print("\n3. Unadvertising topic...")
        node.unadvertise("/temp_topic")
        time.sleep(0.2)

        print("\n4. Checking advertised topics again...")
        topics_after = node.advertised_topics()
        print(f" Advertised: {topics_after}")
    finally:
        # Always release the node, including when a step above raised.
        node.shutdown()

    # Without this check the test would pass even if advertise() never
    # registered the topic, because the list would be empty both times.
    if len(topics_before) == 0:
        print("\n❌ TEST FAILED: Topic was never advertised")
        return False

    if len(topics_after) == 0:
        print("\n✅ TEST PASSED")
        return True

    print("\n❌ TEST FAILED: Topic still advertised")
    return False
def main():
    """Run all tests in order and print a pass/fail summary.

    The tests run one after another with a one-second pause between
    consecutive tests.

    Returns:
        int: 0 when every test returned a truthy result, 1 otherwise.
    """
    rule = "=" * 50
    suite = (
        ("Basic Pub/Sub", test_basic_pubsub),
        ("Multiple Topics", test_multiple_topics),
        ("Topic Discovery", test_topic_discovery),
        ("Unadvertise", test_unadvertise),
    )

    print("\n" + rule)
    print("GZ-TRANSPORT-PY TEST SUITE")
    print(rule)

    # Run tests, pausing between them but not after the last one.
    results = []
    for position, (label, run_test) in enumerate(suite):
        if position > 0:
            time.sleep(1)
        results.append((label, run_test()))

    # Summary
    print("\n" + rule)
    print("TEST SUMMARY")
    print(rule)

    total = len(results)
    passed = 0
    for label, outcome in results:
        if outcome:
            passed += 1
            status = "✅ PASS"
        else:
            status = "❌ FAIL"
        print(f" {status} - {label}")

    print("\n" + rule)
    print(f"Result: {passed}/{total} tests passed")
    print(rule)

    if passed != total:
        print(f"\n⚠️ {total - passed} test(s) failed")
        return 1

    print("\n🎉 All tests passed!")
    return 0
if __name__ == "__main__":
    # Use SystemExit directly: the bare ``exit`` name is injected by the
    # ``site`` module for interactive use and is not guaranteed to exist
    # (for example under ``python -S``).
    raise SystemExit(main())