Skip to content

Commit 56f5376

Browse files
committed
IO相关测试代码
1 parent 2c6e494 commit 56f5376

4 files changed

Lines changed: 325 additions & 3 deletions

File tree

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package com.prd.io.bio;
2+
3+
4+
import java.io.*;
5+
import java.net.Socket;
6+
7+
/**
8+
* BIO实现多人聊天室,功能
9+
* (1)基于BIO模型
10+
* (2)支持多人同时在线
11+
* (3)每个用户的发言都被转发给其他用户
12+
*
13+
* 这里因为使用consoleReader.readLine();不能使用JUint调试
14+
* 只能使用main方法,否则会到consoleReader.readLine();无反应卡死
15+
*/
16+
public class BIOChatClient {
17+
18+
private Socket socket;
19+
private BufferedReader reader;
20+
private BufferedWriter writer;
21+
22+
/**
23+
* 发送消息到服务端
24+
* @param msg
25+
* @throws IOException
26+
*/
27+
private void send(String msg) throws IOException {
28+
if (!socket.isOutputShutdown()) {
29+
writer.write(msg+"\n");
30+
writer.flush();
31+
}
32+
}
33+
34+
private String receive() throws IOException {
35+
return socket.isInputShutdown()?null:reader.readLine();
36+
}
37+
38+
/**
39+
* 检查用户是否退出
40+
* @param msg
41+
* @return
42+
*/
43+
public boolean readyToQuit(String msg) {
44+
return BIOChatServer.QUIT.equals(msg);
45+
}
46+
47+
public void close() {
48+
System.out.println("关闭socket");
49+
if (writer!=null) {
50+
try {
51+
writer.close();
52+
} catch (IOException e) {
53+
e.printStackTrace();
54+
}
55+
}
56+
if (reader!=null) {
57+
try {
58+
reader.close();
59+
} catch (IOException e) {
60+
e.printStackTrace();
61+
}
62+
}
63+
}
64+
65+
public void start() {
66+
try {
67+
// 创建socket
68+
socket = new Socket(BIOChatServer.DEFAULT_HOST,
69+
BIOChatServer.DEFAULT_PORT);
70+
71+
//创建IO流
72+
reader = new BufferedReader(
73+
new InputStreamReader(socket.getInputStream()));
74+
75+
writer = new BufferedWriter(
76+
new OutputStreamWriter(socket.getOutputStream()));
77+
78+
//处理用户输入,这里需要开启另外的线程
79+
//因为等待用户输入时阻塞式的,如果不开启额外的线程,会导致读数据被阻塞,无法接受消息
80+
new Thread(new UserInputHandler(this)).start();
81+
82+
//读取服务器转发的消息
83+
String msg = null;
84+
while ((msg = receive())!=null) {
85+
System.out.println(msg);
86+
}
87+
88+
} catch (IOException e) {
89+
e.printStackTrace();
90+
} finally {
91+
close();
92+
}
93+
}
94+
95+
/**
96+
* 用户输入处理线程
97+
* 单独开启线程是因为读取控制台输入,是会造成阻塞的。
98+
* 防止阻塞从服务器获取消息。
99+
*/
100+
private static class UserInputHandler implements Runnable{
101+
102+
private BIOChatClient chatClient;
103+
104+
public UserInputHandler(BIOChatClient chatClient) {
105+
this.chatClient = chatClient;
106+
}
107+
108+
109+
@Override
110+
public void run() {
111+
try {
112+
System.out.println("请输入:");
113+
// 等待用户输入消息
114+
BufferedReader consoleReader =
115+
new BufferedReader(
116+
new InputStreamReader(System.in));
117+
118+
while(true) {
119+
//一般从控制台获取输入,是会有回车的,所以这里用readline
120+
String input = consoleReader.readLine();
121+
// 向服务器发送消息
122+
chatClient.send(input);
123+
124+
// 检查用户是否准备推出
125+
if (chatClient.readyToQuit(input)) {
126+
break;
127+
}
128+
}
129+
} catch (IOException e) {
130+
e.printStackTrace();
131+
} finally {
132+
System.out.println("关闭");
133+
}
134+
}
135+
}
136+
137+
public static void main(String[] args) {
138+
BIOChatClient client = new BIOChatClient();
139+
client.start();
140+
}
141+
}
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
package com.prd.io.bio;
2+
3+
4+
import java.io.*;
5+
import java.net.ServerSocket;
6+
import java.net.Socket;
7+
import java.util.HashMap;
8+
import java.util.Map;
9+
import java.util.concurrent.ExecutorService;
10+
import java.util.concurrent.Executors;
11+
12+
/**
13+
* BIO实现多人聊天室,功能
14+
* (1)基于BIO模型
15+
* (2)支持多人同时在线
16+
* (3)每个用户的发言都被转发给其他用户
17+
*/
18+
public class BIOChatServer {
19+
public static final int DEFAULT_PORT = 8888;
20+
public static final String DEFAULT_HOST = "127.0.0.1";
21+
public static final String QUIT ="quit";
22+
23+
private ServerSocket serverSocket;
24+
private ExecutorService executorService= Executors.newFixedThreadPool(10);
25+
private Map<Integer, Writer> connectedClients = new HashMap<>();
26+
27+
/**
28+
* 新接入客户端,并将客户端加入到连接Map中
29+
* 多个线程客户端调用,需要线程安全
30+
* @param socket
31+
* @throws IOException
32+
*/
33+
private synchronized void addClient(Socket socket) throws IOException {
34+
if (socket!=null) {
35+
int port = socket.getPort();
36+
BufferedWriter writer = new BufferedWriter(
37+
new OutputStreamWriter(socket.getOutputStream()));
38+
connectedClients.put(port,writer);
39+
System.out.println("客户端["+socket.getInetAddress()+socket.getPort()+"]已连接到服务器");
40+
}
41+
}
42+
43+
/**
44+
* 关闭客户端,并将客户端从连接Map中清除
45+
* 多个线程客户端调用,需要线程安全
46+
* @param socket
47+
* @throws IOException
48+
*/
49+
private synchronized void removeClient(Socket socket) throws IOException {
50+
if (socket!=null) {
51+
int port = socket.getPort();
52+
if (connectedClients.containsKey(port)) {
53+
// socket是通过writer进行封装的,所以关闭writer就会关闭socket
54+
connectedClients.get(port).close();
55+
connectedClients.remove(port);
56+
System.out.println("客户端["+socket.getInetAddress()+socket.getPort()+"]已断开连接");
57+
}
58+
}
59+
}
60+
61+
/**
62+
* 将客户端socket发生来的消息,转发到其他已连接的所有客户端
63+
* @param socket
64+
* @param fwdMsg
65+
*/
66+
private synchronized void forwardMessage(Socket socket,String fwdMsg) {
67+
connectedClients.forEach((port,writer)->{
68+
if (port!=socket.getPort()) {
69+
try {
70+
writer.write(fwdMsg);
71+
writer.flush();
72+
} catch (IOException e) {
73+
e.printStackTrace();
74+
}
75+
}
76+
});
77+
}
78+
79+
/**
80+
* 启动服务器
81+
*/
82+
public void start() {
83+
try {
84+
// 绑定监听端口
85+
serverSocket = new ServerSocket(DEFAULT_PORT);
86+
System.out.println("启动服务器,监听端口:"+DEFAULT_PORT);
87+
88+
while (true){
89+
// 等待客户端连接
90+
Socket socket = serverSocket.accept();
91+
// // 创建ChatHandler线程
92+
// new Thread(new ChatHandler(socket,this)).start();
93+
94+
// 使用线程池
95+
executorService.execute(new ChatHandler(socket,this));
96+
}
97+
98+
} catch (IOException e) {
99+
e.printStackTrace();
100+
} finally {
101+
close();
102+
}
103+
}
104+
105+
public synchronized void close() {
106+
if (serverSocket!=null) {
107+
try {
108+
serverSocket.close();
109+
} catch (IOException e) {
110+
e.printStackTrace();
111+
}
112+
}
113+
}
114+
115+
/**
116+
* 检查用户是否退出
117+
* @param msg
118+
* @return
119+
*/
120+
public boolean readyToQuit(String msg) {
121+
return QUIT.equals(msg);
122+
}
123+
124+
/**
125+
* 每个客户端连接,建立一个线程来处理
126+
*/
127+
private static class ChatHandler implements Runnable{
128+
private Socket socket;
129+
130+
private BIOChatServer server;
131+
132+
public ChatHandler(Socket socket, BIOChatServer server) {
133+
this.socket = socket;
134+
this.server = server;
135+
}
136+
137+
@Override
138+
public void run() {
139+
try {
140+
// 保存新连接用户
141+
server.addClient(socket);
142+
143+
//读取用户发送的消息
144+
BufferedReader reader = new BufferedReader(
145+
new InputStreamReader(socket.getInputStream()));
146+
147+
while(true) {
148+
String msg=reader.readLine();
149+
String fwdMsg = "客户端["+socket.getInetAddress()+socket.getPort()+"]:"+msg;
150+
System.out.println(fwdMsg);
151+
152+
// 将消息转发给聊天室里在线的其他用户
153+
// 添加 \n 方便接收方readline方法处理
154+
server.forwardMessage(socket,fwdMsg+"\n");
155+
156+
// 检查用户是否准备退出
157+
if (server.readyToQuit(msg)) {
158+
break;
159+
}
160+
}
161+
} catch (IOException e) {
162+
e.printStackTrace();
163+
} finally {
164+
try {
165+
server.removeClient(socket);
166+
} catch (IOException e) {
167+
e.printStackTrace();
168+
}
169+
}
170+
}
171+
}
172+
173+
public static void main(String[] args) {
174+
BIOChatServer server = new BIOChatServer();
175+
server.start();
176+
}
177+
}

