Canvas: Persistent log storage

Change-Id: I3eac705329e6d68d8e4b9a371c6e9b9807f357ec
diff --git a/apps/canvas/back/src/index.ts b/apps/canvas/back/src/index.ts
index fdba5a0..f14cfd1 100644
--- a/apps/canvas/back/src/index.ts
+++ b/apps/canvas/back/src/index.ts
@@ -23,6 +23,7 @@
 	GithubRepository,
 } from "config";
 import { Instant, DateTimeFormatter, ZoneId } from "@js-joda/core";
+import LogStore from "./log.js";
 
 async function generateKey(root: string): Promise<[string, string]> {
 	const privKeyPath = path.join(root, "key");
@@ -36,6 +37,7 @@
 }
 
 const db = new PrismaClient();
+const logStore = new LogStore(db);
 const appManager = new AppManager();
 
 const projectMonitors = new Map<number, ProjectMonitor>();
@@ -763,18 +765,19 @@
 			return;
 		}
 
-		let lastLogCount = 0;
-		const initialLogs = monitor.getWorkerLog(service, workerId) || [];
+		let lastLogId: number | undefined = undefined;
+		const initialLogs = (await logStore.get(projectId, service, workerId)) || [];
 		sendLogs(initialLogs);
-		lastLogCount = initialLogs.length;
+		if (initialLogs.length > 0) {
+			lastLogId = initialLogs[initialLogs.length - 1].id;
+		}
 		resp.flushHeaders();
 
-		const intervalId = setInterval(() => {
-			const currentLogs = monitor.getWorkerLog(service, workerId) || [];
-			if (currentLogs.length > lastLogCount) {
-				const newLogs = currentLogs.slice(lastLogCount);
-				sendLogs(newLogs);
-				lastLogCount = currentLogs.length;
+		const intervalId = setInterval(async () => {
+			const currentLogs = (await logStore.get(projectId, service, workerId, lastLogId)) || [];
+			if (currentLogs.length > 0) {
+				sendLogs(currentLogs);
+				lastLogId = currentLogs[currentLogs.length - 1].id;
 			}
 		}, 500);
 
@@ -808,6 +811,9 @@
 			projectMonitors.set(projectId, monitor);
 		}
 		monitor.registerWorker(result.data);
+		if (result.data.logs) {
+			await logStore.store(projectId, result.data.service, result.data.id, result.data.logs);
+		}
 		resp.status(200);
 		resp.write(
 			JSON.stringify({
diff --git a/apps/canvas/back/src/log.ts b/apps/canvas/back/src/log.ts
new file mode 100644
index 0000000..68dc005
--- /dev/null
+++ b/apps/canvas/back/src/log.ts
@@ -0,0 +1,51 @@
+import { Prisma, PrismaClient } from "@prisma/client";
+import { LogItem } from "./project_monitor";
+
+type LogRecord = LogItem & {
+	id: number;
+};
+
+class LogStore {
+	constructor(private prisma: PrismaClient) {}
+
+	async store(projectId: number, serviceName: string, workerId: string, logs: LogItem[]) {
+		await this.prisma.log.createMany({
+			data: logs.map((log) => ({
+				projectId,
+				serviceName,
+				workerId,
+				runId: log.runId,
+				commit: log.commit ?? undefined,
+				contents: log.contents,
+				timestampMilli: log.timestampMilli,
+			})),
+		});
+	}
+
+	async get(
+		projectId: number,
+		serviceName: string,
+		workerId: string,
+		afterId?: number,
+		numRecords?: number,
+	): Promise<LogRecord[]> {
+		const where: Prisma.LogWhereInput = { projectId, serviceName, workerId };
+		if (afterId) {
+			where.id = { gt: afterId };
+		}
+		const logs = await this.prisma.log.findMany({
+			where,
+			orderBy: { timestampMilli: "asc" },
+			take: numRecords ?? 100,
+		});
+		return logs.map((log) => ({
+			id: log.id,
+			timestampMilli: Number(log.timestampMilli),
+			contents: log.contents,
+			runId: log.runId,
+			commit: log.commit ?? undefined,
+		}));
+	}
+}
+
+export default LogStore;
diff --git a/apps/canvas/back/src/project_monitor.ts b/apps/canvas/back/src/project_monitor.ts
index 0f4fffe..4b234a5 100644
--- a/apps/canvas/back/src/project_monitor.ts
+++ b/apps/canvas/back/src/project_monitor.ts
@@ -50,21 +50,8 @@
 
 	constructor(public readonly serviceName: string) {}
 
-	registerWorker(
-		workerId: string,
-		workerAddress: string,
-		workerLog?: LogItem[],
-		workerStatus?: Worker["status"],
-	): void {
+	registerWorker(workerId: string, workerAddress: string, workerStatus?: Worker["status"]): void {
 		this.workers.set(workerId, workerAddress);
-		if (workerLog) {
-			const existingLogs = this.logs.get(workerId);
-			if (existingLogs) {
-				existingLogs.push(...workerLog);
-			} else {
-				this.logs.set(workerId, workerLog);
-			}
-		}
 		if (workerStatus) {
 			this.statuses.set(workerId, workerStatus);
 		}
@@ -133,7 +120,7 @@
 			serviceMonitor = new ServiceMonitor(workerData.service);
 			this.serviceMonitors.set(workerData.service, serviceMonitor);
 		}
-		serviceMonitor.registerWorker(workerData.id, workerData.address, workerData.logs, workerData.status);
+		serviceMonitor.registerWorker(workerData.id, workerData.address, workerData.status);
 	}
 
 	getWorkerAddresses(): string[] {