// libraries
import _ from 'lodash'
import originalFetch from 'isomorphic-fetch'
import fetch from 'fetch-retry'
import { DataStream } from 'scramjet'
import ndjsonStream from 'can-ndjson-stream'

// constants
import {
  MAX_BATCH_NUM_ROWS,
  MAX_BATCH_INTERVAL_TIME,
  MAX_LIVE_REQUEST_RETRY_ATTEMPTS,
  MAX_NON_LIVE_DATA_REQUEST_RETRY_ATTEMPTS,
  RETRY_DELAY_BETWEEN_ATTEMPTS,
} from 'constants/unipipe'
import { MESSAGE_TYPES } from 'constants/message'

// helpers
import log, { reportMessage, reportException } from 'helpers/log'
import { retryOn, getFetchParams, getForceUseSuSession } from 'services/utils'
import { isLiveDataset, getDatasetName, isHeartBeatData } from 'helpers/unipipe'
import { validateGeojson } from 'helpers/geojson'
import {
  getElapsedSecondsToNow,
  getFormattedUtcNow,
  getFromNow,
} from 'helpers/datetime'

import type {
  UpBaseSpecification,
  UpSpecificationTimeliness,
} from 'types/unipipe'
import type { MapLayerData, GeojsonData } from 'types/map'
import type { DatasetMetadata } from 'types/common'
import to from 'await-to-js'

/**
 * Callback that receives a message text and its message type; within this
 * file it is always invoked with one of the `MESSAGE_TYPES` values.
 */
type OnMessage = (message: string, type: string) => void

/**
 * Callback invoked with a batch of rows together with the service's current
 * `isLoading` and `loadCanceled` flags.
 */
type OnBatch = (batch: {
  geojsonRows: MapLayerData
  isLoading: boolean
  loadCanceled: boolean
}) => void

/**
 * Row source consumed by `DatasetService.dataStreamTransformer`: a
 * zero-argument async generator function that yields one GeoJSON row per
 * iteration. This is the shape returned by `DatasetService.getDataStream`
 * and handed to `DataStream.from`.
 */
type GetDataStream = () => AsyncGenerator<GeojsonData, void, unknown>

// `fetch` here is the default export of `fetch-retry`; calling it with
// `isomorphic-fetch` produces the fetch function used by
// `DatasetService.getStreamResponse`, which passes `retries`, `retryOn` and
// `retryDelay` along with the regular request parameters.
const fetchRetry = fetch(originalFetch)

/**
 * Debounced (200 ms) message dispatcher.
 *
 * Forwards the message to `onMessage`, then either reports it through
 * `reportMessage` (when an error is supplied) or logs it as a warning.
 */
const dispatchMessages = _.debounce(
  (payload: {
    message: string
    error?: Partial<Error>
    type: string
    onMessage: OnMessage
  }) => {
    const { message, error, type, onMessage } = payload

    onMessage(message, type)

    if (!error) {
      log.warn(message)
      return
    }

    reportMessage(`${message}.${error.message}`, type, error)
  },
  200
)

/**
 * Splits a batch of rows into valid and invalid GeoJSON rows.
 *
 * Heartbeat rows and empty/falsy rows are dropped and appear in neither list.
 * A remaining row is valid when its `properties` carry a non-nil value under
 * `identityProperty`, a truthy `time`, and `validateGeojson` returns an empty
 * result for it; otherwise it is collected as invalid.
 *
 * @param geojsonRows - rows to classify
 * @param identityProperty - name of the property identifying a row
 * @returns the valid and the invalid rows, each in input order
 */
export const getValidGeojsonRows = (
  geojsonRows: MapLayerData,
  identityProperty = 'name'
): { validGeojsons: MapLayerData; invalidGeojsons: MapLayerData } => {
  const validGeojsons: MapLayerData = []
  const invalidGeojsons: MapLayerData = []

  const nonHeartbeatRows = _.reject(geojsonRows, isHeartBeatData)
  for (const row of nonHeartbeatRows) {
    const isBlank = !row || _.isEmpty(row)
    if (!isBlank) {
      const { [identityProperty]: identity, time } = row.properties || {}
      // Same short-circuit order as before: the geojson validation only runs
      // when both the identity value and the time are present.
      const isValid =
        !_.isNil(identity) && Boolean(time) && _.isEmpty(validateGeojson(row))

      if (isValid) {
        validGeojsons.push(row)
      } else {
        invalidGeojsons.push(row)
      }
    }
  }

  return { validGeojsons, invalidGeojsons }
}

