fix: parallel peer requests with dht over bitswap by TheGreatAlgo · Pull Request #773 · ipfs/helia · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

fix: parallel peer requests with dht over bitswap #773

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 46 additions & 21 deletions packages/bitswap/src/bitswap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,34 +72,59 @@

async want (cid: CID, options: WantOptions = {}): Promise<Uint8Array> {
const controller = new AbortController()
// Combine the passed signal with the internal controller's signal
const signal = anySignal([controller.signal, options.signal])
// Ensure listeners don't cause warnings, especially with multiple operations
setMaxListeners(Infinity, controller.signal, signal)

// find providers and connect to them
this.network.findAndConnect(cid, {
// 1. Initiate the request via wantList (checks connected peers and waits for block)
// This promise resolves when the block is successfully received by the wantList,
// regardless of whether it came from a direct peer or via the network lookup.
const blockPromise = this.wantList.wantBlock(cid, {
...options,
signal
})
.catch(err => {
// if the controller was aborted we found the block already so ignore
// the error
if (!controller.signal.aborted) {
this.log.error('error during finding and connect for cid %c', cid, err)
}
})
signal // Pass the combined signal
});

// 2. Initiate the network search concurrently (DHT lookup etc.)
// We don't necessarily need to await this promise directly,
// as its purpose is to find *providers* and connect to them.
// The wantList should eventually receive the block if this succeeds.
// We run it in the background and handle potential errors.
// Ensure we don't start findAndConnect if already aborted
if (!signal.aborted) {
this.network.findAndConnect(cid, {
...options,
signal // Use the same combined signal
})
.catch(err => {
// Only log if not aborted externally or by blockPromise succeeding/failing first
// If the signal was aborted, it's likely because blockPromise resolved or failed, or the user aborted.
if (!signal.aborted) {
this.log.error('want %c: error during background findAndConnect: %s', cid, err.message ?? err);
}
// We don't necessarily need to abort the controller here,
// as blockPromise might still succeed from a direct peer.
// If blockPromise fails later, the main try/catch will handle it.

Check warning on line 107 in packages/bitswap/src/bitswap.ts

View check run for this annotation

Codecov / codecov/patch

packages/bitswap/src/bitswap.ts#L100-L107

Added lines #L100 - L107 were not covered by tests
});
}

try {
const result = await this.wantList.wantBlock(cid, {
...options,
signal
})

return result.block
// 3. Await the blockPromise. This will resolve if the block is found
// either quickly from a connected peer or after findAndConnect helps locate a provider.
const result = await blockPromise;
controller.abort(); // Abort controller and signal findAndConnect to stop
return result.block;
} catch (err: any) {
this.log.error('want %c: failed to receive block via wantList: %s', cid, err.message ?? err);
controller.abort(); // Ensure controller and findAndConnect signal are aborted on final failure
throw err; // Re-throw the error from wantList.wantBlock
} finally {
// since we have the block we can now abort any outstanding attempts to
// find providers for it
controller.abort()
signal.clear()
// Cleanup signal listeners associated with anySignal
if (typeof signal.clear === 'function') {
signal.clear();
}
// Ensure the internal controller's listeners are removed
controller.abort();
}
}

Expand Down
60 changes: 60 additions & 0 deletions packages/bitswap/test/bitswap.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,18 @@ import { MemoryBlockstore } from 'blockstore-core'
import { CID } from 'multiformats/cid'
import { sha256 } from 'multiformats/hashes/sha2'
import pDefer from 'p-defer'
import pRetry from 'p-retry'
import Sinon from 'sinon'
import { stubInterface } from 'sinon-ts'
import { Bitswap } from '../src/bitswap.js'
import { cidToPrefix } from '../src/utils/cid-prefix.js'
import type { BitswapMessageEventDetail } from '../src/network.js'
import type { WantBlockResult } from '../src/want-list.js'
import type { Routing } from '@helia/interface/routing'
import type { Libp2p, PeerId } from '@libp2p/interface'
import type { Blockstore } from 'interface-blockstore'
import type { StubbedInstance } from 'sinon-ts'
import delay from 'delay'

interface StubbedBitswapComponents {
peerId: PeerId
Expand Down Expand Up @@ -61,6 +64,63 @@ describe('bitswap', () => {
})

describe('want', () => {
it('should call wantList.wantBlock and network.findAndConnect concurrently and abort findAndConnect on completion', async () => {
  // Scenario: `want` must start the want-list request and the provider search
  // side by side, then abort the signal it gave to the provider search once
  // the block has been delivered.
  const remotePeer = peerIdFromPrivateKey(await generateKeyPair('Ed25519'))
  let findAndConnectSignal: AbortSignal | undefined // signal captured from the findAndConnect call

  // --- Setup Stubs/Mocks ---
  // The fake records the signal it receives and returns a promise that never
  // settles, so the test observes cancellation solely through the captured
  // signal's `aborted` flag. A single `callsFake` is the only behaviour
  // configured on the stub; configuring `returns` as well would be dead code
  // because the fake takes over every call.
  const findAndConnectStub = Sinon.stub(bitswap.network, 'findAndConnect')
  findAndConnectStub.callsFake(async (_cid, options) => {
    findAndConnectSignal = options?.signal
    return new Promise(() => {})
  })

  // Stub wantBlock with a deferred promise so the test decides when the block
  // "arrives"
  const blockDeferred = pDefer<WantBlockResult>()
  const wantBlockStub = Sinon.stub(bitswap.wantList, 'wantBlock').returns(blockDeferred.promise)

  // --- Execution ---
  const wantPromise = bitswap.want(cid)

  // --- Assertions ---
  // Give the event loop a chance to run the concurrent calls
  await delay(20)

  expect(wantBlockStub.calledOnce).to.be.true('wantList.wantBlock should have been called once')
  expect(findAndConnectStub.calledOnce).to.be.true('network.findAndConnect should have been called once')

  // Both calls must target the requested CID and carry an abort signal
  expect(wantBlockStub.getCall(0).args[0].toString()).to.equal(cid.toString())
  expect(findAndConnectStub.getCall(0).args[0].toString()).to.equal(cid.toString())
  expect(wantBlockStub.getCall(0).args[1]?.signal).to.be.an.instanceOf(AbortSignal)
  expect(findAndConnectStub.getCall(0).args[1]?.signal).to.be.an.instanceOf(AbortSignal)
  expect(findAndConnectSignal).to.be.an.instanceOf(AbortSignal, 'Signal should have been captured from findAndConnect call')

  // --- Resolution ---
  // Simulate block arrival by settling the deferred wantBlock promise
  blockDeferred.resolve({
    sender: remotePeer,
    cid,
    block
  })

  // The value returned by `want` must be the delivered block's bytes
  const resultBlock = await wantPromise
  expect(resultBlock).to.equalBytes(block)

  // --- Final Assertion ---
  // Allow a short grace period, then check that the signal passed to
  // findAndConnect has been aborted
  await delay(10)
  expect(findAndConnectSignal?.aborted).to.be.true('Signal passed to findAndConnect should be aborted after want completes')
})

it('should want a block that is available on the network', async () => {
const remotePeer = peerIdFromPrivateKey(await generateKeyPair('Ed25519'))
const findProvsSpy = bitswap.network.findAndConnect = Sinon.stub()
Expand Down
Loading
0