index.js 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. var a = require('assert')
  2. var macgyver = require('macgyver')
  3. var Stream = require('stream')
  4. function merge (to, from) {
  5. to = to || {}
  6. for (var k in from)
  7. if('undefined' === typeof to[k])
  8. to[k] = from[k]
  9. return to
  10. }
  11. module.exports = function (stream, opts) {
  12. a.ok(stream instanceof Stream)
  13. a.ok('function', typeof stream.pipe)
  14. a.ok('function', typeof stream.destroy)
  15. var mac = macgyver()
  16. var opts = merge(('string' == typeof opts ? {name: opts} : opts) || {}, {name: 'stream'})
  17. var spec = {}
  18. function add(name, method) {
  19. spec[name] = function (_opts) {
  20. method(mac, stream, merge(_opts, opts))
  21. return this
  22. }
  23. }
  24. add('through' , throughSpec)
  25. add('basic' , throughSpec) //legacy, remove this.
  26. add('duplex' , duplexSpec)
  27. add('readable' , readableSpec)
  28. add('writable' , writableSpec)
  29. add('pausable' , pauseSpec)
  30. add('drainable' , drainSpec)
  31. add('strictPause' , strictSpec)
  32. spec.all = function (opts) {
  33. if(stream.writable && stream.readable)
  34. return this.through(opts).pausable(opts)
  35. else if(stream.writable)
  36. return this.writable().pausable()
  37. else
  38. return this.readable()
  39. }
  40. spec.validate = function () {
  41. mac.validate()
  42. return this
  43. }
  44. spec.validateOnExit = function () {
  45. //your test framework probably has assigned a listener for on exit also,
  46. //make sure we are first. so the framework has a chance to detect a
  47. //validation error.
  48. if(process.listeners)
  49. process.listeners('exit').unshift(function () {
  50. try {
  51. mac.validate()
  52. } catch (err) {
  53. console.error(err && err.stack)
  54. throw err
  55. }
  56. })
  57. else
  58. setTimeout(mac.validate, 10e3)
  59. return this
  60. }
  61. return spec
  62. }
  63. function writableSpec (mac, stream, opts) {
  64. a.ok('function', typeof stream.destroy, opts.name + '.end *must* be a function')
  65. a.equal(stream.writable, true, opts.name + '.writable *must* == true')
  66. function e (n) { return opts.name + '.emit(\''+n+'\')' }
  67. function n (n) { return opts.name + '.'+n+'()' }
  68. stream.end = mac(stream.end, n('end')).returns(function () {
  69. a.equal(stream.writable, false, opts.name + ' must not be writable after end()')
  70. }).once()
  71. stream.write =
  72. mac(stream.write, n('write'))
  73. .throws(function (err, threw) {
  74. // a.equal(threw, !stream.writable, 'write should throw if !writable')
  75. })
  76. var onClose = mac(function (){
  77. if(opts.debug) console.error(e('close'))
  78. }, e('close')).once()
  79. var onError = mac(function (err){
  80. if(opts.debug) console.error(e('error'), err)
  81. }, e('error')).before(onClose)
  82. stream.on('close', onClose)
  83. stream.on('error', onError)
  84. if(opts.error === false)
  85. onError.never()
  86. if(opts.error === true)
  87. onError.once()
  88. }
  89. function readableSpec (mac, stream, opts) {
  90. merge(opts, {end: true})
  91. function e (n) { return opts.name + '.emit(\''+n+'\')' }
  92. function n (n) { return opts.name + '.'+n+'()' }
  93. var onError = mac(function (err){
  94. //'error' means the same thing as 'close'.
  95. onClose.maybeOnce()
  96. if(opts.debug) console.error(e('error'), err)
  97. }, e('error'))
  98. //.before(onClose) error does not emit close, officially, yet.
  99. var onEnd = mac(function end (){
  100. if(opts.debug) console.error(e('end'), err)
  101. }, e('end'))
  102. .isPassed(function () {
  103. a.equal(stream.readable, false, 'stream must not be readable on "end"')
  104. })
  105. var onClose = mac(function (){
  106. if(opts.debug) console.error(e('close'))
  107. }, e('close'))
  108. .once()
  109. //on end must occur before onClose or onError
  110. //that is to say, end MUST NOT occur after 'close' or 'error'
  111. onEnd.before(onClose).before(onError)
  112. var onData = mac(function data (){}, e('data')).before(onEnd)
  113. stream.on('close', onClose)
  114. stream.on('end', onEnd)
  115. stream.on('data', onData)
  116. if(opts.end !== false) onEnd.once()
  117. else onEnd.never()
  118. if(opts.error === false)
  119. onError.never()
  120. if(opts.error === true)
  121. onError.once()
  122. }
  123. function throughSpec (mac, stream, opts) {
  124. writableSpec(mac, stream, opts)
  125. readableSpec(mac, stream, opts)
  126. throughPauseSpec(mac, stream, opts)
  127. }
  128. function duplexSpec (mac, stream, opts) {
  129. writableSpec(mac, stream, opts)
  130. readableSpec(mac, stream, opts)
  131. pauseSpec(mac, stream, opts)
  132. drainSpec(mac, stream, opts)
  133. }
  134. function drainSpec (mac, stream, opts) {
  135. var paused = false
  136. function e (n) { return opts.name + '.emit(\''+n+'\')' }
  137. function n (n) { return opts.name + '.'+n+'()' }
  138. function drain() {
  139. paused = false
  140. }
  141. var onDrain = mac(drain).never()
  142. stream.on('drain', onDrain)
  143. stream.write =
  144. mac(stream.write, n('write'))
  145. .returns(function (written) {
  146. if(!paused && !written) {
  147. //after write returns false, it must emit drain eventually.
  148. onDrain.again()
  149. }
  150. paused = (written === false)
  151. })
  152. }
  153. //for through-streams
  154. function throughPauseSpec (mac, stream, opts) {
  155. var paused = false
  156. function e (n) { return opts.name + '.emit(\''+n+'\')' }
  157. function n (n) { return opts.name + '.'+n+'()' }
  158. function drain() {
  159. paused = false
  160. }
  161. var onDrain = mac(drain, e('drain')).never()
  162. a.ok(stream.pause, 'stream *must* have pause')
  163. if(!stream.readable)
  164. throw new Error('pause does not make sense for a non-readable stream')
  165. stream.pause = mac(stream.pause, n('pause'))
  166. .isPassed(function () {
  167. if(paused) return
  168. //console.log('entered pause state by pause()')
  169. paused = true
  170. onDrain.again()
  171. })
  172. stream.on('drain', onDrain)
  173. stream.write =
  174. mac(stream.write, n('write'))
  175. .returns(function (written) {
  176. if(!paused && !written) {
  177. //after write returns false, it must emit drain eventually.
  178. //console.log('entered pause state by write() === false')
  179. onDrain.again()
  180. }
  181. paused = (written === false)
  182. })
  183. if(opts.strict)
  184. stream.on('data', function onData(data) {
  185. //stream must not emit data when paused!
  186. a.equal(paused, false, 'a strict stream *must not* emit \'data\' when paused')
  187. })
  188. }
  189. /*
  190. demand that the stream does not emit any data when paused
  191. */
  192. function pauseSpec (mac, stream, opts) {
  193. var paused = false
  194. function e (n) { return opts.name + '.emit(\''+n+'\')' }
  195. function n (n) { return opts.name + '.'+n+'()' }
  196. if(!stream.readable)
  197. throw new Error('strict pause does not make sense for a non-readable stream')
  198. stream.pause = mac(stream.pause)
  199. .isPassed(function () {
  200. paused = true
  201. })
  202. stream.resume = mac(stream.resume)
  203. .isPassed(function () {
  204. paused = false
  205. })
  206. if(opts.strict)
  207. stream.on('data', function () {
  208. a.equal(paused, false, 'a strict pausing stream must not emit data when paused')
  209. })
  210. }
  211. function strictSpec (mac, stream, opts) {
  212. return pauseSpec(mac, stream, merge(opts, {strict: true}))
  213. }