Canvas: Render logs using XTerm

Use Server-Sent Events (SSE) to stream logs to the client instead of
returning them in a single JSON response.

Change-Id: I3790a22a39b71409636a81dbe2a2cc8bf4977cb4
diff --git a/apps/canvas/back/src/index.ts b/apps/canvas/back/src/index.ts
index 4190f70..fdba5a0 100644
--- a/apps/canvas/back/src/index.ts
+++ b/apps/canvas/back/src/index.ts
@@ -6,7 +6,7 @@
 import { GithubClient } from "./github.js";
 import { AppManager } from "./app_manager.js";
 import { z } from "zod";
-import { ProjectMonitor, WorkerSchema } from "./project_monitor.js";
+import { ProjectMonitor, WorkerSchema, LogItem } from "./project_monitor.js";
 import tmp from "tmp";
 import { NodeJSAnalyzer } from "./lib/nodejs.js";
 import shell from "shelljs";
@@ -22,6 +22,7 @@
 	Network,
 	GithubRepository,
 } from "config";
+import { Instant, DateTimeFormatter, ZoneId } from "@js-joda/core";
 
 async function generateKey(root: string): Promise<[string, string]> {
 	const privKeyPath = path.join(root, "key");
@@ -721,41 +722,95 @@
 };
 
+/**
+ * Streams the logs of a single worker to the client as Server-Sent Events.
+ *
+ * The project and its monitor are looked up first, so a missing project or
+ * monitor is still answered with a plain 404. Only then is the response turned
+ * into an event stream: the log items already collected are sent at once and
+ * the monitor is polled every 500ms for items appended since the last send.
+ * Each event carries a JSON payload of the form { logs: string }, where the
+ * string holds newline-separated, timestamp-prefixed log lines.
+ */
 const handleServiceLogs: express.Handler = async (req, resp) => {
+	const projectId = Number(req.params["projectId"]);
+	const service = req.params["service"];
+	const workerId = req.params["workerId"];
+
+	// Track disconnects from the very start: the client may go away while the
+	// lookups below are still pending, and the poller must never outlive it.
+	let intervalId: ReturnType<typeof setInterval> | undefined;
+	let closed = false;
+	resp.on("close", () => {
+		closed = true;
+		clearInterval(intervalId);
+	});
+
+	const timestampFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+	// Writes one SSE "message" event holding all given items; no-op for an empty batch.
+	const sendLogs = (logs: LogItem[]) => {
+		if (logs.length === 0) {
+			return;
+		}
+		const logString = logs
+			.map((l) => {
+				const t = Instant.ofEpochMilli(l.timestampMilli);
+				const formattedTimestamp = t.atZone(ZoneId.UTC).format(timestampFormat);
+				// Grey (ANSI 256-colour 240) timestamp, then reset before the contents.
+				return `\x1b[38;5;240m${formattedTimestamp}\x1b[0m ${l.contents}`;
+			})
+			.join("\n");
+		resp.write("event: message\n");
+		// JSON.stringify escapes the embedded newlines, keeping the payload on one data line.
+		resp.write(`data: ${JSON.stringify({ logs: logString })}\n\n`);
+	};
+
 	try {
-		const projectId = Number(req.params["projectId"]);
-		const service = req.params["service"];
-		const workerId = req.params["workerId"];
 		const project = await db.project.findUnique({
-			where: {
-				id: projectId,
-				userId: resp.locals.userId,
-			},
+			where: { id: projectId, userId: resp.locals.userId },
 		});
-		if (project == null) {
-			resp.status(404);
-			resp.write(JSON.stringify({ error: "Project not found" }));
+
+		if (!project) {
+			resp.status(404).end();
 			return;
 		}
+
 		const monitor = projectMonitors.get(projectId);
-		if (!monitor || !monitor.hasLogs()) {
-			resp.status(404);
-			resp.write(JSON.stringify({ error: "No logs found for this project" }));
+		if (!monitor) {
+			resp.status(404).end();
 			return;
 		}
-		const serviceLog = monitor.getWorkerLog(service, workerId);
-		if (!serviceLog) {
-			resp.status(404);
-			resp.write(JSON.stringify({ error: "No logs found for this service/worker" }));
-			return;
-		}
-		resp.status(200);
-		resp.write(JSON.stringify({ logs: serviceLog }));
+
+		// The client disconnected while the lookups were in flight; nothing to stream.
+		if (closed) {
+			return;
+		}
+
+		// Send the SSE headers only now: once they are flushed the status code can
+		// no longer change, so the 404 answers above have to come first.
+		resp.setHeader("Content-Type", "text/event-stream");
+		resp.setHeader("Cache-Control", "no-cache");
+		resp.setHeader("Connection", "keep-alive");
+		resp.flushHeaders();
+
+		const initialLogs = monitor.getWorkerLog(service, workerId) ?? [];
+		sendLogs(initialLogs);
+		let lastLogCount = initialLogs.length;
+
+		// Poll for items appended since the last send and forward only those.
+		intervalId = setInterval(() => {
+			const currentLogs = monitor.getWorkerLog(service, workerId) ?? [];
+			if (currentLogs.length > lastLogCount) {
+				sendLogs(currentLogs.slice(lastLogCount));
+				lastLogCount = currentLogs.length;
+			}
+		}, 500);
 	} catch (e) {
 		console.log(e);
-		resp.status(500);
-		resp.write(JSON.stringify({ error: "Failed to get service logs" }));
-	} finally {
-		resp.end();
+		// A status code can only be set while the headers are still unsent.
+		if (!resp.headersSent) {
+			resp.status(500);
+		}
+		resp.end();
 	}
 };
 
@@ -783,6 +812,7 @@
 		resp.write(
 			JSON.stringify({
 				success: true,
+				logItemsConsumed: result.data.logs?.length ?? 0,
 			}),
 		);
 	} catch (e) {
diff --git a/apps/canvas/back/src/project_monitor.ts b/apps/canvas/back/src/project_monitor.ts
index b494854..0f4fffe 100644
--- a/apps/canvas/back/src/project_monitor.ts
+++ b/apps/canvas/back/src/project_monitor.ts
@@ -1,5 +1,23 @@
 import { z } from "zod";
 
+/**
+ * A single log item, as carried in a worker's `logs` field.
+ *
+ * `contents` is base64-decoded to UTF-8 text while parsing.
+ */
+const LogItemSchema = z.object({
+	runId: z.string(),
+	timestampMilli: z.number(),
+	commit: z.string().optional(),
+	// Validate first, decode second: a non-string value becomes a regular
+	// validation issue instead of an exception thrown from inside the parser.
+	contents: z.string().transform((val) => Buffer.from(val, "base64").toString("utf-8")),
+});
+
+export type LogItem = z.infer<typeof LogItemSchema>;
+
+const LogItemsSchema = z.array(LogItemSchema);
+
 export const WorkerSchema = z.object({
 	id: z.string(),
 	service: z.string(),
@@ -22,22 +38,32 @@
 			),
 		}),
 	),
