Skip to content

Commit 04c183e

Browse files
authored
remove tests from SubscriberTest (#2618)
These tests are moved, in simpler forms, to MessageDispatcherTest and ITPubSubTest. Eventually, we should test streaming reconnection logic independently and remove SubscriberTest altogether.
1 parent 4a5156a commit 04c183e

2 files changed

Lines changed: 56 additions & 318 deletions

File tree

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java

Lines changed: 56 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,9 @@
1616

1717
package com.google.cloud.pubsub.it;
1818

19-
import static org.junit.Assert.assertEquals;
20-
import static org.junit.Assert.assertTrue;
19+
import static com.google.common.truth.Truth.assertThat;
2120

22-
import com.google.api.core.SettableApiFuture;
21+
import com.google.auto.value.AutoValue;
2322
import com.google.cloud.ServiceOptions;
2423
import com.google.cloud.pubsub.v1.AckReplyConsumer;
2524
import com.google.cloud.pubsub.v1.MessageReceiver;
@@ -38,6 +37,9 @@
3837
import java.util.Collections;
3938
import java.util.List;
4039
import java.util.UUID;
40+
import java.util.concurrent.BlockingQueue;
41+
import java.util.concurrent.LinkedBlockingQueue;
42+
import java.util.concurrent.TimeUnit;
4143
import org.junit.AfterClass;
4244
import org.junit.BeforeClass;
4345
import org.junit.Rule;
@@ -53,6 +55,17 @@ public class ITPubSubTest {
5355

5456
@Rule public Timeout globalTimeout = Timeout.seconds(300);
5557

58+
@AutoValue
59+
abstract static class MessageAndConsumer {
60+
abstract PubsubMessage message();
61+
62+
abstract AckReplyConsumer consumer();
63+
64+
static MessageAndConsumer create(PubsubMessage message, AckReplyConsumer consumer) {
65+
return new AutoValue_ITPubSubTest_MessageAndConsumer(message, consumer);
66+
}
67+
}
68+
5669
@BeforeClass
5770
public static void setupClass() throws Exception {
5871
topicAdminClient = TopicAdminClient.create();
@@ -81,14 +94,14 @@ public void testTopicPolicy() {
8194
Policy newPolicy =
8295
topicAdminClient.setIamPolicy(
8396
topicName.toString(), policy.toBuilder().addBindings(binding).build());
84-
assertTrue(newPolicy.getBindingsList().contains(binding));
97+
assertThat(newPolicy.getBindingsList()).contains(binding);
8598

8699
String permissionName = "pubsub.topics.get";
87100
List<String> permissions =
88101
topicAdminClient
89102
.testIamPermissions(topicName.toString(), Collections.singletonList(permissionName))
90103
.getPermissionsList();
91-
assertTrue(permissions.contains(permissionName));
104+
assertThat(permissions).contains(permissionName);
92105

93106
topicAdminClient.deleteTopic(topicName);
94107
}
@@ -103,41 +116,68 @@ public void testPublishSubscribe() throws Exception {
103116
topicAdminClient.createTopic(topicName);
104117
subscriptionAdminClient.createSubscription(
105118
subscriptionName, topicName, PushConfig.newBuilder().build(), 10);
106-
PubsubMessage message =
107-
PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("my message")).build();
108119

109-
final SettableApiFuture<PubsubMessage> received = SettableApiFuture.create();
120+
final BlockingQueue<Object> receiveQueue = new LinkedBlockingQueue<>();
110121
Subscriber subscriber =
111122
Subscriber.newBuilder(
112123
subscriptionName,
113124
new MessageReceiver() {
114125
@Override
115126
public void receiveMessage(
116127
final PubsubMessage message, final AckReplyConsumer consumer) {
117-
if (received.set(message)) {
118-
consumer.ack();
119-
} else {
120-
consumer.nack();
121-
}
128+
receiveQueue.offer(MessageAndConsumer.create(message, consumer));
122129
}
123130
})
124131
.build();
125132
subscriber.addListener(
126133
new Subscriber.Listener() {
127134
public void failed(Subscriber.State from, Throwable failure) {
128-
received.setException(failure);
135+
receiveQueue.offer(failure);
129136
}
130137
},
131138
MoreExecutors.directExecutor());
132139
subscriber.startAsync();
133140

134141
Publisher publisher = Publisher.newBuilder(topicName).build();
135-
publisher.publish(message).get();
142+
publisher
143+
.publish(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("msg1")).build())
144+
.get();
145+
publisher
146+
.publish(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("msg2")).build())
147+
.get();
136148
publisher.shutdown();
137149

138-
assertEquals(received.get().getData(), message.getData());
150+
// Ack the first message.
151+
MessageAndConsumer toAck = pollQueue(receiveQueue);
152+
toAck.consumer().ack();
153+
154+
// Nack the other.
155+
MessageAndConsumer toNack = pollQueue(receiveQueue);
156+
assertThat(toNack.message().getData()).isNotEqualTo(toAck.message().getData());
157+
toNack.consumer().nack();
158+
159+
// We should get the nacked message back.
160+
MessageAndConsumer redelivered = pollQueue(receiveQueue);
161+
assertThat(redelivered.message().getData()).isEqualTo(toNack.message().getData());
162+
redelivered.consumer().ack();
163+
139164
subscriber.stopAsync().awaitTerminated();
140165
subscriptionAdminClient.deleteSubscription(subscriptionName);
141166
topicAdminClient.deleteTopic(topicName);
142167
}
168+
169+
private MessageAndConsumer pollQueue(BlockingQueue<Object> queue) throws InterruptedException {
170+
Object obj = queue.poll(1, TimeUnit.MINUTES);
171+
if (obj == null) {
172+
return null;
173+
}
174+
if (obj instanceof Throwable) {
175+
throw new IllegalStateException("unexpected error", (Throwable) obj);
176+
}
177+
if (obj instanceof MessageAndConsumer) {
178+
return (MessageAndConsumer) obj;
179+
}
180+
throw new IllegalStateException(
181+
"expected either MessageAndConsumer or Throwable, found: " + obj);
182+
}
143183
}

0 commit comments

Comments
 (0)