import { PortfolioExportJobSchema } from "@/tools/aggregate/portfolio-export/types";

/**
 * Transform stream that converts a byte stream of newline-delimited JSON
 * records into a stream of the records' property values.
 *
 * Input chunks are binary (`BufferSource`) and are decoded with a
 * `TextDecoder`. Each line may optionally start with an ASCII record
 * separator (`\x1E`), which is stripped before parsing. For every line that
 * parses, each value of `Object.values(record)` is enqueued as its own output
 * chunk. Lines that fail to parse are logged with `console.error` and
 * skipped; blank lines and `null` records produce no output.
 */
export class PortfolioExportProgressTransformStream extends TransformStream {
  public schemas: PortfolioExportJobSchema[] = [];

  constructor() {
    console.log("TransformStream");
    const recordSeparator = "\x1E";
    const decoder = new TextDecoder();

    // Text received after the most recent newline; it is an incomplete line
    // until the next newline (or the end of the stream) arrives.
    let lastTail = "";

    /**
     * Parses one line and returns the values of the parsed record.
     * Returns an empty list for blank lines, unparsable lines and `null`.
     */
    const convertLine = (line: string): unknown[] => {
      const text = line.startsWith(recordSeparator) ? line.slice(1) : line;

      // Blank lines carry no record; skip them instead of logging a parse error.
      if (text.trim() === "") {
        return [];
      }

      let parsed: unknown;
      try {
        parsed = JSON.parse(text);
      } catch (err) {
        console.error(err);
        return [];
      }

      // Object.values(null) throws, which would error the whole stream.
      if (parsed === null) {
        return [];
      }

      return Object.values(parsed as Record<string, unknown>);
    };

    super({
      transform: async (_chunk, controller) => {
        let chunk: BufferSource;
        try {
          chunk = await _chunk;
        } catch {
          controller.terminate();
          return;
        }

        // `stream: true` keeps an incomplete multi-byte sequence buffered in
        // the decoder so a character split across two chunks is not replaced
        // with U+FFFD.
        const lines = (lastTail + decoder.decode(chunk, { stream: true })).split("\n");

        // The final segment has no terminating newline yet; hold it back
        // until more data (or the flush) completes it.
        lastTail = lines.pop() ?? "";

        for (const line of lines) {
          for (const row of convertLine(line)) {
            controller.enqueue(row);
          }
        }
      },
      flush: (controller) => {
        // Drain any bytes still buffered inside the streaming decoder.
        lastTail += decoder.decode();

        // Send the very final line.
        if (lastTail) {
          for (const row of convertLine(lastTail)) {
            controller.enqueue(row);
          }
        }
        // No terminate() here: the readable side is closed automatically once
        // flush completes, and terminate() would also error the writable side.
      },
    });
  }

  /** Logs the current contents of `schemas` to the console. */
  public progress = async () => {
    console.log("new Schema: ", this.schemas);
  };
}
