stream.js

'use strict'

// Module dependencies.
const Instagram = require('./instagram')
const Readable = require('stream').Readable

const FILTER_TYPE_TAG = 'tag'
const FILTER_TYPE_USER = 'user'

const FILTER_PATH_MAP = {
  [FILTER_TYPE_TAG]: '/v1/tags/:key/media/recent',
  [FILTER_TYPE_USER]: '/v1/users/:key/media/recent'
}

/**
 * Class representing Instagram stream.
 */

class InstagramStream extends Readable {
  /**
   * Create an instance.
   *
   * Authorization options are passed to
   * underlying Instagram client.
   *
   * @param {Object} [options] Instance options.
   * @param {string} [options.client_id] Client ID.
   * @param {string} [options.client_secret] Client Secret.
   * @param {string} [options.access_token] Access token.
   */

  constructor (options) {
    super({ objectMode: true })

    options || (options = {})
    this._client = options.client || new Instagram(options)
    this._token = options.access_token

    this._filters = {
      [FILTER_TYPE_TAG]: {},
      [FILTER_TYPE_USER]: {}
    }

    this.timer = null
    this.fetchFilterType = 0
    this.interval = 1000
    this.lastFetch = 0
  }

  /**
   * Instance of {@link Instagram} that serves
   * as underlying source of data.
   */

  get client () {
    return this._client
  }

  /**
   * `true` if there is tracks set up, `false` otherwise.
   */

  get isEmpty () {
    return !this.tracking().length &&
           !this.following().length
  }

  /**
   * Set access token.
   *
   * @param {string} token Access token.
   *
   * @returns {InstagramStream} This instance. 
   */

  setAccessToken (token) {
    this._token = token
    return this
  }

  /**
   * Track tag.
   *
   * @param {string} tag Tag name.
   *
   * @returns {InstagramStream} This instance.
   */

  track (tag) {
    this._addFilter(FILTER_TYPE_TAG, tag)
    return this
  }

  /**
   * Untrack tag.
   *
   * @param {string} tag Tag name.
   *
   * @returns {InstagramStream} This instance.
   */

  untrack (tag) {
    this._removeFilter(FILTER_TYPE_TAG, tag)
    return this
  }

  /**
   * Follow user.
   *
   * @param {string} userId User ID.
   *
   * @returns {InstagramStream} This instance.
   */

  follow (userId) {
    this._addFilter(FILTER_TYPE_USER, userId)
    return this
  }

  /**
   * Unfollow user.
   *
   * @param {string} userId User ID.
   *
   * @returns {InstagramStream} This instance.
   */

  unfollow (userId) {
    this._removeFilter(FILTER_TYPE_USER, userId)
    return this
  }

  /**
   * Get currently tracked tags.
   *
   * @returns {string[]} List of tags.
   */

  tracking () {
    return Object.keys(this._filters[FILTER_TYPE_TAG])
  }

  /**
   * Get currently tracked users.
   *
   * @returns {string[]} List of user IDs.
   */

  following () {
    return Object.keys(this._filters[FILTER_TYPE_USER])
  }

  _read (size) {
    this._scheduleFetch()
  }

  _fetch () {
    if (this.isEmpty) return

    let filterTypes = Object.keys(this._filters)
    this.fetchFilterType = ++this.fetchFilterType % filterTypes.length
    let filterType = filterTypes[this.fetchFilterType]
    let filters = this._filters[filterType]
    let value = Object.keys(filters)
      .sort((a, b) => filters[a].lastUpdate - filters[b].lastUpdate)
      .shift()

    if (!value) return this._fetch()

    let path = FILTER_PATH_MAP[filterType].replace(':key', encodeURIComponent(value))

    this.lastFetch = Date.now()
    this._client.request('GET', path, {
      query: {
        access_token: this._token,
        count: 50
      }
    }).then((res) => {
      let meta = filters[value]
      if (!meta) return

      meta.lastUpdate = Date.now()

      res.body.data
        .filter((post) => !~meta.lastPostIds.indexOf(post.id))
        .forEach((data) => {
          meta.lastPostIds.push(data.id)
          this.push(data)
        })

      meta.lastPostIds = meta.lastPostIds.splice(-50)
    }).then(() => {
      this._scheduleFetch()
    }).catch((err) => {
      this.emit('error', err)
      this._scheduleFetch(err)
    })
  }

  _scheduleFetch (err) {
    if (this.isEmpty || this.timer) return
    let elapsedTime = Date.now() - this.lastFetch
    if (elapsedTime > this.interval) return this._fetch()
    this.timer = setTimeout(() => {
      this._clearTimer()
      this._fetch()
    }, this.interval - elapsedTime)
  }

  _clearTimer () {
    clearTimeout(this.timer)
    this.timer = null
  }

  _addFilter (filter, values) {
    if (!Array.isArray(values)) {
      values = [values]
    }

    values.forEach((value) => {
      if (!this._filters[filter][value]) {
        this._filters[filter][value] = {
          count: 0,
          lastUpdate: 0,
          lastPostIds: []
        }
      }
      this._filters[filter][value].count++
    })

    process.nextTick(this._scheduleFetch.bind(this))
  }

  _removeFilter (filter, values) {
    if (!Array.isArray(values)) {
      values = [values]
    }

    values.forEach((value) => {
      if (!this._filters[filter][value]) return
      if (--this._filters[filter][value].count === 0) {
        delete this._filters[filter][value]
      }
    })

    if (this.isEmpty) {
      this._clearTimer()
    }
  }
}

module.exports = InstagramStream