summaryrefslogtreecommitdiff
path: root/fibers/internal.scm
blob: 6a39cfe1347f149b2769cbc947955f03b9160bea (about) (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
;; Fibers: cooperative, event-driven user-space threads.

;;;; Copyright (C) 2016 Free Software Foundation, Inc.
;;;;
;;;; This library is free software; you can redistribute it and/or
;;;; modify it under the terms of the GNU Lesser General Public
;;;; License as published by the Free Software Foundation; either
;;;; version 3 of the License, or (at your option) any later version.
;;;;
;;;; This library is distributed in the hope that it will be useful,
;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;;;; Lesser General Public License for more details.
;;;;
;;;; You should have received a copy of the GNU Lesser General Public
;;;; License along with this library; if not, write to the Free Software
;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
;;;;

(define-module (fibers internal)
  #:use-module (srfi srfi-9)
  #:use-module (fibers stack)
  #:use-module (fibers epoll)
  #:use-module (fibers psq)
  #:use-module (fibers nameset)
  #:use-module (ice-9 atomic)
  #:use-module (ice-9 control)
  #:use-module (ice-9 match)
  #:use-module (ice-9 fdes-finalizers)
  #:use-module ((ice-9 threads) #:select (current-thread))
  #:export (;; Low-level interface: schedulers and threads.
            make-scheduler
            with-scheduler
            scheduler-name
            (current-scheduler/public . current-scheduler)
            (scheduler-kernel-thread/public . scheduler-kernel-thread)
            run-scheduler
            destroy-scheduler

            resume-on-readable-fd
            resume-on-writable-fd
            resume-on-timer

            create-fiber
            (current-fiber/public . current-fiber)
            kill-fiber
            fiber-scheduler
            fiber-continuation

            fold-all-schedulers
            scheduler-by-name
            fold-all-fibers
            fiber-by-name

            suspend-current-fiber
            resume-fiber
            yield-current-fiber))

(define-once fibers-nameset (make-nameset))
(define-once schedulers-nameset (make-nameset))

(define (fold-all-schedulers f seed)
  "Fold @var{f} over the set of known schedulers.  @var{f} will be
invoked as @code{(@var{f} @var{name} @var{scheduler} @var{seed})}."
  (nameset-fold f schedulers-nameset seed))
(define (scheduler-by-name name)
  "Return the scheduler named @var{name}, or @code{#f} if no scheduler
of that name is known."
  (nameset-ref schedulers-nameset name))

(define (fold-all-fibers f seed)
  "Fold @var{f} over the set of known fibers.  @var{f} will be
invoked as @code{(@var{f} @var{name} @var{fiber} @var{seed})}."
  (nameset-fold f fibers-nameset seed))
(define (fiber-by-name name)
  "Return the fiber named @var{name}, or @code{#f} if no fiber of that
name is known."
  (nameset-ref fibers-nameset name))

