diff options
| author | Andy Wingo <wingo@pobox.com> | 2016-12-22 21:20:00 +0100 |
|---|---|---|
| committer | Andy Wingo <wingo@pobox.com> | 2016-12-22 21:23:28 +0100 |
| commit | ece64727dd0db80be570d2e6a0579a7bf63ce831 (patch) | |
| tree | 5ea822778bafdd47505df1bf6b939947d00257ae | |
| parent | Get current scheduler from current fiber (diff) | |
| download | guile-fibers-ece64727dd0db80be570d2e6a0579a7bf63ce831.tar.gz | |
Enable work stealing in run-scheduler
* fibers.scm (run-fibers): Store result in an atomic box, given that the
initial fiber could migrate.
* fibers/internal.scm (<scheduler>): Add remote-peers field.
(<fiber>): Add set-fiber-scheduler!.
(make-scheduler): Adapt to <scheduler> change.
(scheduler-finished?): New helper.
(scheduler-poll-timeout): Take a finished? predicate to know when to
wait and when to return directly.
(schedule-runnables-for-next-turn): Plumb finished? predicate
through.
(run-scheduler): Avoid returning until finished? is true. Refactor to
add work stealing.
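(steal-work!): Remove.
(steal-fiber!): New helper, replacing steal-work!. Steals a single
fiber from a randomly chosen scheduler in a vector of peers.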
| -rw-r--r-- | fibers.scm | 11 |
| -rw-r--r-- | fibers/internal.scm | 90 |
2 files changed, 58 insertions, 43 deletions
@@ -19,6 +19,7 @@ (define-module (fibers) #:use-module (ice-9 match) + #:use-module (ice-9 atomic) #:use-module (fibers internal) #:use-module (fibers repl) #:use-module (fibers timers) @@ -52,16 +53,14 @@ (with-interrupts hz yield-current-fiber (lambda () - (let ((ret #f)) + (let ((ret (make-atomic-box #f))) (spawn-fiber (lambda () (call-with-values (or init values) - (lambda vals (set! ret vals)))) + (lambda vals (atomic-box-set! ret vals)))) scheduler) - (let lp () - (run-scheduler scheduler) - (unless ret (lp))) + (run-scheduler scheduler (lambda () (atomic-box-ref ret))) (unless keep-scheduler? (destroy-scheduler scheduler)) - (apply values ret))))))) + (apply values (atomic-box-ref ret)))))))) (define (current-fiber-scheduler) (match (current-fiber) diff --git a/fibers/internal.scm b/fibers/internal.scm index df9bc71..b69f3d0 100644 --- a/fibers/internal.scm +++ b/fibers/internal.scm @@ -79,7 +79,7 @@ name is known." (define-record-type <scheduler> (%make-scheduler name epfd active-fd-count prompt-tag next-runqueue current-runqueue - sources timers kernel-thread) + sources timers kernel-thread remote-peers) scheduler? (name scheduler-name set-scheduler-name!) (epfd scheduler-epfd) @@ -94,14 +94,15 @@ name is known." ;; PSQ of thunk -> expiry (timers scheduler-timers set-scheduler-timers!) ;; atomic parameter of thread - (kernel-thread scheduler-kernel-thread)) + (kernel-thread scheduler-kernel-thread) + ;; vector of sched + (remote-peers scheduler-remote-peers)) (define-record-type <fiber> (make-fiber scheduler continuation) fiber? - ;; The scheduler that a fiber runs in. As a scheduler only runs in - ;; one kernel thread, this binds a fiber to a kernel thread. - (scheduler fiber-scheduler) + ;; The scheduler to which a fiber is currently bound. + (scheduler fiber-scheduler set-fiber-scheduler!) ;; What the fiber should do when it resumes, or #f if the fiber is ;; currently running. 
(continuation fiber-continuation set-fiber-continuation!)) @@ -117,7 +118,7 @@ name is known." (unless (eq? prev init) (error "owned by other thread" prev)))))))) -(define* (make-scheduler) +(define* (make-scheduler #:key (remote-peers #())) "Make a new scheduler in which to run fibers." (let ((epfd (epoll-create)) (active-fd-count 0) @@ -131,7 +132,8 @@ name is known." (kernel-thread (make-atomic-parameter #f))) (let ((sched (%make-scheduler #f epfd active-fd-count prompt-tag next-runqueue current-runqueue - sources timers kernel-thread))) + sources timers kernel-thread + remote-peers))) (set-scheduler-name! sched (nameset-add! schedulers-nameset sched)) sched))) @@ -207,16 +209,19 @@ thread." (set-car! (car sources) #f) (set-cdr! (car sources) #f)))))) -(define (scheduler-poll-timeout sched) +(define (scheduler-finished? sched finished?) + (and (finished?) + (psq-empty? (scheduler-timers sched)))) + +(define (scheduler-poll-timeout sched finished?) (cond ((not (stack-empty? (scheduler-next-runqueue sched))) ;; Don't sleep if there are fibers in the runqueue already. 0) ((psq-empty? (scheduler-timers sched)) - ;; If there are no timers, only sleep if there are active fd's. (?) - (cond - ((zero? (scheduler-active-fd-count sched)) 0) - (else -1))) + ;; Avoid sleeping if the scheduler is actually finished. + (let ((done? (and (finished?) (zero? (scheduler-active-fd-count sched))))) + (if done? 0 -1))) (else (match (psq-min (scheduler-timers sched)) ((expiry . thunk) @@ -241,7 +246,7 @@ thread." (thunk) (run-timers timers))))))))) -(define (schedule-runnables-for-next-turn sched) +(define (schedule-runnables-for-next-turn sched finished?) ;; Called when all runnables from the current turn have been run. ;; Note that there may be runnables already scheduled for the next ;; turn; one way this can happen is if a fiber suspended itself @@ -251,7 +256,7 @@ thread." ;; are interested in are active, and in that case schedule their ;; corresponding fibers. 
Also run any timers that have timed out. (epoll (scheduler-epfd sched) - #:get-timeout (lambda () (scheduler-poll-timeout sched)) + #:get-timeout (lambda () (scheduler-poll-timeout sched finished?)) #:folder (lambda (fd revents seed) (schedule-fibers-for-fd fd revents sched) seed)) @@ -267,36 +272,47 @@ thread." (set-fiber-continuation! fiber k) (after-suspend fiber)))) -(define* (run-scheduler sched) +(define* (run-scheduler sched finished?) "Run @var{sched} until there are no more fibers ready to run, no file descriptors being waited on, and no more timers pending to run. Return zero values." (let ((next (scheduler-next-runqueue sched)) - (cur (scheduler-current-runqueue sched))) + (cur (scheduler-current-runqueue sched)) + (peers (scheduler-remote-peers sched))) (let next-turn () - (schedule-runnables-for-next-turn sched) + (schedule-runnables-for-next-turn sched finished?) (stack-push-list! cur (reverse (stack-pop-all! next))) - (cond - ((stack-empty? cur) - ;; Could be the scheduler is stopping, or it could be that we - ;; got a spurious wakeup. In any case, this is the place to - ;; check to see whether the scheduler is really done. - (cond - ((not (zero? (scheduler-active-fd-count sched))) (next-turn)) - ((not (psq-empty? (scheduler-timers sched))) (next-turn)) - (else (values)))) - (else - (let next-fiber () - (match (stack-pop! cur #f) - (#f (next-turn)) - (fiber (run-fiber fiber) (next-fiber))))))))) + (let next-fiber () + (match (stack-pop! cur #f) + (#f + (cond + ((stack-empty? next) + ;; Both current and next runqueues are empty; steal a + ;; little bit of work from a remote scheduler if we + ;; can. Run it directly instead of pushing onto a + ;; queue to avoid double stealing. + (match (steal-fiber! peers) + (#f + (unless (scheduler-finished? sched finished?) + (next-turn))) + (fiber + (set-fiber-scheduler! fiber sched) + (run-fiber fiber) + (next-turn)))) + (else + (next-turn)))) + (fiber + (run-fiber fiber) + (next-fiber))))))) -(define (steal-work! 
sched) - "Steal some work from @var{sched}. Return a list of runnable fibers -in FIFO order, or the empty list if no work could be stolen." - (match (stack-pop! (scheduler-current-runqueue sched) #f) - (#f '()) - (fiber (list fiber)))) +(define (steal-fiber! schedulers) + "Steal some work from a random scheduler in the vector +@var{schedulers}. Return a fiber, or @code{#f} if no work could be +stolen." + (let ((len (vector-length schedulers))) + (and (> len 0) + (let ((sched (vector-ref schedulers (random len)))) + (stack-pop! (scheduler-current-runqueue sched) #f))))) (define (destroy-scheduler sched) "Release any resources associated with @var{sched}." |