/**
 * Retrieves a dataset by POSTing its specification to `${upBaseUrl}/retrieve`,
 * reads the response body as newline-delimited JSON rows and hands the rows
 * to the caller in batches.
 *
 * For live datasets (`isLiveDataset(timeliness)`), heartbeat rows are tracked
 * and the connection is re-established when heartbeats stop arriving.
 */
class DatasetService {
  readonly upBaseUrl: string

  readonly specification: UpBaseSpecification

  readonly displayName: string

  readonly metadata: DatasetMetadata

  readonly timeliness: UpSpecificationTimeliness

  readonly identityProperty: string

  readonly credentials: string

  readonly forceUseSuSession: boolean

  /** Abort controller of the most recent request started by `fetchUnipipeData`. */
  controller?: AbortController

  /**
   * Reader of the response body currently being consumed. Kept on the
   * instance so a heartbeat timeout can cancel the stream from
   * `onBatchLiveData`.
   */
  jsonReader?: ReadableStreamDefaultReader<GeojsonData>

  /** Last error passed to `handleDataStreamTransformerError`; used to avoid reporting the same error twice. */
  error = ''

  /** Number of valid rows received so far. */
  rowsLoaded = 0

  loadCanceled = false

  isLoading = false

  /** Set when the heartbeat gap was exceeded; makes `handleStreamResponse` reconnect once the stream ends. */
  heartbeatTimeout = false

  /** Heartbeat batches seen since the last batch that contained valid data. */
  heartbeatCount = 0

  /** Largest tolerated gap between heartbeats, in seconds; only set for live datasets. */
  heartbeatRateInSeconds = 0

  lastGetValidDataTime?: string

  lastGetHeartbeatDataTime?: string

  constructor({
    upBaseUrl,
    specification,
    metadata,
    credentials,
  }: {
    upBaseUrl: string
    specification: UpBaseSpecification
    metadata: DatasetMetadata
    credentials: string
  }) {
    this.upBaseUrl = upBaseUrl
    this.specification = specification
    this.displayName = getDatasetName(metadata)
    this.metadata = metadata
    const { timeliness, identityProperty } = metadata || {}

    this.timeliness = timeliness
    this.identityProperty = identityProperty
    this.credentials = credentials
    this.forceUseSuSession = getForceUseSuSession(metadata)
    if (isLiveDataset(timeliness)) {
      // NOTE(review): presumably a 30s heartbeat interval plus 30s of
      // tolerance — confirm against the server's heartbeat rate.
      this.heartbeatRateInSeconds = 30 + 30
    }
  }

  /**
   * Issues the `retrieve` request for this dataset.
   *
   * The retry count depends on whether the dataset is live.
   *
   * @param signal - abort signal attached to the request
   * @returns the (possibly retried) fetch response
   */
  getStreamResponse = (signal: AbortSignal): Promise<Response> => {
    const retries = isLiveDataset(this.timeliness)
      ? MAX_LIVE_REQUEST_RETRY_ATTEMPTS
      : MAX_NON_LIVE_DATA_REQUEST_RETRY_ATTEMPTS

    const retryOptions = {
      retries,
      retryOn: retryOn(retries),
      retryDelay: RETRY_DELAY_BETWEEN_ATTEMPTS,
    }

    const fetchParams = getFetchParams({
      ...retryOptions,
      method: 'post',
      body: JSON.stringify(this.specification),
      signal,
      credentials: this.credentials,
      forceUseSuSession: this.forceUseSuSession,
    })

    return fetchRetry(`${this.upBaseUrl}/retrieve`, fetchParams)
  }

