Multer is built on busboy, and is essentially this file:

var is = require('type-is')
var Busboy = require('busboy')
var extend = require('xtend')
var onFinished = require('on-finished')
var appendField = require('append-field')

var Counter = require('./counter')
var MulterError = require('./multer-error')
var FileAppender = require('./file-appender')
var removeUploadedFiles = require('./remove-uploaded-files')

// Keep consuming (and discarding) data from the stream so the
// underlying request can run to completion and free its socket.
function drainStream (stream) {
  stream.on('readable', function () {
    stream.read()
  })
}

// Build an Express middleware from a `setup` thunk that yields the
// per-request multer options (storage engine, limits, fileFilter, …).
function makeMiddleware (setup) {
  return function multerMiddleware (req, res, next) {
    // Non-multipart requests are not ours to handle.
    if (!is(req, ['multipart'])) return next()

    var options = setup()

    var limits = options.limits
    var storage = options.storage
    var fileFilter = options.fileFilter
    var fileStrategy = options.fileStrategy
    var preservePath = options.preservePath

    // Prototype-less object: crafted field names can't collide with
    // Object.prototype members.
    req.body = Object.create(null)

    var busboy

    try {
      busboy = new Busboy({ headers: req.headers, limits: limits, preservePath: preservePath })
    } catch (err) {
      return next(err)
    }

    var appender = new FileAppender(fileStrategy, req)
    var isDone = false // next() has been scheduled exactly once
    var readFinished = false // busboy emitted 'finish'
    var errorOccured = false // first error wins; later ones are ignored
    var pendingWrites = new Counter() // files still being written by storage
    var uploadedFiles = [] // successfully stored files (for cleanup on error)

    // Tear down parsing and hand control back to Express exactly once.
    // The request keeps being drained so the client can finish sending.
    function done (err) {
      if (isDone) return
      isDone = true

      req.unpipe(busboy)
      drainStream(req)
      busboy.removeAllListeners()

      onFinished(req, function () { next(err) })
    }

    // Success path: only proceeds when parsing finished, no writes are
    // pending, and no error occurred.
    function indicateDone () {
      if (readFinished && pendingWrites.isZero() && !errorOccured) done()
    }

    // Error path: wait for in-flight writes to settle, then remove every
    // already-stored file before reporting the original error.
    function abortWithError (uploadError) {
      if (errorOccured) return
      errorOccured = true

      pendingWrites.onceZero(function () {
        function remove (file, cb) {
          storage._removeFile(req, file, cb)
        }

        removeUploadedFiles(uploadedFiles, remove, function (err, storageErrors) {
          if (err) return done(err)

          // Attach any per-file removal failures so callers can see them.
          uploadError.storageErrors = storageErrors
          done(uploadError)
        })
      })
    }

    function abortWithCode (code, optionalField) {
      abortWithError(new MulterError(code, optionalField))
    }

    // handle text field data
    busboy.on('field', function (fieldname, value, fieldnameTruncated, valueTruncated) {
      if (fieldname == null) return abortWithCode('MISSING_FIELD_NAME')
      if (fieldnameTruncated) return abortWithCode('LIMIT_FIELD_KEY')
      if (valueTruncated) return abortWithCode('LIMIT_FIELD_VALUE', fieldname)

      // Work around bug in Busboy (https://github.com/mscdex/busboy/issues/6)
      if (limits && Object.prototype.hasOwnProperty.call(limits, 'fieldNameSize')) {
        if (fieldname.length > limits.fieldNameSize) return abortWithCode('LIMIT_FIELD_KEY')
      }

      appendField(req.body, fieldname, value)
    })

    // handle files
    busboy.on('file', function (fieldname, fileStream, filename, encoding, mimetype) {
      // don't attach to the files object, if there is no file
      if (!filename) return fileStream.resume()

      // Work around bug in Busboy (https://github.com/mscdex/busboy/issues/6)
      if (limits && Object.prototype.hasOwnProperty.call(limits, 'fieldNameSize')) {
        if (fieldname.length > limits.fieldNameSize) return abortWithCode('LIMIT_FIELD_KEY')
      }

      var file = {
        fieldname: fieldname,
        originalname: filename,
        encoding: encoding,
        mimetype: mimetype
      }

      // Reserve the file's slot before the async filter/storage work so
      // ordering in req.files stays stable.
      var placeholder = appender.insertPlaceholder(file)

      fileFilter(req, file, function (err, includeFile) {
        if (err) {
          appender.removePlaceholder(placeholder)
          return abortWithError(err)
        }

        // Filter rejected the file: discard its bytes and move on.
        if (!includeFile) {
          appender.removePlaceholder(placeholder)
          return fileStream.resume()
        }

        var aborting = false
        pendingWrites.increment()

        // Expose the raw stream to storage engines without it showing up
        // when the file object is enumerated/serialized.
        Object.defineProperty(file, 'stream', {
          configurable: true,
          enumerable: false,
          value: fileStream
        })

        fileStream.on('error', function (err) {
          pendingWrites.decrement()
          abortWithError(err)
        })

        // Busboy truncates the stream at the configured size limit; flag
        // it so the storage callback knows the written file is partial.
        fileStream.on('limit', function () {
          aborting = true
          abortWithCode('LIMIT_FILE_SIZE', fieldname)
        })

        storage._handleFile(req, file, function (err, info) {
          // Size limit was hit mid-write: record the partial file in
          // uploadedFiles so abortWithError can delete it.
          if (aborting) {
            appender.removePlaceholder(placeholder)
            uploadedFiles.push(extend(file, info))
            return pendingWrites.decrement()
          }

          if (err) {
            appender.removePlaceholder(placeholder)
            pendingWrites.decrement()
            return abortWithError(err)
          }

          var fileInfo = extend(file, info)

          appender.replacePlaceholder(placeholder, fileInfo)
          uploadedFiles.push(fileInfo)
          pendingWrites.decrement()
          indicateDone()
        })
      })
    })

    busboy.on('error', function (err) { abortWithError(err) })
    busboy.on('partsLimit', function () { abortWithCode('LIMIT_PART_COUNT') })
    busboy.on('filesLimit', function () { abortWithCode('LIMIT_FILE_COUNT') })
    busboy.on('fieldsLimit', function () { abortWithCode('LIMIT_FIELD_COUNT') })
    busboy.on('finish', function () {
      readFinished = true
      indicateDone()
    })

    req.pipe(busboy)
  }
}

