Skip to content

Commit be44f2b

Browse files
authored
Merge pull request #83 from opexdev/iss82-implement-chain-scan-request
Close #82, Implement chain scan request
2 parents a8f8ad2 + 5b503f6 commit be44f2b

3 files changed

Lines changed: 79 additions & 5 deletions

File tree

BlockchainGateway/bc-gateway-ports/bc-chain-proxy/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@
5151
<artifactId>bc-gateway-core</artifactId>
5252
<version>${bc-gateway.version}</version>
5353
</dependency>
54+
<dependency>
55+
<groupId>org.springframework</groupId>
56+
<artifactId>spring-webflux</artifactId>
57+
</dependency>
5458
</dependencies>
5559

5660
<build>
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,80 @@
11
package co.nilin.opex.port.bcgateway.chainproxy.impl
22

33
import co.nilin.opex.bcgateway.core.model.ChainSyncRecord
4+
import co.nilin.opex.bcgateway.core.model.Deposit
45
import co.nilin.opex.bcgateway.core.model.Endpoint
56
import co.nilin.opex.bcgateway.core.spi.ChainEndpointProxy
6-
import org.springframework.stereotype.Component
7+
import kotlinx.coroutines.reactive.awaitFirstOrElse
8+
import org.springframework.core.ParameterizedTypeReference
9+
import org.springframework.web.reactive.function.client.WebClient
10+
import org.springframework.web.reactive.function.client.WebClientResponseException
11+
import reactor.core.publisher.Mono
12+
import java.math.BigDecimal
13+
import java.net.URI
14+
import java.time.LocalDateTime
15+
16+
inline fun <reified T : Any> typeRef(): ParameterizedTypeReference<T> = object : ParameterizedTypeReference<T>() {}
17+
18+
class ChainEndpointProxyImpl(
19+
private val chain: String,
20+
private val endpoints: List<Endpoint>,
21+
private val webClient: WebClient
22+
) :
23+
ChainEndpointProxy {
24+
data class TransfersRequest(
25+
val startBlock: Long?,
26+
val endBlock: Long?,
27+
val addresses: List<String>?
28+
)
29+
30+
data class Transfer(
31+
var txHash: String,
32+
var from: String,
33+
var to: String,
34+
var isTokenTransfer: Boolean,
35+
var token: String? = null,
36+
var amount: BigDecimal
37+
)
38+
39+
private suspend fun requestTransferList(baseUrl: String, request: TransfersRequest): List<Deposit> {
40+
return webClient.post()
41+
.uri(URI.create("$baseUrl/transfers"))
42+
.header("Content-Type", "application/json")
43+
.body(Mono.just(request), TransfersRequest::class.java)
44+
.retrieve()
45+
.onStatus({ t -> t.isError }, { it.createException() })
46+
.bodyToFlux(typeRef<Transfer>())
47+
.log().map { Deposit(null, it.to, null, it.amount, chain, it.isTokenTransfer, it.token) }
48+
.collectList()
49+
.awaitFirstOrElse { emptyList() }
50+
}
51+
52+
private suspend fun roundRobin(i: Int, request: TransfersRequest): ChainSyncRecord {
53+
return try {
54+
val deposits =
55+
requestTransferList(
56+
endpoints[i].url,
57+
request
58+
)
59+
ChainSyncRecord(chain, LocalDateTime.now(), endpoints[i], request.endBlock, true, null, deposits)
60+
} catch (error: WebClientResponseException) {
61+
if (i < endpoints.size - 1) {
62+
roundRobin(i + 1, request)
63+
} else {
64+
ChainSyncRecord(
65+
chain,
66+
LocalDateTime.now(),
67+
endpoints[i],
68+
request.endBlock,
69+
false,
70+
error.message,
71+
emptyList()
72+
)
73+
}
74+
}
75+
}
776

8-
class ChainEndpointProxyImpl(private val endpoints: List<Endpoint>) : ChainEndpointProxy {
977
override suspend fun syncTransfers(filter: ChainEndpointProxy.DepositFilter): ChainSyncRecord {
10-
TODO("Not yet implemented")
78+
return roundRobin(0, TransfersRequest(filter.startBlock, filter.endBlock, filter.tokenAddresses))
1179
}
1280
}

BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/ChainEndpointProxyFinderImpl.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,13 @@ import co.nilin.opex.port.bcgateway.postgres.dao.ChainRepository
88
import kotlinx.coroutines.flow.map
99
import kotlinx.coroutines.flow.toList
1010
import org.springframework.stereotype.Component
11+
import org.springframework.web.reactive.function.client.WebClient
1112

1213
@Component
13-
class ChainEndpointProxyFinderImpl(private val chainRepository: ChainRepository) : ChainEndpointProxyFinder {
14+
class ChainEndpointProxyFinderImpl(private val chainRepository: ChainRepository, private val webClient: WebClient) :
15+
ChainEndpointProxyFinder {
1416
override suspend fun findChainEndpointProxy(chainName: String): ChainEndpointProxy {
1517
val endpoints = chainRepository.findEndpointsByName(chainName).map { Endpoint(it.url) }.toList()
16-
return ChainEndpointProxyImpl(endpoints)
18+
return ChainEndpointProxyImpl(chainName, endpoints,webClient)
1719
}
1820
}

0 commit comments

Comments
 (0)