Skip to content

Commit a3ce9fa

Browse files
committed
new extense
1 parent 326f515 commit a3ce9fa

24 files changed

+798
-68
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
//package rpc.framework.server;
2+
//
3+
//public class Event {
4+
// public String ID;
5+
//
6+
// public Event(String iD) {
7+
// super();
8+
// ID = iD;
9+
// }
10+
//
11+
// @Override
12+
// public int hashCode() {
13+
// return ID.hashCode(); // FIXME:: same content string but not the same object?
14+
// }
15+
//
16+
// @Override
17+
// public boolean equals(Object obj) {
18+
// if (obj instanceof Event) {
19+
// return this.ID == ((Event) obj).ID;
20+
// } else {
21+
// return super.equals(obj);
22+
// }
23+
// }
24+
//
25+
// @Override
26+
// public String toString() {
27+
// return ID;
28+
// }
29+
//}
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
//package rpc.framework.server;
2+
//
3+
//import java.util.ArrayList;
4+
//import java.util.HashSet;
5+
//import java.util.LinkedList;
6+
//import java.util.concurrent.ConcurrentHashMap;
7+
//
8+
//import org.json.JSONObject;
9+
//
10+
//import rpc.framework.RpcInvoker;
11+
//import rpc.framework.RpcServerInvokerHandler;
12+
//import rpc.json.message.RpcNotification;
13+
//import rpc.util.IDGenerator;
14+
//import rpc.util.RpcLog;
15+
//
16+
///**
17+
// * @author sunshyran
18+
// *
19+
// */
20+
//public class EventCenter implements EventListenerInterface {
21+
// private static final String TAG="EventCenter";
22+
// private RpcServerInvokerHandler mHandler = null;
23+
// private IDGenerator mSIDGen = new IDGenerator(); // TODO:: reuse id generator temporally
24+
// public static final long INVALID_SID = -1;
25+
// private HashSet<EventSourceInterface> mSources = new HashSet<EventSourceInterface>();
26+
//
27+
// private class EventListener implements EventListenerInterface {
28+
// private long mSID = -1;
29+
// private EventListenerInterface mListener = null;
30+
// public EventListener(long SID, EventListenerInterface listener) {
31+
// mSID = SID;
32+
// mListener = listener;
33+
// }
34+
// @Override
35+
// public void onNotify(Event event, Object data) {
36+
// mListener.onNotify(event, data);
37+
// }
38+
//
39+
// }
40+
// private ConcurrentHashMap<Event, LinkedList<EventListener>> mListeners = new ConcurrentHashMap<Event, LinkedList<EventListener>>();
41+
//
42+
// /**add event source into event center
43+
// * @param source
44+
// */
45+
// public void addEventSource(EventSourceInterface source) {
46+
// synchronized (mSources) {
47+
// if (mSources.contains(source)) {
48+
// RpcLog.w(TAG, "source " + source + " is already added into event center!");
49+
// } else {
50+
// RpcLog.i(TAG, "add source " + source + " into event center");
51+
// mSources.add(source);
52+
// }
53+
// }
54+
// }
55+
//
56+
// /**remove source from event center
57+
// * @param source
58+
// */
59+
// public void removeEventSource(EventSourceInterface source) {
60+
// synchronized (mSources) {
61+
// if (!mSources.contains(source)) {
62+
// RpcLog.w(TAG, "source " + source + " is not in event center!");
63+
// } else {
64+
// RpcLog.i(TAG, "remove source " + source + " from event center");
65+
// mSources.remove(source);
66+
// }
67+
// }
68+
// }
69+
//
70+
// public long addEventListener(Event event, EventListener listener) {
71+
// boolean isAcceptable = false;
72+
// synchronized (mSources) {
73+
// for (EventSourceInterface source:mSources) {
74+
// if (source.acceptEvent(event)) {
75+
// if (source.subscribe(event, this)) {
76+
// isAcceptable = true;
77+
// } else {
78+
// RpcLog.e(TAG, "subscribe event of source failed: " + event + ", " + source);
79+
// }
80+
// }
81+
// }
82+
// }
83+
//
84+
// if (!isAcceptable) {
85+
// RpcLog.e(TAG, "no source accept event " + event);
86+
// return INVALID_SID;
87+
// }
88+
//
89+
// long sid = aquireSID();
90+
// LinkedList<EventListener> listeners = mListeners.getOrDefault(event, new LinkedList<EventListener>());
91+
// synchronized (listeners) { // FIXME:: thread safe is enough?
92+
// listeners.add(new EventListener(sid, listener));
93+
// mListeners.put(event, listeners);
94+
// }
95+
// return sid;
96+
// }
97+
//
98+
// public void removeEventListener(Event event, EventListener listener) {
99+
// boolean isAcceptable = false;
100+
// synchronized (mSources) {
101+
// for (EventSourceInterface source:mSources) {
102+
// if (source.acceptEvent(event)) {
103+
// source.unsubscribe(event, this);
104+
// }
105+
// }
106+
// }
107+
//
108+
//
109+
// }
110+
//
111+
//
112+
//
113+
// private long aquireSID() {
114+
// return mSIDGen.getRequestId();
115+
// }
116+
//
117+
//
118+
// @Override
119+
// public void onNotify(Event event, Object data) {
120+
// System.out.println("onNotify");
121+
// HashSet<EventListener> listeners = mListeners.get(event);
122+
// if (listeners == null) { return; }
123+
// ArrayList<EventListener> listeners = new ArrayList<EventListener>(listeners.);
124+
//
125+
// RpcNotification notification = new RpcNotification();
126+
// notification.setMethod(event.method);
127+
// JSONObject params = new JSONObject();
128+
// params.put("SID", sid);
129+
// params.put("data", data);
130+
// notification.setParams(params);
131+
// try {
132+
// notify(notification);
133+
// } catch (InterruptedException e) {
134+
// // TODO Auto-generated catch block
135+
// e.printStackTrace();
136+
// }
137+
// }
138+
//
139+
// private long getRemoteEventListener(Event event) {
140+
// returen 0
141+
// }
142+
//
143+
// private void notify(RpcNotification notification) throws InterruptedException {
144+
// mHandler.invoke(new RpcInvoker(notification, null, null));
145+
// }
146+
//}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
//package rpc.framework.server;
2+
//
3+
//public interface EventListenerInterface {
4+
// void onNotify(Event event, Object data);
5+
//}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
//package rpc.framework.server;
2+
//
3+
//public interface EventSourceInterface {
4+
// public boolean subscribe(Event event, EventListenerInterface observer);
5+
// public boolean unsubscribe(Event event, EventListenerInterface observer);
6+
// public boolean acceptEvent(Event event);
7+
//}

