summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndy Wingo <wingo@pobox.com>2017-01-04 21:25:07 +0100
committerAndy Wingo <wingo@pobox.com>2017-01-04 21:25:07 +0100
commit7574fd55622e9cca16c268b97aded1f399036c12 (patch)
treee0745e46222c9ae5c4e5e6c3cf2a895cab145a01
parentPrepare for support for blocking operations (diff)
downloadguile-fibers-7574fd55622e9cca16c268b97aded1f399036c12.tar.gz
Centralize operation wrap handling
* fibers/operations.scm (<base-op>): Update comment. (perform-operation): Wrap resume proc before calling block function. * fibers/channels.scm (put-operation, get-operation): * fibers/timers.scm (timer-operation): Adapt to pre-wrapped resume proc.
-rw-r--r--fibers/channels.scm39
-rw-r--r--fibers/operations.scm13
-rw-r--r--fibers/timers.scm4
3 files changed, 28 insertions, 28 deletions
diff --git a/fibers/channels.scm b/fibers/channels.scm
index 7676c37..9f946da 100644
--- a/fibers/channels.scm
+++ b/fibers/channels.scm
@@ -73,7 +73,7 @@ with a receiver fiber to send @var{message} over @var{channel}."
;; Return #f if the getq was empty.
(and getq*
(match item
- (#(get-flag resume-get get-wrap-fn)
+ (#(get-flag resume-get)
(let spin ()
(match (atomic-box-compare-and-swap! get-flag 'W 'S)
('W
@@ -85,10 +85,7 @@ with a receiver fiber to send @var{message} over @var{channel}."
;; performs any other operation on this
;; channel.
(maybe-commit)
- (resume-get (if get-wrap-fn
- (lambda ()
- (get-wrap-fn message))
- (lambda () message)))
+ (resume-get (lambda () message))
;; Continue directly.
(lambda () (values)))
;; Get operation temporarily busy; try again.
@@ -99,16 +96,16 @@ with a receiver fiber to send @var{message} over @var{channel}."
;; try again next time if no other fiber
;; handled it already.
('S (try (maybe-commit))))))))))))
- (define (block-fn put-flag put-sched resume-put put-wrap-fn)
+ (define (block-fn put-flag put-sched resume-put)
;; We have suspended the current fiber; arrange for the fiber
;; to be resumed by a get operation by adding it to the channel's
;; putq.
(define (not-me? item)
(match item
- (#(get-flag resume-get get-wrap-fn)
+ (#(get-flag resume-get)
(not (eq? put-flag get-flag)))))
;; First, publish this put operation.
- (enqueue! putq-box (vector put-flag resume-put put-wrap-fn message))
+ (enqueue! putq-box (vector put-flag resume-put message))
;; In the try phase, we scanned the getq for a get operation,
;; but we were unable to perform any of them. Since then,
;; there might be a new get operation on the queue. However
@@ -131,7 +128,7 @@ with a receiver fiber to send @var{message} over @var{channel}."
;; We only have to service the getq if it is non-empty.
(when getq*
(match item
- (#(get-flag resume-get get-wrap-fn)
+ (#(get-flag resume-get)
(match (atomic-box-ref get-flag)
('S
;; This get operation has already synchronized;
@@ -151,10 +148,8 @@ with a receiver fiber to send @var{message} over @var{channel}."
;; getq, and resume both fibers.
(atomic-box-set! put-flag 'S)
(maybe-commit)
- (resume-get (if get-wrap-fn
- (lambda () (get-wrap-fn message))
- (lambda () message)))
- (resume-put (or put-wrap-fn values))
+ (resume-get (lambda () message))
+ (resume-put values)
(values))
('C
;; Other fiber trying to do the same
@@ -195,7 +190,7 @@ with a sender fiber to receive one value from @var{channel}."
;; Return #f if the putq was empty.
(and putq*
(match item
- (#(put-flag resume-put put-wrap-fn message)
+ (#(put-flag resume-put message)
(let spin ()
(match (atomic-box-compare-and-swap! put-flag 'W 'S)
('W
@@ -205,7 +200,7 @@ with a sender fiber to receive one value from @var{channel}."
;; it before synchronizing any other
;; operation on this channel.
(maybe-commit)
- (resume-put (or put-wrap-fn values))
+ (resume-put values)
;; Continue directly.
(lambda () message))
;; Put operation temporarily busy; try again.
@@ -216,16 +211,16 @@ with a sender fiber to receive one value from @var{channel}."
;; try again next time if no other fiber
;; handled it already.
('S (try (maybe-commit))))))))))))
- (define (block-fn get-flag get-sched resume-get get-wrap-fn)
+ (define (block-fn get-flag get-sched resume-get)
;; We have suspended the current fiber; arrange for the fiber
;; to be resumed by a put operation by adding it to the
;; channel's getq.
(define (not-me? item)
(match item
- (#(put-flag resume-put put-wrap-fn message)
+ (#(put-flag resume-put message)
(not (eq? get-flag put-flag)))))
;; First, publish this get operation.
- (enqueue! getq-box (vector get-flag resume-get get-wrap-fn))
+ (enqueue! getq-box (vector get-flag resume-get))
;; In the try phase, we scanned the putq for a live put
;; operation, but we were unable to synchronize. Since then,
;; there might be a new operation on the putq. However only
@@ -248,7 +243,7 @@ with a sender fiber to receive one value from @var{channel}."
;; We only have to service the putq if it is non-empty.
(when putq*
(match item
- (#(put-flag resume-put put-wrap-fn message)
+ (#(put-flag resume-put message)
(match (atomic-box-ref put-flag)
('S
;; This put operation has already synchronized;
@@ -269,10 +264,8 @@ with a sender fiber to receive one value from @var{channel}."
;; fibers for resumption.
(atomic-box-set! get-flag 'S)
(maybe-commit)
- (resume-get (if get-wrap-fn
- (lambda () (get-wrap-fn message))
- (lambda () message)))
- (resume-put (or put-wrap-fn values))
+ (resume-get (lambda () message))
+ (resume-put values)
(values))
('C
;; Other fiber trying to do the same
diff --git a/fibers/operations.scm b/fibers/operations.scm
index 81ccf1b..7ee21d5 100644
--- a/fibers/operations.scm
+++ b/fibers/operations.scm
@@ -69,7 +69,7 @@
(wrap-fn base-op-wrap-fn)
;; () -> (thunk | #f)
(try-fn base-op-try-fn)
- ;; (op-state resume-k wrap-fn) -> ()
+ ;; (op-state sched resume-k) -> ()
(block-fn base-op-block-fn))
(define-record-type <choice-op>
@@ -120,17 +120,24 @@ succeeds, will succeed with one and only one of the sub-operations
(define (perform-operation op)
"Perform the operation @var{op} and return the resulting values. If
the operation cannot complete directly, block until it can complete."
+ (define (wrap-resume resume wrap-fn)
+ (if wrap-fn
+ (lambda (thunk)
+ (resume (lambda ()
+ (call-with-values thunk wrap-fn))))
+ resume))
+
(define (block sched resume)
(let ((flag (make-op-state)))
(match op
(($ <base-op> wrap-fn try-fn block-fn)
- (block-fn flag sched resume wrap-fn))
+ (block-fn flag sched (wrap-resume resume wrap-fn)))
(($ <choice-op> base-ops)
(let lp ((i 0))
(when (< i (vector-length base-ops))
(match (vector-ref base-ops i)
(($ <base-op> wrap-fn try-fn block-fn)
- (block-fn flag sched resume wrap-fn)))
+ (block-fn flag sched (wrap-resume resume wrap-fn))))
(lp (1+ i))))))))
(define (suspend)
diff --git a/fibers/timers.scm b/fibers/timers.scm
index d2468da..4affd54 100644
--- a/fibers/timers.scm
+++ b/fibers/timers.scm
@@ -34,10 +34,10 @@ units. The operation will succeed with no values."
(lambda ()
(and (< expiry (get-internal-real-time))
values))
- (lambda (flag sched resume wrap-fn)
+ (lambda (flag sched resume)
(define (timer)
(match (atomic-box-compare-and-swap! flag 'W 'S)
- ('W (resume (or wrap-fn values)))
+ ('W (resume values))
('C (timer))
('S #f)))
(add-timer sched expiry timer))))