summaryrefslogtreecommitdiff
path: root/fibers
diff options
context:
space:
mode:
Diffstat (limited to 'fibers')
-rw-r--r--fibers/channels.scm1
-rw-r--r--fibers/internal.scm489
-rw-r--r--fibers/operations.scm13
-rw-r--r--fibers/repl.scm78
-rw-r--r--fibers/scheduler.scm383
-rw-r--r--fibers/timers.scm15
6 files changed, 428 insertions, 551 deletions
diff --git a/fibers/channels.scm b/fibers/channels.scm
index aabf2db..ce6f348 100644
--- a/fibers/channels.scm
+++ b/fibers/channels.scm
@@ -32,7 +32,6 @@
#:use-module (ice-9 atomic)
#:use-module (ice-9 match)
#:use-module (fibers deque)
- #:use-module (fibers internal)
#:use-module (fibers operations)
#:export (make-channel
channel?
diff --git a/fibers/internal.scm b/fibers/internal.scm
deleted file mode 100644
index 79b1011..0000000
--- a/fibers/internal.scm
+++ /dev/null
@@ -1,489 +0,0 @@
-;; Fibers: cooperative, event-driven user-space threads.
-
-;;;; Copyright (C) 2016 Free Software Foundation, Inc.
-;;;;
-;;;; This library is free software; you can redistribute it and/or
-;;;; modify it under the terms of the GNU Lesser General Public
-;;;; License as published by the Free Software Foundation; either
-;;;; version 3 of the License, or (at your option) any later version.
-;;;;
-;;;; This library is distributed in the hope that it will be useful,
-;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
-;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-;;;; Lesser General Public License for more details.
-;;;;
-;;;; You should have received a copy of the GNU Lesser General Public
-;;;; License along with this library; if not, write to the Free Software
-;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-;;;;
-
-(define-module (fibers internal)
- #:use-module (srfi srfi-9)
- #:use-module (fibers stack)
- #:use-module (fibers epoll)
- #:use-module (fibers psq)
- #:use-module (fibers nameset)
- #:use-module (ice-9 atomic)
- #:use-module (ice-9 control)
- #:use-module (ice-9 match)
- #:use-module (ice-9 fdes-finalizers)
- #:use-module ((ice-9 threads) #:select (current-thread))
- #:export (;; Low-level interface: schedulers and threads.
- make-scheduler
- with-scheduler
- scheduler-name
- scheduler-runcount
- (scheduler-kernel-thread/public . scheduler-kernel-thread)
- scheduler-remote-peers
- scheduler-work-pending?
- choose-parallel-scheduler
- run-scheduler
- destroy-scheduler
-
- resume-on-readable-fd
- resume-on-writable-fd
- add-timer
-
- create-fiber
- (current-fiber/public . current-fiber)
- kill-fiber
- fiber-scheduler
- fiber-continuation
-
- fold-all-schedulers
- scheduler-by-name
- fold-all-fibers
- fiber-by-name
-
- suspend-current-fiber
- resume-fiber
- yield-current-fiber))
-
-(define-once fibers-nameset (make-nameset))
-(define-once schedulers-nameset (make-nameset))
-
-(define (fold-all-schedulers f seed)
- "Fold @var{f} over the set of known schedulers. @var{f} will be
-invoked as @code{(@var{f} @var{name} @var{scheduler} @var{seed})}."
- (nameset-fold f schedulers-nameset seed))
-(define (scheduler-by-name name)
- "Return the scheduler named @var{name}, or @code{#f} if no scheduler
-of that name is known."
- (nameset-ref schedulers-nameset name))
-
-(define (fold-all-fibers f seed)
- "Fold @var{f} over the set of known fibers. @var{f} will be
-invoked as @code{(@var{f} @var{name} @var{fiber} @var{seed})}."
- (nameset-fold f fibers-nameset seed))
-(define (fiber-by-name name)
- "Return the fiber named @var{name}, or @code{#f} if no fiber of that
-name is known."
- (nameset-ref fibers-nameset name))
-
-(define-record-type <scheduler>
- (%make-scheduler name epfd runcount-box prompt-tag
- next-runqueue current-runqueue
- sources timers kernel-thread
- remote-peers choose-parallel-scheduler)
- scheduler?
- (name scheduler-name set-scheduler-name!)
- (epfd scheduler-epfd)
- ;; atomic variable of uint32
- (runcount-box scheduler-runcount-box)
- (prompt-tag scheduler-prompt-tag)
- ;; atomic stack of fiber to run next turn (reverse order)
- (next-runqueue scheduler-next-runqueue)
- ;; atomic stack of fiber to run this turn
- (current-runqueue scheduler-current-runqueue)
- ;; fd -> (total-events (events . resume-fn) ...)
- (sources scheduler-sources)
- ;; PSQ of thunk -> expiry
- (timers scheduler-timers set-scheduler-timers!)
- ;; atomic parameter of thread
- (kernel-thread scheduler-kernel-thread)
- ;; list of sched
- (remote-peers scheduler-remote-peers set-scheduler-remote-peers!)
- ;; () -> sched
- (choose-parallel-scheduler scheduler-choose-parallel-scheduler
- set-scheduler-choose-parallel-scheduler!))
-
-(define-record-type <fiber>
- (make-fiber scheduler continuation)
- fiber?
- ;; The scheduler to which a fiber is currently bound.
- (scheduler fiber-scheduler set-fiber-scheduler!)
- ;; What the fiber should do when it resumes, or #f if the fiber is
- ;; currently running.
- (continuation fiber-continuation set-fiber-continuation!))
-
-(define (make-atomic-parameter init)
- (let ((box (make-atomic-box init)))
- (case-lambda
- (() (atomic-box-ref box))
- ((new)
- (if (eq? new init)
- (atomic-box-set! box new)
- (let ((prev (atomic-box-compare-and-swap! box init new)))
- (unless (eq? prev init)
- (error "owned by other thread" prev))))))))
-
-(define (shuffle l)
- (map cdr (sort (map (lambda (x) (cons (random 1.0) x)) l)
- (lambda (a b) (< (car a) (car b))))))
-
-(define (make-selector items)
- (let ((items (list->vector (shuffle items))))
- (match (vector-length items)
- (0 (lambda () #f))
- (1 (let ((item (vector-ref items 0))) (lambda () item)))
- (n (let ((idx 0))
- (lambda ()
- (let ((item (vector-ref items idx)))
- (set! idx (let ((idx (1+ idx)))
- (if (= idx (vector-length items)) 0 idx)))
- item)))))))
-
-(define* (make-scheduler #:key parallelism
- (prompt-tag (make-prompt-tag "fibers")))
- "Make a new scheduler in which to run fibers."
- (let ((epfd (epoll-create))
- (runcount-box (make-atomic-box 0))
- (next-runqueue (make-empty-stack))
- (current-runqueue (make-empty-stack))
- (sources (make-hash-table))
- (timers (make-psq (match-lambda*
- (((t1 . c1) (t2 . c2)) (< t1 t2)))
- <))
- (kernel-thread (make-atomic-parameter #f)))
- (let ((sched (%make-scheduler #f epfd runcount-box prompt-tag
- next-runqueue current-runqueue
- sources timers kernel-thread
- #f #f)))
- (set-scheduler-name! sched (nameset-add! schedulers-nameset sched))
- (let ((all-scheds
- (cons sched
- (if parallelism
- (map (lambda (_)
- (make-scheduler #:prompt-tag prompt-tag))
- (iota (1- parallelism)))
- '()))))
- (for-each
- (lambda (sched)
- (let ((choose! (make-selector all-scheds)))
- (set-scheduler-remote-peers! sched (delq sched all-scheds))
- (set-scheduler-choose-parallel-scheduler! sched choose!)))
- all-scheds))
- sched)))
-
-(define-syntax-rule (with-scheduler scheduler body ...)
- "Evaluate @code{(begin @var{body} ...)} in an environment in which
-@var{scheduler} is bound to the current kernel thread. Signal an
-error if @var{scheduler} is already running in some other kernel
-thread."
- (let ((sched scheduler))
- (dynamic-wind (lambda ()
- ((scheduler-kernel-thread sched) (current-thread)))
- (lambda ()
- body ...)
- (lambda ()
- ((scheduler-kernel-thread sched) #f)))))
-
-(define (scheduler-runcount sched)
- "Return the number of fibers that have been scheduled on
-@var{sched} since it was started, modulo 2@sup{32}."
- (atomic-box-ref (scheduler-runcount-box sched)))
-
-(define (scheduler-kernel-thread/public sched)
- "Return the kernel thread on which @var{sched} is running, or
-@code{#f} if @var{sched} is not running."
- ((scheduler-kernel-thread sched)))
-
-(define (choose-parallel-scheduler sched)
- ((scheduler-choose-parallel-scheduler sched)))
-
-(define current-fiber (make-parameter #f))
-(define (current-fiber/public)
- "Return the current fiber, or @code{#f} if no fiber is current."
- (current-fiber))
-
-(define (schedule-fiber! fiber thunk)
- ;; The fiber will be resumed at most once, and we are the ones that
- ;; will resume it, so we can set the thunk directly. Adding the
- ;; fiber to the runqueue is an atomic operation with SEQ_CST
- ;; ordering, so that will make sure this operation is visible even
- ;; for a fiber scheduled on a remote thread.
- (set-fiber-continuation! fiber thunk)
- (let ((sched (fiber-scheduler fiber)))
- (stack-push! (scheduler-next-runqueue sched) fiber)
- (unless (eq? ((scheduler-kernel-thread sched)) (current-thread))
- (epoll-wake! (scheduler-epfd sched)))
- (values)))
-
-(define internal-time-units-per-millisecond
- (/ internal-time-units-per-second 1000))
-
-(define (schedule-fibers-for-fd fd revents sched)
- (match (hashv-ref (scheduler-sources sched) fd)
- (#f (warn "scheduler for unknown fd" fd))
- ((and sources (active-events . waiters))
- ;; First, clear the active status, as the EPOLLONESHOT has
- ;; deactivated our entry in the epoll set.
- (set-car! sources #f)
- (set-cdr! sources '())
- (unless (zero? (logand revents EPOLLERR))
- (hashv-remove! (scheduler-sources sched) fd))
- ;; Now resume or re-enqueue fibers, as appropriate.
- (let lp ((waiters waiters))
- (match waiters
- (() #f)
- (((events . resume) . waiters)
- (if (zero? (logand revents (logior events EPOLLERR)))
- ;; Re-enqueue.
- (add-fd-event-waiter sched fd events resume)
- ;; Resume.
- (resume revents))
- (lp waiters)))))))
-
-(define (run-timers sched)
- ;; Run expired timer thunks in the order that they expired.
- (let ((now (get-internal-real-time)))
- (let run-timers ((timers (scheduler-timers sched)))
- (cond
- ((or (psq-empty? timers)
- (< now (car (psq-min timers))))
- (set-scheduler-timers! sched timers))
- (else
- (call-with-values (lambda () (psq-pop timers))
- (match-lambda*
- (((_ . thunk) timers)
- (thunk)
- (run-timers timers)))))))))
-
-(define (schedule-runnables-for-next-turn sched)
- ;; Called when all runnables from the current turn have been run.
- ;; Note that there may be runnables already scheduled for the next
- ;; turn; one way this can happen is if a fiber suspended itself
- ;; because it was blocked on a channel, but then another fiber woke
- ;; it up, or if a remote thread scheduled a fiber on this scheduler.
- ;; In any case, check the kernel to see if any of the fd's that we
- ;; are interested in are active, and in that case schedule their
- ;; corresponding fibers. Also run any timers that have timed out.
- (define (timers-expiry timers)
- (and (not (psq-empty? timers))
- (match (psq-min timers)
- ((expiry . thunk)
- expiry))))
- (define (update-expiry expiry)
- ;; If there are pending runnables, cause epoll to return
- ;; immediately.
- (if (stack-empty? (scheduler-next-runqueue sched))
- expiry
- 0))
- (epoll (scheduler-epfd sched)
- #:expiry (timers-expiry (scheduler-timers sched))
- #:update-expiry update-expiry
- #:folder (lambda (fd revents sched)
- (schedule-fibers-for-fd fd revents sched)
- sched)
- #:seed sched)
- (run-timers sched))
-
-(define (fiber-stealer sched)
- "Steal some work from a random scheduler in the vector
-@var{schedulers}. Return a fiber, or @code{#f} if no work could be
-stolen."
- (let ((selector (make-selector (scheduler-remote-peers sched))))
- (lambda ()
- (let ((peer (selector)))
- (and peer
- (stack-pop! (scheduler-current-runqueue peer) #f))))))
-
-(define (scheduler-work-pending? sched)
- "Return @code{#t} if @var{sched} has any work pending: any runnable
-fibers or any pending timeouts."
- (not (and (psq-empty? (scheduler-timers sched))
- (stack-empty? (scheduler-current-runqueue sched))
- (stack-empty? (scheduler-next-runqueue sched)))))
-
-(define* (run-scheduler sched finished?)
- "Run @var{sched} until there are no more fibers ready to run, no
-file descriptors being waited on, and no more timers pending to run.
-Return zero values."
- (let ((tag (scheduler-prompt-tag sched))
- (runcount-box (scheduler-runcount-box sched))
- (next (scheduler-next-runqueue sched))
- (cur (scheduler-current-runqueue sched))
- (steal-fiber! (fiber-stealer sched)))
- (define (run-fiber fiber)
- (atomic-box-set! runcount-box
- (logand (1+ (atomic-box-ref runcount-box)) #xffffFFFF))
- (call-with-prompt tag
- (lambda ()
- (let ((thunk (fiber-continuation fiber)))
- (set-fiber-continuation! fiber #f)
- (thunk)))
- (lambda (k after-suspend)
- (set-fiber-continuation! fiber k)
- (after-suspend fiber))))
- (let next-turn ()
- (unless (finished?)
- (schedule-runnables-for-next-turn sched)
- (stack-push-list! cur (reverse (stack-pop-all! next)))
- (let next-fiber ()
- (match (stack-pop! cur #f)
- (#f
- (when (stack-empty? next)
- ;; Both current and next runqueues are empty; steal a
- ;; little bit of work from a remote scheduler if we
- ;; can. Run it directly instead of pushing onto a
- ;; queue to avoid double stealing.
- (let ((fiber (steal-fiber!)))
- (when fiber
- (set-fiber-scheduler! fiber sched)
- (run-fiber fiber))))
- (next-turn))
- (fiber
- (run-fiber fiber)
- (next-fiber))))))))
-
-(define (destroy-scheduler sched)
- "Release any resources associated with @var{sched}."
- #;
- (for-each kill-fiber (list-copy (scheduler-fibers sched)))
- (epoll-destroy (scheduler-epfd sched)))
-
-(define (create-fiber sched thunk)
- "Spawn a new fiber in @var{sched} with the continuation @var{thunk}.
-The fiber will be scheduled on the next turn. @var{thunk} will run
-with a copy of the current dynamic state, isolating fluid and
-parameter mutations to the fiber."
- (let* ((fiber (make-fiber sched #f))
- (thunk (let ((dynamic-state (current-dynamic-state)))
- (lambda ()
- (with-dynamic-state
- dynamic-state
- (lambda ()
- (current-fiber fiber)
- (catch #t
- (lambda ()
- (%start-stack #t thunk))
- (lambda _ #f)
- (let ((err (current-error-port)))
- (lambda (key . args)
- (false-if-exception
- (let ((stack (make-stack #t 4)))
- (format err "Uncaught exception in fiber #~a:\n"
- (nameset-ref fibers-nameset fiber))
- (display-backtrace stack err)
- (print-exception err (stack-ref stack 0)
- key args))))))))))))
- (nameset-add! fibers-nameset fiber)
- (schedule-fiber! fiber thunk)))
-
-(define (kill-fiber fiber)
- "Try to kill @var{fiber}, causing it to raise an exception. Note
-that this is currently unimplemented!"
- (error "kill-fiber is unimplemented"))
-
-;; Shim for Guile 2.1.5.
-(unless (defined? 'suspendable-continuation?)
- (define! 'suspendable-continuation? (lambda (tag) #t)))
-
-;; The AFTER-SUSPEND thunk allows the user to suspend the current
-;; fiber, saving its state, and then perform some other nonlocal
-;; control flow.
-;;
-(define* (suspend-current-fiber #:optional
- (after-suspend (lambda (fiber) #f)))
- "Suspend the current fiber. Call the optional @var{after-suspend}
-callback, if present, with the suspended thread as its argument."
- (let ((tag (scheduler-prompt-tag (fiber-scheduler (current-fiber)))))
- (unless (suspendable-continuation? tag)
- (error "Attempt to suspend fiber within continuation barrier"))
- ((abort-to-prompt tag after-suspend))))
-
-(define* (resume-fiber fiber thunk)
- "Resume @var{fiber}, adding it to the run queue of its scheduler.
-The fiber will start by applying @var{thunk}. A fiber @emph{must}
-only be resumed when it is suspended. This function is thread-safe
-even if @var{fiber} is running on a remote scheduler."
- (let ((cont (fiber-continuation fiber)))
- (unless cont (error "invalid fiber" fiber))
- (schedule-fiber! fiber (lambda () (cont thunk)))))
-
-(define* (yield-current-fiber)
- "Yield control to the current scheduler. Like
-@code{suspend-current-fiber} followed directly by @code{resume-fiber},
-except that it avoids suspending if the current continuation isn't
-suspendable. Returns @code{#t} if the yield succeeded, or @code{#f}
-otherwise."
- (match (current-fiber)
- (#f #f)
- (fiber
- (let ((tag (scheduler-prompt-tag (fiber-scheduler fiber))))
- (and (suspendable-continuation? tag)
- (begin
- (abort-to-prompt tag (lambda (fiber) (resume-fiber fiber #f)))
- #t))))))
-
-(define (finalize-fd sources)
- "Remove data associated with @var{fd} from the scheduler @var{ctx}.
-Called by Guile just before Guile goes to close a file descriptor, in
-response either to an explicit call to @code{close-port}, or because
-the port became unreachable. In the latter case, this call may come
-from a finalizer thread."
- ;; When a file descriptor is closed, the kernel silently removes it
- ;; from any associated epoll sets, so we don't need to do anything
- ;; there. But because this call could come from any thread,
- ;; especially given the fact that the fiber might be migrated, we
- ;; have to operate locally, just nulling out the sources pair and
- ;; not mucking with the sources table.
- ;;
- ;; FIXME: Wake all sources with EPOLLERR.
- (set-cdr! sources '())
- (set-car! sources #f))
-
-(define (add-fd-event-waiter sched fd events resume)
- "Arrange to resume @var{fiber} when the file descriptor @var{fd} has
-the given @var{events}, expressed as an epoll bitfield."
- (let ((sources (hashv-ref (scheduler-sources sched) fd)))
- (match sources
- ((active-events . waiters)
- (set-cdr! sources (acons events resume waiters))
- (unless (and active-events
- (= (logand events active-events) events))
- (let ((active-events (logior events (or active-events 0))))
- (set-car! sources active-events)
- (epoll-add*! (scheduler-epfd sched) fd
- (logior active-events EPOLLONESHOT)))))
- (#f
- (let ((sources (list events (cons events resume))))
- (hashv-set! (scheduler-sources sched) fd sources)
- (add-fdes-finalizer! fd (lambda (fd) (finalize-fd sources)))
- (epoll-add*! (scheduler-epfd sched) fd
- (logior events EPOLLONESHOT)))))))
-
-(define (resume-on-fd-events fd events fiber)
- "Arrange to resume @var{fiber} when the file descriptor @var{fd} has
-the given @var{events}, expressed as an epoll bitfield."
- (add-fd-event-waiter (fiber-scheduler fiber) fd events
- (lambda (revents)
- (resume-fiber fiber (lambda () revents)))))
-
-(define (resume-on-readable-fd fd fiber)
- "Arrange to resume @var{fiber} when the file descriptor @var{fd}
-becomes readable."
- (resume-on-fd-events fd (logior EPOLLIN EPOLLRDHUP) fiber))
-
-(define (resume-on-writable-fd fd fiber)
- "Arrange to resume @var{fiber} when the file descriptor @var{fd}
-becomes writable."
- (resume-on-fd-events fd EPOLLOUT fiber))
-
-(define (add-timer sched expiry thunk)
- "Arrange to call @var{thunk} when the absolute real time is greater
-than or equal to @var{expiry}, expressed in internal time units."
- (set-scheduler-timers! sched
- (psq-set (scheduler-timers sched)
- (cons expiry thunk)
- expiry)))
diff --git a/fibers/operations.scm b/fibers/operations.scm
index ef2969c..ac55507 100644
--- a/fibers/operations.scm
+++ b/fibers/operations.scm
@@ -56,7 +56,7 @@
make-mutex make-condition-variable
lock-mutex unlock-mutex
wait-condition-variable signal-condition-variable))
- #:use-module (fibers internal)
+ #:use-module (fibers scheduler)
#:export (wrap-operation
choice-operation
perform-operation
@@ -151,11 +151,12 @@ the operation cannot complete directly, block until it can complete."
;; succeeds. Otherwise we block the current thread until the
;; operation succeeds, to allow for communication between fibers
;; and foreign threads.
- (if (current-fiber)
- (suspend-current-fiber
- (lambda (fiber)
- (define (resume thunk) (resume-fiber fiber thunk))
- (block (fiber-scheduler fiber) resume)))
+ (if (current-scheduler)
+ ((suspend-current-task
+ (lambda (sched k)
+ (define (resume thunk)
+ (schedule-task sched (lambda () (k thunk))))
+ (block sched resume))))
(let ((k #f)
(thread (current-thread))
(mutex (make-mutex))
diff --git a/fibers/repl.scm b/fibers/repl.scm
index 3331702..69ee164 100644
--- a/fibers/repl.scm
+++ b/fibers/repl.scm
@@ -25,15 +25,27 @@
#:use-module ((ice-9 threads)
#:select (call-with-new-thread cancel-thread join-thread))
#:use-module (fibers)
- #:use-module (fibers internal))
+ #:use-module (fibers nameset)
+ #:use-module (fibers scheduler))
+
+(define-once schedulers-nameset (make-nameset))
+
+(define (fold-all-schedulers f seed)
+ "Fold @var{f} over the set of known schedulers. @var{f} will be
+invoked as @code{(@var{f} @var{name} @var{scheduler} @var{seed})}."
+ (nameset-fold f schedulers-nameset seed))
+(define (scheduler-by-name name)
+ "Return the scheduler named @var{name}, or @code{#f} if no scheduler
+of that name is known."
+ (nameset-ref schedulers-nameset name))
(define repl-current-scheds (make-doubly-weak-hash-table))
(define (repl-current-sched repl)
(hashq-ref repl-current-scheds repl))
-(define (repl-set-current-sched! repl sched verbose?)
+(define (repl-set-current-sched! repl name sched verbose?)
(when verbose?
(format #t "Scheduler ~a on thread ~a is now current\n."
- (scheduler-name sched) (scheduler-kernel-thread sched)))
+ name (scheduler-kernel-thread sched)))
(hashq-set! repl-current-scheds repl sched))
(define* (repl-ensure-current-sched repl #:optional (verbose? #t))
(define (sched-alive? sched)
@@ -44,19 +56,20 @@
(match scheds
(()
(let* ((sched (make-scheduler))
+ (name (nameset-add! schedulers-nameset sched))
(thread (call-with-new-thread
(lambda ()
(run-fibers #:scheduler sched)))))
(when verbose?
(format #t "No active schedulers; spawned a new one (#~a).\n"
- (scheduler-name sched)))
- (repl-set-current-sched! repl sched verbose?)
+ name))
+ (repl-set-current-sched! repl name sched verbose?)
sched))
(((id . (and sched (? sched-alive?))) . scheds)
(when verbose?
(format #t "No current scheduler; choosing scheduler #~a randomly.\n"
- (scheduler-name sched)))
- (repl-set-current-sched! repl sched verbose?)
+ id))
+ (repl-set-current-sched! repl id sched verbose?)
sched)))))
(define-meta-command ((scheds fibers) repl)
@@ -78,54 +91,28 @@ Show a list of schedulers."
(define-meta-command ((spawn-sched fibers) repl)
"spawn-sched
Create a new scheduler for fibers, and run it on a new kernel thread."
- (let ((sched (make-scheduler)))
+ (let* ((sched (make-scheduler))
+ (name (nameset-add! schedulers-nameset sched)))
(call-with-new-thread (lambda ()
(call-with-new-thread
(lambda ()
(run-fibers #:scheduler sched)))))
- (format #t "Spawned scheduler #~a.\n" (scheduler-name sched))))
+ (format #t "Spawned scheduler #~a.\n" name)))
-(define-meta-command ((kill-sched fibers) repl sched)
- "kill-sched SCHED
+(define-meta-command ((kill-sched fibers) repl name)
+ "kill-sched NAME
Shut down a scheduler."
- (let ((sched (or (scheduler-by-name sched)
- (error "no scheduler with name" sched))))
+ (let ((sched (or (scheduler-by-name name)
+ (error "no scheduler with name" name))))
(cond
((scheduler-kernel-thread sched)
=> (lambda (thread)
- (format #t "Killing thread running scheduler #~a...\n"
- (scheduler-name sched))
+ (format #t "Killing thread running scheduler #~a...\n" name)
(cancel-thread thread)
(join-thread thread)
- (format #t "Thread running scheduler #~a stopped.\n"
- (scheduler-name sched))))
+ (format #t "Thread running scheduler #~a stopped.\n" name)))
(else
- (format #t "Scheduler #~a not running.\n" (scheduler-name sched))))))
-
-(define-meta-command ((fibers fibers) repl #:optional sched)
- "fibers [SCHED]
-Show a list of fibers.
-
-If SCHED is given, limit to fibers bound to the given scheduler."
- (let ((sched (and sched
- (or (scheduler-by-name sched)
- (error "no scheduler with name" sched)))))
- (match (sort (fold-all-fibers acons '())
- (match-lambda*
- (((id1 . _) (id2 . _)) (< id1 id2))))
- (() (format #t "No fibers.\n"))
- (fibers
- (format #t "~a ~8t~a\n" "fiber" "state")
- (format #t "~a ~8t~a\n" "-----" "-----")
- (for-each
- (match-lambda
- ((id . fiber)
- ;; How to show fiber data? Would be nice to say "suspended
- ;; at foo.scm:32:4".
- (when (or (not sched) (eq? (fiber-scheduler fiber) sched))
- (format #t "~a ~8t~a\n" id
- (if (fiber-continuation fiber) "(suspended)" "")))))
- fibers)))))
+ (format #t "Scheduler #~a not running.\n" name)))))
(define-meta-command ((spawn-fiber fibers) repl (form) #:optional sched)
"spawn-fiber EXP [SCHED]
@@ -135,8 +122,3 @@ If SCHED is given, the fiber will be spawned on the given scheduler."
(let ((thunk (repl-prepare-eval-thunk repl (repl-parse repl form)))
(sched (repl-ensure-current-sched repl)))
(spawn-fiber thunk sched)))
-
-(define-meta-command ((kill-fiber fibers) repl fiber)
- "kill-fiber FIBER
-Shut down a fiber."
- (display "Don't know how to do that yet!\n"))
diff --git a/fibers/scheduler.scm b/fibers/scheduler.scm
new file mode 100644
index 0000000..28a08b3
--- /dev/null
+++ b/fibers/scheduler.scm
@@ -0,0 +1,383 @@
+;; Fibers: cooperative, event-driven user-space threads.
+
+;;;; Copyright (C) 2016 Free Software Foundation, Inc.
+;;;;
+;;;; This library is free software; you can redistribute it and/or
+;;;; modify it under the terms of the GNU Lesser General Public
+;;;; License as published by the Free Software Foundation; either
+;;;; version 3 of the License, or (at your option) any later version.
+;;;;
+;;;; This library is distributed in the hope that it will be useful,
+;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;;;; Lesser General Public License for more details.
+;;;;
+;;;; You should have received a copy of the GNU Lesser General Public
+;;;; License along with this library; if not, write to the Free Software
+;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+;;;;
+
+(define-module (fibers scheduler)
+ #:use-module (srfi srfi-9)
+ #:use-module (fibers stack)
+ #:use-module (fibers epoll)
+ #:use-module (fibers psq)
+ #:use-module (ice-9 atomic)
+ #:use-module (ice-9 control)
+ #:use-module (ice-9 match)
+ #:use-module (ice-9 fdes-finalizers)
+ #:use-module ((ice-9 threads) #:select (current-thread))
+ #:export (;; Low-level interface: schedulers and tasks.
+ make-scheduler
+ (current-scheduler/public . current-scheduler)
+ scheduler-runcount
+ (scheduler-kernel-thread/public . scheduler-kernel-thread)
+ scheduler-remote-peers
+ scheduler-work-pending?
+ choose-parallel-scheduler
+ run-scheduler
+ destroy-scheduler
+
+ schedule-task
+ schedule-task-when-fd-readable
+ schedule-task-when-fd-writable
+ schedule-task-at-time
+
+ suspend-current-task
+ yield-current-task))
+
+(define-record-type <scheduler>
+ (%make-scheduler epfd runcount-box prompt-tag
+ next-runqueue current-runqueue
+ fd-waiters timers kernel-thread
+ remote-peers choose-parallel-scheduler)
+ scheduler?
+ (epfd scheduler-epfd)
+ ;; atomic variable of uint32
+ (runcount-box scheduler-runcount-box)
+ (prompt-tag scheduler-prompt-tag)
+ ;; atomic stack of tasks to run next turn (reverse order)
+ (next-runqueue scheduler-next-runqueue)
+ ;; atomic stack of tasks to run this turn
+ (current-runqueue scheduler-current-runqueue)
+ ;; fd -> (total-events (events . task) ...)
+ (fd-waiters scheduler-fd-waiters)
+ ;; PSQ of expiry -> task
+ (timers scheduler-timers set-scheduler-timers!)
+ ;; atomic parameter of thread
+ (kernel-thread scheduler-kernel-thread)
+ ;; list of sched
+ (remote-peers scheduler-remote-peers set-scheduler-remote-peers!)
+ ;; () -> sched
+ (choose-parallel-scheduler scheduler-choose-parallel-scheduler
+ set-scheduler-choose-parallel-scheduler!))
+
+(define (make-atomic-parameter init)
+ (let ((box (make-atomic-box init)))
+ (case-lambda
+ (() (atomic-box-ref box))
+ ((new)
+ (if (eq? new init)
+ (atomic-box-set! box new)
+ (let ((prev (atomic-box-compare-and-swap! box init new)))
+ (unless (eq? prev init)
+ (error "owned by other thread" prev))))))))
+
+(define (shuffle l)
+ (map cdr (sort (map (lambda (x) (cons (random 1.0) x)) l)
+ (lambda (a b) (< (car a) (car b))))))
+
+(define (make-selector items)
+ (let ((items (list->vector (shuffle items))))
+ (match (vector-length items)
+ (0 (lambda () #f))
+ (1 (let ((item (vector-ref items 0))) (lambda () item)))
+ (n (let ((idx 0))
+ (lambda ()
+ (let ((item (vector-ref items idx)))
+ (set! idx (let ((idx (1+ idx)))
+ (if (= idx (vector-length items)) 0 idx)))
+ item)))))))
+
+(define* (make-scheduler #:key parallelism
+ (prompt-tag (make-prompt-tag "fibers")))
+ "Make a new scheduler in which to run fibers."
+ (let ((epfd (epoll-create))
+ (runcount-box (make-atomic-box 0))
+ (next-runqueue (make-empty-stack))
+ (current-runqueue (make-empty-stack))
+ (fd-waiters (make-hash-table))
+ (timers (make-psq (match-lambda*
+ (((t1 . c1) (t2 . c2)) (< t1 t2)))
+ <))
+ (kernel-thread (make-atomic-parameter #f)))
+ (let* ((sched (%make-scheduler epfd runcount-box prompt-tag
+ next-runqueue current-runqueue
+ fd-waiters timers kernel-thread
+ #f #f))
+ (all-scheds
+ (cons sched
+ (if parallelism
+ (map (lambda (_)
+ (make-scheduler #:prompt-tag prompt-tag))
+ (iota (1- parallelism)))
+ '()))))
+ (for-each
+ (lambda (sched)
+ (let ((choose! (make-selector all-scheds)))
+ (set-scheduler-remote-peers! sched (delq sched all-scheds))
+ (set-scheduler-choose-parallel-scheduler! sched choose!)))
+ all-scheds)
+ sched)))
+
+(define current-scheduler (fluid->parameter (make-thread-local-fluid #f)))
+(define (current-scheduler/public)
+ "Return the current scheduler, or @code{#f} if no scheduler is current."
+ (current-scheduler))
+
+(define-syntax-rule (with-scheduler scheduler body ...)
+ "Evaluate @code{(begin @var{body} ...)} in an environment in which
+@var{scheduler} is bound to the current kernel thread and
+@code{current-scheduler} is bound to @var{scheduler}. Signal an error
+if @var{scheduler} is already running in some other kernel thread."
+ (let ((sched scheduler))
+ (dynamic-wind (lambda ()
+ ((scheduler-kernel-thread sched) (current-thread)))
+ (lambda ()
+ (parameterize ((current-scheduler sched))
+ body ...))
+ (lambda ()
+ ((scheduler-kernel-thread sched) #f)))))
+
+(define (scheduler-runcount sched)
+ "Return the number of tasks that have been run on @var{sched} since
+it was started, modulo 2@sup{32}."
+ (atomic-box-ref (scheduler-runcount-box sched)))
+
+(define (scheduler-kernel-thread/public sched)
+ "Return the kernel thread on which @var{sched} is running, or
+@code{#f} if @var{sched} is not running."
+ ((scheduler-kernel-thread sched)))
+
+(define (choose-parallel-scheduler sched)
+ ((scheduler-choose-parallel-scheduler sched)))
+
+(define-inlinable (schedule-task/no-wakeup sched task)
+ (stack-push! (scheduler-next-runqueue sched) task))
+
+(define (schedule-task sched task)
+ "Add the task @var{task} to the run queue of the scheduler
+@var{sched}. On the next turn, @var{sched} will invoke @var{task}
+with no arguments.
+
+This function is thread-safe even if @var{sched} is running on a
+remote kernel thread."
+ (schedule-task/no-wakeup sched task)
+ (unless (eq? ((scheduler-kernel-thread sched)) (current-thread))
+ (epoll-wake! (scheduler-epfd sched)))
+ (values))
+
+(define (schedule-tasks-for-active-fd fd revents sched)
+ (match (hashv-ref (scheduler-fd-waiters sched) fd)
+ (#f (warn "scheduler for unknown fd" fd))
+ ((and events+waiters (active-events . waiters))
+ ;; First, clear the active status, as the EPOLLONESHOT has
+ ;; deactivated our entry in the epoll set.
+ (set-car! events+waiters #f)
+ (set-cdr! events+waiters '())
+ (unless (zero? (logand revents EPOLLERR))
+ (hashv-remove! (scheduler-fd-waiters sched) fd))
+ ;; Now resume or re-schedule waiters, as appropriate.
+ (let lp ((waiters waiters))
+ (match waiters
+ (() #f)
+ (((events . task) . waiters)
+ (if (zero? (logand revents (logior events EPOLLERR)))
+ ;; Re-schedule.
+ (schedule-task-when-fd-active sched fd events task)
+ ;; Resume.
+ (schedule-task/no-wakeup sched task))
+ (lp waiters)))))))
+
+(define (schedule-tasks-for-expired-timers sched)
+ ;; Schedule expired timer tasks in the order that they expired.
+ (let ((now (get-internal-real-time)))
+ (let expire-timers ((timers (scheduler-timers sched)))
+ (cond
+ ((or (psq-empty? timers)
+ (< now (car (psq-min timers))))
+ (set-scheduler-timers! sched timers))
+ (else
+ (call-with-values (lambda () (psq-pop timers))
+ (match-lambda*
+ (((_ . task) timers)
+ (schedule-task/no-wakeup sched task)
+ (expire-timers timers)))))))))
+
+(define (schedule-tasks-for-next-turn sched)
+ ;; Called when all tasks from the current turn have been run.
+ ;; Note that there may be tasks already scheduled for the next
+ ;; turn; one way this can happen is if a fiber suspended itself
+ ;; because it was blocked on a channel, but then another fiber woke
+ ;; it up, or if a remote thread scheduled a fiber on this scheduler.
+ ;; In any case, check the kernel to see if any of the fd's that we
+ ;; are interested in are active, and in that case schedule their
+ ;; corresponding tasks. Also run any timers that have timed out.
+ (define (timers-expiry timers)
+ (and (not (psq-empty? timers))
+ (match (psq-min timers)
+ ((expiry . task)
+ expiry))))
+ (define (update-expiry expiry)
+ ;; If there are pending tasks, cause epoll to return
+ ;; immediately.
+ (if (stack-empty? (scheduler-next-runqueue sched))
+ expiry
+ 0))
+ (epoll (scheduler-epfd sched)
+ #:expiry (timers-expiry (scheduler-timers sched))
+ #:update-expiry update-expiry
+ #:folder (lambda (fd revents sched)
+ (schedule-tasks-for-active-fd fd revents sched)
+ sched)
+ #:seed sched)
+ (schedule-tasks-for-expired-timers sched))
+
+(define (work-stealer sched)
+ "Steal some work from a random scheduler in the vector
+@var{schedulers}. Return a task, or @code{#f} if no work could be
+stolen."
+ (let ((selector (make-selector (scheduler-remote-peers sched))))
+ (lambda ()
+ (let ((peer (selector)))
+ (and peer
+ (stack-pop! (scheduler-current-runqueue peer) #f))))))
+
+(define (scheduler-work-pending? sched)
+ "Return @code{#t} if @var{sched} has any work pending: any tasks or
+any pending timeouts."
+ (not (and (psq-empty? (scheduler-timers sched))
+ (stack-empty? (scheduler-current-runqueue sched))
+ (stack-empty? (scheduler-next-runqueue sched)))))
+
+(define* (run-scheduler sched finished?)
+ "Run @var{sched} until calling @code{finished?} returns a true
+value. Return zero values."
+ (let ((tag (scheduler-prompt-tag sched))
+ (runcount-box (scheduler-runcount-box sched))
+ (next (scheduler-next-runqueue sched))
+ (cur (scheduler-current-runqueue sched))
+ (steal-work! (work-stealer sched)))
+ (define (run-task task)
+ (atomic-box-set! runcount-box
+ (logand (1+ (atomic-box-ref runcount-box)) #xffffFFFF))
+ (call-with-prompt tag
+ task
+ (lambda (k after-suspend)
+ (after-suspend sched k))))
+ (with-scheduler
+ sched
+ (let next-turn ()
+ (unless (finished?)
+ (schedule-tasks-for-next-turn sched)
+ (stack-push-list! cur (reverse (stack-pop-all! next)))
+ (let next-task ()
+ (match (stack-pop! cur #f)
+ (#f
+ (when (stack-empty? next)
+ ;; Both current and next runqueues are empty; steal a
+ ;; little bit of work from a remote scheduler if we
+ ;; can. Run it directly instead of pushing onto a
+ ;; queue to avoid double stealing.
+ (let ((task (steal-work!)))
+ (when task
+ (run-task task))))
+ (next-turn))
+ (task
+ (run-task task)
+ (next-task)))))))))
+
+(define (destroy-scheduler sched)
+ "Release any resources associated with @var{sched}."
+ (epoll-destroy (scheduler-epfd sched)))
+
+(define (schedule-task-when-fd-active sched fd events task)
+ "Arrange for @var{sched} to schedule @var{task} when the file
+descriptor @var{fd} becomes active with any of the given @var{events},
+expressed as an epoll bitfield."
+ (let ((fd-waiters (hashv-ref (scheduler-fd-waiters sched) fd)))
+ (match fd-waiters
+ ((active-events . waiters)
+ (set-cdr! fd-waiters (acons events task waiters))
+ (unless (and active-events
+ (= (logand events active-events) events))
+ (let ((active-events (logior events (or active-events 0))))
+ (set-car! fd-waiters active-events)
+ (epoll-add*! (scheduler-epfd sched) fd
+ (logior active-events EPOLLONESHOT)))))
+ (#f
+ (let ((fd-waiters (list events (cons events task))))
+ (define (finalize-fd fd)
+ ;; When a file port is closed, clear out the list of
+ ;; waiting tasks so that when/if this FD is re-used, we
+ ;; don't resume stale tasks. Note that we don't need to
+ ;; remove the FD from the epoll set, as the kernel manages
+ ;; that for us.
+ ;;
+ ;; FIXME: Is there a way to wake all tasks in a thread-safe
+ ;; way? Note that this function may be invoked from a
+ ;; finalizer thread.
+ (set-cdr! fd-waiters '())
+ (set-car! fd-waiters #f))
+ (hashv-set! (scheduler-fd-waiters sched) fd fd-waiters)
+ (add-fdes-finalizer! fd finalize-fd)
+ (epoll-add*! (scheduler-epfd sched) fd
+ (logior events EPOLLONESHOT)))))))
+
+(define (schedule-task-when-fd-readable sched fd task)
+ "Arrange to schedule @var{task} on @var{sched} when the file
+descriptor @var{fd} becomes readable."
+ (schedule-task-when-fd-active sched fd (logior EPOLLIN EPOLLRDHUP) task))
+
+(define (run-task-when-fd-writable sched fd task)
+ "Arrange to schedule @var{k} on @var{sched} when the file descriptor
+@var{fd} becomes writable."
+ (schedule-task-when-fd-active sched fd EPOLLOUT task))
+
+(define (schedule-task-at-time sched expiry task)
+ "Arrange to schedule @var{task} when the absolute real time is
+greater than or equal to @var{expiry}, expressed in internal time
+units."
+ (set-scheduler-timers! sched
+ (psq-set (scheduler-timers sched)
+ (cons expiry task)
+ expiry)))
+
+;; Shim for Guile 2.1.5.
+(unless (defined? 'suspendable-continuation?)
+ (define! 'suspendable-continuation? (lambda (tag) #t)))
+
+(define* (suspend-current-task after-suspend)
+ "Suspend the current task. After suspending, call the
+@var{after-suspend} callback with two arguments: the current
+scheduler, and the continuation of the current task. The continuation
+passed to the @var{after-suspend} handler is the continuation of the
+@code{suspend-current-task} call."
+ (let ((tag (scheduler-prompt-tag (current-scheduler))))
+ (unless (suspendable-continuation? tag)
+ (error "Attempt to suspend fiber within continuation barrier"))
+ (abort-to-prompt tag after-suspend)))
+
+(define* (yield-current-task)
+ "Yield control to the current scheduler. Like calling
+@code{(suspend-current-task schedule-task)} except that it avoids
+suspending if the current continuation isn't suspendable. Returns
+@code{#t} if the yield succeeded, or @code{#f} otherwise."
+ (match (current-scheduler)
+ (#f #f)
+ (sched
+ (let ((tag (scheduler-prompt-tag sched)))
+ (and (suspendable-continuation? tag)
+ (begin
+ (abort-to-prompt tag schedule-task)
+ #t))))))
diff --git a/fibers/timers.scm b/fibers/timers.scm
index 505939f..8bdbea3 100644
--- a/fibers/timers.scm
+++ b/fibers/timers.scm
@@ -18,7 +18,7 @@
;;;;
(define-module (fibers timers)
- #:use-module (fibers internal)
+ #:use-module (fibers scheduler)
#:use-module (fibers operations)
#:use-module (ice-9 atomic)
#:use-module (ice-9 match)
@@ -39,7 +39,7 @@
(call-with-new-thread
(lambda ()
(define (finished?) #f)
- (with-scheduler sched (run-scheduler sched finished?))))
+ (run-scheduler sched finished?)))
sched)))))
(define (timer-operation expiry)
@@ -57,11 +57,12 @@ units. The operation will succeed with no values."
('C (timer))
('S #f)))
(if sched
- (add-timer sched expiry timer)
- (create-fiber (timer-sched)
- (lambda ()
- (perform-operation (timer-operation expiry))
- (timer)))))))
+ (schedule-task-at-time sched expiry timer)
+ (schedule-task
+ (timer-sched)
+ (lambda ()
+ (perform-operation (timer-operation expiry))
+ (timer)))))))
(define (sleep-operation seconds)
"Make an operation that will succeed with no values when