summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndy Wingo <wingo@pobox.com>2016-09-08 15:00:00 +0200
committerAndy Wingo <wingo@pobox.com>2016-09-08 15:02:09 +0200
commit985df09c94f2f47f2efc6ee8ff990424c49838ee (patch)
treecbb9550a988cae4435ae675c5117ce96bc646b98
parentEnable remote fiber wakeup (diff)
downloadguile-fibers-985df09c94f2f47f2efc6ee8ff990424c49838ee.tar.gz
Quit the scheduler when there is nothing left to do
* fibers/internal.scm: Detect when there is nothing left to run, and stop the scheduler in that case. * examples/ping-client.scm (client-loop): * examples/memcached-client.scm (client-loop): Remove exit call, as we will exit when needed.
-rw-r--r--examples/memcached-client.scm8
-rw-r--r--examples/ping-client.scm8
-rw-r--r--fibers/internal.scm128
3 files changed, 81 insertions, 63 deletions
diff --git a/examples/memcached-client.scm b/examples/memcached-client.scm
index 3b6e1ec..d9d0330 100644
--- a/examples/memcached-client.scm
+++ b/examples/memcached-client.scm
@@ -109,10 +109,7 @@
(connect port (addrinfo:addr addrinfo))
port))
-(define *active-clients* 0)
-
(define (client-loop addrinfo n num-connections)
- (set! *active-clients* (1+ *active-clients*))
(let ((port (connect-to-server addrinfo))
(key (string-append "test-" (number->string n))))
(let lp ((m 0))
@@ -126,10 +123,7 @@
(unless (equal? (item-bv item) v)
(server-error port "Bad response: ~A (expected ~A)" (item-bv item) v))
(lp (1+ m))))))
- (close-port port))
- (set! *active-clients* (1- *active-clients*))
- (when (zero? *active-clients*)
- (exit 0)))
+ (close-port port)))
(define (run-memcached-test num-clients num-connections)
;; The getaddrinfo call blocks, unfortunately. Call it once before
diff --git a/examples/ping-client.scm b/examples/ping-client.scm
index 6b8dd79..afbfd0d 100644
--- a/examples/ping-client.scm
+++ b/examples/ping-client.scm
@@ -38,10 +38,7 @@
(connect port (addrinfo:addr addrinfo))
port))
-(define *active-clients* 0)
-
(define (client-loop addrinfo n num-connections)
- (set! *active-clients* (1+ *active-clients*))
(let ((port (connect-to-server addrinfo))
(test (string-append "test-" (number->string n))))
(let lp ((m 0))
@@ -54,10 +51,7 @@
(close-port port)
(error "Bad response: ~A (expected ~A)" response test))
(lp (1+ m)))))
- (close-port port))
- (set! *active-clients* (1- *active-clients*))
- (when (zero? *active-clients*)
- (exit 0)))
+ (close-port port)))
(define (run-ping-test num-clients num-connections)
;; The getaddrinfo call blocks, unfortunately. Call it once before
diff --git a/fibers/internal.scm b/fibers/internal.scm
index 69af10a..6df419c 100644
--- a/fibers/internal.scm
+++ b/fibers/internal.scm
@@ -46,10 +46,11 @@
resume-fiber))
(define-record-type <scheduler>
- (%make-scheduler epfd prompt-tag runnables sources sleepers
- inbox inbox-state wake-pipe)
+ (%make-scheduler epfd active-fd-count prompt-tag runnables
+ sources sleepers inbox inbox-state wake-pipe)
nio?
(epfd scheduler-epfd)
+ (active-fd-count scheduler-active-fd-count set-scheduler-active-fd-count!)
(prompt-tag scheduler-prompt-tag)
;; (fiber ...)
(runnables scheduler-runnables set-scheduler-runnables!)
@@ -95,7 +96,7 @@
(match wake-pipe
((read-pipe . _)
(epoll-add! epfd (fileno read-pipe) EPOLLIN)))
- (%make-scheduler epfd (make-prompt-tag "fibers")
+ (%make-scheduler epfd 0 (make-prompt-tag "fibers")
'() (make-hash-table) '()
(make-atomic-box '()) (make-atomic-box 'will-check)
wake-pipe)))
@@ -127,8 +128,8 @@
(when (eq? (fiber-state fiber) 'suspended)
(set-fiber-state! fiber 'runnable)
(set-fiber-data! fiber thunk)
- (let ((runnables (scheduler-runnables sched)))
- (set-scheduler-runnables! sched (cons fiber runnables)))))
+ (set-scheduler-runnables! sched
+ (cons fiber (scheduler-runnables sched)))))
(define (schedule/remote)
(atomic-box-prepend! (scheduler-inbox sched) (cons fiber thunk))
(match (atomic-box-ref (scheduler-inbox-state sched))
@@ -161,6 +162,8 @@
(else
(warn "scheduler for unknown fd" fd))))))
(sources
+ (set-scheduler-active-fd-count! sched
+ (1- (scheduler-active-fd-count sched)))
(for-each (lambda (source)
;; FIXME: If we were waiting with a timeout, this
;; fiber might still be in "sleepers", and we should
@@ -181,22 +184,25 @@
(set-cdr! (car sources) #f))))))
(define (scheduler-poll-timeout sched)
- (match (atomic-box-ref (scheduler-inbox sched))
- ((_ . _)
- ;; There are pending requests in our inbox, so we don't need to
- ;; sleep at all.
- 0)
- (()
- (match (scheduler-sleepers sched)
- ;; The sleepers list is sorted so the first element
- ;; should be the one whose wake time is soonest.
- (((fiber . expiry) . sleepers)
- (let ((now (get-internal-real-time)))
- (if (< expiry now)
- 0
- (round/ (- expiry now)
- internal-time-units-per-millisecond))))
- (_ -1)))))
+ (cond
+ ((not (null? (atomic-box-ref (scheduler-inbox sched))))
+ ;; There are pending requests in our inbox, so we don't need to
+ ;; sleep at all.
+ 0)
+ ((not (null? (scheduler-runnables sched)))
+ ;; Likewise, don't sleep if there are runnables scheduled already.
+ 0)
+ (else
+ (match (scheduler-sleepers sched)
+ ;; The sleepers list is sorted so the first element
+ ;; should be the one whose wake time is soonest.
+ (((fiber . expiry) . sleepers)
+ (let ((now (get-internal-real-time)))
+ (if (< expiry now)
+ 0
+ (round/ (- expiry now)
+ internal-time-units-per-millisecond))))
+ (_ -1)))))
(define (wake-sleepers sched)
(let ((now (get-internal-real-time)))
@@ -220,26 +226,33 @@
(resume-fiber fiber thunk)))
(atomic-box-swap! (scheduler-inbox sched) '())))
-(define (poll-for-events sched)
- ;; Called when the runnables list is empty. Poll for some active
- ;; FD's and schedule their corresponding fibers. Also schedule any
- ;; sleepers that have timed out.
- (atomic-box-set! (scheduler-inbox-state sched) 'needs-wake)
- (epoll (scheduler-epfd sched)
- 32 ; maxevents
- (scheduler-poll-timeout sched)
- #:folder (lambda (fd revents seed)
- (schedule-fibers-for-fd fd revents sched)
- seed))
- (atomic-box-set! (scheduler-inbox-state sched) 'will-check)
+(define (schedule-runnables-for-next-turn sched)
+ ;; Called when all runnables from the current turn have been run.
+ ;; Note that the there may be runnables already scheduled for the
+ ;; next turn; one way this can happen is if a fiber suspended itself
+ ;; because it was blocked on a channel, but then another fiber woke
+ ;; it up. In any case, check the kernel to see if any of the fd's
+ ;; that we are interested in are active, and in that case schedule
+ ;; their corresponding fibers. Also schedule any sleepers that have
+ ;; timed out, and process the inbox that receives
+ ;; requests-to-schedule from remote threads.
+ (unless (zero? (scheduler-active-fd-count sched))
+ (atomic-box-set! (scheduler-inbox-state sched) 'needs-wake)
+ (epoll (scheduler-epfd sched)
+ 32 ; maxevents
+ (scheduler-poll-timeout sched)
+ #:folder (lambda (fd revents seed)
+ (schedule-fibers-for-fd fd revents sched)
+ seed))
+ (atomic-box-set! (scheduler-inbox-state sched) 'will-check))
(handle-inbox sched)
(wake-sleepers sched))
-(define* (run-fiber sched fiber)
+(define* (run-fiber fiber)
(when (eq? (fiber-state fiber) 'runnable)
(parameterize ((current-fiber fiber))
(call-with-prompt
- (scheduler-prompt-tag sched)
+ (scheduler-prompt-tag (fiber-scheduler fiber))
(lambda ()
(let ((thunk (fiber-data fiber)))
(set-fiber-state! fiber 'running)
@@ -250,20 +263,28 @@
(set-fiber-data! fiber k)
(after-suspend fiber))))))
+(define (scheduler-finished? sched)
+ (let/ec return
+ (define (only-finished-if bool)
+ (if bool #t (return #f)))
+ (only-finished-if (zero? (scheduler-active-fd-count sched)))
+ (only-finished-if (null? (atomic-box-ref (scheduler-inbox sched))))
+ (only-finished-if (null? (scheduler-sleepers sched)))))
+
(define (run-scheduler sched)
(let lp ()
- (let ((runnables (scheduler-runnables sched)))
- (cond
- ((pair? runnables)
- (let ((fiber (car runnables)))
- (set-scheduler-runnables! sched (cdr runnables))
- (run-fiber sched fiber)
- (lp)))
- ((poll-for-events sched)
- (lp))
- (else
- ;; Nothing runnable; quit.
- (values))))))
+ (schedule-runnables-for-next-turn sched)
+ (match (scheduler-runnables sched)
+ (()
+ ;; Could be the scheduler is stopping, or it could be that we
+ ;; got a spurious wakeup. In any case, this is the place to
+ ;; check to see whether the scheduler is really done.
+ (unless (scheduler-finished? sched)
+ (lp)))
+ (runnables
+ (set-scheduler-runnables! sched '())
+ (for-each run-fiber runnables)
+ (lp)))))
(define (destroy-scheduler sched)
#;
@@ -303,7 +324,7 @@
(thunk (lambda () (cont thunk))))
(schedule-fiber! fiber thunk)))
-(define (finalize-fd ctx fd)
+(define (finalize-fd sched fd)
"Remove data associated with @var{fd} from the scheduler @var{ctx}.
Called by Guile just before Guile goes to close a file descriptor, in
response either to an explicit call to @code{close-port}, or because
@@ -315,7 +336,11 @@ from a finalizer thread."
;;
;; FIXME: Take a lock on the sources table?
;; FIXME: Wake all sources with EPOLLERR.
- (hashv-remove! (scheduler-sources ctx) fd))
+ (let ((sources-table (scheduler-sources sched)))
+ (when (hashv-ref sources-table fd)
+ (set-scheduler-active-fd-count! sched
+ (1- (scheduler-active-fd-count sched)))
+ (hashv-remove! sources-table fd))))
(define (add-fd-events! sched fd events fiber)
(let ((sources (hashv-ref (scheduler-sources sched) fd)))
@@ -323,12 +348,17 @@ from a finalizer thread."
(sources
(set-cdr! sources (cons (make-source events #f fiber) (cdr sources)))
(let ((active-events (caar sources)))
+ (unless active-events
+ (set-scheduler-active-fd-count! sched
+ (1+ (scheduler-active-fd-count sched))))
(unless (and active-events
(= (logand events active-events) events))
(set-car! (car sources) (logior events (or active-events 0)))
(epoll-modify! (scheduler-epfd sched) fd
(logior (caar sources) EPOLLONESHOT)))))
(else
+ (set-scheduler-active-fd-count! sched
+ (1+ (scheduler-active-fd-count sched)))
(hashv-set! (scheduler-sources sched)
fd (acons events #f
(list (make-source events #f fiber))))