From 0c3fa5a7e76ea5000a7776c526a3707161e335ce Mon Sep 17 00:00:00 2001 From: Andy Wingo Date: Wed, 4 Jan 2017 21:11:55 +0100 Subject: Prepare for support for blocking operations * fibers/operations.scm (perform-operation): Block function takes sched and resume function, not fiber. Will allow blocking operations in the future. * fibers/internal.scm (add-timer): Rename from resume-on-timer; just call a thunk after a time has passed. * fibers/timers.scm (timer-operation): Adapt to resume-on-timer and operation changes. * fibers/channels.scm (put-operation, get-operation): Adapt to operation changes. --- fibers/channels.scm | 48 +++++++++++++++++++++++------------------------- fibers/internal.scm | 24 ++++++++---------------- fibers/operations.scm | 38 ++++++++++++++++++++++---------------- fibers/timers.scm | 10 +++++----- 4 files changed, 58 insertions(+), 62 deletions(-) diff --git a/fibers/channels.scm b/fibers/channels.scm index c38ac31..7676c37 100644 --- a/fibers/channels.scm +++ b/fibers/channels.scm @@ -73,7 +73,7 @@ with a receiver fiber to send @var{message} over @var{channel}." ;; Return #f if the getq was empty. (and getq* (match item - (#(get-flag get-fiber get-wrap-fn) + (#(get-flag resume-get get-wrap-fn) (let spin () (match (atomic-box-compare-and-swap! get-flag 'W 'S) ('W @@ -85,10 +85,10 @@ with a receiver fiber to send @var{message} over @var{channel}." ;; performs any other operation on this ;; channel. (maybe-commit) - (resume-fiber get-fiber (if get-wrap-fn - (lambda () - (get-wrap-fn message)) - (lambda () message))) + (resume-get (if get-wrap-fn + (lambda () + (get-wrap-fn message)) + (lambda () message))) ;; Continue directly. (lambda () (values))) ;; Get operation temporarily busy; try again. @@ -99,16 +99,16 @@ with a receiver fiber to send @var{message} over @var{channel}." ;; try again next time if no other fiber ;; handled it already. ('S (try (maybe-commit)))))))))))) - (define (block-fn put-flag put-fiber put-wrap-fn) + (define (block-fn put-flag put-sched resume-put put-wrap-fn) ;; We have suspended the current fiber; arrange for the fiber ;; to be resumed by a get operation by adding it to the channel's ;; putq. (define (not-me? item) (match item - (#(get-flag get-fiber get-wrap-fn) + (#(get-flag resume-get get-wrap-fn) (not (eq? put-flag get-flag))))) ;; First, publish this put operation. - (enqueue! putq-box (vector put-flag put-fiber put-wrap-fn message)) + (enqueue! putq-box (vector put-flag resume-put put-wrap-fn message)) ;; In the try phase, we scanned the getq for a get operation, ;; but we were unable to perform any of them. Since then, ;; there might be a new get operation on the queue. However @@ -131,7 +131,7 @@ with a receiver fiber to send @var{message} over @var{channel}." ;; We only have to service the getq if it is non-empty. (when getq* (match item - (#(get-flag get-fiber get-wrap-fn) + (#(get-flag resume-get get-wrap-fn) (match (atomic-box-ref get-flag) ('S ;; This get operation has already synchronized; @@ -151,11 +151,10 @@ with a receiver fiber to send @var{message} over @var{channel}." ;; getq, and resume both fibers. (atomic-box-set! put-flag 'S) (maybe-commit) - (resume-fiber get-fiber - (if get-wrap-fn - (lambda () (get-wrap-fn message)) - (lambda () message))) - (resume-fiber put-fiber (or put-wrap-fn values)) + (resume-get (if get-wrap-fn + (lambda () (get-wrap-fn message)) + (lambda () message))) + (resume-put (or put-wrap-fn values)) (values)) ('C ;; Other fiber trying to do the same @@ -196,7 +195,7 @@ with a sender fiber to receive one value from @var{channel}." ;; Return #f if the putq was empty. (and putq* (match item - (#(put-flag put-fiber put-wrap-fn message) + (#(put-flag resume-put put-wrap-fn message) (let spin () (match (atomic-box-compare-and-swap! put-flag 'W 'S) ('W @@ -206,7 +205,7 @@ with a sender fiber to receive one value from @var{channel}." ;; it before synchronizing any other ;; operation on this channel. (maybe-commit) - (resume-fiber put-fiber (or put-wrap-fn values)) + (resume-put (or put-wrap-fn values)) ;; Continue directly. (lambda () message)) ;; Put operation temporarily busy; try again. @@ -217,16 +216,16 @@ with a sender fiber to receive one value from @var{channel}." ;; try again next time if no other fiber ;; handled it already. ('S (try (maybe-commit)))))))))))) - (define (block-fn get-flag get-fiber get-wrap-fn) + (define (block-fn get-flag get-sched resume-get get-wrap-fn) ;; We have suspended the current fiber; arrange for the fiber ;; to be resumed by a put operation by adding it to the ;; channel's getq. (define (not-me? item) (match item - (#(put-flag put-fiber put-wrap-fn message) + (#(put-flag resume-put put-wrap-fn message) (not (eq? get-flag put-flag))))) ;; First, publish this get operation. - (enqueue! getq-box (vector get-flag get-fiber get-wrap-fn)) + (enqueue! getq-box (vector get-flag resume-get get-wrap-fn)) ;; In the try phase, we scanned the putq for a live put ;; operation, but we were unable to synchronize. Since then, ;; there might be a new operation on the putq. However only @@ -249,7 +248,7 @@ with a sender fiber to receive one value from @var{channel}." ;; We only have to service the putq if it is non-empty. (when putq* (match item - (#(put-flag put-fiber put-wrap-fn message) + (#(put-flag resume-put put-wrap-fn message) (match (atomic-box-ref put-flag) ('S ;; This put operation has already synchronized; @@ -270,11 +269,10 @@ with a sender fiber to receive one value from @var{channel}." ;; fibers for resumption. (atomic-box-set! get-flag 'S) (maybe-commit) - (resume-fiber get-fiber - (if get-wrap-fn - (lambda () (get-wrap-fn message)) - (lambda () message))) - (resume-fiber put-fiber (or put-wrap-fn values)) + (resume-get (if get-wrap-fn + (lambda () (get-wrap-fn message)) + (lambda () message))) + (resume-put (or put-wrap-fn values)) (values)) ('C ;; Other fiber trying to do the same diff --git a/fibers/internal.scm b/fibers/internal.scm index b283777..7f56c27 100644 --- a/fibers/internal.scm +++ b/fibers/internal.scm @@ -39,7 +39,7 @@ resume-on-readable-fd resume-on-writable-fd - resume-on-timer + add-timer create-fiber (current-fiber/public . current-fiber) @@ -445,18 +445,10 @@ becomes readable." becomes writable." (resume-on-fd-events fd EPOLLOUT fiber)) -(define (resume-on-timer fiber expiry get-thunk) - "Arrange to resume @var{fiber} when the absolute real time is -greater than or equal to @var{expiry}, expressed in internal time -units. The fiber will be resumed with the result of calling -@var{get-thunk}. If @var{get-thunk} returns @code{#f}, that indicates -that some other operation performed this operation first, and so no -resume is performed." - (let ((sched (fiber-scheduler fiber))) - (define (maybe-resume) - (let ((thunk (get-thunk))) - (when thunk (resume-fiber fiber thunk)))) - (set-scheduler-timers! sched - (psq-set (scheduler-timers sched) - (cons expiry maybe-resume) - expiry)))) +(define (add-timer sched expiry thunk) + "Arrange to call @var{thunk} when the absolute real time is greater +than or equal to @var{expiry}, expressed in internal time units." + (set-scheduler-timers! sched + (psq-set (scheduler-timers sched) + (cons expiry thunk) + expiry))) diff --git a/fibers/operations.scm b/fibers/operations.scm index e1a91cf..81ccf1b 100644 --- a/fibers/operations.scm +++ b/fibers/operations.scm @@ -120,26 +120,32 @@ succeeds, will succeed with one and only one of the sub-operations (define (perform-operation op) "Perform the operation @var{op} and return the resulting values. If the operation cannot complete directly, block until it can complete." - (define (block) - (suspend-current-fiber - (lambda (fiber) - (let ((flag (make-op-state))) - (match op - (($ wrap-fn try-fn block-fn) - (block-fn flag fiber wrap-fn)) - (($ base-ops) - (let lp ((i 0)) - (when (< i (vector-length base-ops)) - (match (vector-ref base-ops i) - (($ wrap-fn try-fn block-fn) - (block-fn flag fiber wrap-fn))) - (lp (1+ i)))))))))) + (define (block sched resume) + (let ((flag (make-op-state))) + (match op + (($ wrap-fn try-fn block-fn) + (block-fn flag sched resume wrap-fn)) + (($ base-ops) + (let lp ((i 0)) + (when (< i (vector-length base-ops)) + (match (vector-ref base-ops i) + (($ wrap-fn try-fn block-fn) + (block-fn flag sched resume wrap-fn))) + (lp (1+ i)))))))) + + (define (suspend) + (if (current-fiber) + (suspend-current-fiber + (lambda (fiber) + (define (resume thunk) (resume-fiber fiber thunk)) + (block (fiber-scheduler fiber) resume))) + (error "unimplemented"))) ;; First, try to sync on an op. If no op syncs, block. (match op (($ wrap-fn try-fn) (match (try-fn) - (#f (block)) + (#f (suspend)) (thunk (if wrap-fn (call-with-values thunk wrap-fn) @@ -157,4 +163,4 @@ the operation cannot complete directly, block until it can complete." (if wrap-fn (call-with-values thunk wrap-fn) (thunk)))))) - (block))))))) + (suspend))))))) diff --git a/fibers/timers.scm b/fibers/timers.scm index 9c743ae..d2468da 100644 --- a/fibers/timers.scm +++ b/fibers/timers.scm @@ -34,13 +34,13 @@ units. The operation will succeed with no values." (lambda () (and (< expiry (get-internal-real-time)) values)) - (lambda (flag fiber wrap-fn) - (define (get-resume-thunk) + (lambda (flag sched resume wrap-fn) + (define (timer) (match (atomic-box-compare-and-swap! flag 'W 'S) - ('W (or wrap-fn values)) - ('C (get-resume-thunk)) + ('W (resume (or wrap-fn values))) + ('C (timer)) ('S #f))) - (resume-on-timer fiber expiry get-resume-thunk)))) + (add-timer sched expiry timer)))) (define (wait-operation seconds) "Make an operation that will succeed with no values when -- cgit v1.2.3