Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,11 @@ class AccountantController(
val canFulfil = runCatching { walletProxy.canFulfil(currency, "main", uuid, amount) }
.onFailure { logger.error(it.message) }
.getOrElse { false }
val unprocessed = financialActionLoader.countUnprocessed(uuid, currency, SubmitOrderEvent::class.simpleName!!)
return BooleanResponse(unprocessed <= 0 && canFulfil)
if ( canFulfil ) {
val unprocessed = financialActionLoader.countUnprocessed(uuid, currency, SubmitOrderEvent::class.simpleName!!)
return BooleanResponse(unprocessed <= 0 )
} else
return BooleanResponse(false)
}

@GetMapping("/config/{pair}/fee/{direction}-{userLevel}")
Expand Down
41 changes: 40 additions & 1 deletion wallet/wallet-app/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,47 @@
<artifactId>micrometer-registry-prometheus</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>1.18.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
<version>1.18.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>r2dbc</artifactId>
<version>1.18.0</version>
<scope>test</scope>
</dependency>


<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<version>3.2.6</version>
<type>test-jar</type>
<scope>test</scope>
<classifier>test-binder</classifier>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>2.9.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.18.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import io.micrometer.core.instrument.MeterRegistry
import org.springframework.boot.actuate.health.HealthComponent
import org.springframework.boot.actuate.health.HealthEndpoint
import org.springframework.boot.actuate.health.SystemHealth
import org.springframework.context.annotation.Profile
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component

