Canvas: Render logs using XTerm

Use Server-Sent Events (SSE) to stream logs.

Change-Id: I3790a22a39b71409636a81dbe2a2cc8bf4977cb4
diff --git a/apps/canvas/back/src/index.ts b/apps/canvas/back/src/index.ts
index 4190f70..fdba5a0 100644
--- a/apps/canvas/back/src/index.ts
+++ b/apps/canvas/back/src/index.ts
@@ -6,7 +6,7 @@
 import { GithubClient } from "./github.js";
 import { AppManager } from "./app_manager.js";
 import { z } from "zod";
-import { ProjectMonitor, WorkerSchema } from "./project_monitor.js";
+import { ProjectMonitor, WorkerSchema, LogItem } from "./project_monitor.js";
 import tmp from "tmp";
 import { NodeJSAnalyzer } from "./lib/nodejs.js";
 import shell from "shelljs";
@@ -22,6 +22,7 @@
 	Network,
 	GithubRepository,
 } from "config";
+import { Instant, DateTimeFormatter, ZoneId } from "@js-joda/core";
 
 async function generateKey(root: string): Promise<[string, string]> {
 	const privKeyPath = path.join(root, "key");
@@ -721,41 +722,69 @@
 };
 
 const handleServiceLogs: express.Handler = async (req, resp) => {
+	// Streams one worker's logs to the client as Server-Sent Events.
+	const projectId = Number(req.params["projectId"]);
+	const service = req.params["service"];
+	const workerId = req.params["workerId"];
+
+	const timestampFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+	// Writes a batch as a single "message" event; empty batches are skipped.
+	const sendLogs = (logs: LogItem[]) => {
+		if (logs.length === 0) {
+			return;
+		}
+		const logString = logs
+			.map((l) => {
+				const t = Instant.ofEpochMilli(l.timestampMilli);
+				const formattedTimestamp = t.atZone(ZoneId.UTC).format(timestampFormat);
+				return `\x1b[38;5;240m${formattedTimestamp}\x1b[0m ${l.contents}`;
+			})
+			.join("\n");
+		resp.write("event: message\n");
+		resp.write(`data: ${JSON.stringify({ logs: logString })}\n\n`);
+	};
+
 	try {
-		const projectId = Number(req.params["projectId"]);
-		const service = req.params["service"];
-		const workerId = req.params["workerId"];
 		const project = await db.project.findUnique({
-			where: {
-				id: projectId,
-				userId: resp.locals.userId,
-			},
+			where: { id: projectId, userId: resp.locals.userId },
 		});
-		if (project == null) {
-			resp.status(404);
-			resp.write(JSON.stringify({ error: "Project not found" }));
+		if (!project) {
+			resp.status(404).end();
 			return;
 		}
+
 		const monitor = projectMonitors.get(projectId);
-		if (!monitor || !monitor.hasLogs()) {
-			resp.status(404);
-			resp.write(JSON.stringify({ error: "No logs found for this project" }));
+		if (!monitor) {
+			resp.status(404).end();
 			return;
 		}
-		const serviceLog = monitor.getWorkerLog(service, workerId);
-		if (!serviceLog) {
-			resp.status(404);
-			resp.write(JSON.stringify({ error: "No logs found for this service/worker" }));
-			return;
-		}
-		resp.status(200);
-		resp.write(JSON.stringify({ logs: serviceLog }));
+
+		// Start the SSE stream only after both lookups succeed; once flushHeaders()
+		// runs the status is committed, so the 404s above would arrive as 200.
+		resp.setHeader("Content-Type", "text/event-stream");
+		resp.setHeader("Cache-Control", "no-cache");
+		resp.setHeader("Connection", "keep-alive");
+		resp.flushHeaders();
+
+		const initialLogs = monitor.getWorkerLog(service, workerId) || [];
+		sendLogs(initialLogs);
+		let lastLogCount = initialLogs.length;
+
+		const intervalId = setInterval(() => {
+			const currentLogs = monitor.getWorkerLog(service, workerId) || [];
+			if (currentLogs.length > lastLogCount) {
+				sendLogs(currentLogs.slice(lastLogCount));
+				lastLogCount = currentLogs.length;
+			}
+		}, 500);
+
+		req.on("close", () => {
+			clearInterval(intervalId);
+			resp.end();
+		});
 	} catch (e) {
 		console.log(e);
-		resp.status(500);
-		resp.write(JSON.stringify({ error: "Failed to get service logs" }));
-	} finally {
-		resp.end();
+		resp.status(500).end();
 	}
 };
 
@@ -783,6 +812,7 @@
 		resp.write(
 			JSON.stringify({
 				success: true,
+				logItemsConsumed: result.data.logs?.length ?? 0,
 			}),
 		);
 	} catch (e) {