port-buffer.scm 19 KB


  1. ; Copyright (c) 1993-2008 by Richard Kelsey and Jonathan Rees. See file COPYING.
  ; Build a buffered input port around HANDLER.
  ;
  ; BUFFER, INDEX and LIMIT must satisfy OKAY-BUFFER? and HANDLER must
  ; satisfy PORT-HANDLER?; otherwise an assertion violation is raised with
  ; all five arguments as irritants.  DATA is stored in the port untouched.
  (define (make-buffered-input-port handler data buffer index limit)
    (if (or (not (okay-buffer? buffer index limit))
            (not (port-handler? handler)))
        (assertion-violation 'make-buffered-input-port
                             "invalid argument"
                             handler data buffer index limit)
        (make-port handler
                   (enum text-encoding-option latin-1)
                   #f
                   (bitwise-ior input-port-mask open-input-port-mask)
                   #f                     ; timestamp (was lock)
                   data
                   buffer
                   index
                   limit
                   #f                     ; pending-cr?
                   #f)))                  ; pending-eof?
  ; Build a buffered output port around HANDLER.
  ;
  ; The checks are the same as for input ports, plus LIMIT must be
  ; positive.  A failed check raises an assertion violation with all five
  ; arguments as irritants.
  (define (make-buffered-output-port handler data buffer index limit)
    (if (or (not (okay-buffer? buffer index limit))
            (not (> limit 0))
            (not (port-handler? handler)))
        (assertion-violation 'make-buffered-output-port
                             "invalid argument"
                             handler data buffer index limit)
        (make-port handler
                   (enum text-encoding-option latin-1)
                   #f
                   open-output-port-status
                   #f                     ; timestamp (was lock)
                   data
                   buffer
                   index
                   limit
                   #f                     ; pending-cr?
                   #f)))                  ; pending-eof?
  ; True iff BUFFER is a byte vector and INDEX and LIMIT are exact integers
  ; with 0 <= INDEX <= LIMIT <= (byte-vector-length BUFFER).
  (define (okay-buffer? buffer index limit)
    ;; EXACT? is only applied to values already known to be integers.
    (define (exact-integer-value? x)
      (and (integer? x)
           (exact? x)))
    (and (byte-vector? buffer)
         (exact-integer-value? limit)
         (exact-integer-value? index)
         (<= 0 index limit (byte-vector-length buffer))))
  45. ;----------------
  ; (make-buffered-input-port-handler discloser closer! buffer-filler! ready?)
  ;   -> handler
  ;
  ; DISCLOSER and CLOSER! are applied to the port's data, not to the port
  ; itself.  Closing marks the port closed inside a fresh proposal and
  ; starts over whenever CLOSER! returns false, so CLOSER! must also do a
  ; maybe-commit and return the result.
  ;
  ; (buffer-filler! PORT WAIT?)
  ;   -> boolean        ; true if commit works, false if it fails
  ; If WAIT? is true then wait for input.  If WAIT? is false then return
  ; immediately even if no input is available.
  ;
  ; (ready? PORT) is called by the procedure MAKE-BYTE-READY? builds and is
  ; expected to return two values there.
  ;
  ; The last handler slot (force) is #f for input ports.
  (define (make-buffered-input-port-handler discloser
                                            closer!
                                            buffer-filler!
                                            ready?)
    (define (disclose port)
      (discloser (port-data port)))
    (define (close-it! port)
      (with-new-proposal (retry)
        (make-input-port-closed! port)
        (or (closer! (port-data port))
            (retry))))
    (make-port-handler disclose
                       close-it!
                       (make-one-byte-input buffer-filler!)
                       (make-one-char-input buffer-filler!)
                       (make-read-block buffer-filler!)
                       (make-byte-ready? ready? #t)
                       #f))               ; force
  72. ;----------------
  ; The port's old lock field now holds a timestamp.  Record that PORT's
  ; buffer is being reused by (provisionally) storing a freshly allocated
  ; object there.
  (define (note-buffer-reuse! port)
    (provisional-set-port-lock! port (list 'timestamp)))
  ; Provisionally read PORT's timestamp (the old lock field).  Calling this
  ; has the side-effect of getting the current proposal to check the
  ; timestamp value when committing.
  (define (check-buffer-timestamp! port)
    (provisional-port-lock port))

  ; The flushed flag lives in a current field: pending-eof?.
  (define (port-flushed port)
    (port-pending-eof? port))

  (define (set-port-flushed! port value)
    (set-port-pending-eof?! port value))
  82. ;----------------
  ; Returns a procedure (PORT READ?) that delivers the next byte of PORT.
  ; The READ? argument says whether we're doing a READ (the byte is
  ; consumed) or a PEEK (it stays in the buffer).
  ;
  ; All of the work happens inside a proposal.  If the commit fails, or
  ; after the buffer filler has been called, the attempt starts over.
  (define (make-one-byte-input buffer-filler!)
    (lambda (port read?)
      (with-new-proposal (retry)
        (let* ((index (provisional-port-index port))
               (limit (provisional-port-limit port)))
          ;; Hand back VALUE if the proposal commits, otherwise start over.
          (define (commit-or-retry value)
            (if (maybe-commit)
                value
                (retry)))
          (cond ((not (open-input-port? port))
                 (remove-current-proposal!)
                 (assertion-violation (if read? 'read-byte 'peek-byte)
                                      "invalid argument"
                                      port))
                ((< index limit)
                 ;; a byte is already sitting in the buffer
                 (if read?
                     (provisional-set-port-index! port (+ 1 index)))
                 (commit-or-retry
                  (provisional-byte-vector-ref (port-buffer port) index)))
                ((provisional-port-pending-eof? port)
                 ;; only a READ uses up the pending EOF
                 (if read?
                     (provisional-set-port-pending-eof?! port #f))
                 (commit-or-retry (eof-object)))
                (else
                 ;; nothing buffered: reset the buffer, fill it, go again
                 (provisional-set-port-index! port 0)
                 (provisional-set-port-limit! port 0)
                 (buffer-filler! port #t)
                 (retry)))))))
  ; Returns a procedure (PORT MODE) that decodes one character from PORT's
  ; buffer with the decoder of the port's text codec.
  ;
  ; The MODE argument says whether we're doing a READ (#f), a PEEK (#t),
  ; or a CHAR-READY? ( () ).  Only a READ advances the buffer index or uses
  ; up a pending EOF.  CHAR-READY? gets #t instead of a character and calls
  ; the buffer filler with a false WAIT? argument.
  (define (make-one-char-input buffer-filler!)
    (lambda (port mode)
      (let ((decode
             (text-codec-decode-char-proc (port-text-codec port))))
        (with-new-proposal (retry)
          (let ((limit (provisional-port-limit port)))
            (let scan ((index (provisional-port-index port)))

              ;; What the caller receives for CHAR; CHAR-READY? just gets #t.
              (define (answer char)
                (if (null? mode)
                    #t
                    char))

              ;; Consume USED bytes (READ only), then deliver VALUE if the
              ;; proposal commits; otherwise start over.
              (define (deliver used value)
                (if (not mode)
                    (provisional-set-port-index! port (+ index used)))
                (if (maybe-commit)
                    value
                    (retry)))

              ;; Ask the filler for more bytes.  The result is #f only for
              ;; a CHAR-READY? whose fill succeeded; every other case
              ;; starts over.
              (define (refill)
                (let* ((wait? (not (null? mode)))
                       (filled? (buffer-filler! port wait?)))
                  (if (and filled? (not wait?))
                      #f
                      (retry))))

              (cond ((not (open-input-port? port))
                     (remove-current-proposal!)
                     (assertion-violation (cond
                                           ((not mode) 'read-char)
                                           ((null? mode) 'char-ready?)
                                           (else 'peek-char))
                                          "invalid argument"
                                          port))
                    ((< index limit)
                     (let ((buffer (port-buffer port)))
                       (call-with-values
                           (lambda ()
                             (decode buffer index (- limit index)))
                         (lambda (char used)
                           (cond
                            (char
                             ;; CR/LF handling.
                             (let ((crlf? (port-crlf? port)))
                               (cond
                                ((and crlf? (char=? char cr))
                                 ;; a CR is delivered as a newline ...
                                 (provisional-set-port-pending-cr?! port #t)
                                 (deliver used (answer #\newline)))
                                ((and crlf?
                                      (char=? char #\newline)
                                      (provisional-port-pending-cr? port))
                                 ;; ... and an LF right after it is skipped
                                 (provisional-set-port-pending-cr?! port #f)
                                 (scan (+ index used)))
                                (else
                                 (provisional-set-port-pending-cr?! port #f)
                                 (deliver used (answer char))))))
                            ((or (not used) ; decoding error
                                 (provisional-port-pending-eof? port)) ; partial char
                             (deliver 1 (answer #\?)))
                            ;; need at least USED bytes
                            (else
                             (if (> used
                                    (- (byte-vector-length buffer) limit))
                                 ;; copy what we have to the beginning so
                                 ;; there's space at the end we can try to
                                 ;; fill
                                 (let ((remaining (- limit index)))
                                   (attempt-copy-bytes! buffer index
                                                        buffer 0
                                                        remaining)
                                   (provisional-set-port-index! port 0)
                                   (provisional-set-port-limit! port remaining)))
                             (refill)))))))
                    ((provisional-port-pending-eof? port)
                     (if (not mode)
                         (provisional-set-port-pending-eof?! port #f))
                     (if (maybe-commit)
                         (answer (eof-object))
                         (retry)))
                    (else
                     (if (= index limit)  ; we have zilch
                         (begin
                           (provisional-set-port-index! port 0)
                           (provisional-set-port-limit! port 0))
                         ;; may be out of synch because of CR/LF conversion
                         (provisional-set-port-index! port index))
                     (refill)))))))))
  215. ;----------------
  ; See if there is a byte available.
  ;
  ; READ? selects the input or the output flavour: it decides which
  ; open-port predicate guards the call and whether a pending EOF counts as
  ; "ready".  When the buffer cannot answer the question, (READY? PORT) is
  ; called; it must return two values.  If the first is true the second is
  ; the answer, otherwise the attempt starts over.
  (define (make-byte-ready? ready? read?)
    (let ((open? (if read?
                     open-input-port?
                     open-output-port?)))
      (lambda (port)
        (with-new-proposal (retry)
          (cond ((not (open? port))
                 (remove-current-proposal!)
                 (assertion-violation 'byte-ready? "invalid argument" port))
                ((or (< (provisional-port-index port)
                        (provisional-port-limit port))
                     (and read?
                          (provisional-port-pending-eof? port)))
                 (if (maybe-commit)
                     #t
                     (retry)))
                (else
                 (call-with-values
                     (lambda ()
                       (ready? port))
                   (lambda (okay? answer)
                     (if okay?
                         answer
                         (retry))))))))))
  241. ;----------------
  ; Block input
  ;
  ; Returns a procedure (PORT BUFFER START COUNT WAIT?) that copies up to
  ; COUNT bytes from PORT into BUFFER starting at START.  The result is the
  ; number of bytes copied, or an EOF object when the port is at EOF and
  ; nothing was copied.
  ;
  ; HAVE counts the bytes copied by earlier passes whose proposals have
  ; already committed.  A pending EOF is used up only when HAVE is zero,
  ; because then the EOF itself is passed back; otherwise HAVE is returned
  ; and the EOF is preserved for the next invocation.
  ;
  ; If WAIT? is false the buffer filler is only tried on the first pass.
  ; Once the port's buffer runs dry after that, the count copied so far is
  ; returned.  (This used to return 0 at that point, silently dropping
  ; bytes that had already been moved into BUFFER.)
  (define (make-read-block buffer-filler!)
    (lambda (port buffer start count wait?)
      (let loop ((have 0) (first? #t))
        (with-new-proposal (lose)
          (if (open-input-port? port)
              (let ((result (cond ((provisional-port-pending-eof? port)
                                   (if (= have 0)
                                       (provisional-set-port-pending-eof?! port #f))
                                   (eof-object))
                                  ((= count 0)
                                   0)
                                  (else
                                   (get-available-bytes! buffer
                                                         (+ start have)
                                                         (- count have)
                                                         port)))))
                (cond ((not result)
                       ;; nothing buffered right now
                       (if (or wait? first?)
                           (if (buffer-filler! port wait?)
                               (loop have #f)
                               (lose))
                           (if (maybe-commit)
                               have       ; report what earlier passes copied
                               (lose))))
                      ((not (maybe-commit))
                       (lose))
                      ((eof-object? result)
                       (if (= have 0)
                           result
                           have))
                      (else
                       (let ((have (+ have result)))
                         (if (< have count)
                             (loop have #f)
                             have)))))
              (begin
                (remove-current-proposal!)
                (assertion-violation 'read-block "invalid argument"
                                     port buffer start count)))))))
  ; Copy whatever bytes are currently available, at most COUNT of them,
  ; from PORT's buffer into BUFFER at START.  Returns the number copied, or
  ; #f (after provisionally resetting the port's index and limit to 0) when
  ; the port's buffer holds nothing.
  ;
  ; Reading the timestamp makes its value part of the current proposal.  The
  ; timestamp is set whenever the buffer is refilled.  Without it the
  ; proposal could be fooled if the buffer were refilled and the index and
  ; limit just happened to be reset to their current values.
  (define (get-available-bytes! buffer start count port)
    (let* ((index (provisional-port-index port))
           (available (- (provisional-port-limit port) index)))
      (cond ((<= available 0)
             (provisional-set-port-index! port 0)
             (provisional-set-port-limit! port 0)
             #f)
            (else
             (let ((n (min available count)))
               (check-buffer-timestamp! port) ; makes the proposal check this
               (attempt-copy-bytes! (port-buffer port) index
                                    buffer start
                                    n)
               (provisional-set-port-index! port (+ index n))
               n)))))
  311. ;----------------------------------------------------------------
  312. ; Buffered output ports
  313. ;
  ; (make-buffered-output-port-handler discloser closer! buffer-emptier! ready?)
  ;   -> handler
  ;
  ; DISCLOSER is applied to the port's data.  BUFFER-EMPTIER! is always
  ; called with two arguments, the port and a flag; the output procedures
  ; and the closer pass #t, the forcer passes its NECESSARY? argument.
  ;
  ; The buffer emptier must call maybe-commit.
  (define (make-buffered-output-port-handler discloser
                                             closer!
                                             buffer-emptier!
                                             ready?)
    (define (disclose port)
      (discloser (port-data port)))
    (make-port-handler disclose
                       (make-closer closer! buffer-emptier!)
                       (make-one-byte-output buffer-emptier!)
                       (make-one-char-output buffer-emptier!)
                       (make-write-block buffer-emptier!)
                       (make-byte-ready? ready? #f)
                       (make-forcer buffer-emptier!)))
  ; Returns the close procedure for a buffered output port.  Closing a port
  ; that is not open does nothing.  Buffered bytes are first handed to
  ; BUFFER-EMPTIER!, after which the attempt starts over.  Once the buffer
  ; is empty the port is marked closed and CLOSER! is applied to the port's
  ; data; a false result from CLOSER! starts the attempt over.
  (define (make-closer closer! buffer-emptier!)
    (lambda (port)
      (with-new-proposal (retry)
        (let ((pending (provisional-port-index port)))
          (if (open-output-port? port)
              (if (> pending 0)
                  (begin
                    (buffer-emptier! port #t)
                    (retry))
                  (begin
                    (make-output-port-closed! port)
                    (or (closer! (port-data port))
                        (retry))))
              (begin
                (remove-current-proposal!)
                (unspecific)))))))
  ; First check that PORT is open and then either put BYTE in PORT's buffer
  ; or, if the buffer is full, empty the buffer and try again.
  (define (make-one-byte-output buffer-emptier!)
    (lambda (port byte)
      (with-new-proposal (retry)
        (let* ((index (provisional-port-index port))
               (capacity (byte-vector-length (port-buffer port))))
          (cond ((not (open-output-port? port))
                 (remove-current-proposal!)
                 (assertion-violation 'write-byte "invalid argument" port))
                ((>= index capacity)
                 ;; no room left: flush, then start over
                 (call-to-flush! port (lambda () (buffer-emptier! port #t)))
                 (retry))
                (else
                 (provisional-byte-vector-set! (port-buffer port)
                                               index
                                               byte)
                 (provisional-set-port-index! port (+ 1 index))
                 (or (maybe-commit)
                     (retry))))))))
  ; Returns a procedure (PORT CH) that encodes CH into PORT's buffer with
  ; the encoder of the port's text codec.
  ;
  ; The encoder is called as (encode char buffer start count) and returns
  ; two values: whether the character was written, and a byte count.  The
  ; encoding runs inside MAYBE-COMMIT-NO-INTERRUPTS and reports back through
  ; the two local variables:
  ;   OK?           - the character (or the CR/LF pair) is in the buffer
  ;   ENCODE-COUNT  - when not OK?, the number of bytes needed; #f means the
  ;                   character could not be encoded at all
  ;
  ; If the port does CR/LF conversion a newline is written as CR followed
  ; by LF, and the index is only advanced if both fit.
  ;
  ; A closed port raises an assertion violation for WRITE-CHAR.  (This used
  ; to name WRITE-BYTE, which pointed at the wrong operation.)
  (define (make-one-char-output buffer-emptier!)
    (lambda (port ch)
      (let ((encode
             (text-codec-encode-char-proc (port-text-codec port))))
        (with-new-proposal (lose)
          (let ((index (provisional-port-index port))
                (limit (byte-vector-length (port-buffer port))))
            (cond ((not (open-output-port? port))
                   (remove-current-proposal!)
                   (assertion-violation 'write-char "invalid argument" port))
                  ((< index limit)
                   (let ((encode-count #f)
                         (ok? #f))
                     (cond
                      ((not
                        (maybe-commit-no-interrupts
                         (lambda ()
                           (if (and (port-crlf? port)
                                    (char=? ch #\newline))
                               ;; CR/LF handling ruins our day once again
                               (call-with-values
                                   (lambda ()
                                     (encode cr
                                             (port-buffer port)
                                             index (- limit index)))
                                 (lambda (the-ok? cr-encode-count)
                                   (cond
                                    ((or (not the-ok?)
                                         (>= (+ index cr-encode-count) limit))
                                     (set! ok? #f)
                                     ;; LF will take at least one
                                     (set! encode-count (+ 1 cr-encode-count)))
                                    (else
                                     (call-with-values
                                         (lambda ()
                                           (encode #\newline
                                                   (port-buffer port)
                                                   (+ index cr-encode-count)
                                                   (- limit (+ index cr-encode-count))))
                                       (lambda (the-ok? lf-encode-count)
                                         (set! ok? the-ok?)
                                         (if the-ok?
                                             (set-port-index! port
                                                              (+ index
                                                                 cr-encode-count lf-encode-count))
                                             (set! encode-count (+ cr-encode-count lf-encode-count)))))))))
                               (call-with-values
                                   (lambda ()
                                     (encode ch
                                             (port-buffer port)
                                             index (- limit index)))
                                 (lambda (the-ok? the-encode-count)
                                   (set! ok? the-ok?)
                                   (if the-ok?
                                       (set-port-index! port (+ index the-encode-count))
                                       (set! encode-count the-encode-count))))))))
                       ;; the commit failed: start over
                       (lose))
                      (ok?)               ; we're done
                      (encode-count       ; need more space
                       (with-new-proposal (_)
                         (call-to-flush! port (lambda () (buffer-emptier! port #t))))
                       (lose))
                      (else               ; encoding error
                       (set! ch #\?)      ; if we get an encoding error on
                                          ; the second go, we're toast
                       (lose)))))
                  (else
                   ;; the buffer is completely full: flush it and start over
                   (call-to-flush! port (lambda () (buffer-emptier! port #t)))
                   (lose))))))))
  ; Returns a procedure (PORT BUFFER START COUNT) that writes COUNT bytes of
  ; BUFFER, beginning at START, to PORT.
  ;
  ; We have the following possibilities:
  ;  - the port is no longer open
  ;       -> raise an error
  ;  - there is nothing to write
  ;       -> do nothing
  ;  - there is room left in the port's buffer
  ;       -> copy bytes into it
  ;  - there is no room left in the port's buffer
  ;       -> write it out and try again
  ;
  ; SENT counts the bytes moved by passes that have already committed.
  (define (make-write-block buffer-emptier!)
    (lambda (port buffer start count)
      (let next-chunk ((sent 0))
        (with-new-proposal (retry)
          (cond ((not (open-output-port? port))
                 (remove-current-proposal!)
                 (assertion-violation 'write-block "invalid argument"
                                      buffer start count port))
                ((= count 0)
                 (if (maybe-commit)
                     0
                     (retry)))
                (else
                 (let ((copied (copy-bytes-out! buffer
                                                (+ start sent)
                                                (- count sent)
                                                port)))
                   (cond ((not copied)
                          ;; the port's buffer is full
                          (call-to-flush! port
                                          (lambda () (buffer-emptier! port #t)))
                          (retry))
                         ((maybe-commit)
                          (let ((total (+ sent copied)))
                            (if (< total count)
                                (next-chunk total))))
                         (else
                          (retry))))))))))
  ; Copy as many of the COUNT bytes of BUFFER, beginning at START, as fit
  ; into the free part of PORT's buffer, and provisionally advance the
  ; port's index past them.  Returns the number of bytes copied, or #f if
  ; the port's buffer has no room at all.
  (define (copy-bytes-out! buffer start count port)
    (let* ((index (provisional-port-index port))
           (room (- (byte-vector-length (port-buffer port)) index)))
      (if (<= room 0)
          #f
          (let ((n (min room count)))
            (check-buffer-timestamp! port) ; makes the proposal check this
            (provisional-set-port-index! port (+ index n))
            (attempt-copy-bytes! buffer start
                                 (port-buffer port) index
                                 n)
            n))))
  ; Write out anything in the buffer.  When called by the auto-forcing code
  ; this may run across the occasional closed port; that is an error only
  ; if NECESSARY? is true.
  ;
  ; This loops by starting the proposal over if the buffer-emptier's commit
  ; fails (in which case the emptier returns false) or if we are trying to
  ; empty the entire buffer (indicated by NECESSARY? being true).
  (define (make-forcer buffer-emptier!)
    (lambda (port necessary?)
      (with-new-proposal (retry)
        (cond ((not (open-output-port? port))
               (cond (necessary?
                      (remove-current-proposal!)
                      (assertion-violation 'force-output "invalid argument" port)))
               (unspecific))
              ((< 0 (provisional-port-index port))
               (let ((emptied? (call-to-flush port
                                              (lambda ()
                                                (buffer-emptier! port necessary?)))))
                 (if (or (not emptied?)
                         necessary?)
                     (retry))))))))
  501. ;----------------
  ; The value of the BUFFER-SIZE channel parameter.
  (define default-buffer-size
    (lambda ()
      (channel-parameter (enum channel-parameter-option buffer-size))))
  504. ;----------------
  ; Code to periodically flush output ports.
  ;
  ; The session data slot holds a list with a dummy first pair; the ports
  ; to flush hang off its cdr as weak pointers.
  (define flush-these-ports
    (make-session-data-slot! (cons #f '())))

  ; Add PORT to the front of the list of ports that get flushed
  ; periodically.
  (define (periodically-force-output! port)
    (let* ((head (session-data-ref flush-these-ports))
           (entry (make-weak-pointer port)))
      (set-cdr! head (cons entry (cdr head)))))
  ; Return a list of thunks that will flush the buffer of each open port
  ; that contains bytes that have been there since the last time
  ; this was called.  The actual i/o is done using separate threads to
  ; keep i/o errors from killing anything vital.
  ;
  ; If USE-FLUSHED-FLAGS? is true this won't flush buffers that have been
  ; flushed by someone else since the last call.  If it is false then flush
  ; all non-empty buffers, because the system has nothing to do and is going
  ; to pause while waiting for external events.
  ;
  ; Entries whose port has been collected or closed are unlinked from the
  ; list along the way.
  (define (output-port-forcers use-flushed-flags?)
    (let ((head (session-data-ref flush-these-ports)))
      (let walk ((cell (cdr head))
                 (prev head)
                 (thunks '()))
        (if (null? cell)
            thunks
            (let ((port (weak-pointer-ref (car cell)))
                  (rest (cdr cell)))
              (if (not (and port                        ; GCed or closed so
                            (open-output-port? port)))  ; drop it from the list
                  (begin
                    (set-cdr! prev rest)
                    (walk rest prev thunks))
                  (walk rest
                        cell
                        (cond ((eq? (port-flushed port) 'flushing)
                               ;; somebody else is doing it
                               thunks)
                              ((and use-flushed-flags? ; flushed recently
                                    (port-flushed port))
                               ;; race condition, but harmless
                               (set-port-flushed! port #f)
                               thunks)
                              ((< 0 (port-index port)) ; non-empty
                               (cons (make-forcing-thunk port)
                                     thunks))
                              (else                    ; empty
                               thunks)))))))))
  ; Returns a list of the current ports that are flushed whenever.
  ; This is used to flush channel ports before forking.
  ;
  ; The list is walked with all interrupts disabled; the previous interrupt
  ; mask is put back just before returning.  Entries whose port has been
  ; collected or closed are unlinked along the way.
  (define (periodically-flushed-ports)
    (let* ((saved-interrupts (set-enabled-interrupts! 0))
           (head (session-data-ref flush-these-ports)))
      (let walk ((cell (cdr head))
                 (prev head)
                 (ports '()))
        (cond ((null? cell)
               (set-enabled-interrupts! saved-interrupts)
               ports)
              (else
               (let ((port (weak-pointer-ref (car cell)))
                     (rest (cdr cell)))
                 (if (and port
                          (open-output-port? port))
                     (walk rest cell (cons port ports))
                     (begin               ; GCed or closed so drop it
                       (set-cdr! prev rest)
                       (walk rest prev ports)))))))))
  ; Returns a thunk that writes out PORT's buffer.  If a problem occurs it
  ; is reported and PORT is closed.
  ;
  ; NOTE(review): this relies on REPORT-ERRORS-AS-WARNINGS returning true
  ; when its thunk ran into an error -- confirm against its definition.
  (define (make-forcing-thunk port)
    (lambda ()
      ;; (debug-message "[forcing port]")
      (define (flush)
        (force-output-if-open port))
      (define (shut)
        (atomically! (set-port-index! port 0)) ; prevent flushing
        ((port-handler-close (port-handler port))
         port))
      (let ((trouble? (report-errors-as-warnings
                       flush
                       "error when flushing buffer; closing port"
                       port)))
        (if (and trouble?
                 (open-output-port? port))
            (report-errors-as-warnings
             shut
             "error when closing port"
             port)))))
  ; Run THUNK with PORT's flushed flag set to FLUSHING, then set the flag
  ; to #t.  THUNK's result is discarded.
  (define call-to-flush!
    (lambda (port thunk)
      ;; don't let the periodic flusher go crazy
      (set-port-flushed! port 'flushing)
      (thunk)
      (set-port-flushed! port #t)))
  ; Like CALL-TO-FLUSH!, but hands back the (single) value THUNK returned.
  (define call-to-flush
    (lambda (port thunk)
      ;; don't let the periodic flusher go crazy
      (set-port-flushed! port 'flushing)
      (let ((result (thunk)))             ; one is enough
        (set-port-flushed! port #t)
        result))))