src/main/java/com/prd/io/bio/Client.java renamed to src/main/java/com/prd/io/bio/BIOClientBase.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@
55
import java.io.*;
66
import java.net.Socket;
77

8-
public class Client {
8+
/**
9+
* BIO模式的SocketClient
10+
*/
11+
public class BIOClientBase {
912

1013
public static final String QUIT="quit";
1114

@@ -37,6 +40,7 @@ public void createClient(int i) {
3740
String input = consoleReader.readLine();
3841

3942
// 发送消息给服务器
43+
// 添加换行符,以便服务端readline()可以区分
4044
writer.write(input+"\n");
4145
writer.flush();
4246

src/main/java/com/prd/io/bio/BIOServer.java renamed to src/main/java/com/prd/io/bio/BIOServerBase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
/**
1010
* BIO模式的SocketServer
1111
*/
12-
public class BIOServer {
12+
public class BIOServerBase {
1313

1414
private static final int DEFAULT_PORT=8888;
1515

@@ -47,7 +47,7 @@ public void singleThreadServer() {
4747
writer.flush();
4848

4949
//查看客户端是否退出
50-
if(Client.QUIT.equals(msg)){
50+
if(BIOClientBase.QUIT.equals(msg)){
5151
System.out.println("客户端["+socket.toString()+"]已退出");
5252
break;
5353
}

0 commit comments

Comments
 (0)