summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndy Wingo <wingo@pobox.com>2017-02-11 11:32:44 +0100
committerAndy Wingo <wingo@pobox.com>2017-02-11 12:17:32 +0100
commit1e2bd7c6d67947a277c92eed9ba26a4d75f06cb3 (patch)
tree4d57f6681d90fd7068f2cda1d0aae106c43fcff8
parentFix channel CAS logic (diff)
downloadguile-fibers-1e2bd7c6d67947a277c92eed9ba26a4d75f06cb3.tar.gz
Add condition variable implementation
* fibers/conditions.scm: * tests/conditions.scm: New files. * Makefile.am: Add new files. * fibers.texi (Conditions): New section. * fibers/timers.scm (sleep-operation): Rename from wait-operation. * tests/foreign.scm: Adapt to sleep-operation change.
-rw-r--r--Makefile.am2
-rw-r--r--fibers.texi38
-rw-r--r--fibers/conditions.scm104
-rw-r--r--fibers/timers.scm6
-rw-r--r--tests/conditions.scm81
-rw-r--r--tests/foreign.scm2
6 files changed, 227 insertions, 6 deletions
diff --git a/Makefile.am b/Makefile.am
index bc0616b..fe8eb67 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -27,6 +27,7 @@ info_TEXINFOS=fibers.texi
SOURCES = \
fibers.scm \
fibers/channels.scm \
+ fibers/conditions.scm \
fibers/config.scm \
fibers/deque.scm \
fibers/epoll.scm \
@@ -57,6 +58,7 @@ CLEANFILES += fibers/config.scm
TESTS = \
tests/basic.scm \
+ tests/conditions.scm \
tests/channels.scm \
tests/foreign.scm \
tests/parameters.scm \
diff --git a/fibers.texi b/fibers.texi
index 16cce2c..c6833c1 100644
--- a/fibers.texi
+++ b/fibers.texi
@@ -433,6 +433,7 @@ of operations for channels and timers, and an internals interface.
* Operations:: Composable abstractions for concurrency.
* Channels:: Share memory by communicating.
* Timers:: Operations on time.
+* Conditions:: Waiting for simple state changes.
* REPL Commands:: Experimenting with Fibers at the console.
* Internals:: Scheduler and fiber objects and operations.
@end menu
@@ -641,14 +642,14 @@ between fibers on different kernel threads.
@node Timers
@section Timers
-Timers are a kind of operation that, you guessed it, let you wait
+Timers are a kind of operation that, you guessed it, let you sleep
until a certain time.
@example
(use-modules (fibers timers))
@end example
-@defun wait-operation seconds
+@defun sleep-operation seconds
Make an operation that will succeed with no values when @var{seconds}
have elapsed.
@end defun
@@ -664,6 +665,39 @@ Block the calling fiber or kernel thread until @var{seconds} have
elapsed.
@end defun
+@node Conditions
+@section Conditions
+
+Condition variables are a simple one-bit form of concurrent
+communication. A condition variable has two states: it starts in the
+@dfn{unsignalled} state and later may transition to the
+@dfn{signalled} state. When a condition becomes signalled, any
+associated waiting operations complete.
+
+@example
+(use-modules (fibers contitions))
+@end example
+
+@defun make-condition
+Make a new condition variable.
+@end defun
+
+@defun condition? obj
+Return @code{#t} if @var{obj} is a condition variable, or @code{#f}
+otherwise.
+@end defun
+
+@defun wait-operation cvar
+Make an operation that will succeed with no values when @var{cvar}
+becomes signalled.
+@end defun
+
+@defun wait cvar
+Block the calling fiber or kernel thread until @var{cvar} becomes
+signalled. Equivalent to @code{(perform-operation (wait-operation
+cvar))}.
+@end defun
+
@node REPL Commands
@section REPL Commands
diff --git a/fibers/conditions.scm b/fibers/conditions.scm
new file mode 100644
index 0000000..5501135
--- /dev/null
+++ b/fibers/conditions.scm
@@ -0,0 +1,104 @@
+;; Conditions
+
+;;;; Copyright (C) 2017 Andy Wingo <wingo@pobox.com>
+;;;;
+;;;; This library is free software; you can redistribute it and/or
+;;;; modify it under the terms of the GNU Lesser General Public
+;;;; License as published by the Free Software Foundation; either
+;;;; version 3 of the License, or (at your option) any later version.
+;;;;
+;;;; This library is distributed in the hope that it will be useful,
+;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;;;; Lesser General Public License for more details.
+;;;;
+;;;; You should have received a copy of the GNU Lesser General Public
+;;;; License along with this library; if not, write to the Free Software
+;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+;;; Condition variable (cvar) implementation following the 2009 ICFP
+;;; paper "Parallel Concurrent ML" by John Reppy, Claudio V. Russo,
+;;; and Yingqui Xiao. See channels.scm for additional commentary.
+;;;
+;;; Besides the general ways in which this implementation differs from
+;;; the paper, this channel implementation avoids locks entirely.
+;;; Still, we should disable interrupts while any operation is in a
+;;; "claimed" state to avoid excess latency due to pre-emption. It
+;;; would be great if we could verify our protocol though; the
+;;; parallel channel operations are still gnarly.
+
+(define-module (fibers conditions)
+ #:use-module (srfi srfi-9)
+ #:use-module (ice-9 atomic)
+ #:use-module (ice-9 match)
+ #:use-module (fibers stack)
+ #:use-module (fibers operations)
+ #:export (make-condition
+ condition?
+ signal-condition!
+ wait-operation
+ wait))
+
+(define-record-type <condition>
+ (%make-condition signalled? waiters)
+ condition?
+ ;; atomic box of bool
+ (signalled? condition-signalled?)
+ ;; stack of flag+resume pairs
+ (waiters channel-waiters))
+
+(define (make-condition)
+ "Make a fresh condition variable."
+ (%make-condition (make-atomic-box #f) (make-empty-stack)))
+
+(define (resume-waiters! waiters)
+ (define (resume-one flag resume)
+ (match (atomic-box-compare-and-swap! flag 'W 'S)
+ ('W (resume values))
+ ('C (resume-one flag resume))
+ ('S #f)))
+ ;; Non-tail-recursion to resume waiters in the order they were added
+ ;; to the waiters stack.
+ (let lp ((waiters (stack-pop-all! waiters)))
+ (match waiters
+ (() #f)
+ (((flag . resume) . waiters)
+ (lp waiters)
+ (resume-one flag resume)))))
+
+(define (signal-condition! cvar)
+ "Mark @var{cvar} as having been signalled. Resume any fiber or
+thread waiting for @var{cvar}. If @var{cvar} is already signalled,
+calling @code{signal-condition!} does nothing and returns @code{#f};
+returns @code{#t} otherwise."
+ (match cvar
+ (($ <condition> signalled? waiters)
+ (match (atomic-box-compare-and-swap! signalled? #f #t)
+ (#f ;; We signalled the cvar.
+ (resume-waiters! waiters)
+ #t)
+ (#t ;; Cvar already signalled.
+ #f)))))
+
+(define (wait-operation cvar)
+ "Make an operation that will complete when @var{cvar} is signalled."
+ (match cvar
+ (($ <condition> signalled? waiters)
+ (define (try-fn) (and (atomic-box-ref signalled?) values))
+ (define (block-fn flag sched resume)
+ ;; We have suspended the current fiber or thread; arrange for
+ ;; signal-condition! to call resume-get by adding the flag and
+ ;; resume callback to the cvar's waiters stack.
+ (stack-push! waiters (cons flag resume))
+ ;; It could be that the cvar was actually signalled in between
+ ;; the calls to try-fn and block-fn. In that case it could be
+ ;; that resume-waiters! was called before our push above. In
+ ;; that case, call resume-waiters! to resolve the race.
+ (when (atomic-box-ref signalled?)
+ (resume-waiters! waiters))
+ (values))
+ (make-base-operation #f try-fn block-fn))))
+
+(define (wait cvar)
+ "Wait until @var{cvar} has been signalled."
+ (perform-operation (wait-operation cvar)))
diff --git a/fibers/timers.scm b/fibers/timers.scm
index 1ccd088..505939f 100644
--- a/fibers/timers.scm
+++ b/fibers/timers.scm
@@ -23,7 +23,7 @@
#:use-module (ice-9 atomic)
#:use-module (ice-9 match)
#:use-module (ice-9 threads)
- #:export (wait-operation
+ #:export (sleep-operation
timer-operation)
#:replace (sleep))
@@ -63,7 +63,7 @@ units. The operation will succeed with no values."
(perform-operation (timer-operation expiry))
(timer)))))))
-(define (wait-operation seconds)
+(define (sleep-operation seconds)
"Make an operation that will succeed with no values when
@var{seconds} have elapsed."
(timer-operation
@@ -73,4 +73,4 @@ units. The operation will succeed with no values."
(define (sleep seconds)
"Block the calling fiber until @var{seconds} have elapsed."
- (perform-operation (wait-operation seconds)))
+ (perform-operation (sleep-operation seconds)))
diff --git a/tests/conditions.scm b/tests/conditions.scm
new file mode 100644
index 0000000..505c42a
--- /dev/null
+++ b/tests/conditions.scm
@@ -0,0 +1,81 @@
+;; Fibers: cooperative, event-driven user-space threads.
+
+;;;; Copyright (C) 2016 Free Software Foundation, Inc.
+;;;;
+;;;; This library is free software; you can redistribute it and/or
+;;;; modify it under the terms of the GNU Lesser General Public
+;;;; License as published by the Free Software Foundation; either
+;;;; version 3 of the License, or (at your option) any later version.
+;;;;
+;;;; This library is distributed in the hope that it will be useful,
+;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;;;; Lesser General Public License for more details.
+;;;;
+;;;; You should have received a copy of the GNU Lesser General Public
+;;;; License along with this library; if not, write to the Free Software
+;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+;;;;
+
+(define-module (tests conditions)
+ #:use-module (fibers)
+ #:use-module (fibers conditions)
+ #:use-module (fibers operations)
+ #:use-module (fibers timers))
+
+(define failed? #f)
+
+(define-syntax-rule (assert-equal expected actual)
+ (let ((x expected))
+ (format #t "assert ~s equal to ~s: " 'actual x)
+ (force-output)
+ (let ((y actual))
+ (cond
+ ((equal? x y) (format #t "ok\n"))
+ (else
+ (format #t "no (got ~s)\n" y)
+ (set! failed? #t))))))
+
+(define-syntax-rule (assert-run-fibers-terminates exp)
+ (begin
+ (format #t "assert run-fibers on ~s terminates: " 'exp)
+ (force-output)
+ (let ((start (get-internal-real-time)))
+ (call-with-values (lambda () (run-fibers (lambda () exp)))
+ (lambda vals
+ (format #t "ok (~a s)\n" (/ (- (get-internal-real-time) start)
+ 1.0 internal-time-units-per-second))
+ (apply values vals))))))
+
+(define-syntax-rule (assert-run-fibers-returns (expected ...) exp)
+ (begin
+ (call-with-values (lambda () (assert-run-fibers-terminates exp))
+ (lambda run-fiber-return-vals
+ (assert-equal '(expected ...) run-fiber-return-vals)))))
+
+(define* (with-timeout op #:key (seconds 0.05) (wrap values))
+ (choice-operation op
+ (wrap-operation (sleep-operation seconds) wrap)))
+
+(define (wait/timeout cv)
+ (perform-operation
+ (with-timeout
+ (wrap-operation (wait-operation cv)
+ (lambda () #t))
+ #:wrap (lambda () #f))))
+
+(define cv (make-condition))
+(assert-equal #t (condition? cv))
+(assert-run-fibers-returns (#f) (wait/timeout cv))
+(assert-run-fibers-returns (#f) (wait/timeout cv))
+(assert-equal #t (signal-condition! cv))
+(assert-equal #f (signal-condition! cv))
+(assert-run-fibers-returns (#t) (wait/timeout cv))
+(assert-run-fibers-returns (#t) (wait/timeout cv))
+(assert-run-fibers-returns (#t)
+ (let ((cv (make-condition)))
+ (spawn-fiber (lambda () (signal-condition! cv)))
+ (wait cv)
+ #t))
+
+(exit (if failed? 1 0))
diff --git a/tests/foreign.scm b/tests/foreign.scm
index d3470bc..0d81cb5 100644
--- a/tests/foreign.scm
+++ b/tests/foreign.scm
@@ -64,7 +64,7 @@
(assert-equal #f #f)
(assert-terminates #t)
(assert-terminates (sleep 1))
-(assert-terminates (perform-operation (wait-operation 1)))
+(assert-terminates (perform-operation (sleep-operation 1)))
(assert-equal 42 (receive-from-fiber 42))
(assert-equal 42 (send-to-fiber 42))