Skip to content

Commit eda954c

Browse files
committed
Merge pull request kaazing#9 from sanjay-saxena/develop
Update to the latest K3PO. Fixed a race brought to light by WebSocket…
2 parents 6b2d3bf + 91dc145 commit eda954c

4 files changed

Lines changed: 47 additions & 31 deletions

File tree

ws/ws/pom.xml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,20 +52,20 @@
5252
<dependency>
5353
<groupId>org.kaazing</groupId>
5454
<artifactId>k3po.junit</artifactId>
55-
<version>2.0.0-alpha-4</version>
55+
<version>2.0.0</version>
5656
<scope>test</scope>
5757
</dependency>
5858
<dependency>
5959
<!-- Needed for EL only, not AST -->
6060
<groupId>org.kaazing</groupId>
6161
<artifactId>k3po.lang</artifactId>
62-
<version>2.0.0-alpha-9</version>
62+
<version>2.0.0</version>
6363
<scope>test</scope>
6464
</dependency>
6565
<dependency>
6666
<groupId>org.kaazing</groupId>
6767
<artifactId>specification.ws</artifactId>
68-
<version>1.0.0-alpha-1</version>
68+
<version>[0.1,0.99)</version>
6969
<scope>test</scope>
7070
</dependency>
7171
</dependencies>
@@ -128,7 +128,7 @@
128128
<plugin>
129129
<groupId>org.kaazing</groupId>
130130
<artifactId>k3po-maven-plugin</artifactId>
131-
<version>2.0.0-alpha-9</version>
131+
<version>2.0.0</version>
132132
<configuration>
133133
<skipTests>${skipITs}</skipTests>
134134
</configuration>

ws/ws/src/main/java/org/kaazing/net/impl/util/BlockingQueueImpl.java

