summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndy Wingo <wingo@pobox.com>2017-01-04 21:11:55 +0100
committerAndy Wingo <wingo@pobox.com>2017-01-04 21:11:55 +0100
commit0c3fa5a7e76ea5000a7776c526a3707161e335ce (patch)
treea0d71b84f5ddd2682c46d6f166d5cd8fac3faee0
parentFix ,spawn-fiber (diff)
downloadguile-fibers-0c3fa5a7e76ea5000a7776c526a3707161e335ce.tar.gz
Prepare for support for blocking operations
* fibers/operations.scm (perform-operation): Block function takes sched and resume function, not fiber. Will allow blocking operations in the future. * fibers/internal.scm (add-timer): Rename from resume-on-timer; just call a thunk after a time has passed. * fibers/timers.scm (timer-operation): Adapt to resume-on-timer and operation changes. * fibers/channels.scm (put-operation, get-operation): Adapt to operation changes.
-rw-r--r--fibers/channels.scm48
-rw-r--r--fibers/internal.scm24
-rw-r--r--fibers/operations.scm38
-rw-r--r--fibers/timers.scm10
4 files changed, 58 insertions, 62 deletions
diff --git a/fibers/channels.scm b/fibers/channels.scm
index c38ac31..7676c37 100644
--- a/fibers/channels.scm
+++ b/fibers/channels.scm
@@ -73,7 +73,7 @@ with a receiver fiber to send @var{message} over @var{channel}."
;; Return #f if the getq was empty.
(and getq*
(match item
- (#(get-flag get-fiber get-wrap-fn)
+ (#(get-flag resume-get get-wrap-fn)
(let spin ()
(match (atomic-box-compare-and-swap! get-flag 'W 'S)
('W
@@ -85,10 +85,10 @@ with a receiver fiber to send @var{message} over @var{channel}."
;; performs any other operation on this
;; channel.
(maybe-commit)
- (resume-fiber get-fiber (if get-wrap-fn
- (lambda ()
- (get-wrap-fn message))
- (lambda () message)))
+ (resume-get (if get-wrap-fn
+ (lambda ()
+ (get-wrap-fn message))
+ (lambda () message)))
;; Continue directly.
(lambda () (values)))
;; Get operation temporarily busy; try again.
@@ -99,16 +99,16 @@ with a receiver fiber to send @var{message} over @var{channel}."
;; try again next time if no other fiber
;; handled it already.
('S (try (maybe-commit))))))))))))
- (define (block-fn put-flag put-fiber put-wrap-fn)
+ (define (block-fn put-flag put-sched resume-put put-wrap-fn)
;; We have suspended the current fiber; arrange for the fiber
;; to be resumed by a get operation by adding it to the channel's
;; putq.
(define (not-me? item)
(match item
- (#(get-flag get-fiber get-wrap-fn)
+ (#(get-flag resume-get get-wrap-fn)
(not (eq? put-flag get-flag)))))
;; First, publish this put operation.
- (enqueue! putq-box (vector put-flag put-fiber put-wrap-fn message))
+ (enqueue! putq-box (vector put-flag resume-put put-wrap-fn message))
;; In the try phase, we scanned the getq for a get operation,
;; but we were unable to perform any of them. Since then,
;; there might be a new get operation on the queue. However
@@ -131,7 +131,7 @@ with a receiver fiber to send @var{message} over @var{channel}."
;; We only have to service the getq if it is non-empty.
(when getq*
(match item
- (#(get-flag get-fiber get-wrap-fn)
+ (#(get-flag resume-get get-wrap-fn)
(match (atomic-box-ref get-flag)
('S
;; This get operation has already synchronized;
@@ -151,11 +151,10 @@ with a receiver fiber to send @var{message} over @var{channel}."
;; getq, and resume both fibers.
(atomic-box-set! put-flag 'S)
(maybe-commit)
- (resume-fiber get-fiber
- (if get-wrap-fn
- (lambda () (get-wrap-fn message))
- (lambda () message)))
- (resume-fiber put-fiber (or put-wrap-fn values))
+ (resume-get (if get-wrap-fn
+ (lambda () (get-wrap-fn message))
+ (lambda () message)))
+ (resume-put (or put-wrap-fn values))
(values))
('C
;; Other fiber trying to do the same
@@ -196,7 +195,7 @@ with a sender fiber to receive one value from @var{channel}."
;; Return #f if the putq was empty.
(and putq*
(match item
- (#(put-flag put-fiber put-wrap-fn message)
+ (#(put-flag resume-put put-wrap-fn message)
(let spin ()
(match (atomic-box-compare-and-swap! put-flag 'W 'S)
('W
@@ -206,7 +205,7 @@ with a sender fiber to receive one value from @var{channel}."
;; it before synchronizing any other
;; operation on this channel.
(maybe-commit)
- (resume-fiber put-fiber (or put-wrap-fn values))
+ (resume-put (or put-wrap-fn values))
;; Continue directly.
(lambda () message))
;; Put operation temporarily busy; try again.
@@ -217,16 +216,16 @@ with a sender fiber to receive one value from @var{channel}."
;; try again next time if no other fiber
;; handled it already.
('S (try (maybe-commit))))))))))))
- (define (block-fn get-flag get-fiber get-wrap-fn)
+ (define (block-fn get-flag get-sched resume-get get-wrap-fn)
;; We have suspended the current fiber; arrange for the fiber
;; to be resumed by a put operation by adding it to the
;; channel's getq.
(define (not-me? item)
(match item
- (#(put-flag put-fiber put-wrap-fn message)
+ (#(put-flag resume-put put-wrap-fn message)
(not (eq? get-flag put-flag)))))
;; First, publish this get operation.
- (enqueue! getq-box (vector get-flag get-fiber get-wrap-fn))
+ (enqueue! getq-box (vector get-flag resume-get get-wrap-fn))
;; In the try phase, we scanned the putq for a live put
;; operation, but we were unable to synchronize. Since then,
;; there might be a new operation on the putq. However only
@@ -249,7 +248,7 @@ with a sender fiber to receive one value from @var{channel}."
;; We only have to service the putq if it is non-empty.
(when putq*
(match item
- (#(put-flag put-fiber put-wrap-fn message)
+ (#(put-flag resume-put put-wrap-fn message)
(match (atomic-box-ref put-flag)
('S
;; This put operation has already synchronized;
@@ -270,11 +269,10 @@ with a sender fiber to receive one value from @var{channel}."
;; fibers for resumption.
(atomic-box-set! get-flag 'S)
(maybe-commit)
- (resume-fiber get-fiber
- (if get-wrap-fn
- (lambda () (get-wrap-fn message))
- (lambda () message)))
- (resume-fiber put-fiber (or put-wrap-fn values))
+ (resume-get (if get-wrap-fn
+ (lambda () (get-wrap-fn message))
+ (lambda () message)))
+ (resume-put (or put-wrap-fn values))
(values))
('C
;; Other fiber trying to do the same
diff --git a/fibers/internal.scm b/fibers/internal.scm
index b283777..7f56c27 100644
--- a/fibers/internal.scm
+++ b/fibers/internal.scm
@@ -39,7 +39,7 @@
resume-on-readable-fd
resume-on-writable-fd
- resume-on-timer
+ add-timer
create-fiber
(current-fiber/public . current-fiber)
@@ -445,18 +445,10 @@ becomes readable."
becomes writable."
(resume-on-fd-events fd EPOLLOUT fiber))
-(define (resume-on-timer fiber expiry get-thunk)
- "Arrange to resume @var{fiber} when the absolute real time is
-greater than or equal to @var{expiry}, expressed in internal time
-units. The fiber will be resumed with the result of calling
-@var{get-thunk}. If @var{get-thunk} returns @code{#f}, that indicates
-that some other operation performed this operation first, and so no
-resume is performed."
- (let ((sched (fiber-scheduler fiber)))
- (define (maybe-resume)
- (let ((thunk (get-thunk)))
- (when thunk (resume-fiber fiber thunk))))
- (set-scheduler-timers! sched
- (psq-set (scheduler-timers sched)
- (cons expiry maybe-resume)
- expiry))))
+(define (add-timer sched expiry thunk)
+ "Arrange to call @var{thunk} when the absolute real time is greater
+than or equal to @var{expiry}, expressed in internal time units."
+ (set-scheduler-timers! sched
+ (psq-set (scheduler-timers sched)
+ (cons expiry thunk)
+ expiry)))
diff --git a/fibers/operations.scm b/fibers/operations.scm
index e1a91cf..81ccf1b 100644
--- a/fibers/operations.scm
+++ b/fibers/operations.scm
@@ -120,26 +120,32 @@ succeeds, will succeed with one and only one of the sub-operations
(define (perform-operation op)
"Perform the operation @var{op} and return the resulting values. If
the operation cannot complete directly, block until it can complete."
- (define (block)
- (suspend-current-fiber
- (lambda (fiber)
- (let ((flag (make-op-state)))
- (match op
- (($ <base-op> wrap-fn try-fn block-fn)
- (block-fn flag fiber wrap-fn))
- (($ <choice-op> base-ops)
- (let lp ((i 0))
- (when (< i (vector-length base-ops))
- (match (vector-ref base-ops i)
- (($ <base-op> wrap-fn try-fn block-fn)
- (block-fn flag fiber wrap-fn)))
- (lp (1+ i))))))))))
+ (define (block sched resume)
+ (let ((flag (make-op-state)))
+ (match op
+ (($ <base-op> wrap-fn try-fn block-fn)
+ (block-fn flag sched resume wrap-fn))
+ (($ <choice-op> base-ops)
+ (let lp ((i 0))
+ (when (< i (vector-length base-ops))
+ (match (vector-ref base-ops i)
+ (($ <base-op> wrap-fn try-fn block-fn)
+ (block-fn flag sched resume wrap-fn)))
+ (lp (1+ i))))))))
+
+ (define (suspend)
+ (if (current-fiber)
+ (suspend-current-fiber
+ (lambda (fiber)
+ (define (resume thunk) (resume-fiber fiber thunk))
+ (block (fiber-scheduler fiber) resume)))
+ (error "unimplemented")))
;; First, try to sync on an op. If no op syncs, block.
(match op
(($ <base-op> wrap-fn try-fn)
(match (try-fn)
- (#f (block))
+ (#f (suspend))
(thunk
(if wrap-fn
(call-with-values thunk wrap-fn)
@@ -157,4 +163,4 @@ the operation cannot complete directly, block until it can complete."
(if wrap-fn
(call-with-values thunk wrap-fn)
(thunk))))))
- (block)))))))
+ (suspend)))))))
diff --git a/fibers/timers.scm b/fibers/timers.scm
index 9c743ae..d2468da 100644
--- a/fibers/timers.scm
+++ b/fibers/timers.scm
@@ -34,13 +34,13 @@ units. The operation will succeed with no values."
(lambda ()
(and (< expiry (get-internal-real-time))
values))
- (lambda (flag fiber wrap-fn)
- (define (get-resume-thunk)
+ (lambda (flag sched resume wrap-fn)
+ (define (timer)
(match (atomic-box-compare-and-swap! flag 'W 'S)
- ('W (or wrap-fn values))
- ('C (get-resume-thunk))
+ ('W (resume (or wrap-fn values)))
+ ('C (timer))
('S #f)))
- (resume-on-timer fiber expiry get-resume-thunk))))
+ (add-timer sched expiry timer))))
(define (wait-operation seconds)
"Make an operation that will succeed with no values when