Queue.js

import {_setImmediate} from './_setImmediate'
import PrioArray from './PrioArray'

/**
* Run queued `items` through an asynchronous `task`.
*
* Once the `task` has finished processing an item, an optional callback is called.
* While pushing to the queue, you may define a priority for execution.
* Lower values mean earlier execution.
*
* @name Queue
* @methodOf: module:parallel
* @class
* @param {Function} task - iterator function of type `function (item: any, cb: Function)`; it must call `cb(err, res)` once the item is processed
* @param {Number} [concurrency=1] - max. number of tasks running in parallel
* @example <caption>Default usage</caption>
* var arr = []
* var q = new Queue((item, cb) => {
*   arr.push(item)
*   cb(null, item)
* })
* // push item "one" at end of queue
* q.push('one', (err, res) => {
*   console.log(res + ' finished')
* })
* // add item "two" at start of queue
* q.unshift('two', () => {
*   console.log('two finished')
* })
* // called when all items in queue were processed
* q.drain(() => {
*   console.log(arr)
*   //> arr = ['one', 'two']
* })
* @example <caption>Using priorities</caption>
* let arr = []
*
* let q = new Queue(function (item, cb) {
*   arr.push(item)
*   cb()
* }, 2)
*
* q.concat([100, 101, 102], 3) // priority = 3 - last (but 2 items already processed)
* q.concat([0, 1, 2], 1)       // priority = 1 - first
* q.concat([10, 11, 12], 2)    // priority = 2 - second
*
* q.drain(() => {
*   //> arr = [ 100, 101, 0, 1, 2, 10, 11, 12, 102 ]
* })
*/
export default function Queue (task, concurrency) {
  // worker function, invoked as `task(item, cb)` for every queued item
  this._task = task
  // Normalise to a usable worker limit: the sign is ignored and anything that
  // evaluates to 0 or NaN (missing, zero or non-numeric values) falls back to 1.
  // A limit of 0 or NaN would keep `_start()` from ever launching a worker,
  // leaving the queue silently stalled.
  this._concurrency = Math.abs(concurrency) || 1
  // number of worker slots currently occupied
  this._worker = 0
  // while `true`, `_start()` does not launch new workers
  this._paused = false
  // pending `[item, callback]` entries
  this._items = new PrioArray()
}

Queue.prototype = {
  // assigning a fresh object drops the default `constructor` link - restore it
  constructor: Queue,

  /**
  * Release the calling worker's slot and, if allowed, claim the next item.
  *
  * - queue empty: the slot stays released; once no worker is left the
  *   `drain` callback (if any) is called.
  * - queue paused with items left: the slot stays released and the items
  *   remain queued until `resume()` restarts processing.
  * - otherwise: the next item is handed to the task; when the task reports
  *   back, the item's callback is called and this method is scheduled again.
  * @private
  */
  _run () {
    const { _items, _drain } = this
    // give back the slot which the caller (`_start` or a finished task) held
    this._worker -= 1

    if (_items.length === 0) {
      if (this._worker <= 0) {
        this._worker = 0
        _drain && _drain()
      }
      return
    }

    if (this._paused) {
      // do not pick up further work while paused; `resume()` calls `_start()`
      // which launches workers for the remaining items
      return
    }

    this._worker += 1
    const [item, cb] = _items.shift()
    this._task(item, (err, res) => {
      cb && cb(err, res)
      _setImmediate(() => { // prevent RangeError: Maximum call stack size exceeded for sync tasks
        this._run()
      })
    })
  },

  /**
  * start processing queue or add workers up to concurrency
  *
  * Does nothing while the queue is paused.
  * @private
  * @return {this} for chaining
  */
  _start () {
    while (!this._paused && this._worker < Math.min(this._concurrency, this._items.length)) {
      // reserve a slot; `_run()` releases it first and re-claims it only if
      // there is an item to process
      this._worker += 1
      this._run()
    }
    return this
  },

  /**
  * Check if queue is paused
  * @return {Boolean} `true` if paused
  */
  get paused () {
    return this._paused
  },

  /**
  * Check if queue is idle - means no items in queue and no workers running
  * @return {Boolean} `true` if idle
  */
  get idle () {
    return !this.length && this._worker === 0
  },

  /**
  * Number of items waiting in the queue to get processed
  * @return {Number} number of items in queue
  */
  get length () {
    return this._items.length
  },

  /**
  * Pause processing
  *
  * No further items are handed to the task until `resume()` is called.
  * Items already handed to the task are not interrupted.
  * @return {this} for chaining
  */
  pause () {
    this._paused = true
    return this
  },

  /**
  * Resume processing
  * @return {this} for chaining
  */
  resume () {
    this._paused = false
    return this._start()
  },

  /**
  * Reset the queue by removing all pending items from the queue
  *
  * Items already handed to the task are not affected.
  * @return {this} for chaining
  */
  reset () {
    this._items.reset()
    return this
  },

  /**
  * Number of items being processed
  * @return {Number} number of items currently being processed
  */
  running () {
    return this._worker
  },

  /**
  * push `item` onto queue
  *
  * As with `concat`, the priority may be passed in place of the callback.
  * @param {Any} item
  * @param {Function} [callback] - optional callback `function (err, res)` called if item was processed
  * @param {Number} [priority] - priority `0 ... Infinity` of the item to process. Smaller values, faster processing
  * @return {this} for chaining
  */
  push (item, callback, priority) {
    return this.concat([item], callback, priority)
  },

  /**
  * concat `items` onto queue - fills the queue first with `items` before starting processing
  *
  * May also be called as `concat(items, priority)`.
  * @param {Any[]} items
  * @param {Function} [callback] - optional callback `function (err, res)` called for each single item which was processed
  * @param {Number} [priority] - priority `0 ... Infinity` of the item to process. Smaller values, faster processing
  * @return {this} for chaining
  */
  concat (items, callback, priority) {
    if (typeof callback === 'number') {
      // called as `concat(items, priority)`
      priority = callback
      callback = undefined
    }
    items.forEach((item) => {
      this._items.push([item, callback], priority)
    })
    return this._start()
  },

  /**
  * put `item` at the very beginning of the queue
  * @param {Any} item
  * @param {Function} [callback] - optional callback `function (err, res)` called if item was processed
  * @return {this} for chaining
  */
  unshift (item, callback) {
    this._items.unshift([item, callback])
    return this._start()
  },

  /**
  * Register the callback which is called, without arguments, when the queue
  * is empty and the last worker has finished. A previously registered
  * callback is replaced.
  * @param {Function} [callback] - optional callback called if all queue items got processed
  * @return {this} for chaining
  */
  drain (callback) {
    this._drain = callback
    return this
  }
}

/**
* Run queued `items` through an asynchronous `task`.
*
* Once the `task` has finished processing an item, an optional callback is called.
* While pushing to the queue, you may define a priority for execution.
* Lower values mean earlier execution.
*
* See full API here {@link Queue}.
*
* @name queue
* @memberOf module:parallel
* @static
* @method
* @param {Function} task - iterator function of type `function (item: any, cb: Function)`; it must call `cb(err, res)` once the item is processed
* @param {Number} [concurrency=1] - max. number of tasks running in parallel
* @return {Queue}
*/
export function queue (task, concurrency) {
  // thin factory: lets callers obtain a Queue without writing `new`;
  // both arguments are forwarded untouched
  const instance = new Queue(task, concurrency)
  return instance
}