module.exports = makeMiddleware

In that code they have essentially

abortWithError(() => pendingWrites.onceZero(() => removeFiles()...))

So it seems like it’s waiting for all the files to finish writing to disk, before it then removes them, which seems like a waste.

I think having access to the file stream somehow and canceling writing to the disk file stream would save a lot on large files. But I don’t know the proper way to cancel the chain of streams.

The streaming starts with:

req.pipe(busboy)

Pipe the request into the busboy parser instance, which will emit file and field values. The file one comes with a stream, which you then use like this:

fileStream.on('error', function (err) {
  pendingWrites.decrement()
  abortWithError(err)
})

fileStream.on('limit', function () {
  aborting = true
  abortWithCode('LIMIT_FILE_SIZE', fieldname)
})

const writeStream = fs.createWriteStream('/tmp/blah')
fileStream.pipe(writeStream)

Unfortunately, multer's internals don't have access to the file writeStream, which is why I think it is written the way it is. So it does abortWithError and then waits for all the writes to finish before moving on.

But if you did have access to the file write stream, what would you do?

fileStream.on('error', function (err) {
  // 1. should you close the write stream?
  writeStream.close()
  // 2. should you instead `destroy` it?
  writeStream.destroy()
  // 3. would just unpiping work? 
  // (and the added fileStream.on('unpipe') handler)
  fileStream.unpipe(writeStream)
  // 4. then delete the file immediately on error after everything is stopped?
  fs.unlink('/tmp/blah')
})

