diff options
Diffstat (limited to 'fibers/internal.scm')
| -rw-r--r-- | fibers/internal.scm | 489 |
1 files changed, 0 insertions, 489 deletions
diff --git a/fibers/internal.scm b/fibers/internal.scm deleted file mode 100644 index 79b1011..0000000 --- a/fibers/internal.scm +++ /dev/null @@ -1,489 +0,0 @@ -;; Fibers: cooperative, event-driven user-space threads. - -;;;; Copyright (C) 2016 Free Software Foundation, Inc. -;;;; -;;;; This library is free software; you can redistribute it and/or -;;;; modify it under the terms of the GNU Lesser General Public -;;;; License as published by the Free Software Foundation; either -;;;; version 3 of the License, or (at your option) any later version. -;;;; -;;;; This library is distributed in the hope that it will be useful, -;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of -;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -;;;; Lesser General Public License for more details. -;;;; -;;;; You should have received a copy of the GNU Lesser General Public -;;;; License along with this library; if not, write to the Free Software -;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA -;;;; - -(define-module (fibers internal) - #:use-module (srfi srfi-9) - #:use-module (fibers stack) - #:use-module (fibers epoll) - #:use-module (fibers psq) - #:use-module (fibers nameset) - #:use-module (ice-9 atomic) - #:use-module (ice-9 control) - #:use-module (ice-9 match) - #:use-module (ice-9 fdes-finalizers) - #:use-module ((ice-9 threads) #:select (current-thread)) - #:export (;; Low-level interface: schedulers and threads. - make-scheduler - with-scheduler - scheduler-name - scheduler-runcount - (scheduler-kernel-thread/public . scheduler-kernel-thread) - scheduler-remote-peers - scheduler-work-pending? - choose-parallel-scheduler - run-scheduler - destroy-scheduler - - resume-on-readable-fd - resume-on-writable-fd - add-timer - - create-fiber - (current-fiber/public . current-fiber) - kill-fiber - fiber-scheduler - fiber-continuation - - fold-all-schedulers - scheduler-by-name - fold-all-fibers - fiber-by-name - - suspend-current-fiber - resume-fiber - yield-current-fiber)) - -(define-once fibers-nameset (make-nameset)) -(define-once schedulers-nameset (make-nameset)) - -(define (fold-all-schedulers f seed) - "Fold @var{f} over the set of known schedulers. @var{f} will be -invoked as @code{(@var{f} @var{name} @var{scheduler} @var{seed})}." - (nameset-fold f schedulers-nameset seed)) -(define (scheduler-by-name name) - "Return the scheduler named @var{name}, or @code{#f} if no scheduler -of that name is known." - (nameset-ref schedulers-nameset name)) - -(define (fold-all-fibers f seed) - "Fold @var{f} over the set of known fibers. @var{f} will be -invoked as @code{(@var{f} @var{name} @var{fiber} @var{seed})}." - (nameset-fold f fibers-nameset seed)) -(define (fiber-by-name name) - "Return the fiber named @var{name}, or @code{#f} if no fiber of that -name is known." - (nameset-ref fibers-nameset name)) - -(define-record-type <scheduler> - (%make-scheduler name epfd runcount-box prompt-tag - next-runqueue current-runqueue - sources timers kernel-thread - remote-peers choose-parallel-scheduler) - scheduler? - (name scheduler-name set-scheduler-name!) - (epfd scheduler-epfd) - ;; atomic variable of uint32 - (runcount-box scheduler-runcount-box) - (prompt-tag scheduler-prompt-tag) - ;; atomic stack of fiber to run next turn (reverse order) - (next-runqueue scheduler-next-runqueue) - ;; atomic stack of fiber to run this turn - (current-runqueue scheduler-current-runqueue) - ;; fd -> (total-events (events . resume-fn) ...) - (sources scheduler-sources) - ;; PSQ of thunk -> expiry - (timers scheduler-timers set-scheduler-timers!) - ;; atomic parameter of thread - (kernel-thread scheduler-kernel-thread) - ;; list of sched - (remote-peers scheduler-remote-peers set-scheduler-remote-peers!) - ;; () -> sched - (choose-parallel-scheduler scheduler-choose-parallel-scheduler - set-scheduler-choose-parallel-scheduler!)) - -(define-record-type <fiber> - (make-fiber scheduler continuation) - fiber? - ;; The scheduler to which a fiber is currently bound. - (scheduler fiber-scheduler set-fiber-scheduler!) - ;; What the fiber should do when it resumes, or #f if the fiber is - ;; currently running. - (continuation fiber-continuation set-fiber-continuation!)) - -(define (make-atomic-parameter init) - (let ((box (make-atomic-box init))) - (case-lambda - (() (atomic-box-ref box)) - ((new) - (if (eq? new init) - (atomic-box-set! box new) - (let ((prev (atomic-box-compare-and-swap! box init new))) - (unless (eq? prev init) - (error "owned by other thread" prev)))))))) - -(define (shuffle l) - (map cdr (sort (map (lambda (x) (cons (random 1.0) x)) l) - (lambda (a b) (< (car a) (car b)))))) - -(define (make-selector items) - (let ((items (list->vector (shuffle items)))) - (match (vector-length items) - (0 (lambda () #f)) - (1 (let ((item (vector-ref items 0))) (lambda () item))) - (n (let ((idx 0)) - (lambda () - (let ((item (vector-ref items idx))) - (set! idx (let ((idx (1+ idx))) - (if (= idx (vector-length items)) 0 idx))) - item))))))) - -(define* (make-scheduler #:key parallelism - (prompt-tag (make-prompt-tag "fibers"))) - "Make a new scheduler in which to run fibers." - (let ((epfd (epoll-create)) - (runcount-box (make-atomic-box 0)) - (next-runqueue (make-empty-stack)) - (current-runqueue (make-empty-stack)) - (sources (make-hash-table)) - (timers (make-psq (match-lambda* - (((t1 . c1) (t2 . c2)) (< t1 t2))) - <)) - (kernel-thread (make-atomic-parameter #f))) - (let ((sched (%make-scheduler #f epfd runcount-box prompt-tag - next-runqueue current-runqueue - sources timers kernel-thread - #f #f))) - (set-scheduler-name! sched (nameset-add! schedulers-nameset sched)) - (let ((all-scheds - (cons sched - (if parallelism - (map (lambda (_) - (make-scheduler #:prompt-tag prompt-tag)) - (iota (1- parallelism))) - '())))) - (for-each - (lambda (sched) - (let ((choose! (make-selector all-scheds))) - (set-scheduler-remote-peers! sched (delq sched all-scheds)) - (set-scheduler-choose-parallel-scheduler! sched choose!))) - all-scheds)) - sched))) - -(define-syntax-rule (with-scheduler scheduler body ...) - "Evaluate @code{(begin @var{body} ...)} in an environment in which -@var{scheduler} is bound to the current kernel thread. Signal an -error if @var{scheduler} is already running in some other kernel -thread." - (let ((sched scheduler)) - (dynamic-wind (lambda () - ((scheduler-kernel-thread sched) (current-thread))) - (lambda () - body ...) - (lambda () - ((scheduler-kernel-thread sched) #f))))) - -(define (scheduler-runcount sched) - "Return the number of fibers that have been scheduled on -@var{sched} since it was started, modulo 2@sup{32}." - (atomic-box-ref (scheduler-runcount-box sched))) - -(define (scheduler-kernel-thread/public sched) - "Return the kernel thread on which @var{sched} is running, or -@code{#f} if @var{sched} is not running." - ((scheduler-kernel-thread sched))) - -(define (choose-parallel-scheduler sched) - ((scheduler-choose-parallel-scheduler sched))) - -(define current-fiber (make-parameter #f)) -(define (current-fiber/public) - "Return the current fiber, or @code{#f} if no fiber is current." - (current-fiber)) - -(define (schedule-fiber! fiber thunk) - ;; The fiber will be resumed at most once, and we are the ones that - ;; will resume it, so we can set the thunk directly. Adding the - ;; fiber to the runqueue is an atomic operation with SEQ_CST - ;; ordering, so that will make sure this operation is visible even - ;; for a fiber scheduled on a remote thread. - (set-fiber-continuation! fiber thunk) - (let ((sched (fiber-scheduler fiber))) - (stack-push! (scheduler-next-runqueue sched) fiber) - (unless (eq? ((scheduler-kernel-thread sched)) (current-thread)) - (epoll-wake! (scheduler-epfd sched))) - (values))) - -(define internal-time-units-per-millisecond - (/ internal-time-units-per-second 1000)) - -(define (schedule-fibers-for-fd fd revents sched) - (match (hashv-ref (scheduler-sources sched) fd) - (#f (warn "scheduler for unknown fd" fd)) - ((and sources (active-events . waiters)) - ;; First, clear the active status, as the EPOLLONESHOT has - ;; deactivated our entry in the epoll set. - (set-car! sources #f) - (set-cdr! sources '()) - (unless (zero? (logand revents EPOLLERR)) - (hashv-remove! (scheduler-sources sched) fd)) - ;; Now resume or re-enqueue fibers, as appropriate. - (let lp ((waiters waiters)) - (match waiters - (() #f) - (((events . resume) . waiters) - (if (zero? (logand revents (logior events EPOLLERR))) - ;; Re-enqueue. - (add-fd-event-waiter sched fd events resume) - ;; Resume. - (resume revents)) - (lp waiters))))))) - -(define (run-timers sched) - ;; Run expired timer thunks in the order that they expired. - (let ((now (get-internal-real-time))) - (let run-timers ((timers (scheduler-timers sched))) - (cond - ((or (psq-empty? timers) - (< now (car (psq-min timers)))) - (set-scheduler-timers! sched timers)) - (else - (call-with-values (lambda () (psq-pop timers)) - (match-lambda* - (((_ . thunk) timers) - (thunk) - (run-timers timers))))))))) - -(define (schedule-runnables-for-next-turn sched) - ;; Called when all runnables from the current turn have been run. - ;; Note that there may be runnables already scheduled for the next - ;; turn; one way this can happen is if a fiber suspended itself - ;; because it was blocked on a channel, but then another fiber woke - ;; it up, or if a remote thread scheduled a fiber on this scheduler. - ;; In any case, check the kernel to see if any of the fd's that we - ;; are interested in are active, and in that case schedule their - ;; corresponding fibers. Also run any timers that have timed out. - (define (timers-expiry timers) - (and (not (psq-empty? timers)) - (match (psq-min timers) - ((expiry . thunk) - expiry)))) - (define (update-expiry expiry) - ;; If there are pending runnables, cause epoll to return - ;; immediately. - (if (stack-empty? (scheduler-next-runqueue sched)) - expiry - 0)) - (epoll (scheduler-epfd sched) - #:expiry (timers-expiry (scheduler-timers sched)) - #:update-expiry update-expiry - #:folder (lambda (fd revents sched) - (schedule-fibers-for-fd fd revents sched) - sched) - #:seed sched) - (run-timers sched)) - -(define (fiber-stealer sched) - "Steal some work from a random scheduler in the vector -@var{schedulers}. Return a fiber, or @code{#f} if no work could be -stolen." - (let ((selector (make-selector (scheduler-remote-peers sched)))) - (lambda () - (let ((peer (selector))) - (and peer - (stack-pop! (scheduler-current-runqueue peer) #f)))))) - -(define (scheduler-work-pending? sched) - "Return @code{#t} if @var{sched} has any work pending: any runnable -fibers or any pending timeouts." - (not (and (psq-empty? (scheduler-timers sched)) - (stack-empty? (scheduler-current-runqueue sched)) - (stack-empty? (scheduler-next-runqueue sched))))) - -(define* (run-scheduler sched finished?) - "Run @var{sched} until there are no more fibers ready to run, no -file descriptors being waited on, and no more timers pending to run. -Return zero values." - (let ((tag (scheduler-prompt-tag sched)) - (runcount-box (scheduler-runcount-box sched)) - (next (scheduler-next-runqueue sched)) - (cur (scheduler-current-runqueue sched)) - (steal-fiber! (fiber-stealer sched))) - (define (run-fiber fiber) - (atomic-box-set! runcount-box - (logand (1+ (atomic-box-ref runcount-box)) #xffffFFFF)) - (call-with-prompt tag - (lambda () - (let ((thunk (fiber-continuation fiber))) - (set-fiber-continuation! fiber #f) - (thunk))) - (lambda (k after-suspend) - (set-fiber-continuation! fiber k) - (after-suspend fiber)))) - (let next-turn () - (unless (finished?) - (schedule-runnables-for-next-turn sched) - (stack-push-list! cur (reverse (stack-pop-all! next))) - (let next-fiber () - (match (stack-pop! cur #f) - (#f - (when (stack-empty? next) - ;; Both current and next runqueues are empty; steal a - ;; little bit of work from a remote scheduler if we - ;; can. Run it directly instead of pushing onto a - ;; queue to avoid double stealing. - (let ((fiber (steal-fiber!))) - (when fiber - (set-fiber-scheduler! fiber sched) - (run-fiber fiber)))) - (next-turn)) - (fiber - (run-fiber fiber) - (next-fiber)))))))) - -(define (destroy-scheduler sched) - "Release any resources associated with @var{sched}." - #; - (for-each kill-fiber (list-copy (scheduler-fibers sched))) - (epoll-destroy (scheduler-epfd sched))) - -(define (create-fiber sched thunk) - "Spawn a new fiber in @var{sched} with the continuation @var{thunk}. -The fiber will be scheduled on the next turn. @var{thunk} will run -with a copy of the current dynamic state, isolating fluid and -parameter mutations to the fiber." - (let* ((fiber (make-fiber sched #f)) - (thunk (let ((dynamic-state (current-dynamic-state))) - (lambda () - (with-dynamic-state - dynamic-state - (lambda () - (current-fiber fiber) - (catch #t - (lambda () - (%start-stack #t thunk)) - (lambda _ #f) - (let ((err (current-error-port))) - (lambda (key . args) - (false-if-exception - (let ((stack (make-stack #t 4))) - (format err "Uncaught exception in fiber #~a:\n" - (nameset-ref fibers-nameset fiber)) - (display-backtrace stack err) - (print-exception err (stack-ref stack 0) - key args)))))))))))) - (nameset-add! fibers-nameset fiber) - (schedule-fiber! fiber thunk))) - -(define (kill-fiber fiber) - "Try to kill @var{fiber}, causing it to raise an exception. Note -that this is currently unimplemented!" - (error "kill-fiber is unimplemented")) - -;; Shim for Guile 2.1.5. -(unless (defined? 'suspendable-continuation?) - (define! 'suspendable-continuation? (lambda (tag) #t))) - -;; The AFTER-SUSPEND thunk allows the user to suspend the current -;; fiber, saving its state, and then perform some other nonlocal -;; control flow. -;; -(define* (suspend-current-fiber #:optional - (after-suspend (lambda (fiber) #f))) - "Suspend the current fiber. Call the optional @var{after-suspend} -callback, if present, with the suspended thread as its argument." - (let ((tag (scheduler-prompt-tag (fiber-scheduler (current-fiber))))) - (unless (suspendable-continuation? tag) - (error "Attempt to suspend fiber within continuation barrier")) - ((abort-to-prompt tag after-suspend)))) - -(define* (resume-fiber fiber thunk) - "Resume @var{fiber}, adding it to the run queue of its scheduler. -The fiber will start by applying @var{thunk}. A fiber @emph{must} -only be resumed when it is suspended. This function is thread-safe -even if @var{fiber} is running on a remote scheduler." - (let ((cont (fiber-continuation fiber))) - (unless cont (error "invalid fiber" fiber)) - (schedule-fiber! fiber (lambda () (cont thunk))))) - -(define* (yield-current-fiber) - "Yield control to the current scheduler. Like -@code{suspend-current-fiber} followed directly by @code{resume-fiber}, -except that it avoids suspending if the current continuation isn't -suspendable. Returns @code{#t} if the yield succeeded, or @code{#f} -otherwise." - (match (current-fiber) - (#f #f) - (fiber - (let ((tag (scheduler-prompt-tag (fiber-scheduler fiber)))) - (and (suspendable-continuation? tag) - (begin - (abort-to-prompt tag (lambda (fiber) (resume-fiber fiber #f))) - #t)))))) - -(define (finalize-fd sources) - "Remove data associated with @var{fd} from the scheduler @var{ctx}. -Called by Guile just before Guile goes to close a file descriptor, in -response either to an explicit call to @code{close-port}, or because -the port became unreachable. In the latter case, this call may come -from a finalizer thread." - ;; When a file descriptor is closed, the kernel silently removes it - ;; from any associated epoll sets, so we don't need to do anything - ;; there. But because this call could come from any thread, - ;; especially given the fact that the fiber might be migrated, we - ;; have to operate locally, just nulling out the sources pair and - ;; not mucking with the sources table. - ;; - ;; FIXME: Wake all sources with EPOLLERR. - (set-cdr! sources '()) - (set-car! sources #f)) - -(define (add-fd-event-waiter sched fd events resume) - "Arrange to resume @var{fiber} when the file descriptor @var{fd} has -the given @var{events}, expressed as an epoll bitfield." - (let ((sources (hashv-ref (scheduler-sources sched) fd))) - (match sources - ((active-events . waiters) - (set-cdr! sources (acons events resume waiters)) - (unless (and active-events - (= (logand events active-events) events)) - (let ((active-events (logior events (or active-events 0)))) - (set-car! sources active-events) - (epoll-add*! (scheduler-epfd sched) fd - (logior active-events EPOLLONESHOT))))) - (#f - (let ((sources (list events (cons events resume)))) - (hashv-set! (scheduler-sources sched) fd sources) - (add-fdes-finalizer! fd (lambda (fd) (finalize-fd sources))) - (epoll-add*! (scheduler-epfd sched) fd - (logior events EPOLLONESHOT))))))) - -(define (resume-on-fd-events fd events fiber) - "Arrange to resume @var{fiber} when the file descriptor @var{fd} has -the given @var{events}, expressed as an epoll bitfield." - (add-fd-event-waiter (fiber-scheduler fiber) fd events - (lambda (revents) - (resume-fiber fiber (lambda () revents))))) - -(define (resume-on-readable-fd fd fiber) - "Arrange to resume @var{fiber} when the file descriptor @var{fd} -becomes readable." - (resume-on-fd-events fd (logior EPOLLIN EPOLLRDHUP) fiber)) - -(define (resume-on-writable-fd fd fiber) - "Arrange to resume @var{fiber} when the file descriptor @var{fd} -becomes writable." - (resume-on-fd-events fd EPOLLOUT fiber)) - -(define (add-timer sched expiry thunk) - "Arrange to call @var{thunk} when the absolute real time is greater -than or equal to @var{expiry}, expressed in internal time units." - (set-scheduler-timers! sched - (psq-set (scheduler-timers sched) - (cons expiry thunk) - expiry))) |
