summaryrefslogtreecommitdiff
path: root/fibers/internal.scm
blob: 79b10117ec3b9d2ac329eab7a651c3403f97d24b (about) (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
;; Fibers: cooperative, event-driven user-space threads.

;;;; Copyright (C) 2016 Free Software Foundation, Inc.
;;;;
;;;; This library is free software; you can redistribute it and/or
;;;; modify it under the terms of the GNU Lesser General Public
;;;; License as published by the Free Software Foundation; either
;;;; version 3 of the License, or (at your option) any later version.
;;;;
;;;; This library is distributed in the hope that it will be useful,
;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;;;; Lesser General Public License for more details.
;;;;
;;;; You should have received a copy of the GNU Lesser General Public
;;;; License along with this library; if not, write to the Free Software
;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
;;;;

(define-module (fibers internal)
  #:use-module (srfi srfi-9)
  #:use-module (fibers stack)
  #:use-module (fibers epoll)
  #:use-module (fibers psq)
  #:use-module (fibers nameset)
  #:use-module (ice-9 atomic)
  #:use-module (ice-9 control)
  #:use-module (ice-9 match)
  #:use-module (ice-9 fdes-finalizers)
  #:use-module ((ice-9 threads) #:select (current-thread))
  #:export (;; Low-level interface: schedulers and threads.
            make-scheduler
            with-scheduler
            scheduler-name
            scheduler-runcount
            (scheduler-kernel-thread/public . scheduler-kernel-thread)
            scheduler-remote-peers
            scheduler-work-pending?
            choose-parallel-scheduler
            run-scheduler
            destroy-scheduler

            resume-on-readable-fd
            resume-on-writable-fd
            add-timer

            create-fiber
            (current-fiber/public . current-fiber)
            kill-fiber
            fiber-scheduler
            fiber-continuation

            fold-all-schedulers
            scheduler-by-name
            fold-all-fibers
            fiber-by-name

            suspend-current-fiber
            resume-fiber
            yield-current-fiber))

;; Module-wide registries of every fiber and every scheduler.  Objects
;; are registered with `nameset-add!' and looked up by name with
;; `nameset-ref'.  `define-once' keeps an existing registry if this
;; module is evaluated again.
(define-once fibers-nameset (make-nameset))
(define-once schedulers-nameset (make-nameset))

(define (fold-all-schedulers f seed)
  "Fold the procedure @var{f} over every known scheduler, starting
from @var{seed}.  Each call has the shape
@code{(@var{f} @var{name} @var{scheduler} @var{seed})}."
  (let ((registry schedulers-nameset))
    (nameset-fold f registry seed)))
