diff options
| author | Andy Wingo <wingo@pobox.com> | 2017-01-17 07:09:11 +0100 |
|---|---|---|
| committer | Andy Wingo <wingo@pobox.com> | 2017-01-17 07:09:11 +0100 |
| commit | 4fc82243b3f63c60a352bdb4fa3f4d1796c3a97f (patch) | |
| tree | a1abd0cec00a87934648a4df6da1027aa3a8d7bc | |
| parent | Update TODO (diff) | |
| download | guile-fibers-4fc82243b3f63c60a352bdb4fa3f4d1796c3a97f.tar.gz | |
Pin worker threads to CPUs if appropriate
* fibers.scm (with-affinity): New helper.
(%run-fibers): Add affinity argument.
(start-auxiliary-threads): Add affinities argument.
(compute-affinities): New helper.
(run-fibers): If the parallelism is complete, pin threads to CPUs.
| -rw-r--r-- | fibers.scm | 79 |
1 files changed, 58 insertions, 21 deletions
@@ -40,25 +40,39 @@ (lambda (fiber) (resume-on-writable-fd (port-read-wait-fd port) fiber)))) -(define (%run-fibers scheduler hz finished?) - (with-scheduler - scheduler - (parameterize ((current-read-waiter wait-for-readable) - (current-write-waiter wait-for-writable)) - (with-interrupts - hz yield-current-fiber +(define-syntax-rule (with-affinity affinity exp ...) + (let ((saved #f)) + (dynamic-wind (lambda () - (run-scheduler scheduler finished?)))))) + (set! saved (getaffinity 0)) + (setaffinity 0 affinity)) + (lambda () exp ...) + (lambda () + (setaffinity 0 saved))))) + +(define (%run-fibers scheduler hz finished? affinity) + (with-affinity + affinity + (with-scheduler + scheduler + (parameterize ((current-read-waiter wait-for-readable) + (current-write-waiter wait-for-writable)) + (with-interrupts + hz yield-current-fiber + (lambda () + (run-scheduler scheduler finished?))))))) -(define (start-auxiliary-threads scheduler hz finished?) +(define (start-auxiliary-threads scheduler hz finished? affinities) (let ((scheds (scheduler-remote-peers scheduler))) - (let lp ((i 0)) + (let lp ((i 0) (affinities affinities)) (when (< i (vector-length scheds)) - (let ((remote (vector-ref scheds i))) - (call-with-new-thread - (lambda () - (%run-fibers remote hz finished?))) - (lp (1+ i))))))) + (match affinities + ((affinity . affinities) + (let ((remote (vector-ref scheds i))) + (call-with-new-thread + (lambda () + (%run-fibers remote hz finished? affinity))) + (lp (1+ i) affinities)))))))) (define (stop-auxiliary-threads scheduler) (let ((scheds (scheduler-remote-peers scheduler))) @@ -71,30 +85,53 @@ (join-thread thread)) (lp (1+ i))))))) +(define (compute-affinities group-affinity parallelism) + (define (each-thread-has-group-affinity) + (make-list parallelism group-affinity)) + (define (one-thread-per-cpu) + (let lp ((cpu 0)) + (match (bit-position #t group-affinity cpu) + (#f '()) + (cpu (let ((affinity + (make-bitvector (bitvector-length group-affinity) #f))) + (bitvector-set! affinity cpu #t) + (cons affinity (lp (1+ cpu)))))))) + (let ((cpu-count (bit-count #t group-affinity))) + (if (eq? parallelism cpu-count) + (one-thread-per-cpu) + (each-thread-has-group-affinity)))) + (define* (run-fibers #:optional (init #f) #:key (hz 100) (scheduler #f) (parallelism (current-processor-count)) + (cpus (getaffinity 0)) (install-suspendable-ports? #t)) (when install-suspendable-ports? (install-suspendable-ports!)) (cond (scheduler (let ((finished? (lambda () #f))) (when init (spawn-fiber init scheduler)) - (%run-fibers scheduler hz finished?))) + (%run-fibers scheduler hz finished? cpus))) (else (let* ((scheduler (make-scheduler #:parallelism parallelism)) (ret (make-atomic-box #f)) - (finished? (lambda () (atomic-box-ref ret)))) + (finished? (lambda () (atomic-box-ref ret))) + (affinities (compute-affinities cpus parallelism))) (unless init (error "run-fibers requires initial fiber thunk when creating sched")) (spawn-fiber (lambda () (call-with-values init (lambda vals (atomic-box-set! ret vals)))) scheduler) - (dynamic-wind - (lambda () (start-auxiliary-threads scheduler hz finished?)) - (lambda () (%run-fibers scheduler hz finished?)) - (lambda () (stop-auxiliary-threads scheduler))) + (match affinities + ((affinity . affinities) + (dynamic-wind + (lambda () + (start-auxiliary-threads scheduler hz finished? affinities)) + (lambda () + (%run-fibers scheduler hz finished? affinity)) + (lambda () + (stop-auxiliary-threads scheduler))))) (destroy-scheduler scheduler) (apply values (atomic-box-ref ret)))))) |