  /**
   * Starts loading the dataset: requests the stream and routes the response
   * to the error handler or to the streaming pipeline.
   *
   * @param onBatch - receives each batch of valid rows
   * @param onMessage - receives user-facing messages
   */
  fetchUnipipeData = (onBatch: OnBatch, onMessage: OnMessage): void => {
    this.isLoading = true

    const controller = new AbortController()
    this.controller = controller

    this.getStreamResponse(controller.signal)
      .then(response => {
        if (!response.ok) {
          // Returned so that a failure while reporting the error reaches the
          // catch below instead of becoming an unhandled rejection.
          return this.handleStreamResponseError(response, onMessage)
        }

        this.handleStreamResponse({ response, controller, onBatch, onMessage })
        return undefined
      })
      .catch(err => {
        this.isLoading = false
        dispatchMessages({
          message: 'Error occurred while fetching data',
          type: MESSAGE_TYPES.error,
          onMessage,
          error: err,
        })
      })
  }

  /**
   * Reports a non-OK response to the user.
   *
   * The detail is the `error` field of the JSON body when one can be read;
   * for 404 responses, non-JSON bodies or bodies without an `error` field it
   * falls back to the response's status text.
   */
  handleStreamResponseError = async (
    response: Response,
    onMessage: OnMessage
  ): Promise<void> => {
    this.isLoading = false

    let message = response.statusText
    if (response.status !== 404) {
      // The error body is not guaranteed to be JSON, so a parse failure must
      // not prevent the message from being dispatched.
      const [parseError, responseJson] = await to(response.json())
      if (!parseError && responseJson?.error) {
        message = responseJson.error
      }
    }

    dispatchMessages({
      message: `Error fetching ${this.displayName}`,
      type: MESSAGE_TYPES.error,
      onMessage,
      error: { message },
    })
  }

  /**
   * Builds the async generator function that reads rows from the response
   * body, one row per iteration.
   *
   * The generator finishes when the body is exhausted, when the reader is
   * cancelled, or when `loadCanceled` is set; on finishing it cancels the
   * reader and aborts the request. Read failures are reported through
   * `reportException` and end the generator.
   */
  getDataStream = ({
    response,
    controller,
    onMessage,
  }: {
    response: Response
    onMessage: OnMessage
    controller: AbortController
  }): (() => AsyncGenerator<GeojsonData, void, unknown>) => {
    // Generator functions cannot be arrow functions, so keep a handle on the
    // instance for use inside `readRow`.
    const self = this
    const jsonReader = ndjsonStream(response.body).getReader()
    // Expose the reader so `onBatchLiveData` can cancel it on heartbeat timeout.
    this.jsonReader = jsonReader

    return async function* readRow() {
      try {
        while (true) {
          const [readError, result] = await to(jsonReader.read())
          if (readError || !result) {
            // Surface the real read failure to the catch below.
            throw readError ?? new Error('Stream reader returned no result')
          }

          if (result.done || self.loadCanceled) {
            if (self.loadCanceled && !isLiveDataset(self.timeliness)) {
              self.isLoading = false

              dispatchMessages({
                message: `Request canceled by the user. Received ${self.rowsLoaded} rows of ${self.displayName}`,
                type: MESSAGE_TYPES.info,
                onMessage,
              })
            }
            jsonReader.cancel()
            controller.abort()
            return
          }
          yield result.value
        }
      } catch (e) {
        reportException(
          `DatasetService read data error:${(e as Error).message}`,
          {
            displayName: self.displayName,
          }
        )
      }
    }
  }

