summaryrefslogtreecommitdiff
path: root/fibers/internal.scm
diff options
context:
space:
mode:
Diffstat (limited to 'fibers/internal.scm')
-rw-r--r--fibers/internal.scm90
1 files changed, 53 insertions, 37 deletions
diff --git a/fibers/internal.scm b/fibers/internal.scm
index df9bc71..b69f3d0 100644
--- a/fibers/internal.scm
+++ b/fibers/internal.scm
@@ -79,7 +79,7 @@ name is known."
(define-record-type <scheduler>
(%make-scheduler name epfd active-fd-count prompt-tag
next-runqueue current-runqueue
- sources timers kernel-thread)
+ sources timers kernel-thread remote-peers)
scheduler?
(name scheduler-name set-scheduler-name!)
(epfd scheduler-epfd)
@@ -94,14 +94,15 @@ name is known."
;; PSQ of thunk -> expiry
(timers scheduler-timers set-scheduler-timers!)
;; atomic parameter of thread
- (kernel-thread scheduler-kernel-thread))
+ (kernel-thread scheduler-kernel-thread)
+ ;; vector of sched
+ (remote-peers scheduler-remote-peers))
(define-record-type <fiber>
(make-fiber scheduler continuation)
fiber?
- ;; The scheduler that a fiber runs in. As a scheduler only runs in
- ;; one kernel thread, this binds a fiber to a kernel thread.
- (scheduler fiber-scheduler)
+ ;; The scheduler to which a fiber is currently bound.
+ (scheduler fiber-scheduler set-fiber-scheduler!)
;; What the fiber should do when it resumes, or #f if the fiber is
;; currently running.
(continuation fiber-continuation set-fiber-continuation!))
@@ -117,7 +118,7 @@ name is known."
(unless (eq? prev init)
(error "owned by other thread" prev))))))))
-(define* (make-scheduler)
+(define* (make-scheduler #:key (remote-peers #()))
"Make a new scheduler in which to run fibers."
(let ((epfd (epoll-create))
(active-fd-count 0)
@@ -131,7 +132,8 @@ name is known."
(kernel-thread (make-atomic-parameter #f)))
(let ((sched (%make-scheduler #f epfd active-fd-count prompt-tag
next-runqueue current-runqueue
- sources timers kernel-thread)))
+ sources timers kernel-thread
+ remote-peers)))
(set-scheduler-name! sched (nameset-add! schedulers-nameset sched))
sched)))
@@ -207,16 +209,19 @@ thread."
(set-car! (car sources) #f)
(set-cdr! (car sources) #f))))))
-(define (scheduler-poll-timeout sched)
+(define (scheduler-finished? sched finished?)
+ (and (finished?)
+ (psq-empty? (scheduler-timers sched))))
+
+(define (scheduler-poll-timeout sched finished?)
(cond
((not (stack-empty? (scheduler-next-runqueue sched)))
;; Don't sleep if there are fibers in the runqueue already.
0)
((psq-empty? (scheduler-timers sched))
- ;; If there are no timers, only sleep if there are active fd's. (?)
- (cond
- ((zero? (scheduler-active-fd-count sched)) 0)
- (else -1)))
+ ;; Avoid sleeping if the scheduler is actually finished.
+ (let ((done? (and (finished?) (zero? (scheduler-active-fd-count sched)))))
+ (if done? 0 -1)))
(else
(match (psq-min (scheduler-timers sched))
((expiry . thunk)
@@ -241,7 +246,7 @@ thread."
(thunk)
(run-timers timers)))))))))
-(define (schedule-runnables-for-next-turn sched)
+(define (schedule-runnables-for-next-turn sched finished?)
;; Called when all runnables from the current turn have been run.
;; Note that there may be runnables already scheduled for the next
;; turn; one way this can happen is if a fiber suspended itself
@@ -251,7 +256,7 @@ thread."
;; are interested in are active, and in that case schedule their
;; corresponding fibers. Also run any timers that have timed out.
(epoll (scheduler-epfd sched)
- #:get-timeout (lambda () (scheduler-poll-timeout sched))
+ #:get-timeout (lambda () (scheduler-poll-timeout sched finished?))
#:folder (lambda (fd revents seed)
(schedule-fibers-for-fd fd revents sched)
seed))
@@ -267,36 +272,47 @@ thread."
(set-fiber-continuation! fiber k)
(after-suspend fiber))))
-(define* (run-scheduler sched)
+(define* (run-scheduler sched finished?)
"Run @var{sched} until there are no more fibers ready to run, no
file descriptors being waited on, and no more timers pending to run.
Return zero values."
(let ((next (scheduler-next-runqueue sched))
- (cur (scheduler-current-runqueue sched)))
+ (cur (scheduler-current-runqueue sched))
+ (peers (scheduler-remote-peers sched)))
(let next-turn ()
- (schedule-runnables-for-next-turn sched)
+ (schedule-runnables-for-next-turn sched finished?)
(stack-push-list! cur (reverse (stack-pop-all! next)))
- (cond
- ((stack-empty? cur)
- ;; Could be the scheduler is stopping, or it could be that we
- ;; got a spurious wakeup. In any case, this is the place to
- ;; check to see whether the scheduler is really done.
- (cond
- ((not (zero? (scheduler-active-fd-count sched))) (next-turn))
- ((not (psq-empty? (scheduler-timers sched))) (next-turn))
- (else (values))))
- (else
- (let next-fiber ()
- (match (stack-pop! cur #f)
- (#f (next-turn))
- (fiber (run-fiber fiber) (next-fiber)))))))))
+ (let next-fiber ()
+ (match (stack-pop! cur #f)
+ (#f
+ (cond
+ ((stack-empty? next)
+ ;; Both current and next runqueues are empty; steal a
+ ;; little bit of work from a remote scheduler if we
+ ;; can. Run it directly instead of pushing onto a
+ ;; queue to avoid double stealing.
+ (match (steal-fiber! peers)
+ (#f
+ (unless (scheduler-finished? sched finished?)
+ (next-turn)))
+ (fiber
+ (set-fiber-scheduler! fiber sched)
+ (run-fiber fiber)
+ (next-turn))))
+ (else
+ (next-turn))))
+ (fiber
+ (run-fiber fiber)
+ (next-fiber)))))))
-(define (steal-work! sched)
- "Steal some work from @var{sched}. Return a list of runnable fibers
-in FIFO order, or the empty list if no work could be stolen."
- (match (stack-pop! (scheduler-current-runqueue sched) #f)
- (#f '())
- (fiber (list fiber))))
+(define (steal-fiber! schedulers)
+ "Steal some work from a random scheduler in the vector
+@var{schedulers}. Return a fiber, or @code{#f} if no work could be
+stolen."
+ (let ((len (vector-length schedulers)))
+ (and (> len 0)
+ (let ((sched (vector-ref schedulers (random len))))
+ (stack-pop! (scheduler-current-runqueue sched) #f)))))
(define (destroy-scheduler sched)
"Release any resources associated with @var{sched}."