summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndy Wingo <wingo@pobox.com>2017-08-06 12:59:50 +0200
committerAndy Wingo <wingo@pobox.com>2017-08-06 12:59:50 +0200
commit2a4f707827105d041668f34b62daaff871c22c07 (patch)
tree81797ea023d5c0ce12d7150aa0d5133406b5e305
parentGarbage collect old condition waiters. (diff)
downloadguile-fibers-2a4f707827105d041668f34b62daaff871c22c07.tar.gz
Rebase fibers on top of schedulers and tasks
This commit refactors fibers to be based on lighter-weight "tasks", and makes the scheduler API more orthogonal. Now there are no more fiber objects (although the fibers layer could re-add them if they are useful), and fibers no longer have names (although again this could be re-added at an upper layer). Also it's the current scheduler that's bound by a parameter, not the current fiber, and as it's a thread-safe parameter it doesn't need to be bound in each fiber.
-rw-r--r--Makefile.am2
-rw-r--r--fibers.scm78
-rw-r--r--fibers.texi227
-rw-r--r--fibers/channels.scm1
-rw-r--r--fibers/internal.scm489
-rw-r--r--fibers/operations.scm13
-rw-r--r--fibers/repl.scm78
-rw-r--r--fibers/scheduler.scm383
-rw-r--r--fibers/timers.scm15
-rw-r--r--tests/channels.scm3
-rw-r--r--tests/preemption.scm2
11 files changed, 583 insertions, 708 deletions
diff --git a/Makefile.am b/Makefile.am
index 4bc6222..676a1dc 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -31,12 +31,12 @@ SOURCES = \
fibers/config.scm \
fibers/deque.scm \
fibers/epoll.scm \
- fibers/internal.scm \
fibers/interrupts.scm \
fibers/nameset.scm \
fibers/operations.scm \
fibers/posix-clocks.scm \
fibers/psq.scm \
+ fibers/scheduler.scm \
fibers/stack.scm \
fibers/repl.scm \
fibers/timers.scm \
diff --git a/fibers.scm b/fibers.scm
index 2aa9354..208d39d 100644
--- a/fibers.scm
+++ b/fibers.scm
@@ -20,7 +20,7 @@
(define-module (fibers)
#:use-module (ice-9 match)
#:use-module (ice-9 atomic)
- #:use-module (fibers internal)
+ #:use-module (fibers scheduler)
#:use-module (fibers repl)
#:use-module (fibers timers)
#:use-module (fibers interrupts)
@@ -29,16 +29,16 @@
#:select (port-read-wait-fd port-write-wait-fd))
#:use-module (ice-9 suspendable-ports)
#:export (run-fibers spawn-fiber)
- #:re-export (current-fiber sleep))
+ #:re-export (sleep))
(define (wait-for-readable port)
- (suspend-current-fiber
- (lambda (fiber)
- (resume-on-readable-fd (port-read-wait-fd port) fiber))))
+ (suspend-current-task
+ (lambda (sched k)
+ (schedule-task-when-fd-readable sched (port-read-wait-fd port) k))))
(define (wait-for-writable port)
- (suspend-current-fiber
- (lambda (fiber)
- (resume-on-writable-fd (port-read-wait-fd port) fiber))))
+ (suspend-current-task
+ (lambda (sched k)
+ (schedule-task-when-fd-writable sched (port-write-wait-fd port) k))))
(define-syntax-rule (with-affinity affinity exp ...)
(let ((saved #f))
@@ -53,19 +53,17 @@
(define (%run-fibers scheduler hz finished? affinity)
(with-affinity
affinity
- (with-scheduler
- scheduler
- (with-interrupts
- hz
- (let ((last-runcount 0))
- (lambda ()
- (let* ((runcount (scheduler-runcount scheduler))
- (res (eqv? runcount last-runcount)))
- (set! last-runcount runcount)
- res)))
- yield-current-fiber
- (lambda ()
- (run-scheduler scheduler finished?))))))
+ (with-interrupts
+ hz
+ (let ((last-runcount 0))
+ (lambda ()
+ (let* ((runcount (scheduler-runcount scheduler))
+ (res (eqv? runcount last-runcount)))
+ (set! last-runcount runcount)
+ res)))
+ yield-current-task
+ (lambda ()
+ (run-scheduler scheduler finished?)))))
(define (start-auxiliary-threads scheduler hz finished? affinities)
(for-each (lambda (sched affinity)
@@ -141,6 +139,31 @@
(apply values (atomic-box-ref ret))))))
(define* (spawn-fiber thunk #:optional sched #:key parallel?)
+ "Spawn a new fiber which will start by invoking @var{thunk}.
+The fiber will be scheduled on the next turn. @var{thunk} will run
+with a copy of the current dynamic state, isolating fluid and
+parameter mutations to the fiber."
+ (define (with-error-handling thunk)
+ (lambda ()
+ (catch #t
+ (lambda ()
+ (%start-stack #t thunk))
+ (lambda _ #f)
+ (let ((err (current-error-port)))
+ (lambda (key . args)
+ (false-if-exception
+ (let ((stack (make-stack #t 4)))
+ (format err "Uncaught exception in fiber:\n")
+ (display-backtrace stack err)
+ (print-exception err (stack-ref stack 0)
+ key args))))))))
+ (define (capture-dynamic-state thunk)
+ (let ((dynamic-state (current-dynamic-state)))
+ (lambda ()
+ (with-dynamic-state dynamic-state thunk))))
+ (define (create-fiber sched thunk)
+ (schedule-task sched
+ (capture-dynamic-state (with-error-handling thunk))))
(cond
(sched
;; When a scheduler is passed explicitly, it could be there is no
@@ -152,12 +175,11 @@
(current-read-waiter wait-for-readable)
(current-write-waiter wait-for-writable)
(thunk))))
- ((current-fiber)
- => (lambda (fiber)
- (let ((sched (fiber-scheduler fiber)))
- (create-fiber (if parallel?
- (choose-parallel-scheduler sched)
- sched)
- thunk))))
+ ((current-scheduler)
+ => (lambda (sched)
+ (create-fiber (if parallel?
+ (choose-parallel-scheduler sched)
+ sched)
+ thunk)))
(else
(error "No scheduler current; call within run-fibers instead"))))
diff --git a/fibers.texi b/fibers.texi
index 09f8efd..be1f328 100644
--- a/fibers.texi
+++ b/fibers.texi
@@ -4,8 +4,8 @@
@settitle Fibers
@c %**end of header
-@set VERSION 1.0.0
-@set UPDATED 18 February 2016
+@set VERSION 1.1.0
+@set UPDATED 6 August 2017
@copying
This manual is for Fibers (version @value{VERSION}, updated
@@ -448,13 +448,13 @@ interface, base support for asynchronous operations, implementations
of operations for channels and timers, and an internals interface.
@menu
-* Using Fibers:: User-facing interface to fibers
-* Operations:: Composable abstractions for concurrency.
-* Channels:: Share memory by communicating.
-* Timers:: Operations on time.
-* Conditions:: Waiting for simple state changes.
-* REPL Commands:: Experimenting with Fibers at the console.
-* Internals:: Scheduler and fiber objects and operations.
+* Using Fibers:: User-facing interface to fibers
+* Operations:: Composable abstractions for concurrency.
+* Channels:: Share memory by communicating.
+* Timers:: Operations on time.
+* Conditions:: Waiting for simple state changes.
+* REPL Commands:: Experimenting with Fibers at the console.
+* Schedulers and Tasks:: Fibers are built from lower-level primitives.
@end menu
@node Using Fibers
@@ -500,10 +500,10 @@ or writes to prevent other fibers from running, pass @code{#f} as the
By default, @code{run-fibers} will create a fresh scheduler, and
destroy it after @code{run-fibers} finishes. If you happen to have a
-pre-existing scheduler (because you used the internals interface to
-create one), you can pass it to @code{run-fibers} using the
-@code{#:scheduler} keyword argument. In that case the scheduler will
-not be destroyed when @code{run-fibers} finishes.
+pre-existing scheduler (because you used the low-level scheduler
+interface to create one), you can pass it to @code{run-fibers} using
+the @code{#:scheduler} keyword argument. In that case the scheduler
+will not be destroyed when @code{run-fibers} finishes.
@code{run-fibers} will return when the @var{init-thunk} call returns.
To make it additionally wait until there are no more runnable fibers
@@ -546,11 +546,6 @@ state) in place when @code{spawn-fiber} is called. Any
fluid or parameter bindings outside the fiber.
@end defun
-@defun current-fiber
-Return the current fiber, or @code{#f} if not called within the
-dynamic extent of a thunk passed to @code{spawn-fiber}.
-@end defun
-
@defun sleep seconds
Wake up the current fiber after @var{seconds} of wall-clock time have
elapsed. This definition will replace the binding for @code{sleep} in
@@ -743,57 +738,82 @@ Show a list of all schedulers.
Create a new scheduler for fibers, and run it on a new kernel thread.
@end deffn
-@deffn {REPL Command} kill-sched sched
-Shut down the scheduler named @var{sched}. Use @code{,scheds} to list
+@deffn {REPL Command} kill-sched name
+Shut down the scheduler named @var{name}. Use @code{,scheds} to list
scheduler names.
@end deffn
-@deffn {REPL Command} fibers [sched]
-Show a list of all fibers. If @var{sched} is given, limit to fibers
-bound to the given scheduler.
-@end deffn
-
@deffn {REPL Command} spawn-fiber exp [sched]
Spawn a new fiber that runs @var{exp}. If @var{sched} is given, the
fiber will be spawned on the given scheduler.
@end deffn
-@deffn {REPL Command} kill-fiber fiber
-Shut down a fiber.
-@end deffn
+@node Schedulers and Tasks
+@section Schedulers and Tasks
+
+Internally, fibers are built on top of schedulers and tasks.
-@node Internals
-@section Internals
+A scheduler runs tasks. A task is just a thunk (a function of no
+arguments) whose return value is ignored. A scheduler runs tasks in
+batches, once per turn. Each turn, a scheduler takes all tasks from
+its ``next'' run-queue and adds them to its ``current'' run-queue, and
+then runs the tasks on the current run-queue in order. The scheduler
+then goes to the next turn, unless its ``finished?'' function returns
+true.
-These internal interfaces are a bit dangerous, in the sense that if
-they are used wrongly, they can corrupt the state of your program.
-For example, the scheduler has some specific mechanisms to ensure
-thread-safety, and not all of the procedures in this module can be
-invoked on a scheduler from any thread. We will document them at some
-point, but for now this section is a stub.
+Both the ``next'' and the ``current'' run-queues are public atomic
+data structures. Scheduling a task adds it to the ``next'' run-queue.
+Scheduling a task is a thread-safe operation; it can be done by any
+thread. Scheduling a task on a scheduler running on a remote thread
+will ensure that the thread wakes up from any sleeping operation it
+might be currently engaged in.
+
+There is normally just one scheduler for each kernel thread that runs
+fibers. Several schedulers can be made aware of each other so that
+they can one can spread out the load when spawning tasks that should
+run in parallel. Also, before moving to the next turn, a scheduler
+will try to steal work from other schedulers that it knows about,
+popping off an item from the remote scheduler's ``current'' run-queue.
+
+There are two additional sources of tasks for a scheduler: file
+descriptor events and timers. When gathering tasks to schedule for
+the next turn, a scheduler will call @code{epoll} to be notified of
+activity on file descriptors of interest. If there are no pending
+tasks on the next run-queue, the call to @code{epoll} will sleep until
+the scheduler is woken up, or until a timer expires.
+
+The capability that allows fibers to be built on schedulers is that
+tasks can suspend. Suspending a task calls a user-supplied
+after-suspend handler that is passed the continuation of the task.
+The user can then schedule that continuation at some later time. In
+this way a fiber starts as a single task run by a scheduler, but each
+time it suspends and is resumed, a fresh task consisting of the
+fiber's is scheduled. The fibers layer also uses other Guile
+mechanisms to isolate fibers from each other, such as dynamic states.
+
+All interfaces in this module are thread-safe except where marked
+otherwise.
@example
-(use-modules (fibers internal))
+(use-modules (fibers scheduler))
@end example
@defun make-scheduler [#:parallelism=@code{#f}] @
[#:prompt-tag=@code{(make-prompt-tag "fibers")}]
-
Make a new scheduler in which to run fibers. If @var{parallelism} is
true, it should be an integer indicating the number of schedulers to
make. The resulting schedulers will all share the same prompt tag and
will steal and share out work from among themselves.
@end defun
-@defspec with-scheduler scheduler body ...
-Evaluate @code{(begin @var{body} ...)} in an environment in which
-@var{scheduler} is bound to the current kernel thread. Signal an
-error if @var{scheduler} is already running in some other kernel
-thread.
-@end defspec
+@defun run-scheduler sched finished?
+Run @var{sched} until calling the supplied @var{finished?} thunk
+returns true. Return zero values. Signal an error if @var{scheduler}
+is already running in some other kernel thread.
+@end defun
-@defun scheduler-name sched
-Return the name of @var{sched}.
+@defun current-scheduler
+Return the current scheduler, or @code{#f} if no scheduler is current.
@end defun
@defun scheduler-kernel-thread sched
@@ -801,111 +821,66 @@ Return the kernel thread that @var{sched} is running on, or @code{#f}
if it is not currently running.
@end defun
+@defun scheduler-runcount sched
+Return the number of tasks that have been run on @var{sched}, modulo
+2@sup{32}. This interface is useful as a lightweight check to see if
+a remote scheduler is making progress.
+@end defun
+
@defun scheduler-remote-peers sched
Return a list of peer schedulers of @var{sched}, not including
@var{sched} itself.
@end defun
-@defun choose-parallel-scheduler sched
-Return a random scheduler from @var{sched}'s peer set. Note that
-@var{sched}'s peer set includes @var{sched} itself.
-@end defun
-
@defun scheduler-work-pending? sched
Return @code{#t} if @var{sched} has any work pending: any runnable
-fibers or any pending timeouts.
+tasks or any pending timeouts.
@end defun
-@defun run-scheduler sched finished?
-Run @var{sched} until there are no more fibers ready to run, no file
-descriptors being waited on, and no more timers pending to run, and
-calling the @var{finished?} thunk returns true. Return zero values.
+@defun choose-parallel-scheduler sched
+Return a random scheduler from @var{sched}'s peer set. Note that
+@var{sched}'s peer set includes @var{sched} itself.
@end defun
@defun destroy-scheduler sched
Release any resources associated with @var{sched}.
@end defun
-@defun resume-on-readable-fd fd fiber
-Arrange to resume @var{fiber} when the file descriptor @var{fd} becomes
-readable.
+@defun schedule-task sched task
+Arrange to run @var{task}, a procedure of no arguments, on the next
+turn of @var{sched}. If @var{sched} is remote and sleeping, it will
+be woken up.
@end defun
-@defun resume-on-writable-fd fd fiber
-Arrange to resume @var{fiber} when the file descriptor @var{fd} becomes
-writable.
+@defun schedule-task-when-fd-readable sched fd task
+Arrange to schedule @var{task} when the file descriptor @var{fd}
+becomes readable. @emph{Not thread-safe.}
@end defun
-@defun add-timer sched expiry thunk
-Arrange to call @var{thunk} when the absolute real time is greater
-than or equal to @var{expiry}, expressed in internal time units.
+@defun schedule-task-when-fd-writable sched fd task
+Arrange to schedule @var{task} on @var{sched} when the file descriptor
+@var{fd} becomes writable. @emph{Not thread-safe.}
@end defun
-@defun create-fiber sched thunk
-Spawn a new fiber in @var{sched} with the continuation @var{thunk}.
-The fiber will be scheduled on the next turn. @var{thunk} will run
-with a copy of the current dynamic state, isolating fluid and
-parameter mutations to the fiber.
-@end defun
-
-@defvar current-fiber
-Return the current fiber, or @code{#f} if no fiber is current.
-@end defvar
-
-@defun kill-fiber fiber
-Try to kill @var{fiber}, causing it to raise an exception. Note that
-this is currently unimplemented!
+@defun schedule-task-at-time sched expiry task
+Arrange to schedule @var{task} on @var{sched} when the absolute real
+time is greater than or equal to @var{expiry}, expressed in internal
+time units. @emph{Not thread-safe.}
@end defun
-@defun fiber-scheduler fiber
-Return the scheduler of @var{fiber}. Note that if the fiber is on a
-run queue, this may change out from under you due to work stealing.
+@defun suspend-current-task after-suspend
+Suspend the current task to the current scheduler. After suspending,
+call the @var{after-suspend} callback with two arguments: the current
+scheduler, and the continuation of the current task. The continuation
+passed to the @var{after-suspend} handler is the continuation of the
+@code{suspend-current-task} call.
@end defun
-@defun fiber-continuation
-Return the continuation of @var{fiber}, or @code{#f} if @var{fiber} is
-not suspended. Again, if @var{fiber} is on a run queue or could be
-resumed by a parallel scheduler, this continuation may change.
-@end defun
-
-@defun fold-all-schedulers f seed
-Fold @var{f} over the set of known schedulers. @var{f} will be invoked
-as @code{(@var{f} @var{name} @var{scheduler} @var{seed})}.
-@end defun
-
-@defun scheduler-by-name name
-Return the scheduler named @var{name}, or @code{#f} if no scheduler of
-that name is known.
-@end defun
-
-@defun fold-all-fibers f seed
-Fold @var{f} over the set of known fibers. @var{f} will be invoked as
-@code{(@var{f} @var{name} @var{fiber} @var{seed})}.
-@end defun
-
-@defun fiber-by-name name
-Return the fiber named @var{name}, or @code{#f} if no fiber of that name
-is known.
-@end defun
-
-@defun suspend-current-fiber [after-suspend]
-Suspend the current fiber. Call the optional @var{after-suspend}
-callback, if present, with the suspended thread as its argument.
-@end defun
-
-@defun resume-fiber fiber thunk
-Resume @var{fiber}, adding it to the run queue of its scheduler. The
-fiber will start by applying @var{thunk}. A fiber @emph{must} only be
-resumed when it is suspended. This function is thread-safe even if
-@var{fiber} is running on a remote scheduler.
-@end defun
-
-@defun yield-current-fiber
-Yield control to the current scheduler. Like
-@code{suspend-current-fiber} followed directly by @code{resume-fiber},
-except that it avoids suspending if the current continuation isn't
-suspendable. Returns @code{#t} if the yield succeeded, or @code{#f}
-otherwise.
+@defun yield-current-task
+Yield control to the current scheduler. Like calling
+@code{(suspend-current-task schedule-task)} except that it avoids
+suspending if the current continuation isn't suspendable. Returns
+@code{#t} if the yield succeeded, or @code{#f} otherwise.
@end defun
@node Pitfalls
diff --git a/fibers/channels.scm b/fibers/channels.scm
index aabf2db..ce6f348 100644
--- a/fibers/channels.scm
+++ b/fibers/channels.scm
@@ -32,7 +32,6 @@
#:use-module (ice-9 atomic)
#:use-module (ice-9 match)
#:use-module (fibers deque)
- #:use-module (fibers internal)
#:use-module (fibers operations)
#:export (make-channel
channel?
diff --git a/fibers/internal.scm b/fibers/internal.scm
deleted file mode 100644
index 79b1011..0000000
--- a/fibers/internal.scm
+++ /dev/null
@@ -1,489 +0,0 @@
-;; Fibers: cooperative, event-driven user-space threads.
-
-;;;; Copyright (C) 2016 Free Software Foundation, Inc.
-;;;;
-;;;; This library is free software; you can redistribute it and/or
-;;;; modify it under the terms of the GNU Lesser General Public
-;;;; License as published by the Free Software Foundation; either
-;;;; version 3 of the License, or (at your option) any later version.
-;;;;
-;;;; This library is distributed in the hope that it will be useful,
-;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
-;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-;;;; Lesser General Public License for more details.
-;;;;
-;;;; You should have received a copy of the GNU Lesser General Public
-;;;; License along with this library; if not, write to the Free Software
-;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-;;;;
-
-(define-module (fibers internal)
- #:use-module (srfi srfi-9)
- #:use-module (fibers stack)
- #:use-module (fibers epoll)
- #:use-module (fibers psq)
- #:use-module (fibers nameset)
- #:use-module (ice-9 atomic)
- #:use-module (ice-9 control)
- #:use-module (ice-9 match)
- #:use-module (ice-9 fdes-finalizers)
- #:use-module ((ice-9 threads) #:select (current-thread))
- #:export (;; Low-level interface: schedulers and threads.
- make-scheduler
- with-scheduler
- scheduler-name
- scheduler-runcount
- (scheduler-kernel-thread/public . scheduler-kernel-thread)
- scheduler-remote-peers
- scheduler-work-pending?
- choose-parallel-scheduler
- run-scheduler
- destroy-scheduler
-
- resume-on-readable-fd
- resume-on-writable-fd
- add-timer
-
- create-fiber
- (current-fiber/public . current-fiber)
- kill-fiber
- fiber-scheduler
- fiber-continuation
-
- fold-all-schedulers
- scheduler-by-name
- fold-all-fibers
- fiber-by-name
-
- suspend-current-fiber
- resume-fiber
- yield-current-fiber))
-
-(define-once fibers-nameset (make-nameset))
-(define-once schedulers-nameset (make-nameset))
-
-(define (fold-all-schedulers f seed)
- "Fold @var{f} over the set of known schedulers. @var{f} will be
-invoked as @code{(@var{f} @var{name} @var{scheduler} @var{seed})}."
- (nameset-fold f schedulers-nameset seed))
-(define (scheduler-by-name name)
- "Return the scheduler named @var{name}, or @code{#f} if no scheduler
-of that name is known."
- (nameset-ref schedulers-nameset name))
-
-(define (fold-all-fibers f seed)
- "Fold @var{f} over the set of known fibers. @var{f} will be
-invoked as @code{(@var{f} @var{name} @var{fiber} @var{seed})}."
- (nameset-fold f fibers-nameset seed))
-(define (fiber-by-name name)
- "Return the fiber named @var{name}, or @code{#f} if no fiber of that
-name is known."
- (nameset-ref fibers-nameset name))
-
-(define-record-type <scheduler>
- (%make-scheduler name epfd runcount-box prompt-tag
- next-runqueue current-runqueue
- sources timers kernel-thread
- remote-peers choose-parallel-scheduler)
- scheduler?
- (name scheduler-name set-scheduler-name!)
- (epfd scheduler-epfd)
- ;; atomic variable of uint32
- (runcount-box scheduler-runcount-box)
- (prompt-tag scheduler-prompt-tag)
- ;; atomic stack of fiber to run next turn (reverse order)
- (next-runqueue scheduler-next-runqueue)
- ;; atomic stack of fiber to run this turn
- (current-runqueue scheduler-current-runqueue)
- ;; fd -> (total-events (events . resume-fn) ...)
- (sources scheduler-sources)
- ;; PSQ of thunk -> expiry
- (timers scheduler-timers set-scheduler-timers!)
- ;; atomic parameter of thread
- (kernel-thread scheduler-kernel-thread)
- ;; list of sched
- (remote-peers scheduler-remote-peers set-scheduler-remote-peers!)
- ;; () -> sched
- (choose-parallel-scheduler scheduler-choose-parallel-scheduler
- set-scheduler-choose-parallel-scheduler!))
-
-(define-record-type <fiber>
- (make-fiber scheduler continuation)
- fiber?
- ;; The scheduler to which a fiber is currently bound.
- (scheduler fiber-scheduler set-fiber-scheduler!)
- ;; What the fiber should do when it resumes, or #f if the fiber is
- ;; currently running.
- (continuation fiber-continuation set-fiber-continuation!))
-
-(define (make-atomic-parameter init)
- (let ((box (make-atomic-box init)))
- (case-lambda
- (() (atomic-box-ref box))
- ((new)
- (if (eq? new init)
- (atomic-box-set! box new)
- (let ((prev (atomic-box-compare-and-swap! box init new)))
- (unless (eq? prev init)
- (error "owned by other thread" prev))))))))
-
-(define (shuffle l)
- (map cdr (sort (map (lambda (x) (cons (random 1.0) x)) l)
- (lambda (a b) (< (car a) (car b))))))
-
-(define (make-selector items)
- (let ((items (list->vector (shuffle items))))
- (match (vector-length items)
- (0 (lambda () #f))
- (1 (let ((item (vector-ref items 0))) (lambda () item)))
- (n (let ((idx 0))
- (lambda ()
- (let ((item (vector-ref items idx)))
- (set! idx (let ((idx (1+ idx)))
- (if (= idx (vector-length items)) 0 idx)))
- item)))))))
-
-(define* (make-scheduler #:key parallelism
- (prompt-tag (make-prompt-tag "fibers")))
- "Make a new scheduler in which to run fibers."
- (let ((epfd (epoll-create))
- (runcount-box (make-atomic-box 0))
- (next-runqueue (make-empty-stack))
- (current-runqueue (make-empty-stack))
- (sources (make-hash-table))
- (timers (make-psq (match-lambda*
- (((t1 . c1) (t2 . c2)) (< t1 t2)))
- <))
- (kernel-thread (make-atomic-parameter #f)))
- (let ((sched (%make-scheduler #f epfd runcount-box prompt-tag
- next-runqueue current-runqueue
- sources timers kernel-thread
- #f #f)))
- (set-scheduler-name! sched (nameset-add! schedulers-nameset sched))
- (let ((all-scheds
- (cons sched
- (if parallelism
- (map (lambda (_)
- (make-scheduler #:prompt-tag prompt-tag))
- (iota (1- parallelism)))
- '()))))
- (for-each
- (lambda (sched)
- (let ((choose! (make-selector all-scheds)))
- (set-scheduler-remote-peers! sched (delq sched all-scheds))
- (set-scheduler-choose-parallel-scheduler! sched choose!)))
- all-scheds))
- sched)))
-
-(define-syntax-rule (with-scheduler scheduler body ...)
- "Evaluate @code{(begin @var{body} ...)} in an environment in which
-@var{scheduler} is bound to the current kernel thread. Signal an
-error if @var{scheduler} is already running in some other kernel
-thread."
- (let ((sched scheduler))
- (dynamic-wind (lambda ()
- ((scheduler-kernel-thread sched) (current-thread)))
- (lambda ()
- body ...)
- (lambda ()
- ((scheduler-kernel-thread sched) #f)))))
-
-(define (scheduler-runcount sched)
- "Return the number of fibers that have been scheduled on
-@var{sched} since it was started, modulo 2@sup{32}."
- (atomic-box-ref (scheduler-runcount-box sched)))
-
-(define (scheduler-kernel-thread/public sched)
- "Return the kernel thread on which @var{sched} is running, or
-@code{#f} if @var{sched} is not running."
- ((scheduler-kernel-thread sched)))
-
-(define (choose-parallel-scheduler sched)
- ((scheduler-choose-parallel-scheduler sched)))
-
-(define current-fiber (make-parameter #f))
-(define (current-fiber/public)
- "Return the current fiber, or @code{#f} if no fiber is current."
- (current-fiber))
-
-(define (schedule-fiber! fiber thunk)
- ;; The fiber will be resumed at most once, and we are the ones that
- ;; will resume it, so we can set the thunk directly. Adding the
- ;; fiber to the runqueue is an atomic operation with SEQ_CST
- ;; ordering, so that will make sure this operation is visible even
- ;; for a fiber scheduled on a remote thread.
- (set-fiber-continuation! fiber thunk)
- (let ((sched (fiber-scheduler fiber)))
- (stack-push! (scheduler-next-runqueue sched) fiber)
- (unless (eq? ((scheduler-kernel-thread sched)) (current-thread))
- (epoll-wake! (scheduler-epfd sched)))
- (values)))
-
-(define internal-time-units-per-millisecond
- (/ internal-time-units-per-second 1000))
-
-(define (schedule-fibers-for-fd fd revents sched)
- (match (hashv-ref (scheduler-sources sched) fd)
- (#f (warn "scheduler for unknown fd" fd))
- ((and sources (active-events . waiters))
- ;; First, clear the active status, as the EPOLLONESHOT has
- ;; deactivated our entry in the epoll set.
- (set-car! sources #f)
- (set-cdr! sources '())
- (unless (zero? (logand revents EPOLLERR))
- (hashv-remove! (scheduler-sources sched) fd))
- ;; Now resume or re-enqueue fibers, as appropriate.
- (let lp ((waiters waiters))
- (match waiters
- (() #f)
- (((events . resume) . waiters)
- (if (zero? (logand revents (logior events EPOLLERR)))
- ;; Re-enqueue.
- (add-fd-event-waiter sched fd events resume)
- ;; Resume.
- (resume revents))
- (lp waiters)))))))
-
-(define (run-timers sched)
- ;; Run expired timer thunks in the order that they expired.
- (let ((now (get-internal-real-time)))
- (let run-timers ((timers (scheduler-timers sched)))
- (cond
- ((or (psq-empty? timers)
- (< now (car (psq-min timers))))
- (set-scheduler-timers! sched timers))
- (else
- (call-with-values (lambda () (psq-pop timers))
- (match-lambda*
- (((_ . thunk) timers)
- (thunk)
- (run-timers timers)))))))))
-
-(define (schedule-runnables-for-next-turn sched)
- ;; Called when all runnables from the current turn have been run.
- ;; Note that there may be runnables already scheduled for the next
- ;; turn; one way this can happen is if a fiber suspended itself
- ;; because it was blocked on a channel, but then another fiber woke
- ;; it up, or if a remote thread scheduled a fiber on this scheduler.
- ;; In any case, check the kernel to see if any of the fd's that we
- ;; are interested in are active, and in that case schedule their
- ;; corresponding fibers. Also run any timers that have timed out.
- (define (timers-expiry timers)
- (and (not (psq-empty? timers))
- (match (psq-min timers)
- ((expiry . thunk)
- expiry))))
- (define (update-expiry expiry)
- ;; If there are pending runnables, cause epoll to return
- ;; immediately.
- (if (stack-empty? (scheduler-next-runqueue sched))
- expiry
- 0))
- (epoll (scheduler-epfd sched)
- #:expiry (timers-expiry (scheduler-timers sched))
- #:update-expiry update-expiry
- #:folder (lambda (fd revents sched)
- (schedule-fibers-for-fd fd revents sched)
- sched)
- #:seed sched)
- (run-timers sched))
-
-(define (fiber-stealer sched)
- "Steal some work from a random scheduler in the vector
-@var{schedulers}. Return a fiber, or @code{#f} if no work could be
-stolen."
- (let ((selector (make-selector (scheduler-remote-peers sched))))
- (lambda ()
- (let ((peer (selector)))
- (and peer
- (stack-pop! (scheduler-current-runqueue peer) #f))))))
-
-(define (scheduler-work-pending? sched)
- "Return @code{#t} if @var{sched} has any work pending: any runnable
-fibers or any pending timeouts."
- (not (and (psq-empty? (scheduler-timers sched))
- (stack-empty? (scheduler-current-runqueue sched))
- (stack-empty? (scheduler-next-runqueue sched)))))
-
-(define* (run-scheduler sched finished?)
- "Run @var{sched} until there are no more fibers ready to run, no
-file descriptors being waited on, and no more timers pending to run.
-Return zero values."
- (let ((tag (scheduler-prompt-tag sched))
- (runcount-box (scheduler-runcount-box sched))
- (next (scheduler-next-runqueue sched))
- (cur (scheduler-current-runqueue sched))
- (steal-fiber! (fiber-stealer sched)))
- (define (run-fiber fiber)
- (atomic-box-set! runcount-box
- (logand (1+ (atomic-box-ref runcount-box)) #xffffFFFF))
- (call-with-prompt tag
- (lambda ()
- (let ((thunk (fiber-continuation fiber)))
- (set-fiber-continuation! fiber #f)
- (thunk)))
- (lambda (k after-suspend)
- (set-fiber-continuation! fiber k)
- (after-suspend fiber))))
- (let next-turn ()
- (unless (finished?)
- (schedule-runnables-for-next-turn sched)
- (stack-push-list! cur (reverse (stack-pop-all! next)))
- (let next-fiber ()
- (match (stack-pop! cur #f)
- (#f
- (when (stack-empty? next)
- ;; Both current and next runqueues are empty; steal a
- ;; little bit of work from a remote scheduler if we
- ;; can. Run it directly instead of pushing onto a
- ;; queue to avoid double stealing.
- (let ((fiber (steal-fiber!)))
- (when fiber
- (set-fiber-scheduler! fiber sched)
- (run-fiber fiber))))
- (next-turn))
- (fiber
- (run-fiber fiber)
- (next-fiber))))))))
-
-(define (destroy-scheduler sched)
- "Release any resources associated with @var{sched}."
- #;
- (for-each kill-fiber (list-copy (scheduler-fibers sched)))
- (epoll-destroy (scheduler-epfd sched)))
-
-(define (create-fiber sched thunk)
- "Spawn a new fiber in @var{sched} with the continuation @var{thunk}.
-The fiber will be scheduled on the next turn. @var{thunk} will run
-with a copy of the current dynamic state, isolating fluid and
-parameter mutations to the fiber."
- (let* ((fiber (make-fiber sched #f))
- (thunk (let ((dynamic-state (current-dynamic-state)))
- (lambda ()
- (with-dynamic-state
- dynamic-state
- (lambda ()
- (current-fiber fiber)
- (catch #t
- (lambda ()
- (%start-stack #t thunk))
- (lambda _ #f)
- (let ((err (current-error-port)))
- (lambda (key . args)
- (false-if-exception
- (let ((stack (make-stack #t 4)))
- (format err "Uncaught exception in fiber #~a:\n"
- (nameset-ref fibers-nameset fiber))
- (display-backtrace stack err)
- (print-exception err (stack-ref stack 0)
- key args))))))))))))
- (nameset-add! fibers-nameset fiber)
- (schedule-fiber! fiber thunk)))
-
-(define (kill-fiber fiber)
- "Try to kill @var{fiber}, causing it to raise an exception. Note
-that this is currently unimplemented!"
- (error "kill-fiber is unimplemented"))
-
-;; Shim for Guile 2.1.5.
-(unless (defined? 'suspendable-continuation?)
- (define! 'suspendable-continuation? (lambda (tag) #t)))
-
-;; The AFTER-SUSPEND thunk allows the user to suspend the current
-;; fiber, saving its state, and then perform some other nonlocal
-;; control flow.
-;;
-(define* (suspend-current-fiber #:optional
- (after-suspend (lambda (fiber) #f)))
- "Suspend the current fiber. Call the optional @var{after-suspend}
-callback, if present, with the suspended thread as its argument."
- (let ((tag (scheduler-prompt-tag (fiber-scheduler (current-fiber)))))
- (unless (suspendable-continuation? tag)
- (error "Attempt to suspend fiber within continuation barrier"))
- ((abort-to-prompt tag after-suspend))))
-
-(define* (resume-fiber fiber thunk)
- "Resume @var{fiber}, adding it to the run queue of its scheduler.
-The fiber will start by applying @var{thunk}. A fiber @emph{must}
-only be resumed when it is suspended. This function is thread-safe
-even if @var{fiber} is running on a remote scheduler."
- (let ((cont (fiber-continuation fiber)))
- (unless cont (error "invalid fiber" fiber))
- (schedule-fiber! fiber (lambda () (cont thunk)))))
-
-(define* (yield-current-fiber)
- "Yield control to the current scheduler. Like
-@code{suspend-current-fiber} followed directly by @code{resume-fiber},
-except that it avoids suspending if the current continuation isn't
-suspendable. Returns @code{#t} if the yield succeeded, or @code{#f}
-otherwise."
- (match (current-fiber)
- (#f #f)
- (fiber
- (let ((tag (scheduler-prompt-tag (fiber-scheduler fiber))))
- (and (suspendable-continuation? tag)
- (begin
- (abort-to-prompt tag (lambda (fiber) (resume-fiber fiber #f)))
- #t))))))
-
-(define (finalize-fd sources)
- "Remove data associated with @var{fd} from the scheduler @var{ctx}.
-Called by Guile just before Guile goes to close a file descriptor, in
-response either to an explicit call to @code{close-port}, or because
-the port became unreachable. In the latter case, this call may come
-from a finalizer thread."
- ;; When a file descriptor is closed, the kernel silently removes it
- ;; from any associated epoll sets, so we don't need to do anything
- ;; there. But because this call could come from any thread,
- ;; especially given the fact that the fiber might be migrated, we
- ;; have to operate locally, just nulling out the sources pair and
- ;; not mucking with the sources table.
- ;;
- ;; FIXME: Wake all sources with EPOLLERR.
- (set-cdr! sources '())
- (set-car! sources #f))
-
-(define (add-fd-event-waiter sched fd events resume)
- "Arrange to resume @var{fiber} when the file descriptor @var{fd} has
-the given @var{events}, expressed as an epoll bitfield."
- (let ((sources (hashv-ref (scheduler-sources sched) fd)))
- (match sources
- ((active-events . waiters)
- (set-cdr! sources (acons events resume waiters))
- (unless (and active-events
- (= (logand events active-events) events))
- (let ((active-events (logior events (or active-events 0))))
- (set-car! sources active-events)
- (epoll-add*! (scheduler-epfd sched) fd
- (logior active-events EPOLLONESHOT)))))
- (#f
- (let ((sources (list events (cons events resume))))
- (hashv-set! (scheduler-sources sched) fd sources)
- (add-fdes-finalizer! fd (lambda (fd) (finalize-fd sources)))
- (epoll-add*! (scheduler-epfd sched) fd
- (logior events EPOLLONESHOT)))))))
-
-(define (resume-on-fd-events fd events fiber)
- "Arrange to resume @var{fiber} when the file descriptor @var{fd} has
-the given @var{events}, expressed as an epoll bitfield."
- (add-fd-event-waiter (fiber-scheduler fiber) fd events
- (lambda (revents)
- (resume-fiber fiber (lambda () revents)))))
-
-(define (resume-on-readable-fd fd fiber)
- "Arrange to resume @var{fiber} when the file descriptor @var{fd}
-becomes readable."
- (resume-on-fd-events fd (logior EPOLLIN EPOLLRDHUP) fiber))
-
-(define (resume-on-writable-fd fd fiber)
- "Arrange to resume @var{fiber} when the file descriptor @var{fd}
-becomes writable."
- (resume-on-fd-events fd EPOLLOUT fiber))
-
-(define (add-timer sched expiry thunk)
- "Arrange to call @var{thunk} when the absolute real time is greater
-than or equal to @var{expiry}, expressed in internal time units."
- (set-scheduler-timers! sched
- (psq-set (scheduler-timers sched)
- (cons expiry thunk)
- expiry)))
diff --git a/fibers/operations.scm b/fibers/operations.scm
index ef2969c..ac55507 100644
--- a/fibers/operations.scm
+++ b/fibers/operations.scm
@@ -56,7 +56,7 @@
make-mutex make-condition-variable
lock-mutex unlock-mutex
wait-condition-variable signal-condition-variable))
- #:use-module (fibers internal)
+ #:use-module (fibers scheduler)
#:export (wrap-operation
choice-operation
perform-operation
@@ -151,11 +151,12 @@ the operation cannot complete directly, block until it can complete."
;; succeeds. Otherwise we block the current thread until the
;; operation succeeds, to allow for communication between fibers
;; and foreign threads.
- (if (current-fiber)
- (suspend-current-fiber
- (lambda (fiber)
- (define (resume thunk) (resume-fiber fiber thunk))
- (block (fiber-scheduler fiber) resume)))
+ (if (current-scheduler)
+ ((suspend-current-task
+ (lambda (sched k)
+ (define (resume thunk)
+ (schedule-task sched (lambda () (k thunk))))
+ (block sched resume))))
(let ((k #f)
(thread (current-thread))
(mutex (make-mutex))
diff --git a/fibers/repl.scm b/fibers/repl.scm
index 3331702..69ee164 100644
--- a/fibers/repl.scm
+++ b/fibers/repl.scm
@@ -25,15 +25,27 @@
#:use-module ((ice-9 threads)
#:select (call-with-new-thread cancel-thread join-thread))
#:use-module (fibers)
- #:use-module (fibers internal))
+ #:use-module (fibers nameset)
+ #:use-module (fibers scheduler))
+
+(define-once schedulers-nameset (make-nameset))
+
+(define (fold-all-schedulers f seed)
+ "Fold @var{f} over the set of known schedulers. @var{f} will be
+invoked as @code{(@var{f} @var{name} @var{scheduler} @var{seed})}."
+ (nameset-fold f schedulers-nameset seed))
+(define (scheduler-by-name name)
+ "Return the scheduler named @var{name}, or @code{#f} if no scheduler
+of that name is known."
+ (nameset-ref schedulers-nameset name))
(define repl-current-scheds (make-doubly-weak-hash-table))
(define (repl-current-sched repl)
(hashq-ref repl-current-scheds repl))
-(define (repl-set-current-sched! repl sched verbose?)
+(define (repl-set-current-sched! repl name sched verbose?)
(when verbose?
(format #t "Scheduler ~a on thread ~a is now current\n."
- (scheduler-name sched) (scheduler-kernel-thread sched)))
+ name (scheduler-kernel-thread sched)))
(hashq-set! repl-current-scheds repl sched))
(define* (repl-ensure-current-sched repl #:optional (verbose? #t))
(define (sched-alive? sched)
@@ -44,19 +56,20 @@
(match scheds
(()
(let* ((sched (make-scheduler))
+ (name (nameset-add! schedulers-nameset sched))
(thread (call-with-new-thread
(lambda ()
(run-fibers #:scheduler sched)))))
(when verbose?
(format #t "No active schedulers; spawned a new one (#~a).\n"
- (scheduler-name sched)))
- (repl-set-current-sched! repl sched verbose?)
+ name))
+ (repl-set-current-sched! repl name sched verbose?)
sched))
(((id . (and sched (? sched-alive?))) . scheds)
(when verbose?
(format #t "No current scheduler; choosing scheduler #~a randomly.\n"
- (scheduler-name sched)))
- (repl-set-current-sched! repl sched verbose?)
+ id))
+ (repl-set-current-sched! repl id sched verbose?)
sched)))))
(define-meta-command ((scheds fibers) repl)
@@ -78,54 +91,28 @@ Show a list of schedulers."
(define-meta-command ((spawn-sched fibers) repl)
"spawn-sched
Create a new scheduler for fibers, and run it on a new kernel thread."
- (let ((sched (make-scheduler)))
+ (let* ((sched (make-scheduler))
+ (name (nameset-add! schedulers-nameset sched)))
(call-with-new-thread (lambda ()
(call-with-new-thread
(lambda ()
(run-fibers #:scheduler sched)))))
- (format #t "Spawned scheduler #~a.\n" (scheduler-name sched))))
+ (format #t "Spawned scheduler #~a.\n" name)))
-(define-meta-command ((kill-sched fibers) repl sched)
- "kill-sched SCHED
+(define-meta-command ((kill-sched fibers) repl name)
+ "kill-sched NAME
Shut down a scheduler."
- (let ((sched (or (scheduler-by-name sched)
- (error "no scheduler with name" sched))))
+ (let ((sched (or (scheduler-by-name name)
+ (error "no scheduler with name" name))))
(cond
((scheduler-kernel-thread sched)
=> (lambda (thread)
- (format #t "Killing thread running scheduler #~a...\n"
- (scheduler-name sched))
+ (format #t "Killing thread running scheduler #~a...\n" name)
(cancel-thread thread)
(join-thread thread)
- (format #t "Thread running scheduler #~a stopped.\n"
- (scheduler-name sched))))
+ (format #t "Thread running scheduler #~a stopped.\n" name)))
(else
- (format #t "Scheduler #~a not running.\n" (scheduler-name sched))))))
-
-(define-meta-command ((fibers fibers) repl #:optional sched)
- "fibers [SCHED]
-Show a list of fibers.
-
-If SCHED is given, limit to fibers bound to the given scheduler."
- (let ((sched (and sched
- (or (scheduler-by-name sched)
- (error "no scheduler with name" sched)))))
- (match (sort (fold-all-fibers acons '())
- (match-lambda*
- (((id1 . _) (id2 . _)) (< id1 id2))))
- (() (format #t "No fibers.\n"))
- (fibers
- (format #t "~a ~8t~a\n" "fiber" "state")
- (format #t "~a ~8t~a\n" "-----" "-----")
- (for-each
- (match-lambda
- ((id . fiber)
- ;; How to show fiber data? Would be nice to say "suspended
- ;; at foo.scm:32:4".
- (when (or (not sched) (eq? (fiber-scheduler fiber) sched))
- (format #t "~a ~8t~a\n" id
- (if (fiber-continuation fiber) "(suspended)" "")))))
- fibers)))))
+ (format #t "Scheduler #~a not running.\n" name)))))
(define-meta-command ((spawn-fiber fibers) repl (form) #:optional sched)
"spawn-fiber EXP [SCHED]
@@ -135,8 +122,3 @@ If SCHED is given, the fiber will be spawned on the given scheduler."
(let ((thunk (repl-prepare-eval-thunk repl (repl-parse repl form)))
(sched (repl-ensure-current-sched repl)))
(spawn-fiber thunk sched)))
-
-(define-meta-command ((kill-fiber fibers) repl fiber)
- "kill-fiber FIBER
-Shut down a fiber."
- (display "Don't know how to do that yet!\n"))
diff --git a/fibers/scheduler.scm b/fibers/scheduler.scm
new file mode 100644
index 0000000..28a08b3
--- /dev/null
+++ b/fibers/scheduler.scm
@@ -0,0 +1,383 @@
+;; Fibers: cooperative, event-driven user-space threads.
+
+;;;; Copyright (C) 2016 Free Software Foundation, Inc.
+;;;;
+;;;; This library is free software; you can redistribute it and/or
+;;;; modify it under the terms of the GNU Lesser General Public
+;;;; License as published by the Free Software Foundation; either
+;;;; version 3 of the License, or (at your option) any later version.
+;;;;
+;;;; This library is distributed in the hope that it will be useful,
+;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;;;; Lesser General Public License for more details.
+;;;;
+;;;; You should have received a copy of the GNU Lesser General Public
+;;;; License along with this library; if not, write to the Free Software
+;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+;;;;
+
+(define-module (fibers scheduler)
+ #:use-module (srfi srfi-9)
+ #:use-module (fibers stack)
+ #:use-module (fibers epoll)
+ #:use-module (fibers psq)
+ #:use-module (ice-9 atomic)
+ #:use-module (ice-9 control)
+ #:use-module (ice-9 match)
+ #:use-module (ice-9 fdes-finalizers)
+ #:use-module ((ice-9 threads) #:select (current-thread))
+ #:export (;; Low-level interface: schedulers and tasks.
+ make-scheduler
+ (current-scheduler/public . current-scheduler)
+ scheduler-runcount
+ (scheduler-kernel-thread/public . scheduler-kernel-thread)
+ scheduler-remote-peers
+ scheduler-work-pending?
+ choose-parallel-scheduler
+ run-scheduler
+ destroy-scheduler
+
+ schedule-task
+ schedule-task-when-fd-readable
+ schedule-task-when-fd-writable
+ schedule-task-at-time
+
+ suspend-current-task
+ yield-current-task))
+
+(define-record-type <scheduler>
+ (%make-scheduler epfd runcount-box prompt-tag
+ next-runqueue current-runqueue
+ fd-waiters timers kernel-thread
+ remote-peers choose-parallel-scheduler)
+ scheduler?
+ (epfd scheduler-epfd)
+ ;; atomic variable of uint32
+ (runcount-box scheduler-runcount-box)
+ (prompt-tag scheduler-prompt-tag)
+ ;; atomic stack of tasks to run next turn (reverse order)
+ (next-runqueue scheduler-next-runqueue)
+ ;; atomic stack of tasks to run this turn
+ (current-runqueue scheduler-current-runqueue)
+ ;; fd -> (total-events (events . task) ...)
+ (fd-waiters scheduler-fd-waiters)
+ ;; PSQ of expiry -> task
+ (timers scheduler-timers set-scheduler-timers!)
+ ;; atomic parameter of thread
+ (kernel-thread scheduler-kernel-thread)
+ ;; list of sched
+ (remote-peers scheduler-remote-peers set-scheduler-remote-peers!)
+ ;; () -> sched
+ (choose-parallel-scheduler scheduler-choose-parallel-scheduler
+ set-scheduler-choose-parallel-scheduler!))
+
+(define (make-atomic-parameter init)
+ (let ((box (make-atomic-box init)))
+ (case-lambda
+ (() (atomic-box-ref box))
+ ((new)
+ (if (eq? new init)
+ (atomic-box-set! box new)
+ (let ((prev (atomic-box-compare-and-swap! box init new)))
+ (unless (eq? prev init)
+ (error "owned by other thread" prev))))))))
+
+(define (shuffle l)
+ (map cdr (sort (map (lambda (x) (cons (random 1.0) x)) l)
+ (lambda (a b) (< (car a) (car b))))))
+
+(define (make-selector items)
+ (let ((items (list->vector (shuffle items))))
+ (match (vector-length items)
+ (0 (lambda () #f))
+ (1 (let ((item (vector-ref items 0))) (lambda () item)))
+ (n (let ((idx 0))
+ (lambda ()
+ (let ((item (vector-ref items idx)))
+ (set! idx (let ((idx (1+ idx)))
+ (if (= idx (vector-length items)) 0 idx)))
+ item)))))))
+
+(define* (make-scheduler #:key parallelism
+ (prompt-tag (make-prompt-tag "fibers")))
+ "Make a new scheduler in which to run fibers."
+ (let ((epfd (epoll-create))
+ (runcount-box (make-atomic-box 0))
+ (next-runqueue (make-empty-stack))
+ (current-runqueue (make-empty-stack))
+ (fd-waiters (make-hash-table))
+ (timers (make-psq (match-lambda*
+ (((t1 . c1) (t2 . c2)) (< t1 t2)))
+ <))
+ (kernel-thread (make-atomic-parameter #f)))
+ (let* ((sched (%make-scheduler epfd runcount-box prompt-tag
+ next-runqueue current-runqueue
+ fd-waiters timers kernel-thread
+ #f #f))
+ (all-scheds
+ (cons sched
+ (if parallelism
+ (map (lambda (_)
+ (make-scheduler #:prompt-tag prompt-tag))
+ (iota (1- parallelism)))
+ '()))))
+ (for-each
+ (lambda (sched)
+ (let ((choose! (make-selector all-scheds)))
+ (set-scheduler-remote-peers! sched (delq sched all-scheds))
+ (set-scheduler-choose-parallel-scheduler! sched choose!)))
+ all-scheds)
+ sched)))
+
+(define current-scheduler (fluid->parameter (make-thread-local-fluid #f)))
+(define (current-scheduler/public)
+ "Return the current scheduler, or @code{#f} if no scheduler is current."
+ (current-scheduler))
+
+(define-syntax-rule (with-scheduler scheduler body ...)
+ "Evaluate @code{(begin @var{body} ...)} in an environment in which
+@var{scheduler} is bound to the current kernel thread and
+@code{current-scheduler} is bound to @var{scheduler}. Signal an error
+if @var{scheduler} is already running in some other kernel thread."
+ (let ((sched scheduler))
+ (dynamic-wind (lambda ()
+ ((scheduler-kernel-thread sched) (current-thread)))
+ (lambda ()
+ (parameterize ((current-scheduler sched))
+ body ...))
+ (lambda ()
+ ((scheduler-kernel-thread sched) #f)))))
+
+(define (scheduler-runcount sched)
+ "Return the number of tasks that have been run on @var{sched} since
+it was started, modulo 2@sup{32}."
+ (atomic-box-ref (scheduler-runcount-box sched)))
+
+(define (scheduler-kernel-thread/public sched)
+ "Return the kernel thread on which @var{sched} is running, or
+@code{#f} if @var{sched} is not running."
+ ((scheduler-kernel-thread sched)))
+
+(define (choose-parallel-scheduler sched)
+ ((scheduler-choose-parallel-scheduler sched)))
+
+(define-inlinable (schedule-task/no-wakeup sched task)
+ (stack-push! (scheduler-next-runqueue sched) task))
+
+(define (schedule-task sched task)
+ "Add the task @var{task} to the run queue of the scheduler
+@var{sched}. On the next turn, @var{sched} will invoke @var{task}
+with no arguments.
+
+This function is thread-safe even if @var{sched} is running on a
+remote kernel thread."
+ (schedule-task/no-wakeup sched task)
+ (unless (eq? ((scheduler-kernel-thread sched)) (current-thread))
+ (epoll-wake! (scheduler-epfd sched)))
+ (values))
+
+(define (schedule-tasks-for-active-fd fd revents sched)
+ (match (hashv-ref (scheduler-fd-waiters sched) fd)
+ (#f (warn "scheduler for unknown fd" fd))
+ ((and events+waiters (active-events . waiters))
+ ;; First, clear the active status, as the EPOLLONESHOT has
+ ;; deactivated our entry in the epoll set.
+ (set-car! events+waiters #f)
+ (set-cdr! events+waiters '())
+ (unless (zero? (logand revents EPOLLERR))
+ (hashv-remove! (scheduler-fd-waiters sched) fd))
+ ;; Now resume or re-schedule waiters, as appropriate.
+ (let lp ((waiters waiters))
+ (match waiters
+ (() #f)
+ (((events . task) . waiters)
+ (if (zero? (logand revents (logior events EPOLLERR)))
+ ;; Re-schedule.
+ (schedule-task-when-fd-active sched fd events task)
+ ;; Resume.
+ (schedule-task/no-wakeup sched task))
+ (lp waiters)))))))
+
+(define (schedule-tasks-for-expired-timers sched)
+ ;; Schedule expired timer tasks in the order that they expired.
+ (let ((now (get-internal-real-time)))
+ (let expire-timers ((timers (scheduler-timers sched)))
+ (cond
+ ((or (psq-empty? timers)
+ (< now (car (psq-min timers))))
+ (set-scheduler-timers! sched timers))
+ (else
+ (call-with-values (lambda () (psq-pop timers))
+ (match-lambda*
+ (((_ . task) timers)
+ (schedule-task/no-wakeup sched task)
+ (expire-timers timers)))))))))
+
+(define (schedule-tasks-for-next-turn sched)
+ ;; Called when all tasks from the current turn have been run.
+ ;; Note that there may be tasks already scheduled for the next
+ ;; turn; one way this can happen is if a fiber suspended itself
+ ;; because it was blocked on a channel, but then another fiber woke
+ ;; it up, or if a remote thread scheduled a fiber on this scheduler.
+ ;; In any case, check the kernel to see if any of the fd's that we
+ ;; are interested in are active, and in that case schedule their
+ ;; corresponding tasks. Also run any timers that have timed out.
+ (define (timers-expiry timers)
+ (and (not (psq-empty? timers))
+ (match (psq-min timers)
+ ((expiry . task)
+ expiry))))
+ (define (update-expiry expiry)
+ ;; If there are pending tasks, cause epoll to return
+ ;; immediately.
+ (if (stack-empty? (scheduler-next-runqueue sched))
+ expiry
+ 0))
+ (epoll (scheduler-epfd sched)
+ #:expiry (timers-expiry (scheduler-timers sched))
+ #:update-expiry update-expiry
+ #:folder (lambda (fd revents sched)
+ (schedule-tasks-for-active-fd fd revents sched)
+ sched)
+ #:seed sched)
+ (schedule-tasks-for-expired-timers sched))
+
+(define (work-stealer sched)
+ "Steal some work from a random scheduler in the vector
+@var{schedulers}. Return a task, or @code{#f} if no work could be
+stolen."
+ (let ((selector (make-selector (scheduler-remote-peers sched))))
+ (lambda ()
+ (let ((peer (selector)))
+ (and peer
+ (stack-pop! (scheduler-current-runqueue peer) #f))))))
+
+(define (scheduler-work-pending? sched)
+ "Return @code{#t} if @var{sched} has any work pending: any tasks or
+any pending timeouts."
+ (not (and (psq-empty? (scheduler-timers sched))
+ (stack-empty? (scheduler-current-runqueue sched))
+ (stack-empty? (scheduler-next-runqueue sched)))))
+
+(define* (run-scheduler sched finished?)
+ "Run @var{sched} until calling @code{finished?} returns a true
+value. Return zero values."
+ (let ((tag (scheduler-prompt-tag sched))
+ (runcount-box (scheduler-runcount-box sched))
+ (next (scheduler-next-runqueue sched))
+ (cur (scheduler-current-runqueue sched))
+ (steal-work! (work-stealer sched)))
+ (define (run-task task)
+ (atomic-box-set! runcount-box
+ (logand (1+ (atomic-box-ref runcount-box)) #xffffFFFF))
+ (call-with-prompt tag
+ task
+ (lambda (k after-suspend)
+ (after-suspend sched k))))
+ (with-scheduler
+ sched
+ (let next-turn ()
+ (unless (finished?)
+ (schedule-tasks-for-next-turn sched)
+ (stack-push-list! cur (reverse (stack-pop-all! next)))
+ (let next-task ()
+ (match (stack-pop! cur #f)
+ (#f
+ (when (stack-empty? next)
+ ;; Both current and next runqueues are empty; steal a
+ ;; little bit of work from a remote scheduler if we
+ ;; can. Run it directly instead of pushing onto a
+ ;; queue to avoid double stealing.
+ (let ((task (steal-work!)))
+ (when task
+ (run-task task))))
+ (next-turn))
+ (task
+ (run-task task)
+ (next-task)))))))))
+
+(define (destroy-scheduler sched)
+ "Release any resources associated with @var{sched}."
+ (epoll-destroy (scheduler-epfd sched)))
+
+(define (schedule-task-when-fd-active sched fd events task)
+ "Arrange for @var{sched} to schedule @var{task} when the file
+descriptor @var{fd} becomes active with any of the given @var{events},
+expressed as an epoll bitfield."
+ (let ((fd-waiters (hashv-ref (scheduler-fd-waiters sched) fd)))
+ (match fd-waiters
+ ((active-events . waiters)
+ (set-cdr! fd-waiters (acons events task waiters))
+ (unless (and active-events
+ (= (logand events active-events) events))
+ (let ((active-events (logior events (or active-events 0))))
+ (set-car! fd-waiters active-events)
+ (epoll-add*! (scheduler-epfd sched) fd
+ (logior active-events EPOLLONESHOT)))))
+ (#f
+ (let ((fd-waiters (list events (cons events task))))
+ (define (finalize-fd fd)
+ ;; When a file port is closed, clear out the list of
+ ;; waiting tasks so that when/if this FD is re-used, we
+ ;; don't resume stale tasks. Note that we don't need to
+ ;; remove the FD from the epoll set, as the kernel manages
+ ;; that for us.
+ ;;
+ ;; FIXME: Is there a way to wake all tasks in a thread-safe
+ ;; way? Note that this function may be invoked from a
+ ;; finalizer thread.
+ (set-cdr! fd-waiters '())
+ (set-car! fd-waiters #f))
+ (hashv-set! (scheduler-fd-waiters sched) fd fd-waiters)
+ (add-fdes-finalizer! fd finalize-fd)
+ (epoll-add*! (scheduler-epfd sched) fd
+ (logior events EPOLLONESHOT)))))))
+
+(define (schedule-task-when-fd-readable sched fd task)
+ "Arrange to schedule @var{task} on @var{sched} when the file
+descriptor @var{fd} becomes readable."
+ (schedule-task-when-fd-active sched fd (logior EPOLLIN EPOLLRDHUP) task))
+
+(define (run-task-when-fd-writable sched fd task)
+ "Arrange to schedule @var{k} on @var{sched} when the file descriptor
+@var{fd} becomes writable."
+ (schedule-task-when-fd-active sched fd EPOLLOUT task))
+
+(define (schedule-task-at-time sched expiry task)
+ "Arrange to schedule @var{task} when the absolute real time is
+greater than or equal to @var{expiry}, expressed in internal time
+units."
+ (set-scheduler-timers! sched
+ (psq-set (scheduler-timers sched)
+ (cons expiry task)
+ expiry)))
+
+;; Shim for Guile 2.1.5.
+(unless (defined? 'suspendable-continuation?)
+ (define! 'suspendable-continuation? (lambda (tag) #t)))
+
+(define* (suspend-current-task after-suspend)
+ "Suspend the current task. After suspending, call the
+@var{after-suspend} callback with two arguments: the current
+scheduler, and the continuation of the current task. The continuation
+passed to the @var{after-suspend} handler is the continuation of the
+@code{suspend-current-task} call."
+ (let ((tag (scheduler-prompt-tag (current-scheduler))))
+ (unless (suspendable-continuation? tag)
+ (error "Attempt to suspend fiber within continuation barrier"))
+ (abort-to-prompt tag after-suspend)))
+
+(define* (yield-current-task)
+ "Yield control to the current scheduler. Like calling
+@code{(suspend-current-task schedule-task)} except that it avoids
+suspending if the current continuation isn't suspendable. Returns
+@code{#t} if the yield succeeded, or @code{#f} otherwise."
+ (match (current-scheduler)
+ (#f #f)
+ (sched
+ (let ((tag (scheduler-prompt-tag sched)))
+ (and (suspendable-continuation? tag)
+ (begin
+ (abort-to-prompt tag schedule-task)
+ #t))))))
diff --git a/fibers/timers.scm b/fibers/timers.scm
index 505939f..8bdbea3 100644
--- a/fibers/timers.scm
+++ b/fibers/timers.scm
@@ -18,7 +18,7 @@
;;;;
(define-module (fibers timers)
- #:use-module (fibers internal)
+ #:use-module (fibers scheduler)
#:use-module (fibers operations)
#:use-module (ice-9 atomic)
#:use-module (ice-9 match)
@@ -39,7 +39,7 @@
(call-with-new-thread
(lambda ()
(define (finished?) #f)
- (with-scheduler sched (run-scheduler sched finished?))))
+ (run-scheduler sched finished?)))
sched)))))
(define (timer-operation expiry)
@@ -57,11 +57,12 @@ units. The operation will succeed with no values."
('C (timer))
('S #f)))
(if sched
- (add-timer sched expiry timer)
- (create-fiber (timer-sched)
- (lambda ()
- (perform-operation (timer-operation expiry))
- (timer)))))))
+ (schedule-task-at-time sched expiry timer)
+ (schedule-task
+ (timer-sched)
+ (lambda ()
+ (perform-operation (timer-operation expiry))
+ (timer)))))))
(define (sleep-operation seconds)
"Make an operation that will succeed with no values when
diff --git a/tests/channels.scm b/tests/channels.scm
index 67fa9aa..68c98c9 100644
--- a/tests/channels.scm
+++ b/tests/channels.scm
@@ -17,7 +17,8 @@
;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
;;;;
-(define-module (tests cml)
+(define-module (tests channels)
+ #:use-module ((ice-9 threads) #:select (current-processor-count))
#:use-module (fibers)
#:use-module (fibers channels))
diff --git a/tests/preemption.scm b/tests/preemption.scm
index 1886560..e2395cf 100644
--- a/tests/preemption.scm
+++ b/tests/preemption.scm
@@ -20,7 +20,7 @@
(define-module (tests parameters)
#:use-module (ice-9 atomic)
#:use-module (fibers)
- #:use-module (fibers internal))
+ #:use-module (fibers scheduler))
(define failed? #f)