1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
|
;; Conditions
;;;; Copyright (C) 2017 Andy Wingo <wingo@pobox.com>
;;;;
;;;; This library is free software; you can redistribute it and/or
;;;; modify it under the terms of the GNU Lesser General Public
;;;; License as published by the Free Software Foundation; either
;;;; version 3 of the License, or (at your option) any later version.
;;;;
;;;; This library is distributed in the hope that it will be useful,
;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;;;; Lesser General Public License for more details.
;;;;
;;;; You should have received a copy of the GNU Lesser General Public
;;;; License along with this library; if not, write to the Free Software
;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
;;; Condition variable (cvar) implementation following the 2009 ICFP
;;; paper "Parallel Concurrent ML" by John Reppy, Claudio V. Russo,
;;; and Yingqi Xiao. See channels.scm for additional commentary.
;;;
;;; Besides the general ways in which this implementation differs from
;;; the paper, this condition variable implementation avoids locks entirely.
;;; Still, we should disable interrupts while any operation is in a
;;; "claimed" state to avoid excess latency due to pre-emption. It
;;; would be great if we could verify our protocol though; the
;;; parallel channel operations are still gnarly.
(define-module (fibers conditions)
#:use-module (srfi srfi-9)
#:use-module (ice-9 atomic)
#:use-module (ice-9 match)
#:use-module (fibers stack)
#:use-module (fibers operations)
#:export (make-condition
condition?
signal-condition!
wait-operation
wait))
;;; Counter utilities
;;;
;;; A counter here is an atomic box containing an integer that is
;;; either decremented or reset.
;; Initial value of every counter made by make-counter: the number of
;; times a wait operation's block-fn runs on a condition before the
;; waiters stack is pruned ("gc") of entries whose flag is already 'S.
(define %steps-till-gc 42) ; arbitrary; the most efficient value has not been measured
(define (make-counter)
  "Return a fresh counter: an atomic box whose initial contents are
%steps-till-gc."
  (let ((initial-count %steps-till-gc))
    (make-atomic-box initial-count)))
(define (counter-decrement! counter)
  "Atomically subtract one from the integer held in the atomic box
COUNTER and return the newly stored value."
  (let retry ((seen (atomic-box-ref counter)))
    (let* ((next (1- seen))
           ;; The compare-and-swap returns what the box held beforehand.
           (prev (atomic-box-compare-and-swap! counter seen next)))
      (cond
       ;; The box still held SEEN, so NEXT has been installed.
       ((= prev seen) next)
       ;; Someone else changed the box first; try again from its value.
       (else (retry prev))))))
(define (counter-reset! counter)
  "Put the atomic box COUNTER back to its starting value,
%steps-till-gc."
  (let ((fresh-count %steps-till-gc))
    (atomic-box-set! counter fresh-count)))
;;; Conditions
;; Record backing a condition variable.  signal-condition! and
;; wait-operation destructure it with positional `($ <condition> ...)'
;; patterns, so the order of the field clauses below matters.
;; NOTE(review): the `channel-' prefix on two accessors looks inherited
;; from channels.scm; neither accessor is referenced in the visible
;; part of this file -- confirm before renaming.
(define-record-type <condition>
(%make-condition signalled? waiters gc-step)
condition?
;; atomic box of bool: #f until signal-condition! swaps in #t
(signalled? condition-signalled?)
;; stack of (flag . resume) pairs pushed by wait-operation's block-fn
(waiters channel-waiters)
;; counter (atomic box of integer): block-fn runs left before the
;; waiters stack is pruned
(gc-step channel-gc-step))
(define (make-condition)
  "Create and return a new condition variable that has not yet been
signalled."
  (let ((signalled? (make-atomic-box #f))
        (waiters (make-empty-stack))
        (gc-step (make-counter)))
    (%make-condition signalled? waiters gc-step)))
;; Pop every entry off the WAITERS stack and try to resume each one,
;; oldest entry first.
(define (resume-waiters! waiters)
  ;; Try to move one waiter's flag from 'W to 'S.  Presumably W =
  ;; waiting, C = claimed, S = synched, as in (fibers operations) --
  ;; TODO confirm.
  (define (wake! state k)
    (let retry ()
      (match (atomic-box-compare-and-swap! state 'W 'S)
        ;; We made the W -> S transition ourselves: resume the waiter.
        ('W (k values))
        ;; Flag currently reads C: spin until it settles.
        ('C (retry))
        ;; Flag already reads S: nothing left to do for this waiter.
        ('S #f))))
  ;; stack-pop-all! hands back the newest entry first.  Recursing on
  ;; the tail before waking the head means entries are woken in the
  ;; order they were pushed.
  (let walk ((pending (stack-pop-all! waiters)))
    (match pending
      (() #f)
      (((state . k) . older)
       (walk older)
       (wake! state k)))))
(define (signal-condition! cvar)
  "Signal @var{cvar}, resuming every fiber or thread waiting on it.
Return @code{#t} if this call is the one that signalled @var{cvar}.
If @var{cvar} had already been signalled, do nothing and return
@code{#f}."
  (match cvar
    (($ <condition> signalled? waiters)
     ;; The compare-and-swap yields the box's previous contents, so only
     ;; the caller that sees #f has actually flipped the box to #t.
     (let ((already-signalled?
            (atomic-box-compare-and-swap! signalled? #f #t)))
       (if already-signalled?
           #f
           (begin
             (resume-waiters! waiters)
             #t))))))
(define (wait-operation cvar)
  "Return an operation that completes once @var{cvar} has been
signalled."
  (match cvar
    (($ <condition> signalled? waiters gc-step)
     ;; True for a waiters entry whose flag has not reached 'S, i.e.
     ;; one that is still worth keeping on the stack.
     (define (pending-waiter? entry)
       (match entry
         ((waiter-flag . _)
          (not (eq? (atomic-box-ref waiter-flag) 'S)))))
     ;; Fast path: if the cvar is already signalled, hand back `values'
     ;; as the result thunk; otherwise return #f.
     (define (try-fn)
       (if (atomic-box-ref signalled?) values #f))
     (define (block-fn flag sched resume)
       ;; Count this run against the pruning budget.  Only the caller
       ;; that brings the counter to exactly zero prunes entries whose
       ;; flag is already 'S and then resets the counter.  Concurrent
       ;; callers may push the counter below zero in the meantime;
       ;; testing for equality with zero (not "less than") keeps them
       ;; from repeating the pruning work.
       (let ((steps-left (counter-decrement! gc-step)))
         (when (zero? steps-left)
           (stack-filter! waiters pending-waiter?)
           (counter-reset! gc-step)))
       ;; Register this waiter so that signal-condition! can later
       ;; call RESUME on it.
       (stack-push! waiters (cons flag resume))
       ;; The cvar may have been signalled between try-fn and the push
       ;; above, in which case resume-waiters! could have run before
       ;; our entry was on the stack.  Run it again to resolve the race.
       (when (atomic-box-ref signalled?)
         (resume-waiters! waiters))
       (values))
     (make-base-operation #f try-fn block-fn))))
(define (wait cvar)
  "Perform a wait operation on @var{cvar}, returning once it has been
signalled."
  (let ((op (wait-operation cvar)))
    (perform-operation op)))
|