summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndy Wingo <wingo@pobox.com>2016-12-29 19:11:25 +0100
committerAndy Wingo <wingo@pobox.com>2016-12-29 19:11:25 +0100
commit675e2c26dad71ea7ed3c551c98159830aab70d3c (patch)
treee2977499ae8d4d9617ab0d95ed065e96b5f79a75
parentFlesh out remote peers interface (diff)
downloadguile-fibers-675e2c26dad71ea7ed3c551c98159830aab70d3c.tar.gz
Enable parallelism by default
* fibers.scm (run-fibers): Default to use all current processor cores. (%run-fibers, start-auxiliary-threads, stop-auxiliary-threads): New helpers.
-rw-r--r--fibers.scm68
1 files changed, 50 insertions, 18 deletions
diff --git a/fibers.scm b/fibers.scm
index 03a0214..2cad1b0 100644
--- a/fibers.scm
+++ b/fibers.scm
@@ -24,7 +24,8 @@
#:use-module (fibers repl)
#:use-module (fibers timers)
#:use-module (fibers interrupts)
- #:use-module ((ice-9 threads) #:select (current-thread))
+ #:use-module ((ice-9 threads)
+ #:select (current-thread current-processor-count))
#:use-module ((ice-9 ports internal)
#:select (port-read-wait-fd port-write-wait-fd))
#:use-module (ice-9 suspendable-ports)
@@ -40,27 +41,58 @@
(lambda (fiber)
(resume-on-writable-fd (port-read-wait-fd port) fiber))))
+(define (%run-fibers scheduler hz finished?)
+ (with-scheduler
+ scheduler
+ (parameterize ((current-read-waiter wait-for-readable)
+ (current-write-waiter wait-for-writable))
+ (with-interrupts
+ hz yield-current-fiber
+ (lambda ()
+ (run-scheduler scheduler finished?))))))
+
+(define (start-auxiliary-threads scheduler hz finished?)
+ (let ((scheds (scheduler-remote-peers scheduler)))
+ (let lp ((i 0))
+ (when (< i (vector-length scheds))
+ (let ((remote (vector-ref scheds i)))
+ (call-with-new-thread
+ (lambda ()
+ (%run-fibers remote hz finished?)))
+ (lp (1+ i)))))))
+
+(define (stop-auxiliary-threads scheduler)
+ (let ((scheds (scheduler-remote-peers scheduler)))
+ (let lp ((i 0))
+ (when (< i (vector-length scheds))
+ (let* ((remote (vector-ref scheds i))
+ (thread (scheduler-kernel-thread remote)))
+ (when thread
+ (cancel-thread thread)
+ (join-thread thread))
+ (lp (1+ i)))))))
+
(define* (run-fibers #:optional (init #f)
#:key (hz 0) (scheduler #f)
+ (parallelism (current-processor-count))
(install-suspendable-ports? #t))
(when install-suspendable-ports? (install-suspendable-ports!))
- (let ((keep-scheduler? (->bool scheduler))
- (scheduler (or scheduler (make-scheduler))))
- (with-scheduler
- scheduler
- (parameterize ((current-read-waiter wait-for-readable)
- (current-write-waiter wait-for-writable))
- (with-interrupts
- hz yield-current-fiber
- (lambda ()
- (let ((ret (make-atomic-box #f)))
- (spawn-fiber (lambda ()
- (call-with-values (or init values)
- (lambda vals (atomic-box-set! ret vals))))
- scheduler)
- (run-scheduler scheduler (lambda () (atomic-box-ref ret)))
- (unless keep-scheduler? (destroy-scheduler scheduler))
- (apply values (atomic-box-ref ret)))))))))
+ (let* ((fresh-scheduler? (not scheduler))
+ (scheduler (or scheduler
+ (make-scheduler #:parallelism parallelism)))
+ (ret (make-atomic-box #f))
+ (finished? (lambda () (atomic-box-ref ret))))
+ (spawn-fiber (lambda ()
+ (call-with-values (or init values)
+ (lambda vals (atomic-box-set! ret vals))))
+ scheduler)
+ (when fresh-scheduler?
+ (start-auxiliary-threads scheduler hz finished?))
+ (%run-fibers scheduler hz finished?)
+ (when fresh-scheduler?
+ (stop-auxiliary-threads scheduler)
+ (destroy-scheduler scheduler))
+ (apply values (atomic-box-ref ret))))
(define (current-fiber-scheduler)
(match (current-fiber)