11/**
22 * Copyright (c) 2007-2014 Kaazing Corporation. All rights reserved.
3- *
3+ *
44 * Licensed to the Apache Software Foundation (ASF) under one
55 * or more contributor license agreements. See the NOTICE file
66 * distributed with this work for additional information
77 * regarding copyright ownership. The ASF licenses this file
88 * to you under the Apache License, Version 2.0 (the
99 * "License"); you may not use this file except in compliance
1010 * with the License. You may obtain a copy of the License at
11- *
11+ *
1212 * http://www.apache.org/licenses/LICENSE-2.0
13- *
13+ *
1414 * Unless required by applicable law or agreed to in writing,
1515 * software distributed under the License is distributed on an
1616 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
2626/**
2727 * ArrayBlockingQueue extension with ability to interrupt or end-of-stream.
2828 * This will be used by producer(ie. the listener) and the consumer(ie. the
29- * WebSocketMessageReader). To match the 3.X event-listener behavior, the
29+ * WebSocketMessageReader). To match the 3.X event-listener behavior, the
3030 * capacity of the queue can be set to 1.
31- *
31+ *
3232 * @param <E> element type
3333 */
3434public class BlockingQueueImpl <E > extends ArrayBlockingQueue <E > {
3535 private static final long serialVersionUID = 1L ;
3636
3737 // ### TODO: Maybe expose an API on WebSocket/WsURLConnection for developers
38- // to specify the number of incoming messages that can be held
38+ // to specify the number of incoming messages that can be held
3939 // before we start pushing on the network.
40- private static final int _QUEUE_CAPACITY = 32 ;
41-
40+ private static final int _QUEUE_CAPACITY = 32 ;
41+
4242 private boolean _done = false ;
4343
4444 public BlockingQueueImpl () {
4545 super (_QUEUE_CAPACITY , true );
4646 }
47-
48- public synchronized void done () {
49- _done = true ;
47+
48+ public synchronized void done () {
49+ _done = true ;
5050 notifyAll ();
51- clear ();
5251 }
5352
54- public boolean isDone () {
55- return _done ;
53+ public boolean isDone () {
54+ return _done ;
5655 }
57-
56+
5857 public synchronized void reset () {
5958 // Wake up threads that maybe blocked to retrieve data.
6059 notifyAll ();
@@ -77,31 +76,31 @@ public E peek() {
7776 }
7877 }
7978 }
80-
79+
8180 if ((el == null ) && isDone ()) {
8281 String s = "Reader has been interrupted maybe the connection is closed" ;
8382 throw new RuntimeException (s );
8483 }
8584
8685 return el ;
8786 }
88-
87+
8988 @ Override
9089 public void put (E el ) throws InterruptedException {
9190 synchronized (this ) {
9291 while ((size () == _QUEUE_CAPACITY ) && !isDone ()) {
9392 // Push on the network as the messages are not being retrieved.
9493 wait ();
9594 }
96-
95+
9796 if (isDone ()) {
9897 notifyAll ();
9998 return ;
10099 }
101100 }
102101
103102 super .put (el );
104-
103+
105104 synchronized (this ) {
106105 notifyAll ();
107106 }
@@ -115,17 +114,19 @@ public E take() throws InterruptedException {
115114 while (isEmpty () && !isDone ()) {
116115 wait ();
117116 }
118-
117+
119118 if (isDone ()) {
120119 notifyAll ();
121-
122- String s = "Reader has been interrupted maybe the connection is closed" ;
123- throw new InterruptedException (s );
120+
121+ if (size () == 0 ) {
122+ String s = "Reader has been interrupted maybe the connection is closed" ;
123+ throw new InterruptedException (s );
124+ }
124125 }
125126 }
126-
127+
127128 el = super .take ();
128-
129+
129130 synchronized (this ) {
130131 notifyAll ();
131132 }
0 commit comments