summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--epoll.c27
-rw-r--r--fibers.scm30
-rw-r--r--fibers.texi15
-rw-r--r--fibers/epoll.scm43
-rw-r--r--fibers/internal.scm93
-rw-r--r--tests/basic.scm61
-rw-r--r--tests/speedup.scm5
7 files changed, 148 insertions, 126 deletions
diff --git a/epoll.c b/epoll.c
index dd6fa6b..d2df39f 100644
--- a/epoll.c
+++ b/epoll.c
@@ -130,17 +130,20 @@ do_epoll_wait (void *p)
return NULL;
}
+static scm_t_uint64 time_units_per_millisecond;
+
/* Wait on the files whose descriptors were registered on EPFD, and
write the resulting events in EVENTSV, a bytevector. Returns the
number of struct epoll_event values that were written to EVENTSV,
which may be zero if no files triggered wakeups within TIMEOUT
- milliseconds. */
+ internal time units. */
static SCM
scm_primitive_epoll_wait (SCM epfd, SCM wakefd, SCM wokefd,
SCM eventsv, SCM timeout)
#define FUNC_NAME "primitive-epoll-wait"
{
- int c_epfd, c_wakefd, c_wokefd, maxevents, rv, c_timeout;
+ int c_epfd, c_wakefd, c_wokefd, maxevents, rv, millis;
+ scm_t_int64 c_timeout;
struct epoll_event *events;
c_epfd = scm_to_int (epfd);
@@ -153,16 +156,24 @@ scm_primitive_epoll_wait (SCM epfd, SCM wakefd, SCM wokefd,
events = (struct epoll_event *) SCM_BYTEVECTOR_CONTENTS (eventsv);
maxevents = SCM_BYTEVECTOR_LENGTH (eventsv) / sizeof (*events);
- c_timeout = scm_to_int (timeout);
+ c_timeout = scm_to_int64 (timeout);
+ if (c_timeout < 0)
+ millis = -1;
+ else
+ millis = c_timeout / time_units_per_millisecond;
- if (scm_c_prepare_to_wait_on_fd (c_wakefd))
+ if (millis != 0 && scm_c_prepare_to_wait_on_fd (c_wakefd))
rv = 0;
else
{
- struct epoll_wait_data data = { c_epfd, events, maxevents, c_timeout };
- scm_without_guile (do_epoll_wait, &data);
+ struct epoll_wait_data data = { c_epfd, events, maxevents, millis };
+ if (millis != 0)
+ scm_without_guile (do_epoll_wait, &data);
+ else
+ do_epoll_wait (&data);
rv = data.rv;
- scm_c_wait_finished ();
+ if (millis != 0)
+ scm_c_wait_finished ();
if (rv < 0)
{
if (data.err == EINTR)
@@ -207,6 +218,8 @@ scm_primitive_epoll_wait (SCM epfd, SCM wakefd, SCM wokefd,
void
init_fibers_epoll (void)
{
+ time_units_per_millisecond = scm_c_time_units_per_second / 1000;
+
scm_c_define_gsubr ("primitive-epoll-wake", 1, 0, 0,
scm_primitive_epoll_wake);
scm_c_define_gsubr ("primitive-epoll-create", 1, 0, 0,
diff --git a/fibers.scm b/fibers.scm
index ee02f67..2aa9354 100644
--- a/fibers.scm
+++ b/fibers.scm
@@ -55,19 +55,17 @@
affinity
(with-scheduler
scheduler
- (parameterize ((current-read-waiter wait-for-readable)
- (current-write-waiter wait-for-writable))
- (with-interrupts
- hz
- (let ((last-runcount 0))
- (lambda ()
- (let* ((runcount (scheduler-runcount scheduler))
- (res (eqv? runcount last-runcount)))
- (set! last-runcount runcount)
- res)))
- yield-current-fiber
+ (with-interrupts
+ hz
+ (let ((last-runcount 0))
(lambda ()
- (run-scheduler scheduler finished?)))))))
+ (let* ((runcount (scheduler-runcount scheduler))
+ (res (eqv? runcount last-runcount)))
+ (set! last-runcount runcount)
+ res)))
+ yield-current-fiber
+ (lambda ()
+ (run-scheduler scheduler finished?))))))
(define (start-auxiliary-threads scheduler hz finished? affinities)
(for-each (lambda (sched affinity)
@@ -105,7 +103,8 @@
#:key (hz 100) (scheduler #f)
(parallelism (current-processor-count))
(cpus (getaffinity 0))
- (install-suspendable-ports? #t))
+ (install-suspendable-ports? #t)
+ (drain? #f))
(when install-suspendable-ports? (install-suspendable-ports!))
(cond
(scheduler
@@ -115,7 +114,10 @@
(else
(let* ((scheduler (make-scheduler #:parallelism parallelism))
(ret (make-atomic-box #f))
- (finished? (lambda () (atomic-box-ref ret)))
+ (finished? (lambda ()
+ (and (atomic-box-ref ret)
+ (or (not drain?)
+ (not (scheduler-work-pending? scheduler))))))
(affinities (compute-affinities cpus parallelism)))
(unless init
(error "run-fibers requires initial fiber thunk when creating sched"))
diff --git a/fibers.texi b/fibers.texi
index 27e7079..1549136 100644
--- a/fibers.texi
+++ b/fibers.texi
@@ -372,11 +372,6 @@ fully cooperative scheduling model.
To enable expressive cross-kernel-thread communications, channel sends
and receives are atomic and thread-safe.
-To start scheduling fibers, user code will typically create a
-scheduler, instate it on the thread, add some fibers, then run the
-scheduler. That call to run the scheduler will only return when there
-there are no more fibers waiting to be scheduled.
-
@node Parallelism
@section Parallelism
@@ -480,10 +475,10 @@ thread, use @code{run-fibers}.
[#:scheduler=@code{#f}] @
[#:parallelism=@code{(current-processor-count)}] @
[#:cpus=@code{(getaffinity 0)}] @
- [#:hz=100]
+ [#:hz=@code{100}] [#:drain?=@code{#f}]
Run @var{init-thunk} within a fiber in a fresh scheduler, blocking
-until the scheduler has no more runnable fibers. Return the value(s)
-returned by the call to @var{init-thunk}.
+until @var{init-thunk} returns. Return the value(s) returned by the
+call to @var{init-thunk}.
For example:
@example
@@ -510,6 +505,10 @@ create one), you can pass it to @code{run-fibers} using the
@code{#:scheduler} keyword argument. In that case the scheduler will
not be destroyed when @code{run-fibers} finishes.
+@code{run-fibers} will return when the @var{init-thunk} call returns.
+To make it additionally wait until there are no more runnable fibers
+or pending timeouts, specify the @code{#:drain? #t} keyword argument.
+
If @code{run-fibers} creates a scheduler on your behalf, it will
arrange for a number of ``peer'' schedulers to also be created, up to
a total scheduler count controlled by the @var{parallelism} keyword
diff --git a/fibers/epoll.scm b/fibers/epoll.scm
index d4674a7..55d88a3 100644
--- a/fibers/epoll.scm
+++ b/fibers/epoll.scm
@@ -150,24 +150,35 @@ epoll wait (if appropriate)."
(set-epoll-eventsv! epoll v)
v))))
-(define* (epoll epoll #:key (get-timeout (lambda () -1))
+(define* (epoll epoll #:key (expiry #f)
+ (update-expiry (lambda (expiry) expiry))
(folder epoll-default-folder) (seed '()))
- (atomic-box-set! (epoll-state epoll) 'waiting)
+ (define (expiry->timeout expiry)
+ (cond
+ ((not expiry) -1)
+ (else
+ (let ((now (get-internal-real-time)))
+ (cond
+ ((< expiry now) 0)
+ (else (- expiry now)))))))
(let* ((maxevents (epoll-maxevents epoll))
(eventsv (ensure-epoll-eventsv epoll maxevents))
(write-pipe-fd (fileno (epoll-wake-write-pipe epoll)))
(read-pipe-fd (fileno (epoll-wake-read-pipe epoll)))
- (n (primitive-epoll-wait (epoll-fd epoll) write-pipe-fd read-pipe-fd
- eventsv (get-timeout))))
- ;; If we received `maxevents' events, it means that probably there
- ;; are more active fd's in the queue that we were unable to
- ;; receive. Expand our event buffer in that case.
- (when (= n maxevents)
- (set-epoll-maxevents! epoll (* maxevents 2)))
- (atomic-box-set! (epoll-state epoll) 'not-waiting)
- (let lp ((seed seed) (i 0))
- (if (< i n)
- (let ((fd (bytevector-s32-native-ref eventsv (fd-offset i)))
- (events (bytevector-u32-native-ref eventsv (events-offset i))))
- (lp (folder fd events seed) (1+ i)))
- seed))))
+ (timeout (expiry->timeout (update-expiry expiry))))
+ (atomic-box-set! (epoll-state epoll) 'waiting)
+ (let ((n (primitive-epoll-wait (epoll-fd epoll)
+ write-pipe-fd read-pipe-fd
+ eventsv timeout)))
+ (atomic-box-set! (epoll-state epoll) 'not-waiting)
+ ;; If we received `maxevents' events, it means that probably there
+ ;; are more active fd's in the queue that we were unable to
+ ;; receive. Expand our event buffer in that case.
+ (when (= n maxevents)
+ (set-epoll-maxevents! epoll (* maxevents 2)))
+ (let lp ((seed seed) (i 0))
+ (if (< i n)
+ (let ((fd (bytevector-s32-native-ref eventsv (fd-offset i)))
+ (events (bytevector-u32-native-ref eventsv (events-offset i))))
+ (lp (folder fd events seed) (1+ i)))
+ seed)))))
diff --git a/fibers/internal.scm b/fibers/internal.scm
index 31b17b8..79b1011 100644
--- a/fibers/internal.scm
+++ b/fibers/internal.scm
@@ -35,6 +35,7 @@
scheduler-runcount
(scheduler-kernel-thread/public . scheduler-kernel-thread)
scheduler-remote-peers
+ scheduler-work-pending?
choose-parallel-scheduler
run-scheduler
destroy-scheduler
@@ -243,27 +244,6 @@ thread."
(resume revents))
(lp waiters)))))))
-(define (scheduler-finished? sched finished?)
- (and (finished?)
- (psq-empty? (scheduler-timers sched))))
-
-(define (scheduler-poll-timeout sched finished?)
- (cond
- ((not (stack-empty? (scheduler-next-runqueue sched)))
- ;; Don't sleep if there are fibers in the runqueue already.
- 0)
- ((psq-empty? (scheduler-timers sched))
- ;; Avoid sleeping if the scheduler is actually finished.
- (if (finished?) 0 -1))
- (else
- (match (psq-min (scheduler-timers sched))
- ((expiry . thunk)
- (let ((now (get-internal-real-time)))
- (if (< expiry now)
- 0
- (round/ (- expiry now)
- internal-time-units-per-millisecond))))))))
-
(define (run-timers sched)
;; Run expired timer thunks in the order that they expired.
(let ((now (get-internal-real-time)))
@@ -279,7 +259,7 @@ thread."
(thunk)
(run-timers timers)))))))))
-(define (schedule-runnables-for-next-turn sched finished?)
+(define (schedule-runnables-for-next-turn sched)
;; Called when all runnables from the current turn have been run.
;; Note that there may be runnables already scheduled for the next
;; turn; one way this can happen is if a fiber suspended itself
@@ -288,11 +268,24 @@ thread."
;; In any case, check the kernel to see if any of the fd's that we
;; are interested in are active, and in that case schedule their
;; corresponding fibers. Also run any timers that have timed out.
+ (define (timers-expiry timers)
+ (and (not (psq-empty? timers))
+ (match (psq-min timers)
+ ((expiry . thunk)
+ expiry))))
+ (define (update-expiry expiry)
+ ;; If there are pending runnables, cause epoll to return
+ ;; immediately.
+ (if (stack-empty? (scheduler-next-runqueue sched))
+ expiry
+ 0))
(epoll (scheduler-epfd sched)
- #:get-timeout (lambda () (scheduler-poll-timeout sched finished?))
- #:folder (lambda (fd revents seed)
+ #:expiry (timers-expiry (scheduler-timers sched))
+ #:update-expiry update-expiry
+ #:folder (lambda (fd revents sched)
(schedule-fibers-for-fd fd revents sched)
- seed))
+ sched)
+ #:seed sched)
(run-timers sched))
(define (fiber-stealer sched)
@@ -305,6 +298,13 @@ stolen."
(and peer
(stack-pop! (scheduler-current-runqueue peer) #f))))))
+(define (scheduler-work-pending? sched)
+ "Return @code{#t} if @var{sched} has any work pending: any runnable
+fibers or any pending timeouts."
+ (not (and (psq-empty? (scheduler-timers sched))
+ (stack-empty? (scheduler-current-runqueue sched))
+ (stack-empty? (scheduler-next-runqueue sched)))))
+
(define* (run-scheduler sched finished?)
"Run @var{sched} until there are no more fibers ready to run, no
file descriptors being waited on, and no more timers pending to run.
@@ -326,30 +326,25 @@ Return zero values."
(set-fiber-continuation! fiber k)
(after-suspend fiber))))
(let next-turn ()
- (schedule-runnables-for-next-turn sched finished?)
- (stack-push-list! cur (reverse (stack-pop-all! next)))
- (let next-fiber ()
- (match (stack-pop! cur #f)
- (#f
- (cond
- ((stack-empty? next)
- ;; Both current and next runqueues are empty; steal a
- ;; little bit of work from a remote scheduler if we
- ;; can. Run it directly instead of pushing onto a
- ;; queue to avoid double stealing.
- (match (steal-fiber!)
- (#f
- (unless (scheduler-finished? sched finished?)
- (next-turn)))
- (fiber
- (set-fiber-scheduler! fiber sched)
- (run-fiber fiber)
- (next-turn))))
- (else
- (next-turn))))
- (fiber
- (run-fiber fiber)
- (next-fiber)))))))
+ (unless (finished?)
+ (schedule-runnables-for-next-turn sched)
+ (stack-push-list! cur (reverse (stack-pop-all! next)))
+ (let next-fiber ()
+ (match (stack-pop! cur #f)
+ (#f
+ (when (stack-empty? next)
+ ;; Both current and next runqueues are empty; steal a
+ ;; little bit of work from a remote scheduler if we
+ ;; can. Run it directly instead of pushing onto a
+ ;; queue to avoid double stealing.
+ (let ((fiber (steal-fiber!)))
+ (when fiber
+ (set-fiber-scheduler! fiber sched)
+ (run-fiber fiber))))
+ (next-turn))
+ (fiber
+ (run-fiber fiber)
+ (next-fiber))))))))
(define (destroy-scheduler sched)
"Release any resources associated with @var{sched}."
diff --git a/tests/basic.scm b/tests/basic.scm
index dbc0e50..927d4c5 100644
--- a/tests/basic.scm
+++ b/tests/basic.scm
@@ -42,7 +42,7 @@
(define-syntax-rule (assert-run-fibers-terminates exp kw ...)
(begin
- (format #t "assert run-fibers on ~s terminates: " 'exp)
+ (format #t "assert terminates: ~s: " '(run-fibers (lambda () exp) kw ...))
(force-output)
(let ((start (get-internal-real-time)))
(call-with-values (lambda () (run-fibers (lambda () exp) kw ...))
@@ -53,7 +53,8 @@
(define-syntax-rule (assert-run-fibers-returns (expected ...) exp)
(begin
- (call-with-values (lambda () (assert-run-fibers-terminates exp))
+ (call-with-values (lambda ()
+ (assert-run-fibers-terminates exp #:drain? #t))
(lambda run-fiber-return-vals
(assert-equal '(expected ...) run-fiber-return-vals)))))
@@ -66,25 +67,25 @@
(assert-equal #f #f)
(assert-terminates #t)
(assert-equal #f (false-if-exception (begin (run-fibers) #t)))
-(assert-run-fibers-terminates (sleep 1))
-(assert-run-fibers-terminates (do-times 1 (spawn-fiber (lambda () #t))))
-(assert-run-fibers-terminates (do-times 10 (spawn-fiber (lambda () #t))))
-(assert-run-fibers-terminates (do-times 100 (spawn-fiber (lambda () #t))))
-(assert-run-fibers-terminates (do-times 1000 (spawn-fiber (lambda () #t))))
-(assert-run-fibers-terminates (do-times 10000 (spawn-fiber (lambda () #t))))
-(assert-run-fibers-terminates (do-times 100000 (spawn-fiber (lambda () #t))))
+(assert-run-fibers-terminates (sleep 1) #:drain? #t)
+(assert-run-fibers-terminates (do-times 1 (spawn-fiber (lambda () #t))) #:drain? #t)
+(assert-run-fibers-terminates (do-times 10 (spawn-fiber (lambda () #t))) #:drain? #t)
+(assert-run-fibers-terminates (do-times 100 (spawn-fiber (lambda () #t))) #:drain? #t)
+(assert-run-fibers-terminates (do-times 1000 (spawn-fiber (lambda () #t))) #:drain? #t)
+(assert-run-fibers-terminates (do-times 10000 (spawn-fiber (lambda () #t))) #:drain? #t)
+(assert-run-fibers-terminates (do-times 100000 (spawn-fiber (lambda () #t))) #:drain? #t)
(assert-run-fibers-terminates (do-times 100000
- (spawn-fiber (lambda () #t) #:parallel? #t)))
+ (spawn-fiber (lambda () #t) #:parallel? #t)) #:drain? #t)
(define (loop-to-1e4) (let lp ((i 0)) (when (< i #e1e4) (lp (1+ i)))))
-(assert-run-fibers-terminates (do-times 100000 (spawn-fiber loop-to-1e4)))
-(assert-run-fibers-terminates (do-times 100000 (spawn-fiber loop-to-1e4 #:parallel? #t)))
-(assert-run-fibers-terminates (do-times 1 (spawn-fiber (lambda () (sleep 1)))))
-(assert-run-fibers-terminates (do-times 10 (spawn-fiber (lambda () (sleep 1)))))
-(assert-run-fibers-terminates (do-times 100 (spawn-fiber (lambda () (sleep 1)))))
-(assert-run-fibers-terminates (do-times 1000 (spawn-fiber (lambda () (sleep 1)))))
-(assert-run-fibers-terminates (do-times 10000 (spawn-fiber (lambda () (sleep 1)))))
-(assert-run-fibers-terminates (do-times 20000 (spawn-fiber (lambda () (sleep 1)))))
-(assert-run-fibers-terminates (do-times 40000 (spawn-fiber (lambda () (sleep 1)))))
+(assert-run-fibers-terminates (do-times 100000 (spawn-fiber loop-to-1e4)) #:drain? #t)
+(assert-run-fibers-terminates (do-times 100000 (spawn-fiber loop-to-1e4 #:parallel? #t)) #:drain? #t)
+(assert-run-fibers-terminates (do-times 1 (spawn-fiber (lambda () (sleep 1)))) #:drain? #t)
+(assert-run-fibers-terminates (do-times 10 (spawn-fiber (lambda () (sleep 1)))) #:drain? #t)
+(assert-run-fibers-terminates (do-times 100 (spawn-fiber (lambda () (sleep 1)))) #:drain? #t)
+(assert-run-fibers-terminates (do-times 1000 (spawn-fiber (lambda () (sleep 1)))) #:drain? #t)
+(assert-run-fibers-terminates (do-times 10000 (spawn-fiber (lambda () (sleep 1)))) #:drain? #t)
+(assert-run-fibers-terminates (do-times 20000 (spawn-fiber (lambda () (sleep 1)))) #:drain? #t)
+(assert-run-fibers-terminates (do-times 40000 (spawn-fiber (lambda () (sleep 1)))) #:drain? #t)
(define (spawn-fiber-tree n leaf)
(do-times n (spawn-fiber
@@ -92,20 +93,20 @@
(if (= n 1)
(leaf)
(spawn-fiber-tree (1- n) leaf))))))
-(assert-run-fibers-terminates (spawn-fiber-tree 5 (lambda () (sleep 1))))
+(assert-run-fibers-terminates (spawn-fiber-tree 5 (lambda () (sleep 1))) #:drain? #t)
(define (spawn-fiber-chain n)
(spawn-fiber
(lambda ()
(unless (zero? (1- n))
(spawn-fiber-chain (1- n))))))
-(assert-run-fibers-terminates (spawn-fiber-chain 5))
-(assert-run-fibers-terminates (spawn-fiber-chain 50))
-(assert-run-fibers-terminates (spawn-fiber-chain 500))
-(assert-run-fibers-terminates (spawn-fiber-chain 5000))
-(assert-run-fibers-terminates (spawn-fiber-chain 50000))
-(assert-run-fibers-terminates (spawn-fiber-chain 500000))
-(assert-run-fibers-terminates (spawn-fiber-chain 5000000))
+(assert-run-fibers-terminates (spawn-fiber-chain 5) #:drain? #t)
+(assert-run-fibers-terminates (spawn-fiber-chain 50) #:drain? #t)
+(assert-run-fibers-terminates (spawn-fiber-chain 500) #:drain? #t)
+(assert-run-fibers-terminates (spawn-fiber-chain 5000) #:drain? #t)
+(assert-run-fibers-terminates (spawn-fiber-chain 50000) #:drain? #t)
+(assert-run-fibers-terminates (spawn-fiber-chain 500000) #:drain? #t)
+(assert-run-fibers-terminates (spawn-fiber-chain 5000000) #:drain? #t)
(let ((run-order 0))
(define (test-run-order count)
@@ -116,7 +117,7 @@
(error "bad run order" run-order n))
(set! run-order (1+ n)))))
(iota count)))
- (assert-run-fibers-terminates (test-run-order 10) #:parallelism 1))
+ (assert-run-fibers-terminates (test-run-order 10) #:parallelism 1 #:drain? #t))
(let ((run-order 0))
(define (test-wakeup-order count)
@@ -127,7 +128,7 @@
(error "bad run order" run-order n))
(set! run-order (1+ n)))))
(iota count)))
- (assert-run-fibers-terminates (test-wakeup-order 10) #:parallelism 1))
+ (assert-run-fibers-terminates (test-wakeup-order 10) #:parallelism 1 #:drain? #t))
(assert-run-fibers-returns (1) 1)
@@ -143,7 +144,7 @@
(set! failed? (< elapsed timeout)))))))
(assert-run-fibers-terminates
- (do-times 20 (check-sleep (random 1.0))))
+ (do-times 20 (check-sleep (random 1.0))) #:drain? #t)
;; exceptions
diff --git a/tests/speedup.scm b/tests/speedup.scm
index ea67d6a..6dfe969 100644
--- a/tests/speedup.scm
+++ b/tests/speedup.scm
@@ -40,9 +40,10 @@
(format #t "speedup for ~s: " 'exp)
(force-output)
(let ((thunk (lambda () exp)))
- (let ((t1 (time (lambda () (run-fibers thunk #:parallelism 1)))))
+ (let ((t1 (time (lambda ()
+ (run-fibers thunk #:parallelism 1 #:drain? #t)))))
(format #t "~a s" t1)
- (let ((t2 (time (lambda () (run-fibers thunk)))))
+ (let ((t2 (time (lambda () (run-fibers thunk #:drain? #t)))))
(format #t " / ~a s = ~ax (~a cpus)\n" t2 (/ t1 t2)
(current-processor-count)))))))