8000 support eth_subscribe newPendingTransactionsWithBody by AndreyBronin · Pull Request #672 · drpcorg/dshackle · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

support eth_subscribe newPendingTransactionsWithBody #672

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package io.emeraldpay.dshackle.upstream.ethereum

import io.emeraldpay.dshackle.Defaults
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.upstream.Capability
import io.emeraldpay.dshackle.upstream.ChainRequest
import io.emeraldpay.dshackle.upstream.EgressSubscription
import io.emeraldpay.dshackle.upstream.Multistream
import io.emeraldpay.dshackle.upstream.Selector
Expand All @@ -9,10 +12,38 @@ import io.emeraldpay.dshackle.upstream.ethereum.hex.Hex32
import io.emeraldpay.dshackle.upstream.ethereum.subscribe.ConnectLogs
import io.emeraldpay.dshackle.upstream.ethereum.subscribe.ConnectNewHeads
import io.emeraldpay.dshackle.upstream.ethereum.subscribe.PendingTxesSource
import io.emeraldpay.dshackle.upstream.rpcclient.ListParams
import org.slf4j.LoggerFactory
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.scheduler.Scheduler

data class Transaction(
val blockHash: String?,
val blockNumber: String?,
val from: String,
val gas: String,

val gasPrice: String?,
val maxFeePerGas: String?,

val maxPriorityFeePerGas: String?,

val hash: String,
val input: String,
val nonce: String,
val to: String,
val transactionIndex: String?,
val value: String,
val type: String,
val accessList: List<Any>?,
val chainId: String,
val v: String,
val yParity: String?,
val r: String?,
val s: String?,
)

open class EthereumEgressSubscription(
val upstream: Multistream,
val scheduler: Scheduler,
Expand All @@ -25,6 +56,7 @@ open class EthereumEgressSubscription(
const val METHOD_NEW_HEADS = "newHeads"
const val METHOD_LOGS = "logs"
const val METHOD_PENDING_TXES = "newPendingTransactions"
const val METHOD_PENDING_TXES_WITH_BODY = "newPendingTransactionsWithBody"
}

private val newHeads = ConnectNewHeads(upstream, scheduler)
Expand All @@ -41,7 +73,7 @@ open class EthereumEgressSubscription(
listOf()
}
return if (pendingTxesSource != null) {
subs.plus(METHOD_PENDING_TXES)
subs.plus(listOf(METHOD_PENDING_TXES, METHOD_PENDING_TXES_WITH_BODY))
} else {
subs
}
Expand All @@ -66,6 +98,36 @@ open class EthereumEgressSubscription(
}
if (topic == METHOD_PENDING_TXES) {
return pendingTxesSource?.connect(matcher) ?: Flux.empty()
} else if (topic == METHOD_PENDING_TXES_WITH_BODY) {
return pendingTxesSource?.connect(matcher)?.flatMap { txHash ->
// Create request to get full transaction
val request = ChainRequest(
"eth_getTransactionByHash",
ListParams(txHash.toString()),
)

// try to read froms each upstream
Flux.fromIterable(upstream.getUpstreams())
.flatMap { currentUpstream ->
currentUpstream.getIngressReader().read(request)
.timeout(Defaults.internalCallsTimeout)
.flatMap { response ->
val result = response.getResult()
if (result.isEmpty()) {
Mono.empty()
} else {
Mono.just(Global.objectMapper.readValue(result, Transaction::class.java))
}
}
.doOnError { err ->
log.debug("Failed to get response from upstream ${currentUpstream.getId()} tx: $txHash: ${err.message}")
}
.onErrorResume { Mono.empty() }
}
.next()
.doOnSuccess { resp -> log.debug("eth_getTransactionByHash got response: $resp") }
.doOnError { err -> log.warn("eth_getTransactionByHash failed to get response from any upstream tx: $txHash: ${err.message}") }
} ?: Flux.empty()
}
return Flux.error(UnsupportedOperationException("Method $topic is not supported"))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ class EthereumEgressSubscriptionSpec extends Specification {
up3.start()
def ethereumSubscribe3 = new EthereumEgressSubscription(TestingCommons.multistream(up3) as GenericMultistream, Schedulers.boundedElastic(), Stub(PendingTxesSource))
then:
ethereumSubscribe3.getAvailableTopics().toSet() == [EthereumEgressSubscription.METHOD_LOGS, EthereumEgressSubscription.METHOD_NEW_HEADS, EthereumEgressSubscription.METHOD_PENDING_TXES].toSet()
ethereumSubscribe3.getAvailableTopics().toSet() == [EthereumEgressSubscription.METHOD_LOGS, EthereumEgressSubscription.METHOD_NEW_HEADS, EthereumEgressSubscription.METHOD_PENDING_TXES, EthereumEgressSubscription.METHOD_PENDING_TXES_WITH_BODY].toSet()
when:
def up4 = TestingCommons.upstream(TestingCommons.api(), "eth_getBlockByNumber")
up4.getConnectorMock().setLiveness(Flux.just(HeadLivenessState.OK))
Expand Down
Loading
0