diff options
| author | Andy Wingo <wingo@pobox.com> | 2017-01-05 00:04:51 +0100 |
|---|---|---|
| committer | Andy Wingo <wingo@pobox.com> | 2017-01-05 00:09:40 +0100 |
| commit | 3a75c2ef00cff215d016a47ceef1259b3a0dfc26 (patch) | |
| tree | 829475ca35a8231a42f141e6ad5a51a2e306b015 | |
| parent | Remove epoll dep on suspendable ports (diff) | |
| download | guile-fibers-3a75c2ef00cff215d016a47ceef1259b3a0dfc26.tar.gz | |
Add support for operations from foreign threads
* tests/foreign.scm: New file.
* Makefile.am: Add new file.
* fibers/operations.scm (perform-operation): Support blocking operations
from foreign threads (without a scheduler).
* fibers/timers.scm (timer-sched, *timer-sched*, timer-operation): Add
support for timeouts that use an auxiliary thread instead of relying
on the current scheduler.
| -rw-r--r-- | Makefile.am | 1 | ||||
| -rw-r--r-- | fibers/operations.scm | 29 | ||||
| -rw-r--r-- | fibers/timers.scm | 24 | ||||
| -rw-r--r-- | tests/foreign.scm | 71 |
4 files changed, 123 insertions, 2 deletions
diff --git a/Makefile.am b/Makefile.am index 40892c5..d18fa4c 100644 --- a/Makefile.am +++ b/Makefile.am @@ -51,6 +51,7 @@ epoll_la_LDFLAGS = -export-dynamic -module TESTS = \ tests/basic.scm \ tests/channels.scm \ + tests/foreign.scm \ tests/parameters.scm \ tests/preemption.scm TESTS_ENVIRONMENT=top_srcdir="$(abs_top_srcdir)" ./env $(GUILE) -s diff --git a/fibers/operations.scm b/fibers/operations.scm index 7ee21d5..8e9c85c 100644 --- a/fibers/operations.scm +++ b/fibers/operations.scm @@ -141,12 +141,39 @@ the operation cannot complete directly, block until it can complete." (lp (1+ i)))))))) (define (suspend) + ;; Two cases. If there is a current fiber, then we suspend the + ;; current fiber and arrange to restart it when the operation + ;; succeeds. Otherwise we block the current thread until the + ;; operation succeeds, to allow for communication between fibers + ;; and foreign threads. (if (current-fiber) (suspend-current-fiber (lambda (fiber) (define (resume thunk) (resume-fiber fiber thunk)) (block (fiber-scheduler fiber) resume))) - (error "unimplemented"))) + (let ((k #f) + (thread (current-thread)) + (mutex (make-mutex)) + (condvar (make-condition-variable))) + (define (resume thunk) + (cond + ((eq? (current-thread) thread) + (set! k thunk)) + (else + (lock-mutex mutex) + (set! k thunk) + (signal-condition-variable condvar) + (unlock-mutex mutex)))) + (lock-mutex mutex) + (block #f resume) + (let lp () + (cond + (k + (unlock-mutex mutex) + (k)) + (else + (wait-condition-variable condvar mutex) + (lp))))))) ;; First, try to sync on an op. If no op syncs, block. (match op diff --git a/fibers/timers.scm b/fibers/timers.scm index 4affd54..15638b0 100644 --- a/fibers/timers.scm +++ b/fibers/timers.scm @@ -22,10 +22,26 @@ #:use-module (fibers operations) #:use-module (ice-9 atomic) #:use-module (ice-9 match) + #:use-module (ice-9 threads) #:export (wait-operation timer-operation) #:replace (sleep)) +(define *timer-sched* (make-atomic-box #f)) + +(define (timer-sched) + (or (atomic-box-ref *timer-sched*) + (let ((sched (make-scheduler))) + (cond + ((atomic-box-compare-and-swap! *timer-sched* #f sched)) + (else + ;; FIXME: Would be nice to clean up this thread at some point. + (call-with-new-thread + (lambda () + (define (finished?) #f) + (with-scheduler sched (run-scheduler sched finished?)))) + sched))))) + (define (timer-operation expiry) "Make an operation that will succeed when the current time is greater than or equal to @var{expiry}, expressed in internal time @@ -40,7 +56,13 @@ units. The operation will succeed with no values." ('W (resume values)) ('C (timer)) ('S #f))) - (add-timer sched expiry timer)))) + (if sched + (add-timer sched expiry timer) + (create-fiber (timer-sched) + (lambda () + (perform-operation (timer-operation expiry)) + (timer)) + (current-dynamic-state)))))) (define (wait-operation seconds) "Make an operation that will succeed with no values when diff --git a/tests/foreign.scm b/tests/foreign.scm new file mode 100644 index 0000000..d3470bc --- /dev/null +++ b/tests/foreign.scm @@ -0,0 +1,71 @@ +;; Fibers: cooperative, event-driven user-space threads. + +;;;; Copyright (C) 2016 Free Software Foundation, Inc. +;;;; +;;;; This library is free software; you can redistribute it and/or +;;;; modify it under the terms of the GNU Lesser General Public +;;;; License as published by the Free Software Foundation; either +;;;; version 3 of the License, or (at your option) any later version. +;;;; +;;;; This library is distributed in the hope that it will be useful, +;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of +;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +;;;; Lesser General Public License for more details. +;;;; +;;;; You should have received a copy of the GNU Lesser General Public +;;;; License along with this library; if not, write to the Free Software +;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +;;;; + +(define-module (tests foreign) + #:use-module (fibers) + #:use-module (fibers operations) + #:use-module (fibers channels) + #:use-module (fibers timers) + #:use-module (ice-9 threads)) + +(define failed? #f) + +(define-syntax-rule (assert-equal expected actual) + (let ((x expected)) + (format #t "assert ~s equal to ~s: " 'actual x) + (force-output) + (let ((y actual)) + (cond + ((equal? x y) (format #t "ok\n")) + (else + (format #t "no (got ~s)\n" y) + (set! failed? #t)))))) + +(define-syntax-rule (assert-terminates exp) + (begin + (format #t "assert ~s terminates: " 'exp) + (force-output) + exp + (format #t "ok\n"))) + +(define (receive-from-fiber x) + (let* ((ch (make-channel)) + (t (call-with-new-thread + (lambda () + (run-fibers (lambda () (put-message ch 42)))))) + (v (get-message ch))) + (join-thread t) + v)) + +(define (send-to-fiber x) + (let* ((ch (make-channel)) + (t (call-with-new-thread + (lambda () + (run-fibers (lambda () (get-message ch))))))) + (put-message ch x) + (join-thread t))) + +(assert-equal #f #f) +(assert-terminates #t) +(assert-terminates (sleep 1)) +(assert-terminates (perform-operation (wait-operation 1))) +(assert-equal 42 (receive-from-fiber 42)) +(assert-equal 42 (send-to-fiber 42)) + +(exit (if failed? 1 0)) |
