Skip to content

Commit 0110c8d

Browse files
Close #28, Implement wallet sync (#63)
* Add SyncRecordHandler, WalletSyncRecordHandler raw implementations * Fix some optional columns in postgres module * Implement SyncRecordHandlerImpl.loadLastSuccessRecord() without records list * Add deposits table and repository * Add records to sync record handler * Add token flag to deposit * Implement SyncRecordHandlerImpl.saveSyncRecord() * Add transactional saving ability to sync records * Organize sync record imports * Implement saveReadyToSyncTransfers * Fix SyncModel table names * Add wallet sync schedule table and repository * Improve column types * Update wallet sync record * Update sync record names * Separate chain sync and wallet sync deposits * Make chain a required property in deposit * Fix wallet sync record model * Add wallet sync repositories * Add single row constraint to wallet sync schedules * Implement wallet sync scheduler handler * Implement deposit sync state * Remove wallet sync deposit model * Remove chain sync scheduler handler
1 parent 71bd99e commit 0110c8d

24 files changed

Lines changed: 214 additions & 109 deletions

File tree

BlockchainGateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/config/AppConfig.kt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,17 @@ class AppConfig {
2828

2929
@Bean
3030
fun chainSyncService(
31-
syncSchedulerHandler: SyncSchedulerHandler,
31+
chainSyncSchedulerHandler: ChainSyncSchedulerHandler,
3232
chainEndpointProxyFinder: ChainEndpointProxyFinder,
33-
syncRecordHandler: SyncRecordHandler,
33+
chainSyncRecordHandler: ChainSyncRecordHandler,
3434
walletSyncRecordHandler: WalletSyncRecordHandler,
3535
currencyLoader: CurrencyLoader,
3636
operator: TransactionalOperator
3737
): ChainSyncService {
3838
return ChainSyncServiceImpl(
39-
syncSchedulerHandler,
39+
chainSyncSchedulerHandler,
4040
chainEndpointProxyFinder,
41-
syncRecordHandler,
41+
chainSyncRecordHandler,
4242
walletSyncRecordHandler,
4343
currencyLoader,
4444
operator,
@@ -61,4 +61,4 @@ class AppConfig {
6161
fun infoService(): InfoService {
6262
return InfoServiceImpl()
6363
}
64-
}
64+
}

BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/Chain.kt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package co.nilin.opex.bcgateway.core.model
22

3-
import java.math.BigDecimal
43
import java.time.LocalDateTime
54

65
data class Endpoint(val url: String)
@@ -15,4 +14,3 @@ data class ChainSyncRecord(
1514
val error: String?,
1615
val records: List<Deposit>
1716
)
18-

BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/Deposit.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@ package co.nilin.opex.bcgateway.core.model
33
import java.math.BigDecimal
44

55
data class Deposit(
6+
val id: Long?,
67
val depositor: String,
78
val depositorMemo: String?,
89
val amount: BigDecimal,
9-
val chain: String?,
10+
val chain: String,
1011
val token: Boolean,
1112
val tokenAddress: String?
1213
)

BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/WalletSync.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,4 @@ import java.time.LocalDateTime
55
data class WalletSyncSchedule(val retryTime: LocalDateTime, val delay: Long, val batchSize: Long?)
66
data class WalletSyncRecord(
77
val time: LocalDateTime, val success: Boolean, val error: String?, val deposit: Deposit
8-
)
8+
)

BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/service/ChainSyncServiceImpl.kt

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@ import java.time.temporal.ChronoUnit
1010
import kotlin.coroutines.coroutineContext
1111

1212
open class ChainSyncServiceImpl(
13-
private val syncSchedulerHandler: SyncSchedulerHandler,
13+
private val chainSyncSchedulerHandler: ChainSyncSchedulerHandler,
1414
private val chainEndpointProxyFinder: ChainEndpointProxyFinder,
15-
private val syncRecordHandler: SyncRecordHandler,
15+
private val chainSyncRecordHandler: ChainSyncRecordHandler,
1616
private val walletSyncRecordHandler: WalletSyncRecordHandler,
1717
private val currencyLoader: CurrencyLoader,
1818
private val operator: TransactionalOperator,
@@ -21,11 +21,11 @@ open class ChainSyncServiceImpl(
2121

2222
override suspend fun startSyncWithChain() {
2323
withContext(coroutineContext) {
24-
val schedules = syncSchedulerHandler.fetchActiveSchedules(currentTime())
24+
val schedules = chainSyncSchedulerHandler.fetchActiveSchedules(currentTime())
2525
schedules.map { syncSchedule ->
2626
async(dispatcher) {
2727
val syncHandler = chainEndpointProxyFinder.findChainEndpointProxy(syncSchedule.chainName)
28-
val lastSync = syncRecordHandler.loadLastSuccessRecord(syncSchedule.chainName)
28+
val lastSync = chainSyncRecordHandler.loadLastSuccessRecord(syncSchedule.chainName)
2929
val tokens = currencyLoader.findImplementationsWithTokenOnChain(syncSchedule.chainName)
3030
.map { impl -> impl.tokenAddress!! }
3131
.toList()
@@ -37,9 +37,9 @@ open class ChainSyncServiceImpl(
3737
)
3838
operator.executeAndAwait {
3939
walletSyncRecordHandler.saveReadyToSyncTransfers(syncResult.chainName, syncResult.records)
40-
syncRecordHandler.saveSyncRecord(syncResult)
40+
chainSyncRecordHandler.saveSyncRecord(syncResult)
4141
if (syncResult.success) {
42-
syncSchedulerHandler.prepareScheduleForNextTry(
42+
chainSyncSchedulerHandler.prepareScheduleForNextTry(
4343
syncSchedule,
4444
currentTime().plus(syncSchedule.delay, ChronoUnit.SECONDS)
4545
)
@@ -51,4 +51,4 @@ open class ChainSyncServiceImpl(
5151
}
5252

5353
protected open fun currentTime() = LocalDateTime.now()
54-
}
54+
}

BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/service/WalletSyncServiceImpl.kt

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package co.nilin.opex.bcgateway.core.service
22

33
import co.nilin.opex.bcgateway.core.api.WalletSyncService
4+
import co.nilin.opex.bcgateway.core.model.WalletSyncRecord
45
import co.nilin.opex.bcgateway.core.spi.*
56
import kotlinx.coroutines.ExecutorCoroutineDispatcher
67
import kotlinx.coroutines.async
@@ -27,8 +28,16 @@ class WalletSyncServiceImpl(
2728
async(dispatcher) {
2829
val uuid = assignedAddressHandler.findUuid(deposit.depositor, deposit.depositorMemo)
2930
if (uuid != null) {
30-
val symbol = currencyLoader.findSymbol(deposit.chain!!, deposit.tokenAddress)
31+
val symbol = currencyLoader.findSymbol(deposit.chain, deposit.tokenAddress)
3132
if (symbol != null) walletProxy.transfer(uuid, symbol, deposit.amount)
33+
walletSyncRecordHandler.saveWalletSyncRecord(
34+
WalletSyncRecord(
35+
LocalDateTime.now(),
36+
true,
37+
null,
38+
deposit
39+
)
40+
)
3241
}
3342
}
3443
}

BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/SyncRecordHandler.kt renamed to BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/ChainSyncRecordHandler.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package co.nilin.opex.bcgateway.core.spi
22

33
import co.nilin.opex.bcgateway.core.model.ChainSyncRecord
44

5-
interface SyncRecordHandler {
5+
interface ChainSyncRecordHandler {
66
suspend fun loadLastSuccessRecord(chainName: String): ChainSyncRecord?
77
suspend fun saveSyncRecord(syncRecord: ChainSyncRecord)
8-
}
8+
}

BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/SyncSchedulerHandler.kt renamed to BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/ChainSyncSchedulerHandler.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package co.nilin.opex.bcgateway.core.spi
33
import co.nilin.opex.bcgateway.core.model.ChainSyncSchedule
44
import java.time.LocalDateTime
55

6-
interface SyncSchedulerHandler {
6+
interface ChainSyncSchedulerHandler {
77
suspend fun fetchActiveSchedules(time: LocalDateTime): List<ChainSyncSchedule>
88
suspend fun prepareScheduleForNextTry(syncSchedule: ChainSyncSchedule, time: LocalDateTime)
9-
}
9+
}
Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package co.nilin.opex.bcgateway.core.spi
22

33
import co.nilin.opex.bcgateway.core.model.Deposit
4+
import co.nilin.opex.bcgateway.core.model.WalletSyncRecord
45

56
interface WalletSyncRecordHandler {
67
suspend fun saveReadyToSyncTransfers(chainName: String, deposits: List<Deposit>)
8+
suspend fun saveWalletSyncRecord(syncRecord: WalletSyncRecord)
79
suspend fun findReadyToSyncTransfers(count: Long?): List<Deposit>
8-
}
10+
}

BlockchainGateway/bc-gateway-core/src/test/kotlin/co/nilin/opex/bcgateway/core/service/ChainSyncServiceImplTest.kt

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ import co.nilin.opex.bcgateway.core.model.Endpoint
66
import co.nilin.opex.bcgateway.core.spi.ChainEndpointProxy
77
import co.nilin.opex.bcgateway.core.spi.ChainEndpointProxyFinder
88
import co.nilin.opex.bcgateway.core.spi.CurrencyLoader
9-
import co.nilin.opex.bcgateway.core.spi.SyncRecordHandler
10-
import co.nilin.opex.bcgateway.core.spi.SyncSchedulerHandler
9+
import co.nilin.opex.bcgateway.core.spi.ChainSyncRecordHandler
10+
import co.nilin.opex.bcgateway.core.spi.ChainSyncSchedulerHandler
1111
import co.nilin.opex.bcgateway.core.spi.WalletSyncRecordHandler
1212
import co.nilin.opex.bcgateway.test.OPERATOR
1313
import java.time.LocalDateTime
@@ -23,7 +23,6 @@ import org.mockito.kotlin.any
2323
import org.mockito.kotlin.mock
2424
import org.mockito.kotlin.times
2525
import org.mockito.kotlin.verify
26-
import org.mockito.kotlin.verifyNoMoreInteractions
2726
import org.mockito.kotlin.verifyZeroInteractions
2827

2928
internal class ChainSyncServiceImplTest {
@@ -34,13 +33,13 @@ internal class ChainSyncServiceImplTest {
3433
val syncService: ChainSyncServiceImpl
3534

3635
@Mock
37-
lateinit var syncSchedulerHandler: SyncSchedulerHandler
36+
lateinit var chainSyncSchedulerHandler: ChainSyncSchedulerHandler
3837

3938
@Mock
4039
lateinit var chainEndpointProxyFinder: ChainEndpointProxyFinder
4140

4241
@Mock
43-
lateinit var syncRecordHandler: SyncRecordHandler
42+
lateinit var chainSyncRecordHandler: ChainSyncRecordHandler
4443

4544
@Mock
4645
lateinit var walletSyncRecordHandler: WalletSyncRecordHandler
@@ -59,9 +58,9 @@ internal class ChainSyncServiceImplTest {
5958
}
6059

6160
syncService = object : ChainSyncServiceImpl(
62-
syncSchedulerHandler,
61+
chainSyncSchedulerHandler,
6362
chainEndpointProxyFinder,
64-
syncRecordHandler,
63+
chainSyncRecordHandler,
6564
walletSyncRecordHandler,
6665
currencyLoader,
6766
OPERATOR,
@@ -75,15 +74,15 @@ internal class ChainSyncServiceImplTest {
7574
fun givenNoActiveSchedules_whenStartSync_thenNoOp() {
7675
runBlocking {
7776
//given
78-
Mockito.`when`(syncSchedulerHandler.fetchActiveSchedules(any())).thenReturn(emptyList())
77+
Mockito.`when`(chainSyncSchedulerHandler.fetchActiveSchedules(any())).thenReturn(emptyList())
7978

8079
//when
8180
syncService.startSyncWithChain()
8281

8382
//then
8483
verifyZeroInteractions(
8584
chainEndpointProxyFinder,
86-
syncRecordHandler,
85+
chainSyncRecordHandler,
8786
walletSyncRecordHandler,
8887
currencyLoader
8988
)
@@ -96,7 +95,7 @@ internal class ChainSyncServiceImplTest {
9695
//given
9796
val delay = 100L
9897
val syncSchedule = ChainSyncSchedule(ethChain, time, delay)
99-
Mockito.`when`(syncSchedulerHandler.fetchActiveSchedules(any()))
98+
Mockito.`when`(chainSyncSchedulerHandler.fetchActiveSchedules(any()))
10099
.thenReturn(listOf(syncSchedule))
101100
Mockito.`when`(endpointProxy.syncTransfers(any())).thenReturn(
102101
ChainSyncRecord(
@@ -108,9 +107,9 @@ internal class ChainSyncServiceImplTest {
108107
syncService.startSyncWithChain()
109108

110109
//then
111-
verify(syncRecordHandler).saveSyncRecord(any())
110+
verify(chainSyncRecordHandler).saveSyncRecord(any())
112111
verify(walletSyncRecordHandler).saveReadyToSyncTransfers(any(), any())
113-
verify(syncSchedulerHandler).prepareScheduleForNextTry(syncSchedule, time.plus(delay, ChronoUnit.SECONDS))
112+
verify(chainSyncSchedulerHandler).prepareScheduleForNextTry(syncSchedule, time.plus(delay, ChronoUnit.SECONDS))
114113
}
115114
}
116115

@@ -120,7 +119,7 @@ internal class ChainSyncServiceImplTest {
120119
//given
121120
val delay = 100L
122121
val syncSchedule = ChainSyncSchedule(ethChain, time, delay)
123-
Mockito.`when`(syncSchedulerHandler.fetchActiveSchedules(any()))
122+
Mockito.`when`(chainSyncSchedulerHandler.fetchActiveSchedules(any()))
124123
.thenReturn(listOf(syncSchedule))
125124
Mockito.`when`(endpointProxy.syncTransfers(any())).thenReturn(
126125
ChainSyncRecord(
@@ -132,11 +131,11 @@ internal class ChainSyncServiceImplTest {
132131
syncService.startSyncWithChain()
133132

134133
//then
135-
verify(syncRecordHandler).saveSyncRecord(any())
134+
verify(chainSyncRecordHandler).saveSyncRecord(any())
136135
verify(walletSyncRecordHandler).saveReadyToSyncTransfers(any(), any())
137-
verify(syncSchedulerHandler, times(0)).prepareScheduleForNextTry(any(), any())
136+
verify(chainSyncSchedulerHandler, times(0)).prepareScheduleForNextTry(any(), any())
138137
}
139138
}
140139

141140

142-
}
141+
}

0 commit comments

Comments
 (0)