Lines changed: 26 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
/**
22
* Copyright (c) 2007-2014 Kaazing Corporation. All rights reserved.
3-
*
3+
*
44
* Licensed to the Apache Software Foundation (ASF) under one
55
* or more contributor license agreements. See the NOTICE file
66
* distributed with this work for additional information
77
* regarding copyright ownership. The ASF licenses this file
88
* to you under the Apache License, Version 2.0 (the
99
* "License"); you may not use this file except in compliance
1010
* with the License. You may obtain a copy of the License at
11-
*
11+
*
1212
* http://www.apache.org/licenses/LICENSE-2.0
13-
*
13+
*
1414
* Unless required by applicable law or agreed to in writing,
1515
* software distributed under the License is distributed on an
1616
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,35 +26,34 @@
2626
/**
2727
* ArrayBlockingQueue extension with ability to interrupt or end-of-stream.
2828
* This will be used by producer(ie. the listener) and the consumer(ie. the
29-
* WebSocketMessageReader). To match the 3.X event-listener behavior, the
29+
* WebSocketMessageReader). To match the 3.X event-listener behavior, the
3030
* capacity of the queue can be set to 1.
31-
*
31+
*
3232
* @param <E> element type
3333
*/
3434
public class BlockingQueueImpl<E> extends ArrayBlockingQueue<E> {
3535
private static final long serialVersionUID = 1L;
3636

3737
// ### TODO: Maybe expose an API on WebSocket/WsURLConnection for developers
38-
// to specify the number of incoming messages that can be held
38+
// to specify the number of incoming messages that can be held
3939
// before we start pushing on the network.
40-
private static final int _QUEUE_CAPACITY = 32;
41-
40+
private static final int _QUEUE_CAPACITY = 32;
41+
4242
private boolean _done = false;
4343

4444
public BlockingQueueImpl() {
4545
super(_QUEUE_CAPACITY, true);
4646
}
47-
48-
public synchronized void done() {
49-
_done = true;
47+
48+
public synchronized void done() {
49+
_done = true;
5050
notifyAll();
51-
clear();
5251
}
5352

54-
public boolean isDone() {
55-
return _done;
53+
public boolean isDone() {
54+
return _done;
5655
}
57-
56+
5857
public synchronized void reset() {
5958
// Wake up threads that maybe blocked to retrieve data.
6059
notifyAll();
@@ -77,31 +76,31 @@ public E peek() {
7776
}
7877
}
7978
}
80-
79+
8180
if ((el == null) && isDone()) {
8281
String s = "Reader has been interrupted maybe the connection is closed";
8382
throw new RuntimeException(s);
8483
}
8584

8685
return el;
8786
}
88-
87+
8988
@Override
9089
public void put(E el) throws InterruptedException {
9190
synchronized (this) {
9291
while ((size() == _QUEUE_CAPACITY) && !isDone()) {
9392
// Push on the network as the messages are not being retrieved.
9493
wait();
9594
}
96-
95+
9796
if (isDone()) {
9897
notifyAll();
9998
return;
10099
}
101100
}
102101

103102
super.put(el);
104-
103+
105104
synchronized (this) {
106105
notifyAll();
107106
}
@@ -115,17 +114,19 @@ public E take() throws InterruptedException {
115114
while (isEmpty() && !isDone()) {
116115
wait();
117116
}
118-
117+
119118
if (isDone()) {
120119
notifyAll();
121-
122-
String s = "Reader has been interrupted maybe the connection is closed";
123-
throw new InterruptedException(s);
120+
121+
if (size() == 0) {
122+
String s = "Reader has been interrupted maybe the connection is closed";
123+
throw new InterruptedException(s);
124+
}
124125
}
125126
}
126-
127+
127128
el = super.take();
128-
129+
129130
synchronized (this) {
130131
notifyAll();
131132
}

ws/ws/src/main/java/org/kaazing/net/ws/impl/io/WsMessageReaderImpl.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public WebSocketMessageType next() throws IOException {
112112
}
113113

114114
synchronized (this) {
115-
if (!_webSocket.isConnected()) {
115+
if ((_sharedQueue.size() == 0) && !_webSocket.isConnected()) {
116116
_messageType = WebSocketMessageType.EOS;
117117
return _messageType;
118118
}
@@ -162,7 +162,6 @@ public void close() throws IOException {
162162
}
163163

164164
_sharedQueue.done();
165-
_payload = null;
166165
_closed = true;
167166
}
168167

ws/ws/src/test/java/org/kaazing/gateway/client/impl/wseb/WebSocketEmulatedHandlerTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ public void testProcessOpen() throws URISyntaxException {
4747
final UpstreamHandler upstreamHandler = context.mock(UpstreamHandler.class);
4848
final DownstreamHandler downstreamHandler = context.mock(DownstreamHandler.class);
4949
final WebSocketHandlerListener listener = context.mock(WebSocketHandlerListener.class);
50+
51+
CreateHandlerFactory defaultCreateHandlerFactory = WebSocketEmulatedHandler.createHandlerFactory;
52+
UpstreamHandlerFactory defaultUpstreamHandlerFactory = WebSocketEmulatedHandler.upstreamHandlerFactory;
53+
DownstreamHandlerFactory defaultDownstreamHandlerFactory = WebSocketEmulatedHandler.downstreamHandlerFactory;
5054

5155
context.checking(new Expectations() {
5256
{
@@ -119,6 +123,10 @@ public DownstreamHandler createDownstreamHandler() {
119123

120124
handler.processConnect(channel, uri, new String[]{"foo"});
121125
context.assertIsSatisfied();
126+
127+
WebSocketEmulatedHandler.createHandlerFactory = defaultCreateHandlerFactory;
128+
WebSocketEmulatedHandler.upstreamHandlerFactory = defaultUpstreamHandlerFactory;
129+
WebSocketEmulatedHandler.downstreamHandlerFactory = defaultDownstreamHandlerFactory;
122130
}
123131

124132
/*
@@ -133,6 +141,10 @@ public void testProcessFailed() throws URISyntaxException {
133141
final DownstreamHandler downstreamHandler = context.mock(DownstreamHandler.class);
134142
final WebSocketHandlerListener listener = context.mock(WebSocketHandlerListener.class);
135143

144+
CreateHandlerFactory defaultCreateHandlerFactory = WebSocketEmulatedHandler.createHandlerFactory;
145+
UpstreamHandlerFactory defaultUpstreamHandlerFactory = WebSocketEmulatedHandler.upstreamHandlerFactory;
146+
DownstreamHandlerFactory defaultDownstreamHandlerFactory = WebSocketEmulatedHandler.downstreamHandlerFactory;
147+
136148
context.checking(new Expectations() {
137149

138150
{
@@ -191,6 +203,10 @@ public DownstreamHandler createDownstreamHandler() {
191203

192204
handler.processConnect(channel, uri, protocols);
193205
context.assertIsSatisfied();
206+
207+
WebSocketEmulatedHandler.createHandlerFactory = defaultCreateHandlerFactory;
208+
WebSocketEmulatedHandler.upstreamHandlerFactory = defaultUpstreamHandlerFactory;
209+
WebSocketEmulatedHandler.downstreamHandlerFactory = defaultDownstreamHandlerFactory;
194210
}
195211

196212
}

0 commit comments

Comments
 (0)