Canvas: Render logs using XTerm

Use Server-Sent Events (SSE) to stream logs instead of polling.

Change-Id: I3790a22a39b71409636a81dbe2a2cc8bf4977cb4
diff --git a/apps/canvas/back/package-lock.json b/apps/canvas/back/package-lock.json
index 25d7214..55e9382 100644
--- a/apps/canvas/back/package-lock.json
+++ b/apps/canvas/back/package-lock.json
@@ -9,6 +9,7 @@
       "version": "1.0.0",
       "license": "ISC",
       "dependencies": {
+        "@js-joda/core": "^5.6.5",
         "@loancrate/prisma-schema-parser": "^3.0.0",
         "@prisma/client": "^6.6.0",
         "axios": "^1.8.4",
@@ -2320,6 +2321,12 @@
         "@jridgewell/sourcemap-codec": "^1.4.14"
       }
     },
+    "node_modules/@js-joda/core": {
+      "version": "5.6.5",
+      "resolved": "https://registry.npmjs.org/@js-joda/core/-/core-5.6.5.tgz",
+      "integrity": "sha512-3zwefSMwHpu8iVUW8YYz227sIv6UFqO31p1Bf1ZH/Vom7CmNyUsXjDBlnNzcuhmOL1XfxZ3nvND42kR23XlbcQ==",
+      "license": "BSD-3-Clause"
+    },
     "node_modules/@jsonjoy.com/base64": {
       "version": "1.1.2",
       "resolved": "https://registry.npmjs.org/@jsonjoy.com/base64/-/base64-1.1.2.tgz",
diff --git a/apps/canvas/back/package.json b/apps/canvas/back/package.json
index dd63048..927ebe4 100644
--- a/apps/canvas/back/package.json
+++ b/apps/canvas/back/package.json
@@ -16,6 +16,7 @@
   "author": "",
   "license": "ISC",
   "dependencies": {
+    "@js-joda/core": "^5.6.5",
     "@loancrate/prisma-schema-parser": "^3.0.0",
     "@prisma/client": "^6.6.0",
     "axios": "^1.8.4",
diff --git a/apps/canvas/back/src/index.ts b/apps/canvas/back/src/index.ts
index 4190f70..fdba5a0 100644
--- a/apps/canvas/back/src/index.ts
+++ b/apps/canvas/back/src/index.ts
@@ -6,7 +6,7 @@
 import { GithubClient } from "./github.js";
 import { AppManager } from "./app_manager.js";
 import { z } from "zod";
-import { ProjectMonitor, WorkerSchema } from "./project_monitor.js";
+import { ProjectMonitor, WorkerSchema, LogItem } from "./project_monitor.js";
 import tmp from "tmp";
 import { NodeJSAnalyzer } from "./lib/nodejs.js";
 import shell from "shelljs";
@@ -22,6 +22,7 @@
 	Network,
 	GithubRepository,
 } from "config";
+import { Instant, DateTimeFormatter, ZoneId } from "@js-joda/core";
 
 async function generateKey(root: string): Promise<[string, string]> {
 	const privKeyPath = path.join(root, "key");
@@ -721,41 +722,69 @@
 };
 
 const handleServiceLogs: express.Handler = async (req, resp) => {
+	const projectId = Number(req.params["projectId"]);
+	const service = req.params["service"];
+	const workerId = req.params["workerId"];
+
+	resp.setHeader("Content-Type", "text/event-stream");
+	resp.setHeader("Cache-Control", "no-cache");
+	resp.setHeader("Connection", "keep-alive");
+	resp.flushHeaders();
+
+	const timestampFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+	const sendLogs = (logs: LogItem[]) => {
+		if (logs.length == 0) {
+			return;
+		}
+		const logString = logs
+			.map((l) => {
+				const t = Instant.ofEpochMilli(l.timestampMilli);
+				const formattedTimestamp = t.atZone(ZoneId.UTC).format(timestampFormat);
+				return `\x1b[38;5;240m${formattedTimestamp}\x1b[0m ${l.contents}`;
+			})
+			.join("\n");
+		resp.write("event: message\n");
+		resp.write(`data: ${JSON.stringify({ logs: logString })}\n\n`);
+	};
+
 	try {
-		const projectId = Number(req.params["projectId"]);
-		const service = req.params["service"];
-		const workerId = req.params["workerId"];
 		const project = await db.project.findUnique({
-			where: {
-				id: projectId,
-				userId: resp.locals.userId,
-			},
+			where: { id: projectId, userId: resp.locals.userId },
 		});
-		if (project == null) {
-			resp.status(404);
-			resp.write(JSON.stringify({ error: "Project not found" }));
+
+		if (!project) {
+			resp.status(404).end();
 			return;
 		}
+
 		const monitor = projectMonitors.get(projectId);
-		if (!monitor || !monitor.hasLogs()) {
-			resp.status(404);
-			resp.write(JSON.stringify({ error: "No logs found for this project" }));
+		if (!monitor) {
+			resp.status(404).end();
 			return;
 		}
-		const serviceLog = monitor.getWorkerLog(service, workerId);
-		if (!serviceLog) {
-			resp.status(404);
-			resp.write(JSON.stringify({ error: "No logs found for this service/worker" }));
-			return;
-		}
-		resp.status(200);
-		resp.write(JSON.stringify({ logs: serviceLog }));
+
+		let lastLogCount = 0;
+		const initialLogs = monitor.getWorkerLog(service, workerId) || [];
+		sendLogs(initialLogs);
+		lastLogCount = initialLogs.length;
+		resp.flushHeaders();
+
+		const intervalId = setInterval(() => {
+			const currentLogs = monitor.getWorkerLog(service, workerId) || [];
+			if (currentLogs.length > lastLogCount) {
+				const newLogs = currentLogs.slice(lastLogCount);
+				sendLogs(newLogs);
+				lastLogCount = currentLogs.length;
+			}
+		}, 500);
+
+		req.on("close", () => {
+			clearInterval(intervalId);
+			resp.end();
+		});
 	} catch (e) {
 		console.log(e);
-		resp.status(500);
-		resp.write(JSON.stringify({ error: "Failed to get service logs" }));
-	} finally {
-		resp.end();
+		resp.status(500).end();
 	}
 };
 
@@ -783,6 +812,7 @@
 		resp.write(
 			JSON.stringify({
 				success: true,
+				logItemsConsumed: result.data.logs?.length ?? 0,
 			}),
 		);
 	} catch (e) {
diff --git a/apps/canvas/back/src/project_monitor.ts b/apps/canvas/back/src/project_monitor.ts
index b494854..0f4fffe 100644
--- a/apps/canvas/back/src/project_monitor.ts
+++ b/apps/canvas/back/src/project_monitor.ts
@@ -1,5 +1,21 @@
 import { z } from "zod";
 
+// One log entry reported by a worker. `contents` arrives base64-encoded and
+// is decoded to UTF-8 text while parsing.
+const LogItemSchema = z.object({
+	runId: z.string(),
+	timestampMilli: z.number(),
+	commit: z.string().optional(),
+	// Validate with z.string() and decode in a transform: a non-string value
+	// then becomes an ordinary validation issue instead of an exception thrown
+	// from inside the schema.
+	contents: z.string().transform((val) => Buffer.from(val, "base64").toString("utf-8")),
+});
+
+export type LogItem = z.infer<typeof LogItemSchema>;
+
+const LogItemsSchema = z.array(LogItemSchema);
+
 export const WorkerSchema = z.object({
 	id: z.string(),
 	service: z.string(),
@@ -22,22 +38,32 @@
 			),
 		}),
 	),
-	logs: z.optional(z.string()),
+	logs: LogItemsSchema.optional(),
 });
 
 export type Worker = z.infer<typeof WorkerSchema>;
 
 class ServiceMonitor {
 	private workers: Map<string, string> = new Map();
-	private logs: Map<string, string> = new Map();
+	private logs: Map<string, LogItem[]> = new Map();
 	private statuses: Map<string, Worker["status"]> = new Map();
 
 	constructor(public readonly serviceName: string) {}
 
-	registerWorker(workerId: string, workerAddress: string, workerLog?: string, workerStatus?: Worker["status"]): void {
+	registerWorker(
+		workerId: string,
+		workerAddress: string,
+		workerLog?: LogItem[],
+		workerStatus?: Worker["status"],
+	): void {
 		this.workers.set(workerId, workerAddress);
 		if (workerLog) {
-			this.logs.set(workerId, workerLog);
+			const existingLogs = this.logs.get(workerId);
+			if (existingLogs) {
+				existingLogs.push(...workerLog);
+			} else {
+				this.logs.set(workerId, workerLog);
+			}
 		}
 		if (workerStatus) {
 			this.statuses.set(workerId, workerStatus);
@@ -48,7 +74,7 @@
 		return this.workers.get(workerId);
 	}
 
-	getWorkerLog(workerId: string): string | undefined {
+	getWorkerLog(workerId: string): LogItem[] | undefined {
 		return this.logs.get(workerId);
 	}
 
@@ -56,7 +82,7 @@
 		return this.statuses.get(workerId);
 	}
 
-	getAllLogs(): Map<string, string> {
+	getAllLogs(): Map<string, LogItem[]> {
 		return new Map(this.logs);
 	}
 
@@ -118,7 +144,7 @@
 		return Array.from(new Set(allAddresses));
 	}
 
-	getWorkerLog(serviceName: string, workerId: string): string | undefined {
+	getWorkerLog(serviceName: string, workerId: string): LogItem[] | undefined {
 		const serviceMonitor = this.serviceMonitors.get(serviceName);
 		if (serviceMonitor) {
 			return serviceMonitor.getWorkerLog(workerId);
diff --git a/apps/canvas/front/package-lock.json b/apps/canvas/front/package-lock.json
index 1913878..247a513 100644
--- a/apps/canvas/front/package-lock.json
+++ b/apps/canvas/front/package-lock.json
@@ -39,6 +39,8 @@
 				"tailwind-merge": "^2.5.4",
 				"tailwindcss-animate": "^1.0.7",
 				"uuid": "^11.0.2",
+				"xterm": "^5.3.0",
+				"xterm-addon-fit": "^0.8.0",
 				"zod": "^3.23.8",
 				"zustand": "^5.0.1"
 			},
@@ -7089,6 +7091,23 @@
 				"url": "https://github.com/chalk/ansi-styles?sponsor=1"
 			}
 		},
+		"node_modules/xterm": {
+			"version": "5.3.0",
+			"resolved": "https://registry.npmjs.org/xterm/-/xterm-5.3.0.tgz",
+			"integrity": "sha512-8QqjlekLUFTrU6x7xck1MsPzPA571K5zNqWm0M0oroYEWVOptZ0+ubQSkQ3uxIEhcIHRujJy6emDWX4A7qyFzg==",
+			"deprecated": "This package is now deprecated. Move to @xterm/xterm instead.",
+			"license": "MIT"
+		},
+		"node_modules/xterm-addon-fit": {
+			"version": "0.8.0",
+			"resolved": "https://registry.npmjs.org/xterm-addon-fit/-/xterm-addon-fit-0.8.0.tgz",
+			"integrity": "sha512-yj3Np7XlvxxhYF/EJ7p3KHaMt6OdwQ+HDu573Vx1lRXsVxOcnVJs51RgjZOouIZOczTsskaS+CpXspK81/DLqw==",
+			"deprecated": "This package is now deprecated. Move to @xterm/addon-fit instead.",
+			"license": "MIT",
+			"peerDependencies": {
+				"xterm": "^5.0.0"
+			}
+		},
 		"node_modules/yallist": {
 			"version": "3.1.1",
 			"resolved": "https://registry.npmjs.org/yallist/-/yallist-3.1.1.tgz",
diff --git a/apps/canvas/front/package.json b/apps/canvas/front/package.json
index 4f21636..ea53fe0 100644
--- a/apps/canvas/front/package.json
+++ b/apps/canvas/front/package.json
@@ -47,6 +47,8 @@
 		"tailwind-merge": "^2.5.4",
 		"tailwindcss-animate": "^1.0.7",
 		"uuid": "^11.0.2",
+		"xterm": "^5.3.0",
+		"xterm-addon-fit": "^0.8.0",
 		"zod": "^3.23.8",
 		"zustand": "^5.0.1"
 	},
diff --git a/apps/canvas/front/src/Monitoring.tsx b/apps/canvas/front/src/Monitoring.tsx
index 6f7446c..3bc0b63 100644
--- a/apps/canvas/front/src/Monitoring.tsx
+++ b/apps/canvas/front/src/Monitoring.tsx
@@ -1,4 +1,4 @@
-import { useCallback, useEffect, useState, useRef, useMemo } from "react";
+import { useCallback, useEffect, useState, useMemo } from "react";
 import { useProjectId, useEnv } from "@/lib/state";
 import { Accordion, AccordionContent, AccordionItem, AccordionTrigger } from "@/components/ui/accordion";
 import { Button } from "@/components/ui/button";
@@ -7,14 +7,7 @@
 import { Tooltip, TooltipContent, TooltipProvider, TooltipTrigger } from "@/components/ui/tooltip";
 import { useToast } from "@/hooks/use-toast";
 import { Check, Ellipsis, LoaderCircle, LogsIcon, X, RefreshCw } from "lucide-react";
-
-// ANSI escape sequence regex
-// eslint-disable-next-line no-control-regex
-const ANSI_ESCAPE_REGEX = /\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])/g;
-
-function cleanAnsiEscapeSequences(text: string): string {
-	return text.replace(ANSI_ESCAPE_REGEX, "");
-}
+import { XTerm } from "@/components/XTerm";
 
 export function Logs() {
 	const { toast } = useToast();
@@ -25,66 +18,44 @@
 	const [selectedWorkerIdForLogs, setSelectedWorkerIdForLogs] = useState<string | null>(null);
 
 	const [logs, setLogs] = useState<string>("");
-	const preRef = useRef<HTMLPreElement>(null);
-	const wasAtBottom = useRef(true);
-
-	const checkIfAtBottom = useCallback(() => {
-		if (!preRef.current) return;
-		const { scrollTop, scrollHeight, clientHeight } = preRef.current;
-		wasAtBottom.current = Math.abs(scrollHeight - clientHeight - scrollTop) < 10;
-	}, []);
-
-	const scrollToBottom = useCallback(() => {
-		if (!preRef.current) return;
-		preRef.current.scrollTop = preRef.current.scrollHeight;
-	}, []);
-
-	const fetchLogs = useCallback(
-		async (serviceName: string, workerId: string) => {
-			if (!projectId || !serviceName || !workerId) return;
-			try {
-				const resp = await fetch(`/api/project/${projectId}/logs/${serviceName}/${workerId}`);
-				if (!resp.ok) {
-					throw new Error(`Failed to fetch logs: ${resp.statusText}`);
-				}
-				const data = await resp.json();
-				setLogs(data.logs || "");
-			} catch (e) {
-				console.error(e);
-				toast({
-					variant: "destructive",
-					title: "Failed to fetch logs",
-				});
-			}
-		},
-		[projectId, toast],
-	);
 
 	useEffect(() => {
-		if (selectedServiceForLogs && selectedWorkerIdForLogs) {
-			fetchLogs(selectedServiceForLogs, selectedWorkerIdForLogs);
-			const interval = setInterval(() => {
-				fetchLogs(selectedServiceForLogs, selectedWorkerIdForLogs);
-			}, 5000);
-			return () => clearInterval(interval);
-		} else {
+		// Nothing to stream until a project, service and worker are all selected.
+		if (!selectedServiceForLogs || !selectedWorkerIdForLogs || !projectId) {
 			setLogs("");
+			return;
 		}
-	}, [selectedServiceForLogs, selectedWorkerIdForLogs, fetchLogs]);
 
-	useEffect(() => {
-		const pre = preRef.current;
-		if (!pre) return;
-		const handleScroll = () => checkIfAtBottom();
-		pre.addEventListener("scroll", handleScroll);
-		return () => pre.removeEventListener("scroll", handleScroll);
-	}, [checkIfAtBottom]);
+		setLogs(""); // Clear logs for the new selection
 
-	useEffect(() => {
-		if (wasAtBottom.current) {
-			scrollToBottom();
-		}
-	}, [logs, scrollToBottom]);
+		const eventSource = new EventSource(
+			`/api/project/${projectId}/logs/${encodeURIComponent(selectedServiceForLogs)}/${encodeURIComponent(selectedWorkerIdForLogs)}`,
+		);
+
+		eventSource.onmessage = (event) => {
+			try {
+				const data = JSON.parse(event.data);
+				if (data.logs) {
+					setLogs((prevLogs) => prevLogs + data.logs + "\n");
+				}
+			} catch (e) {
+				console.error("Failed to parse log data:", e);
+			}
+		};
+		// Close on error so the failure is reported once instead of on every retry.
+		eventSource.onerror = () => {
+			toast({
+				variant: "destructive",
+				title: "Log stream error",
+				description: "Connection to the log stream has been closed.",
+			});
+			eventSource.close();
+		};
+
+		return () => {
+			eventSource.close();
+		};
+	}, [selectedServiceForLogs, selectedWorkerIdForLogs, projectId, toast]);
 
 	const sortedServices = useMemo(
 		() => (env?.services ? [...env.services].sort((a, b) => a.name.localeCompare(b.name)) : []),
@@ -285,13 +256,9 @@
 							<div className="p-2 border-b text-sm text-muted-foreground">
 								Logs for: {selectedServiceForLogs} / {selectedWorkerIdForLogs}
 							</div>
-							<pre
-								ref={preRef}
-								className="flex-1 h-full p-4 bg-muted overflow-auto font-['JetBrains_Mono'] text-xs whitespace-pre-wrap break-all"
-							>
-								{cleanAnsiEscapeSequences(logs) ||
-									`No logs available for ${selectedServiceForLogs} / ${selectedWorkerIdForLogs}.`}
-							</pre>
+							<div className="flex-1 h-full p-4 bg-muted overflow-auto">
+								<XTerm logs={logs} />
+							</div>
 						</>
 					) : (
 						<div className="flex-1 flex items-center justify-center h-full p-4 bg-muted text-sm text-gray-500">
diff --git a/apps/canvas/front/src/components/XTerm.tsx b/apps/canvas/front/src/components/XTerm.tsx
new file mode 100644
index 0000000..4f542f1
--- /dev/null
+++ b/apps/canvas/front/src/components/XTerm.tsx
@@ -0,0 +1,128 @@
+import { useEffect, useRef } from "react";
+import { Terminal } from "xterm";
+import { FitAddon } from "xterm-addon-fit";
+import "xterm/css/xterm.css";
+
+/** Props for the XTerm component. */
+interface XTermProps {
+	/** Full log text so far; expected to grow by appending, or be "" to clear. */
+	logs: string;
+}
+
+// CSS overrides for xterm's viewport scrollbar, injected via the <style> element below.
+const scrollbarHack = `
+.xterm-viewport {
+	scrollbar-width: auto;
+	scrollbar-color: #a0a0a0 #f1f5f9;
+}
+.xterm-viewport::-webkit-scrollbar {
+	width: 8px;
+}
+.xterm-viewport::-webkit-scrollbar-track {
+	background: #f1f5f9;
+}
+.xterm-viewport::-webkit-scrollbar-thumb {
+	background-color: #a0a0a0;
+	border-radius: 4px;
+	border: 2px solid #f1f5f9;
+}
+`;
+
+/**
+ * Read-only terminal view (stdin disabled) that writes `logs` into an xterm.js
+ * terminal and keeps following new output while the viewport is scrolled to
+ * the bottom.
+ */
+export function XTerm({ logs }: XTermProps) {
+	const termRef = useRef<HTMLDivElement>(null);
+	const termInstance = useRef<Terminal | null>(null);
+	const fitAddon = useRef<FitAddon | null>(null);
+	// Text already written to the terminal, used to write only the new suffix.
+	const prevLogs = useRef<string>("");
+
+	// Create the terminal once per mount and keep it fitted to its container.
+	useEffect(() => {
+		const container = termRef.current;
+		if (!container || termInstance.current) {
+			return;
+		}
+
+		const term = new Terminal({
+			disableStdin: true,
+			convertEol: true,
+			fontFamily: `'JetBrains Mono', monospace`,
+			fontSize: 12,
+			theme: {
+				background: "#f1f5f9", // bg-muted color
+				foreground: "#020817", // text-foreground color
+				cursor: "#f1f5f9",
+				selectionBackground: "#bfdbfe", // blue-200
+				selectionInactiveBackground: "#e2e8f0", // slate-200
+			},
+		});
+		const addon = new FitAddon();
+		fitAddon.current = addon;
+		term.loadAddon(addon);
+		term.open(container);
+		addon.fit();
+		termInstance.current = term;
+
+		const resizeObserver = new ResizeObserver(() => {
+			fitAddon.current?.fit();
+		});
+		resizeObserver.observe(container);
+
+		return () => {
+			resizeObserver.disconnect();
+			term.dispose();
+			// Drop the disposed instance so a re-run of this effect (for example
+			// React Strict Mode's mount/unmount/mount in development) builds a new
+			// terminal, and so the full log text is replayed into it.
+			termInstance.current = null;
+			fitAddon.current = null;
+			prevLogs.current = "";
+		};
+	}, []);
+
+	// Mirror `logs` into the terminal, writing only what was appended.
+	useEffect(() => {
+		const term = termInstance.current;
+		if (!term) {
+			return;
+		}
+
+		if (logs === "") {
+			term.clear();
+			prevLogs.current = "";
+			return;
+		}
+
+		// `logs` normally only grows. If it got shorter it was replaced rather
+		// than appended to, so start over instead of slicing at a stale offset.
+		if (logs.length < prevLogs.current.length) {
+			term.reset();
+			prevLogs.current = "";
+		}
+
+		const buffer = term.buffer.active;
+		const wasAtBottom = buffer.viewportY + term.rows >= buffer.length;
+
+		const newLogContent = logs.substring(prevLogs.current.length);
+		prevLogs.current = logs;
+
+		if (newLogContent) {
+			term.write(newLogContent, () => {
+				if (wasAtBottom) {
+					termInstance.current?.scrollToBottom();
+				}
+			});
+		}
+	}, [logs]);
+
+	return (
+		<>
+			<style>{scrollbarHack}</style>
+			<div ref={termRef} className="h-full w-full" />
+		</>
+	);
+}