const writeStream = fs.createWriteStream('/tmp/blah')

fileStream.on('unpipe', () => {
  // 3. do you need to explicitly close the write stream?
  writeStream.close()
})

fileStream.pipe(writeStream)

Which one of those approaches (or mixture thereof) is the appropriate solution to stopping streaming the file from the HTTP request, to stopping writing to the tmp file, to removing the tmp file on error?

This is what I ended up with:

import { Request, Response } from 'express'
import _ from 'lodash'
import { ReadStream } from 'fs'
import onFinished from 'on-finished'
import busboy, { FileInfo } from 'busboy'
import DEBUG from 'debug'
import kink from './kink.js'

const debug = DEBUG('load-file')

/** Arguments passed to a {@link HandleFile} hook once per uploaded file. */
export type HandleFileInput = {
  req: Request
  // NOTE(review): busboy hands back a generic stream.Readable, not an
  // fs.ReadStream — the annotation is stricter than the runtime value;
  // confirm before relying on ReadStream-only members such as `path`.
  stream: ReadStream
  file: LoadFile
}

/** Metadata collected for one uploaded file (attached to req.body). */
export type LoadFile = {
  name: string // original client filename (busboy info.filename)
  encoding: string // transfer encoding reported by busboy
  mimeType: string // MIME type reported by busboy
  buffer?: Buffer // set when the hook resolves with the file contents
  path?: string // set when the hook resolves with a filesystem path
}

/** Declares one accepted multipart file field. */
export type LoadFileLink = {
  path: string // field name / lodash path, e.g. 'input[file]'
  min?: number // NOTE(review): declared but never enforced by loadFiles — confirm
  max?: number // maximum number of files accepted on this field
}

/**
 * Per-file handler: consumes the upload stream and resolves with either
 * the file contents (Buffer) or the path the file was written to (string).
 */
export type HandleFile = (
  props: HandleFileInput,
) => Promise<string | Buffer>

/**
 * Parses a multipart request with busboy, routing each accepted file
 * field through `hook` and each text field into `req.body` (lodash-path
 * semantics). Resolves once the request is fully read and every hook has
 * settled; rejects with the first error after broadcasting 'abort' to
 * all live file streams so hooks can cancel and clean up.
 *
 * @param req  incoming Express request (must be multipart)
 * @param _res unused; kept for middleware-shaped call sites
 * @param link accepted file fields (`path` is the field name; `max` caps
 *             the per-field file count; `min` is currently unenforced)
 * @param hook per-file handler; its resolution is stored on the file
 * @param halt busboy limits (fieldNameSize, fileSize, files, …)
 */
