diff options
| author | Artyom V. Poptsov <poptsov.artyom@gmail.com> | 2015-11-01 09:13:11 +0300 |
|---|---|---|
| committer | Artyom V. Poptsov <poptsov.artyom@gmail.com> | 2015-11-01 09:13:11 +0300 |
| commit | eed3bba0828efc900ed6515868e24aecef8b4a7c (patch) | |
| tree | 25e0c788fe14a8e52a242e6d3dbff409ba49be50 /modules/ssh | |
| parent | Merge branch 'master' into wip-sftp (diff) | |
| parent | doc/api-channels.texi: Update (diff) | |
| download | guile-ssh-eed3bba0828efc900ed6515868e24aecef8b4a7c.tar.gz | |
Merge branch 'master' into wip-sftp
Diffstat (limited to 'modules/ssh')
| -rw-r--r-- | modules/ssh/dist.scm | 33 | ||||
| -rw-r--r-- | modules/ssh/dist/node.scm | 120 |
2 files changed, 84 insertions, 69 deletions
diff --git a/modules/ssh/dist.scm b/modules/ssh/dist.scm index b24b4a3..cf54d9e 100644 --- a/modules/ssh/dist.scm +++ b/modules/ssh/dist.scm @@ -60,9 +60,12 @@ "Flatten a list LST one level down. Return a flattened list." (fold-right append '() lst)) -(define (warning fmt . args) +(define (format-warning fmt . args) (apply format (current-error-port) (string-append "WARNING: " fmt) args)) +(define (format-error fmt . args) + (apply format (current-error-port) (string-append "ERROR: " fmt) args)) + (define (execute-job nodes job) "Execute a JOB, handle errors." @@ -72,17 +75,20 @@ (lambda () (hand-out-job job)) (lambda args - (format (current-error-port) - "ERROR: In ~a:~%~a:~%~a~%" - job (cadr args) (caddr args)) + (format-error "In ~a:~%~a:~%~a~%" job (cadr args) (caddr args)) (error "Could not execute a job" job)))) (lambda args - (warning "Could not execute a job ~a~%" job) + (format-warning "Could not execute a job ~a~%" job) (let ((nodes (delete (job-node job) nodes))) (and (null? nodes) (error "Could not execute a job" job)) - (warning "Passing a job ~a to a node ~a ...~%" job (car nodes)) + (format-warning "Passing a job ~a to a node ~a ...~%" job (car nodes)) (execute-job nodes (set-job-node job (car nodes))))))) + +(define (execute-jobs nodes jobs) + "Execute JOBS on NODES, return the result." + (flatten-1 (n-par-map (length jobs) (cut execute-job nodes <>) jobs))) + ;;; @@ -91,16 +97,21 @@ "Evaluate each EXPR in parallel, using distributed computation. Split the job to nearly equal parts and hand out each of resulting sub-jobs to a NODES list. Return the results of N expressions as a set of N multiple values." - (let ((jobs (assign-eval nodes (list (quote expr) ...)))) - (apply values (flatten-1 (n-par-map (length jobs) (cut execute-job nodes <>) - jobs))))) + (let* ((jobs (assign-eval nodes (list (quote expr) ...))) + (results (execute-jobs nodes jobs))) + (and (null? results) + (error "Could not execute jobs" nodes jobs)) + (apply values results))) (define-syntax-rule (dist-map nodes proc lst) "Do list mapping using distributed computation. The job is splitted to nearly equal parts and hand out resulting jobs to a NODES list. Return the result of computation." - (let ((jobs (assign-map nodes lst (quote proc)))) - (flatten-1 (n-par-map (length jobs) (cut execute-job nodes <>) jobs)))) + (let* ((jobs (assign-map nodes lst (quote proc))) + (results (execute-jobs nodes jobs))) + (and (null? results) + (error "Could not execute jobs" nodes jobs)) + results)) (define-syntax-rule (with-ssh node exp ...) diff --git a/modules/ssh/dist/node.scm b/modules/ssh/dist/node.scm index b0e739e..7167fd0 100644 --- a/modules/ssh/dist/node.scm +++ b/modules/ssh/dist/node.scm @@ -74,6 +74,11 @@ rrepl-get-result)) +(define (eof-or-null? str) + "Return #t if a STR is an EOF object or an empty string, #f otherwise." + (or (eof-object? str) (string-null? str))) + + ;;; Error reporting (define (node-error . args) @@ -90,9 +95,9 @@ (define-record-type <node> (%make-node tunnel repl-port start-repl-server?) node? - (tunnel node-tunnel) - (repl-port node-repl-port) - (start-repl-server? node-start-repl-server?)) + (tunnel node-tunnel) ; <tunnel> + (repl-port node-repl-port) ; number + (start-repl-server? node-start-repl-server?)) ; boolean (define (node-session node) "Get node session." @@ -124,6 +129,10 @@ automatically in case when it is not started yet." ;;; Remote REPL (RREPL) +(define (read-string str) + "Read a string STR." + (call-with-input-string str read)) + (define (rexec node cmd) "Execute a command CMD on the remote side. Return two values: the first line returned by CMD and its exit code." @@ -174,72 +183,67 @@ name. Throw 'node-repl-error' on an error." (define (raise-repl-error result) (let loop ((line (read-line repl-channel)) (result result)) - (if (or (eof-object? line) (string-null? line)) + (if (eof-or-null? line) (node-repl-error "Evaluation failed" result) (loop (read-line repl-channel) (string-append result "\n" line))))) + (define (parse-result 1st-match) + (let loop ((line (read-line repl-channel)) + (matches (list 1st-match))) + (if (or (eof-or-null? line) + (regexp-exec %repl-undefined-result-regexp line)) + (reverse matches) + (loop (read-line repl-channel) + (cons (regexp-exec %repl-result-2-regexp line) matches))))) + (define (read-result match) - (let ((matches - (let loop ((line (read-line repl-channel)) - (matches (list match))) - (if (or (eof-object? line) (string-null? line) - (regexp-exec %repl-undefined-result-regexp line)) - (reverse matches) - (loop (read-line repl-channel) - (cons (regexp-exec %repl-result-2-regexp line) matches)))))) - (let ((len (length matches))) - (if (= len 1) + (let* ((matches (parse-result match)) + (len (length matches))) + (if (= len 1) + (let ((m (car matches))) + (values (read-string (match:substring m 4)) + (string->number (match:substring m 3)))) + (let ((rv (make-vector len)) + (nv (make-vector len))) + + ;; The 1st match also contains a module name and language name, + ;; but we want only the evaluation result and the result number. (let ((m (car matches))) - (values (call-with-input-string (match:substring m 4) - read) - (string->number (match:substring m 3)))) - (let ((rv (make-vector len)) - (nv (make-vector len))) - (vector-set! rv 0 - (call-with-input-string (match:substring (car matches) - 4) - read)) - (vector-set! nv 0 - (string->number (match:substring (car matches) 3))) - (do ((i 1 (1+ i))) - ((= i len)) - (vector-set! rv i - (call-with-input-string - (match:substring (list-ref matches i) - 2) - read)) - (vector-set! nv i - (string->number (match:substring (list-ref matches - i) - 1)))) - (values rv nv)))))) + (vector-set! rv 0 (read-string (match:substring m 4))) + (vector-set! nv 0 (string->number (match:substring m 3)))) + + (do ((i 1 (1+ i))) + ((= i len)) + (let ((m (list-ref matches i))) + (vector-set! rv i (read-string (match:substring m 2))) + (vector-set! nv i (string->number (match:substring m 1))))) + (values rv nv))))) (let ((result (read-line repl-channel))) (if (string-null? result) (rrepl-get-result repl-channel) - (begin - (cond - ((regexp-exec %repl-result-regexp result) => - (lambda (match) - (receive (result eval-num) - (read-result match) - (values - result ; Result - eval-num ; # of evaluation - (match:substring match 2) ; Module - (match:substring match 1))))) ; Language - ((regexp-exec %repl-error-regexp result) => - (lambda (match) (raise-repl-error result))) - ((regexp-exec %repl-undefined-result-regexp result) => - (lambda (match) + (cond + ((regexp-exec %repl-result-regexp result) => + (lambda (match) + (receive (result eval-num) + (read-result match) (values - *unspecified* ; Result - *unspecified* ; # of evaluation - (match:substring match 2) ; Module - (match:substring match 1)))) ; Language - (else - (raise-repl-error result))))))) + result ; Result + eval-num ; # of evaluation + (match:substring match 2) ; Module + (match:substring match 1))))) ; Language + ((regexp-exec %repl-error-regexp result) => + (lambda (match) (raise-repl-error result))) + ((regexp-exec %repl-undefined-result-regexp result) => + (lambda (match) + (values + *unspecified* ; Result + *unspecified* ; # of evaluation + (match:substring match 2) ; Module + (match:substring match 1)))) ; Language + (else + (raise-repl-error result)))))) (define (rrepl-eval rrepl-channel quoted-exp) "Evaluate QUOTED-EXP using RREPL-CHANNEL, return four values: an evaluation |