(define (scheduler-by-name name)
  "Look up @var{name} among the known schedulers.  Return the matching
scheduler, or @code{#f} if there is none."
  (let ((registry schedulers-nameset))
    (nameset-ref registry name)))

(define (fold-all-fibers f seed)
  "Fold the procedure @var{f} over every known fiber, starting from
@var{seed}.  Each call has the shape
@code{(@var{f} @var{name} @var{fiber} @var{seed})}."
  (let ((registry fibers-nameset))
    (nameset-fold f registry seed)))
(define (fiber-by-name name)
  "Look up @var{name} among the known fibers.  Return the matching
fiber, or @code{#f} if there is none."
  (let ((registry fibers-nameset))
    (nameset-ref registry name)))

;; Record describing one scheduler.  Fields declared without a setter
;; are fixed when the record is built by `%make-scheduler'; the others
;; are filled in or replaced later (see `make-scheduler', `add-timer'
;; and `run-timers').
(define-record-type <scheduler>
  (%make-scheduler name epfd runcount-box prompt-tag
                   next-runqueue current-runqueue
                   sources timers kernel-thread
                   remote-peers choose-parallel-scheduler)
  scheduler?
  ;; name handed out by `nameset-add!' on schedulers-nameset; set by
  ;; make-scheduler right after construction
  (name scheduler-name set-scheduler-name!)
  ;; value returned by `epoll-create'; passed to the epoll procedures
  (epfd scheduler-epfd)
  ;; atomic variable of uint32
  (runcount-box scheduler-runcount-box)
  ;; prompt tag that fibers abort to when they suspend
  (prompt-tag scheduler-prompt-tag)
  ;; atomic stack of fiber to run next turn (reverse order)
  (next-runqueue scheduler-next-runqueue)
  ;; atomic stack of fiber to run this turn
  (current-runqueue scheduler-current-runqueue)
  ;; fd -> (total-events (events . resume-fn) ...)
  (sources scheduler-sources)
  ;; PSQ of thunk -> expiry
  ;; (keys are (expiry . thunk) pairs, see `add-timer')
  (timers scheduler-timers set-scheduler-timers!)
  ;; atomic parameter of thread
  ;; (holds #f while no kernel thread is running this scheduler)
  (kernel-thread scheduler-kernel-thread)
  ;; list of sched
  (remote-peers scheduler-remote-peers set-scheduler-remote-peers!)
  ;; () -> sched
  (choose-parallel-scheduler scheduler-choose-parallel-scheduler
                             set-scheduler-choose-parallel-scheduler!))

;; Record describing one fiber.  Both fields are mutable: the scheduler
;; changes when a fiber is stolen by another scheduler, and the
;; continuation is replaced every time the fiber is suspended, scheduled
;; or started (see `run-scheduler' and `schedule-fiber!').
(define-record-type <fiber>
  (make-fiber scheduler continuation)
  fiber?
  ;; The scheduler to which a fiber is currently bound.
  (scheduler fiber-scheduler set-fiber-scheduler!)
  ;; What the fiber should do when it resumes, or #f if the fiber is
  ;; currently running.
  (continuation fiber-continuation set-fiber-continuation!))

(define (make-atomic-parameter init)
  ;; Return a procedure wrapping an atomic box that starts out holding
  ;; INIT.  Called with no arguments it returns the current value.
  ;; Called with one argument NEW it either resets the box (when NEW is
  ;; INIT) or tries to claim it by swapping INIT for NEW, signalling an
  ;; error if the box did not hold INIT at that moment.
  (let ((cell (make-atomic-box init)))
    (case-lambda
      (()
       (atomic-box-ref cell))
      ((new)
       (cond
        ((eq? new init)
         ;; Releasing: unconditionally store the initial value.
         (atomic-box-set! cell new))
        (else
         ;; Claiming: only succeeds if nobody else holds the box.
         (let ((witness (atomic-box-compare-and-swap! cell init new)))
           (if (not (eq? witness init))
               (error "owned by other thread" witness)))))))))

(define (shuffle l)
  ;; Return a list with the elements of L in random order: tag every
  ;; element with a random real, sort by tag, then drop the tags.
  (let* ((tagged (map (lambda (item) (cons (random 1.0) item)) l))
         (ordered (sort tagged
                        (lambda (a b) (< (car a) (car b))))))
    (map cdr ordered)))

(define (make-selector items)
  ;; Return a thunk that hands out the elements of ITEMS one per call,
  ;; cycling forever through a shuffled copy of the list.  With no
  ;; items the thunk always returns #f; with a single item it always
  ;; returns that item.
  (let* ((pool (list->vector (shuffle items)))
         (size (vector-length pool)))
    (case size
      ((0) (lambda () #f))
      ((1) (let ((only (vector-ref pool 0)))
             (lambda () only)))
      (else
       (let ((cursor 0))
         (lambda ()
           (let ((chosen (vector-ref pool cursor)))
             ;; Advance, wrapping back to the start of the pool.
             (set! cursor (modulo (1+ cursor) size))
             chosen)))))))

(define* (make-scheduler #:key parallelism
                         (prompt-tag (make-prompt-tag "fibers")))
  "Create and return a new scheduler in which to run fibers.  Fibers
of the scheduler suspend by aborting to @var{prompt-tag}.  When
@var{parallelism} is given, @code{(1- @var{parallelism})} additional
schedulers sharing @var{prompt-tag} are created as well; every
scheduler of the group gets the others as its remote peers and a
selector that cycles over the whole group.  Only the first scheduler
is returned."
  (let* (;; Timer keys are (expiry . thunk) pairs, ordered by expiry.
         ;; NOTE(review): only the expiry is compared, so two timers
         ;; with the same expiry may count as the same PSQ key --
         ;; confirm against (fibers psq).
         (timer-key<? (match-lambda*
                        (((t1 . c1) (t2 . c2)) (< t1 t2))))
         (sched (%make-scheduler #f
                                 (epoll-create)
                                 (make-atomic-box 0)
                                 prompt-tag
                                 (make-empty-stack)
                                 (make-empty-stack)
                                 (make-hash-table)
                                 (make-psq timer-key<? <)
                                 (make-atomic-parameter #f)
                                 #f #f)))
    ;; The name is only known once the scheduler has been registered.
    (set-scheduler-name! sched (nameset-add! schedulers-nameset sched))
    (let* ((peers (if parallelism
                      (map (lambda (_)
                             (make-scheduler #:prompt-tag prompt-tag))
                           (iota (1- parallelism)))
                      '()))
           (group (cons sched peers)))
      ;; Wire every member of the group to the others.  This overrides
      ;; what the recursive calls above set up for the peers.
      (for-each
       (lambda (member)
         (let ((choose! (make-selector group)))
           (set-scheduler-remote-peers! member (delq member group))
           (set-scheduler-choose-parallel-scheduler! member choose!)))
       group))
    sched))

(define-syntax-rule (with-scheduler scheduler body ...)
  "Evaluate @code{(begin @var{body} ...)} while @var{scheduler} is
marked as running on the current kernel thread; the mark is cleared
again whenever control leaves the body.  Signal an error if
@var{scheduler} is already marked as running on a kernel thread."
  (let* ((sched scheduler)
         (owner (scheduler-kernel-thread sched)))
    (dynamic-wind
      (lambda () (owner (current-thread)))
      (lambda () body ...)
      (lambda () (owner #f)))))

(define (scheduler-runcount sched)
  "Return how many times @var{sched} has run a fiber since it was
started.  The count wraps around modulo 2@sup{32}."
  (let ((counter (scheduler-runcount-box sched)))
    (atomic-box-ref counter)))

(define (scheduler-kernel-thread/public sched)
  "Return the kernel thread that is currently running @var{sched}, or
@code{#f} if @var{sched} is not running."
  (let ((owner (scheduler-kernel-thread sched)))
    ;; The field holds a procedure; calling it with no arguments reads
    ;; the current value.
    (owner)))

(define (choose-parallel-scheduler sched)
  ;; Ask SCHED's selector (installed by `make-scheduler') for the next
  ;; scheduler of its group and return it.
  (let ((choose! (scheduler-choose-parallel-scheduler sched)))
    (choose!)))

;; Parameter holding the fiber that is running in the current dynamic
;; extent; #f by default.  It is set by the thunk built in
;; `create-fiber'.
(define current-fiber (make-parameter #f))
(define (current-fiber/public)
  "Return the fiber that is currently running, or @code{#f} if no
fiber is current."
  (let ((fiber (current-fiber)))
    fiber))

(define (schedule-fiber! fiber thunk)
  ;; Make FIBER run THUNK on the next turn of its scheduler.  Returns
  ;; zero values.
  ;;
  ;; A fiber is resumed at most once and we are the ones resuming it,
  ;; so the thunk can be stored directly.  Pushing the fiber onto the
  ;; runqueue is an atomic operation with SEQ_CST ordering, which makes
  ;; the store visible even when the fiber belongs to a scheduler
  ;; running on a remote thread.
  (set-fiber-continuation! fiber thunk)
  (let* ((sched (fiber-scheduler fiber))
         (runqueue (scheduler-next-runqueue sched)))
    (stack-push! runqueue fiber)
    ;; If the scheduler is not running on this thread, wake it through
    ;; its epoll handle.
    (let ((owner ((scheduler-kernel-thread sched))))
      (if (not (eq? owner (current-thread)))
          (epoll-wake! (scheduler-epfd sched))))
    (values)))

;; Number of internal time units in one millisecond, derived from
;; Guile's `internal-time-units-per-second'.
(define internal-time-units-per-millisecond
  (/ internal-time-units-per-second 1000))

;; Dispatch the epoll events REVENTS reported for file descriptor FD on
;; SCHED.  Every waiter registered for FD whose event mask intersects
;; REVENTS is called with REVENTS; if REVENTS contains EPOLLERR all
;; waiters are called.  Waiters that do not match are registered again
;; with `add-fd-event-waiter'.  Warns when FD has no entry in the
;; sources table.
(define (schedule-fibers-for-fd fd revents sched)
  (match (hashv-ref (scheduler-sources sched) fd)
    (#f (warn "scheduler for unknown fd" fd))
    ((and sources (active-events . waiters))
     ;; First, clear the active status, as the EPOLLONESHOT has
     ;; deactivated our entry in the epoll set.
     ;; WAITERS still refers to the old waiter list bound by the
     ;; pattern above, so it can be walked after the pair is reset.
     (set-car! sources #f)
     (set-cdr! sources '())
     ;; On error, forget the fd altogether.
     (unless (zero? (logand revents EPOLLERR))
       (hashv-remove! (scheduler-sources sched) fd))
     ;; Now resume or re-enqueue fibers, as appropriate.
     (let lp ((waiters waiters))
       (match waiters
         (() #f)
         (((events . resume) . waiters)
          ;; EPOLLERR is or'ed into the mask so that an error wakes
          ;; every waiter regardless of what it asked for.
          (if (zero? (logand revents (logior events EPOLLERR)))
              ;; Re-enqueue.
              (add-fd-event-waiter sched fd events resume)
              ;; Resume.
              (resume revents))
          (lp waiters)))))))

(define (run-timers sched)
  ;; Run expired timer thunks in the order that they expired.
  ;;
  ;; A timer counts as expired when its expiry is not later than the
  ;; time sampled once at entry.
  ;;
  ;; Each popped timer is removed from SCHED *before* its thunk runs,
  ;; and the timer queue is re-read from SCHED on every iteration.
  ;; Working on a local snapshot and storing it back at the end would
  ;; overwrite -- and thereby lose -- any timer that a thunk registers
  ;; on SCHED through `add-timer'.  A consequence is that a timer whose
  ;; thunk raises has already been removed when the exception escapes.
  (let ((now (get-internal-real-time)))
    (let lp ()
      (let ((timers (scheduler-timers sched)))
        (unless (or (psq-empty? timers)
                    (< now (car (psq-min timers))))
          (call-with-values (lambda () (psq-pop timers))
            (match-lambda*
              (((_ . thunk) remaining)
               (set-scheduler-timers! sched remaining)
               (thunk)
               (lp)))))))))

(define (schedule-runnables-for-next-turn sched)
  ;; Called once every runnable of the current turn has been run.
  ;; Runnables may already be queued for the next turn: a fiber that
  ;; suspended itself while blocked on a channel may have been woken by
  ;; another fiber, or a remote thread may have scheduled a fiber on
  ;; this scheduler.  Either way, ask the kernel whether any of the
  ;; fd's we are interested in are active and schedule their fibers,
  ;; then run the timers that have timed out.
  (let* ((timers (scheduler-timers sched))
         ;; Expiry of the earliest timer, or #f when there is none.
         (deadline (if (psq-empty? timers)
                       #f
                       (match (psq-min timers)
                         ((expiry . thunk)
                          expiry)))))
    (epoll (scheduler-epfd sched)
           #:expiry deadline
           ;; If runnables are already pending, make epoll return
           ;; immediately instead of waiting for the expiry.
           #:update-expiry (lambda (expiry)
                             (if (stack-empty? (scheduler-next-runqueue sched))
                                 expiry
                                 0))
           #:folder (lambda (fd revents seed)
                      (schedule-fibers-for-fd fd revents seed)
                      seed)
           #:seed sched))
  (run-timers sched))

(define (fiber-stealer sched)
  "Return a thunk that tries to steal work from the remote peers of
@var{sched}.  Each call of the thunk picks the next peer in a rotation
over a shuffled copy of the peer list, taken when @code{fiber-stealer}
is called, and pops one fiber from that peer's current runqueue.  The
thunk returns the fiber, or @code{#f} if @var{sched} has no peers or
no work could be stolen from the chosen peer."
  (let ((next-peer (make-selector (scheduler-remote-peers sched))))
    (lambda ()
      (let ((peer (next-peer)))
        ;; The selector returns #f when there are no peers at all.
        (and peer
             (stack-pop! (scheduler-current-runqueue peer) #f))))))

(define (scheduler-work-pending? sched)
  "Return @code{#t} if @var{sched} has work pending, that is, at least
one pending timeout or one runnable fiber on either runqueue, and
@code{#f} otherwise."
  (or (not (psq-empty? (scheduler-timers sched)))
      (not (stack-empty? (scheduler-current-runqueue sched)))
      (not (stack-empty? (scheduler-next-runqueue sched)))))

(define* (run-scheduler sched finished?)
  "Run @var{sched} until the thunk @var{finished?} returns true.
@var{finished?} is called with no arguments at the start of every
turn.  A turn polls for file descriptor events and expired timers,
moves the fibers scheduled for the next turn onto the current
runqueue, and runs them until the current runqueue is empty.  When
both runqueues are empty at the end of a turn, the scheduler tries to
steal one fiber from a remote peer and runs it directly.  The return
value is unspecified."
  (let ((tag (scheduler-prompt-tag sched))
        (runcount-box (scheduler-runcount-box sched))
        (next (scheduler-next-runqueue sched))
        (cur (scheduler-current-runqueue sched))
        (steal-fiber! (fiber-stealer sched)))
    (define (run-fiber fiber)
      ;; Count the run, wrapping at 32 bits.
      (atomic-box-set! runcount-box
                       (logand (1+ (atomic-box-ref runcount-box)) #xffffFFFF))
      (call-with-prompt tag
        (lambda ()
          ;; A running fiber has no stored continuation.
          (let ((thunk (fiber-continuation fiber)))
            (set-fiber-continuation! fiber #f)
            (thunk)))
        (lambda (k after-suspend)
          ;; The fiber aborted to the prompt: remember where to resume
          ;; it, then let the suspender decide what happens next.
          (set-fiber-continuation! fiber k)
          (after-suspend fiber))))
    (let next-turn ()
      (unless (finished?)
        (schedule-runnables-for-next-turn sched)
        ;; NEXT is kept in reverse order; reverse it before pushing it
        ;; onto CUR.
        (stack-push-list! cur (reverse (stack-pop-all! next)))
        (let next-fiber ()
          (match (stack-pop! cur #f)
            (#f
             (when (stack-empty? next)
               ;; Both current and next runqueues are empty; steal a
               ;; little bit of work from a remote scheduler if we
               ;; can.  Run it directly instead of pushing onto a
               ;; queue to avoid double stealing.
               (let ((fiber (steal-fiber!)))
                 (when fiber
                   (set-fiber-scheduler! fiber sched)
                   (run-fiber fiber))))
             (next-turn))
            (fiber
             (run-fiber fiber)
             (next-fiber))))))))

(define (destroy-scheduler sched)
  "Release any resources associated with @var{sched}."
  ;; The `for-each' form below is disabled by the `#;' datum comment,
  ;; so no fibers are killed here.
  #;
  (for-each kill-fiber (list-copy (scheduler-fibers sched)))
  ;; The only thing actually released is the scheduler's epoll handle.
  (epoll-destroy (scheduler-epfd sched)))

(define (create-fiber sched thunk)
  "Spawn a new fiber in @var{sched} with the continuation @var{thunk}.
The fiber will be scheduled on the next turn.  @var{thunk} will run
with a copy of the current dynamic state, isolating fluid and
parameter mutations to the fiber.  If @var{thunk} raises an exception
that it does not handle, a message naming the fiber, a backtrace and
the exception are printed to the error port that was current when the
fiber was created, and the fiber finishes."
  (let* ((fiber (make-fiber sched #f))
         ;; Register the fiber up front and keep the name that
         ;; `nameset-add!' hands out, so that the error report below
         ;; can print it.  `nameset-ref' maps a name to an object, so
         ;; it cannot be used to find the name of FIBER.
         (name (nameset-add! fibers-nameset fiber))
         (thunk (let ((dynamic-state (current-dynamic-state)))
                  (lambda ()
                    (with-dynamic-state
                     dynamic-state
                     (lambda ()
                       (current-fiber fiber)
                       (catch #t
                         (lambda ()
                           (%start-stack #t thunk))
                         ;; Handler: the exception was already reported
                         ;; by the pre-unwind handler below.
                         (lambda _ #f)
                         ;; Pre-unwind handler: report while the stack
                         ;; of the failing code is still available.
                         (let ((err (current-error-port)))
                           (lambda (key . args)
                             (false-if-exception
                              (let ((stack (make-stack #t 4)))
                                (format err "Uncaught exception in fiber #~a:\n"
                                        name)
                                (display-backtrace stack err)
                                (print-exception err (stack-ref stack 0)
                                                 key args))))))))))))
    (schedule-fiber! fiber thunk)))

(define (kill-fiber fiber)
  "Meant to kill @var{fiber} by causing it to raise an exception.
This is currently unimplemented: calling this procedure always
signals an error."
  (let ((message "kill-fiber is unimplemented"))
    (error message)))

;; Shim for Guile 2.1.5.
;; When `suspendable-continuation?' is not defined, install a fallback
;; that reports every prompt tag as suspendable.
(unless (defined? 'suspendable-continuation?)
  (define! 'suspendable-continuation? (lambda (tag) #t)))

;; The AFTER-SUSPEND procedure allows the user to suspend the current
;; fiber, saving its state, and then perform some other nonlocal
;; control flow.  It is called with the suspended fiber as its
;; argument.
;;
(define* (suspend-current-fiber #:optional
                                (after-suspend (lambda (fiber) #f)))
  "Suspend the current fiber.  The optional @var{after-suspend}
procedure, if given, is called with the suspended fiber as its
argument.  When the fiber is resumed, the thunk passed to
@code{resume-fiber} is called and its values are returned.  Signal an
error if the current continuation cannot be suspended."
  (let* ((sched (fiber-scheduler (current-fiber)))
         (tag (scheduler-prompt-tag sched)))
    (if (not (suspendable-continuation? tag))
        (error "Attempt to suspend fiber within continuation barrier"))
    ;; The abort returns the thunk the fiber was resumed with.
    (let ((resume-thunk (abort-to-prompt tag after-suspend)))
      (resume-thunk))))

(define* (resume-fiber fiber thunk)
  "Put the suspended fiber @var{fiber} back on the run queue of its
scheduler.  When it runs again, its saved continuation is called with
@var{thunk}.  A fiber @emph{must} only be resumed while it is
suspended; an error is signalled if @var{fiber} has no saved
continuation.  This function is thread-safe even if @var{fiber} is
running on a remote scheduler."
  (let ((k (fiber-continuation fiber)))
    (if k
        (schedule-fiber! fiber (lambda () (k thunk)))
        (error "invalid fiber" fiber))))

(define* (yield-current-fiber)
  "Give control back to the current scheduler, as if by
@code{suspend-current-fiber} immediately followed by
@code{resume-fiber}, but without suspending when the current
continuation is not suspendable.  Return @code{#t} if the yield
succeeded, or @code{#f} otherwise, including when no fiber is
current."
  (let ((fiber (current-fiber)))
    (and fiber
         (let ((tag (scheduler-prompt-tag (fiber-scheduler fiber))))
           (and (suspendable-continuation? tag)
                (begin
                  ;; Suspend, and have the scheduler put us straight
                  ;; back on the run queue.
                  (abort-to-prompt tag
                                   (lambda (fiber)
                                     (resume-fiber fiber #f)))
                  #t))))))

(define (finalize-fd sources)
  "Clear @var{sources}, the @code{(active-events . waiters)} pair that
a scheduler keeps for a file descriptor: afterwards it has no active
events and no waiters.  Called by Guile just before Guile goes to
close a file descriptor, in response either to an explicit call to
@code{close-port}, or because the port became unreachable.  In the
latter case, this call may come from a finalizer thread."
  ;; When a file descriptor is closed, the kernel silently removes it
  ;; from any associated epoll sets, so we don't need to do anything
  ;; there.  But because this call could come from any thread,
  ;; especially given the fact that the fiber might be migrated, we
  ;; have to operate locally, just nulling out the sources pair and
  ;; not mucking with the sources table.
  ;;
  ;; FIXME: Wake all sources with EPOLLERR.
  (set-cdr! sources '())
  (set-car! sources #f))

(define (add-fd-event-waiter sched fd events resume)
  "Register the procedure @var{resume} on @var{sched} as a waiter for
the file descriptor @var{fd} and the given @var{events}, expressed as
an epoll bitfield.  When matching events are reported for @var{fd},
@var{resume} is called with the reported events as its argument."
  (let ((sources (hashv-ref (scheduler-sources sched) fd)))
    (match sources
      ((active-events . waiters)
       ;; FD is already known: add the waiter to its list.
       (set-cdr! sources (acons events resume waiters))
       ;; Only go back to epoll when EVENTS asks for something that is
       ;; not covered by the currently active events.  ACTIVE-EVENTS
       ;; is #f when the entry is inactive.
       (unless (and active-events
                    (= (logand events active-events) events))
         (let ((active-events (logior events (or active-events 0))))
           (set-car! sources active-events)
           (epoll-add*! (scheduler-epfd sched) fd
                        (logior active-events EPOLLONESHOT)))))
      (#f
       ;; First waiter for FD: create its entry, arrange for the entry
       ;; to be cleared when the descriptor is closed, and register the
       ;; descriptor with epoll.
       (let ((sources (list events (cons events resume))))
         (hashv-set! (scheduler-sources sched) fd sources)
         (add-fdes-finalizer! fd (lambda (fd) (finalize-fd sources)))
         (epoll-add*! (scheduler-epfd sched) fd
                      (logior events EPOLLONESHOT)))))))

(define (resume-on-fd-events fd events fiber)
  "Arrange for @var{fiber} to be resumed on its scheduler once the
file descriptor @var{fd} reports any of @var{events}, expressed as an
epoll bitfield.  The fiber is resumed with a thunk that returns the
reported events."
  (let ((sched (fiber-scheduler fiber))
        (wake (lambda (revents)
                (resume-fiber fiber (lambda () revents)))))
    (add-fd-event-waiter sched fd events wake)))

(define (resume-on-readable-fd fd fiber)
  "Arrange for @var{fiber} to be resumed once the file descriptor
@var{fd} becomes readable."
  (let ((readable-events (logior EPOLLIN EPOLLRDHUP)))
    (resume-on-fd-events fd readable-events fiber)))

(define (resume-on-writable-fd fd fiber)
  "Arrange for @var{fiber} to be resumed once the file descriptor
@var{fd} becomes writable."
  (let ((writable-events EPOLLOUT))
    (resume-on-fd-events fd writable-events fiber)))

(define (add-timer sched expiry thunk)
  "Arrange for @var{thunk} to be called by @var{sched} once the
absolute real time is greater than or equal to @var{expiry}, expressed
in internal time units."
  (let* ((key (cons expiry thunk))
         ;; The timer is keyed by (expiry . thunk) and prioritised by
         ;; its expiry.
         (timers (psq-set (scheduler-timers sched) key expiry)))
    (set-scheduler-timers! sched timers)))