1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
|
;; Fibers: cooperative, event-driven user-space threads.
;;;; Copyright (C) 2016 Free Software Foundation, Inc.
;;;;
;;;; This library is free software; you can redistribute it and/or
;;;; modify it under the terms of the GNU Lesser General Public
;;;; License as published by the Free Software Foundation; either
;;;; version 3 of the License, or (at your option) any later version.
;;;;
;;;; This library is distributed in the hope that it will be useful,
;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;;;; Lesser General Public License for more details.
;;;;
;;;; You should have received a copy of the GNU Lesser General Public
;;;; License along with this library; if not, write to the Free Software
;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
;;;;
(define-module (fibers scheduler)
#:use-module (srfi srfi-9)
#:use-module (fibers stack)
#:use-module (fibers epoll)
#:use-module (fibers psq)
#:use-module (ice-9 atomic)
#:use-module (ice-9 control)
#:use-module (ice-9 match)
#:use-module (ice-9 fdes-finalizers)
#:use-module ((ice-9 threads) #:select (current-thread))
#:export (;; Low-level interface: schedulers and tasks.
make-scheduler
(current-scheduler/public . current-scheduler)
scheduler-runcount
(scheduler-kernel-thread/public . scheduler-kernel-thread)
scheduler-remote-peers
scheduler-work-pending?
choose-parallel-scheduler
run-scheduler
destroy-scheduler
schedule-task
schedule-task-when-fd-readable
schedule-task-when-fd-writable
schedule-task-at-time
suspend-current-task
yield-current-task))
;; Record describing one scheduler.  Instances are built by
;; `make-scheduler'; only `timers', `remote-peers' and
;; `choose-parallel-scheduler' have setters, every other field is fixed
;; at construction time.
(define-record-type <scheduler>
(%make-scheduler epfd runcount-box prompt-tag
next-runqueue current-runqueue
fd-waiters timers kernel-thread
remote-peers choose-parallel-scheduler)
scheduler?
;; epoll instance obtained from `epoll-create'
(epfd scheduler-epfd)
;; atomic variable of uint32
(runcount-box scheduler-runcount-box)
;; prompt tag that tasks are run under and that suspension aborts to
(prompt-tag scheduler-prompt-tag)
;; atomic stack of tasks to run next turn (reverse order)
(next-runqueue scheduler-next-runqueue)
;; atomic stack of tasks to run this turn
(current-runqueue scheduler-current-runqueue)
;; fd -> (total-events (events . task) ...)
;; (hash table accessed with hashv-ref / hashv-set!)
(fd-waiters scheduler-fd-waiters)
;; PSQ of expiry -> task
;; (keys are (expiry . task) pairs, the priority is the expiry)
(timers scheduler-timers set-scheduler-timers!)
;; atomic parameter of thread
;; (#f while no kernel thread is running this scheduler)
(kernel-thread scheduler-kernel-thread)
;; list of sched
;; (the other schedulers created by the same `make-scheduler' call)
(remote-peers scheduler-remote-peers set-scheduler-remote-peers!)
;; () -> sched
(choose-parallel-scheduler scheduler-choose-parallel-scheduler
set-scheduler-choose-parallel-scheduler!))
(define (make-atomic-parameter init)
  "Return a procedure backed by an atomic box holding @var{init}.
Called with no arguments it returns the current value.  Called with
@var{init} it resets the box unconditionally.  Called with any other
value it installs that value only if the box still holds @var{init},
and signals an error otherwise."
  (define cell (make-atomic-box init))
  (define (claim! owner)
    ;; Compare-and-swap so that only one claimant can win; the value we
    ;; observed tells us whether we were that claimant.
    (let ((seen (atomic-box-compare-and-swap! cell init owner)))
      (if (not (eq? seen init))
          (error "owned by other thread" seen))))
  (case-lambda
    (()
     (atomic-box-ref cell))
    ((new)
     (cond
      ((eq? new init) (atomic-box-set! cell new))
      (else (claim! new))))))
(define (shuffle l)
  "Return a new list holding the elements of @var{l} in random order."
  ;; Pair every element with a random real, sort on that tag, then
  ;; strip the tags again.
  (define (tag item)
    (cons (random 1.0) item))
  (define (by-tag left right)
    (< (car left) (car right)))
  (let* ((tagged (map tag l))
         (ordered (sort tagged by-tag)))
    (map cdr ordered)))
(define (make-selector items)
  "Return a thunk that hands out the elements of @var{items} one at a
time.  The items are shuffled once up front and then cycled through in
that order.  For an empty list the thunk always returns @code{#f}."
  (let* ((pool (list->vector (shuffle items)))
         (count (vector-length pool)))
    (cond
     ((= count 0)
      (lambda () #f))
     ((= count 1)
      ;; Nothing to rotate through; skip the cursor bookkeeping.
      (let ((only (vector-ref pool 0)))
        (lambda () only)))
     (else
      (let ((cursor 0))
        (lambda ()
          (let ((picked (vector-ref pool cursor)))
            ;; Advance and wrap around at the end of the vector.
            (set! cursor (modulo (1+ cursor) count))
            picked)))))))
(define* (make-scheduler #:key parallelism
                         (prompt-tag (make-prompt-tag "fibers")))
  "Create and return a scheduler on which fibers can run.  When
@var{parallelism} is given, @code{(1- @var{parallelism})} additional
schedulers sharing @var{prompt-tag} are created as well and all of them
are registered as each other's remote peers; only the first scheduler
is returned."
  (define (make-timer-queue)
    ;; Keys are (expiry . task) pairs ordered by expiry; the priority
    ;; is the expiry itself.
    ;; NOTE(review): keys compare by expiry only, so two timers with the
    ;; same expiry compare as equal keys -- confirm that `psq-set' does
    ;; not let one of them replace the other.
    (make-psq (match-lambda*
                (((expiry-a . _) (expiry-b . _)) (< expiry-a expiry-b)))
              <))
  (define (make-peers)
    (if parallelism
        (map (lambda (_)
               (make-scheduler #:prompt-tag prompt-tag))
             (iota (1- parallelism)))
        '()))
  (let* ((primary (%make-scheduler (epoll-create)
                                   (make-atomic-box 0)
                                   prompt-tag
                                   (make-empty-stack)
                                   (make-empty-stack)
                                   (make-hash-table)
                                   (make-timer-queue)
                                   (make-atomic-parameter #f)
                                   #f #f))
         (group (cons primary (make-peers))))
    ;; Every member gets its own selector over the whole group and a
    ;; peer list that excludes itself.
    (for-each
     (lambda (member)
       (let ((pick (make-selector group)))
         (set-scheduler-remote-peers! member (delq member group))
         (set-scheduler-choose-parallel-scheduler! member pick)))
     group)
    primary))
;; Parameter holding the scheduler that is current for the calling
;; thread.  It is backed by a thread-local fluid whose initial value is
;; #f; `with-scheduler' binds it while a scheduler is running.
(define current-scheduler (fluid->parameter (make-thread-local-fluid #f)))
;; Read-only view of the `current-scheduler' parameter.  The module
;; exports this procedure under the name `current-scheduler', so code
;; outside this module can read the parameter but cannot rebind it.
(define (current-scheduler/public)
"Return the current scheduler, or @code{#f} if no scheduler is current."
(current-scheduler))
(define-syntax-rule (with-scheduler scheduler body ...)
  "Run @var{body} ... with @var{scheduler} claimed by the current
kernel thread and with @code{current-scheduler} bound to
@var{scheduler}.  The claim is released again when control leaves the
body.  An error is signalled if another kernel thread already owns
@var{scheduler}."
  (let* ((sched scheduler)
         (owner (scheduler-kernel-thread sched)))
    (dynamic-wind
      ;; Entering: claim the scheduler for this thread.
      (lambda ()
        (owner (current-thread)))
      (lambda ()
        (parameterize ((current-scheduler sched))
          body ...))
      ;; Leaving: give the scheduler back.
      (lambda ()
        (owner #f)))))
;; Reads the atomic run counter; `run-scheduler' increments it once per
;; task it runs and masks it to 32 bits.
(define (scheduler-runcount sched)
"Return the number of tasks that have been run on @var{sched} since
it was started, modulo 2@sup{32}."
(atomic-box-ref (scheduler-runcount-box sched)))
;; The `kernel-thread' field holds an atomic parameter (a procedure);
;; calling it with no arguments reads the current owner.  The module
;; exports this reader under the name `scheduler-kernel-thread'.
(define (scheduler-kernel-thread/public sched)
"Return the kernel thread on which @var{sched} is running, or
@code{#f} if @var{sched} is not running."
((scheduler-kernel-thread sched)))
;; Return a scheduler from SCHED's group by calling the selector thunk
;; stored in SCHED.  `make-scheduler' installs a `make-selector' thunk
;; built over all schedulers of the group, SCHED itself included, so
;; the result may be SCHED.
(define (choose-parallel-scheduler sched)
((scheduler-choose-parallel-scheduler sched)))
;; Push TASK onto SCHED's next-turn run queue without waking SCHED's
;; epoll instance.  `schedule-task' is the variant that also performs
;; the wakeup when called from another kernel thread.
(define-inlinable (schedule-task/no-wakeup sched task)
(stack-push! (scheduler-next-runqueue sched) task))
(define (schedule-task sched task)
  "Queue the thunk @var{task} on the scheduler @var{sched}; it will be
called with no arguments on the scheduler's next turn.

This procedure may be called from any kernel thread, including one
other than the thread that is running @var{sched}.  Returns zero
values."
  ;; Enqueue first, then look at the owner: the task must already be
  ;; visible by the time a remote scheduler is woken up.
  (schedule-task/no-wakeup sched task)
  (let ((owner ((scheduler-kernel-thread sched))))
    (if (not (eq? owner (current-thread)))
        (epoll-wake! (scheduler-epfd sched))))
  (values))
(define (schedule-tasks-for-active-fd fd revents sched)
  "Handle the epoll events @var{revents} reported for @var{fd} on
@var{sched}.  Every waiter whose event mask overlaps @var{revents}, and
every waiter at all if @code{EPOLLERR} was reported, is queued to run;
the remaining waiters are registered with epoll again."
  (define table (scheduler-fd-waiters sched))
  (define (wake-or-rearm! waiter)
    (match waiter
      ((events . task)
       (if (zero? (logand revents (logior events EPOLLERR)))
           ;; None of this waiter's events fired: wait again.
           (schedule-task-when-fd-active sched fd events task)
           ;; Its events (or an error) fired: run it.
           (schedule-task/no-wakeup sched task)))))
  (let ((entry (hashv-ref table fd)))
    (if (not entry)
        (warn "scheduler for unknown fd" fd)
        (match entry
          ((_ . waiters)
           ;; The EPOLLONESHOT registration is spent, so reset the
           ;; entry before any waiter gets re-registered into it.
           (set-car! entry #f)
           (set-cdr! entry '())
           (unless (zero? (logand revents EPOLLERR))
             (hashv-remove! table fd))
           (for-each wake-or-rearm! waiters)
           #f)))))
(define (schedule-tasks-for-expired-timers sched)
  "Queue every timer task of @var{sched} whose expiry is not later than
the current internal real time, earliest expiry first, and store the
remaining timers back into @var{sched}."
  (define now (get-internal-real-time))
  (define (due? timers)
    (and (not (psq-empty? timers))
         (>= now (car (psq-min timers)))))
  (let drain ((pending (scheduler-timers sched)))
    (if (due? pending)
        (call-with-values (lambda () (psq-pop pending))
          (lambda (key remaining)
            (match key
              ((_ . task)
               (schedule-task/no-wakeup sched task)
               (drain remaining)))))
        ;; Nothing (more) is due: remember what is left.
        (set-scheduler-timers! sched pending))))
(define (schedule-tasks-for-next-turn sched)
  "Poll epoll for @var{sched}, queue the tasks waiting on any file
descriptor that became active, and then queue the tasks of all expired
timers."
  ;; This runs once every task of the current turn has been run.  The
  ;; next-turn queue may already hold tasks at this point: a fiber that
  ;; suspended on a channel may have been woken by another fiber, or a
  ;; remote thread may have scheduled a fiber here.  Either way we ask
  ;; the kernel which of the fds we care about are active.
  (let* ((timers (scheduler-timers sched))
         ;; Expiry of the earliest timer, or #f without timers.
         (deadline (if (psq-empty? timers)
                       #f
                       (match (psq-min timers)
                         ((expiry . _) expiry)))))
    (epoll (scheduler-epfd sched)
           #:expiry deadline
           ;; With tasks already pending, make epoll return at once
           ;; instead of waiting for the deadline.
           #:update-expiry (lambda (expiry)
                             (if (stack-empty? (scheduler-next-runqueue sched))
                                 expiry
                                 0))
           #:folder (lambda (fd revents seed)
                      (schedule-tasks-for-active-fd fd revents seed)
                      seed)
           #:seed sched))
  (schedule-tasks-for-expired-timers sched))
(define (work-stealer sched)
  "Return a thunk that tries to steal work for @var{sched}.  Each call
picks the next of @var{sched}'s remote peers, cycling through them in
an order shuffled once up front, and pops a task from that peer's
current run queue.  The thunk returns the task, or @code{#f} if no work
could be stolen."
  (define pick-peer
    (make-selector (scheduler-remote-peers sched)))
  (lambda ()
    (let ((victim (pick-peer)))
      (if victim
          (stack-pop! (scheduler-current-runqueue victim) #f)
          #f))))
(define (scheduler-work-pending? sched)
  "Return @code{#t} if @var{sched} still has something to do, that is
if it has a pending timer or a task on either of its run queues, and
@code{#f} otherwise."
  (or (not (psq-empty? (scheduler-timers sched)))
      (not (stack-empty? (scheduler-current-runqueue sched)))
      (not (stack-empty? (scheduler-next-runqueue sched)))))
;; Main loop of a scheduler.  `next-task' and `next-turn' call each
;; other: `next-task' drains the current run queue, `next-turn' polls
;; for newly runnable tasks and refills it.  The whole loop runs inside
;; `with-scheduler', so SCHED is claimed by the calling kernel thread
;; for the duration.
(define* (run-scheduler sched finished?)
"Run @var{sched} until calling @code{finished?} returns a true
value. Return zero values."
(let ((tag (scheduler-prompt-tag sched))
(runcount-box (scheduler-runcount-box sched))
(next (scheduler-next-runqueue sched))
(cur (scheduler-current-runqueue sched))
(steal-work! (work-stealer sched)))
;; Run TASK inside a prompt tagged with TAG.  If the task aborts to
;; that prompt, the handler calls the aborting code's callback with
;; the scheduler and the task's continuation.
(define (run-task task)
;; Bump the run counter, wrapping at 32 bits.
(atomic-box-set! runcount-box
(logand (1+ (atomic-box-ref runcount-box)) #xffffFFFF))
(call-with-prompt tag
task
(lambda (k after-suspend)
(after-suspend sched k))))
;; Run the tasks of the current turn one after another; once the
;; current queue is empty, move on to the next turn.
(define (next-task)
(match (stack-pop! cur #f)
(#f
(when (stack-empty? next)
;; Both current and next runqueues are empty; steal a
;; little bit of work from a remote scheduler if we
;; can. Run it directly instead of pushing onto a
;; queue to avoid double stealing.
(let ((task (steal-work!)))
(when task
(run-task task))))
(next-turn))
(task
(run-task task)
(next-task))))
;; Unless FINISHED? says we are done: poll for active fds and expired
;; timers, move everything queued for the next turn onto the current
;; queue, and start running it.
(define (next-turn)
(unless (finished?)
(schedule-tasks-for-next-turn sched)
;; NEXT holds tasks in reverse order (see the record definition),
;; hence the `reverse' before pushing them onto CUR.
(stack-push-list! cur (reverse (stack-pop-all! next)))
(next-task)))
;; Run the loop under a catch-all handler.  The pre-unwind handler
;; prints a backtrace and the exception to the error port; the
;; ordinary handler then starts the loop again.
(define (run-scheduler/error-handling)
(catch #t
next-task
(lambda _ (run-scheduler/error-handling))
;; Capture the error port once, outside the handler.
(let ((err (current-error-port)))
(lambda (key . args)
;; Swallow any exception raised while reporting.
(false-if-exception
(let ((stack (make-stack #t 4 tag)))
(format err "Uncaught exception in task:\n")
;; FIXME: Guile's display-backtrace isn't respecting
;; stack narrowing; manually passing stack-length as
;; depth is a workaround.
(display-backtrace stack err 0 (stack-length stack))
(print-exception err (stack-ref stack 0)
key args)))))))
(with-scheduler sched (run-scheduler/error-handling))))
;; The only resource released here is the scheduler's epoll instance;
;; the run queues, timers and fd-waiter table are not touched.
(define (destroy-scheduler sched)
"Release any resources associated with @var{sched}."
(epoll-destroy (scheduler-epfd sched)))
;; Each entry of the fd-waiters table is a list
;; (ACTIVE-EVENTS (EVENTS . TASK) ...).  ACTIVE-EVENTS is the event mask
;; currently registered with epoll for the fd, or #f when nothing is
;; registered (after a one-shot event fired or the fd was finalized).
(define (schedule-task-when-fd-active sched fd events task)
"Arrange for @var{sched} to schedule @var{task} when the file
descriptor @var{fd} becomes active with any of the given @var{events},
expressed as an epoll bitfield."
(let ((fd-waiters (hashv-ref (scheduler-fd-waiters sched) fd)))
(match fd-waiters
;; The fd already has an entry: add the new waiter to it.
((active-events . waiters)
(set-cdr! fd-waiters (acons events task waiters))
;; Only call into epoll when the registered mask does not already
;; cover every requested event.
(unless (and active-events
(= (logand events active-events) events))
(let ((active-events (logior events (or active-events 0))))
(set-car! fd-waiters active-events)
(epoll-add*! (scheduler-epfd sched) fd
(logior active-events EPOLLONESHOT)))))
;; First waiter for this fd: create the entry, install a finalizer
;; for the fd, and register the fd with epoll.
(#f
(let ((fd-waiters (list events (cons events task))))
(define (finalize-fd fd)
;; When a file port is closed, clear out the list of
;; waiting tasks so that when/if this FD is re-used, we
;; don't resume stale tasks. Note that we don't need to
;; remove the FD from the epoll set, as the kernel manages
;; that for us.
;;
;; FIXME: Is there a way to wake all tasks in a thread-safe
;; way? Note that this function may be invoked from a
;; finalizer thread.
(set-cdr! fd-waiters '())
(set-car! fd-waiters #f))
(hashv-set! (scheduler-fd-waiters sched) fd fd-waiters)
(add-fdes-finalizer! fd finalize-fd)
(epoll-add*! (scheduler-epfd sched) fd
(logior events EPOLLONESHOT)))))))
(define (schedule-task-when-fd-readable sched fd task)
  "Arrange for @var{task} to be scheduled on @var{sched} once the file
descriptor @var{fd} becomes readable."
  ;; Wait for EPOLLIN or EPOLLRDHUP.
  (let ((read-events (logior EPOLLIN EPOLLRDHUP)))
    (schedule-task-when-fd-active sched fd read-events task)))
(define (schedule-task-when-fd-writable sched fd task)
  "Arrange for @var{task} to be scheduled on @var{sched} once the file
descriptor @var{fd} becomes writable."
  (let ((write-events EPOLLOUT))
    (schedule-task-when-fd-active sched fd write-events task)))
(define (schedule-task-at-time sched expiry task)
  "Register @var{task} with @var{sched} so that it is scheduled once
the absolute real time reaches or passes @var{expiry}, given in
internal time units."
  ;; The timer queue is keyed by (expiry . task) with the expiry as
  ;; priority.  `psq-set' returns a queue that is stored back into the
  ;; scheduler.
  (let* ((key (cons expiry task))
         (updated (psq-set (scheduler-timers sched) key expiry)))
    (set-scheduler-timers! sched updated)))
;; Shim for Guile 2.1.5.
;; When `suspendable-continuation?' is not already defined, install a
;; fallback that answers #t for every prompt tag.  With the fallback in
;; place, `suspend-current-task' never raises its continuation-barrier
;; error and `yield-current-task' always attempts the abort.
(unless (defined? 'suspendable-continuation?)
(define! 'suspendable-continuation? (lambda (tag) #t)))
(define* (suspend-current-task after-suspend)
  "Suspend the task that is currently running.  Once it is suspended,
@var{after-suspend} is called with two arguments: the current scheduler
and the continuation of the task, which is the continuation of this
@code{suspend-current-task} call.  An error is signalled if the current
continuation cannot be suspended."
  (let* ((sched (current-scheduler))
         (tag (scheduler-prompt-tag sched)))
    (if (suspendable-continuation? tag)
        (abort-to-prompt tag after-suspend)
        (error "Attempt to suspend fiber within continuation barrier"))))
(define* (yield-current-task)
  "Hand control back to the current scheduler, re-queueing the current
task.  This is like @code{(suspend-current-task schedule-task)}, except
that nothing happens when there is no current scheduler or when the
current continuation cannot be suspended.  Returns @code{#t} if the
task yielded and @code{#f} otherwise."
  (let ((sched (current-scheduler)))
    (cond
     ((not sched) #f)
     (else
      (let ((tag (scheduler-prompt-tag sched)))
        (cond
         ((suspendable-continuation? tag)
          (abort-to-prompt tag schedule-task)
          #t)
         (else #f)))))))
|