summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile.am5
-rw-r--r--fibers/channels.scm384
-rw-r--r--fibers/operations.scm151
-rw-r--r--tests/basic.scm23
-rw-r--r--tests/channels.scm80
5 files changed, 542 insertions, 101 deletions
diff --git a/Makefile.am b/Makefile.am
index 065f0e1..d7e0648 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -30,6 +30,7 @@ SOURCES = \
fibers/epoll.scm \
fibers/internal.scm \
fibers/nameset.scm \
+ fibers/operations.scm \
fibers/psq.scm \
fibers/repl.scm \
web/server/fibers.scm
@@ -42,7 +43,9 @@ epoll_la_CFLAGS = $(AM_CFLAGS) $(GUILE_CFLAGS)
epoll_la_LIBADD = $(GUILE_LIBS)
epoll_la_LDFLAGS = -export-dynamic -module
-TESTS=tests/basic.scm
+TESTS = \
+ tests/basic.scm \
+ tests/channels.scm
TESTS_ENVIRONMENT=top_srcdir="$(abs_top_srcdir)" ./env $(GUILE) -s
EXTRA_DIST += \
diff --git a/fibers/channels.scm b/fibers/channels.scm
index 18f59d2..e7d0038 100644
--- a/fibers/channels.scm
+++ b/fibers/channels.scm
@@ -1,4 +1,4 @@
-;; channels
+;; Channels
;;;; Copyright (C) 2016 Andy Wingo <wingo@pobox.com>
;;;;
@@ -15,7 +15,16 @@
;;;; You should have received a copy of the GNU Lesser General Public
;;;; License along with this library; if not, write to the Free Software
;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-;;;;
+
+;;; Channel implementation following the 2009 ICFP paper "Parallel
+;;; Concurrent ML" by John Reppy, Claudio V. Russo, and Yingqui Xiao.
+;;;
+;;; Besides the general ways in which this implementation differs from
+;;; the paper, this channel implementation avoids locks entirely.
+;;; Still, we should disable interrupts while any operation is in a
+;;; "claimed" state to avoid excess latency due to pre-emption. It
+;;; would be great if we could verify our protocol though; the
+;;; parallel channel operations are still gnarly.
(define-module (fibers channels)
#:use-module (srfi srfi-9)
@@ -23,93 +32,312 @@
#:use-module (ice-9 atomic)
#:use-module (ice-9 match)
#:use-module (fibers internal)
- #:export (channel?
- make-channel
+ #:use-module (fibers operations)
+ #:export (make-channel
+ put-operation
+ get-operation
put-message
get-message))
-(define-record-type <read-wait-queue>
- (make-read-wait-queue waiters)
- read-wait-queue?
- ;; list of fiber, lifo order
- (waiters read-wait-queue-waiters))
+;; A functional queue has a head and a tail, which are both lists.
+;; The head is in FIFO order and the tail is in LIFO order.
+(define-inlinable (make-queue head tail)
+ (cons head tail))
+
+(define (make-empty-queue)
+ (make-queue '() '()))
+
+(define (enqueue queue item)
+ (match queue
+ ((head . tail)
+ (make-queue head (cons item tail)))))
-;; A message queue is a list of messages in FIFO order. A message is
-;; any value. The list is in FIFO order to make adding to the list
-;; more expensive (O(n)) than removing from it (O(1)). It's a list to
-;; avoid record overhead.
+(define (undequeue queue item)
+ (match queue
+ ((head . tail)
+ (make-queue (cons item head) tail))))
-(define-record-type <write-wait-queue>
- (make-write-wait-queue messages waiters)
- write-wait-queue?
- (messages write-wait-queue-messages)
- (waiters write-wait-queue-waiters))
+;; -> new queue, val | #f, #f
+(define (dequeue queue)
+ (match queue
+ ((() . ()) (values #f #f))
+ ((() . tail)
+ (dequeue (make-queue (reverse tail) '())))
+ (((item . head) . tail)
+ (values (make-queue head tail) item))))
+
+(define (dequeue-match queue pred)
+ (match queue
+ ((() . ()) (values #f #f))
+ ((() . tail)
+ (dequeue (make-queue (reverse tail) '())))
+ (((item . head) . tail)
+ (if (pred item)
+ (values (make-queue head tail) item)
+ (call-with-values (dequeue-match (make-queue head tail) pred)
+ (lambda (queue item*)
+ (values (undequeue queue item) item*)))))))
+
+(define (enqueue! qbox item)
+ (let spin ((q (atomic-box-ref qbox)))
+ (let* ((q* (enqueue q item))
+ (q** (atomic-box-compare-and-swap! qbox q q*)))
+ (unless (eq? q q**)
+ (spin q**)))))
(define-record-type <channel>
- (%make-channel queue-size state)
+ (%make-channel getq putq)
channel?
- (queue-size channel-queue-size)
- ;; Atomic reference to channel state. Channel state is either a
- ;; <read-wait-queue>, if the queue is empty and readers are blocked,
- ;; or a message queue (described above) if the queue is not blocked,
- ;; or a <write-wait-queue> if the queue is full and there are
- ;; blocked writers.
- (state channel-message-state))
+ ;; atomic box of queue
+ (getq channel-getq)
+ ;; atomic box of queue
+ (putq channel-putq))
-(define* (make-channel #:key (queue-size 1))
- (%make-channel queue-size (make-atomic-box '())))
+(define (make-channel)
+ (%make-channel (make-atomic-box (make-empty-queue))
+ (make-atomic-box (make-empty-queue))))
-(define (put-message channel message)
+(define (put-operation channel msg)
(match channel
- (($ <channel> queue-size state)
- (let retry ((old-state (atomic-box-ref state)))
- (define (commit new-state kt)
- (let ((x (atomic-box-compare-and-swap! state old-state new-state)))
- (if (eq? x old-state) (kt) (retry x))))
- (match old-state
- ((or () (_ . _))
- (if (< (length old-state) queue-size)
- (commit (append old-state (list message)) values)
- (commit (make-write-wait-queue old-state (list (current-fiber)))
- (lambda ()
- (suspend-current-fiber)
- (retry (atomic-box-ref state))))))
- (($ <read-wait-queue> waiters)
- (commit (list message)
- (lambda ()
- (for-each (lambda (fiber) (resume-fiber fiber values))
- waiters)
- (values))))
- (($ <write-wait-queue> messages waiters)
- (commit (make-write-wait-queue messages (cons (current-fiber) waiters))
- (lambda ()
- (suspend-current-fiber)
- (retry (atomic-box-ref state))))))))))
+ (($ <channel> getq-box putq-box)
+ (define (try-fn)
+ ;; Try to find and perform a pending get operation. If that
+ ;; works, return a result thunk, or otherwise #f.
+ (let try ((getq (atomic-box-ref getq-box)))
+ (call-with-values (lambda () (dequeue getq))
+ (lambda (getq* item)
+ (define (maybe-commit)
+ ;; Try to update getq. Return the new getq value in
+ ;; any case.
+ (let ((q (atomic-box-compare-and-swap! getq-box getq getq*)))
+ (if (eq? q getq) getq* getq)))
+ ;; Return #f if the getq was empty.
+ (and getq*
+ (match item
+ (#(get-flag get-fiber get-wrap-fn)
+ (let spin ()
+ (match (atomic-box-compare-and-swap! get-flag 'W 'S)
+ ('W
+ ;; Success. Commit the dequeue operation,
+ ;; unless the getq changed in the
+ ;; meantime. If we don't manage to commit
+ ;; the dequeue, some other put operation will
+ ;; commit it before it successfully
+ ;; performs any other operation on this
+ ;; channel.
+ (maybe-commit)
+ (resume-fiber get-fiber (if get-wrap-fn
+ (lambda ()
+ (get-wrap-fn msg))
+ (lambda () msg)))
+ ;; Continue directly.
+ (lambda () (values)))
+ ;; Get operation temporarily busy; try again.
+ ('C (spin))
+ ;; Get operation already performed; pop it
+ ;; off the getq (if we can) and try again.
+ ;; If we fail to commit, no big deal, we will
+ ;; try again next time if no other fiber
+ ;; handled it already.
+ ('S (try (maybe-commit))))))))))))
+ (define (block-fn put-flag put-fiber put-wrap-fn)
+ ;; We have suspended the current fiber; arrange for the fiber
+ ;; to be resumed by a get operation by adding it to the channel's
+ ;; putq.
+ (define (not-me? item)
+ (match item
+ (#(get-flag get-fiber get-wrap-fn)
+ (not (eq? put-flag get-flag)))))
+ ;; First, publish this put operation.
+ (enqueue! putq-box (vector put-flag put-fiber put-wrap-fn msg))
+ ;; In the try phase, we scanned the getq for a get operation,
+ ;; but we were unable to perform any of them. Since then,
+ ;; there might be a new get operation on the queue. However
+ ;; only get operations published *after* we publish our put
+ ;; operation to the putq are responsible for trying to complete
+ ;; this put operation; we are responsible for get operations
+ ;; published before we published our put. Therefore, here we
+ ;; visit the getq again. This is like the "try" phase, but
+ ;; with the difference that we've published our op state flag
+ ;; to the queue, so other fibers might be racing to synchronize
+ ;; on our own op.
+ (let service-get-ops ((getq (atomic-box-ref getq-box)))
+ (call-with-values (lambda () (dequeue-match getq not-me?))
+ (lambda (getq* item)
+ (define (maybe-commit)
+ ;; Try to update getq. Return the new getq value in
+ ;; any case.
+ (let ((q (atomic-box-compare-and-swap! getq-box getq getq*)))
+ (if (eq? q getq) getq* getq)))
+ ;; We only have to service the getq if it is non-empty.
+ (when getq*
+ (match item
+ (#(get-flag get-fiber get-wrap-fn)
+ (match (atomic-box-ref get-flag)
+ ('S
+ ;; This get operation has already synchronized;
+ ;; try to commit and operation and in any
+ ;; case try again.
+ (service-get-ops (maybe-commit)))
+ (_
+ (let spin ()
+ (match (atomic-box-compare-and-swap! put-flag 'W 'C)
+ ('W
+ ;; We were able to claim our op. Now try to
+ ;; synchronize on a get operation as well.
+ (match (atomic-box-compare-and-swap! get-flag 'W 'S)
+ ('W
+ ;; It worked! Mark our own op as
+ ;; synchronized, try to commit the result
+ ;; getq, and resume both fibers.
+ (atomic-box-set! put-flag 'S)
+ (maybe-commit)
+ (resume-fiber get-fiber
+ (if get-wrap-fn
+ (lambda () (get-wrap-fn msg))
+ (lambda () msg)))
+ (resume-fiber put-fiber (or put-wrap-fn values))
+ (values))
+ ('C
+ ;; Other fiber trying to do the same
+ ;; thing we are; reset our state and try
+ ;; again.
+ (atomic-box-set! put-flag 'W)
+ (spin))
+ ('S
+ ;; Other op already synchronized. Reset
+ ;; our flag, try to remove this dead
+ ;; entry from the getq, and give it
+ ;; another go.
+ (atomic-box-set! put-flag 'W)
+ (service-get-ops (maybe-commit)))))
+ (_
+ ;; Claiming our own op failed; this can only
+ ;; mean that some other fiber completed our
+ ;; op for us.
+ (values)))))))))))))
+ (make-base-operation #f try-fn block-fn))))
-(define (get-message channel)
+(define (get-operation channel)
(match channel
- (($ <channel> queue-size state)
- (let retry ((old-state (atomic-box-ref state)))
- (define (commit new-state kt)
- (let ((x (atomic-box-compare-and-swap! state old-state new-state)))
- (if (eq? x old-state) (kt) (retry x))))
- (match old-state
- (()
- (commit (make-read-wait-queue (list (current-fiber)))
- (lambda ()
- (suspend-current-fiber)
- (retry (atomic-box-ref state)))))
- ((message . messages)
- (commit messages (lambda () message)))
- (($ <read-wait-queue> waiters)
- (commit (make-read-wait-queue (cons (current-fiber) waiters))
- (lambda ()
- (suspend-current-fiber)
- (retry (atomic-box-ref state)))))
- (($ <write-wait-queue> (message . messages) waiters)
- (commit messages
- (lambda ()
- (for-each (lambda (fiber) (resume-fiber fiber values))
- waiters)
- message))))))))
+ (($ <channel> getq-box putq-box)
+ (define (try-fn)
+ ;; Try to find and perform a pending put operation. If that
+ ;; works, return a result thunk, or otherwise #f.
+ (let try ((putq (atomic-box-ref putq-box)))
+ (call-with-values (lambda () (dequeue putq))
+ (lambda (putq* item)
+ (define (maybe-commit)
+ ;; Try to update putq. Return the new putq value in
+ ;; any case.
+ (let ((q (atomic-box-compare-and-swap! putq-box putq putq*)))
+ (if (eq? q putq) putq* putq)))
+ ;; Return #f if the putq was empty.
+ (and putq*
+ (match item
+ (#(put-flag put-fiber put-wrap-fn msg)
+ (let spin ()
+ (match (atomic-box-compare-and-swap! put-flag 'W 'S)
+ ('W
+ ;; Success. Commit the fresh putq if we
+ ;; can. If we don't manage to commit right
+ ;; now, some other get operation will commit
+ ;; it before synchronizing any other
+ ;; operation on this channel.
+ (maybe-commit)
+ (resume-fiber put-fiber (or put-wrap-fn values))
+ ;; Continue directly.
+ (lambda () msg))
+ ;; Put operation temporarily busy; try again.
+ ('C (spin))
+ ;; Put operation already synchronized; pop it
+ ;; off the putq (if we can) and try again.
+ ;; If we fail to commit, no big deal, we will
+ ;; try again next time if no other fiber
+ ;; handled it already.
+ ('S (try (maybe-commit))))))))))))
+ (define (block-fn get-flag get-fiber get-wrap-fn)
+ ;; We have suspended the current fiber; arrange for the fiber
+ ;; to be resumed by a put operation by adding it to the
+ ;; channel's getq.
+ (define (not-me? item)
+ (match item
+ (#(put-flag put-fiber put-wrap-fn msg)
+ (not (eq? get-flag put-flag)))))
+ ;; First, publish this get operation.
+ (enqueue! getq-box (vector get-flag get-fiber get-wrap-fn))
+ ;; In the try phase, we scanned the putq for a live put
+ ;; operation, but we were unable to synchronize. Since then,
+ ;; there might be a new operation on the putq. However only
+ ;; put operations published *after* we publish our get
+ ;; operation to the getq are responsible for trying to complete
+ ;; this get operation; we are responsible for put operations
+ ;; published before we published our get. Therefore, here we
+ ;; visit the putq again. This is like the "try" phase, but
+ ;; with the difference that we've published our op state flag
+ ;; to the getq, so other fibers might be racing to synchronize
+ ;; on our own op.
+ (let service-put-ops ((putq (atomic-box-ref putq-box)))
+ (call-with-values (lambda () (dequeue-match putq not-me?))
+ (lambda (putq* item)
+ (define (maybe-commit)
+ ;; Try to update putq. Return the new putq value in
+ ;; any case.
+ (let ((q (atomic-box-compare-and-swap! putq-box putq putq*)))
+ (if (eq? q putq) putq* putq)))
+ ;; We only have to service the putq if it is non-empty.
+ (when putq*
+ (match item
+ (#(put-flag put-fiber put-wrap-fn msg)
+ (match (atomic-box-ref put-flag)
+ ('S
+ ;; This put operation has already synchronized;
+ ;; try to commit the dequeue operation and in any
+ ;; case try again.
+ (service-put-ops (maybe-commit)))
+ (_
+ (let spin ()
+ (match (atomic-box-compare-and-swap! get-flag 'W 'C)
+ ('W
+ ;; We were able to claim our op. Now try
+ ;; to synchronize on a put operation as well.
+ (match (atomic-box-compare-and-swap! put-flag 'W 'S)
+ ('W
+ ;; It worked! Mark our own op as
+ ;; synchronized, try to commit the put
+ ;; dequeue operation, and mark both
+ ;; fibers for resumption.
+ (atomic-box-set! get-flag 'S)
+ (maybe-commit)
+ (resume-fiber get-fiber
+ (if get-wrap-fn
+ (lambda () (get-wrap-fn msg))
+ (lambda () msg)))
+ (resume-fiber put-fiber (or put-wrap-fn values))
+ (values))
+ ('C
+ ;; Other fiber trying to do the same
+ ;; thing we are; reset our state and try
+ ;; again.
+ (atomic-box-set! get-flag 'W)
+ (spin))
+ ('S
+ ;; Put op already synchronized. Reset
+ ;; get flag, try to remove this dead
+ ;; entry from the putq, and give it
+ ;; another go.
+ (atomic-box-set! get-flag 'W)
+ (service-put-ops (maybe-commit)))))
+ (_
+ ;; Claiming our own op failed; this can
+ ;; only mean that some other fiber
+ ;; completed our op for us.
+ (values)))))))))))))
+ (make-base-operation #f try-fn block-fn))))
+
+(define (put-message ch exp)
+ (perform-operation (put-operation ch exp)))
+
+(define (get-message ch)
+ (perform-operation (get-operation ch)))
diff --git a/fibers/operations.scm b/fibers/operations.scm
new file mode 100644
index 0000000..ae831a4
--- /dev/null
+++ b/fibers/operations.scm
@@ -0,0 +1,151 @@
+;; Parallel Concurrent ML for Guile
+
+;;;; Copyright (C) 2016 Andy Wingo <wingo@pobox.com>
+;;;;
+;;;; This library is free software; you can redistribute it and/or
+;;;; modify it under the terms of the GNU Lesser General Public
+;;;; License as published by the Free Software Foundation; either
+;;;; version 3 of the License, or (at your option) any later version.
+;;;;
+;;;; This library is distributed in the hope that it will be useful,
+;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;;;; Lesser General Public License for more details.
+;;;;
+;;;; You should have received a copy of the GNU Lesser General Public
+;;;; License along with this library; if not, write to the Free Software
+;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+;;; An implementation of Parallel Concurrent ML following the 2009
+;;; ICFP paper "Parallel Concurrent ML" by John Reppy, Claudio
+;;; V. Russo, and Yingqui Xiao.
+;;;
+;;; This implementation differs from the paper in a few ways:
+;;;
+;;; * Superficially, We use the term "operation" instead of "event".
+;;; We say "wrap-operation" instead of "wrap", "choose-operation"
+;;; instead of "choose", and "perform-operation" instead of "sync".
+;;;
+;;; * For the moment, this is an implementation of "primitive CML"
+;;; only. This may change in the future.
+;;;
+;;; * The continuation handling is a little different; in Manticore
+;;; (or at least in the paper), it appears that suspended threads
+;;; are represented in a quite raw way, whereas in Guile there are
+;;; wrapper <fiber> objects. Likewise unlike in CML, the
+;;; continuations in Fibers are delimited and composable, so things
+;;; are a little different. Suspended computations expect to be
+;;; passed a thunk as the resume value, and that thunk gets invoked
+;;; in the context of the fiber. For this reason we represent
+;;; wrappers explicitly in events, using them to wrap the resume
+;;; thunks. As in the C# implementation, we delay continuation
+;;; creation / fiber suspension until after a failed "doFn" phase.
+;;;
+;;; * In Fibers we do away with the "poll" phase, instead merging it
+;;; with the "try" phase. (Our "try" phase is more like what CML
+;;; calls "do". In Fibers, there is no do; there is only try.)
+;;;
+
+(define-module (fibers operations)
+ #:use-module (srfi srfi-9)
+ #:use-module (srfi srfi-9 gnu)
+ #:use-module (ice-9 atomic)
+ #:use-module (ice-9 match)
+ #:use-module (fibers internal)
+ #:export (wrap-operation
+ choose-operation
+ perform-operation
+
+ make-base-operation))
+
+;; Three possible values: W (waiting), C (claimed), or S (synched).
+;; The meanings are as in the Parallel CML paper.
+(define-inlinable (make-op-state) (make-atomic-box 'W))
+
+(define-record-type <base-op>
+ (make-base-operation wrap-fn try-fn block-fn)
+ base-op?
+ ;; ((arg ...) -> (result ...)) | #f
+ (wrap-fn base-op-wrap-fn)
+ ;; () -> (thunk | #f)
+ (try-fn base-op-try-fn)
+ ;; (op-state resume-k wrap-fn) -> ()
+ (block-fn base-op-block-fn))
+
+(define-record-type <choice-op>
+ (make-choice-operation base-ops)
+ choice-op?
+ (base-ops choice-op-base-ops))
+
+(define (wrap-operation op f)
+ (match op
+ (($ <base-op> wrap-fn try-fn block-fn)
+ (make-base-operation (match wrap-fn
+ (#f f)
+ (_ (lambda args
+ (call-with-values (lambda ()
+ (apply wrap-fn args))
+ f))))
+ try-fn
+ block-fn))
+ (($ <choice-op> base-ops)
+ (let* ((count (vector-length base-ops))
+ (base-ops* (make-vector count)))
+ (let lp ((i 0))
+ (when (< i count)
+ (vector-set! base-ops* i (wrap-operation (vector-ref base-ops i) f))
+ (lp (1+ i))))
+ (make-choice-operation base-ops*)))))
+
+(define (choose-operation . ops)
+ (define (flatten ops)
+ (match ops
+ (() '())
+ ((op . ops)
+ (append (match op
+ (($ <base-op>) (list op))
+ (($ <choice-op> base-ops) (vector->list base-ops)))
+ (flatten ops)))))
+ (match (flatten ops)
+ ((base-op) base-op)
+ (base-ops (make-choice-operation (list->vector base-ops)))))
+
+(define (perform-operation op)
+ (define (block)
+ (suspend-current-fiber
+ (lambda (fiber)
+ (let ((flag (make-op-state)))
+ (match op
+ (($ <base-op> wrap-fn try-fn block-fn)
+ (block-fn flag fiber wrap-fn))
+ (($ <choice-op> base-ops)
+ (let lp ((i 0))
+ (when (< i (vector-length base-ops))
+ (match (vector-ref base-ops i)
+ (($ <base-op> wrap-fn try-fn block-fn)
+ (block-fn flag fiber wrap-fn)))
+ (lp (1+ i))))))))))
+
+ ;; First, try to sync on an op. If no op syncs, block.
+ (match op
+ (($ <base-op> wrap-fn try-fn)
+ (match (try-fn)
+ (#f (block))
+ (thunk
+ (if wrap-fn
+ (call-with-values thunk wrap-fn)
+ (thunk)))))
+ (($ <choice-op> base-ops)
+ (let* ((count (vector-length base-ops))
+ (offset (random count)))
+ (let lp ((i 0))
+ (if (< i count)
+ (match (vector-ref base-ops (modulo (+ i offset) count))
+ (($ <base-op> wrap-fn try-fn)
+ (match (try-fn)
+ (#f (lp (1+ i)))
+ (thunk
+ (if wrap-fn
+ (call-with-values thunk wrap-fn)
+ (thunk))))))
+ (block)))))))
diff --git a/tests/basic.scm b/tests/basic.scm
index 1e1ab24..b3d92fa 100644
--- a/tests/basic.scm
+++ b/tests/basic.scm
@@ -18,8 +18,7 @@
;;;;
(define-module (tests basic)
- #:use-module (fibers)
- #:use-module (fibers channels))
+ #:use-module (fibers))
(define failed? #f)
@@ -127,20 +126,6 @@
(assert-run-fibers-returns (1) 1)
-(define-syntax-rule (rpc exp)
- (let ((ch (make-channel)))
- (spawn-fiber (lambda () (put-message ch exp)))
- (get-message ch)))
-
-(assert-run-fibers-returns (1) (rpc 1))
-
-(define (rpc-fib n)
- (rpc (if (< n 2)
- 1
- (+ (rpc-fib (- n 1)) (rpc-fib (- n 2))))))
-
-(assert-run-fibers-returns (75025) (rpc-fib 24))
-
(define (check-sleep timeout)
(spawn-fiber (lambda ()
(let ((start (get-internal-real-time)))
@@ -155,14 +140,8 @@
(assert-run-fibers-terminates
(do-times 20 (check-sleep (random 1.0))))
-;; timed channel wait
-
-;; multi-channel wait
-
;; exceptions
-;; cross-thread calls
-
;; closing port causes pollerr
;; live threads list
diff --git a/tests/channels.scm b/tests/channels.scm
new file mode 100644
index 0000000..5a9b58b
--- /dev/null
+++ b/tests/channels.scm
@@ -0,0 +1,80 @@
+;; Fibers: cooperative, event-driven user-space threads.
+
+;;;; Copyright (C) 2016 Free Software Foundation, Inc.
+;;;;
+;;;; This library is free software; you can redistribute it and/or
+;;;; modify it under the terms of the GNU Lesser General Public
+;;;; License as published by the Free Software Foundation; either
+;;;; version 3 of the License, or (at your option) any later version.
+;;;;
+;;;; This library is distributed in the hope that it will be useful,
+;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;;;; Lesser General Public License for more details.
+;;;;
+;;;; You should have received a copy of the GNU Lesser General Public
+;;;; License along with this library; if not, write to the Free Software
+;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+;;;;
+
+(define-module (tests cml)
+ #:use-module (fibers)
+ #:use-module (fibers channels))
+
+(define failed? #f)
+
+(define-syntax-rule (assert-equal expected actual)
+ (let ((x expected))
+ (format #t "assert ~s equal to ~s: " 'actual x)
+ (force-output)
+ (let ((y actual))
+ (cond
+ ((equal? x y) (format #t "ok\n"))
+ (else
+ (format #t "no (got ~s)\n" y)
+ (set! failed? #t))))))
+
+(define-syntax-rule (assert-run-fibers-terminates exp)
+ (begin
+ (format #t "assert run-fibers on ~s terminates: " 'exp)
+ (force-output)
+ (let ((start (get-internal-real-time)))
+ (call-with-values (lambda () (run-fibers (lambda () exp)))
+ (lambda vals
+ (format #t "ok (~a s)\n" (/ (- (get-internal-real-time) start)
+ 1.0 internal-time-units-per-second))
+ (apply values vals))))))
+
+(define-syntax-rule (assert-run-fibers-returns (expected ...) exp)
+ (begin
+ (call-with-values (lambda () (assert-run-fibers-terminates exp))
+ (lambda run-fiber-return-vals
+ (assert-equal '(expected ...) run-fiber-return-vals)))))
+
+(define-syntax-rule (do-times n exp)
+ (let lp ((count n))
+ (let ((count (1- count)))
+ exp
+ (unless (zero? count) (lp count)))))
+
+(define-syntax-rule (rpc exp)
+ (let ((ch (make-channel)))
+ (spawn-fiber (lambda () (put-message ch exp)))
+ (get-message ch)))
+
+(assert-run-fibers-returns (1) (rpc 1))
+
+(define (rpc-fib n)
+ (rpc (if (< n 2)
+ 1
+ (+ (rpc-fib (- n 1)) (rpc-fib (- n 2))))))
+
+(assert-run-fibers-returns (75025) (rpc-fib 24))
+
+;; timed channel wait
+
+;; multi-channel wait
+
+;; cross-thread calls
+
+(exit (if failed? 1 0))