Skip to content

Commit ec162e4

Browse files
Merge pull request #277 from opexdev/dev
Release v1.0.0-beta.2
2 parents c527251 + 00d8c0b commit ec162e4

10 files changed

Lines changed: 606 additions & 621 deletions

File tree

Lines changed: 79 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -1,79 +1,79 @@
1-
package co.nilin.opex.bcgateway.core.service
2-
3-
import co.nilin.opex.bcgateway.core.api.WalletSyncService
4-
import co.nilin.opex.bcgateway.core.model.CurrencyImplementation
5-
import co.nilin.opex.bcgateway.core.model.Deposit
6-
import co.nilin.opex.bcgateway.core.model.WalletSyncRecord
7-
import co.nilin.opex.bcgateway.core.spi.*
8-
import kotlinx.coroutines.ExecutorCoroutineDispatcher
9-
import kotlinx.coroutines.async
10-
import kotlinx.coroutines.awaitAll
11-
import kotlinx.coroutines.withContext
12-
import org.slf4j.LoggerFactory
13-
import java.math.BigDecimal
14-
import java.time.LocalDateTime
15-
import java.time.temporal.ChronoUnit
16-
import kotlin.coroutines.coroutineContext
17-
18-
class WalletSyncServiceImpl(
19-
private val syncSchedulerHandler: WalletSyncSchedulerHandler,
20-
private val walletProxy: WalletProxy,
21-
private val walletSyncRecordHandler: WalletSyncRecordHandler,
22-
private val assignedAddressHandler: AssignedAddressHandler,
23-
private val currencyHandler: CurrencyHandler,
24-
private val dispatcher: ExecutorCoroutineDispatcher
25-
) : WalletSyncService {
26-
27-
private val logger = LoggerFactory.getLogger(ChainSyncServiceImpl::class.java)
28-
29-
override suspend fun startSyncWithWallet() {
30-
withContext(coroutineContext) {
31-
val schedule = syncSchedulerHandler.fetchActiveSchedule(LocalDateTime.now())
32-
if (schedule != null) {
33-
val deposits = walletSyncRecordHandler.findReadyToSyncTransfers(schedule.batchSize)
34-
logger.info("syncing ${deposits.size} deposits")
35-
36-
val result = deposits.map { deposit ->
37-
async(dispatcher) {
38-
var deposited = false
39-
val uuid = assignedAddressHandler.findUuid(
40-
deposit.depositor,
41-
deposit.depositorMemo?.lowercase()
42-
)
43-
if (uuid != null) {
44-
logger.info("deposit came for $uuid - to ${deposit.depositor}")
45-
val symbol = currencyHandler.findByChainAndTokenAddress(deposit.chain, deposit.tokenAddress)
46-
if (symbol != null) {
47-
sendDeposit(uuid, symbol, deposit)
48-
deposited = true
49-
}
50-
}
51-
Pair(deposit, deposited)
52-
}
53-
}.awaitAll()
54-
55-
walletSyncRecordHandler.saveWalletSyncRecord(
56-
WalletSyncRecord(
57-
LocalDateTime.now(),
58-
true,
59-
null
60-
),
61-
result.filter { it.second }.map { it.first },
62-
result.filter { !it.second }.map { it.first }
63-
)
64-
65-
syncSchedulerHandler.prepareScheduleForNextTry(
66-
schedule, LocalDateTime.now()
67-
.plus(schedule.delay, ChronoUnit.SECONDS)
68-
)
69-
}
70-
}
71-
}
72-
73-
private suspend fun sendDeposit(uuid: String, currencyImpl: CurrencyImplementation, deposit: Deposit) {
74-
val amount = deposit.amount.divide(BigDecimal(10).pow(currencyImpl.decimal))
75-
val symbol = currencyImpl.currency.symbol
76-
logger.info("sending deposit to $uuid - $amount $symbol")
77-
walletProxy.transfer(uuid, symbol, amount, deposit.hash)
78-
}
79-
}
1+
package co.nilin.opex.bcgateway.core.service
2+
3+
import co.nilin.opex.bcgateway.core.api.WalletSyncService
4+
import co.nilin.opex.bcgateway.core.model.CurrencyImplementation
5+
import co.nilin.opex.bcgateway.core.model.Deposit
6+
import co.nilin.opex.bcgateway.core.model.WalletSyncRecord
7+
import co.nilin.opex.bcgateway.core.spi.*
8+
import kotlinx.coroutines.ExecutorCoroutineDispatcher
9+
import kotlinx.coroutines.async
10+
import kotlinx.coroutines.awaitAll
11+
import kotlinx.coroutines.withContext
12+
import org.slf4j.LoggerFactory
13+
import java.math.BigDecimal
14+
import java.time.LocalDateTime
15+
import java.time.temporal.ChronoUnit
16+
import kotlin.coroutines.coroutineContext
17+
18+
class WalletSyncServiceImpl(
19+
private val syncSchedulerHandler: WalletSyncSchedulerHandler,
20+
private val walletProxy: WalletProxy,
21+
private val walletSyncRecordHandler: WalletSyncRecordHandler,
22+
private val assignedAddressHandler: AssignedAddressHandler,
23+
private val currencyHandler: CurrencyHandler,
24+
private val dispatcher: ExecutorCoroutineDispatcher
25+
) : WalletSyncService {
26+
27+
private val logger = LoggerFactory.getLogger(ChainSyncServiceImpl::class.java)
28+
29+
override suspend fun startSyncWithWallet() {
30+
withContext(coroutineContext) {
31+
val schedule = syncSchedulerHandler.fetchActiveSchedule(LocalDateTime.now())
32+
if (schedule != null) {
33+
val deposits = walletSyncRecordHandler.findReadyToSyncTransfers(schedule.batchSize)
34+
logger.info("syncing ${deposits.size} deposits")
35+
36+
val result = deposits.map { deposit ->
37+
async(dispatcher) {
38+
var deposited = false
39+
val uuid = assignedAddressHandler.findUuid(
40+
deposit.depositor,
41+
deposit.depositorMemo
42+
)
43+
if (uuid != null) {
44+
logger.info("deposit came for $uuid - to ${deposit.depositor}")
45+
val symbol = currencyHandler.findByChainAndTokenAddress(deposit.chain, deposit.tokenAddress)
46+
if (symbol != null) {
47+
sendDeposit(uuid, symbol, deposit)
48+
deposited = true
49+
}
50+
}
51+
Pair(deposit, deposited)
52+
}
53+
}.awaitAll()
54+
55+
walletSyncRecordHandler.saveWalletSyncRecord(
56+
WalletSyncRecord(
57+
LocalDateTime.now(),
58+
true,
59+
null
60+
),
61+
result.filter { it.second }.map { it.first },
62+
result.filter { !it.second }.map { it.first }
63+
)
64+
65+
syncSchedulerHandler.prepareScheduleForNextTry(
66+
schedule, LocalDateTime.now()
67+
.plus(schedule.delay, ChronoUnit.SECONDS)
68+
)
69+
}
70+
}
71+
}
72+
73+
private suspend fun sendDeposit(uuid: String, currencyImpl: CurrencyImplementation, deposit: Deposit) {
74+
val amount = deposit.amount.divide(BigDecimal(10).pow(currencyImpl.decimal))
75+
val symbol = currencyImpl.currency.symbol
76+
logger.info("sending deposit to $uuid - $amount $symbol")
77+
walletProxy.transfer(uuid, symbol, amount, deposit.hash)
78+
}
79+
}
Original file line numberDiff line numberDiff line change
@@ -1,66 +1,66 @@
1-
package co.nilin.opex.bcgateway.ports.postgres.impl
2-
3-
import co.nilin.opex.bcgateway.core.model.AddressType
4-
import co.nilin.opex.bcgateway.core.model.AssignedAddress
5-
import co.nilin.opex.bcgateway.core.spi.AssignedAddressHandler
6-
import co.nilin.opex.bcgateway.core.spi.ChainLoader
7-
import co.nilin.opex.bcgateway.ports.postgres.dao.AddressTypeRepository
8-
import co.nilin.opex.bcgateway.ports.postgres.dao.AssignedAddressChainRepository
9-
import co.nilin.opex.bcgateway.ports.postgres.dao.AssignedAddressRepository
10-
import co.nilin.opex.bcgateway.ports.postgres.model.AssignedAddressModel
11-
import kotlinx.coroutines.flow.map
12-
import kotlinx.coroutines.flow.toList
13-
import kotlinx.coroutines.reactive.awaitFirst
14-
import kotlinx.coroutines.reactive.awaitFirstOrNull
15-
import org.springframework.stereotype.Service
16-
17-
@Service
18-
class AssignedAddressHandlerImpl(
19-
val assignedAddressRepository: AssignedAddressRepository,
20-
val addressTypeRepository: AddressTypeRepository,
21-
val assignedAddressChainRepository: AssignedAddressChainRepository,
22-
val chainLoader: ChainLoader
23-
) : AssignedAddressHandler {
24-
override suspend fun fetchAssignedAddresses(user: String, addressTypes: List<AddressType>): List<AssignedAddress> {
25-
if (addressTypes.isEmpty()) return emptyList()
26-
return assignedAddressRepository.findByUuidAndAddressType(
27-
user, addressTypes.map(AddressType::id)
28-
)
29-
.map { model ->
30-
AssignedAddress(
31-
model.uuid, model.address, model.memo,
32-
addressTypeRepository
33-
.findById(model.addressTypeId)
34-
.map { aam ->
35-
AddressType(aam.id!!, aam.type, aam.addressRegex, aam.memoRegex)
36-
}
37-
.awaitFirst(),
38-
assignedAddressChainRepository.findByAssignedAddress(model.id!!)
39-
.map { cm ->
40-
chainLoader.fetchChainInfo(cm.chain)
41-
}
42-
.toList().toMutableList()
43-
)
44-
}.toList()
45-
}
46-
47-
override suspend fun persist(assignedAddress: AssignedAddress) {
48-
try {
49-
assignedAddressRepository.save(
50-
AssignedAddressModel(
51-
null,
52-
assignedAddress.uuid,
53-
assignedAddress.address,
54-
assignedAddress.memo?.lowercase(),
55-
assignedAddress.type.id
56-
)
57-
).awaitFirst()
58-
} catch (e: Exception) {
59-
60-
}
61-
}
62-
63-
override suspend fun findUuid(address: String, memo: String?): String? {
64-
return assignedAddressRepository.findByAddressAndMemo(address, memo).awaitFirstOrNull()?.uuid
65-
}
1+
package co.nilin.opex.bcgateway.ports.postgres.impl
2+
3+
import co.nilin.opex.bcgateway.core.model.AddressType
4+
import co.nilin.opex.bcgateway.core.model.AssignedAddress
5+
import co.nilin.opex.bcgateway.core.spi.AssignedAddressHandler
6+
import co.nilin.opex.bcgateway.core.spi.ChainLoader
7+
import co.nilin.opex.bcgateway.ports.postgres.dao.AddressTypeRepository
8+
import co.nilin.opex.bcgateway.ports.postgres.dao.AssignedAddressChainRepository
9+
import co.nilin.opex.bcgateway.ports.postgres.dao.AssignedAddressRepository
10+
import co.nilin.opex.bcgateway.ports.postgres.model.AssignedAddressModel
11+
import kotlinx.coroutines.flow.map
12+
import kotlinx.coroutines.flow.toList
13+
import kotlinx.coroutines.reactive.awaitFirst
14+
import kotlinx.coroutines.reactive.awaitFirstOrNull
15+
import org.springframework.stereotype.Service
16+
17+
@Service
18+
class AssignedAddressHandlerImpl(
19+
val assignedAddressRepository: AssignedAddressRepository,
20+
val addressTypeRepository: AddressTypeRepository,
21+
val assignedAddressChainRepository: AssignedAddressChainRepository,
22+
val chainLoader: ChainLoader
23+
) : AssignedAddressHandler {
24+
override suspend fun fetchAssignedAddresses(user: String, addressTypes: List<AddressType>): List<AssignedAddress> {
25+
if (addressTypes.isEmpty()) return emptyList()
26+
return assignedAddressRepository.findByUuidAndAddressType(
27+
user, addressTypes.map(AddressType::id)
28+
)
29+
.map { model ->
30+
AssignedAddress(
31+
model.uuid, model.address, model.memo,
32+
addressTypeRepository
33+
.findById(model.addressTypeId)
34+
.map { aam ->
35+
AddressType(aam.id!!, aam.type, aam.addressRegex, aam.memoRegex)
36+
}
37+
.awaitFirst(),
38+
assignedAddressChainRepository.findByAssignedAddress(model.id!!)
39+
.map { cm ->
40+
chainLoader.fetchChainInfo(cm.chain)
41+
}
42+
.toList().toMutableList()
43+
)
44+
}.toList()
45+
}
46+
47+
override suspend fun persist(assignedAddress: AssignedAddress) {
48+
try {
49+
assignedAddressRepository.save(
50+
AssignedAddressModel(
51+
null,
52+
assignedAddress.uuid,
53+
assignedAddress.address,
54+
assignedAddress.memo,
55+
assignedAddress.type.id
56+
)
57+
).awaitFirst()
58+
} catch (e: Exception) {
59+
60+
}
61+
}
62+
63+
override suspend fun findUuid(address: String, memo: String?): String? {
64+
return assignedAddressRepository.findByAddressAndMemo(address, memo).awaitFirstOrNull()?.uuid
65+
}
6666
}

0 commit comments

Comments
 (0)