diff options
| author | Andy Wingo <wingo@pobox.com> | 2016-12-29 19:11:25 +0100 |
|---|---|---|
| committer | Andy Wingo <wingo@pobox.com> | 2016-12-29 19:11:25 +0100 |
| commit | 675e2c26dad71ea7ed3c551c98159830aab70d3c (patch) | |
| tree | e2977499ae8d4d9617ab0d95ed065e96b5f79a75 | |
| parent | Flesh out remote peers interface (diff) | |
| download | guile-fibers-675e2c26dad71ea7ed3c551c98159830aab70d3c.tar.gz | |
Enable parallelism by default
* fibers.scm (run-fibers): Default to use all current processor cores.
(%run-fibers, start-auxiliary-threads, stop-auxiliary-threads): New
helpers.
| -rw-r--r-- | fibers.scm | 68 |
1 files changed, 50 insertions, 18 deletions
@@ -24,7 +24,8 @@ #:use-module (fibers repl) #:use-module (fibers timers) #:use-module (fibers interrupts) - #:use-module ((ice-9 threads) #:select (current-thread)) + #:use-module ((ice-9 threads) + #:select (current-thread current-processor-count)) #:use-module ((ice-9 ports internal) #:select (port-read-wait-fd port-write-wait-fd)) #:use-module (ice-9 suspendable-ports) @@ -40,27 +41,58 @@ (lambda (fiber) (resume-on-writable-fd (port-read-wait-fd port) fiber)))) +(define (%run-fibers scheduler hz finished?) + (with-scheduler + scheduler + (parameterize ((current-read-waiter wait-for-readable) + (current-write-waiter wait-for-writable)) + (with-interrupts + hz yield-current-fiber + (lambda () + (run-scheduler scheduler finished?)))))) + +(define (start-auxiliary-threads scheduler hz finished?) + (let ((scheds (scheduler-remote-peers scheduler))) + (let lp ((i 0)) + (when (< i (vector-length scheds)) + (let ((remote (vector-ref scheds i))) + (call-with-new-thread + (lambda () + (%run-fibers remote hz finished?))) + (lp (1+ i))))))) + +(define (stop-auxiliary-threads scheduler) + (let ((scheds (scheduler-remote-peers scheduler))) + (let lp ((i 0)) + (when (< i (vector-length scheds)) + (let* ((remote (vector-ref scheds i)) + (thread (scheduler-kernel-thread remote))) + (when thread + (cancel-thread thread) + (join-thread thread)) + (lp (1+ i))))))) + (define* (run-fibers #:optional (init #f) #:key (hz 0) (scheduler #f) + (parallelism (current-processor-count)) (install-suspendable-ports? #t)) (when install-suspendable-ports? (install-suspendable-ports!)) - (let ((keep-scheduler? (->bool scheduler)) - (scheduler (or scheduler (make-scheduler)))) - (with-scheduler - scheduler - (parameterize ((current-read-waiter wait-for-readable) - (current-write-waiter wait-for-writable)) - (with-interrupts - hz yield-current-fiber - (lambda () - (let ((ret (make-atomic-box #f))) - (spawn-fiber (lambda () - (call-with-values (or init values) - (lambda vals (atomic-box-set! ret vals)))) - scheduler) - (run-scheduler scheduler (lambda () (atomic-box-ref ret))) - (unless keep-scheduler? (destroy-scheduler scheduler)) - (apply values (atomic-box-ref ret))))))))) + (let* ((fresh-scheduler? (not scheduler)) + (scheduler (or scheduler + (make-scheduler #:parallelism parallelism))) + (ret (make-atomic-box #f)) + (finished? (lambda () (atomic-box-ref ret)))) + (spawn-fiber (lambda () + (call-with-values (or init values) + (lambda vals (atomic-box-set! ret vals)))) + scheduler) + (when fresh-scheduler? + (start-auxiliary-threads scheduler hz finished?)) + (%run-fibers scheduler hz finished?) + (when fresh-scheduler? + (stop-auxiliary-threads scheduler) + (destroy-scheduler scheduler)) + (apply values (atomic-box-ref ret)))) (define (current-fiber-scheduler) (match (current-fiber) |
