Skip to content

Commit 95bd379

Browse files
authored
Fixed scan endpoint and Added demo data (#145)
close #135
1 parent 347599e commit 95bd379

8 files changed

Lines changed: 102 additions & 44 deletions

File tree

BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/WalletSync.kt

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,14 @@ package co.nilin.opex.bcgateway.core.model
22

33
import java.time.LocalDateTime
44

5-
data class WalletSyncSchedule(val retryTime: LocalDateTime, val delay: Long, val batchSize: Long?)
5+
data class WalletSyncSchedule(
6+
val retryTime: LocalDateTime,
7+
val delay: Long,
8+
val batchSize: Long?
9+
)
10+
611
data class WalletSyncRecord(
7-
val time: LocalDateTime, val success: Boolean, val error: String?, val deposit: Deposit
12+
val time: LocalDateTime,
13+
val success: Boolean,
14+
val error: String?
815
)

BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/service/WalletSyncServiceImpl.kt

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,7 @@ import co.nilin.opex.bcgateway.core.model.CurrencyImplementation
55
import co.nilin.opex.bcgateway.core.model.Deposit
66
import co.nilin.opex.bcgateway.core.model.WalletSyncRecord
77
import co.nilin.opex.bcgateway.core.spi.*
8-
import kotlinx.coroutines.ExecutorCoroutineDispatcher
9-
import kotlinx.coroutines.async
10-
import kotlinx.coroutines.withContext
8+
import kotlinx.coroutines.*
119
import org.slf4j.LoggerFactory
1210
import java.math.BigDecimal
1311
import java.time.LocalDateTime
@@ -31,26 +29,33 @@ class WalletSyncServiceImpl(
3129
if (schedule != null) {
3230
val deposits = walletSyncRecordHandler.findReadyToSyncTransfers(schedule.batchSize)
3331
logger.info("syncing ${deposits.size} deposits")
34-
deposits.map { deposit ->
32+
33+
val result = deposits.map { deposit ->
3534
async(dispatcher) {
35+
var deposited = false
3636
val uuid = assignedAddressHandler.findUuid(deposit.depositor, deposit.depositorMemo)
3737
if (uuid != null) {
3838
logger.info("deposit came for $uuid - to ${deposit.depositor}")
3939
val symbol = currencyLoader.findByChainAndTokenAddress(deposit.chain, deposit.tokenAddress)
4040
if (symbol != null) {
4141
sendDeposit(uuid, symbol, deposit)
42+
deposited = true
4243
}
4344
}
44-
walletSyncRecordHandler.saveWalletSyncRecord(
45-
WalletSyncRecord(
46-
LocalDateTime.now(),
47-
true,
48-
null,
49-
deposit
50-
)
51-
)
45+
Pair(deposit, deposited)
5246
}
53-
}
47+
}.awaitAll()
48+
49+
walletSyncRecordHandler.saveWalletSyncRecord(
50+
WalletSyncRecord(
51+
LocalDateTime.now(),
52+
true,
53+
null
54+
),
55+
result.filter { it.second }.map { it.first },
56+
result.filter { !it.second }.map { it.first }
57+
)
58+
5459
syncSchedulerHandler.prepareScheduleForNextTry(
5560
schedule, LocalDateTime.now()
5661
.plus(schedule.delay, ChronoUnit.SECONDS)

BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/WalletSyncRecordHandler.kt

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,14 @@ import co.nilin.opex.bcgateway.core.model.Deposit
44
import co.nilin.opex.bcgateway.core.model.WalletSyncRecord
55

66
interface WalletSyncRecordHandler {
7+
78
suspend fun saveReadyToSyncTransfers(chainName: String, deposits: List<Deposit>)
8-
suspend fun saveWalletSyncRecord(syncRecord: WalletSyncRecord)
9+
10+
suspend fun saveWalletSyncRecord(
11+
syncRecord: WalletSyncRecord,
12+
sentDeposits: List<Deposit>,
13+
deletingDeposits: List<Deposit>
14+
)
15+
916
suspend fun findReadyToSyncTransfers(count: Long?): List<Deposit>
1017
}

BlockchainGateway/bc-gateway-ports/bc-chain-proxy/src/main/kotlin/co.nilin.opex.port.bcgateway.chainproxy/impl/ChainEndpointProxyImpl.kt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ class ChainEndpointProxyImpl(
2323
private val chain: String,
2424
private val endpoints: List<Endpoint>,
2525
private val webClient: WebClient
26-
) :
27-
ChainEndpointProxy {
26+
) : ChainEndpointProxy {
27+
2828
data class TransfersRequest(
2929
val startBlock: Long?,
3030
val endBlock: Long?,
@@ -47,10 +47,10 @@ class ChainEndpointProxyImpl(
4747

4848
private val logger = LoggerFactory.getLogger(ChainEndpointProxyImpl::class.java)
4949

50-
private suspend fun requestTransferList(baseUrl: String, request: TransfersRequest): DepositResult {
51-
logger.info("request transfers: base=$baseUrl")
50+
private suspend fun requestTransferList(endpoint: String, request: TransfersRequest): DepositResult {
51+
logger.info("request transfers: base=$endpoint")
5252
val response = webClient.post()
53-
.uri(URI.create("$baseUrl/transfers"))
53+
.uri(URI.create(endpoint))
5454
.header("Content-Type", "application/json")
5555
.body(Mono.just(request))
5656
.retrieve()

BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/DemoPostgresConfig.kt

Lines changed: 40 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -11,25 +11,46 @@ class DemoPostgresConfig(db: DatabaseClient) {
1111
init {
1212
val initDb = db.sql {
1313
"""
14-
insert into currency values ('BTC', 'Bitcoin') ON CONFLICT DO NOTHING;
15-
insert into chains values ('Bit') ON CONFLICT DO NOTHING;
16-
insert into chains values ('Bsc') ON CONFLICT DO NOTHING;
17-
insert into address_types(id, address_type, address_regex) values (1, 'BTC', '.*') ON CONFLICT DO NOTHING;
18-
insert into address_types(id, address_type, address_regex) values (2, 'ETH', '.*') ON CONFLICT DO NOTHING;
19-
insert into chain_address_types (chain_name, addr_type_id) values ('Bit', 1) ON CONFLICT DO NOTHING;
20-
insert into chain_address_types (chain_name, addr_type_id) values ('Bsc', 2) ON CONFLICT DO NOTHING;
21-
insert into currency_implementations (
22-
id,
23-
symbol,
24-
chain,
25-
token,
26-
token_address,
27-
token_name,
28-
withdraw_enabled,
29-
withdraw_fee,
30-
withdraw_min
31-
)values(1, 'BTC', 'Bit', false, null, null, true, 0.0001, 0.0001)
32-
,(2, 'BTC', 'Bsc', true, '0x1111', 'WBTC', true, 0.00001, 0.000001) ON CONFLICT DO NOTHING;
14+
insert into address_types values
15+
(1, 'bitcoin', '', ''),
16+
(2, 'ethereum', '', '')
17+
ON CONFLICT DO NOTHING;
18+
19+
insert into chains values
20+
('bitcoin-testnet'),
21+
('ethereum-ropsten'),
22+
('bsc-ropsten')
23+
ON CONFLICT DO NOTHING;
24+
25+
insert into chain_address_types values
26+
(1, 'bitcoin-testnet', 1),
27+
(2, 'ethereum-ropsten', 2),
28+
(3, 'bsc-ropsten', 2)
29+
ON CONFLICT DO NOTHING;
30+
31+
insert into currency values
32+
('BTC', 'Bitcoin'),
33+
('ETH', 'Ethereum'),
34+
('USDT', 'Tether')
35+
ON CONFLICT DO NOTHING;
36+
37+
insert into currency_implementations values
38+
(1, 'BTC', 'bitcoin-testnet', false, null, null, true, 0.00001, 0.00001, 0),
39+
(2, 'ETH', 'ethereum-ropsten', false, null, null, true, 0.0001, 0.0001, 18),
40+
(3, 'USDT', 'ethereum-ropsten', true, '0x110a13fc3efe6a245b50102d2d79b3e76125ae83', 'USDT', true, 0.01, 0.01, 6)
41+
ON CONFLICT DO NOTHING;
42+
43+
insert into chain_endpoints (id, chain_name, endpoint_url) values
44+
(1, 'bitcoin-testnet', 'http://host.docker.internal:9990/bitcoin/transfers'),
45+
(2, 'ethereum-ropsten', 'http://host.docker.internal:9990/eth/transfers')
46+
ON CONFLICT DO NOTHING;
47+
48+
insert into chain_sync_schedules values
49+
('bitcoin-testnet', CURRENT_DATE, 600),
50+
('ethereum-ropsten', CURRENT_DATE, 90)
51+
ON CONFLICT DO NOTHING;
52+
53+
insert into wallet_sync_schedules values (1, CURRENT_DATE, 30, 10000) ON CONFLICT DO NOTHING;
3354
"""
3455
}
3556
initDb // initialize the database

BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/DepositRepository.kt

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ interface DepositRepository : ReactiveCrudRepository<DepositModel, Long> {
1616
fun findByChain(chain: String): Flow<DepositModel>
1717

1818
@Query("select * from deposits where hash in (:hash)")
19-
fun findAllByHash(hash: List<String>):Flow<DepositModel>
19+
fun findAllByHash(hash: List<String>): Flow<DepositModel>
2020

2121
@Query("select * from deposits where chain = :chain and wallet_record_id is null")
2222
fun findByChainWhereNotSynced(chain: String): Flow<DepositModel>
@@ -28,6 +28,10 @@ interface DepositRepository : ReactiveCrudRepository<DepositModel, Long> {
2828
@Query("update deposits set wallet_record_id = :walletRecordId where id = :id")
2929
fun updateWalletSyncRecord(id: Long, walletRecordId: Long): Mono<Int>
3030

31+
@Modifying
32+
@Query("update deposits set wallet_record_id = :walletRecordId where id in (:ids)")
33+
fun updateWalletSyncRecords(ids: List<Long>, walletRecordId: Long): Mono<Int>
34+
3135
@Modifying
3236
@Query(
3337
"""
@@ -54,4 +58,8 @@ interface DepositRepository : ReactiveCrudRepository<DepositModel, Long> {
5458
memo: String?
5559
): Mono<DepositModel>
5660

61+
@Modifying
62+
@Query("delete from deposits where id in (:ids)")
63+
fun deleteSyncedDeposits(ids: List<Long>): Mono<Int>
64+
5765
}

BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/ChainSyncRecordHandlerImpl.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,13 @@ class ChainSyncRecordHandlerImpl(
4141

4242
@Transactional
4343
override suspend fun saveSyncRecord(syncRecord: ChainSyncRecord) {
44+
val currentRecord = chainSyncRecordRepository.findByChain(syncRecord.chainName).awaitSingleOrNull()
4445
val chainSyncRecordDao =
4546
ChainSyncRecordModel(
4647
syncRecord.chainName,
4748
syncRecord.time,
4849
syncRecord.endpoint.url,
49-
syncRecord.latestBlock,
50+
syncRecord.latestBlock ?: currentRecord?.latestBlock,
5051
syncRecord.success,
5152
syncRecord.error
5253
)

BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/WalletSyncRecordHandlerImpl.kt

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,11 @@ class WalletSyncRecordHandlerImpl(
3636
}
3737

3838
@Transactional
39-
override suspend fun saveWalletSyncRecord(syncRecord: WalletSyncRecord) {
39+
override suspend fun saveWalletSyncRecord(
40+
syncRecord: WalletSyncRecord,
41+
sentDeposits: List<Deposit>,
42+
deletingDeposits: List<Deposit>
43+
) {
4044
val dao = walletSyncRecordRepository.save(
4145
WalletSyncRecordModel(
4246
null,
@@ -45,7 +49,12 @@ class WalletSyncRecordHandlerImpl(
4549
syncRecord.error
4650
)
4751
).awaitFirst()
48-
depositRepository.updateWalletSyncRecord(syncRecord.deposit.id!!, dao.id!!).awaitFirst()
52+
53+
if (sentDeposits.isNotEmpty())
54+
depositRepository.updateWalletSyncRecords(sentDeposits.map { it.id ?: -1 }, dao.id!!).awaitFirst()
55+
56+
if (deletingDeposits.isNotEmpty())
57+
depositRepository.deleteSyncedDeposits(deletingDeposits.map { it.id ?: -1 }).awaitFirst()
4958
}
5059

5160
override suspend fun findReadyToSyncTransfers(count: Long?): List<Deposit> {

0 commit comments

Comments
 (0)