diff options
| author | Andy Wingo <wingo@pobox.com> | 2017-01-04 21:25:07 +0100 |
|---|---|---|
| committer | Andy Wingo <wingo@pobox.com> | 2017-01-04 21:25:07 +0100 |
| commit | 7574fd55622e9cca16c268b97aded1f399036c12 (patch) | |
| tree | e0745e46222c9ae5c4e5e6c3cf2a895cab145a01 | |
| parent | Prepare for support for blocking operations (diff) | |
| download | guile-fibers-7574fd55622e9cca16c268b97aded1f399036c12.tar.gz | |
Centralize operation wrap handling
* fibers/operations.scm (<base-op>): Update comment.
(perform-operation): Wrap resume proc before calling block function.
* fibers/channels.scm (put-operation, get-operation):
* fibers/timers.scm (timer-operation): Adapt to pre-wrapped resume proc.
| -rw-r--r-- | fibers/channels.scm | 39 | ||||
| -rw-r--r-- | fibers/operations.scm | 13 | ||||
| -rw-r--r-- | fibers/timers.scm | 4 |
3 files changed, 28 insertions, 28 deletions
diff --git a/fibers/channels.scm b/fibers/channels.scm index 7676c37..9f946da 100644 --- a/fibers/channels.scm +++ b/fibers/channels.scm @@ -73,7 +73,7 @@ with a receiver fiber to send @var{message} over @var{channel}." ;; Return #f if the getq was empty. (and getq* (match item - (#(get-flag resume-get get-wrap-fn) + (#(get-flag resume-get) (let spin () (match (atomic-box-compare-and-swap! get-flag 'W 'S) ('W @@ -85,10 +85,7 @@ with a receiver fiber to send @var{message} over @var{channel}." ;; performs any other operation on this ;; channel. (maybe-commit) - (resume-get (if get-wrap-fn - (lambda () - (get-wrap-fn message)) - (lambda () message))) + (resume-get (lambda () message)) ;; Continue directly. (lambda () (values))) ;; Get operation temporarily busy; try again. @@ -99,16 +96,16 @@ with a receiver fiber to send @var{message} over @var{channel}." ;; try again next time if no other fiber ;; handled it already. ('S (try (maybe-commit)))))))))))) - (define (block-fn put-flag put-sched resume-put put-wrap-fn) + (define (block-fn put-flag put-sched resume-put) ;; We have suspended the current fiber; arrange for the fiber ;; to be resumed by a get operation by adding it to the channel's ;; putq. (define (not-me? item) (match item - (#(get-flag resume-get get-wrap-fn) + (#(get-flag resume-get) (not (eq? put-flag get-flag))))) ;; First, publish this put operation. - (enqueue! putq-box (vector put-flag resume-put put-wrap-fn message)) + (enqueue! putq-box (vector put-flag resume-put message)) ;; In the try phase, we scanned the getq for a get operation, ;; but we were unable to perform any of them. Since then, ;; there might be a new get operation on the queue. However @@ -131,7 +128,7 @@ with a receiver fiber to send @var{message} over @var{channel}." ;; We only have to service the getq if it is non-empty. (when getq* (match item - (#(get-flag resume-get get-wrap-fn) + (#(get-flag resume-get) (match (atomic-box-ref get-flag) ('S ;; This get operation has already synchronized; @@ -151,10 +148,8 @@ with a receiver fiber to send @var{message} over @var{channel}." ;; getq, and resume both fibers. (atomic-box-set! put-flag 'S) (maybe-commit) - (resume-get (if get-wrap-fn - (lambda () (get-wrap-fn message)) - (lambda () message))) - (resume-put (or put-wrap-fn values)) + (resume-get (lambda () message)) + (resume-put values) (values)) ('C ;; Other fiber trying to do the same @@ -195,7 +190,7 @@ with a sender fiber to receive one value from @var{channel}." ;; Return #f if the putq was empty. (and putq* (match item - (#(put-flag resume-put put-wrap-fn message) + (#(put-flag resume-put message) (let spin () (match (atomic-box-compare-and-swap! put-flag 'W 'S) ('W @@ -205,7 +200,7 @@ with a sender fiber to receive one value from @var{channel}." ;; it before synchronizing any other ;; operation on this channel. (maybe-commit) - (resume-put (or put-wrap-fn values)) + (resume-put values) ;; Continue directly. (lambda () message)) ;; Put operation temporarily busy; try again. @@ -216,16 +211,16 @@ with a sender fiber to receive one value from @var{channel}." ;; try again next time if no other fiber ;; handled it already. ('S (try (maybe-commit)))))))))))) - (define (block-fn get-flag get-sched resume-get get-wrap-fn) + (define (block-fn get-flag get-sched resume-get) ;; We have suspended the current fiber; arrange for the fiber ;; to be resumed by a put operation by adding it to the ;; channel's getq. (define (not-me? item) (match item - (#(put-flag resume-put put-wrap-fn message) + (#(put-flag resume-put message) (not (eq? get-flag put-flag))))) ;; First, publish this get operation. - (enqueue! getq-box (vector get-flag resume-get get-wrap-fn)) + (enqueue! getq-box (vector get-flag resume-get)) ;; In the try phase, we scanned the putq for a live put ;; operation, but we were unable to synchronize. Since then, ;; there might be a new operation on the putq. However only @@ -248,7 +243,7 @@ with a sender fiber to receive one value from @var{channel}." ;; We only have to service the putq if it is non-empty. (when putq* (match item - (#(put-flag resume-put put-wrap-fn message) + (#(put-flag resume-put message) (match (atomic-box-ref put-flag) ('S ;; This put operation has already synchronized; @@ -269,10 +264,8 @@ with a sender fiber to receive one value from @var{channel}." ;; fibers for resumption. (atomic-box-set! get-flag 'S) (maybe-commit) - (resume-get (if get-wrap-fn - (lambda () (get-wrap-fn message)) - (lambda () message))) - (resume-put (or put-wrap-fn values)) + (resume-get (lambda () message)) + (resume-put values) (values)) ('C ;; Other fiber trying to do the same diff --git a/fibers/operations.scm b/fibers/operations.scm index 81ccf1b..7ee21d5 100644 --- a/fibers/operations.scm +++ b/fibers/operations.scm @@ -69,7 +69,7 @@ (wrap-fn base-op-wrap-fn) ;; () -> (thunk | #f) (try-fn base-op-try-fn) - ;; (op-state resume-k wrap-fn) -> () + ;; (op-state sched resume-k) -> () (block-fn base-op-block-fn)) (define-record-type <choice-op> @@ -120,17 +120,24 @@ succeeds, will succeed with one and only one of the sub-operations (define (perform-operation op) "Perform the operation @var{op} and return the resulting values. If the operation cannot complete directly, block until it can complete." + (define (wrap-resume resume wrap-fn) + (if wrap-fn + (lambda (thunk) + (resume (lambda () + (call-with-values thunk wrap-fn)))) + resume)) + (define (block sched resume) (let ((flag (make-op-state))) (match op (($ <base-op> wrap-fn try-fn block-fn) - (block-fn flag sched resume wrap-fn)) + (block-fn flag sched (wrap-resume resume wrap-fn))) (($ <choice-op> base-ops) (let lp ((i 0)) (when (< i (vector-length base-ops)) (match (vector-ref base-ops i) (($ <base-op> wrap-fn try-fn block-fn) - (block-fn flag sched resume wrap-fn))) + (block-fn flag sched (wrap-resume resume wrap-fn)))) (lp (1+ i)))))))) (define (suspend) diff --git a/fibers/timers.scm b/fibers/timers.scm index d2468da..4affd54 100644 --- a/fibers/timers.scm +++ b/fibers/timers.scm @@ -34,10 +34,10 @@ units. The operation will succeed with no values." (lambda () (and (< expiry (get-internal-real-time)) values)) - (lambda (flag sched resume wrap-fn) + (lambda (flag sched resume) (define (timer) (match (atomic-box-compare-and-swap! flag 'W 'S) - ('W (resume (or wrap-fn values))) + ('W (resume values)) ('C (timer)) ('S #f))) (add-timer sched expiry timer)))) |