export default function loadFiles(
  req: Request,
  _res: Response,
  {
    link,
    hook,
    halt,
  }: {
    hook: HandleFile
    link: Array<LoadFileLink>
    halt?: busboy.Limits
  },
) {
  return new Promise((res, rej) => {
    let bb: busboy.Busboy

    // field name -> link config, for O(1) lookup per incoming file
    const fieldMap = link.reduce<Record<string, LoadFileLink>>(
      (m, x) => {
        m[x.path] = x
        return m
      },
      {},
    )

    req.body ??= {}

    bb = busboy({
      headers: req.headers,
      limits: halt,
    })

    let uploadError: Error | undefined
    // number of `hook` invocations still in flight
    let processing = 0

    // handle text field data.
    // busboy v1 emits ('field', name, value, info): the truncation flags
    // live on the info object, NOT as two extra positional arguments.
    // The previous 4-argument signature received the info object as
    // `fieldNameTruncated` — always truthy — so every text field
    // aborted the upload with 'field_name_truncated'.
    bb.on(
      'field',
      (
        fieldName: string,
        value: string,
        info: { nameTruncated: boolean; valueTruncated: boolean },
      ) => {
        if (fieldName == null) {
          return abort(kink('missing_field_name'))
        }
        if (info.nameTruncated) {
          return abort(kink('field_name_truncated'))
        }
        if (info.valueTruncated) {
          return abort(
            kink('field_value_truncated', { name: fieldName }),
          )
        }

        // lodash path semantics: 'input[file]' => req.body.input.file
        _.set(req.body, fieldName, value)
      },
    )

    let isDone = false
    let readFinished = false

    // handle files
    const files: Record<string, LoadFile> = {}
    // per-field count of files seen so far (enforces `max`)
    const counts: Record<string, number> = link.reduce<
      Record<string, number>
    >((m, x) => {
      m[x.path] = 0
      return m
    }, {})
    const streams: Record<string, ReadStream> = {}
    const aborted: Record<string, boolean> = {}

    bb.on(
      'file',
      async (fieldName: string, stream: ReadStream, info: FileInfo) => {
        if (!fieldMap[fieldName]) {
          return abort(
            kink('invalid_file_field_name', {
              name: fieldName,
            }),
          )
        }

        counts[fieldName]++

        const field = fieldMap[fieldName]
        const count = counts[fieldName]

        if (field?.max && count && count > field.max) {
          return abort(kink('too_many_files'))
        }

        // NOTE(review): `field.min` is declared on LoadFileLink but not
        // enforced anywhere in this function — TODO confirm whether a
        // "too few files" check was intended before resolving.

        processing++

        const file: LoadFile = (files[fieldName] = {
          name: info.filename,
          encoding: info.encoding,
          mimeType: info.mimeType,
        })

        streams[fieldName] = stream

        // 'clear'/'abort' are custom events forming a handshake with the
        // hook: we emit 'abort' to tell it to stop and clean up; it emits
        // 'clear' once it has fully settled. 'clear' is the only place
        // `processing` is decremented.
        const handleClear = () => {
          debug('handle upload stream clear')
          stream.off('clear', handleClear)
          processing--
          process.nextTick(finish)
        }

        const handleError = (err: Error) => {
          debug('handle upload stream error')
          // 'error' listener stays attached; the hook's abort handler is
          // responsible for emitting 'clear'
          aborted[fieldName] = true
          stream.emit('abort', err)
          abort(err)
        }

        const handleLimit = () => {
          debug('handle upload stream limit')
          stream.off('limit', handleLimit)
          const err = kink('file_size_limit_reached')
          aborted[fieldName] = true
          stream.emit('abort', err)
          abort(err)
        }

        stream.on('clear', handleClear)
        stream.on('error', handleError)
        stream.on('limit', handleLimit)

        try {
          debug('before upload handling')
          const data = await hook({ req, file, stream })
          debug('after upload handling')

          // A Buffer result is the file contents; a string is the path
          // it was written to.
          if (data instanceof Buffer) {
            file.buffer = data
          } else if (typeof data === 'string') {
            file.path = data
          }

          _.set(req.body, fieldName, file)

          stream.emit('clear')
        } catch (e) {
          debug('error uploading')
          return abort(e instanceof Error ? e : new Error(String(e)))
        }
      },
    )

    bb.on('error', (err: Error) => {
      abort(err)
    })
    // part/file/field count limits are deliberately ignored; re-enable
    // by calling abort() with the matching kink code.
    bb.on('partsLimit', () => {
      // abort(kink('LIMIT_PART_COUNT'))
    })
    bb.on('filesLimit', () => {
      // abort(kink('LIMIT_FILE_COUNT'))
    })
    bb.on('fieldsLimit', () => {
      // abort(kink('LIMIT_FIELD_COUNT'))
    })
    // busboy v1 is a Writable, so 'finish' fires once the request body
    // has been fully parsed (its docs also document 'close').
    bb.on('finish', () => {
      readFinished = true
      finish()
    })

    req.pipe(bb)

    // Record the first error and broadcast 'abort' to every live file
    // stream so in-flight hooks can stop writing and clean up.
    function abort(err: Error) {
      if (uploadError) {
        return
      }

      debug('stream abort')

      uploadError = err

      for (const fieldName in streams) {
        const stream = streams[fieldName]
        if (stream && !aborted[fieldName]) {
          aborted[fieldName] = true
          stream.emit('abort', err)
        }
      }
    }

    // Detach busboy, drain the rest of the request so the client can
    // finish sending, then settle the promise once the request ends.
    function finalize(err?: Error | null) {
      if (isDone) {
        return
      }

      isDone = true

      debug('upload finalize')

      req.unpipe(bb)
      req.on('readable', req.read.bind(req))
      bb.removeAllListeners()

      onFinished(req, () => {
        if (err) {
          return rej(err)
        }
        res(undefined)
      })
    }

    // Settle only when parsing is done (or failed) AND no hook is still
    // in flight — mirrors multer's readFinished/pendingWrites checks.
    function finish() {
      debug(
        'upload finish ' +
          JSON.stringify({ uploadError, readFinished, processing }),
      )
      debug(uploadError)
      if ((uploadError || readFinished) && !processing) {
        finalize(uploadError)
      }
    }
  })
}

