Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class AppConfig {
chainEndpointProxyFinder: ChainEndpointProxyFinder,
chainSyncRecordHandler: ChainSyncRecordHandler,
walletSyncRecordHandler: WalletSyncRecordHandler,
chainSyncRetryHandler: ChainSyncRetryHandler,
currencyLoader: CurrencyLoader,
operator: TransactionalOperator
): ChainSyncService {
Expand All @@ -40,6 +41,7 @@ class AppConfig {
chainEndpointProxyFinder,
chainSyncRecordHandler,
walletSyncRecordHandler,
chainSyncRetryHandler,
currencyLoader,
operator,
AppDispatchers.chainSyncExecutor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import java.time.LocalDateTime

data class Endpoint(val url: String)
data class Chain(val name: String, val addressTypes: List<AddressType>, val endpoints: List<Endpoint>)
data class ChainSyncSchedule(val chainName: String, val retryTime: LocalDateTime, val delay: Long)
data class ChainSyncSchedule(val chainName: String, val retryTime: LocalDateTime, val delay: Long, val errorDelay: Long)
data class ChainSyncRecord(
val chainName: String,
val time: LocalDateTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ open class ChainSyncServiceImpl(
private val chainEndpointProxyFinder: ChainEndpointProxyFinder,
private val chainSyncRecordHandler: ChainSyncRecordHandler,
private val walletSyncRecordHandler: WalletSyncRecordHandler,
private val chainSyncRetryHandler: ChainSyncRetryHandler,
private val currencyLoader: CurrencyLoader,
private val operator: TransactionalOperator,
private val dispatcher: ExecutorCoroutineDispatcher
Expand All @@ -28,28 +29,31 @@ open class ChainSyncServiceImpl(
withContext(coroutineContext) {
val schedules = chainSyncSchedulerHandler.fetchActiveSchedules(currentTime())
schedules.map { syncSchedule ->
logger.info("chain syncing for: ${syncSchedule.chainName}")
async(dispatcher) {
val syncHandler = chainEndpointProxyFinder.findChainEndpointProxy(syncSchedule.chainName)
val lastSync = chainSyncRecordHandler.loadLastSuccessRecord(syncSchedule.chainName)
val tokens = currencyLoader.findImplementationsWithTokenOnChain(syncSchedule.chainName)
.map { impl -> impl.tokenAddress ?: "" }
.toList()

logger.info("chain syncing for: ${syncSchedule.chainName} - block: ${lastSync?.latestBlock}")
val syncResult =
syncHandler.syncTransfers(
ChainEndpointProxy.DepositFilter(
lastSync?.latestBlock, null, tokens
)
)

if (syncResult.success)
logger.info("request successful - synced ${syncSchedule.chainName} until ${syncResult.latestBlock}")
else
logger.info("request failed - ${syncResult.error}")

operator.executeAndAwait {
walletSyncRecordHandler.saveReadyToSyncTransfers(syncResult.chainName, syncResult.records)
chainSyncRecordHandler.saveSyncRecord(syncResult)
if (syncResult.success) {
chainSyncSchedulerHandler.prepareScheduleForNextTry(
syncSchedule,
currentTime().plus(syncSchedule.delay, ChronoUnit.SECONDS)
)
}
chainSyncSchedulerHandler.prepareScheduleForNextTry(syncSchedule, syncResult.success)
chainSyncRetryHandler.handleNextTry(syncSchedule, syncResult, lastSync?.latestBlock ?: 0)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package co.nilin.opex.bcgateway.core.spi

import co.nilin.opex.bcgateway.core.model.ChainSyncRecord
import co.nilin.opex.bcgateway.core.model.ChainSyncSchedule

interface ChainSyncRetryHandler {

suspend fun handleNextTry(syncSchedule: ChainSyncSchedule, records: ChainSyncRecord, sentBlock:Long)

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ import java.time.LocalDateTime

interface ChainSyncSchedulerHandler {
suspend fun fetchActiveSchedules(time: LocalDateTime): List<ChainSyncSchedule>
suspend fun prepareScheduleForNextTry(syncSchedule: ChainSyncSchedule, time: LocalDateTime)
suspend fun prepareScheduleForNextTry(syncSchedule: ChainSyncSchedule, success:Boolean)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ import java.util.concurrent.Executors

internal class ChainSyncServiceImplTest {

val ethChain = "ETH_MAINNET"
val bscChain = "BSC_MAINNET"
val time = LocalDateTime.now()
val syncService: ChainSyncServiceImpl
private val ethChain = "ETH_MAINNET"
private val bscChain = "BSC_MAINNET"
private val time = LocalDateTime.now()
private val syncService: ChainSyncServiceImpl

@Mock
lateinit var chainSyncSchedulerHandler: ChainSyncSchedulerHandler
Expand All @@ -35,10 +35,13 @@ internal class ChainSyncServiceImplTest {
@Mock
lateinit var walletSyncRecordHandler: WalletSyncRecordHandler

@Mock
lateinit var chainSyncRetryHandler: ChainSyncRetryHandler

@Mock
lateinit var currencyLoader: CurrencyLoader

val endpointProxy: ChainEndpointProxy = mock()
private val endpointProxy: ChainEndpointProxy = mock()

init {
MockitoAnnotations.openMocks(this)
Expand All @@ -53,6 +56,7 @@ internal class ChainSyncServiceImplTest {
chainEndpointProxyFinder,
chainSyncRecordHandler,
walletSyncRecordHandler,
chainSyncRetryHandler,
currencyLoader,
OPERATOR,
Executors.newFixedThreadPool(2).asCoroutineDispatcher()
Expand Down Expand Up @@ -85,7 +89,7 @@ internal class ChainSyncServiceImplTest {
runBlocking {
//given
val delay = 100L
val syncSchedule = ChainSyncSchedule(ethChain, time, delay)
val syncSchedule = ChainSyncSchedule(ethChain, time, delay, delay)
Mockito.`when`(chainSyncSchedulerHandler.fetchActiveSchedules(any()))
.thenReturn(listOf(syncSchedule))
Mockito.`when`(endpointProxy.syncTransfers(any())).thenReturn(
Expand All @@ -100,10 +104,7 @@ internal class ChainSyncServiceImplTest {
//then
verify(chainSyncRecordHandler).saveSyncRecord(any())
verify(walletSyncRecordHandler).saveReadyToSyncTransfers(any(), any())
verify(chainSyncSchedulerHandler).prepareScheduleForNextTry(
syncSchedule,
time.plus(delay, ChronoUnit.SECONDS)
)
verify(chainSyncSchedulerHandler).prepareScheduleForNextTry(syncSchedule, true)
}
}

Expand All @@ -112,7 +113,7 @@ internal class ChainSyncServiceImplTest {
runBlocking {
//given
val delay = 100L
val syncSchedule = ChainSyncSchedule(ethChain, time, delay)
val syncSchedule = ChainSyncSchedule(ethChain, time, delay, delay)
Mockito.`when`(chainSyncSchedulerHandler.fetchActiveSchedules(any()))
.thenReturn(listOf(syncSchedule))
Mockito.`when`(endpointProxy.syncTransfers(any())).thenReturn(
Expand All @@ -127,7 +128,7 @@ internal class ChainSyncServiceImplTest {
//then
verify(chainSyncRecordHandler).saveSyncRecord(any())
verify(walletSyncRecordHandler).saveReadyToSyncTransfers(any(), any())
verify(chainSyncSchedulerHandler, times(0)).prepareScheduleForNextTry(any(), any())
verify(chainSyncSchedulerHandler).prepareScheduleForNextTry(syncSchedule, false)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ class ChainEndpointProxyImpl(
)

data class Transfer(
var txHash: String,
var from: String,
var to: String,
var txHash: String?,
var from: String?,
var to: String?,
var isTokenTransfer: Boolean,
var token: String? = null,
var amount: BigDecimal
Expand All @@ -47,7 +47,6 @@ class ChainEndpointProxyImpl(
private val logger = LoggerFactory.getLogger(ChainEndpointProxyImpl::class.java)

private suspend fun requestTransferList(endpoint: String, request: TransfersRequest): DepositResult {
logger.info("request transfers: base=$endpoint")
val response = webClient.post()
.uri(URI.create(endpoint))
.header("Content-Type", "application/json")
Expand All @@ -60,7 +59,18 @@ class ChainEndpointProxyImpl(
return DepositResult(
response?.latestBlock ?: request.startBlock ?: 0,
response?.transfers
?.map { Deposit(null, it.txHash, it.to, null, it.amount, chain, it.isTokenTransfer, it.token) }
?.map {
Deposit(
null,
it.txHash ?: "",
it.to ?: "",
null,
it.amount,
chain,
it.isTokenTransfer,
it.token
)
}
?: emptyList()
)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,24 @@
package co.nilin.opex.bcgateway.ports.postgres.dao

import co.nilin.opex.bcgateway.ports.postgres.model.ChainSyncRecordModel
import org.springframework.data.r2dbc.repository.Query
import org.springframework.data.repository.reactive.ReactiveCrudRepository
import org.springframework.stereotype.Repository
import reactor.core.publisher.Mono
import java.time.LocalDateTime

@Repository
interface ChainSyncRecordRepository : ReactiveCrudRepository<ChainSyncRecordModel, String> {

@Query("insert into chain_sync_records values(:chain, :time, :endpointUrl, :latestBlock, :success, :error)")
fun insert(
chain: String,
time: LocalDateTime,
endpointUrl: String,
latestBlock: Long?,
success: Boolean,
error: String?
):Mono<ChainSyncRecordModel>

fun findByChain(chain: String): Mono<ChainSyncRecordModel>
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package co.nilin.opex.bcgateway.ports.postgres.dao

import co.nilin.opex.bcgateway.ports.postgres.model.ChainSyncRetryModel
import org.springframework.data.r2dbc.repository.Query
import org.springframework.data.repository.reactive.ReactiveCrudRepository
import org.springframework.stereotype.Repository
import reactor.core.publisher.Mono

@Repository
interface ChainSyncRetryRepository : ReactiveCrudRepository<ChainSyncRetryModel, Long> {

@Query("select * from chain_sync_retry where chain = :chain and block = :block")
fun findByChainAndBlock(chain: String, block: Long): Mono<ChainSyncRetryModel?>

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import co.nilin.opex.bcgateway.ports.postgres.model.ChainSyncRecordModel
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.awaitFirst
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactive.awaitSingleOrNull
import org.springframework.stereotype.Component
import org.springframework.transaction.annotation.Transactional
Expand All @@ -21,15 +22,16 @@ class ChainSyncRecordHandlerImpl(
) : ChainSyncRecordHandler {
override suspend fun loadLastSuccessRecord(chainName: String): ChainSyncRecord? {
val chainSyncRecordDao = chainSyncRecordRepository.findByChain(chainName).awaitSingleOrNull()
return if (chainSyncRecordDao !== null) {
return if (chainSyncRecordDao != null) {
val deposits = depositRepository.findByChainWhereNotSynced(chainName).map {
Deposit(it.id, it.hash, it.depositor, it.depositorMemo, it.amount, it.chain, it.token, it.tokenAddress)
}

ChainSyncRecord(
chainSyncRecordDao.chain,
chainSyncRecordDao.time,
Endpoint(chainSyncRecordDao.endpointUrl),
chainSyncRecordDao.latestBlock,
if (chainSyncRecordDao.latestBlock == null) 0 else chainSyncRecordDao.latestBlock + 1,
chainSyncRecordDao.success,
chainSyncRecordDao.error,
deposits.toList()
Expand All @@ -51,6 +53,17 @@ class ChainSyncRecordHandlerImpl(
syncRecord.success,
syncRecord.error
)
chainSyncRecordRepository.save(chainSyncRecordDao).awaitFirst()

if (currentRecord != null)
chainSyncRecordRepository.save(chainSyncRecordDao).awaitFirst()
else
chainSyncRecordRepository.insert(
syncRecord.chainName,
syncRecord.time,
syncRecord.endpoint.url,
syncRecord.latestBlock ?: currentRecord?.latestBlock,
syncRecord.success,
syncRecord.error
).awaitFirstOrNull()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package co.nilin.opex.bcgateway.ports.postgres.impl

import co.nilin.opex.bcgateway.core.model.ChainSyncRecord
import co.nilin.opex.bcgateway.core.model.ChainSyncSchedule
import co.nilin.opex.bcgateway.core.spi.ChainSyncRetryHandler
import co.nilin.opex.bcgateway.ports.postgres.dao.ChainSyncRecordRepository
import co.nilin.opex.bcgateway.ports.postgres.dao.ChainSyncRetryRepository
import co.nilin.opex.bcgateway.ports.postgres.model.ChainSyncRecordModel
import co.nilin.opex.bcgateway.ports.postgres.model.ChainSyncRetryModel
import kotlinx.coroutines.reactive.awaitFirst
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactive.awaitSingleOrNull
import org.springframework.stereotype.Component

@Component
class ChainSyncRetryHandlerImpl(
private val chainSyncRetryRepository: ChainSyncRetryRepository,
private val chainSyncRecordRepository: ChainSyncRecordRepository,
) : ChainSyncRetryHandler {

private val maxRetry = 5

override suspend fun handleNextTry(syncSchedule: ChainSyncSchedule, records: ChainSyncRecord, sentBlock: Long) {
val success = records.success
val chain = syncSchedule.chainName

var retry = chainSyncRetryRepository.findByChainAndBlock(chain, sentBlock).awaitFirstOrNull()
if (success) {
if (retry != null) {
retry.apply {
retries += 1
synced = true
}
chainSyncRetryRepository.save(retry).awaitFirst()
}
} else {
if (retry == null) {
retry = ChainSyncRetryModel(chain, sentBlock, error = records.error)
} else {
val shouldGiveUp = retry.retries >= maxRetry
retry.apply {
retries += 1
error = records.error
giveUp = shouldGiveUp
}
}

chainSyncRetryRepository.save(retry).awaitFirst()

if (retry.giveUp) {
val record = chainSyncRecordRepository.findByChain(chain).awaitSingleOrNull()
if (record != null) {
val chainSyncRecordDao = ChainSyncRecordModel(
records.chainName,
records.time,
records.endpoint.url,
retry.block,
records.success,
records.error
)
chainSyncRecordRepository.save(chainSyncRecordDao).awaitFirst()
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,25 @@ import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.awaitFirst
import org.springframework.stereotype.Component
import java.time.LocalDateTime
import java.time.temporal.ChronoUnit

@Component
class ChainSyncSchedulerHandlerImpl(private val chainSyncScheduleRepository: ChainSyncScheduleRepository) :
ChainSyncSchedulerHandler {

override suspend fun fetchActiveSchedules(time: LocalDateTime): List<ChainSyncSchedule> {
return chainSyncScheduleRepository.findActiveSchedule(time).map {
ChainSyncSchedule(it.chain, it.retryTime, it.delay)
ChainSyncSchedule(it.chain, it.retryTime, it.delay, it.errorDelay)
}.toList()
}

override suspend fun prepareScheduleForNextTry(syncSchedule: ChainSyncSchedule, time: LocalDateTime) {
val dao = ChainSyncScheduleModel(syncSchedule.chainName, time, syncSchedule.delay)
override suspend fun prepareScheduleForNextTry(syncSchedule: ChainSyncSchedule, success: Boolean) {
val chain = syncSchedule.chainName
val time = LocalDateTime.now().plus(
if (success) syncSchedule.delay else syncSchedule.errorDelay,
ChronoUnit.SECONDS
)
val dao = ChainSyncScheduleModel(chain, time, syncSchedule.delay, syncSchedule.errorDelay)
chainSyncScheduleRepository.save(dao).awaitFirst()
}
}
Loading