diff options
Diffstat (limited to 'fibers/scheduler.scm')
| -rw-r--r-- | fibers/scheduler.scm | 383 |
1 files changed, 383 insertions, 0 deletions
diff --git a/fibers/scheduler.scm b/fibers/scheduler.scm new file mode 100644 index 0000000..28a08b3 --- /dev/null +++ b/fibers/scheduler.scm @@ -0,0 +1,383 @@ +;; Fibers: cooperative, event-driven user-space threads. + +;;;; Copyright (C) 2016 Free Software Foundation, Inc. +;;;; +;;;; This library is free software; you can redistribute it and/or +;;;; modify it under the terms of the GNU Lesser General Public +;;;; License as published by the Free Software Foundation; either +;;;; version 3 of the License, or (at your option) any later version. +;;;; +;;;; This library is distributed in the hope that it will be useful, +;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of +;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +;;;; Lesser General Public License for more details. +;;;; +;;;; You should have received a copy of the GNU Lesser General Public +;;;; License along with this library; if not, write to the Free Software +;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +;;;; + +(define-module (fibers scheduler) + #:use-module (srfi srfi-9) + #:use-module (fibers stack) + #:use-module (fibers epoll) + #:use-module (fibers psq) + #:use-module (ice-9 atomic) + #:use-module (ice-9 control) + #:use-module (ice-9 match) + #:use-module (ice-9 fdes-finalizers) + #:use-module ((ice-9 threads) #:select (current-thread)) + #:export (;; Low-level interface: schedulers and tasks. + make-scheduler + (current-scheduler/public . current-scheduler) + scheduler-runcount + (scheduler-kernel-thread/public . scheduler-kernel-thread) + scheduler-remote-peers + scheduler-work-pending? + choose-parallel-scheduler + run-scheduler + destroy-scheduler + + schedule-task + schedule-task-when-fd-readable + schedule-task-when-fd-writable + schedule-task-at-time + + suspend-current-task + yield-current-task)) + +(define-record-type <scheduler> + (%make-scheduler epfd runcount-box prompt-tag + next-runqueue current-runqueue + fd-waiters timers kernel-thread + remote-peers choose-parallel-scheduler) + scheduler? + (epfd scheduler-epfd) + ;; atomic variable of uint32 + (runcount-box scheduler-runcount-box) + (prompt-tag scheduler-prompt-tag) + ;; atomic stack of tasks to run next turn (reverse order) + (next-runqueue scheduler-next-runqueue) + ;; atomic stack of tasks to run this turn + (current-runqueue scheduler-current-runqueue) + ;; fd -> (total-events (events . task) ...) + (fd-waiters scheduler-fd-waiters) + ;; PSQ of expiry -> task + (timers scheduler-timers set-scheduler-timers!) + ;; atomic parameter of thread + (kernel-thread scheduler-kernel-thread) + ;; list of sched + (remote-peers scheduler-remote-peers set-scheduler-remote-peers!) + ;; () -> sched + (choose-parallel-scheduler scheduler-choose-parallel-scheduler + set-scheduler-choose-parallel-scheduler!)) + +(define (make-atomic-parameter init) + (let ((box (make-atomic-box init))) + (case-lambda + (() (atomic-box-ref box)) + ((new) + (if (eq? new init) + (atomic-box-set! box new) + (let ((prev (atomic-box-compare-and-swap! box init new))) + (unless (eq? prev init) + (error "owned by other thread" prev)))))))) + +(define (shuffle l) + (map cdr (sort (map (lambda (x) (cons (random 1.0) x)) l) + (lambda (a b) (< (car a) (car b)))))) + +(define (make-selector items) + (let ((items (list->vector (shuffle items)))) + (match (vector-length items) + (0 (lambda () #f)) + (1 (let ((item (vector-ref items 0))) (lambda () item))) + (n (let ((idx 0)) + (lambda () + (let ((item (vector-ref items idx))) + (set! idx (let ((idx (1+ idx))) + (if (= idx (vector-length items)) 0 idx))) + item))))))) + +(define* (make-scheduler #:key parallelism + (prompt-tag (make-prompt-tag "fibers"))) + "Make a new scheduler in which to run fibers." + (let ((epfd (epoll-create)) + (runcount-box (make-atomic-box 0)) + (next-runqueue (make-empty-stack)) + (current-runqueue (make-empty-stack)) + (fd-waiters (make-hash-table)) + (timers (make-psq (match-lambda* + (((t1 . c1) (t2 . c2)) (< t1 t2))) + <)) + (kernel-thread (make-atomic-parameter #f))) + (let* ((sched (%make-scheduler epfd runcount-box prompt-tag + next-runqueue current-runqueue + fd-waiters timers kernel-thread + #f #f)) + (all-scheds + (cons sched + (if parallelism + (map (lambda (_) + (make-scheduler #:prompt-tag prompt-tag)) + (iota (1- parallelism))) + '())))) + (for-each + (lambda (sched) + (let ((choose! (make-selector all-scheds))) + (set-scheduler-remote-peers! sched (delq sched all-scheds)) + (set-scheduler-choose-parallel-scheduler! sched choose!))) + all-scheds) + sched))) + +(define current-scheduler (fluid->parameter (make-thread-local-fluid #f))) +(define (current-scheduler/public) + "Return the current scheduler, or @code{#f} if no scheduler is current." + (current-scheduler)) + +(define-syntax-rule (with-scheduler scheduler body ...) + "Evaluate @code{(begin @var{body} ...)} in an environment in which +@var{scheduler} is bound to the current kernel thread and +@code{current-scheduler} is bound to @var{scheduler}. Signal an error +if @var{scheduler} is already running in some other kernel thread." + (let ((sched scheduler)) + (dynamic-wind (lambda () + ((scheduler-kernel-thread sched) (current-thread))) + (lambda () + (parameterize ((current-scheduler sched)) + body ...)) + (lambda () + ((scheduler-kernel-thread sched) #f))))) + +(define (scheduler-runcount sched) + "Return the number of tasks that have been run on @var{sched} since +it was started, modulo 2@sup{32}." + (atomic-box-ref (scheduler-runcount-box sched))) + +(define (scheduler-kernel-thread/public sched) + "Return the kernel thread on which @var{sched} is running, or +@code{#f} if @var{sched} is not running." + ((scheduler-kernel-thread sched))) + +(define (choose-parallel-scheduler sched) + ((scheduler-choose-parallel-scheduler sched))) + +(define-inlinable (schedule-task/no-wakeup sched task) + (stack-push! (scheduler-next-runqueue sched) task)) + +(define (schedule-task sched task) + "Add the task @var{task} to the run queue of the scheduler +@var{sched}. On the next turn, @var{sched} will invoke @var{task} +with no arguments. + +This function is thread-safe even if @var{sched} is running on a +remote kernel thread." + (schedule-task/no-wakeup sched task) + (unless (eq? ((scheduler-kernel-thread sched)) (current-thread)) + (epoll-wake! (scheduler-epfd sched))) + (values)) + +(define (schedule-tasks-for-active-fd fd revents sched) + (match (hashv-ref (scheduler-fd-waiters sched) fd) + (#f (warn "scheduler for unknown fd" fd)) + ((and events+waiters (active-events . waiters)) + ;; First, clear the active status, as the EPOLLONESHOT has + ;; deactivated our entry in the epoll set. + (set-car! events+waiters #f) + (set-cdr! events+waiters '()) + (unless (zero? (logand revents EPOLLERR)) + (hashv-remove! (scheduler-fd-waiters sched) fd)) + ;; Now resume or re-schedule waiters, as appropriate. + (let lp ((waiters waiters)) + (match waiters + (() #f) + (((events . task) . waiters) + (if (zero? (logand revents (logior events EPOLLERR))) + ;; Re-schedule. + (schedule-task-when-fd-active sched fd events task) + ;; Resume. + (schedule-task/no-wakeup sched task)) + (lp waiters))))))) + +(define (schedule-tasks-for-expired-timers sched) + ;; Schedule expired timer tasks in the order that they expired. + (let ((now (get-internal-real-time))) + (let expire-timers ((timers (scheduler-timers sched))) + (cond + ((or (psq-empty? timers) + (< now (car (psq-min timers)))) + (set-scheduler-timers! sched timers)) + (else + (call-with-values (lambda () (psq-pop timers)) + (match-lambda* + (((_ . task) timers) + (schedule-task/no-wakeup sched task) + (expire-timers timers))))))))) + +(define (schedule-tasks-for-next-turn sched) + ;; Called when all tasks from the current turn have been run. + ;; Note that there may be tasks already scheduled for the next + ;; turn; one way this can happen is if a fiber suspended itself + ;; because it was blocked on a channel, but then another fiber woke + ;; it up, or if a remote thread scheduled a fiber on this scheduler. + ;; In any case, check the kernel to see if any of the fd's that we + ;; are interested in are active, and in that case schedule their + ;; corresponding tasks. Also run any timers that have timed out. + (define (timers-expiry timers) + (and (not (psq-empty? timers)) + (match (psq-min timers) + ((expiry . task) + expiry)))) + (define (update-expiry expiry) + ;; If there are pending tasks, cause epoll to return + ;; immediately. + (if (stack-empty? (scheduler-next-runqueue sched)) + expiry + 0)) + (epoll (scheduler-epfd sched) + #:expiry (timers-expiry (scheduler-timers sched)) + #:update-expiry update-expiry + #:folder (lambda (fd revents sched) + (schedule-tasks-for-active-fd fd revents sched) + sched) + #:seed sched) + (schedule-tasks-for-expired-timers sched)) + +(define (work-stealer sched) + "Steal some work from a random scheduler in the vector +@var{schedulers}. Return a task, or @code{#f} if no work could be +stolen." + (let ((selector (make-selector (scheduler-remote-peers sched)))) + (lambda () + (let ((peer (selector))) + (and peer + (stack-pop! (scheduler-current-runqueue peer) #f)))))) + +(define (scheduler-work-pending? sched) + "Return @code{#t} if @var{sched} has any work pending: any tasks or +any pending timeouts." + (not (and (psq-empty? (scheduler-timers sched)) + (stack-empty? (scheduler-current-runqueue sched)) + (stack-empty? (scheduler-next-runqueue sched))))) + +(define* (run-scheduler sched finished?) + "Run @var{sched} until calling @code{finished?} returns a true +value. Return zero values." + (let ((tag (scheduler-prompt-tag sched)) + (runcount-box (scheduler-runcount-box sched)) + (next (scheduler-next-runqueue sched)) + (cur (scheduler-current-runqueue sched)) + (steal-work! (work-stealer sched))) + (define (run-task task) + (atomic-box-set! runcount-box + (logand (1+ (atomic-box-ref runcount-box)) #xffffFFFF)) + (call-with-prompt tag + task + (lambda (k after-suspend) + (after-suspend sched k)))) + (with-scheduler + sched + (let next-turn () + (unless (finished?) + (schedule-tasks-for-next-turn sched) + (stack-push-list! cur (reverse (stack-pop-all! next))) + (let next-task () + (match (stack-pop! cur #f) + (#f + (when (stack-empty? next) + ;; Both current and next runqueues are empty; steal a + ;; little bit of work from a remote scheduler if we + ;; can. Run it directly instead of pushing onto a + ;; queue to avoid double stealing. + (let ((task (steal-work!))) + (when task + (run-task task)))) + (next-turn)) + (task + (run-task task) + (next-task))))))))) + +(define (destroy-scheduler sched) + "Release any resources associated with @var{sched}." + (epoll-destroy (scheduler-epfd sched))) + +(define (schedule-task-when-fd-active sched fd events task) + "Arrange for @var{sched} to schedule @var{task} when the file +descriptor @var{fd} becomes active with any of the given @var{events}, +expressed as an epoll bitfield." + (let ((fd-waiters (hashv-ref (scheduler-fd-waiters sched) fd))) + (match fd-waiters + ((active-events . waiters) + (set-cdr! fd-waiters (acons events task waiters)) + (unless (and active-events + (= (logand events active-events) events)) + (let ((active-events (logior events (or active-events 0)))) + (set-car! fd-waiters active-events) + (epoll-add*! (scheduler-epfd sched) fd + (logior active-events EPOLLONESHOT))))) + (#f + (let ((fd-waiters (list events (cons events task)))) + (define (finalize-fd fd) + ;; When a file port is closed, clear out the list of + ;; waiting tasks so that when/if this FD is re-used, we + ;; don't resume stale tasks. Note that we don't need to + ;; remove the FD from the epoll set, as the kernel manages + ;; that for us. + ;; + ;; FIXME: Is there a way to wake all tasks in a thread-safe + ;; way? Note that this function may be invoked from a + ;; finalizer thread. + (set-cdr! fd-waiters '()) + (set-car! fd-waiters #f)) + (hashv-set! (scheduler-fd-waiters sched) fd fd-waiters) + (add-fdes-finalizer! fd finalize-fd) + (epoll-add*! (scheduler-epfd sched) fd + (logior events EPOLLONESHOT))))))) + +(define (schedule-task-when-fd-readable sched fd task) + "Arrange to schedule @var{task} on @var{sched} when the file +descriptor @var{fd} becomes readable." + (schedule-task-when-fd-active sched fd (logior EPOLLIN EPOLLRDHUP) task)) + +(define (run-task-when-fd-writable sched fd task) + "Arrange to schedule @var{k} on @var{sched} when the file descriptor +@var{fd} becomes writable." + (schedule-task-when-fd-active sched fd EPOLLOUT task)) + +(define (schedule-task-at-time sched expiry task) + "Arrange to schedule @var{task} when the absolute real time is +greater than or equal to @var{expiry}, expressed in internal time +units." + (set-scheduler-timers! sched + (psq-set (scheduler-timers sched) + (cons expiry task) + expiry))) + +;; Shim for Guile 2.1.5. +(unless (defined? 'suspendable-continuation?) + (define! 'suspendable-continuation? (lambda (tag) #t))) + +(define* (suspend-current-task after-suspend) + "Suspend the current task. After suspending, call the +@var{after-suspend} callback with two arguments: the current +scheduler, and the continuation of the current task. The continuation +passed to the @var{after-suspend} handler is the continuation of the +@code{suspend-current-task} call." + (let ((tag (scheduler-prompt-tag (current-scheduler)))) + (unless (suspendable-continuation? tag) + (error "Attempt to suspend fiber within continuation barrier")) + (abort-to-prompt tag after-suspend))) + +(define* (yield-current-task) + "Yield control to the current scheduler. Like calling +@code{(suspend-current-task schedule-task)} except that it avoids +suspending if the current continuation isn't suspendable. Returns +@code{#t} if the yield succeeded, or @code{#f} otherwise." + (match (current-scheduler) + (#f #f) + (sched + (let ((tag (scheduler-prompt-tag sched))) + (and (suspendable-continuation? tag) + (begin + (abort-to-prompt tag schedule-task) + #t)))))) |
