1616
1717package com .google .cloud .pubsub .it ;
1818
19- import static org .junit .Assert .assertEquals ;
20- import static org .junit .Assert .assertTrue ;
19+ import static com .google .common .truth .Truth .assertThat ;
2120
22- import com .google .api . core . SettableApiFuture ;
21+ import com .google .auto . value . AutoValue ;
2322import com .google .cloud .ServiceOptions ;
2423import com .google .cloud .pubsub .v1 .AckReplyConsumer ;
2524import com .google .cloud .pubsub .v1 .MessageReceiver ;
3837import java .util .Collections ;
3938import java .util .List ;
4039import java .util .UUID ;
40+ import java .util .concurrent .BlockingQueue ;
41+ import java .util .concurrent .LinkedBlockingQueue ;
42+ import java .util .concurrent .TimeUnit ;
4143import org .junit .AfterClass ;
4244import org .junit .BeforeClass ;
4345import org .junit .Rule ;
@@ -53,6 +55,17 @@ public class ITPubSubTest {
5355
5456 @ Rule public Timeout globalTimeout = Timeout .seconds (300 );
5557
58+ @ AutoValue
59+ abstract static class MessageAndConsumer {
60+ abstract PubsubMessage message ();
61+
62+ abstract AckReplyConsumer consumer ();
63+
64+ static MessageAndConsumer create (PubsubMessage message , AckReplyConsumer consumer ) {
65+ return new AutoValue_ITPubSubTest_MessageAndConsumer (message , consumer );
66+ }
67+ }
68+
5669 @ BeforeClass
5770 public static void setupClass () throws Exception {
5871 topicAdminClient = TopicAdminClient .create ();
@@ -81,14 +94,14 @@ public void testTopicPolicy() {
8194 Policy newPolicy =
8295 topicAdminClient .setIamPolicy (
8396 topicName .toString (), policy .toBuilder ().addBindings (binding ).build ());
84- assertTrue (newPolicy .getBindingsList ().contains (binding ) );
97+ assertThat (newPolicy .getBindingsList ()) .contains (binding );
8598
8699 String permissionName = "pubsub.topics.get" ;
87100 List <String > permissions =
88101 topicAdminClient
89102 .testIamPermissions (topicName .toString (), Collections .singletonList (permissionName ))
90103 .getPermissionsList ();
91- assertTrue (permissions .contains (permissionName ) );
104+ assertThat (permissions ) .contains (permissionName );
92105
93106 topicAdminClient .deleteTopic (topicName );
94107 }
@@ -103,41 +116,68 @@ public void testPublishSubscribe() throws Exception {
103116 topicAdminClient .createTopic (topicName );
104117 subscriptionAdminClient .createSubscription (
105118 subscriptionName , topicName , PushConfig .newBuilder ().build (), 10 );
106- PubsubMessage message =
107- PubsubMessage .newBuilder ().setData (ByteString .copyFromUtf8 ("my message" )).build ();
108119
109- final SettableApiFuture < PubsubMessage > received = SettableApiFuture . create ();
120+ final BlockingQueue < Object > receiveQueue = new LinkedBlockingQueue <> ();
110121 Subscriber subscriber =
111122 Subscriber .newBuilder (
112123 subscriptionName ,
113124 new MessageReceiver () {
114125 @ Override
115126 public void receiveMessage (
116127 final PubsubMessage message , final AckReplyConsumer consumer ) {
117- if (received .set (message )) {
118- consumer .ack ();
119- } else {
120- consumer .nack ();
121- }
128+ receiveQueue .offer (MessageAndConsumer .create (message , consumer ));
122129 }
123130 })
124131 .build ();
125132 subscriber .addListener (
126133 new Subscriber .Listener () {
127134 public void failed (Subscriber .State from , Throwable failure ) {
128- received . setException (failure );
135+ receiveQueue . offer (failure );
129136 }
130137 },
131138 MoreExecutors .directExecutor ());
132139 subscriber .startAsync ();
133140
134141 Publisher publisher = Publisher .newBuilder (topicName ).build ();
135- publisher .publish (message ).get ();
142+ publisher
143+ .publish (PubsubMessage .newBuilder ().setData (ByteString .copyFromUtf8 ("msg1" )).build ())
144+ .get ();
145+ publisher
146+ .publish (PubsubMessage .newBuilder ().setData (ByteString .copyFromUtf8 ("msg2" )).build ())
147+ .get ();
136148 publisher .shutdown ();
137149
138- assertEquals (received .get ().getData (), message .getData ());
150+ // Ack the first message.
151+ MessageAndConsumer toAck = pollQueue (receiveQueue );
152+ toAck .consumer ().ack ();
153+
154+ // Nack the other.
155+ MessageAndConsumer toNack = pollQueue (receiveQueue );
156+ assertThat (toNack .message ().getData ()).isNotEqualTo (toAck .message ().getData ());
157+ toNack .consumer ().nack ();
158+
159+ // We should get the nacked message back.
160+ MessageAndConsumer redelivered = pollQueue (receiveQueue );
161+ assertThat (redelivered .message ().getData ()).isEqualTo (toNack .message ().getData ());
162+ redelivered .consumer ().ack ();
163+
139164 subscriber .stopAsync ().awaitTerminated ();
140165 subscriptionAdminClient .deleteSubscription (subscriptionName );
141166 topicAdminClient .deleteTopic (topicName );
142167 }
168+
169+ private MessageAndConsumer pollQueue (BlockingQueue <Object > queue ) throws InterruptedException {
170+ Object obj = queue .poll (1 , TimeUnit .MINUTES );
171+ if (obj == null ) {
172+ return null ;
173+ }
174+ if (obj instanceof Throwable ) {
175+ throw new IllegalStateException ("unexpected error" , (Throwable ) obj );
176+ }
177+ if (obj instanceof MessageAndConsumer ) {
178+ return (MessageAndConsumer ) obj ;
179+ }
180+ throw new IllegalStateException (
181+ "expected either MessageAndConsumer or Throwable, found: " + obj );
182+ }
143183}
0 commit comments