diff options
| author | Andy Wingo <wingo@pobox.com> | 2016-09-08 15:00:00 +0200 |
|---|---|---|
| committer | Andy Wingo <wingo@pobox.com> | 2016-09-08 15:02:09 +0200 |
| commit | 985df09c94f2f47f2efc6ee8ff990424c49838ee (patch) | |
| tree | cbb9550a988cae4435ae675c5117ce96bc646b98 | |
| parent | Enable remote fiber wakeup (diff) | |
| download | guile-fibers-985df09c94f2f47f2efc6ee8ff990424c49838ee.tar.gz | |
Quit the scheduler when there is nothing left to do
* fibers/internal.scm: Detect when there is nothing left to run, and
stop the scheduler in that case.
* examples/ping-client.scm (client-loop):
* examples/memcached-client.scm (client-loop): Remove exit call, as we
will exit when needed.
| -rw-r--r-- | examples/memcached-client.scm | 8 | ||||
| -rw-r--r-- | examples/ping-client.scm | 8 | ||||
| -rw-r--r-- | fibers/internal.scm | 128 |
3 files changed, 81 insertions, 63 deletions
diff --git a/examples/memcached-client.scm b/examples/memcached-client.scm index 3b6e1ec..d9d0330 100644 --- a/examples/memcached-client.scm +++ b/examples/memcached-client.scm @@ -109,10 +109,7 @@ (connect port (addrinfo:addr addrinfo)) port)) -(define *active-clients* 0) - (define (client-loop addrinfo n num-connections) - (set! *active-clients* (1+ *active-clients*)) (let ((port (connect-to-server addrinfo)) (key (string-append "test-" (number->string n)))) (let lp ((m 0)) @@ -126,10 +123,7 @@ (unless (equal? (item-bv item) v) (server-error port "Bad response: ~A (expected ~A)" (item-bv item) v)) (lp (1+ m)))))) - (close-port port)) - (set! *active-clients* (1- *active-clients*)) - (when (zero? *active-clients*) - (exit 0))) + (close-port port))) (define (run-memcached-test num-clients num-connections) ;; The getaddrinfo call blocks, unfortunately. Call it once before diff --git a/examples/ping-client.scm b/examples/ping-client.scm index 6b8dd79..afbfd0d 100644 --- a/examples/ping-client.scm +++ b/examples/ping-client.scm @@ -38,10 +38,7 @@ (connect port (addrinfo:addr addrinfo)) port)) -(define *active-clients* 0) - (define (client-loop addrinfo n num-connections) - (set! *active-clients* (1+ *active-clients*)) (let ((port (connect-to-server addrinfo)) (test (string-append "test-" (number->string n)))) (let lp ((m 0)) @@ -54,10 +51,7 @@ (close-port port) (error "Bad response: ~A (expected ~A)" response test)) (lp (1+ m))))) - (close-port port)) - (set! *active-clients* (1- *active-clients*)) - (when (zero? *active-clients*) - (exit 0))) + (close-port port))) (define (run-ping-test num-clients num-connections) ;; The getaddrinfo call blocks, unfortunately. Call it once before diff --git a/fibers/internal.scm b/fibers/internal.scm index 69af10a..6df419c 100644 --- a/fibers/internal.scm +++ b/fibers/internal.scm @@ -46,10 +46,11 @@ resume-fiber)) (define-record-type <scheduler> - (%make-scheduler epfd prompt-tag runnables sources sleepers - inbox inbox-state wake-pipe) + (%make-scheduler epfd active-fd-count prompt-tag runnables + sources sleepers inbox inbox-state wake-pipe) nio? (epfd scheduler-epfd) + (active-fd-count scheduler-active-fd-count set-scheduler-active-fd-count!) (prompt-tag scheduler-prompt-tag) ;; (fiber ...) (runnables scheduler-runnables set-scheduler-runnables!) @@ -95,7 +96,7 @@ (match wake-pipe ((read-pipe . _) (epoll-add! epfd (fileno read-pipe) EPOLLIN))) - (%make-scheduler epfd (make-prompt-tag "fibers") + (%make-scheduler epfd 0 (make-prompt-tag "fibers") '() (make-hash-table) '() (make-atomic-box '()) (make-atomic-box 'will-check) wake-pipe))) @@ -127,8 +128,8 @@ (when (eq? (fiber-state fiber) 'suspended) (set-fiber-state! fiber 'runnable) (set-fiber-data! fiber thunk) - (let ((runnables (scheduler-runnables sched))) - (set-scheduler-runnables! sched (cons fiber runnables))))) + (set-scheduler-runnables! sched + (cons fiber (scheduler-runnables sched))))) (define (schedule/remote) (atomic-box-prepend! (scheduler-inbox sched) (cons fiber thunk)) (match (atomic-box-ref (scheduler-inbox-state sched)) @@ -161,6 +162,8 @@ (else (warn "scheduler for unknown fd" fd)))))) (sources + (set-scheduler-active-fd-count! sched + (1- (scheduler-active-fd-count sched))) (for-each (lambda (source) ;; FIXME: If we were waiting with a timeout, this ;; fiber might still be in "sleepers", and we should @@ -181,22 +184,25 @@ (set-cdr! (car sources) #f)))))) (define (scheduler-poll-timeout sched) - (match (atomic-box-ref (scheduler-inbox sched)) - ((_ . _) - ;; There are pending requests in our inbox, so we don't need to - ;; sleep at all. - 0) - (() - (match (scheduler-sleepers sched) - ;; The sleepers list is sorted so the first element - ;; should be the one whose wake time is soonest. - (((fiber . expiry) . sleepers) - (let ((now (get-internal-real-time))) - (if (< expiry now) - 0 - (round/ (- expiry now) - internal-time-units-per-millisecond)))) - (_ -1))))) + (cond + ((not (null? (atomic-box-ref (scheduler-inbox sched)))) + ;; There are pending requests in our inbox, so we don't need to + ;; sleep at all. + 0) + ((not (null? (scheduler-runnables sched))) + ;; Likewise, don't sleep if there are runnables scheduled already. + 0) + (else + (match (scheduler-sleepers sched) + ;; The sleepers list is sorted so the first element + ;; should be the one whose wake time is soonest. + (((fiber . expiry) . sleepers) + (let ((now (get-internal-real-time))) + (if (< expiry now) + 0 + (round/ (- expiry now) + internal-time-units-per-millisecond)))) + (_ -1))))) (define (wake-sleepers sched) (let ((now (get-internal-real-time))) @@ -220,26 +226,33 @@ (resume-fiber fiber thunk))) (atomic-box-swap! (scheduler-inbox sched) '()))) -(define (poll-for-events sched) - ;; Called when the runnables list is empty. Poll for some active - ;; FD's and schedule their corresponding fibers. Also schedule any - ;; sleepers that have timed out. - (atomic-box-set! (scheduler-inbox-state sched) 'needs-wake) - (epoll (scheduler-epfd sched) - 32 ; maxevents - (scheduler-poll-timeout sched) - #:folder (lambda (fd revents seed) - (schedule-fibers-for-fd fd revents sched) - seed)) - (atomic-box-set! (scheduler-inbox-state sched) 'will-check) +(define (schedule-runnables-for-next-turn sched) + ;; Called when all runnables from the current turn have been run. + ;; Note that the there may be runnables already scheduled for the + ;; next turn; one way this can happen is if a fiber suspended itself + ;; because it was blocked on a channel, but then another fiber woke + ;; it up. In any case, check the kernel to see if any of the fd's + ;; that we are interested in are active, and in that case schedule + ;; their corresponding fibers. Also schedule any sleepers that have + ;; timed out, and process the inbox that receives + ;; requests-to-schedule from remote threads. + (unless (zero? (scheduler-active-fd-count sched)) + (atomic-box-set! (scheduler-inbox-state sched) 'needs-wake) + (epoll (scheduler-epfd sched) + 32 ; maxevents + (scheduler-poll-timeout sched) + #:folder (lambda (fd revents seed) + (schedule-fibers-for-fd fd revents sched) + seed)) + (atomic-box-set! (scheduler-inbox-state sched) 'will-check)) (handle-inbox sched) (wake-sleepers sched)) -(define* (run-fiber sched fiber) +(define* (run-fiber fiber) (when (eq? (fiber-state fiber) 'runnable) (parameterize ((current-fiber fiber)) (call-with-prompt - (scheduler-prompt-tag sched) + (scheduler-prompt-tag (fiber-scheduler fiber)) (lambda () (let ((thunk (fiber-data fiber))) (set-fiber-state! fiber 'running) @@ -250,20 +263,28 @@ (set-fiber-data! fiber k) (after-suspend fiber)))))) +(define (scheduler-finished? sched) + (let/ec return + (define (only-finished-if bool) + (if bool #t (return #f))) + (only-finished-if (zero? (scheduler-active-fd-count sched))) + (only-finished-if (null? (atomic-box-ref (scheduler-inbox sched)))) + (only-finished-if (null? (scheduler-sleepers sched))))) + (define (run-scheduler sched) (let lp () - (let ((runnables (scheduler-runnables sched))) - (cond - ((pair? runnables) - (let ((fiber (car runnables))) - (set-scheduler-runnables! sched (cdr runnables)) - (run-fiber sched fiber) - (lp))) - ((poll-for-events sched) - (lp)) - (else - ;; Nothing runnable; quit. - (values)))))) + (schedule-runnables-for-next-turn sched) + (match (scheduler-runnables sched) + (() + ;; Could be the scheduler is stopping, or it could be that we + ;; got a spurious wakeup. In any case, this is the place to + ;; check to see whether the scheduler is really done. + (unless (scheduler-finished? sched) + (lp))) + (runnables + (set-scheduler-runnables! sched '()) + (for-each run-fiber runnables) + (lp))))) (define (destroy-scheduler sched) #; @@ -303,7 +324,7 @@ (thunk (lambda () (cont thunk)))) (schedule-fiber! fiber thunk))) -(define (finalize-fd ctx fd) +(define (finalize-fd sched fd) "Remove data associated with @var{fd} from the scheduler @var{ctx}. Called by Guile just before Guile goes to close a file descriptor, in response either to an explicit call to @code{close-port}, or because @@ -315,7 +336,11 @@ from a finalizer thread." ;; ;; FIXME: Take a lock on the sources table? ;; FIXME: Wake all sources with EPOLLERR. - (hashv-remove! (scheduler-sources ctx) fd)) + (let ((sources-table (scheduler-sources sched))) + (when (hashv-ref sources-table fd) + (set-scheduler-active-fd-count! sched + (1- (scheduler-active-fd-count sched))) + (hashv-remove! sources-table fd)))) (define (add-fd-events! sched fd events fiber) (let ((sources (hashv-ref (scheduler-sources sched) fd))) @@ -323,12 +348,17 @@ from a finalizer thread." (sources (set-cdr! sources (cons (make-source events #f fiber) (cdr sources))) (let ((active-events (caar sources))) + (unless active-events + (set-scheduler-active-fd-count! sched + (1+ (scheduler-active-fd-count sched)))) (unless (and active-events (= (logand events active-events) events)) (set-car! (car sources) (logior events (or active-events 0))) (epoll-modify! (scheduler-epfd sched) fd (logior (caar sources) EPOLLONESHOT))))) (else + (set-scheduler-active-fd-count! sched + (1+ (scheduler-active-fd-count sched))) (hashv-set! (scheduler-sources sched) fd (acons events #f (list (make-source events #f fiber)))) |