  /**
   * Runs the batching pipeline over the response body.
   *
   * When the pipeline ends after a heartbeat timeout the request is started
   * again; otherwise loading is marked as finished and a final empty batch is
   * emitted. Pipeline failures go to `handleDataStreamTransformerError`.
   */
  handleStreamResponse = ({
    response,
    controller,
    onBatch,
    onMessage,
  }: {
    response: Response
    controller: AbortController
    onBatch: OnBatch
    onMessage: OnMessage
  }): void => {
    const { hints } = this.metadata || {}
    const {
      timeBatchMaxItems = MAX_BATCH_NUM_ROWS,
      timeBatchMaxTime = MAX_BATCH_INTERVAL_TIME,
    } = hints || {}

    this.dataStreamTransformer({
      getDataStream: this.getDataStream({ response, controller, onMessage }),
      timeBatchMaxItems,
      timeBatchMaxTime,
      onBatch,
      onMessage,
    })
      .then(() => {
        if (this.heartbeatTimeout) {
          this.fetchUnipipeData(onBatch, onMessage)
          this.heartbeatTimeout = false
        } else {
          this.isLoading = false
          onBatch({
            geojsonRows: [],
            isLoading: this.isLoading,
            loadCanceled: this.loadCanceled,
          })
        }
      })
      .catch(err => {
        // Report a given error only once.
        if (err !== this.error) {
          this.error = err
          this.handleDataStreamTransformerError({ err, onMessage, controller })
        }
      })
  }

  /**
   * Stops loading after a pipeline failure: flags the load as cancelled,
   * notifies the user and aborts the request.
   */
  handleDataStreamTransformerError = ({
    err,
    onMessage,
    controller,
  }: {
    err: Error
    onMessage: OnMessage
    controller: AbortController
  }): void => {
    this.isLoading = false
    this.loadCanceled = true
    dispatchMessages({
      message: `[${this.displayName}]Error occurred while saving data.`,
      type: MESSAGE_TYPES.error,
      onMessage,
      error: err,
    })
    controller.abort()
  }

  /**
   * Records a received heartbeat row: bumps the heartbeat counter, logs the
   * heartbeat and remembers its `time` as the last heartbeat time.
   */
  onFoundHeartbeatData = (heartBeatData: GeojsonData): void => {
    const {
      properties: { time },
    } = heartBeatData
    this.heartbeatCount += 1

    log.info(
      `${this.metadata.label}/${this.metadata.dataset}: heartbeat ${time}`
    )

    log.debug(
      `[${this.heartbeatCount}] ${this.metadata.label}/${
        this.metadata.dataset
      }: ${
        this.lastGetHeartbeatDataTime
          ? `Last heartbeat time: ${this.lastGetHeartbeatDataTime}; `
          : ''
      } ${
        this.lastGetValidDataTime
          ? `Last valid data time: ${this.lastGetValidDataTime}; `
          : ''
      }Current heartbeat time: ${time}; Now: ${getFormattedUtcNow()}`
    )
    this.lastGetHeartbeatDataTime = time
  }

  /**
   * Logs a warning when a live batch carried no valid rows and the last valid
   * data is older than five minutes. Nothing is logged before any valid data
   * has been received, since there is no reference time to compare against.
   */
  onFoundEmptyValidData = (): void => {
    // Five minutes, in seconds to match `getElapsedSecondsToNow`.
    const allowedWaitingSeconds = 5 * 60

    if (this.lastGetValidDataTime === undefined) {
      return
    }

    const validDataElapsedSecondsToNow = getElapsedSecondsToNow(
      this.lastGetValidDataTime
    )

    if (validDataElapsedSecondsToNow > allowedWaitingSeconds) {
      log.warn(
        `${this.metadata.label}/${
          this.metadata.dataset
        }: No live data update since ${this.lastGetValidDataTime}(${getFromNow(
          this.lastGetValidDataTime
        )}). ${getFormattedUtcNow()}`
      )
    }
  }