src/rpc/framework/server/RpcServer.java

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import rpc.exception.RpcException;
1010
import rpc.exception.RpcInternalError;
1111
import rpc.exception.RpcInvalidRquest;
12+
import rpc.framework.INotificationListener;
1213
import rpc.framework.RpcInvoker;
1314
import rpc.framework.RpcServerInvokerHandler;
1415
import rpc.framework.connection.IRpcChannel;
@@ -19,11 +20,11 @@
1920
import rpc.json.message.RpcResponse;
2021
import rpc.util.RpcLog;
2122

22-
public class RpcServer {
23+
public class RpcServer implements INotificationListener{
2324
private static final String TAG = "RpcServer";
2425
private boolean mIsRunning = false;
2526
private RpcServerInvokerHandler mHandler = null;
26-
ServiceRegister mServiceRegister = new ServiceRegister();
27+
ServiceRegistry mServiceRegister = new ServiceRegistry();
2728
ExecutorService mThreadPool = Executors.newCachedThreadPool();
2829

2930
class RequestProcessor implements Callable<RpcResponse> {
@@ -43,7 +44,7 @@ public RpcResponse call() throws Exception {
4344
RpcLog.d(TAG, "RequestProcessor is callings");
4445
try {
4546
if (mRequest.getMethod() == null || mRequest.getMethod().isEmpty()) {
46-
throw new RpcInvalidRquest("method is empty in request of " + mRequest.toString());
47+
throw new RpcInvalidRquest("method is empty in request of " + mRequest.toString());
4748
}
4849
return mService.execute(mRequest);
4950

@@ -59,7 +60,10 @@ public RpcResponse call() throws Exception {
5960
*/
6061
public void registerService(RpcServiceInterface service) {
6162
mServiceRegister.addService(service);
62-
63+
}
64+
65+
public void registerSubject(String servicename, Object subject) {
66+
RpcSubscriber.BaseRpcSubscriber.registerSubject(servicename, subject);
6367
}
6468

6569

@@ -68,7 +72,7 @@ public void registerService(RpcServiceInterface service) {
6872
* @param port
6973
*/
7074
public void serve(String ip, int port) {
71-
RpcLog.i(TAG, mServiceRegister.listServices());
75+
7276
while(true) {
7377
IRpcChannel channel = RpcConnector.acceptChannel(ip, port);
7478
if (channel == null) {
@@ -78,19 +82,35 @@ public void serve(String ip, int port) {
7882
mIsRunning = true;
7983
mHandler = new RpcServerInvokerHandler();
8084
mHandler.bindChannel(channel);
85+
// TODO:: 做一些与回调相关的处理
86+
RpcSubscriber s = null;
87+
try {
88+
s = (RpcSubscriber)RpcSubscriber.BaseRpcSubscriber.clone();
89+
} catch (CloneNotSupportedException e) {
90+
e.printStackTrace();
91+
return;
92+
}
93+
s.onConnect(this);
94+
mServiceRegister.removeService(s.list());
95+
mServiceRegister.addService(s);
96+
RpcLog.i(TAG, mServiceRegister.listServices());
8197
run();
98+
s.onDisconnect();
8299
}
83100
}
84101

85-
public void onNotify(RpcNotification message) {
86-
while (mIsRunning) {
87-
if (message == null) { continue; }
102+
public boolean onNotify(RpcNotification message) {
103+
if (mIsRunning) {
104+
if (message == null) { return false; }
88105
try {
106+
RpcLog.i(TAG, "notify message");
89107
mHandler.invoke(new RpcInvoker(message, null, null));
108+
return true;
90109
} catch (InterruptedException e) {
91110
RpcLog.e(RpcServer.TAG, "server is stopped because of exception:" + e);
92111
}
93112
}
113+
return false;
94114
}
95115

96116

@@ -101,7 +121,7 @@ public void run() {
101121
RpcInvoker invoker = mHandler.retrieve();
102122
RpcRequest request = (RpcRequest)invoker.getRequest();
103123
RpcServiceInterface service = (RpcServiceInterface)mServiceRegister.getService(request.getMethod());
104-
RpcLog.d(TAG, "ask request: " + request.toString());
124+
// RpcLog.d(TAG, "ask request: " + request.toString());
105125
RpcResponse response = null;
106126
if (service == null) {
107127
RpcLog.e(TAG, "NO service can handle this method:" + request.getMethod());

0 commit comments

Comments
 (0)