Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,14 @@ package co.nilin.opex.bcgateway.core.model

import java.time.LocalDateTime

data class WalletSyncSchedule(val retryTime: LocalDateTime, val delay: Long, val batchSize: Long?)
data class WalletSyncSchedule(
val retryTime: LocalDateTime,
val delay: Long,
val batchSize: Long?
)

data class WalletSyncRecord(
val time: LocalDateTime, val success: Boolean, val error: String?, val deposit: Deposit
val time: LocalDateTime,
val success: Boolean,
val error: String?
)
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ import co.nilin.opex.bcgateway.core.model.CurrencyImplementation
import co.nilin.opex.bcgateway.core.model.Deposit
import co.nilin.opex.bcgateway.core.model.WalletSyncRecord
import co.nilin.opex.bcgateway.core.spi.*
import kotlinx.coroutines.ExecutorCoroutineDispatcher
import kotlinx.coroutines.async
import kotlinx.coroutines.withContext
import kotlinx.coroutines.*
import org.slf4j.LoggerFactory
import java.math.BigDecimal
import java.time.LocalDateTime
Expand All @@ -31,26 +29,33 @@ class WalletSyncServiceImpl(
if (schedule != null) {
val deposits = walletSyncRecordHandler.findReadyToSyncTransfers(schedule.batchSize)
logger.info("syncing ${deposits.size} deposits")
deposits.map { deposit ->

val result = deposits.map { deposit ->
async(dispatcher) {
var deposited = false
val uuid = assignedAddressHandler.findUuid(deposit.depositor, deposit.depositorMemo)
if (uuid != null) {
logger.info("deposit came for $uuid - to ${deposit.depositor}")
val symbol = currencyLoader.findByChainAndTokenAddress(deposit.chain, deposit.tokenAddress)
if (symbol != null) {
sendDeposit(uuid, symbol, deposit)
deposited = true
}
}
walletSyncRecordHandler.saveWalletSyncRecord(
WalletSyncRecord(
LocalDateTime.now(),
true,
null,
deposit
)
)
Pair(deposit, deposited)
}
}
}.awaitAll()

walletSyncRecordHandler.saveWalletSyncRecord(
WalletSyncRecord(
LocalDateTime.now(),
true,
null
),
result.filter { it.second }.map { it.first },
result.filter { !it.second }.map { it.first }
)

syncSchedulerHandler.prepareScheduleForNextTry(
schedule, LocalDateTime.now()
.plus(schedule.delay, ChronoUnit.SECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,14 @@ import co.nilin.opex.bcgateway.core.model.Deposit
import co.nilin.opex.bcgateway.core.model.WalletSyncRecord

interface WalletSyncRecordHandler {

suspend fun saveReadyToSyncTransfers(chainName: String, deposits: List<Deposit>)
suspend fun saveWalletSyncRecord(syncRecord: WalletSyncRecord)

suspend fun saveWalletSyncRecord(
syncRecord: WalletSyncRecord,
sentDeposits: List<Deposit>,
deletingDeposits: List<Deposit>
)

suspend fun findReadyToSyncTransfers(count: Long?): List<Deposit>
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ class ChainEndpointProxyImpl(
private val chain: String,
private val endpoints: List<Endpoint>,
private val webClient: WebClient
) :
ChainEndpointProxy {
) : ChainEndpointProxy {

data class TransfersRequest(
val startBlock: Long?,
val endBlock: Long?,
Expand All @@ -47,10 +47,10 @@ class ChainEndpointProxyImpl(

private val logger = LoggerFactory.getLogger(ChainEndpointProxyImpl::class.java)

private suspend fun requestTransferList(baseUrl: String, request: TransfersRequest): DepositResult {
logger.info("request transfers: base=$baseUrl")
private suspend fun requestTransferList(endpoint: String, request: TransfersRequest): DepositResult {
logger.info("request transfers: base=$endpoint")
val response = webClient.post()
.uri(URI.create("$baseUrl/transfers"))
.uri(URI.create(endpoint))
.header("Content-Type", "application/json")
.body(Mono.just(request))
.retrieve()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,46 @@ class DemoPostgresConfig(db: DatabaseClient) {
init {
val initDb = db.sql {
"""
insert into currency values ('BTC', 'Bitcoin') ON CONFLICT DO NOTHING;
insert into chains values ('Bit') ON CONFLICT DO NOTHING;
insert into chains values ('Bsc') ON CONFLICT DO NOTHING;
insert into address_types(id, address_type, address_regex) values (1, 'BTC', '.*') ON CONFLICT DO NOTHING;
insert into address_types(id, address_type, address_regex) values (2, 'ETH', '.*') ON CONFLICT DO NOTHING;
insert into chain_address_types (chain_name, addr_type_id) values ('Bit', 1) ON CONFLICT DO NOTHING;
insert into chain_address_types (chain_name, addr_type_id) values ('Bsc', 2) ON CONFLICT DO NOTHING;
insert into currency_implementations (
id,
symbol,
chain,
token,
token_address,
token_name,
withdraw_enabled,
withdraw_fee,
withdraw_min
)values(1, 'BTC', 'Bit', false, null, null, true, 0.0001, 0.0001)
,(2, 'BTC', 'Bsc', true, '0x1111', 'WBTC', true, 0.00001, 0.000001) ON CONFLICT DO NOTHING;
insert into address_types values
(1, 'bitcoin', '', ''),
(2, 'ethereum', '', '')
ON CONFLICT DO NOTHING;

insert into chains values
('bitcoin-testnet'),
('ethereum-ropsten'),
('bsc-ropsten')
ON CONFLICT DO NOTHING;

insert into chain_address_types values
(1, 'bitcoin-testnet', 1),
(2, 'ethereum-ropsten', 2),
(3, 'bsc-ropsten', 2)
ON CONFLICT DO NOTHING;

insert into currency values
('BTC', 'Bitcoin'),
('ETH', 'Ethereum'),
('USDT', 'Tether')
ON CONFLICT DO NOTHING;

insert into currency_implementations values
(1, 'BTC', 'bitcoin-testnet', false, null, null, true, 0.00001, 0.00001, 0),
(2, 'ETH', 'ethereum-ropsten', false, null, null, true, 0.0001, 0.0001, 18),
(3, 'USDT', 'ethereum-ropsten', true, '0x110a13fc3efe6a245b50102d2d79b3e76125ae83', 'USDT', true, 0.01, 0.01, 6)
ON CONFLICT DO NOTHING;

insert into chain_endpoints (id, chain_name, endpoint_url) values
(1, 'bitcoin-testnet', 'http://host.docker.internal:9990/bitcoin/transfers'),
(2, 'ethereum-ropsten', 'http://host.docker.internal:9990/eth/transfers')
ON CONFLICT DO NOTHING;

insert into chain_sync_schedules values
('bitcoin-testnet', CURRENT_DATE, 600),
('ethereum-ropsten', CURRENT_DATE, 90)
ON CONFLICT DO NOTHING;

insert into wallet_sync_schedules values (1, CURRENT_DATE, 30, 10000) ON CONFLICT DO NOTHING;
"""
}
initDb // initialize the database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ interface DepositRepository : ReactiveCrudRepository<DepositModel, Long> {
fun findByChain(chain: String): Flow<DepositModel>

@Query("select * from deposits where hash in (:hash)")
fun findAllByHash(hash: List<String>):Flow<DepositModel>
fun findAllByHash(hash: List<String>): Flow<DepositModel>

@Query("select * from deposits where chain = :chain and wallet_record_id is null")
fun findByChainWhereNotSynced(chain: String): Flow<DepositModel>
Expand All @@ -28,6 +28,10 @@ interface DepositRepository : ReactiveCrudRepository<DepositModel, Long> {
@Query("update deposits set wallet_record_id = :walletRecordId where id = :id")
fun updateWalletSyncRecord(id: Long, walletRecordId: Long): Mono<Int>

@Modifying
@Query("update deposits set wallet_record_id = :walletRecordId where id in (:ids)")
fun updateWalletSyncRecords(ids: List<Long>, walletRecordId: Long): Mono<Int>

@Modifying
@Query(
"""
Expand All @@ -54,4 +58,8 @@ interface DepositRepository : ReactiveCrudRepository<DepositModel, Long> {
memo: String?
): Mono<DepositModel>

@Modifying
@Query("delete from deposits where id in (:ids)")
fun deleteSyncedDeposits(ids: List<Long>): Mono<Int>

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,13 @@ class ChainSyncRecordHandlerImpl(

@Transactional
override suspend fun saveSyncRecord(syncRecord: ChainSyncRecord) {
val currentRecord = chainSyncRecordRepository.findByChain(syncRecord.chainName).awaitSingleOrNull()
val chainSyncRecordDao =
ChainSyncRecordModel(
syncRecord.chainName,
syncRecord.time,
syncRecord.endpoint.url,
syncRecord.latestBlock,
syncRecord.latestBlock ?: currentRecord?.latestBlock,
syncRecord.success,
syncRecord.error
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ class WalletSyncRecordHandlerImpl(
}

@Transactional
override suspend fun saveWalletSyncRecord(syncRecord: WalletSyncRecord) {
override suspend fun saveWalletSyncRecord(
syncRecord: WalletSyncRecord,
sentDeposits: List<Deposit>,
deletingDeposits: List<Deposit>
) {
val dao = walletSyncRecordRepository.save(
WalletSyncRecordModel(
null,
Expand All @@ -45,7 +49,12 @@ class WalletSyncRecordHandlerImpl(
syncRecord.error
)
).awaitFirst()
depositRepository.updateWalletSyncRecord(syncRecord.deposit.id!!, dao.id!!).awaitFirst()

if (sentDeposits.isNotEmpty())
depositRepository.updateWalletSyncRecords(sentDeposits.map { it.id ?: -1 }, dao.id!!).awaitFirst()

if (deletingDeposits.isNotEmpty())
depositRepository.deleteSyncedDeposits(deletingDeposits.map { it.id ?: -1 }).awaitFirst()
}

override suspend fun findReadyToSyncTransfers(count: Long?): List<Deposit> {
Expand Down