diff options
| author | Andy Wingo <wingo@pobox.com> | 2016-10-03 16:31:15 +0200 |
|---|---|---|
| committer | Andy Wingo <wingo@pobox.com> | 2016-10-03 16:31:15 +0200 |
| commit | 184bdd6f4bb359a3827fbfc367942e111b63f660 (patch) | |
| tree | e4de309b9bcb9cf79875ea809abfd2b2b835d2c5 | |
| parent | Add support for ,spawn-fiber (diff) | |
| download | guile-fibers-184bdd6f4bb359a3827fbfc367942e111b63f660.tar.gz | |
Reimplement in terms of Parallel Concurrent ML
* fibers/operations.scm: New file.
* fibers/channels.scm: Reimplement in terms of operations (CML events).
* tests/basic.scm: Remove channels tests.
* tests/channels.scm: Add channels tests.
* Makefile.am: Update for new files.
| -rw-r--r-- | Makefile.am | 5 | ||||
| -rw-r--r-- | fibers/channels.scm | 384 | ||||
| -rw-r--r-- | fibers/operations.scm | 151 | ||||
| -rw-r--r-- | tests/basic.scm | 23 | ||||
| -rw-r--r-- | tests/channels.scm | 80 |
5 files changed, 542 insertions, 101 deletions
diff --git a/Makefile.am b/Makefile.am index 065f0e1..d7e0648 100644 --- a/Makefile.am +++ b/Makefile.am @@ -30,6 +30,7 @@ SOURCES = \ fibers/epoll.scm \ fibers/internal.scm \ fibers/nameset.scm \ + fibers/operations.scm \ fibers/psq.scm \ fibers/repl.scm \ web/server/fibers.scm @@ -42,7 +43,9 @@ epoll_la_CFLAGS = $(AM_CFLAGS) $(GUILE_CFLAGS) epoll_la_LIBADD = $(GUILE_LIBS) epoll_la_LDFLAGS = -export-dynamic -module -TESTS=tests/basic.scm +TESTS = \ + tests/basic.scm \ + tests/channels.scm TESTS_ENVIRONMENT=top_srcdir="$(abs_top_srcdir)" ./env $(GUILE) -s EXTRA_DIST += \ diff --git a/fibers/channels.scm b/fibers/channels.scm index 18f59d2..e7d0038 100644 --- a/fibers/channels.scm +++ b/fibers/channels.scm @@ -1,4 +1,4 @@ -;; channels +;; Channels ;;;; Copyright (C) 2016 Andy Wingo <wingo@pobox.com> ;;;; @@ -15,7 +15,16 @@ ;;;; You should have received a copy of the GNU Lesser General Public ;;;; License along with this library; if not, write to the Free Software ;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA -;;;; + +;;; Channel implementation following the 2009 ICFP paper "Parallel +;;; Concurrent ML" by John Reppy, Claudio V. Russo, and Yingqui Xiao. +;;; +;;; Besides the general ways in which this implementation differs from +;;; the paper, this channel implementation avoids locks entirely. +;;; Still, we should disable interrupts while any operation is in a +;;; "claimed" state to avoid excess latency due to pre-emption. It +;;; would be great if we could verify our protocol though; the +;;; parallel channel operations are still gnarly. (define-module (fibers channels) #:use-module (srfi srfi-9) @@ -23,93 +32,312 @@ #:use-module (ice-9 atomic) #:use-module (ice-9 match) #:use-module (fibers internal) - #:export (channel? - make-channel + #:use-module (fibers operations) + #:export (make-channel + put-operation + get-operation put-message get-message)) -(define-record-type <read-wait-queue> - (make-read-wait-queue waiters) - read-wait-queue? - ;; list of fiber, lifo order - (waiters read-wait-queue-waiters)) +;; A functional queue has a head and a tail, which are both lists. +;; The head is in FIFO order and the tail is in LIFO order. +(define-inlinable (make-queue head tail) + (cons head tail)) + +(define (make-empty-queue) + (make-queue '() '())) + +(define (enqueue queue item) + (match queue + ((head . tail) + (make-queue head (cons item tail))))) -;; A message queue is a list of messages in FIFO order. A message is -;; any value. The list is in FIFO order to make adding to the list -;; more expensive (O(n)) than removing from it (O(1)). It's a list to -;; avoid record overhead. +(define (undequeue queue item) + (match queue + ((head . tail) + (make-queue (cons item head) tail)))) -(define-record-type <write-wait-queue> - (make-write-wait-queue messages waiters) - write-wait-queue? - (messages write-wait-queue-messages) - (waiters write-wait-queue-waiters)) +;; -> new queue, val | #f, #f +(define (dequeue queue) + (match queue + ((() . ()) (values #f #f)) + ((() . tail) + (dequeue (make-queue (reverse tail) '()))) + (((item . head) . tail) + (values (make-queue head tail) item)))) + +(define (dequeue-match queue pred) + (match queue + ((() . ()) (values #f #f)) + ((() . tail) + (dequeue (make-queue (reverse tail) '()))) + (((item . head) . tail) + (if (pred item) + (values (make-queue head tail) item) + (call-with-values (dequeue-match (make-queue head tail) pred) + (lambda (queue item*) + (values (undequeue queue item) item*))))))) + +(define (enqueue! qbox item) + (let spin ((q (atomic-box-ref qbox))) + (let* ((q* (enqueue q item)) + (q** (atomic-box-compare-and-swap! qbox q q*))) + (unless (eq? q q**) + (spin q**))))) (define-record-type <channel> - (%make-channel queue-size state) + (%make-channel getq putq) channel? - (queue-size channel-queue-size) - ;; Atomic reference to channel state. Channel state is either a - ;; <read-wait-queue>, if the queue is empty and readers are blocked, - ;; or a message queue (described above) if the queue is not blocked, - ;; or a <write-wait-queue> if the queue is full and there are - ;; blocked writers. - (state channel-message-state)) + ;; atomic box of queue + (getq channel-getq) + ;; atomic box of queue + (putq channel-putq)) -(define* (make-channel #:key (queue-size 1)) - (%make-channel queue-size (make-atomic-box '()))) +(define (make-channel) + (%make-channel (make-atomic-box (make-empty-queue)) + (make-atomic-box (make-empty-queue)))) -(define (put-message channel message) +(define (put-operation channel msg) (match channel - (($ <channel> queue-size state) - (let retry ((old-state (atomic-box-ref state))) - (define (commit new-state kt) - (let ((x (atomic-box-compare-and-swap! state old-state new-state))) - (if (eq? x old-state) (kt) (retry x)))) - (match old-state - ((or () (_ . _)) - (if (< (length old-state) queue-size) - (commit (append old-state (list message)) values) - (commit (make-write-wait-queue old-state (list (current-fiber))) - (lambda () - (suspend-current-fiber) - (retry (atomic-box-ref state)))))) - (($ <read-wait-queue> waiters) - (commit (list message) - (lambda () - (for-each (lambda (fiber) (resume-fiber fiber values)) - waiters) - (values)))) - (($ <write-wait-queue> messages waiters) - (commit (make-write-wait-queue messages (cons (current-fiber) waiters)) - (lambda () - (suspend-current-fiber) - (retry (atomic-box-ref state)))))))))) + (($ <channel> getq-box putq-box) + (define (try-fn) + ;; Try to find and perform a pending get operation. If that + ;; works, return a result thunk, or otherwise #f. + (let try ((getq (atomic-box-ref getq-box))) + (call-with-values (lambda () (dequeue getq)) + (lambda (getq* item) + (define (maybe-commit) + ;; Try to update getq. Return the new getq value in + ;; any case. + (let ((q (atomic-box-compare-and-swap! getq-box getq getq*))) + (if (eq? q getq) getq* getq))) + ;; Return #f if the getq was empty. + (and getq* + (match item + (#(get-flag get-fiber get-wrap-fn) + (let spin () + (match (atomic-box-compare-and-swap! get-flag 'W 'S) + ('W + ;; Success. Commit the dequeue operation, + ;; unless the getq changed in the + ;; meantime. If we don't manage to commit + ;; the dequeue, some other put operation will + ;; commit it before it successfully + ;; performs any other operation on this + ;; channel. + (maybe-commit) + (resume-fiber get-fiber (if get-wrap-fn + (lambda () + (get-wrap-fn msg)) + (lambda () msg))) + ;; Continue directly. + (lambda () (values))) + ;; Get operation temporarily busy; try again. + ('C (spin)) + ;; Get operation already performed; pop it + ;; off the getq (if we can) and try again. + ;; If we fail to commit, no big deal, we will + ;; try again next time if no other fiber + ;; handled it already. + ('S (try (maybe-commit)))))))))))) + (define (block-fn put-flag put-fiber put-wrap-fn) + ;; We have suspended the current fiber; arrange for the fiber + ;; to be resumed by a get operation by adding it to the channel's + ;; putq. + (define (not-me? item) + (match item + (#(get-flag get-fiber get-wrap-fn) + (not (eq? put-flag get-flag))))) + ;; First, publish this put operation. + (enqueue! putq-box (vector put-flag put-fiber put-wrap-fn msg)) + ;; In the try phase, we scanned the getq for a get operation, + ;; but we were unable to perform any of them. Since then, + ;; there might be a new get operation on the queue. However + ;; only get operations published *after* we publish our put + ;; operation to the putq are responsible for trying to complete + ;; this put operation; we are responsible for get operations + ;; published before we published our put. Therefore, here we + ;; visit the getq again. This is like the "try" phase, but + ;; with the difference that we've published our op state flag + ;; to the queue, so other fibers might be racing to synchronize + ;; on our own op. + (let service-get-ops ((getq (atomic-box-ref getq-box))) + (call-with-values (lambda () (dequeue-match getq not-me?)) + (lambda (getq* item) + (define (maybe-commit) + ;; Try to update getq. Return the new getq value in + ;; any case. + (let ((q (atomic-box-compare-and-swap! getq-box getq getq*))) + (if (eq? q getq) getq* getq))) + ;; We only have to service the getq if it is non-empty. + (when getq* + (match item + (#(get-flag get-fiber get-wrap-fn) + (match (atomic-box-ref get-flag) + ('S + ;; This get operation has already synchronized; + ;; try to commit and operation and in any + ;; case try again. + (service-get-ops (maybe-commit))) + (_ + (let spin () + (match (atomic-box-compare-and-swap! put-flag 'W 'C) + ('W + ;; We were able to claim our op. Now try to + ;; synchronize on a get operation as well. + (match (atomic-box-compare-and-swap! get-flag 'W 'S) + ('W + ;; It worked! Mark our own op as + ;; synchronized, try to commit the result + ;; getq, and resume both fibers. + (atomic-box-set! put-flag 'S) + (maybe-commit) + (resume-fiber get-fiber + (if get-wrap-fn + (lambda () (get-wrap-fn msg)) + (lambda () msg))) + (resume-fiber put-fiber (or put-wrap-fn values)) + (values)) + ('C + ;; Other fiber trying to do the same + ;; thing we are; reset our state and try + ;; again. + (atomic-box-set! put-flag 'W) + (spin)) + ('S + ;; Other op already synchronized. Reset + ;; our flag, try to remove this dead + ;; entry from the getq, and give it + ;; another go. + (atomic-box-set! put-flag 'W) + (service-get-ops (maybe-commit))))) + (_ + ;; Claiming our own op failed; this can only + ;; mean that some other fiber completed our + ;; op for us. + (values))))))))))))) + (make-base-operation #f try-fn block-fn)))) -(define (get-message channel) +(define (get-operation channel) (match channel - (($ <channel> queue-size state) - (let retry ((old-state (atomic-box-ref state))) - (define (commit new-state kt) - (let ((x (atomic-box-compare-and-swap! state old-state new-state))) - (if (eq? x old-state) (kt) (retry x)))) - (match old-state - (() - (commit (make-read-wait-queue (list (current-fiber))) - (lambda () - (suspend-current-fiber) - (retry (atomic-box-ref state))))) - ((message . messages) - (commit messages (lambda () message))) - (($ <read-wait-queue> waiters) - (commit (make-read-wait-queue (cons (current-fiber) waiters)) - (lambda () - (suspend-current-fiber) - (retry (atomic-box-ref state))))) - (($ <write-wait-queue> (message . messages) waiters) - (commit messages - (lambda () - (for-each (lambda (fiber) (resume-fiber fiber values)) - waiters) - message)))))))) + (($ <channel> getq-box putq-box) + (define (try-fn) + ;; Try to find and perform a pending put operation. If that + ;; works, return a result thunk, or otherwise #f. + (let try ((putq (atomic-box-ref putq-box))) + (call-with-values (lambda () (dequeue putq)) + (lambda (putq* item) + (define (maybe-commit) + ;; Try to update putq. Return the new putq value in + ;; any case. + (let ((q (atomic-box-compare-and-swap! putq-box putq putq*))) + (if (eq? q putq) putq* putq))) + ;; Return #f if the putq was empty. + (and putq* + (match item + (#(put-flag put-fiber put-wrap-fn msg) + (let spin () + (match (atomic-box-compare-and-swap! put-flag 'W 'S) + ('W + ;; Success. Commit the fresh putq if we + ;; can. If we don't manage to commit right + ;; now, some other get operation will commit + ;; it before synchronizing any other + ;; operation on this channel. + (maybe-commit) + (resume-fiber put-fiber (or put-wrap-fn values)) + ;; Continue directly. + (lambda () msg)) + ;; Put operation temporarily busy; try again. + ('C (spin)) + ;; Put operation already synchronized; pop it + ;; off the putq (if we can) and try again. + ;; If we fail to commit, no big deal, we will + ;; try again next time if no other fiber + ;; handled it already. + ('S (try (maybe-commit)))))))))))) + (define (block-fn get-flag get-fiber get-wrap-fn) + ;; We have suspended the current fiber; arrange for the fiber + ;; to be resumed by a put operation by adding it to the + ;; channel's getq. + (define (not-me? item) + (match item + (#(put-flag put-fiber put-wrap-fn msg) + (not (eq? get-flag put-flag))))) + ;; First, publish this get operation. + (enqueue! getq-box (vector get-flag get-fiber get-wrap-fn)) + ;; In the try phase, we scanned the putq for a live put + ;; operation, but we were unable to synchronize. Since then, + ;; there might be a new operation on the putq. However only + ;; put operations published *after* we publish our get + ;; operation to the getq are responsible for trying to complete + ;; this get operation; we are responsible for put operations + ;; published before we published our get. Therefore, here we + ;; visit the putq again. This is like the "try" phase, but + ;; with the difference that we've published our op state flag + ;; to the getq, so other fibers might be racing to synchronize + ;; on our own op. + (let service-put-ops ((putq (atomic-box-ref putq-box))) + (call-with-values (lambda () (dequeue-match putq not-me?)) + (lambda (putq* item) + (define (maybe-commit) + ;; Try to update putq. Return the new putq value in + ;; any case. + (let ((q (atomic-box-compare-and-swap! putq-box putq putq*))) + (if (eq? q putq) putq* putq))) + ;; We only have to service the putq if it is non-empty. + (when putq* + (match item + (#(put-flag put-fiber put-wrap-fn msg) + (match (atomic-box-ref put-flag) + ('S + ;; This put operation has already synchronized; + ;; try to commit the dequeue operation and in any + ;; case try again. + (service-put-ops (maybe-commit))) + (_ + (let spin () + (match (atomic-box-compare-and-swap! get-flag 'W 'C) + ('W + ;; We were able to claim our op. Now try + ;; to synchronize on a put operation as well. + (match (atomic-box-compare-and-swap! put-flag 'W 'S) + ('W + ;; It worked! Mark our own op as + ;; synchronized, try to commit the put + ;; dequeue operation, and mark both + ;; fibers for resumption. + (atomic-box-set! get-flag 'S) + (maybe-commit) + (resume-fiber get-fiber + (if get-wrap-fn + (lambda () (get-wrap-fn msg)) + (lambda () msg))) + (resume-fiber put-fiber (or put-wrap-fn values)) + (values)) + ('C + ;; Other fiber trying to do the same + ;; thing we are; reset our state and try + ;; again. + (atomic-box-set! get-flag 'W) + (spin)) + ('S + ;; Put op already synchronized. Reset + ;; get flag, try to remove this dead + ;; entry from the putq, and give it + ;; another go. + (atomic-box-set! get-flag 'W) + (service-put-ops (maybe-commit))))) + (_ + ;; Claiming our own op failed; this can + ;; only mean that some other fiber + ;; completed our op for us. + (values))))))))))))) + (make-base-operation #f try-fn block-fn)))) + +(define (put-message ch exp) + (perform-operation (put-operation ch exp))) + +(define (get-message ch) + (perform-operation (get-operation ch))) diff --git a/fibers/operations.scm b/fibers/operations.scm new file mode 100644 index 0000000..ae831a4 --- /dev/null +++ b/fibers/operations.scm @@ -0,0 +1,151 @@ +;; Parallel Concurrent ML for Guile + +;;;; Copyright (C) 2016 Andy Wingo <wingo@pobox.com> +;;;; +;;;; This library is free software; you can redistribute it and/or +;;;; modify it under the terms of the GNU Lesser General Public +;;;; License as published by the Free Software Foundation; either +;;;; version 3 of the License, or (at your option) any later version. +;;;; +;;;; This library is distributed in the hope that it will be useful, +;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of +;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +;;;; Lesser General Public License for more details. +;;;; +;;;; You should have received a copy of the GNU Lesser General Public +;;;; License along with this library; if not, write to the Free Software +;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +;;; An implementation of Parallel Concurrent ML following the 2009 +;;; ICFP paper "Parallel Concurrent ML" by John Reppy, Claudio +;;; V. Russo, and Yingqui Xiao. +;;; +;;; This implementation differs from the paper in a few ways: +;;; +;;; * Superficially, We use the term "operation" instead of "event". +;;; We say "wrap-operation" instead of "wrap", "choose-operation" +;;; instead of "choose", and "perform-operation" instead of "sync". +;;; +;;; * For the moment, this is an implementation of "primitive CML" +;;; only. This may change in the future. +;;; +;;; * The continuation handling is a little different; in Manticore +;;; (or at least in the paper), it appears that suspended threads +;;; are represented in a quite raw way, whereas in Guile there are +;;; wrapper <fiber> objects. Likewise unlike in CML, the +;;; continuations in Fibers are delimited and composable, so things +;;; are a little different. Suspended computations expect to be +;;; passed a thunk as the resume value, and that thunk gets invoked +;;; in the context of the fiber. For this reason we represent +;;; wrappers explicitly in events, using them to wrap the resume +;;; thunks. As in the C# implementation, we delay continuation +;;; creation / fiber suspension until after a failed "doFn" phase. +;;; +;;; * In Fibers we do away with the "poll" phase, instead merging it +;;; with the "try" phase. (Our "try" phase is more like what CML +;;; calls "do". In Fibers, there is no do; there is only try.) +;;; + +(define-module (fibers operations) + #:use-module (srfi srfi-9) + #:use-module (srfi srfi-9 gnu) + #:use-module (ice-9 atomic) + #:use-module (ice-9 match) + #:use-module (fibers internal) + #:export (wrap-operation + choose-operation + perform-operation + + make-base-operation)) + +;; Three possible values: W (waiting), C (claimed), or S (synched). +;; The meanings are as in the Parallel CML paper. +(define-inlinable (make-op-state) (make-atomic-box 'W)) + +(define-record-type <base-op> + (make-base-operation wrap-fn try-fn block-fn) + base-op? + ;; ((arg ...) -> (result ...)) | #f + (wrap-fn base-op-wrap-fn) + ;; () -> (thunk | #f) + (try-fn base-op-try-fn) + ;; (op-state resume-k wrap-fn) -> () + (block-fn base-op-block-fn)) + +(define-record-type <choice-op> + (make-choice-operation base-ops) + choice-op? + (base-ops choice-op-base-ops)) + +(define (wrap-operation op f) + (match op + (($ <base-op> wrap-fn try-fn block-fn) + (make-base-operation (match wrap-fn + (#f f) + (_ (lambda args + (call-with-values (lambda () + (apply wrap-fn args)) + f)))) + try-fn + block-fn)) + (($ <choice-op> base-ops) + (let* ((count (vector-length base-ops)) + (base-ops* (make-vector count))) + (let lp ((i 0)) + (when (< i count) + (vector-set! base-ops* i (wrap-operation (vector-ref base-ops i) f)) + (lp (1+ i)))) + (make-choice-operation base-ops*))))) + +(define (choose-operation . ops) + (define (flatten ops) + (match ops + (() '()) + ((op . ops) + (append (match op + (($ <base-op>) (list op)) + (($ <choice-op> base-ops) (vector->list base-ops))) + (flatten ops))))) + (match (flatten ops) + ((base-op) base-op) + (base-ops (make-choice-operation (list->vector base-ops))))) + +(define (perform-operation op) + (define (block) + (suspend-current-fiber + (lambda (fiber) + (let ((flag (make-op-state))) + (match op + (($ <base-op> wrap-fn try-fn block-fn) + (block-fn flag fiber wrap-fn)) + (($ <choice-op> base-ops) + (let lp ((i 0)) + (when (< i (vector-length base-ops)) + (match (vector-ref base-ops i) + (($ <base-op> wrap-fn try-fn block-fn) + (block-fn flag fiber wrap-fn))) + (lp (1+ i)))))))))) + + ;; First, try to sync on an op. If no op syncs, block. + (match op + (($ <base-op> wrap-fn try-fn) + (match (try-fn) + (#f (block)) + (thunk + (if wrap-fn + (call-with-values thunk wrap-fn) + (thunk))))) + (($ <choice-op> base-ops) + (let* ((count (vector-length base-ops)) + (offset (random count))) + (let lp ((i 0)) + (if (< i count) + (match (vector-ref base-ops (modulo (+ i offset) count)) + (($ <base-op> wrap-fn try-fn) + (match (try-fn) + (#f (lp (1+ i))) + (thunk + (if wrap-fn + (call-with-values thunk wrap-fn) + (thunk)))))) + (block))))))) diff --git a/tests/basic.scm b/tests/basic.scm index 1e1ab24..b3d92fa 100644 --- a/tests/basic.scm +++ b/tests/basic.scm @@ -18,8 +18,7 @@ ;;;; (define-module (tests basic) - #:use-module (fibers) - #:use-module (fibers channels)) + #:use-module (fibers)) (define failed? #f) @@ -127,20 +126,6 @@ (assert-run-fibers-returns (1) 1) -(define-syntax-rule (rpc exp) - (let ((ch (make-channel))) - (spawn-fiber (lambda () (put-message ch exp))) - (get-message ch))) - -(assert-run-fibers-returns (1) (rpc 1)) - -(define (rpc-fib n) - (rpc (if (< n 2) - 1 - (+ (rpc-fib (- n 1)) (rpc-fib (- n 2)))))) - -(assert-run-fibers-returns (75025) (rpc-fib 24)) - (define (check-sleep timeout) (spawn-fiber (lambda () (let ((start (get-internal-real-time))) @@ -155,14 +140,8 @@ (assert-run-fibers-terminates (do-times 20 (check-sleep (random 1.0)))) -;; timed channel wait - -;; multi-channel wait - ;; exceptions -;; cross-thread calls - ;; closing port causes pollerr ;; live threads list diff --git a/tests/channels.scm b/tests/channels.scm new file mode 100644 index 0000000..5a9b58b --- /dev/null +++ b/tests/channels.scm @@ -0,0 +1,80 @@ +;; Fibers: cooperative, event-driven user-space threads. + +;;;; Copyright (C) 2016 Free Software Foundation, Inc. +;;;; +;;;; This library is free software; you can redistribute it and/or +;;;; modify it under the terms of the GNU Lesser General Public +;;;; License as published by the Free Software Foundation; either +;;;; version 3 of the License, or (at your option) any later version. +;;;; +;;;; This library is distributed in the hope that it will be useful, +;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of +;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +;;;; Lesser General Public License for more details. +;;;; +;;;; You should have received a copy of the GNU Lesser General Public +;;;; License along with this library; if not, write to the Free Software +;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +;;;; + +(define-module (tests cml) + #:use-module (fibers) + #:use-module (fibers channels)) + +(define failed? #f) + +(define-syntax-rule (assert-equal expected actual) + (let ((x expected)) + (format #t "assert ~s equal to ~s: " 'actual x) + (force-output) + (let ((y actual)) + (cond + ((equal? x y) (format #t "ok\n")) + (else + (format #t "no (got ~s)\n" y) + (set! failed? #t)))))) + +(define-syntax-rule (assert-run-fibers-terminates exp) + (begin + (format #t "assert run-fibers on ~s terminates: " 'exp) + (force-output) + (let ((start (get-internal-real-time))) + (call-with-values (lambda () (run-fibers (lambda () exp))) + (lambda vals + (format #t "ok (~a s)\n" (/ (- (get-internal-real-time) start) + 1.0 internal-time-units-per-second)) + (apply values vals)))))) + +(define-syntax-rule (assert-run-fibers-returns (expected ...) exp) + (begin + (call-with-values (lambda () (assert-run-fibers-terminates exp)) + (lambda run-fiber-return-vals + (assert-equal '(expected ...) run-fiber-return-vals))))) + +(define-syntax-rule (do-times n exp) + (let lp ((count n)) + (let ((count (1- count))) + exp + (unless (zero? count) (lp count))))) + +(define-syntax-rule (rpc exp) + (let ((ch (make-channel))) + (spawn-fiber (lambda () (put-message ch exp))) + (get-message ch))) + +(assert-run-fibers-returns (1) (rpc 1)) + +(define (rpc-fib n) + (rpc (if (< n 2) + 1 + (+ (rpc-fib (- n 1)) (rpc-fib (- n 2)))))) + +(assert-run-fibers-returns (75025) (rpc-fib 24)) + +;; timed channel wait + +;; multi-channel wait + +;; cross-thread calls + +(exit (if failed? 1 0)) |
