diff options
| author | Andy Wingo <wingo@pobox.com> | 2017-02-15 21:12:21 +0100 |
|---|---|---|
| committer | Andy Wingo <wingo@pobox.com> | 2017-02-15 21:12:21 +0100 |
| commit | 2c97babd3b12a0c23053bb072ecefb0438fdec2a (patch) | |
| tree | 315b4a61321307ad2a69d7f31dd772b608c5d79e | |
| parent | Add epoll-add*! (diff) | |
| download | guile-fibers-2c97babd3b12a0c23053bb072ecefb0438fdec2a.tar.gz | |
Fix epoll-modify logic
Because this case wasn't being hit before, there were bugs.
* fibers/internal.scm (<scheduler>): Remove active fd count; we'll add a
#:forever? option to run-fibers instead. Simplify sources.
(make-scheduler): Adapt.
(schedule-fibers-for-fd): Use resume callbacks, to allow for other
kinds of callbacks.
(scheduler-poll-timeout): Don't wait for active fd count to go to
zero.
(finalize-fd): Avoid mucking with hash tables from this potentially-gc
callback.
(add-fd-event-waiter): New function.
(resume-on-fd-events): Call new function.
| -rw-r--r-- | fibers/internal.scm | 118 |
1 files changed, 52 insertions, 66 deletions
diff --git a/fibers/internal.scm b/fibers/internal.scm index dceb5bf..1345ae0 100644 --- a/fibers/internal.scm +++ b/fibers/internal.scm @@ -79,20 +79,19 @@ name is known." (nameset-ref fibers-nameset name)) (define-record-type <scheduler> - (%make-scheduler name epfd active-fd-count prompt-tag + (%make-scheduler name epfd prompt-tag next-runqueue current-runqueue sources timers kernel-thread remote-peers choose-parallel-scheduler) scheduler? (name scheduler-name set-scheduler-name!) (epfd scheduler-epfd) - (active-fd-count scheduler-active-fd-count set-scheduler-active-fd-count!) (prompt-tag scheduler-prompt-tag) ;; atomic stack of fiber to run next turn (reverse order) (next-runqueue scheduler-next-runqueue) ;; atomic stack of fiber to run this turn (current-runqueue scheduler-current-runqueue) - ;; fd -> ((total-events . min-expiry) #(events expiry fiber) ...) + ;; fd -> (total-events (events . resume-fn) ...) (sources scheduler-sources) ;; PSQ of thunk -> expiry (timers scheduler-timers set-scheduler-timers!) @@ -144,7 +143,6 @@ name is known." (prompt-tag (make-prompt-tag "fibers"))) "Make a new scheduler in which to run fibers." (let ((epfd (epoll-create)) - (active-fd-count 0) (next-runqueue (make-empty-stack)) (current-runqueue (make-empty-stack)) (sources (make-hash-table)) @@ -152,7 +150,7 @@ name is known." (((t1 . c1) (t2 . c2)) (< t1 t2))) <)) (kernel-thread (make-atomic-parameter #f))) - (let ((sched (%make-scheduler #f epfd active-fd-count prompt-tag + (let ((sched (%make-scheduler #f epfd prompt-tag next-runqueue current-runqueue sources timers kernel-thread #f #f))) @@ -193,11 +191,6 @@ thread." (define (choose-parallel-scheduler sched) ((scheduler-choose-parallel-scheduler sched))) -(define (make-source events expiry fiber) (vector events expiry fiber)) -(define (source-events s) (vector-ref s 0)) -(define (source-expiry s) (vector-ref s 1)) -(define (source-fiber s) (vector-ref s 2)) - (define current-fiber (make-parameter #f)) (define (current-fiber/public) "Return the current fiber, or @code{#f} if no fiber is current." @@ -222,30 +215,24 @@ thread." (define (schedule-fibers-for-fd fd revents sched) (match (hashv-ref (scheduler-sources sched) fd) (#f (warn "scheduler for unknown fd" fd)) - (sources - (set-scheduler-active-fd-count! sched - (1- (scheduler-active-fd-count sched))) - (for-each (lambda (source) - ;; FIXME: This fiber might have been woken up by - ;; another event. A moot point while file descriptor - ;; operations aren't proper CML operations, though. - (unless (zero? (logand revents - (logior (source-events source) EPOLLERR))) - ;; Fibers can't be stolen while they are in the - ;; sources table; the scheduler of the fiber must - ;; be SCHED, and so we are indeed responsible for - ;; resuming the fiber. - (resume-fiber (source-fiber source) (lambda () revents)))) - (cdr sources)) - (cond - ((not (zero? (logand revents EPOLLERR))) - (hashv-remove! (scheduler-sources sched) fd) - (epoll-remove! (scheduler-epfd sched) fd)) - (else - (set-cdr! sources '()) - ;; Reset active events and expiration time, respectively. - (set-car! (car sources) #f) - (set-cdr! (car sources) #f)))))) + ((and sources (active-events . waiters)) + ;; First, clear the active status, as the EPOLLONESHOT has + ;; deactivated our entry in the epoll set. + (set-car! sources #f) + (set-cdr! sources '()) + (unless (zero? (logand revents EPOLLERR)) + (hashv-remove! (scheduler-sources sched) fd)) + ;; Now resume or re-enqueue fibers, as appropriate. + (let lp ((waiters waiters)) + (match waiters + (() #f) + (((events . resume) . waiters) + (if (zero? (logand revents (logior events EPOLLERR))) + ;; Re-enqueue. + (add-fd-event-waiter sched fd events resume) + ;; Resume. + (resume revents)) + (lp waiters))))))) (define (scheduler-finished? sched finished?) (and (finished?) @@ -258,8 +245,7 @@ thread." 0) ((psq-empty? (scheduler-timers sched)) ;; Avoid sleeping if the scheduler is actually finished. - (let ((done? (and (finished?) (zero? (scheduler-active-fd-count sched))))) - (if done? 0 -1))) + (if (finished?) 0 -1)) (else (match (psq-min (scheduler-timers sched)) ((expiry . thunk) @@ -433,7 +419,7 @@ otherwise." (abort-to-prompt tag (lambda (fiber) (resume-fiber fiber #f))) #t)))))) -(define (finalize-fd sched fd) +(define (finalize-fd sources) "Remove data associated with @var{fd} from the scheduler @var{ctx}. Called by Guile just before Guile goes to close a file descriptor, in response either to an explicit call to @code{close-port}, or because @@ -441,41 +427,41 @@ the port became unreachable. In the latter case, this call may come from a finalizer thread." ;; When a file descriptor is closed, the kernel silently removes it ;; from any associated epoll sets, so we don't need to do anything - ;; there. + ;; there. But because this call could come from any thread, + ;; especially given the fact that the fiber might be migrated, we + ;; have to operate locally, just nulling out the sources pair and + ;; not mucking with the sources table. ;; - ;; FIXME: Take a lock on the sources table? ;; FIXME: Wake all sources with EPOLLERR. - (let ((sources-table (scheduler-sources sched))) - (when (hashv-ref sources-table fd) - (set-scheduler-active-fd-count! sched - (1- (scheduler-active-fd-count sched))) - (hashv-remove! sources-table fd)))) + (set-cdr! sources '()) + (set-car! sources #f)) + +(define (add-fd-event-waiter sched fd events resume) + "Arrange to resume @var{fiber} when the file descriptor @var{fd} has +the given @var{events}, expressed as an epoll bitfield." + (let ((sources (hashv-ref (scheduler-sources sched) fd))) + (match sources + ((active-events . waiters) + (set-cdr! sources (acons events resume waiters)) + (unless (and active-events + (= (logand events active-events) events)) + (let ((active-events (logior events (or active-events 0)))) + (set-car! sources active-events) + (epoll-add*! (scheduler-epfd sched) fd + (logior active-events EPOLLONESHOT))))) + (#f + (let ((sources (list events (cons events resume)))) + (hashv-set! (scheduler-sources sched) fd sources) + (add-fdes-finalizer! fd (lambda (fd) (finalize-fd sources))) + (epoll-add*! (scheduler-epfd sched) fd + (logior events EPOLLONESHOT))))))) (define (resume-on-fd-events fd events fiber) "Arrange to resume @var{fiber} when the file descriptor @var{fd} has the given @var{events}, expressed as an epoll bitfield." - (let* ((sched (fiber-scheduler fiber)) - (sources (hashv-ref (scheduler-sources sched) fd))) - (cond - (sources - (set-cdr! sources (cons (make-source events #f fiber) (cdr sources))) - (let ((active-events (caar sources))) - (unless active-events - (set-scheduler-active-fd-count! sched - (1+ (scheduler-active-fd-count sched)))) - (unless (and active-events - (= (logand events active-events) events)) - (set-car! (car sources) (logior events (or active-events 0))) - (epoll-modify! (scheduler-epfd sched) fd - (logior (caar sources) EPOLLONESHOT))))) - (else - (set-scheduler-active-fd-count! sched - (1+ (scheduler-active-fd-count sched))) - (hashv-set! (scheduler-sources sched) - fd (acons events #f - (list (make-source events #f fiber)))) - (add-fdes-finalizer! fd (lambda (fd) (finalize-fd sched fd))) - (epoll-add! (scheduler-epfd sched) fd (logior events EPOLLONESHOT)))))) + (add-fd-event-waiter (fiber-scheduler fiber) fd events + (lambda (revents) + (resume-fiber fiber (lambda () revents))))) (define (resume-on-readable-fd fd fiber) "Arrange to resume @var{fiber} when the file descriptor @var{fd} |
