From a30fce9b568320cd5729ccf806bf63ed8b776ea3 Mon Sep 17 00:00:00 2001 From: Andrei Bronin Date: Thu, 5 Jun 2025 10:29:45 +0300 Subject: [PATCH 1/3] add newPendingTransactionsWithBody support --- .../ethereum/EthereumEgressSubscription.kt | 59 ++++++++++++++++++- 1 file changed, 58 insertions(+), 1 deletion(-) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumEgressSubscription.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumEgressSubscription.kt index 04ac6c70..08f97bd6 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumEgressSubscription.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumEgressSubscription.kt @@ -1,6 +1,9 @@ package io.emeraldpay.dshackle.upstream.ethereum +import io.emeraldpay.dshackle.Defaults +import io.emeraldpay.dshackle.Global import io.emeraldpay.dshackle.upstream.Capability +import io.emeraldpay.dshackle.upstream.ChainRequest import io.emeraldpay.dshackle.upstream.EgressSubscription import io.emeraldpay.dshackle.upstream.Multistream import io.emeraldpay.dshackle.upstream.Selector @@ -9,10 +12,38 @@ import io.emeraldpay.dshackle.upstream.ethereum.hex.Hex32 import io.emeraldpay.dshackle.upstream.ethereum.subscribe.ConnectLogs import io.emeraldpay.dshackle.upstream.ethereum.subscribe.ConnectNewHeads import io.emeraldpay.dshackle.upstream.ethereum.subscribe.PendingTxesSource +import io.emeraldpay.dshackle.upstream.rpcclient.ListParams import org.slf4j.LoggerFactory import reactor.core.publisher.Flux +import reactor.core.publisher.Mono import reactor.core.scheduler.Scheduler +data class Transaction( + val blockHash: String?, + val blockNumber: String?, + val from: String, + val gas: String, + + val gasPrice: String?, + val maxFeePerGas: String?, + + val maxPriorityFeePerGas: String?, + + val hash: String, + val input: String, + val nonce: String, + val to: String?, // null for contract-creation transactions + val transactionIndex: String?, + val value: String, + val type: String, + val 
accessList: List<Any>?, // NOTE(review): element type assumed, confirm + val chainId: String?, // may be absent on legacy transactions without replay protection + val v: String, + val yParity: String?, + val r: String?, + val s: String?, +) + open class EthereumEgressSubscription( val upstream: Multistream, val scheduler: Scheduler, @@ -25,6 +56,7 @@ open class EthereumEgressSubscription( const val METHOD_NEW_HEADS = "newHeads" const val METHOD_LOGS = "logs" const val METHOD_PENDING_TXES = "newPendingTransactions" + const val METHOD_PENDING_TXES_WITH_BODY = "newPendingTransactionsWithBody" } private val newHeads = ConnectNewHeads(upstream, scheduler) @@ -41,7 +73,7 @@ open class EthereumEgressSubscription( listOf() } return if (pendingTxesSource != null) { - subs.plus(METHOD_PENDING_TXES) + subs.plus(listOf(METHOD_PENDING_TXES, METHOD_PENDING_TXES_WITH_BODY)) } else { subs } @@ -66,6 +98,31 @@ open class EthereumEgressSubscription( } if (topic == METHOD_PENDING_TXES) { return pendingTxesSource?.connect(matcher) ?: Flux.empty() + } else if (topic == METHOD_PENDING_TXES_WITH_BODY) { + return pendingTxesSource?.connect(matcher)?.flatMap { txHash -> + // Create request to get full transaction + val request = ChainRequest( + "eth_getTransactionByHash", + ListParams(txHash.toString()), + ) + + // try to read from each upstream + Flux.fromIterable(upstream.getUpstreams()) + .flatMap { currentUpstream -> + currentUpstream.getIngressReader().read(request) + .timeout(Defaults.internalCallsTimeout) + .map { response -> + Global.objectMapper.readValue(response.getResult(), Transaction::class.java) + } + .doOnError { err -> + log.debug("Failed to get response from upstream ${currentUpstream.getId()} tx: $txHash: ${err.message}") + } + .onErrorResume { Mono.empty() } + } + .next() + .doOnSuccess { resp -> log.debug("eth_getTransactionByHash got response: $resp") } + .doOnError { err -> log.warn("eth_getTransactionByHash failed to get response from any upstream tx: $txHash: ${err.message}") } + } ?: Flux.empty() } return Flux.error(UnsupportedOperationException("Method $topic is not supported")) 
} From cdc393b37a81894f4734b561b7b95715ef48cc8e Mon Sep 17 00:00:00 2001 From: Andrei Bronin Date: Thu, 5 Jun 2025 10:45:04 +0300 Subject: [PATCH 2/3] fix test --- .../upstream/ethereum/EthereumEgressSubscriptionSpec.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumEgressSubscriptionSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumEgressSubscriptionSpec.groovy index 222f98d4..3d16893f 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumEgressSubscriptionSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumEgressSubscriptionSpec.groovy @@ -220,7 +220,7 @@ class EthereumEgressSubscriptionSpec extends Specification { up3.start() def ethereumSubscribe3 = new EthereumEgressSubscription(TestingCommons.multistream(up3) as GenericMultistream, Schedulers.boundedElastic(), Stub(PendingTxesSource)) then: - ethereumSubscribe3.getAvailableTopics().toSet() == [EthereumEgressSubscription.METHOD_LOGS, EthereumEgressSubscription.METHOD_NEW_HEADS, EthereumEgressSubscription.METHOD_PENDING_TXES].toSet() + ethereumSubscribe3.getAvailableTopics().toSet() == [EthereumEgressSubscription.METHOD_LOGS, EthereumEgressSubscription.METHOD_NEW_HEADS, EthereumEgressSubscription.METHOD_PENDING_TXES, EthereumEgressSubscription.METHOD_PENDING_TXES_WITH_BODY].toSet() when: def up4 = TestingCommons.upstream(TestingCommons.api(), "eth_getBlockByNumber") up4.getConnectorMock().setLiveness(Flux.just(HeadLivenessState.OK)) From 22530d8a3eccafc43bd1a7a2dd3e1cd249fdb378 Mon Sep 17 00:00:00 2001 From: Andrei Bronin Date: Thu, 5 Jun 2025 17:26:59 +0300 Subject: [PATCH 3/3] review fix --- .../upstream/ethereum/EthereumEgressSubscription.kt | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumEgressSubscription.kt 
b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumEgressSubscription.kt index 08f97bd6..e1bfee35 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumEgressSubscription.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumEgressSubscription.kt @@ -111,8 +111,13 @@ open class EthereumEgressSubscription( .flatMap { currentUpstream -> currentUpstream.getIngressReader().read(request) .timeout(Defaults.internalCallsTimeout) - .map { response -> - Global.objectMapper.readValue(response.getResult(), Transaction::class.java) + .flatMap { response -> + val result = response.getResult() + if (result.isEmpty()) { + Mono.empty() + } else { + Mono.justOrEmpty(Global.objectMapper.readValue(result, Transaction::class.java)) // readValue yields null for a JSON null result + } } .doOnError { err -> log.debug("Failed to get response from upstream ${currentUpstream.getId()} tx: $txHash: ${err.message}")