diff options
| author | Andy Wingo <wingo@pobox.com> | 2016-12-22 18:32:55 +0100 |
|---|---|---|
| committer | Andy Wingo <wingo@pobox.com> | 2016-12-22 18:32:55 +0100 |
| commit | 347d8715efc89f79aec71207cbe8b6c9ff04adae (patch) | |
| tree | b447710701698ef9bfb48897cccc86009ea6d91e | |
| parent | run-scheduler pops one fiber at a time (diff) | |
| download | guile-fibers-347d8715efc89f79aec71207cbe8b6c9ff04adae.tar.gz | |
Get current scheduler from current fiber
* fibers.scm (run-fibers): Refine #:keep-scheduler? default.
(current-fiber-scheduler): Rename from require-current-scheduler, and
use the current fiber to get at the scheduler.
(spawn-fiber): Let create-fiber handle dynamic state shenanigans, as
it needs to ensure that current-fiber is bound in the thunk.
* fibers/internal.scm (with-scheduler): Don't parameterize
current-scheduler, as that binding for a given scheduler needs to
change over time, and the with-dynamic-state prevents this from
happening.
(schedule-fiber!): Wake schedulers not on the current kernel thread.
(run-fiber): Don't parameterize current-fiber; it's lost across the
with-dynamic-state.
(create-fiber): Instead set current-fiber inside the
with-dynamic-state.
(suspend-current-fiber, yield-current-fiber): Get current scheduler
from current fiber.
| -rw-r--r-- | fibers.scm | 26 | ||||
| -rw-r--r-- | fibers/internal.scm | 52 |
2 files changed, 37 insertions, 41 deletions
@@ -18,17 +18,17 @@ ;;;; (define-module (fibers) + #:use-module (ice-9 match) #:use-module (fibers internal) #:use-module (fibers repl) #:use-module (fibers timers) #:use-module (fibers interrupts) + #:use-module ((ice-9 threads) #:select (current-thread)) #:use-module ((ice-9 ports internal) #:select (port-read-wait-fd port-write-wait-fd)) #:use-module (ice-9 suspendable-ports) - #:export (run-fibers - spawn-fiber) - #:re-export (current-fiber - sleep)) + #:export (run-fibers spawn-fiber) + #:re-export (current-fiber sleep)) (define (wait-for-readable port) (suspend-current-fiber @@ -42,7 +42,8 @@ (define* (run-fibers #:optional (init #f) #:key (hz 0) (scheduler (make-scheduler)) (install-suspendable-ports? #t) - (keep-scheduler? (eq? scheduler (current-scheduler)))) + (keep-scheduler? + (->bool (scheduler-kernel-thread scheduler)))) (when install-suspendable-ports? (install-suspendable-ports!)) (with-scheduler scheduler @@ -62,14 +63,11 @@ (unless keep-scheduler? (destroy-scheduler scheduler)) (apply values ret))))))) -(define (require-current-scheduler) - (or (current-scheduler) - (error "No scheduler current; call within run-fibers instead"))) +(define (current-fiber-scheduler) + (match (current-fiber) + (#f (error "No scheduler current; call within run-fibers instead")) + (fiber (fiber-scheduler fiber)))) -(define* (spawn-fiber thunk #:optional (sched (require-current-scheduler)) +(define* (spawn-fiber thunk #:optional (sched (current-fiber-scheduler)) #:key (dynamic-state (current-dynamic-state))) - (let ((thunk (if dynamic-state - (lambda () - (with-dynamic-state dynamic-state thunk)) - thunk))) - (create-fiber sched thunk))) + (create-fiber sched thunk dynamic-state)) diff --git a/fibers/internal.scm b/fibers/internal.scm index 6a39cfe..df9bc71 100644 --- a/fibers/internal.scm +++ b/fibers/internal.scm @@ -32,7 +32,6 @@ make-scheduler with-scheduler scheduler-name - (current-scheduler/public . current-scheduler) (scheduler-kernel-thread/public . scheduler-kernel-thread) run-scheduler destroy-scheduler @@ -138,15 +137,14 @@ name is known." (define-syntax-rule (with-scheduler scheduler body ...) "Evaluate @code{(begin @var{body} ...)} in an environment in which -@var{scheduler} is bound to the current kernel thread and marked as -current. Signal an error if @var{scheduler} is already running in -some other kernel thread." +@var{scheduler} is bound to the current kernel thread. Signal an +error if @var{scheduler} is already running in some other kernel +thread." (let ((sched scheduler)) (dynamic-wind (lambda () ((scheduler-kernel-thread sched) (current-thread))) (lambda () - (parameterize ((current-scheduler sched)) - body ...)) + body ...) (lambda () ((scheduler-kernel-thread sched) #f))))) @@ -155,11 +153,6 @@ some other kernel thread." @code{#f} if @var{sched} is not running." ((scheduler-kernel-thread sched))) -(define current-scheduler (make-parameter #f)) -(define (current-scheduler/public) - "Return the current scheduler, or @code{#f} if no scheduler is -current." - (current-scheduler)) (define (make-source events expiry fiber) (vector events expiry fiber)) (define (source-events s) (vector-ref s 0)) (define (source-expiry s) (vector-ref s 1)) @@ -179,7 +172,7 @@ current." (set-fiber-continuation! fiber thunk) (let ((sched (fiber-scheduler fiber))) (stack-push! (scheduler-next-runqueue sched) fiber) - (unless (eq? sched (current-scheduler)) + (unless (eq? ((scheduler-kernel-thread sched)) (current-thread)) (epoll-wake! (scheduler-epfd sched))) (values))) @@ -265,16 +258,14 @@ current." (run-timers sched)) (define* (run-fiber fiber) - (parameterize ((current-fiber fiber)) - (call-with-prompt - (scheduler-prompt-tag (fiber-scheduler fiber)) - (lambda () - (let ((thunk (fiber-continuation fiber))) - (set-fiber-continuation! fiber #f) - (thunk))) - (lambda (k after-suspend) - (set-fiber-continuation! fiber k) - (after-suspend fiber))))) + (call-with-prompt (scheduler-prompt-tag (fiber-scheduler fiber)) + (lambda () + (let ((thunk (fiber-continuation fiber))) + (set-fiber-continuation! fiber #f) + (thunk))) + (lambda (k after-suspend) + (set-fiber-continuation! fiber k) + (after-suspend fiber)))) (define* (run-scheduler sched) "Run @var{sched} until there are no more fibers ready to run, no @@ -313,10 +304,17 @@ in FIFO order, or the empty list if no work could be stolen." (for-each kill-fiber (list-copy (scheduler-fibers sched))) (epoll-destroy (scheduler-epfd sched))) -(define (create-fiber sched thunk) +(define (create-fiber sched thunk dynamic-state) "Spawn a new fiber in @var{sched} with the continuation @var{thunk}. -The fiber will be scheduled on the next turn." - (let ((fiber (make-fiber sched #f))) +The fiber will be scheduled on the next turn. During the fiber's +extent, @var{dynamic-state} will be made current, isolating fluid and +parameter mutations to this fiber." + (let* ((fiber (make-fiber sched #f)) + (thunk (lambda () + (with-dynamic-state dynamic-state + (lambda () + (current-fiber fiber) + (thunk)))))) (nameset-add! fibers-nameset fiber) (schedule-fiber! fiber thunk))) @@ -337,7 +335,7 @@ that this is currently unimplemented!" (after-suspend (lambda (fiber) #f))) "Suspend the current fiber. Call the optional @var{after-suspend} callback, if present, with the suspended thread as its argument." - (let ((tag (scheduler-prompt-tag (current-scheduler)))) + (let ((tag (scheduler-prompt-tag (fiber-scheduler (current-fiber))))) (unless (suspendable-continuation? tag) (error "Attempt to suspend fiber within continuation barrier")) ((abort-to-prompt tag after-suspend)))) @@ -357,7 +355,7 @@ even if @var{fiber} is running on a remote scheduler." except that it avoids suspending if the current continuation isn't suspendable. Returns @code{#t} if the yield succeeded, or @code{#f} otherwise." - (let ((tag (scheduler-prompt-tag (current-scheduler)))) + (let ((tag (scheduler-prompt-tag (fiber-scheduler (current-fiber))))) (and (suspendable-continuation? tag) (begin (abort-to-prompt tag (lambda (fiber) (resume-fiber fiber #f))) |
