// @flow

import { from } from 'rxjs';
import {
  concatMap, mergeMap,
} from 'rxjs/operators';
import toposort from 'toposort';
import Papa from 'papaparse';

import type {
  ActionsObservable,
  StateObservable,
} from 'redux-observable';

import ce from './ce';

import {
  CREATE_NODE_AND_LOAD_DATA, LOAD_DATA, PARAM_CHANGE, RESONATE, RECOMPUTE,
  createNodeAndLoadData, loadData, paramChange, resonate, recompute,
} from '../actions/computes';
import {
  setNodeData, setNodeParams, setNodeOutputs, setNodeInputs, addNode,
} from '../actions/nodes';

import type { STATE_TYPE } from '../data';
import type {
  CONNECTORS_STATE_TYPE,
} from '../data/connectors';
import type {
  CREATE_NODE_AND_LOAD_DATA_TYPE,
  LOAD_DATA_TYPE,
  PARAM_CHANGE_TYPE,
  RESONATE_TYPE,
  RECOMPUTE_TYPE,
} from '../actions/computes';

function subgraph ( connectors: CONNECTORS_STATE_TYPE, fromNode: string, acc: Array<string> ) {
  acc.push( fromNode );
  connectors.forEach( ( connector ) => {
    connector.fromNode === fromNode && subgraph( connectors, connector.toNode, acc );
  } );
}

/* eslint max-len: ["error", { "code": 150 }] */

const computes = ce( {

  [ CREATE_NODE_AND_LOAD_DATA ]: ( action$: ActionsObservable<createNodeAndLoadData>, state$: StateObservable<STATE_TYPE> ) => action$.pipe(
    mergeMap( ( action: { payload: CREATE_NODE_AND_LOAD_DATA_TYPE, nid: string } ) => {
      const { nid } = action;
      const {
        nodeType, x, y, file,
      } = action.payload;

      // console.log( 'CREATE_NODE_AND_LOAD_DATA', { nid, nodeType, x, y, file } );

      const createPromise = new Promise( ( resolve ) => {
        import( `../components/nodes/${ nodeType }.js` ).then( ( module ) => {
          // get input specs
          const { inputs } = module;

          resolve( { inputs } );
        } );
      } );

      const parsePromise = new Promise( ( resolve ) => {
        Papa.parse( file, {
          header: false,
          trimHeaders: true,
          dynamicTyping: true,
          complete: ( res ) => {
            const { data } = res;
            const headerRow = data[ 0 ];
            const firstRow = data[ 1 ];
            const meta = headerRow.map( ( d, i ) => ( {
              name: d,
              type: typeof firstRow[ i ],
            } ) );
            resolve( { data: data.slice( 1 ), meta } );
          },
        } );
      } );

      return from( Promise.all( [ createPromise, parsePromise ] ) ).pipe(
        mergeMap( results => [
          addNode( {
            nid, nodeType, x, y, inputs: [], outputs: [],
          } ),
          setNodeInputs( { nid, inputs: results[ 0 ].inputs } ),
          setNodeOutputs( { nid, outputs: results[ 1 ].meta } ),
          setNodeData( { nid, data: results[ 1 ].data, meta: results[ 1 ].meta } ),
        ] )
      );
    } )
  ),

  [ LOAD_DATA ]: ( action$: ActionsObservable<loadData>, state$: StateObservable<STATE_TYPE> ) => action$.pipe(
    mergeMap( ( action: { payload: LOAD_DATA_TYPE } ) => {
      const { nid, file } = action.payload;

      // console.log( 'LOAD_DATA', { nid, file } )

      const parsePromise = new Promise( ( resolve ) => {
        Papa.parse( file, {
          header: false,
          trimHeaders: true,
          dynamicTyping: true,
          complete: ( res ) => {
            const { data } = res;
            const headerRow = data[ 0 ];
            const firstRow = data[ 1 ];
            const meta = headerRow.map( ( d, i ) => ( {
              name: d,
              type: typeof firstRow[ i ],
            } ) );
            resolve( { data: data.slice( 1 ), meta } );
          },
        } );
      } );

      return from( parsePromise ).pipe(
        mergeMap( ( res ) => {
          const { data, meta } = res;
          return [
            setNodeOutputs( { nid, outputs: meta } ),
            setNodeData( { nid, data, meta } ),
            resonate( { nid } ),
          ];
        } )
      );
    } )
  ),

  [ PARAM_CHANGE ]: ( action$: ActionsObservable<paramChange>, state$: StateObservable<STATE_TYPE> ) => action$.pipe(
    mergeMap( ( action: { payload: PARAM_CHANGE_TYPE } ) => {
      const { nid, params } = action.payload;

      return [
        setNodeParams( { nid, params } ),
        resonate( { nid } ),
      ];
    } )
  ),

  [ RESONATE ]: ( action$: ActionsObservable<resonate>, state$: StateObservable<STATE_TYPE> ) => action$.pipe(
    concatMap( ( action: { payload: RESONATE_TYPE } ) => {
      const { nid } = action.payload;
      const { nodes, connectors } = state$.value;

      // select downstream indices from toposort
      const toposortedIndices = toposort(
        connectors.map( connector => ( [ connector.fromNode, connector.toNode ] ) )
      );
      const downstreamIndices = toposortedIndices.slice(
        toposortedIndices.findIndex( el => el === nid )
      );

      // select subgraph indices from connectors graph
      const subgraphIndices = [];
      subgraph( connectors, nid, subgraphIndices );

      // select downstream nodes that are only available in subgraph
      const downstreamNodes = downstreamIndices.reduce( ( acc, el ) => {
        if ( subgraphIndices.indexOf( el ) >= 0 ) {
          acc.push( nodes.find( node => node.nid === el ) );
        }
        return acc;
      }, [] );

      // recompute merged downstream nodes
      const recomputations = downstreamNodes.reduce( ( acc, node ) => {
        acc.push( recompute( { nid: node.nid } ) );
        return acc;
      }, [] );

      return recomputations;
    } )
  ),

  [ RECOMPUTE ]: ( action$: ActionsObservable<recompute>, state$: StateObservable<STATE_TYPE> ) => action$.pipe(
    concatMap( ( action: { payload: RECOMPUTE_TYPE } ) => {
      const { nid } = action.payload;
      const { nodes, connectors } = state$.value;

      const currentNode = nodes.find( node => node.nid === nid );

      // get inputs from upstream nodes into current node
      const inputs = connectors
        .filter( connector => connector.toNode === nid )
        .reduce( ( acc, connector ) => {
          const fromNode = nodes.find( node => node.nid === connector.fromNode );
          acc[ connector.toPin ] = {
            fromPin: connector.fromPin,
            data: fromNode.data,
            meta: fromNode.meta,
          };
          return acc;
        }, {} );

      // wrap transform async into promise
      const transformPromise = new Promise( ( resolve ) => {
        // dynamically import node implementation
        import( `../components/nodes/${ currentNode.nodeType }.js` ).then( ( module ) => {
          // get transform function
          const { transform } = module;

          // transform data
          const { data, meta } = transform( inputs, currentNode );

          // set node data
          resolve( { data, meta } );
        } );
      } );

      // fire off setNodeData and setNodeOutputs
      return from( transformPromise ).pipe(
        mergeMap( ( result ) => {
          const { data, meta } = result;
          return [
            setNodeData( { nid, data, meta } ),
            setNodeOutputs( { nid, outputs: meta } ),
          ];
        } )
      );
    } )
  ),

} );

export default computes;
