-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathSnapshot.java
More file actions
127 lines (104 loc) · 2.92 KB
/
Snapshot.java
File metadata and controls
127 lines (104 loc) · 2.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package com.ashishp.dc.assignment.cl.entity;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;
import com.google.common.base.MoreObjects;
/**
*
* @author <a href="http://www.linkedin.com/in/aashishpanchal">Aashish</a>
*
*/
public final class Snapshot implements Serializable {
/**
*
*/
private static final long serialVersionUID = 1L;
/**
* Sequential number, current snapshot ID to be taken
*/
private int id;
/**
* Current amount of money at the bank
*/
private int localBalance;
/**
* All money transfers from incoming channels upon receiving the marker
*/
private int moneyInTransfer;
/**
* Incoming nodes to be recorded for distributed snapshot holds only node ids
* from where the marker has not arrived yet if collection is empty -> all
* markers are received
* <p>
* Map<NodeId>
*/
private final @NotNull Set<Integer> unrecordedChannels = new HashSet<Integer>();
public void startSnapshotRecording(int nodeId, int balance, Map<Integer, String> nodes) {
id++;
localBalance = balance;
moneyInTransfer = 0;
unrecordedChannels.addAll(nodes.entrySet().parallelStream().filter(n -> n.getKey() != nodeId)
.map(Map.Entry::getKey).collect(Collectors.toSet()));
}
public void stopSnapshotRecording() {
localBalance = 0;
moneyInTransfer = 0;
unrecordedChannels.clear();
}
public int getId() {
return id;
}
public int getLocalBalance() {
return localBalance;
}
public int getMoneyInTransfer() {
return moneyInTransfer;
}
/**
* Increments the money-in-transfer upon receiving the marker from that node
*
* @param recipientNodeId
* recipient of the money transfer
* @param amount
* of the money transfer
*/
public void incrementMoneyInTransfer(int recipientNodeId, int amount) {
if (unrecordedChannels.contains(recipientNodeId)) {
moneyInTransfer += amount;
}
}
public void stopRecording(int nodeId) {
unrecordedChannels.remove(nodeId);
}
public boolean isRecording() {
return unrecordedChannels.size() != 0;
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
if (o instanceof Snapshot) {
Snapshot object = (Snapshot) o;
return Objects.equals(id, object.id) && Objects.equals(localBalance, object.localBalance)
&& Objects.equals(moneyInTransfer, object.moneyInTransfer);
}
return false;
}
@Override
public int hashCode() {
return Objects.hash(id, localBalance, moneyInTransfer);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("id", id).add("localBalance", localBalance)
.add("moneyInTransfer", moneyInTransfer)
.add("unrecordedChannels", Arrays.toString(unrecordedChannels.toArray())).toString();
}
}