-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathMaster.java
More file actions
193 lines (154 loc) · 5.63 KB
/
Master.java
File metadata and controls
193 lines (154 loc) · 5.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
import java.io.FileNotFoundException;
import java.io.IOException;
import java.rmi.NotBoundException;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeSet;
public class Master implements MasterReplicaInterface, MasterServerClientInterface, Remote{
class HeartBeatTask extends TimerTask {
@Override
public void run() {
// check state of replicas
for (ReplicaLoc replicaLoc : replicaServersLocs) {
try {
replicaServersStubs.get(replicaLoc.getId()).isAlive();
} catch (RemoteException e) {
replicaLoc.setAlive(false);
e.printStackTrace();
}
}
}
}
private int nextTID;
private int heartBeatRate = Configurations.HEART_BEAT_RATE;
private int replicationN = Configurations.REPLICATION_N; // number of file replicas
private Timer HeartBeatTimer;
private Random randomGen;
private Map<String, List<ReplicaLoc> > filesLocationMap;
private Map<String, ReplicaLoc> primaryReplicaMap;
// private Map<Integer, String> activeTransactions; // active transactions <ID, fileName>
private List<ReplicaLoc> replicaServersLocs;
private List<ReplicaMasterInterface> replicaServersStubs;
public Master() {
filesLocationMap = new HashMap<String, List<ReplicaLoc>>();
primaryReplicaMap = new HashMap<String, ReplicaLoc>();
// activeTransactions = new HashMap<Integer, String>();
replicaServersLocs = new ArrayList<ReplicaLoc>();
replicaServersStubs = new ArrayList<ReplicaMasterInterface>();
nextTID = 0;
randomGen = new Random();
HeartBeatTimer = new Timer(); //At this line a new Thread will be created
HeartBeatTimer.scheduleAtFixedRate(new HeartBeatTask(), 0, heartBeatRate); //delay in milliseconds
}
/**
* elects a new primary replica for the given file
* @param fileName
*/
private void assignNewMaster(String fileName){
List<ReplicaLoc> replicas = filesLocationMap.get(fileName);
boolean newPrimaryAssigned = false;
for (ReplicaLoc replicaLoc : replicas) {
if (replicaLoc.isAlive()){
newPrimaryAssigned = true;
primaryReplicaMap.put(fileName, replicaLoc);
try {
replicaServersStubs.get(replicaLoc.getId()).takeCharge(fileName, filesLocationMap.get(fileName));
} catch (RemoteException | NotBoundException e) {
e.printStackTrace();
}
break;
}
}
if (!newPrimaryAssigned){
//TODO a7a ya3ni
}
}
/**
* creates a new file @ N replica servers that are randomly chosen
* elect the primary replica at random
* @param fileName
*/
private void createNewFile(String fileName){
System.out.println("[@Master] Creating new file initiated");
int luckyReplicas[] = new int[replicationN];
List<ReplicaLoc> replicas = new ArrayList<ReplicaLoc>();
Set<Integer> chosenReplicas = new TreeSet<Integer>();
for (int i = 0; i < luckyReplicas.length; i++) {
// TODO if no replica alive enter infinte loop
do {
luckyReplicas[i] = randomGen.nextInt(replicationN);
// System.err.println(luckyReplicas[i] );
// System.err.println(replicaServersLocs.get(luckyReplicas[i]).isAlive());
} while(!replicaServersLocs.get(luckyReplicas[i]).isAlive() || chosenReplicas.contains(luckyReplicas[i]));
chosenReplicas.add(luckyReplicas[i]);
// add the lucky replica to the list of replicas maintaining the file
replicas.add(replicaServersLocs.get(luckyReplicas[i]));
// create the file at the lucky replicas
try {
replicaServersStubs.get(luckyReplicas[i]).createFile(fileName);
} catch (IOException e) {
// failed to create the file at replica server
e.printStackTrace();
}
}
// the primary replica is the first lucky replica picked
int primary = luckyReplicas[0];
try {
replicaServersStubs.get(primary).takeCharge(fileName, replicas);
} catch (RemoteException | NotBoundException e) {
// couldn't assign the master replica
e.printStackTrace();
}
filesLocationMap.put(fileName, replicas);
primaryReplicaMap.put(fileName, replicaServersLocs.get(primary));
}
@Override
public List<ReplicaLoc> read(String fileName) throws FileNotFoundException,
IOException, RemoteException {
List<ReplicaLoc> replicaLocs = filesLocationMap.get(fileName);
if (replicaLocs == null)
throw new FileNotFoundException();
return replicaLocs;
}
@Override
public WriteAck write(String fileName) throws RemoteException, IOException {
System.out.println("[@Master] write request initiated");
long timeStamp = System.currentTimeMillis();
List<ReplicaLoc> replicaLocs= filesLocationMap.get(fileName);
int tid = nextTID++;
if (replicaLocs == null) // file not found
createNewFile(fileName);
ReplicaLoc primaryReplicaLoc = primaryReplicaMap.get(fileName);
if (primaryReplicaLoc == null)
throw new IllegalStateException("No primary replica found");
// if the primary replica is down .. elect a new replica
if (!primaryReplicaLoc.isAlive()){
assignNewMaster(fileName);
primaryReplicaLoc = primaryReplicaMap.get(fileName);
}
return new WriteAck(tid, timeStamp,primaryReplicaLoc);
}
@Override
public ReplicaLoc locatePrimaryReplica(String fileName)
throws RemoteException {
return primaryReplicaMap.get(fileName);
}
/**
* registers new replica server @ the master by adding required meta data
* @param replicaLoc
* @param replicaStub
*/
public void registerReplicaServer(ReplicaLoc replicaLoc, ReplicaInterface replicaStub){
replicaServersLocs.add(replicaLoc);
replicaServersStubs.add( (ReplicaMasterInterface) replicaStub);
}
//TODO add commit to master to handle meta-data
}