summaryrefslogtreecommitdiff
path: root/fibers/channels.scm
diff options
context:
space:
mode:
Diffstat (limited to 'fibers/channels.scm')
-rw-r--r--fibers/channels.scm30
1 files changed, 25 insertions, 5 deletions
diff --git a/fibers/channels.scm b/fibers/channels.scm
index ce6f348..3638ebf 100644
--- a/fibers/channels.scm
+++ b/fibers/channels.scm
@@ -1,6 +1,7 @@
;; Channels
;;;; Copyright (C) 2016 Andy Wingo <wingo@pobox.com>
+;;;; Copyright (C) 2017 Christopher Allan Webber <cwebber@dustycloud.org>
;;;;
;;;; This library is free software; you can redistribute it and/or
;;;; modify it under the terms of the GNU Lesser General Public
@@ -31,6 +32,7 @@
#:use-module (srfi srfi-9 gnu)
#:use-module (ice-9 atomic)
#:use-module (ice-9 match)
+ #:use-module (fibers counter)
#:use-module (fibers deque)
#:use-module (fibers operations)
#:export (make-channel
@@ -41,23 +43,27 @@
get-message))
(define-record-type <channel>
- (%make-channel getq putq)
+ (%make-channel getq getq-gc-counter putq putq-gc-counter)
channel?
;; atomic box of deque
(getq channel-getq)
+ (getq-gc-counter channel-getq-gc-counter)
;; atomic box of deque
- (putq channel-putq))
+ (putq channel-putq)
+ (putq-gc-counter channel-putq-gc-counter))
(define (make-channel)
"Make a fresh channel."
(%make-channel (make-atomic-box (make-empty-deque))
- (make-atomic-box (make-empty-deque))))
+ (make-counter)
+ (make-atomic-box (make-empty-deque))
+ (make-counter)))
(define (put-operation channel message)
"Make an operation that if and when it completes will rendezvous
with a receiver fiber to send @var{message} over @var{channel}."
(match channel
- (($ <channel> getq-box putq-box)
+ (($ <channel> getq-box getq-gc-counter putq-box putq-gc-counter)
(define (try-fn)
;; Try to find and perform a pending get operation. If that
;; works, return a result thunk, or otherwise #f.
@@ -105,6 +111,13 @@ with a receiver fiber to send @var{message} over @var{channel}."
(not (eq? put-flag get-flag)))))
;; First, publish this put operation.
(enqueue! putq-box (vector put-flag resume-put message))
+ ;; Next, possibly clear off any garbage from queue.
+ (when (= (counter-decrement! putq-gc-counter) 0)
+ (dequeue-filter! putq-box
+ (match-lambda
+ (#(flag resume)
+ (not (eq? (atomic-box-ref flag) 'S)))))
+ (counter-reset! putq-gc-counter))
;; In the try phase, we scanned the getq for a get operation,
;; but we were unable to perform any of them. Since then,
;; there might be a new get operation on the queue. However
@@ -174,7 +187,7 @@ with a receiver fiber to send @var{message} over @var{channel}."
"Make an operation that if and when it completes will rendezvous
with a sender fiber to receive one value from @var{channel}."
(match channel
- (($ <channel> getq-box putq-box)
+ (($ <channel> getq-box getq-gc-counter putq-box putq-gc-counter)
(define (try-fn)
;; Try to find and perform a pending put operation. If that
;; works, return a result thunk, or otherwise #f.
@@ -220,6 +233,13 @@ with a sender fiber to receive one value from @var{channel}."
(not (eq? get-flag put-flag)))))
;; First, publish this get operation.
(enqueue! getq-box (vector get-flag resume-get))
+ ;; Next, possibly clear off any garbage from queue.
+ (when (= (counter-decrement! getq-gc-counter) 0)
+ (dequeue-filter! getq-box
+ (match-lambda
+ (#(flag resume)
+ (not (eq? (atomic-box-ref flag) 'S)))))
+ (counter-reset! getq-gc-counter))
;; In the try phase, we scanned the putq for a live put
;; operation, but we were unable to synchronize. Since then,
;; there might be a new operation on the putq. However only