Delay updateAll() & only update subset immediately#815
Conversation
If the number of peers is greater than or equal to `MIN_PEER_UPDATE_ALL` (currently `10`), only a subset of the peers are scanned immediately. A full scan is schedule later with a delay based on the number of peers. `Math.sqrt(this.peers.length) * 100ms` `updateAll()` now takes an optional `limit` arg so the code is reused between the subset & complete scans. All calls to `updateAll()` were refactor to `queueUpdateAll()` except in `core.truncate()` as that is assumed to be strongly propagated.
| this._notDownloadingTimer = null | ||
|
|
||
| this._updateAllBump = null | ||
| this._updateAllBound = () => { |
There was a problem hiding this comment.
use a bind instead on an instance method
There was a problem hiding this comment.
I needed to set _updateAllBump to null so it would reschedule. would normally put in updateAll but wanted to reuse it for both subset and full scan.
There was a problem hiding this comment.
thats ok, just define this as _something and then bind that here.
There was a problem hiding this comment.
actually much easier if you add to updateAll no?
if (this._updateBump !== null) {
clearTimeout(this._updateBump)
this._updateBump = null
}
then my below comment on the clear is irrelevant and you can rmeove the clearTimeotu down there also from the else
There was a problem hiding this comment.
Adjusted so the bound version is just a .bind() and moved the clear to updateAll(). It checks if a full scan before clearing since the method is used for the subset as well.
|
|
||
| queueUpdateAll() { | ||
| if (this.peers.length >= MIN_PEER_UPDATE_ALL) { | ||
| const MIN_DELAY_UPDATE_ALL = Math.sqrt(this.peers.length) * 100 |
There was a problem hiding this comment.
normal case, its not a constant. I think you should define a function isntead that just makes this static
getDelay(peers) {
if (peers < 10) return 100
if (peers < 50) return 200
if (peers < 100) return 300
if (peers < 400) return 400
return 1000
}
or whatever we want
There was a problem hiding this comment.
Do we still want it to be square root? I like that it doesn't grow linear just in case.
| const LAST_BLOCKS = 256 | ||
|
|
||
| const MAX_RANGES = 64 | ||
| const MIN_PEER_UPDATE_ALL = 10 |
There was a problem hiding this comment.
we should prob set a bit higher, like
| if (this._updateAllBump !== null) return //skip if already scheduled | ||
| this._updateAllBump = setTimeout(this._updateAllBound, this.getUpdateAllDelay(this.peers)) | ||
| } else { | ||
| clearTimeout(this._updateAllBump) |
There was a problem hiding this comment.
need to set bump to null post this btw, otherwise live lock per above
There was a problem hiding this comment.
There was a problem hiding this comment.
mmm, you are cancelling that call here tho
There was a problem hiding this comment.
ah, you're right sorry. i have fixed per the other comment.
|
|
||
| getUpdateAllDelay(peers) { | ||
| return Math.sqrt(peers.length) * 100 | ||
| return Math.min(3000, Math.max(100, (peers.length - MIN_PEER_UPDATE_ALL) * 5)) |
There was a problem hiding this comment.
ok if MIN_PEER_UPDATE is relatively non small, like 40-50
There was a problem hiding this comment.
Updated MIN_PEER_UPDATE_ALL to 40. Previously was 30.
|
Merged |
|
I got curious about this PR, |
|
The regression I am analysing: test('delayed updateAll clears timer after reorg skip', async function (t) {
const Replicator = require('../lib/replicator')
const r = new Replicator({})
t.teardown(() => {
if (r._updateAllBump) clearTimeout(r._updateAllBump)
})
r.peers = Array.from({ length: 40 }, () => ({}))
r.getUpdateAllDelay = () => 1
// Simulate the delayed full scan firing while a reorg is active
r._applyingReorg = {}
r.queueUpdateAll()
await new Promise(resolve => setTimeout(resolve, 10))
t.is(r._updateAllBump, null, 'timer handle should be cleared after firing')
}) |
Test based on @marcus-pousette-hp's repo with tweaks to make it more realistic at the cost of taking longer.
|
Sanity check, I am investigating side effects that core.upgrade({ wait: true }) can resolve early false because in the case if there is 41 peers, and we do
If I understand it from previous behaviour, this is different, and unexpected. Since we should not resolve early false we we acutally have not waited for the full update. I could try to produce a regression test |
It becomes a bit messy but, this seems to catch the regression (Co-authored with AI) test('wait update does not resolve false before delayed full scan with many peers', async function (t) {
const writer = await create(t)
const blocks = []
for (let i = 0; i < 100; i++) blocks.push('block-' + i)
await writer.append(blocks)
const reader = await create(t, writer.key, { eagerUpgrade: false })
// Bootstrap reader to the stale length, then disconnect so only the explicit
// update below can observe the later writer append.
let streams = replicate(writer, reader, t, { teardown: false })
await reader.update({ wait: true, force: true })
await reader.download({ start: 0, end: 100 }).done()
await unreplicate(streams)
const stalePeers = []
for (let i = 0; i < 40; i++) {
const peer = await create(t, writer.key)
streams = replicate(writer, peer, t, { teardown: false })
await peer.update({ wait: true, force: true })
await peer.download({ start: 0, end: 100 }).done()
await unreplicate(streams)
stalePeers.push(peer)
}
await writer.append('new-block')
// Keep real non-primary work pending. Without this, the skipped writer can
// remain in the first 3 peers that _checkUpgradeIfAvailable() samples.
await reader.clear(0, 100)
const range = reader.download({ start: 0, end: 100, linear: true })
// These live streams can use normal test teardown.
replicate(writer, reader, t)
for (const peer of stalePeers) replicate(peer, reader, t)
while (reader.replicator.peers.length !== 41) {
await new Promise((resolve) => setImmediate(resolve))
}
const oldRandom = Math.random
let update = null
try {
// RandomIterator otherwise makes whether the writer is skipped probabilistic.
Math.random = () => 0
update = reader.update({ wait: true, force: true })
let settled = false
let result = null
update.then((value) => {
settled = true
result = value
})
await new Promise((resolve) => setImmediate(resolve))
t.is(settled, false, `partial scan should not resolve wait update as ${result}`)
} finally {
Math.random = oldRandom
// In the fixed case, update is intentionally still pending at the assertion.
// Cancel it so the test does not leave an active upgrade ref behind.
if (update !== null) {
reader.core.replicator.clearRequests(reader.activeRequests, null)
await update.catch(() => {})
}
range.destroy()
// When the cleanup issue in PR is fixed this can be removed
if (reader.replicator._updateAllBump) {
clearTimeout(reader.replicator._updateAllBump)
reader.replicator._updateAllBump = null
}
}
}) |
|
@marcus-pousette-hp thanks for checking this. This tests also doesn't pass on The docs are a bit confusing as they say:
Which does sound like it would be fully blocking by exhaustively checking all peers, but instead is more of a 'blocking, if available' where 'if available' is a check against a random subset of peers. |
ht @marcus-pousette-hp for finding this.
The timeout only works if the entire process exits with it. It also was brittle.
Then I am happy with the PR! I guess this is either a documentation bug, or a behaivour bug that would need a follow up in another issue. |
Update all is too aggressive at the moment and tries to bump any pending ranges etc every time. This is meant to cover edgecases but that can be done via a delay instead.
If the number of peers is greater than or equal to
MIN_PEER_UPDATE_ALL(currently10), only a subset of the peers are scanned immediately. A full scan is schedule later with a delay based on the number of peers.Math.sqrt(this.peers.length) * 100msupdateAll()now takes an optionallimitarg so the code is reused between the subset & complete scans.All calls to
updateAll()were refactor toqueueUpdateAll()except incore.truncate()as that is assumed to be strongly propagated.