diff options
| -rw-r--r-- | epoll.c | 27 | ||||
| -rw-r--r-- | fibers.scm | 30 | ||||
| -rw-r--r-- | fibers.texi | 15 | ||||
| -rw-r--r-- | fibers/epoll.scm | 43 | ||||
| -rw-r--r-- | fibers/internal.scm | 93 | ||||
| -rw-r--r-- | tests/basic.scm | 61 | ||||
| -rw-r--r-- | tests/speedup.scm | 5 |
7 files changed, 148 insertions, 126 deletions
@@ -130,17 +130,20 @@ do_epoll_wait (void *p) return NULL; } +static scm_t_uint64 time_units_per_millisecond; + /* Wait on the files whose descriptors were registered on EPFD, and write the resulting events in EVENTSV, a bytevector. Returns the number of struct epoll_event values that were written to EVENTSV, which may be zero if no files triggered wakeups within TIMEOUT - milliseconds. */ + internal time units. */ static SCM scm_primitive_epoll_wait (SCM epfd, SCM wakefd, SCM wokefd, SCM eventsv, SCM timeout) #define FUNC_NAME "primitive-epoll-wait" { - int c_epfd, c_wakefd, c_wokefd, maxevents, rv, c_timeout; + int c_epfd, c_wakefd, c_wokefd, maxevents, rv, millis; + scm_t_int64 c_timeout; struct epoll_event *events; c_epfd = scm_to_int (epfd); @@ -153,16 +156,24 @@ scm_primitive_epoll_wait (SCM epfd, SCM wakefd, SCM wokefd, events = (struct epoll_event *) SCM_BYTEVECTOR_CONTENTS (eventsv); maxevents = SCM_BYTEVECTOR_LENGTH (eventsv) / sizeof (*events); - c_timeout = scm_to_int (timeout); + c_timeout = scm_to_int64 (timeout); + if (c_timeout < 0) + millis = -1; + else + millis = c_timeout / time_units_per_millisecond; - if (scm_c_prepare_to_wait_on_fd (c_wakefd)) + if (millis != 0 && scm_c_prepare_to_wait_on_fd (c_wakefd)) rv = 0; else { - struct epoll_wait_data data = { c_epfd, events, maxevents, c_timeout }; - scm_without_guile (do_epoll_wait, &data); + struct epoll_wait_data data = { c_epfd, events, maxevents, millis }; + if (millis != 0) + scm_without_guile (do_epoll_wait, &data); + else + do_epoll_wait (&data); rv = data.rv; - scm_c_wait_finished (); + if (millis != 0) + scm_c_wait_finished (); if (rv < 0) { if (data.err == EINTR) @@ -207,6 +218,8 @@ scm_primitive_epoll_wait (SCM epfd, SCM wakefd, SCM wokefd, void init_fibers_epoll (void) { + time_units_per_millisecond = scm_c_time_units_per_second / 1000; + scm_c_define_gsubr ("primitive-epoll-wake", 1, 0, 0, scm_primitive_epoll_wake); scm_c_define_gsubr ("primitive-epoll-create", 1, 0, 0, @@ -55,19 +55,17 @@ affinity (with-scheduler scheduler - (parameterize ((current-read-waiter wait-for-readable) - (current-write-waiter wait-for-writable)) - (with-interrupts - hz - (let ((last-runcount 0)) - (lambda () - (let* ((runcount (scheduler-runcount scheduler)) - (res (eqv? runcount last-runcount))) - (set! last-runcount runcount) - res))) - yield-current-fiber + (with-interrupts + hz + (let ((last-runcount 0)) (lambda () - (run-scheduler scheduler finished?))))))) + (let* ((runcount (scheduler-runcount scheduler)) + (res (eqv? runcount last-runcount))) + (set! last-runcount runcount) + res))) + yield-current-fiber + (lambda () + (run-scheduler scheduler finished?)))))) (define (start-auxiliary-threads scheduler hz finished? affinities) (for-each (lambda (sched affinity) @@ -105,7 +103,8 @@ #:key (hz 100) (scheduler #f) (parallelism (current-processor-count)) (cpus (getaffinity 0)) - (install-suspendable-ports? #t)) + (install-suspendable-ports? #t) + (drain? #f)) (when install-suspendable-ports? (install-suspendable-ports!)) (cond (scheduler @@ -115,7 +114,10 @@ (else (let* ((scheduler (make-scheduler #:parallelism parallelism)) (ret (make-atomic-box #f)) - (finished? (lambda () (atomic-box-ref ret))) + (finished? (lambda () + (and (atomic-box-ref ret) + (or (not drain?) + (not (scheduler-work-pending? scheduler)))))) (affinities (compute-affinities cpus parallelism))) (unless init (error "run-fibers requires initial fiber thunk when creating sched")) diff --git a/fibers.texi b/fibers.texi index 27e7079..1549136 100644 --- a/fibers.texi +++ b/fibers.texi @@ -372,11 +372,6 @@ fully cooperative scheduling model. To enable expressive cross-kernel-thread communications, channel sends and receives are atomic and thread-safe. -To start scheduling fibers, user code will typically create a -scheduler, instate it on the thread, add some fibers, then run the -scheduler. That call to run the scheduler will only return when there -there are no more fibers waiting to be scheduled. - @node Parallelism @section Parallelism @@ -480,10 +475,10 @@ thread, use @code{run-fibers}. [#:scheduler=@code{#f}] @ [#:parallelism=@code{(current-processor-count)}] @ [#:cpus=@code{(getaffinity 0)}] @ - [#:hz=100] + [#:hz=@code{100}] [#:drain?=@code{#f}] Run @var{init-thunk} within a fiber in a fresh scheduler, blocking -until the scheduler has no more runnable fibers. Return the value(s) -returned by the call to @var{init-thunk}. +until @var{init-thunk} returns. Return the value(s) returned by the +call to @var{init-thunk}. For example: @example @@ -510,6 +505,10 @@ create one), you can pass it to @code{run-fibers} using the @code{#:scheduler} keyword argument. In that case the scheduler will not be destroyed when @code{run-fibers} finishes. +@code{run-fibers} will return when the @var{init-thunk} call returns. +To make it additionally wait until there are no more runnable fibers +or pending timeouts, specify the @code{#:drain? #t} keyword argument. + If @code{run-fibers} creates a scheduler on your behalf, it will arrange for a number of ``peer'' schedulers to also be created, up to a total scheduler count controlled by the @var{parallelism} keyword diff --git a/fibers/epoll.scm b/fibers/epoll.scm index d4674a7..55d88a3 100644 --- a/fibers/epoll.scm +++ b/fibers/epoll.scm @@ -150,24 +150,35 @@ epoll wait (if appropriate)." (set-epoll-eventsv! epoll v) v)))) -(define* (epoll epoll #:key (get-timeout (lambda () -1)) +(define* (epoll epoll #:key (expiry #f) + (update-expiry (lambda (expiry) expiry)) (folder epoll-default-folder) (seed '())) - (atomic-box-set! (epoll-state epoll) 'waiting) + (define (expiry->timeout expiry) + (cond + ((not expiry) -1) + (else + (let ((now (get-internal-real-time))) + (cond + ((< expiry now) 0) + (else (- expiry now))))))) (let* ((maxevents (epoll-maxevents epoll)) (eventsv (ensure-epoll-eventsv epoll maxevents)) (write-pipe-fd (fileno (epoll-wake-write-pipe epoll))) (read-pipe-fd (fileno (epoll-wake-read-pipe epoll))) - (n (primitive-epoll-wait (epoll-fd epoll) write-pipe-fd read-pipe-fd - eventsv (get-timeout)))) - ;; If we received `maxevents' events, it means that probably there - ;; are more active fd's in the queue that we were unable to - ;; receive. Expand our event buffer in that case. - (when (= n maxevents) - (set-epoll-maxevents! epoll (* maxevents 2))) - (atomic-box-set! (epoll-state epoll) 'not-waiting) - (let lp ((seed seed) (i 0)) - (if (< i n) - (let ((fd (bytevector-s32-native-ref eventsv (fd-offset i))) - (events (bytevector-u32-native-ref eventsv (events-offset i)))) - (lp (folder fd events seed) (1+ i))) - seed)))) + (timeout (expiry->timeout (update-expiry expiry)))) + (atomic-box-set! (epoll-state epoll) 'waiting) + (let ((n (primitive-epoll-wait (epoll-fd epoll) + write-pipe-fd read-pipe-fd + eventsv timeout))) + (atomic-box-set! (epoll-state epoll) 'not-waiting) + ;; If we received `maxevents' events, it means that probably there + ;; are more active fd's in the queue that we were unable to + ;; receive. Expand our event buffer in that case. + (when (= n maxevents) + (set-epoll-maxevents! epoll (* maxevents 2))) + (let lp ((seed seed) (i 0)) + (if (< i n) + (let ((fd (bytevector-s32-native-ref eventsv (fd-offset i))) + (events (bytevector-u32-native-ref eventsv (events-offset i)))) + (lp (folder fd events seed) (1+ i))) + seed))))) diff --git a/fibers/internal.scm b/fibers/internal.scm index 31b17b8..79b1011 100644 --- a/fibers/internal.scm +++ b/fibers/internal.scm @@ -35,6 +35,7 @@ scheduler-runcount (scheduler-kernel-thread/public . scheduler-kernel-thread) scheduler-remote-peers + scheduler-work-pending? choose-parallel-scheduler run-scheduler destroy-scheduler @@ -243,27 +244,6 @@ thread." (resume revents)) (lp waiters))))))) -(define (scheduler-finished? sched finished?) - (and (finished?) - (psq-empty? (scheduler-timers sched)))) - -(define (scheduler-poll-timeout sched finished?) - (cond - ((not (stack-empty? (scheduler-next-runqueue sched))) - ;; Don't sleep if there are fibers in the runqueue already. - 0) - ((psq-empty? (scheduler-timers sched)) - ;; Avoid sleeping if the scheduler is actually finished. - (if (finished?) 0 -1)) - (else - (match (psq-min (scheduler-timers sched)) - ((expiry . thunk) - (let ((now (get-internal-real-time))) - (if (< expiry now) - 0 - (round/ (- expiry now) - internal-time-units-per-millisecond)))))))) - (define (run-timers sched) ;; Run expired timer thunks in the order that they expired. (let ((now (get-internal-real-time))) @@ -279,7 +259,7 @@ thread." (thunk) (run-timers timers))))))))) -(define (schedule-runnables-for-next-turn sched finished?) +(define (schedule-runnables-for-next-turn sched) ;; Called when all runnables from the current turn have been run. ;; Note that there may be runnables already scheduled for the next ;; turn; one way this can happen is if a fiber suspended itself @@ -288,11 +268,24 @@ thread." ;; In any case, check the kernel to see if any of the fd's that we ;; are interested in are active, and in that case schedule their ;; corresponding fibers. Also run any timers that have timed out. + (define (timers-expiry timers) + (and (not (psq-empty? timers)) + (match (psq-min timers) + ((expiry . thunk) + expiry)))) + (define (update-expiry expiry) + ;; If there are pending runnables, cause epoll to return + ;; immediately. + (if (stack-empty? (scheduler-next-runqueue sched)) + expiry + 0)) (epoll (scheduler-epfd sched) - #:get-timeout (lambda () (scheduler-poll-timeout sched finished?)) - #:folder (lambda (fd revents seed) + #:expiry (timers-expiry (scheduler-timers sched)) + #:update-expiry update-expiry + #:folder (lambda (fd revents sched) (schedule-fibers-for-fd fd revents sched) - seed)) + sched) + #:seed sched) (run-timers sched)) (define (fiber-stealer sched) @@ -305,6 +298,13 @@ stolen." (and peer (stack-pop! (scheduler-current-runqueue peer) #f)))))) +(define (scheduler-work-pending? sched) + "Return @code{#t} if @var{sched} has any work pending: any runnable +fibers or any pending timeouts." + (not (and (psq-empty? (scheduler-timers sched)) + (stack-empty? (scheduler-current-runqueue sched)) + (stack-empty? (scheduler-next-runqueue sched))))) + (define* (run-scheduler sched finished?) "Run @var{sched} until there are no more fibers ready to run, no file descriptors being waited on, and no more timers pending to run. @@ -326,30 +326,25 @@ Return zero values." (set-fiber-continuation! fiber k) (after-suspend fiber)))) (let next-turn () - (schedule-runnables-for-next-turn sched finished?) - (stack-push-list! cur (reverse (stack-pop-all! next))) - (let next-fiber () - (match (stack-pop! cur #f) - (#f - (cond - ((stack-empty? next) - ;; Both current and next runqueues are empty; steal a - ;; little bit of work from a remote scheduler if we - ;; can. Run it directly instead of pushing onto a - ;; queue to avoid double stealing. - (match (steal-fiber!) - (#f - (unless (scheduler-finished? sched finished?) - (next-turn))) - (fiber - (set-fiber-scheduler! fiber sched) - (run-fiber fiber) - (next-turn)))) - (else - (next-turn)))) - (fiber - (run-fiber fiber) - (next-fiber))))))) + (unless (finished?) + (schedule-runnables-for-next-turn sched) + (stack-push-list! cur (reverse (stack-pop-all! next))) + (let next-fiber () + (match (stack-pop! cur #f) + (#f + (when (stack-empty? next) + ;; Both current and next runqueues are empty; steal a + ;; little bit of work from a remote scheduler if we + ;; can. Run it directly instead of pushing onto a + ;; queue to avoid double stealing. + (let ((fiber (steal-fiber!))) + (when fiber + (set-fiber-scheduler! fiber sched) + (run-fiber fiber)))) + (next-turn)) + (fiber + (run-fiber fiber) + (next-fiber)))))))) (define (destroy-scheduler sched) "Release any resources associated with @var{sched}." diff --git a/tests/basic.scm b/tests/basic.scm index dbc0e50..927d4c5 100644 --- a/tests/basic.scm +++ b/tests/basic.scm @@ -42,7 +42,7 @@ (define-syntax-rule (assert-run-fibers-terminates exp kw ...) (begin - (format #t "assert run-fibers on ~s terminates: " 'exp) + (format #t "assert terminates: ~s: " '(run-fibers (lambda () exp) kw ...)) (force-output) (let ((start (get-internal-real-time))) (call-with-values (lambda () (run-fibers (lambda () exp) kw ...)) @@ -53,7 +53,8 @@ (define-syntax-rule (assert-run-fibers-returns (expected ...) exp) (begin - (call-with-values (lambda () (assert-run-fibers-terminates exp)) + (call-with-values (lambda () + (assert-run-fibers-terminates exp #:drain? #t)) (lambda run-fiber-return-vals (assert-equal '(expected ...) run-fiber-return-vals))))) @@ -66,25 +67,25 @@ (assert-equal #f #f) (assert-terminates #t) (assert-equal #f (false-if-exception (begin (run-fibers) #t))) -(assert-run-fibers-terminates (sleep 1)) -(assert-run-fibers-terminates (do-times 1 (spawn-fiber (lambda () #t)))) -(assert-run-fibers-terminates (do-times 10 (spawn-fiber (lambda () #t)))) -(assert-run-fibers-terminates (do-times 100 (spawn-fiber (lambda () #t)))) -(assert-run-fibers-terminates (do-times 1000 (spawn-fiber (lambda () #t)))) -(assert-run-fibers-terminates (do-times 10000 (spawn-fiber (lambda () #t)))) -(assert-run-fibers-terminates (do-times 100000 (spawn-fiber (lambda () #t)))) +(assert-run-fibers-terminates (sleep 1) #:drain? #t) +(assert-run-fibers-terminates (do-times 1 (spawn-fiber (lambda () #t))) #:drain? #t) +(assert-run-fibers-terminates (do-times 10 (spawn-fiber (lambda () #t))) #:drain? #t) +(assert-run-fibers-terminates (do-times 100 (spawn-fiber (lambda () #t))) #:drain? #t) +(assert-run-fibers-terminates (do-times 1000 (spawn-fiber (lambda () #t))) #:drain? #t) +(assert-run-fibers-terminates (do-times 10000 (spawn-fiber (lambda () #t))) #:drain? #t) +(assert-run-fibers-terminates (do-times 100000 (spawn-fiber (lambda () #t))) #:drain? #t) (assert-run-fibers-terminates (do-times 100000 - (spawn-fiber (lambda () #t) #:parallel? #t))) + (spawn-fiber (lambda () #t) #:parallel? #t)) #:drain? #t) (define (loop-to-1e4) (let lp ((i 0)) (when (< i #e1e4) (lp (1+ i))))) -(assert-run-fibers-terminates (do-times 100000 (spawn-fiber loop-to-1e4))) -(assert-run-fibers-terminates (do-times 100000 (spawn-fiber loop-to-1e4 #:parallel? #t))) -(assert-run-fibers-terminates (do-times 1 (spawn-fiber (lambda () (sleep 1))))) -(assert-run-fibers-terminates (do-times 10 (spawn-fiber (lambda () (sleep 1))))) -(assert-run-fibers-terminates (do-times 100 (spawn-fiber (lambda () (sleep 1))))) -(assert-run-fibers-terminates (do-times 1000 (spawn-fiber (lambda () (sleep 1))))) -(assert-run-fibers-terminates (do-times 10000 (spawn-fiber (lambda () (sleep 1))))) -(assert-run-fibers-terminates (do-times 20000 (spawn-fiber (lambda () (sleep 1))))) -(assert-run-fibers-terminates (do-times 40000 (spawn-fiber (lambda () (sleep 1))))) +(assert-run-fibers-terminates (do-times 100000 (spawn-fiber loop-to-1e4)) #:drain? #t) +(assert-run-fibers-terminates (do-times 100000 (spawn-fiber loop-to-1e4 #:parallel? #t)) #:drain? #t) +(assert-run-fibers-terminates (do-times 1 (spawn-fiber (lambda () (sleep 1)))) #:drain? #t) +(assert-run-fibers-terminates (do-times 10 (spawn-fiber (lambda () (sleep 1)))) #:drain? #t) +(assert-run-fibers-terminates (do-times 100 (spawn-fiber (lambda () (sleep 1)))) #:drain? #t) +(assert-run-fibers-terminates (do-times 1000 (spawn-fiber (lambda () (sleep 1)))) #:drain? #t) +(assert-run-fibers-terminates (do-times 10000 (spawn-fiber (lambda () (sleep 1)))) #:drain? #t) +(assert-run-fibers-terminates (do-times 20000 (spawn-fiber (lambda () (sleep 1)))) #:drain? #t) +(assert-run-fibers-terminates (do-times 40000 (spawn-fiber (lambda () (sleep 1)))) #:drain? #t) (define (spawn-fiber-tree n leaf) (do-times n (spawn-fiber @@ -92,20 +93,20 @@ (if (= n 1) (leaf) (spawn-fiber-tree (1- n) leaf)))))) -(assert-run-fibers-terminates (spawn-fiber-tree 5 (lambda () (sleep 1)))) +(assert-run-fibers-terminates (spawn-fiber-tree 5 (lambda () (sleep 1))) #:drain? #t) (define (spawn-fiber-chain n) (spawn-fiber (lambda () (unless (zero? (1- n)) (spawn-fiber-chain (1- n)))))) -(assert-run-fibers-terminates (spawn-fiber-chain 5)) -(assert-run-fibers-terminates (spawn-fiber-chain 50)) -(assert-run-fibers-terminates (spawn-fiber-chain 500)) -(assert-run-fibers-terminates (spawn-fiber-chain 5000)) -(assert-run-fibers-terminates (spawn-fiber-chain 50000)) -(assert-run-fibers-terminates (spawn-fiber-chain 500000)) -(assert-run-fibers-terminates (spawn-fiber-chain 5000000)) +(assert-run-fibers-terminates (spawn-fiber-chain 5) #:drain? #t) +(assert-run-fibers-terminates (spawn-fiber-chain 50) #:drain? #t) +(assert-run-fibers-terminates (spawn-fiber-chain 500) #:drain? #t) +(assert-run-fibers-terminates (spawn-fiber-chain 5000) #:drain? #t) +(assert-run-fibers-terminates (spawn-fiber-chain 50000) #:drain? #t) +(assert-run-fibers-terminates (spawn-fiber-chain 500000) #:drain? #t) +(assert-run-fibers-terminates (spawn-fiber-chain 5000000) #:drain? #t) (let ((run-order 0)) (define (test-run-order count) @@ -116,7 +117,7 @@ (error "bad run order" run-order n)) (set! run-order (1+ n))))) (iota count))) - (assert-run-fibers-terminates (test-run-order 10) #:parallelism 1)) + (assert-run-fibers-terminates (test-run-order 10) #:parallelism 1 #:drain? #t)) (let ((run-order 0)) (define (test-wakeup-order count) @@ -127,7 +128,7 @@ (error "bad run order" run-order n)) (set! run-order (1+ n))))) (iota count))) - (assert-run-fibers-terminates (test-wakeup-order 10) #:parallelism 1)) + (assert-run-fibers-terminates (test-wakeup-order 10) #:parallelism 1 #:drain? #t)) (assert-run-fibers-returns (1) 1) @@ -143,7 +144,7 @@ (set! failed? (< elapsed timeout))))))) (assert-run-fibers-terminates - (do-times 20 (check-sleep (random 1.0)))) + (do-times 20 (check-sleep (random 1.0))) #:drain? #t) ;; exceptions diff --git a/tests/speedup.scm b/tests/speedup.scm index ea67d6a..6dfe969 100644 --- a/tests/speedup.scm +++ b/tests/speedup.scm @@ -40,9 +40,10 @@ (format #t "speedup for ~s: " 'exp) (force-output) (let ((thunk (lambda () exp))) - (let ((t1 (time (lambda () (run-fibers thunk #:parallelism 1))))) + (let ((t1 (time (lambda () + (run-fibers thunk #:parallelism 1 #:drain? #t))))) (format #t "~a s" t1) - (let ((t2 (time (lambda () (run-fibers thunk))))) + (let ((t2 (time (lambda () (run-fibers thunk #:drain? #t))))) (format #t " / ~a s = ~ax (~a cpus)\n" t2 (/ t1 t2) (current-processor-count))))))) |