(define-record-type <scheduler>
  (%make-scheduler name epfd active-fd-count prompt-tag
                   next-runqueue current-runqueue
                   sources timers kernel-thread)
  scheduler?
  (name scheduler-name set-scheduler-name!)
  (epfd scheduler-epfd)
  (active-fd-count scheduler-active-fd-count set-scheduler-active-fd-count!)
  (prompt-tag scheduler-prompt-tag)
  ;; atomic stack of fiber to run next turn (reverse order)
  (next-runqueue scheduler-next-runqueue)
  ;; atomic stack of fiber to run this turn
  (current-runqueue scheduler-current-runqueue)
  ;; fd -> ((total-events . min-expiry) #(events expiry fiber) ...)
  (sources scheduler-sources)
  ;; PSQ of thunk -> expiry
  (timers scheduler-timers set-scheduler-timers!)
  ;; atomic parameter of thread
  (kernel-thread scheduler-kernel-thread))

(define-record-type <fiber>
  (make-fiber scheduler continuation)
  fiber?
  ;; The scheduler that a fiber runs in.  As a scheduler only runs in
  ;; one kernel thread, this binds a fiber to a kernel thread.
  (scheduler fiber-scheduler)
  ;; What the fiber should do when it resumes, or #f if the fiber is
  ;; currently running.
  (continuation fiber-continuation set-fiber-continuation!))

(define (make-atomic-parameter init)
  (let ((box (make-atomic-box init)))
    (case-lambda
      (() (atomic-box-ref box))
      ((new)
       (if (eq? new init)
           (atomic-box-set! box new)
           (let ((prev (atomic-box-compare-and-swap! box init new)))
             (unless (eq? prev init)
               (error "owned by other thread" prev))))))))

(define* (make-scheduler)
  "Make a new scheduler in which to run fibers."
  (let ((epfd (epoll-create))
        (active-fd-count 0)
        (prompt-tag (make-prompt-tag "fibers"))
        (next-runqueue (make-empty-stack))
        (current-runqueue (make-empty-stack))
        (sources (make-hash-table))
        (timers (make-psq (match-lambda*
                            (((t1 . c1) (t2 . c2)) (< t1 t2)))
                          <))
        (kernel-thread (make-atomic-parameter #f)))
    (let ((sched (%make-scheduler #f epfd active-fd-count prompt-tag
                                  next-runqueue current-runqueue
                                  sources timers kernel-thread)))
      (set-scheduler-name! sched (nameset-add! schedulers-nameset sched))
      sched)))

(define-syntax-rule (with-scheduler scheduler body ...)
  "Evaluate @code{(begin @var{body} ...)} in an environment in which
@var{scheduler} is bound to the current kernel thread and marked as
current.  Signal an error if @var{scheduler} is already running in
some other kernel thread."
  (let ((sched scheduler))
    (dynamic-wind (lambda ()
                    ((scheduler-kernel-thread sched) (current-thread)))
                  (lambda ()
                    (parameterize ((current-scheduler sched))
                      body ...))
                  (lambda ()
                    ((scheduler-kernel-thread sched) #f)))))

(define (scheduler-kernel-thread/public sched)
  "Return the kernel thread on which @var{sched} is running, or
@code{#f} if @var{sched} is not running."
  ((scheduler-kernel-thread sched)))

(define current-scheduler (make-parameter #f))
(define (current-scheduler/public)
  "Return the current scheduler, or @code{#f} if no scheduler is
current."
  (current-scheduler))
(define (make-source events expiry fiber) (vector events expiry fiber))
(define (source-events s) (vector-ref s 0))
(define (source-expiry s) (vector-ref s 1))
(define (source-fiber s) (vector-ref s 2))

(define current-fiber (make-parameter #f))
(define (current-fiber/public)
  "Return the current fiber, or @code{#f} if no fiber is current."
  (current-fiber))

(define (schedule-fiber! fiber thunk)
  ;; The fiber will be resumed at most once, and we are the ones that
  ;; will resume it, so we can set the thunk directly.  Adding the
  ;; fiber to the runqueue is an atomic operation with SEQ_CST
  ;; ordering, so that will make sure this operation is visible even
  ;; for a fiber scheduled on a remote thread.
  (set-fiber-continuation! fiber thunk)
  (let ((sched (fiber-scheduler fiber)))
    (stack-push! (scheduler-next-runqueue sched) fiber)
    (unless (eq? sched (current-scheduler))
      (epoll-wake! (scheduler-epfd sched)))
    (values)))

(define internal-time-units-per-millisecond
  (/ internal-time-units-per-second 1000))

(define (schedule-fibers-for-fd fd revents sched)
  (match (hashv-ref (scheduler-sources sched) fd)
    (#f (warn "scheduler for unknown fd" fd))
    (sources
     (set-scheduler-active-fd-count! sched
                                     (1- (scheduler-active-fd-count sched)))
     (for-each (lambda (source)
                 ;; FIXME: This fiber might have been woken up by
                 ;; another event.  A moot point while file descriptor
                 ;; operations aren't proper CML operations, though.
                 (unless (zero? (logand revents
                                        (logior (source-events source) EPOLLERR)))
                   ;; Fibers can't be stolen while they are in the
                   ;; sources table; the scheduler of the fiber must
                   ;; be SCHED, and so we are indeed responsible for
                   ;; resuming the fiber.
                   (resume-fiber (source-fiber source) (lambda () revents))))
               (cdr sources))
     (cond
      ((zero? (logand revents EPOLLERR))
       (hashv-remove! (scheduler-sources sched) fd)
       (epoll-remove! (scheduler-epfd sched) fd))
      (else
       (set-cdr! sources '())
       ;; Reset active events and expiration time, respectively.
       (set-car! (car sources) #f)
       (set-cdr! (car sources) #f))))))

(define (scheduler-poll-timeout sched)
  (cond
   ((not (stack-empty? (scheduler-next-runqueue sched)))
    ;; Don't sleep if there are fibers in the runqueue already.
    0)
   ((psq-empty? (scheduler-timers sched))
    ;; If there are no timers, only sleep if there are active fd's. (?)
    (cond
     ((zero? (scheduler-active-fd-count sched)) 0)
     (else -1)))
   (else
    (match (psq-min (scheduler-timers sched))
      ((expiry . thunk)
       (let ((now (get-internal-real-time)))
         (if (< expiry now)
             0
             (round/ (- expiry now)
                     internal-time-units-per-millisecond))))))))

(define (run-timers sched)
  ;; Run expired timer thunks in the order that they expired.
  (let ((now (get-internal-real-time)))
    (let run-timers ((timers (scheduler-timers sched)))
      (cond
       ((or (psq-empty? timers)
            (< now (car (psq-min timers))))
        (set-scheduler-timers! sched timers))
       (else
        (call-with-values (lambda () (psq-pop timers))
          (match-lambda*
            (((_ . thunk) timers)
             (thunk)
             (run-timers timers)))))))))

(define (schedule-runnables-for-next-turn sched)
  ;; Called when all runnables from the current turn have been run.
  ;; Note that there may be runnables already scheduled for the next
  ;; turn; one way this can happen is if a fiber suspended itself
  ;; because it was blocked on a channel, but then another fiber woke
  ;; it up, or if a remote thread scheduled a fiber on this scheduler.
  ;; In any case, check the kernel to see if any of the fd's that we
  ;; are interested in are active, and in that case schedule their
  ;; corresponding fibers.  Also run any timers that have timed out.
  (epoll (scheduler-epfd sched)
         #:get-timeout (lambda () (scheduler-poll-timeout sched))
         #:folder (lambda (fd revents seed)
                    (schedule-fibers-for-fd fd revents sched)
                    seed))
  (run-timers sched))

(define* (run-fiber fiber)
  (parameterize ((current-fiber fiber))
    (call-with-prompt
        (scheduler-prompt-tag (fiber-scheduler fiber))
      (lambda ()
        (let ((thunk (fiber-continuation fiber)))
          (set-fiber-continuation! fiber #f)
          (thunk)))
      (lambda (k after-suspend)
        (set-fiber-continuation! fiber k)
        (after-suspend fiber)))))

(define* (run-scheduler sched)
  "Run @var{sched} until there are no more fibers ready to run, no
file descriptors being waited on, and no more timers pending to run.
Return zero values."
  (let ((next (scheduler-next-runqueue sched))
        (cur (scheduler-current-runqueue sched)))
    (let next-turn ()
      (schedule-runnables-for-next-turn sched)
      (stack-push-list! cur (reverse (stack-pop-all! next)))
      (cond
       ((stack-empty? cur)
        ;; Could be the scheduler is stopping, or it could be that we
        ;; got a spurious wakeup.  In any case, this is the place to
        ;; check to see whether the scheduler is really done.
        (cond
         ((not (zero? (scheduler-active-fd-count sched))) (next-turn))
         ((not (psq-empty? (scheduler-timers sched))) (next-turn))
         (else (values))))
       (else
        (let next-fiber ()
          (match (stack-pop! cur #f)
            (#f (next-turn))
            (fiber (run-fiber fiber) (next-fiber)))))))))

(define (steal-work! sched)
  "Steal some work from @var{sched}.  Return a list of runnable fibers
in FIFO order, or the empty list if no work could be stolen."
  (match (stack-pop! (scheduler-current-runqueue sched) #f)
    (#f '())
    (fiber (list fiber))))

(define (destroy-scheduler sched)
  "Release any resources associated with @var{sched}."
  #;
  (for-each kill-fiber (list-copy (scheduler-fibers sched)))
  (epoll-destroy (scheduler-epfd sched)))

(define (create-fiber sched thunk)
  "Spawn a new fiber in @var{sched} with the continuation @var{thunk}.
The fiber will be scheduled on the next turn."
  (let ((fiber (make-fiber sched #f)))
    (nameset-add! fibers-nameset fiber)
    (schedule-fiber! fiber thunk)))

(define (kill-fiber fiber)
  "Try to kill @var{fiber}, causing it to raise an exception.  Note
that this is currently unimplemented!"
  (error "kill-fiber is unimplemented"))

;; Shim for Guile 2.1.5.
(unless (defined? 'suspendable-continuation?)
  (define! 'suspendable-continuation? (lambda (tag) #t)))

;; The AFTER-SUSPEND thunk allows the user to suspend the current
;; fiber, saving its state, and then perform some other nonlocal
;; control flow.
;;
(define* (suspend-current-fiber #:optional
                                (after-suspend (lambda (fiber) #f)))
  "Suspend the current fiber.  Call the optional @var{after-suspend}
callback, if present, with the suspended thread as its argument."
  (let ((tag (scheduler-prompt-tag (current-scheduler))))
    (unless (suspendable-continuation? tag)
      (error "Attempt to suspend fiber within continuation barrier"))
    ((abort-to-prompt tag after-suspend))))

(define* (resume-fiber fiber thunk)
  "Resume @var{fiber}, adding it to the run queue of its scheduler.
The fiber will start by applying @var{thunk}.  A fiber @emph{must}
only be resumed when it is suspended.  This function is thread-safe
even if @var{fiber} is running on a remote scheduler."
  (let ((cont (fiber-continuation fiber)))
    (unless cont (error "invalid fiber" fiber))
    (schedule-fiber! fiber (lambda () (cont thunk)))))

(define* (yield-current-fiber)
  "Yield control to the current scheduler.  Like
@code{suspend-current-fiber} followed directly by @code{resume-fiber},
except that it avoids suspending if the current continuation isn't
suspendable.  Returns @code{#t} if the yield succeeded, or @code{#f}
otherwise."
  (let ((tag (scheduler-prompt-tag (current-scheduler))))
    (and (suspendable-continuation? tag)
         (begin
           (abort-to-prompt tag (lambda (fiber) (resume-fiber fiber #f)))
           #t))))

(define (finalize-fd sched fd)
  "Remove data associated with @var{fd} from the scheduler @var{ctx}.
Called by Guile just before Guile goes to close a file descriptor, in
response either to an explicit call to @code{close-port}, or because
the port became unreachable.  In the latter case, this call may come
from a finalizer thread."
  ;; When a file descriptor is closed, the kernel silently removes it
  ;; from any associated epoll sets, so we don't need to do anything
  ;; there.
  ;;
  ;; FIXME: Take a lock on the sources table?
  ;; FIXME: Wake all sources with EPOLLERR.
  (let ((sources-table (scheduler-sources sched)))
    (when (hashv-ref sources-table fd)
      (set-scheduler-active-fd-count! sched
                                      (1- (scheduler-active-fd-count sched)))
      (hashv-remove! sources-table fd))))

(define (resume-on-fd-events fd events fiber)
  "Arrange to resume @var{fiber} when the file descriptor @var{fd} has
the given @var{events}, expressed as an epoll bitfield."
  (let* ((sched (fiber-scheduler fiber))
         (sources (hashv-ref (scheduler-sources sched) fd)))
    (cond
     (sources
      (set-cdr! sources (cons (make-source events #f fiber) (cdr sources)))
      (let ((active-events (caar sources)))
        (unless active-events
          (set-scheduler-active-fd-count! sched
                                          (1+ (scheduler-active-fd-count sched))))
        (unless (and active-events
                     (= (logand events active-events) events))
          (set-car! (car sources) (logior events (or active-events 0)))
          (epoll-modify! (scheduler-epfd sched) fd
                         (logior (caar sources) EPOLLONESHOT)))))
     (else
      (set-scheduler-active-fd-count! sched
                                      (1+ (scheduler-active-fd-count sched)))
      (hashv-set! (scheduler-sources sched)
                  fd (acons events #f
                            (list (make-source events #f fiber))))
      (add-fdes-finalizer! fd (lambda (fd) (finalize-fd sched fd)))
      (epoll-add! (scheduler-epfd sched) fd (logior events EPOLLONESHOT))))))

(define (resume-on-readable-fd fd fiber)
  "Arrange to resume @var{fiber} when the file descriptor @var{fd}
becomes readable."
  (resume-on-fd-events fd (logior EPOLLIN EPOLLRDHUP) fiber))

(define (resume-on-writable-fd fd fiber)
  "Arrange to resume @var{fiber} when the file descriptor @var{fd}
becomes writable."
  (resume-on-fd-events fd EPOLLOUT fiber))

(define (resume-on-timer fiber expiry get-thunk)
  "Arrange to resume @var{fiber} when the absolute real time is
greater than or equal to @var{expiry}, expressed in internal time
units.  The fiber will be resumed with the result of calling
@var{get-thunk}.  If @var{get-thunk} returns @code{#f}, that indicates
that some other operation performed this operation first, and so no
resume is performed."
  (let ((sched (fiber-scheduler fiber)))
    (define (maybe-resume)
      (let ((thunk (get-thunk)))
        (when thunk (resume-fiber fiber thunk))))
    (set-scheduler-timers! sched
                           (psq-set (scheduler-timers sched)
                                    (cons expiry maybe-resume)
                                    expiry))))