summaryrefslogtreecommitdiff
path: root/fibers/conditions.scm
blob: 5501135851fd016f9143989de2875622309ae06d (about) (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
;; Conditions

;;;; Copyright (C) 2017 Andy Wingo <wingo@pobox.com>
;;;; 
;;;; This library is free software; you can redistribute it and/or
;;;; modify it under the terms of the GNU Lesser General Public
;;;; License as published by the Free Software Foundation; either
;;;; version 3 of the License, or (at your option) any later version.
;;;; 
;;;; This library is distributed in the hope that it will be useful,
;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;;;; Lesser General Public License for more details.
;;;; 
;;;; You should have received a copy of the GNU Lesser General Public
;;;; License along with this library; if not, write to the Free Software
;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

;;; Condition variable (cvar) implementation following the 2009 ICFP
;;; paper "Parallel Concurrent ML" by John Reppy, Claudio V. Russo,
;;; and Yingqui Xiao.  See channels.scm for additional commentary.
;;;
;;; Besides the general ways in which this implementation differs from
;;; the paper, this channel implementation avoids locks entirely.
;;; Still, we should disable interrupts while any operation is in a
;;; "claimed" state to avoid excess latency due to pre-emption.  It
;;; would be great if we could verify our protocol though; the
;;; parallel channel operations are still gnarly.

(define-module (fibers conditions)
  #:use-module (srfi srfi-9)
  #:use-module (ice-9 atomic)
  #:use-module (ice-9 match)
  #:use-module (fibers stack)
  #:use-module (fibers operations)
  #:export (make-condition
            condition?
            signal-condition!
            wait-operation
            wait))

(define-record-type <condition>
  (%make-condition signalled? waiters)
  condition?
  ;; atomic box of bool
  (signalled? condition-signalled?)
  ;; stack of flag+resume pairs
  (waiters channel-waiters))

(define (make-condition)
  "Make a fresh condition variable."
  (%make-condition (make-atomic-box #f) (make-empty-stack)))

(define (resume-waiters! waiters)
  (define (resume-one flag resume)
    (match (atomic-box-compare-and-swap! flag 'W 'S)
      ('W (resume values))
      ('C (resume-one flag resume))
      ('S #f)))
  ;; Non-tail-recursion to resume waiters in the order they were added
  ;; to the waiters stack.
  (let lp ((waiters (stack-pop-all! waiters)))
    (match waiters
      (() #f)
      (((flag . resume) . waiters)
       (lp waiters)
       (resume-one flag resume)))))

(define (signal-condition! cvar)
  "Mark @var{cvar} as having been signalled.  Resume any fiber or
thread waiting for @var{cvar}.  If @var{cvar} is already signalled,
calling @code{signal-condition!} does nothing and returns @code{#f};
returns @code{#t} otherwise."
  (match cvar
    (($ <condition> signalled? waiters)
     (match (atomic-box-compare-and-swap! signalled? #f #t)
       (#f ;; We signalled the cvar.
        (resume-waiters! waiters)
        #t)
       (#t ;; Cvar already signalled.
        #f)))))

(define (wait-operation cvar)
  "Make an operation that will complete when @var{cvar} is signalled."
  (match cvar
    (($ <condition> signalled? waiters)
     (define (try-fn) (and (atomic-box-ref signalled?) values))
     (define (block-fn flag sched resume)
       ;; We have suspended the current fiber or thread; arrange for
       ;; signal-condition! to call resume-get by adding the flag and
       ;; resume callback to the cvar's waiters stack.
       (stack-push! waiters (cons flag resume))
       ;; It could be that the cvar was actually signalled in between
       ;; the calls to try-fn and block-fn.  In that case it could be
       ;; that resume-waiters! was called before our push above.  In
       ;; that case, call resume-waiters! to resolve the race.
       (when (atomic-box-ref signalled?)
         (resume-waiters! waiters))
       (values))
     (make-base-operation #f try-fn block-fn))))

(define (wait cvar)
  "Wait until @var{cvar} has been signalled."
  (perform-operation (wait-operation cvar)))