diff options
Diffstat (limited to 'fibers/channels.scm')
| -rw-r--r-- | fibers/channels.scm | 30 |
1 files changed, 25 insertions, 5 deletions
diff --git a/fibers/channels.scm b/fibers/channels.scm index ce6f348..3638ebf 100644 --- a/fibers/channels.scm +++ b/fibers/channels.scm @@ -1,6 +1,7 @@ ;; Channels ;;;; Copyright (C) 2016 Andy Wingo <wingo@pobox.com> +;;;; Copyright (C) 2017 Christopher Allan Webber <cwebber@dustycloud.org> ;;;; ;;;; This library is free software; you can redistribute it and/or ;;;; modify it under the terms of the GNU Lesser General Public @@ -31,6 +32,7 @@ #:use-module (srfi srfi-9 gnu) #:use-module (ice-9 atomic) #:use-module (ice-9 match) + #:use-module (fibers counter) #:use-module (fibers deque) #:use-module (fibers operations) #:export (make-channel @@ -41,23 +43,27 @@ get-message)) (define-record-type <channel> - (%make-channel getq putq) + (%make-channel getq getq-gc-counter putq putq-gc-counter) channel? ;; atomic box of deque (getq channel-getq) + (getq-gc-counter channel-getq-gc-counter) ;; atomic box of deque - (putq channel-putq)) + (putq channel-putq) + (putq-gc-counter channel-putq-gc-counter)) (define (make-channel) "Make a fresh channel." (%make-channel (make-atomic-box (make-empty-deque)) - (make-atomic-box (make-empty-deque)))) + (make-counter) + (make-atomic-box (make-empty-deque)) + (make-counter))) (define (put-operation channel message) "Make an operation that if and when it completes will rendezvous with a receiver fiber to send @var{message} over @var{channel}." (match channel - (($ <channel> getq-box putq-box) + (($ <channel> getq-box getq-gc-counter putq-box putq-gc-counter) (define (try-fn) ;; Try to find and perform a pending get operation. If that ;; works, return a result thunk, or otherwise #f. @@ -105,6 +111,13 @@ with a receiver fiber to send @var{message} over @var{channel}." (not (eq? put-flag get-flag))))) ;; First, publish this put operation. (enqueue! putq-box (vector put-flag resume-put message)) + ;; Next, possibly clear off any garbage from queue. + (when (= (counter-decrement! putq-gc-counter) 0) + (dequeue-filter! putq-box + (match-lambda + (#(flag resume) + (not (eq? (atomic-box-ref flag) 'S))))) + (counter-reset! putq-gc-counter)) ;; In the try phase, we scanned the getq for a get operation, ;; but we were unable to perform any of them. Since then, ;; there might be a new get operation on the queue. However @@ -174,7 +187,7 @@ with a receiver fiber to send @var{message} over @var{channel}." "Make an operation that if and when it completes will rendezvous with a sender fiber to receive one value from @var{channel}." (match channel - (($ <channel> getq-box putq-box) + (($ <channel> getq-box getq-gc-counter putq-box putq-gc-counter) (define (try-fn) ;; Try to find and perform a pending put operation. If that ;; works, return a result thunk, or otherwise #f. @@ -220,6 +233,13 @@ with a sender fiber to receive one value from @var{channel}." (not (eq? get-flag put-flag))))) ;; First, publish this get operation. (enqueue! getq-box (vector get-flag resume-get)) + ;; Next, possibly clear off any garbage from queue. + (when (= (counter-decrement! getq-gc-counter) 0) + (dequeue-filter! getq-box + (match-lambda + (#(flag resume) + (not (eq? (atomic-box-ref flag) 'S))))) + (counter-reset! getq-gc-counter)) ;; In the try phase, we scanned the putq for a live put ;; operation, but we were unable to synchronize. Since then, ;; there might be a new operation on the putq. However only |
