From 0fa1fd6adf9980229a46956503a6bf36e8154a78 Mon Sep 17 00:00:00 2001 From: Christopher Allan Webber Date: Thu, 10 Aug 2017 14:06:51 -0500 Subject: Garbage collect synchronized items from channels put/get queues. * fibers/conditions.scm (make-counter, %steps-till-gc, counter-decrement!) (counter-reset!): Moved to new module, counter.scm. * fibers/counter.scm: New file. Rename `%steps-till-gc' to `%countdown-steps'. * Makefile.am: Add counter.scm. * fibers/channels.scm (, make-channel): Add new slots `getq-gc-counter' and `putq-gc-counter'. (put-operation, get-operation): Garbage collect synchronized items from queues. * fibers/deque.scm (dequeue-filter, dequeue-filter!): New procedures. --- Makefile.am | 1 + fibers/channels.scm | 30 +++++++++++++++++++++++++----- fibers/conditions.scm | 30 +----------------------------- fibers/counter.scm | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ fibers/deque.scm | 16 +++++++++++++++- 5 files changed, 91 insertions(+), 35 deletions(-) create mode 100644 fibers/counter.scm diff --git a/Makefile.am b/Makefile.am index 676a1dc..e2db57e 100644 --- a/Makefile.am +++ b/Makefile.am @@ -29,6 +29,7 @@ SOURCES = \ fibers/channels.scm \ fibers/conditions.scm \ fibers/config.scm \ + fibers/counter.scm \ fibers/deque.scm \ fibers/epoll.scm \ fibers/interrupts.scm \ diff --git a/fibers/channels.scm b/fibers/channels.scm index ce6f348..3638ebf 100644 --- a/fibers/channels.scm +++ b/fibers/channels.scm @@ -1,6 +1,7 @@ ;; Channels ;;;; Copyright (C) 2016 Andy Wingo +;;;; Copyright (C) 2017 Christopher Allan Webber ;;;; ;;;; This library is free software; you can redistribute it and/or ;;;; modify it under the terms of the GNU Lesser General Public @@ -31,6 +32,7 @@ #:use-module (srfi srfi-9 gnu) #:use-module (ice-9 atomic) #:use-module (ice-9 match) + #:use-module (fibers counter) #:use-module (fibers deque) #:use-module (fibers operations) #:export (make-channel @@ -41,23 +43,27 @@ get-message)) (define-record-type - (%make-channel getq putq) + (%make-channel getq getq-gc-counter putq putq-gc-counter) channel? ;; atomic box of deque (getq channel-getq) + (getq-gc-counter channel-getq-gc-counter) ;; atomic box of deque - (putq channel-putq)) + (putq channel-putq) + (putq-gc-counter channel-putq-gc-counter)) (define (make-channel) "Make a fresh channel." (%make-channel (make-atomic-box (make-empty-deque)) - (make-atomic-box (make-empty-deque)))) + (make-counter) + (make-atomic-box (make-empty-deque)) + (make-counter))) (define (put-operation channel message) "Make an operation that if and when it completes will rendezvous with a receiver fiber to send @var{message} over @var{channel}." (match channel - (($ getq-box putq-box) + (($ getq-box getq-gc-counter putq-box putq-gc-counter) (define (try-fn) ;; Try to find and perform a pending get operation. If that ;; works, return a result thunk, or otherwise #f. @@ -105,6 +111,13 @@ with a receiver fiber to send @var{message} over @var{channel}." (not (eq? put-flag get-flag))))) ;; First, publish this put operation. (enqueue! putq-box (vector put-flag resume-put message)) + ;; Next, possibly clear off any garbage from queue. + (when (= (counter-decrement! putq-gc-counter) 0) + (dequeue-filter! putq-box + (match-lambda + (#(flag resume) + (not (eq? (atomic-box-ref flag) 'S))))) + (counter-reset! putq-gc-counter)) ;; In the try phase, we scanned the getq for a get operation, ;; but we were unable to perform any of them. Since then, ;; there might be a new get operation on the queue. However @@ -174,7 +187,7 @@ with a receiver fiber to send @var{message} over @var{channel}." "Make an operation that if and when it completes will rendezvous with a sender fiber to receive one value from @var{channel}." (match channel - (($ getq-box putq-box) + (($ getq-box getq-gc-counter putq-box putq-gc-counter) (define (try-fn) ;; Try to find and perform a pending put operation. If that ;; works, return a result thunk, or otherwise #f. @@ -220,6 +233,13 @@ with a sender fiber to receive one value from @var{channel}." (not (eq? get-flag put-flag))))) ;; First, publish this get operation. (enqueue! getq-box (vector get-flag resume-get)) + ;; Next, possibly clear off any garbage from queue. + (when (= (counter-decrement! getq-gc-counter) 0) + (dequeue-filter! getq-box + (match-lambda + (#(flag resume) + (not (eq? (atomic-box-ref flag) 'S))))) + (counter-reset! getq-gc-counter)) ;; In the try phase, we scanned the putq for a live put ;; operation, but we were unable to synchronize. Since then, ;; there might be a new operation on the putq. However only diff --git a/fibers/conditions.scm b/fibers/conditions.scm index 0183107..cae5086 100644 --- a/fibers/conditions.scm +++ b/fibers/conditions.scm @@ -33,6 +33,7 @@ #:use-module (ice-9 atomic) #:use-module (ice-9 match) #:use-module (fibers stack) + #:use-module (fibers counter) #:use-module (fibers operations) #:export (make-condition condition? @@ -40,35 +41,6 @@ wait-operation wait)) - -;;; Counter utilities -;;; -;;; Counters here are an atomic box containing an integer which are -;;; either decremented or reset. - -;; How many times we run the block-fn until we gc -(define %steps-till-gc 42) ; haven't tried testing for the most efficient number - -(define (make-counter) - (make-atomic-box %steps-till-gc)) - -(define (counter-decrement! counter) - "Decrement integer in atomic box COUNTER." - (let spin ((x (atomic-box-ref counter))) - (let* ((x-new (1- x)) - (x* (atomic-box-compare-and-swap! counter x x-new))) - (if (= x* x) ; successful decrement - x-new - (spin x*))))) - -(define (counter-reset! counter) - "Reset a counter's contents." - (atomic-box-set! counter %steps-till-gc)) - - -;;; Conditions - - (define-record-type (%make-condition signalled? waiters gc-step) condition? diff --git a/fibers/counter.scm b/fibers/counter.scm new file mode 100644 index 0000000..569d74c --- /dev/null +++ b/fibers/counter.scm @@ -0,0 +1,49 @@ +;; Counters + +;;;; Copyright (C) 2017 Christopher Allan Webber +;;;; +;;;; This library is free software; you can redistribute it and/or +;;;; modify it under the terms of the GNU Lesser General Public +;;;; License as published by the Free Software Foundation; either +;;;; version 3 of the License, or (at your option) any later version. +;;;; +;;;; This library is distributed in the hope that it will be useful, +;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of +;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +;;;; Lesser General Public License for more details. +;;;; +;;;; You should have received a copy of the GNU Lesser General Public +;;;; License along with this library; if not, write to the Free Software +;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +;;; General atomic counters; currently used for garbage collection. + +(define-module (fibers counter) + #:use-module (ice-9 atomic) + #:export (make-counter + counter-decrement! + counter-reset!)) + +;;; Counter utilities +;;; +;;; Counters here are an atomic box containing an integer which are +;;; either decremented or reset. + +;; How many times we run the block-fn until we gc +(define %countdown-steps 42) ; haven't tried testing for the most efficient number + +(define* (make-counter) + (make-atomic-box %countdown-steps)) + +(define (counter-decrement! counter) + "Decrement integer in atomic box COUNTER." + (let spin ((x (atomic-box-ref counter))) + (let* ((x-new (1- x)) + (x* (atomic-box-compare-and-swap! counter x x-new))) + (if (= x* x) ; successful decrement + x-new + (spin x*))))) + +(define (counter-reset! counter) + "Reset a counter's contents." + (atomic-box-set! counter %countdown-steps)) diff --git a/fibers/deque.scm b/fibers/deque.scm index 6a41583..d427729 100644 --- a/fibers/deque.scm +++ b/fibers/deque.scm @@ -1,6 +1,7 @@ ;; Double-ended queue ;;;; Copyright (C) 2016 Andy Wingo +;;;; Copyright (C) 2017 Christopher Allan Webber ;;;; ;;;; This library is free software; you can redistribute it and/or ;;;; modify it under the terms of the GNU Lesser General Public @@ -27,10 +28,12 @@ dequeue dequeue-all dequeue-match + dequeue-filter undequeue dequeue! dequeue-all! - enqueue!)) + enqueue! + dequeue-filter!)) ;; A functional double-ended queue ("deque") has a head and a tail, ;; which are both lists. The head is in FIFO order and the tail is in @@ -82,6 +85,12 @@ ((head . tail) (make-deque (cons item head) tail)))) +(define (dequeue-filter dq pred) + (match dq + ((head . tail) + (cons (filter pred head) + (filter pred tail))))) + (define-inlinable (update! box f) (let spin ((x (atomic-box-ref box))) (call-with-values (lambda () (f x)) @@ -110,3 +119,8 @@ (update! dqbox (lambda (dq) (values (enqueue dq item) #f)))) + +(define (dequeue-filter! dqbox pred) + (update! dqbox (lambda (dq) + (values (dequeue-filter dq pred) + #f)))) -- cgit v1.2.3