-	logs: z.optional(z.string()),
+	logs: LogItemsSchema.optional(),
 });
 
 export type Worker = z.infer<typeof WorkerSchema>;
 
 class ServiceMonitor {
 	private workers: Map<string, string> = new Map();
-	private logs: Map<string, string> = new Map();
+	private logs: Map<string, LogItem[]> = new Map();
 	private statuses: Map<string, Worker["status"]> = new Map();
 
 	constructor(public readonly serviceName: string) {}
 
-	registerWorker(workerId: string, workerAddress: string, workerLog?: string, workerStatus?: Worker["status"]): void {
+	registerWorker(
+		workerId: string,
+		workerAddress: string,
+		workerLog?: LogItem[],
+		workerStatus?: Worker["status"],
+	): void {
 		this.workers.set(workerId, workerAddress);
 		if (workerLog) {
-			this.logs.set(workerId, workerLog);
+			const existingLogs = this.logs.get(workerId);
+			if (existingLogs) {
+				existingLogs.push(...workerLog);
+			} else {
+				this.logs.set(workerId, workerLog);
+			}
 		}
 		if (workerStatus) {
 			this.statuses.set(workerId, workerStatus);
@@ -48,7 +74,7 @@
 		return this.workers.get(workerId);
 	}
 
-	getWorkerLog(workerId: string): string | undefined {
+	getWorkerLog(workerId: string): LogItem[] | undefined {
 		return this.logs.get(workerId);
 	}
 
@@ -56,7 +82,7 @@
 		return this.statuses.get(workerId);
 	}
 
-	getAllLogs(): Map<string, string> {
+	getAllLogs(): Map<string, LogItem[]> {
 		return new Map(this.logs);
 	}
 
@@ -118,7 +144,7 @@
 		return Array.from(new Set(allAddresses));
 	}
 
-	getWorkerLog(serviceName: string, workerId: string): string | undefined {
+	getWorkerLog(serviceName: string, workerId: string): LogItem[] | undefined {
 		const serviceMonitor = this.serviceMonitors.get(serviceName);
 		if (serviceMonitor) {
 			return serviceMonitor.getWorkerLog(workerId);