@Component
@Profile("!test")
class PrometheusHealthExtension(
private val registry: MeterRegistry,
private val endpoint: HealthEndpoint
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package co.nilin.opex.wallet.app

import org.junit.jupiter.api.Test
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration
import org.springframework.context.annotation.Import
import org.springframework.kafka.test.context.EmbeddedKafka
import org.springframework.test.annotation.DirtiesContext
import org.springframework.test.context.ActiveProfiles

@SpringBootTest
@DirtiesContext
@ActiveProfiles("test")
@Import(TestChannelBinderConfiguration::class)
@EmbeddedKafka(partitions = 1, brokerProperties = ["listeners=PLAINTEXT://localhost:9092", "port=9092"])
class WalletAppTest {

@Test
fun contextLoad() {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
package co.nilin.opex.wallet.app.service

import co.nilin.opex.wallet.core.exc.ConcurrentBalanceChangException
import co.nilin.opex.wallet.core.inout.TransferCommand
import co.nilin.opex.wallet.core.model.Amount
import co.nilin.opex.wallet.core.spi.CurrencyService
import co.nilin.opex.wallet.core.spi.TransferManager
import co.nilin.opex.wallet.core.spi.WalletManager
import co.nilin.opex.wallet.core.spi.WalletOwnerManager
import kotlinx.coroutines.async
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration
import org.springframework.context.annotation.Import
import org.springframework.test.annotation.DirtiesContext
import org.springframework.test.context.ActiveProfiles
import java.math.BigDecimal
import java.util.*

@SpringBootTest
@DirtiesContext
@ActiveProfiles("test")
@Import(TestChannelBinderConfiguration::class)
class TransferManagerImplIT {
@Autowired
lateinit var transferManager: TransferManager

@Autowired
lateinit var currencyService: CurrencyService

@Autowired
lateinit var walletManager: WalletManager

@Autowired
lateinit var walletOwnerManager: WalletOwnerManager

val senderWalletType = "main"
val receiverWalletType = "exchange"
val cc = "CC"
val amount = BigDecimal.valueOf(10)
var sourceUuid: String? = null

@BeforeEach
fun setup() {
sourceUuid = UUID.randomUUID().toString()
setupWallets(sourceUuid!!)
}

@Test
fun givenSameSenderWallet_whenConcurrentTransfers_thenSecondTransferFail() {

val block: () -> Unit = {
runBlocking {
val currency = currencyService.getCurrency(cc)!!
val owner = walletOwnerManager.findWalletOwner(sourceUuid!!)
val sourceWallet = walletManager.findWalletByOwnerAndCurrencyAndType(owner!!, senderWalletType, currency)
val receiverWallet = walletManager.findWalletByOwnerAndCurrencyAndType(owner, receiverWalletType, currency)

launch {
transferManager.transfer(
TransferCommand(
sourceWallet!!,
receiverWallet!!,
Amount(sourceWallet.currency, amount),
"Amount1 ${System.currentTimeMillis()}", "Ref1 ${System.currentTimeMillis()}", emptyMap()
)
)
}
launch {
transferManager.transfer(
TransferCommand(
sourceWallet!!,
receiverWallet!!,
Amount(sourceWallet.currency, amount),
"Amount2 ${System.currentTimeMillis()}", "Ref2 ${System.currentTimeMillis()}", emptyMap()
)
)
}
}
}
try {
block.invoke()
} catch (_: ConcurrentBalanceChangException) {

}
runBlocking {
val currency = currencyService.getCurrency(cc)!!
val owner = walletOwnerManager.findWalletOwner(sourceUuid!!)
val sourceWallet = walletManager.findWalletByOwnerAndCurrencyAndType(owner!!, senderWalletType, currency)
val receiverWallet = walletManager.findWalletByOwnerAndCurrencyAndType(owner, receiverWalletType, currency)

assertEquals(amount, sourceWallet!!.balance.amount)
assertEquals(amount, receiverWallet!!.balance.amount)
}
}

@Test
fun givenSameReceiverWallet_whenConcurrentTransfers_thenTransfersSuccess() {
runBlocking {
val currency = currencyService.getCurrency(cc)!!
val owner = walletOwnerManager.findWalletOwner(sourceUuid!!)
val receiverWallet = walletManager.findWalletByOwnerAndCurrencyAndType(owner!!, receiverWalletType, currency)

val source2Uuid = UUID.randomUUID().toString()
setupWallets(source2Uuid)
val sourceOwner2 = walletOwnerManager.findWalletOwner(source2Uuid)

val t1 = async {
val sourceWallet1 = walletManager.findWalletByOwnerAndCurrencyAndType(owner, senderWalletType, currency)
transferManager.transfer(
TransferCommand(
sourceWallet1!!,
receiverWallet!!,
Amount(sourceWallet1.currency, amount),
"Amount1 ${System.currentTimeMillis()}", "Ref1 ${System.currentTimeMillis()}", emptyMap()
)
)
}
val t2 = async {
val sourceWallet2 = walletManager.findWalletByOwnerAndCurrencyAndType(sourceOwner2!!, senderWalletType, currency)
transferManager.transfer(
TransferCommand(
sourceWallet2!!,
receiverWallet!!,
Amount(sourceWallet2.currency, amount),
"Amount2 ${System.currentTimeMillis()}", "Ref2 ${System.currentTimeMillis()}", emptyMap()
)
)
}
t1.await()
t2.await()

val sourceWallet1Refresh = walletManager.findWalletByOwnerAndCurrencyAndType(owner, senderWalletType, currency)
val sourceWallet2Refresh = walletManager.findWalletByOwnerAndCurrencyAndType(sourceOwner2!!, senderWalletType, currency)
val receiverWalletRefresh = walletManager.findWalletByOwnerAndCurrencyAndType(owner, receiverWalletType, currency)

assertEquals(amount, sourceWallet1Refresh!!.balance.amount)
assertEquals(amount, sourceWallet2Refresh!!.balance.amount)
assertEquals(amount.plus(amount), receiverWalletRefresh!!.balance.amount)
}


}

@Test
fun givenSameSenderWallet_whenSequentialTransfers_thenTransfersSuccess() {
runBlocking {
val currency = currencyService.getCurrency(cc)!!
val owner = walletOwnerManager.findWalletOwner(sourceUuid!!)

async {
val sourceWallet = walletManager.findWalletByOwnerAndCurrencyAndType(owner!!, senderWalletType, currency)
val receiverWallet = walletManager.findWalletByOwnerAndCurrencyAndType(owner, receiverWalletType, currency)

transferManager.transfer(
TransferCommand(
sourceWallet!!,
receiverWallet!!,
Amount(sourceWallet.currency, amount),
"Amount1 ${System.currentTimeMillis()}", "Ref1 ${System.currentTimeMillis()}", emptyMap()
)
)
}.await()
async {
val sourceWallet = walletManager.findWalletByOwnerAndCurrencyAndType(owner!!, senderWalletType, currency)
val receiverWallet = walletManager.findWalletByOwnerAndCurrencyAndType(owner, receiverWalletType, currency)

transferManager.transfer(
TransferCommand(
sourceWallet!!,
receiverWallet!!,
Amount(sourceWallet!!.currency, amount),
"Amount2 ${System.currentTimeMillis()}", "Ref2 ${System.currentTimeMillis()}", emptyMap()
)
)
}.await()
val sourceWallet = walletManager.findWalletByOwnerAndCurrencyAndType(owner!!, senderWalletType, currency)
val receiverWallet = walletManager.findWalletByOwnerAndCurrencyAndType(owner, receiverWalletType, currency)

assertEquals(BigDecimal.ZERO, sourceWallet!!.balance.amount)
assertEquals(amount.plus(amount), receiverWallet!!.balance.amount)
}
}

fun setupWallets(sourceUuid: String) {
runBlocking {
var currency = currencyService.getCurrency(cc)
if (currency == null) {
currencyService.deleteCurrency(cc)
currencyService.addCurrency(cc, cc, BigDecimal.ONE)
currency = currencyService.getCurrency(cc)
}
val sourceOwner = walletOwnerManager.createWalletOwner(sourceUuid, "not set", "")
walletManager.createWallet(sourceOwner, Amount(currency!!, amount.multiply(BigDecimal.valueOf(2))), currency, senderWalletType)
walletManager.createWallet(
sourceOwner,
Amount(currency, BigDecimal.ZERO),
currency,
receiverWalletType
)
}
}
}

58 changes: 58 additions & 0 deletions wallet/wallet-app/src/test/resources/application.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
server.port: 8080
management:
health:
vault:
enabled: false
endpoints:
web:
base-path: /actuator
exposure:
include: ["health", "metrics"]
endpoint:
health:
show-details: always
metrics:
enabled: true
prometheus:
enabled: false
spring:
jackson:
serialization:
write-dates-as-timestamps: false
application:
name: opex-wallet
main:
allow-bean-definition-overriding: false
allow-circular-references: true
kafka:
bootstrap-servers: ${KAFKA_IP_PORT:localhost:9092}
consumer:
auto-offset-reset: earliest
group-id: wallet
r2dbc:
url: r2dbc:tc:postgresql:///databasename?TC_IMAGE_TAG=9.6.8
initialization-mode: always
cloud:
bootstrap:
enabled: true
discovery:
enabled: false
consul:
enabled: false
config:
enabled: false
vault:
enabled: false
app:
auth:
cert-url: none
system:
uuid: 1
logging:
level:
org.apache.kafka: ERROR
co.nilin: DEBUG
reactor.netty.http.client: DEBUG
swagger.authUrl: ${SWAGGER_AUTH_URL:https://api.opex.dev/auth}/realms/opex/protocol/openid-connect/token
test:
topic: embedded-test-topic
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package co.nilin.opex.wallet.core.exc

class ConcurrentBalanceChangException(override val message: String?): Exception()
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ data class Wallet(
val owner: WalletOwner,
val balance: Amount,
val currency: Currency,
val type: String
val type: String,
val version: Long?
)
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ object VALID {
Amount(CURRENCY, BigDecimal.valueOf(1.5)),
CURRENCY,
WALLET_TYPE_MAIN
, 0
)

val DEST_WALLET_OWNER = SOURCE_WALLET_OWNER.copy(2, "e1950578-ef22-44e4-89f5-0b78feb03e2a")
Expand Down
Loading