summaryrefslogtreecommitdiff
path: root/fibers/scheduler.scm
diff options
context:
space:
mode:
Diffstat (limited to 'fibers/scheduler.scm')
-rw-r--r--fibers/scheduler.scm383
1 files changed, 383 insertions, 0 deletions
diff --git a/fibers/scheduler.scm b/fibers/scheduler.scm
new file mode 100644
index 0000000..28a08b3
--- /dev/null
+++ b/fibers/scheduler.scm
@@ -0,0 +1,383 @@
+;; Fibers: cooperative, event-driven user-space threads.
+
+;;;; Copyright (C) 2016 Free Software Foundation, Inc.
+;;;;
+;;;; This library is free software; you can redistribute it and/or
+;;;; modify it under the terms of the GNU Lesser General Public
+;;;; License as published by the Free Software Foundation; either
+;;;; version 3 of the License, or (at your option) any later version.
+;;;;
+;;;; This library is distributed in the hope that it will be useful,
+;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;;;; Lesser General Public License for more details.
+;;;;
+;;;; You should have received a copy of the GNU Lesser General Public
+;;;; License along with this library; if not, write to the Free Software
+;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+;;;;
+
+(define-module (fibers scheduler)
+ #:use-module (srfi srfi-9)
+ #:use-module (fibers stack)
+ #:use-module (fibers epoll)
+ #:use-module (fibers psq)
+ #:use-module (ice-9 atomic)
+ #:use-module (ice-9 control)
+ #:use-module (ice-9 match)
+ #:use-module (ice-9 fdes-finalizers)
+ #:use-module ((ice-9 threads) #:select (current-thread))
+ #:export (;; Low-level interface: schedulers and tasks.
+ make-scheduler
+ (current-scheduler/public . current-scheduler)
+ scheduler-runcount
+ (scheduler-kernel-thread/public . scheduler-kernel-thread)
+ scheduler-remote-peers
+ scheduler-work-pending?
+ choose-parallel-scheduler
+ run-scheduler
+ destroy-scheduler
+
+ schedule-task
+ schedule-task-when-fd-readable
+ schedule-task-when-fd-writable
+ schedule-task-at-time
+
+ suspend-current-task
+ yield-current-task))
+
+(define-record-type <scheduler>
+ (%make-scheduler epfd runcount-box prompt-tag
+ next-runqueue current-runqueue
+ fd-waiters timers kernel-thread
+ remote-peers choose-parallel-scheduler)
+ scheduler?
+ (epfd scheduler-epfd)
+ ;; atomic variable of uint32
+ (runcount-box scheduler-runcount-box)
+ (prompt-tag scheduler-prompt-tag)
+ ;; atomic stack of tasks to run next turn (reverse order)
+ (next-runqueue scheduler-next-runqueue)
+ ;; atomic stack of tasks to run this turn
+ (current-runqueue scheduler-current-runqueue)
+ ;; fd -> (total-events (events . task) ...)
+ (fd-waiters scheduler-fd-waiters)
+ ;; PSQ of expiry -> task
+ (timers scheduler-timers set-scheduler-timers!)
+ ;; atomic parameter of thread
+ (kernel-thread scheduler-kernel-thread)
+ ;; list of sched
+ (remote-peers scheduler-remote-peers set-scheduler-remote-peers!)
+ ;; () -> sched
+ (choose-parallel-scheduler scheduler-choose-parallel-scheduler
+ set-scheduler-choose-parallel-scheduler!))
+
+(define (make-atomic-parameter init)
+ (let ((box (make-atomic-box init)))
+ (case-lambda
+ (() (atomic-box-ref box))
+ ((new)
+ (if (eq? new init)
+ (atomic-box-set! box new)
+ (let ((prev (atomic-box-compare-and-swap! box init new)))
+ (unless (eq? prev init)
+ (error "owned by other thread" prev))))))))
+
+(define (shuffle l)
+ (map cdr (sort (map (lambda (x) (cons (random 1.0) x)) l)
+ (lambda (a b) (< (car a) (car b))))))
+
+(define (make-selector items)
+ (let ((items (list->vector (shuffle items))))
+ (match (vector-length items)
+ (0 (lambda () #f))
+ (1 (let ((item (vector-ref items 0))) (lambda () item)))
+ (n (let ((idx 0))
+ (lambda ()
+ (let ((item (vector-ref items idx)))
+ (set! idx (let ((idx (1+ idx)))
+ (if (= idx (vector-length items)) 0 idx)))
+ item)))))))
+
+(define* (make-scheduler #:key parallelism
+ (prompt-tag (make-prompt-tag "fibers")))
+ "Make a new scheduler in which to run fibers."
+ (let ((epfd (epoll-create))
+ (runcount-box (make-atomic-box 0))
+ (next-runqueue (make-empty-stack))
+ (current-runqueue (make-empty-stack))
+ (fd-waiters (make-hash-table))
+ (timers (make-psq (match-lambda*
+ (((t1 . c1) (t2 . c2)) (< t1 t2)))
+ <))
+ (kernel-thread (make-atomic-parameter #f)))
+ (let* ((sched (%make-scheduler epfd runcount-box prompt-tag
+ next-runqueue current-runqueue
+ fd-waiters timers kernel-thread
+ #f #f))
+ (all-scheds
+ (cons sched
+ (if parallelism
+ (map (lambda (_)
+ (make-scheduler #:prompt-tag prompt-tag))
+ (iota (1- parallelism)))
+ '()))))
+ (for-each
+ (lambda (sched)
+ (let ((choose! (make-selector all-scheds)))
+ (set-scheduler-remote-peers! sched (delq sched all-scheds))
+ (set-scheduler-choose-parallel-scheduler! sched choose!)))
+ all-scheds)
+ sched)))
+
+(define current-scheduler (fluid->parameter (make-thread-local-fluid #f)))
+(define (current-scheduler/public)
+ "Return the current scheduler, or @code{#f} if no scheduler is current."
+ (current-scheduler))
+
+(define-syntax-rule (with-scheduler scheduler body ...)
+ "Evaluate @code{(begin @var{body} ...)} in an environment in which
+@var{scheduler} is bound to the current kernel thread and
+@code{current-scheduler} is bound to @var{scheduler}. Signal an error
+if @var{scheduler} is already running in some other kernel thread."
+ (let ((sched scheduler))
+ (dynamic-wind (lambda ()
+ ((scheduler-kernel-thread sched) (current-thread)))
+ (lambda ()
+ (parameterize ((current-scheduler sched))
+ body ...))
+ (lambda ()
+ ((scheduler-kernel-thread sched) #f)))))
+
+(define (scheduler-runcount sched)
+ "Return the number of tasks that have been run on @var{sched} since
+it was started, modulo 2@sup{32}."
+ (atomic-box-ref (scheduler-runcount-box sched)))
+
+(define (scheduler-kernel-thread/public sched)
+ "Return the kernel thread on which @var{sched} is running, or
+@code{#f} if @var{sched} is not running."
+ ((scheduler-kernel-thread sched)))
+
+(define (choose-parallel-scheduler sched)
+ ((scheduler-choose-parallel-scheduler sched)))
+
+(define-inlinable (schedule-task/no-wakeup sched task)
+ (stack-push! (scheduler-next-runqueue sched) task))
+
+(define (schedule-task sched task)
+ "Add the task @var{task} to the run queue of the scheduler
+@var{sched}. On the next turn, @var{sched} will invoke @var{task}
+with no arguments.
+
+This function is thread-safe even if @var{sched} is running on a
+remote kernel thread."
+ (schedule-task/no-wakeup sched task)
+ (unless (eq? ((scheduler-kernel-thread sched)) (current-thread))
+ (epoll-wake! (scheduler-epfd sched)))
+ (values))
+
+(define (schedule-tasks-for-active-fd fd revents sched)
+ (match (hashv-ref (scheduler-fd-waiters sched) fd)
+ (#f (warn "scheduler for unknown fd" fd))
+ ((and events+waiters (active-events . waiters))
+ ;; First, clear the active status, as the EPOLLONESHOT has
+ ;; deactivated our entry in the epoll set.
+ (set-car! events+waiters #f)
+ (set-cdr! events+waiters '())
+ (unless (zero? (logand revents EPOLLERR))
+ (hashv-remove! (scheduler-fd-waiters sched) fd))
+ ;; Now resume or re-schedule waiters, as appropriate.
+ (let lp ((waiters waiters))
+ (match waiters
+ (() #f)
+ (((events . task) . waiters)
+ (if (zero? (logand revents (logior events EPOLLERR)))
+ ;; Re-schedule.
+ (schedule-task-when-fd-active sched fd events task)
+ ;; Resume.
+ (schedule-task/no-wakeup sched task))
+ (lp waiters)))))))
+
+(define (schedule-tasks-for-expired-timers sched)
+ ;; Schedule expired timer tasks in the order that they expired.
+ (let ((now (get-internal-real-time)))
+ (let expire-timers ((timers (scheduler-timers sched)))
+ (cond
+ ((or (psq-empty? timers)
+ (< now (car (psq-min timers))))
+ (set-scheduler-timers! sched timers))
+ (else
+ (call-with-values (lambda () (psq-pop timers))
+ (match-lambda*
+ (((_ . task) timers)
+ (schedule-task/no-wakeup sched task)
+ (expire-timers timers)))))))))
+
+(define (schedule-tasks-for-next-turn sched)
+ ;; Called when all tasks from the current turn have been run.
+ ;; Note that there may be tasks already scheduled for the next
+ ;; turn; one way this can happen is if a fiber suspended itself
+ ;; because it was blocked on a channel, but then another fiber woke
+ ;; it up, or if a remote thread scheduled a fiber on this scheduler.
+ ;; In any case, check the kernel to see if any of the fd's that we
+ ;; are interested in are active, and in that case schedule their
+ ;; corresponding tasks. Also run any timers that have timed out.
+ (define (timers-expiry timers)
+ (and (not (psq-empty? timers))
+ (match (psq-min timers)
+ ((expiry . task)
+ expiry))))
+ (define (update-expiry expiry)
+ ;; If there are pending tasks, cause epoll to return
+ ;; immediately.
+ (if (stack-empty? (scheduler-next-runqueue sched))
+ expiry
+ 0))
+ (epoll (scheduler-epfd sched)
+ #:expiry (timers-expiry (scheduler-timers sched))
+ #:update-expiry update-expiry
+ #:folder (lambda (fd revents sched)
+ (schedule-tasks-for-active-fd fd revents sched)
+ sched)
+ #:seed sched)
+ (schedule-tasks-for-expired-timers sched))
+
+(define (work-stealer sched)
+ "Steal some work from a random scheduler in the vector
+@var{schedulers}. Return a task, or @code{#f} if no work could be
+stolen."
+ (let ((selector (make-selector (scheduler-remote-peers sched))))
+ (lambda ()
+ (let ((peer (selector)))
+ (and peer
+ (stack-pop! (scheduler-current-runqueue peer) #f))))))
+
+(define (scheduler-work-pending? sched)
+ "Return @code{#t} if @var{sched} has any work pending: any tasks or
+any pending timeouts."
+ (not (and (psq-empty? (scheduler-timers sched))
+ (stack-empty? (scheduler-current-runqueue sched))
+ (stack-empty? (scheduler-next-runqueue sched)))))
+
+(define* (run-scheduler sched finished?)
+ "Run @var{sched} until calling @code{finished?} returns a true
+value. Return zero values."
+ (let ((tag (scheduler-prompt-tag sched))
+ (runcount-box (scheduler-runcount-box sched))
+ (next (scheduler-next-runqueue sched))
+ (cur (scheduler-current-runqueue sched))
+ (steal-work! (work-stealer sched)))
+ (define (run-task task)
+ (atomic-box-set! runcount-box
+ (logand (1+ (atomic-box-ref runcount-box)) #xffffFFFF))
+ (call-with-prompt tag
+ task
+ (lambda (k after-suspend)
+ (after-suspend sched k))))
+ (with-scheduler
+ sched
+ (let next-turn ()
+ (unless (finished?)
+ (schedule-tasks-for-next-turn sched)
+ (stack-push-list! cur (reverse (stack-pop-all! next)))
+ (let next-task ()
+ (match (stack-pop! cur #f)
+ (#f
+ (when (stack-empty? next)
+ ;; Both current and next runqueues are empty; steal a
+ ;; little bit of work from a remote scheduler if we
+ ;; can. Run it directly instead of pushing onto a
+ ;; queue to avoid double stealing.
+ (let ((task (steal-work!)))
+ (when task
+ (run-task task))))
+ (next-turn))
+ (task
+ (run-task task)
+ (next-task)))))))))
+
+(define (destroy-scheduler sched)
+ "Release any resources associated with @var{sched}."
+ (epoll-destroy (scheduler-epfd sched)))
+
+(define (schedule-task-when-fd-active sched fd events task)
+ "Arrange for @var{sched} to schedule @var{task} when the file
+descriptor @var{fd} becomes active with any of the given @var{events},
+expressed as an epoll bitfield."
+ (let ((fd-waiters (hashv-ref (scheduler-fd-waiters sched) fd)))
+ (match fd-waiters
+ ((active-events . waiters)
+ (set-cdr! fd-waiters (acons events task waiters))
+ (unless (and active-events
+ (= (logand events active-events) events))
+ (let ((active-events (logior events (or active-events 0))))
+ (set-car! fd-waiters active-events)
+ (epoll-add*! (scheduler-epfd sched) fd
+ (logior active-events EPOLLONESHOT)))))
+ (#f
+ (let ((fd-waiters (list events (cons events task))))
+ (define (finalize-fd fd)
+ ;; When a file port is closed, clear out the list of
+ ;; waiting tasks so that when/if this FD is re-used, we
+ ;; don't resume stale tasks. Note that we don't need to
+ ;; remove the FD from the epoll set, as the kernel manages
+ ;; that for us.
+ ;;
+ ;; FIXME: Is there a way to wake all tasks in a thread-safe
+ ;; way? Note that this function may be invoked from a
+ ;; finalizer thread.
+ (set-cdr! fd-waiters '())
+ (set-car! fd-waiters #f))
+ (hashv-set! (scheduler-fd-waiters sched) fd fd-waiters)
+ (add-fdes-finalizer! fd finalize-fd)
+ (epoll-add*! (scheduler-epfd sched) fd
+ (logior events EPOLLONESHOT)))))))
+
+(define (schedule-task-when-fd-readable sched fd task)
+ "Arrange to schedule @var{task} on @var{sched} when the file
+descriptor @var{fd} becomes readable."
+ (schedule-task-when-fd-active sched fd (logior EPOLLIN EPOLLRDHUP) task))
+
+(define (run-task-when-fd-writable sched fd task)
+ "Arrange to schedule @var{k} on @var{sched} when the file descriptor
+@var{fd} becomes writable."
+ (schedule-task-when-fd-active sched fd EPOLLOUT task))
+
+(define (schedule-task-at-time sched expiry task)
+ "Arrange to schedule @var{task} when the absolute real time is
+greater than or equal to @var{expiry}, expressed in internal time
+units."
+ (set-scheduler-timers! sched
+ (psq-set (scheduler-timers sched)
+ (cons expiry task)
+ expiry)))
+
+;; Shim for Guile 2.1.5.
+(unless (defined? 'suspendable-continuation?)
+ (define! 'suspendable-continuation? (lambda (tag) #t)))
+
+(define* (suspend-current-task after-suspend)
+ "Suspend the current task. After suspending, call the
+@var{after-suspend} callback with two arguments: the current
+scheduler, and the continuation of the current task. The continuation
+passed to the @var{after-suspend} handler is the continuation of the
+@code{suspend-current-task} call."
+ (let ((tag (scheduler-prompt-tag (current-scheduler))))
+ (unless (suspendable-continuation? tag)
+ (error "Attempt to suspend fiber within continuation barrier"))
+ (abort-to-prompt tag after-suspend)))
+
+(define* (yield-current-task)
+ "Yield control to the current scheduler. Like calling
+@code{(suspend-current-task schedule-task)} except that it avoids
+suspending if the current continuation isn't suspendable. Returns
+@code{#t} if the yield succeeded, or @code{#f} otherwise."
+ (match (current-scheduler)
+ (#f #f)
+ (sched
+ (let ((tag (scheduler-prompt-tag sched)))
+ (and (suspendable-continuation? tag)
+ (begin
+ (abort-to-prompt tag schedule-task)
+ #t))))))