  /**
   * Handles one batch of a live dataset: checks the heartbeat gap, records
   * the newest heartbeat in the batch and forwards the non-heartbeat rows.
   */
  onBatchLiveData = ({
    geojsonRows,
    onBatch,
  }: {
    geojsonRows: MapLayerData
    onBatch: OnBatch
  }): void => {
    const lastHeartbeatTime = this.lastGetHeartbeatDataTime
    // Only judge the gap once a heartbeat has actually been seen.
    if (
      lastHeartbeatTime !== undefined &&
      getElapsedSecondsToNow(lastHeartbeatTime) > this.heartbeatRateInSeconds
    ) {
      log.warn('No heartbeat data, will re-initialize a connection')
      this.heartbeatTimeout = true
      // Start monitoring afresh on the new connection; otherwise the stale
      // timestamp would immediately trigger another reconnect.
      this.lastGetHeartbeatDataTime = undefined
      // Cancelling ends the reader loop, which lets `handleStreamResponse`
      // start a new request. Cancelling an already finished stream is
      // harmless, so a rejection is ignored.
      this.jsonReader?.cancel().catch(() => undefined)
    }

    const lastKnownHeartBeatData = _(geojsonRows)
      .filter(isHeartBeatData)
      .orderBy(['properties.time'], ['desc'])
      .first()

    if (!_.isEmpty(lastKnownHeartBeatData)) {
      this.onFoundHeartbeatData(lastKnownHeartBeatData)
    }

    const validData = _.isEmpty(lastKnownHeartBeatData)
      ? geojsonRows
      : _.reject(geojsonRows, isHeartBeatData)

    log.info(
      `${this.metadata.label}/${this.metadata.dataset}: Received ${
        validData?.length
      } rows of live data. ${getFormattedUtcNow()}`
    )

    if (_.isEmpty(validData)) {
      this.onFoundEmptyValidData()
    } else {
      this.lastGetValidDataTime = getFormattedUtcNow()
      this.heartbeatCount = 0

      onBatch({
        isLoading: this.isLoading,
        loadCanceled: this.loadCanceled,
        geojsonRows: validData,
      })
      // will be used for data loading toast messages
      // live dataset service will always be active unless is being canceled
      this.isLoading = false
    }
  }

  /**
   * Pulls rows from `getDataStream`, groups them into batches by time and
   * count, validates each batch and forwards the valid rows.
   *
   * Invalid rows are reported through `reportException`. Batches arriving
   * after the load was cancelled are ignored.
   *
   * @returns a promise that settles when the stream has been fully consumed
   */
  dataStreamTransformer = async ({
    getDataStream,
    timeBatchMaxItems, // maximum items in batch
    timeBatchMaxTime, // maximum time in ms
    onBatch,
    // number of operations allowed to run in parallel; a value bigger than
    // one allows processing items without keeping order
    maxParallel = 1,
  }: {
    getDataStream: GetDataStream
    timeBatchMaxItems: number
    timeBatchMaxTime: number
    onBatch: OnBatch
    onMessage: OnMessage
    maxParallel?: number
  }): Promise<void> => {
    return DataStream.from(getDataStream)
      .setOptions({ maxParallel })
      .timeBatch(timeBatchMaxTime, timeBatchMaxItems)
      .each(async batch => {
        if (this.loadCanceled) {
          return
        }

        const { validGeojsons: geojsonRows, invalidGeojsons } =
          getValidGeojsonRows(batch, this.identityProperty)

        if (!_.isEmpty(invalidGeojsons)) {
          reportException(
            `Found ${invalidGeojsons.length} rows of invalid data from ${this.metadata.label} / ${this.metadata.dataset} (missing identity property - ${this.identityProperty} value or time or geojson is not valid)`,
            {
              invalidGeojsons,
              dataset: this.metadata.dataset,
            }
          )
        }

        this.rowsLoaded += geojsonRows.length
        if (isLiveDataset(this.timeliness)) {
          this.onBatchLiveData({ geojsonRows, onBatch })
        } else {
          onBatch({
            geojsonRows,
            isLoading: this.isLoading,
            loadCanceled: this.loadCanceled,
          })
        }
      })
      .run()
  }

  /**
   * Flags the load as cancelled and resets the heartbeat bookkeeping. The
   * reader loop in `getDataStream` notices the flag on its next read and then
   * closes the stream and aborts the request.
   */
  cancel = (): void => {
    this.loadCanceled = true
    this.isLoading = false
    this.heartbeatCount = 0
    this.lastGetHeartbeatDataTime = undefined
    this.lastGetValidDataTime = undefined
  }
}

export default DatasetService