With this as the usage:

/**
 * Builds a HandleFile hook that streams an upload into a fresh tmp file
 * with the given extension, resolving with the tmp path.
 *
 * On 'abort' (emitted by loadFiles when the upload errors or hits a
 * limit) the write is cancelled immediately — unpipe + destroy — and
 * the partial tmp file is unlinked, rather than letting the whole file
 * finish writing before cleanup.
 */
export const buildFileHandle = (ext: string) => {
  const handleFile = async ({
    stream,
  }: HandleFileInput): Promise<string> => {
    // Resolve the tmp path BEFORE entering the Promise executor: an
    // `async` executor swallows rejections, so a tmpName() failure
    // previously left the returned promise pending forever.
    const tmpPath = (await tmpName()) + `.${ext}`

    return new Promise<string>((res, rej) => {
      const outStream = fs.createWriteStream(tmpPath)

      debug('create download stream')

      const handleStreamAbort = (err: Error) => {
        debug('handle stream abort ' + err.message)
        stream.off('abort', handleStreamAbort)
        // Stop feeding the write stream and discard whatever it has
        // buffered; destroy() also closes the fd, so the unlink below
        // cannot race an in-flight write.
        stream.unpipe(outStream)
        outStream.destroy()
        fsp
          .unlink(tmpPath)
          .catch(() => {
            // best-effort cleanup: the file may never have been created
          })
          .then(() => {
            // tell loadFiles this hook has fully settled
            stream.emit('clear')
            rej(err)
          })
      }

      const handleFinish = () => {
        debug('handle file finish')
        stream.off('abort', handleStreamAbort)
        res(tmpPath)
      }

      stream.on('abort', handleStreamAbort)
      outStream.on('error', handleStreamAbort)
      outStream.on('finish', handleFinish)
      stream.pipe(outStream)
    })
  }

  return handleFile
}

app.post(
  `/:a/convert/:b`,
  async (req, res) => {
    // Express 4 does not forward rejections from async handlers to its
    // error middleware — without try/catch a failed upload left the
    // request hanging with no response.
    try {
      // attach to req.body.input.file
      await loadFiles(req, res, {
        link: [{ path: 'input[file]', max: 1, min: 1 }],
        hook: buildFileHandle(req.params.a),
      })
      // NOTE(review): the original example ended without responding;
      // the real conversion/response logic presumably goes here.
      res.json(req.body)
    } catch (e) {
      res
        .status(400)
        .json({ error: e instanceof Error ? e.message : String(e) })
    }
  }
)