diff options
| author | Andy Wingo <wingo@pobox.com> | 2017-02-11 11:32:44 +0100 |
|---|---|---|
| committer | Andy Wingo <wingo@pobox.com> | 2017-02-11 12:17:32 +0100 |
| commit | 1e2bd7c6d67947a277c92eed9ba26a4d75f06cb3 (patch) | |
| tree | 4d57f6681d90fd7068f2cda1d0aae106c43fcff8 | |
| parent | Fix channel CAS logic (diff) | |
| download | guile-fibers-1e2bd7c6d67947a277c92eed9ba26a4d75f06cb3.tar.gz | |
Add condition variable implementation
* fibers/conditions.scm:
* tests/conditions.scm: New files.
* Makefile.am: Add new files.
* fibers.texi (Conditions): New section.
* fibers/timers.scm (sleep-operation): Rename from wait-operation.
* tests/foreign.scm: Adapt to sleep-operation change.
| -rw-r--r-- | Makefile.am | 2 | ||||
| -rw-r--r-- | fibers.texi | 38 | ||||
| -rw-r--r-- | fibers/conditions.scm | 104 | ||||
| -rw-r--r-- | fibers/timers.scm | 6 | ||||
| -rw-r--r-- | tests/conditions.scm | 81 | ||||
| -rw-r--r-- | tests/foreign.scm | 2 |
6 files changed, 227 insertions, 6 deletions
diff --git a/Makefile.am b/Makefile.am index bc0616b..fe8eb67 100644 --- a/Makefile.am +++ b/Makefile.am @@ -27,6 +27,7 @@ info_TEXINFOS=fibers.texi SOURCES = \ fibers.scm \ fibers/channels.scm \ + fibers/conditions.scm \ fibers/config.scm \ fibers/deque.scm \ fibers/epoll.scm \ @@ -57,6 +58,7 @@ CLEANFILES += fibers/config.scm TESTS = \ tests/basic.scm \ + tests/conditions.scm \ tests/channels.scm \ tests/foreign.scm \ tests/parameters.scm \ diff --git a/fibers.texi b/fibers.texi index 16cce2c..c6833c1 100644 --- a/fibers.texi +++ b/fibers.texi @@ -433,6 +433,7 @@ of operations for channels and timers, and an internals interface. * Operations:: Composable abstractions for concurrency. * Channels:: Share memory by communicating. * Timers:: Operations on time. +* Conditions:: Waiting for simple state changes. * REPL Commands:: Experimenting with Fibers at the console. * Internals:: Scheduler and fiber objects and operations. @end menu @@ -641,14 +642,14 @@ between fibers on different kernel threads. @node Timers @section Timers -Timers are a kind of operation that, you guessed it, let you wait +Timers are a kind of operation that, you guessed it, let you sleep until a certain time. @example (use-modules (fibers timers)) @end example -@defun wait-operation seconds +@defun sleep-operation seconds Make an operation that will succeed with no values when @var{seconds} have elapsed. @end defun @@ -664,6 +665,39 @@ Block the calling fiber or kernel thread until @var{seconds} have elapsed. @end defun +@node Conditions +@section Conditions + +Condition variables are a simple one-bit form of concurrent +communication. A condition variable has two states: it starts in the +@dfn{unsignalled} state and later may transition to the +@dfn{signalled} state. When a condition becomes signalled, any +associated waiting operations complete. + +@example +(use-modules (fibers contitions)) +@end example + +@defun make-condition +Make a new condition variable. +@end defun + +@defun condition? obj +Return @code{#t} if @var{obj} is a condition variable, or @code{#f} +otherwise. +@end defun + +@defun wait-operation cvar +Make an operation that will succeed with no values when @var{cvar} +becomes signalled. +@end defun + +@defun wait cvar +Block the calling fiber or kernel thread until @var{cvar} becomes +signalled. Equivalent to @code{(perform-operation (wait-operation +cvar))}. +@end defun + @node REPL Commands @section REPL Commands diff --git a/fibers/conditions.scm b/fibers/conditions.scm new file mode 100644 index 0000000..5501135 --- /dev/null +++ b/fibers/conditions.scm @@ -0,0 +1,104 @@ +;; Conditions + +;;;; Copyright (C) 2017 Andy Wingo <wingo@pobox.com> +;;;; +;;;; This library is free software; you can redistribute it and/or +;;;; modify it under the terms of the GNU Lesser General Public +;;;; License as published by the Free Software Foundation; either +;;;; version 3 of the License, or (at your option) any later version. +;;;; +;;;; This library is distributed in the hope that it will be useful, +;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of +;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +;;;; Lesser General Public License for more details. +;;;; +;;;; You should have received a copy of the GNU Lesser General Public +;;;; License along with this library; if not, write to the Free Software +;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +;;; Condition variable (cvar) implementation following the 2009 ICFP +;;; paper "Parallel Concurrent ML" by John Reppy, Claudio V. Russo, +;;; and Yingqui Xiao. See channels.scm for additional commentary. +;;; +;;; Besides the general ways in which this implementation differs from +;;; the paper, this channel implementation avoids locks entirely. +;;; Still, we should disable interrupts while any operation is in a +;;; "claimed" state to avoid excess latency due to pre-emption. It +;;; would be great if we could verify our protocol though; the +;;; parallel channel operations are still gnarly. + +(define-module (fibers conditions) + #:use-module (srfi srfi-9) + #:use-module (ice-9 atomic) + #:use-module (ice-9 match) + #:use-module (fibers stack) + #:use-module (fibers operations) + #:export (make-condition + condition? + signal-condition! + wait-operation + wait)) + +(define-record-type <condition> + (%make-condition signalled? waiters) + condition? + ;; atomic box of bool + (signalled? condition-signalled?) + ;; stack of flag+resume pairs + (waiters channel-waiters)) + +(define (make-condition) + "Make a fresh condition variable." + (%make-condition (make-atomic-box #f) (make-empty-stack))) + +(define (resume-waiters! waiters) + (define (resume-one flag resume) + (match (atomic-box-compare-and-swap! flag 'W 'S) + ('W (resume values)) + ('C (resume-one flag resume)) + ('S #f))) + ;; Non-tail-recursion to resume waiters in the order they were added + ;; to the waiters stack. + (let lp ((waiters (stack-pop-all! waiters))) + (match waiters + (() #f) + (((flag . resume) . waiters) + (lp waiters) + (resume-one flag resume))))) + +(define (signal-condition! cvar) + "Mark @var{cvar} as having been signalled. Resume any fiber or +thread waiting for @var{cvar}. If @var{cvar} is already signalled, +calling @code{signal-condition!} does nothing and returns @code{#f}; +returns @code{#t} otherwise." + (match cvar + (($ <condition> signalled? waiters) + (match (atomic-box-compare-and-swap! signalled? #f #t) + (#f ;; We signalled the cvar. + (resume-waiters! waiters) + #t) + (#t ;; Cvar already signalled. + #f))))) + +(define (wait-operation cvar) + "Make an operation that will complete when @var{cvar} is signalled." + (match cvar + (($ <condition> signalled? waiters) + (define (try-fn) (and (atomic-box-ref signalled?) values)) + (define (block-fn flag sched resume) + ;; We have suspended the current fiber or thread; arrange for + ;; signal-condition! to call resume-get by adding the flag and + ;; resume callback to the cvar's waiters stack. + (stack-push! waiters (cons flag resume)) + ;; It could be that the cvar was actually signalled in between + ;; the calls to try-fn and block-fn. In that case it could be + ;; that resume-waiters! was called before our push above. In + ;; that case, call resume-waiters! to resolve the race. + (when (atomic-box-ref signalled?) + (resume-waiters! waiters)) + (values)) + (make-base-operation #f try-fn block-fn)))) + +(define (wait cvar) + "Wait until @var{cvar} has been signalled." + (perform-operation (wait-operation cvar))) diff --git a/fibers/timers.scm b/fibers/timers.scm index 1ccd088..505939f 100644 --- a/fibers/timers.scm +++ b/fibers/timers.scm @@ -23,7 +23,7 @@ #:use-module (ice-9 atomic) #:use-module (ice-9 match) #:use-module (ice-9 threads) - #:export (wait-operation + #:export (sleep-operation timer-operation) #:replace (sleep)) @@ -63,7 +63,7 @@ units. The operation will succeed with no values." (perform-operation (timer-operation expiry)) (timer))))))) -(define (wait-operation seconds) +(define (sleep-operation seconds) "Make an operation that will succeed with no values when @var{seconds} have elapsed." (timer-operation @@ -73,4 +73,4 @@ units. The operation will succeed with no values." (define (sleep seconds) "Block the calling fiber until @var{seconds} have elapsed." - (perform-operation (wait-operation seconds))) + (perform-operation (sleep-operation seconds))) diff --git a/tests/conditions.scm b/tests/conditions.scm new file mode 100644 index 0000000..505c42a --- /dev/null +++ b/tests/conditions.scm @@ -0,0 +1,81 @@ +;; Fibers: cooperative, event-driven user-space threads. + +;;;; Copyright (C) 2016 Free Software Foundation, Inc. +;;;; +;;;; This library is free software; you can redistribute it and/or +;;;; modify it under the terms of the GNU Lesser General Public +;;;; License as published by the Free Software Foundation; either +;;;; version 3 of the License, or (at your option) any later version. +;;;; +;;;; This library is distributed in the hope that it will be useful, +;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of +;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +;;;; Lesser General Public License for more details. +;;;; +;;;; You should have received a copy of the GNU Lesser General Public +;;;; License along with this library; if not, write to the Free Software +;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +;;;; + +(define-module (tests conditions) + #:use-module (fibers) + #:use-module (fibers conditions) + #:use-module (fibers operations) + #:use-module (fibers timers)) + +(define failed? #f) + +(define-syntax-rule (assert-equal expected actual) + (let ((x expected)) + (format #t "assert ~s equal to ~s: " 'actual x) + (force-output) + (let ((y actual)) + (cond + ((equal? x y) (format #t "ok\n")) + (else + (format #t "no (got ~s)\n" y) + (set! failed? #t)))))) + +(define-syntax-rule (assert-run-fibers-terminates exp) + (begin + (format #t "assert run-fibers on ~s terminates: " 'exp) + (force-output) + (let ((start (get-internal-real-time))) + (call-with-values (lambda () (run-fibers (lambda () exp))) + (lambda vals + (format #t "ok (~a s)\n" (/ (- (get-internal-real-time) start) + 1.0 internal-time-units-per-second)) + (apply values vals)))))) + +(define-syntax-rule (assert-run-fibers-returns (expected ...) exp) + (begin + (call-with-values (lambda () (assert-run-fibers-terminates exp)) + (lambda run-fiber-return-vals + (assert-equal '(expected ...) run-fiber-return-vals))))) + +(define* (with-timeout op #:key (seconds 0.05) (wrap values)) + (choice-operation op + (wrap-operation (sleep-operation seconds) wrap))) + +(define (wait/timeout cv) + (perform-operation + (with-timeout + (wrap-operation (wait-operation cv) + (lambda () #t)) + #:wrap (lambda () #f)))) + +(define cv (make-condition)) +(assert-equal #t (condition? cv)) +(assert-run-fibers-returns (#f) (wait/timeout cv)) +(assert-run-fibers-returns (#f) (wait/timeout cv)) +(assert-equal #t (signal-condition! cv)) +(assert-equal #f (signal-condition! cv)) +(assert-run-fibers-returns (#t) (wait/timeout cv)) +(assert-run-fibers-returns (#t) (wait/timeout cv)) +(assert-run-fibers-returns (#t) + (let ((cv (make-condition))) + (spawn-fiber (lambda () (signal-condition! cv))) + (wait cv) + #t)) + +(exit (if failed? 1 0)) diff --git a/tests/foreign.scm b/tests/foreign.scm index d3470bc..0d81cb5 100644 --- a/tests/foreign.scm +++ b/tests/foreign.scm @@ -64,7 +64,7 @@ (assert-equal #f #f) (assert-terminates #t) (assert-terminates (sleep 1)) -(assert-terminates (perform-operation (wait-operation 1))) +(assert-terminates (perform-operation (sleep-operation 1))) (assert-equal 42 (receive-from-fiber 42)) (assert-equal 42 (send-to-fiber 42)) |
