feat: update lassie to sync Retriever (before provider rewrite revert) #167
force-pushed from d6c4231 to 8814195
force-pushed from 8814195 to f7ec395
Codecov Report: Base 4.87% // Head 5.24% // increases project coverage by +0.36%.
Additional details and impacted files:

@@            Coverage Diff            @@
##           master     #167      +/-  ##
=========================================
+ Coverage     4.87%    5.24%   +0.36%
=========================================
  Files           15       14       -1
  Lines         1723     1697      -26
=========================================
+ Hits            84       89       +5
+ Misses        1634     1603      -31
  Partials         5        5
force-pushed from 0d024b6 to e3a5629
* Retriever#Retrieve() calls are now synchronous, so we get to wait for the direct return value and error synchronously
* Change the AwaitGet call order and make it cancellable
* Make the provider context-cancel aware for cleaner shutdown
* Other minor fixes and adaptations to the new Lassie code
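For orientation, a minimal sketch of the shape of these changes. The interfaces, signatures, and call order below are assumptions made for the example, not the actual provider or Lassie APIs:

package provider

import (
	"context"

	"github.com/ipfs/go-cid"
)

// retriever and blockManager are illustrative stand-ins; the real types
// and signatures in the PR may differ.
type retriever interface {
	// Assumed: blocks until the retrieval attempt completes, returning its error.
	Retrieve(ctx context.Context, c cid.Cid) error
}

type blockManager interface {
	// Assumed: returns a channel that yields once the block for c is stored.
	AwaitGet(ctx context.Context, c cid.Cid) <-chan struct{}
}

// retrieveAndAwait shows the overall shape: a synchronous retrieval call
// followed by a cancellable wait, both tied to the provider context.
func retrieveAndAwait(ctx context.Context, r retriever, bm blockManager, c cid.Cid) error {
	if err := r.Retrieve(ctx, c); err != nil {
		// Synchronous call: the error is available right here.
		return err
	}
	select {
	case <-ctx.Done():
		// Context-cancel aware: provider shutdown unblocks the wait.
		return ctx.Err()
	case <-bm.AwaitGet(ctx, c):
		return nil
	}
}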
force-pushed from e3a5629 to b6db210
hannahhoward left a comment:
These are just some possibilities. I don't know whether they fix the underlying issues, though.
blockManager:           blockManager,
retriever:              retriever,
requestQueue:           peertaskqueue.New(peertaskqueue.TaskMerger(&overwriteTaskMerger{}), peertaskqueue.IgnoreFreezing(true)),
requestQueueSignalChan: make(chan struct{}, 10),
so here's my recommendation for these signals:
- make them buffer 1
- when writing, call:
select {
case provider.requestQueueSignalChan <- struct{}{}:
default:
}
oh, nice, so if it blocks then bail, I didn't think of that!
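For context, a small self-contained sketch of the recommended scheme: a buffer of 1 plus the non-blocking send. The field name comes from the diff; the signalRequestQueue helper is a hypothetical name used only for the example:

package provider

// If a wake-up is already pending, the extra signal is simply dropped;
// the worker will pick up the queued work either way.
type Provider struct {
	requestQueueSignalChan chan struct{}
}

func newProvider() *Provider {
	return &Provider{
		// buffer 1 instead of 10
		requestQueueSignalChan: make(chan struct{}, 1),
	}
}

// signalRequestQueue wraps the non-blocking send shown in the comment above.
func (provider *Provider) signalRequestQueue() {
	select {
	case provider.requestQueueSignalChan <- struct{}{}:
	default:
	}
}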
select {
case <-ctx.Done():
case <-time.After(time.Millisecond * 250):
case <-provider.responseQueueSignalChan:
when len(tasks) != 0, you had better still optionally drain the signal chan, i.e.
if len(tasks) == 0 {
	// ...
	continue
}
select {
case <-provider.responseQueueSignalChan:
default:
}
// ...
I wouldn't be surprised if you're getting stuck cause of this.
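To make that suggestion concrete, here is a standalone sketch of the drain in a peertaskqueue-style worker loop. The queue and channel are passed as parameters only to keep the example self-contained, and workerLoop is a hypothetical name; in the provider they would be struct fields:

package provider

import (
	"context"
	"time"

	"github.com/ipfs/go-peertaskqueue"
)

func workerLoop(ctx context.Context, queue *peertaskqueue.PeerTaskQueue, signalChan chan struct{}) {
	for ctx.Err() == nil {
		peerID, tasks, _ := queue.PopTasks(1)
		if len(tasks) == 0 {
			// Nothing to do: wait for a signal, a timeout, or shutdown.
			select {
			case <-ctx.Done():
			case <-time.After(time.Millisecond * 250):
			case <-signalChan:
			}
			continue
		}

		// We got tasks without consuming a signal, so drain any pending
		// signal non-blockingly; otherwise a stale signal can wake the
		// loop later when there is nothing left to pop.
		select {
		case <-signalChan:
		default:
		}

		// ... process the tasks, then mark them done.
		queue.TasksDone(peerID, tasks...)
	}
}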
@@ -325,13 +339,15 @@ func (provider *Provider) handleResponses() {
}
why are we calling TasksDone twice when an error occurs sending a message?
}
continue
}
same, better drain the signal queue
for {
func (provider *Provider) handleRetrievals(ctx context.Context) {
	for ctx.Err() == nil {
		peerID, tasks, _ := provider.retrievalQueue.PopTasks(1)
now that retrieval is synchronous, this appears to limit things to one retrieval per worker queue, no?
yes, the default is 8 workers, but I haven't seen a reason to increase this (yet) because the pipe of incoming requests appears to be so small; but maybe I'm not seeing it right
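A standalone sketch of the fan-out being discussed. startRetrievalWorkers and the retrieve callback are hypothetical names used only so the example is self-contained, but the shape follows the discussion: 8 workers, each popping one task and running a blocking retrieval to completion, so concurrency is bounded by the worker count:

package provider

import (
	"context"
	"sync"

	"github.com/ipfs/go-peertaskqueue"
	"github.com/ipfs/go-peertaskqueue/peertask"
)

const workerCount = 8 // the default mentioned above

func startRetrievalWorkers(ctx context.Context, queue *peertaskqueue.PeerTaskQueue,
	signalChan chan struct{}, retrieve func(context.Context, *peertask.Task) error) *sync.WaitGroup {
	var wg sync.WaitGroup
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for ctx.Err() == nil {
				peerID, tasks, _ := queue.PopTasks(1)
				if len(tasks) == 0 {
					select {
					case <-ctx.Done():
					case <-signalChan:
					}
					continue
				}
				for _, task := range tasks {
					// Blocks until this retrieval attempt finishes, which is
					// why one slow retrieval occupies this worker entirely.
					_ = retrieve(ctx, task)
				}
				queue.TasksDone(peerID, tasks...)
			}
		}()
	}
	return &wg
}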
DRAFT for now because it's using filecoin-project/lassie#41
@elijaharita I'd like your eyes on this for the synchronous call in the provider: provider.retriever.Retrieve() now blocks, holding up one of the goroutines while a retrieval attempt happens. Most of them still fail from indexer lookup failures, but the ones that do attempt a retrieval may cause a backlog.
Is there any mechanism in here to prevent an excessive backlog of tasks on the retrievalQueue that we can't process fast enough to keep up with incoming requests? I don't see one, but I might be missing something. I'm also not experiencing a huge queue while running this locally, but that might change on a faster open network.
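Purely as a hypothetical illustration of what such a mechanism could look like (nothing like this exists in the PR, and all names below are invented): the provider could keep its own count of pending retrieval tasks and refuse new ones past a cap.

package provider

import (
	"errors"
	"sync/atomic"
)

const maxPendingRetrievals = 256 // arbitrary illustrative cap

var errBacklogFull = errors.New("retrieval backlog full, dropping request")

type backlogGate struct {
	pending int64
}

// tryAcquire reserves a slot for a new retrieval task before it is pushed
// onto the queue, or reports that the backlog is already at its cap.
func (g *backlogGate) tryAcquire() error {
	if atomic.AddInt64(&g.pending, 1) > maxPendingRetrievals {
		atomic.AddInt64(&g.pending, -1)
		return errBacklogFull
	}
	return nil
}

// release frees a slot once the retrieval attempt has finished,
// whether it succeeded or failed.
func (g *backlogGate) release() {
	atomic.AddInt64(&g.pending, -1)
}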