Canvas: Persistent log storage
Change-Id: I3eac705329e6d68d8e4b9a371c6e9b9807f357ec
diff --git a/apps/canvas/back/prisma/migrations/20250702123930_logs/migration.sql b/apps/canvas/back/prisma/migrations/20250702123930_logs/migration.sql
new file mode 100644
index 0000000..cbfe7f9
--- /dev/null
+++ b/apps/canvas/back/prisma/migrations/20250702123930_logs/migration.sql
@@ -0,0 +1,12 @@
+-- CreateTable
+CREATE TABLE "Log" (
+ "id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+ "projectId" INTEGER NOT NULL,
+ "timestampMilli" INTEGER NOT NULL,
+ "contents" TEXT NOT NULL,
+ "commit" TEXT,
+ "serviceName" TEXT NOT NULL,
+ "workerId" TEXT NOT NULL,
+ "runId" TEXT NOT NULL,
+ CONSTRAINT "Log_projectId_fkey" FOREIGN KEY ("projectId") REFERENCES "Project" ("id") ON DELETE RESTRICT ON UPDATE CASCADE
+);
diff --git a/apps/canvas/back/prisma/migrations/20250702125305_/migration.sql b/apps/canvas/back/prisma/migrations/20250702125305_/migration.sql
new file mode 100644
index 0000000..f3dfec3
--- /dev/null
+++ b/apps/canvas/back/prisma/migrations/20250702125305_/migration.sql
@@ -0,0 +1,25 @@
+/*
+ Warnings:
+
+ - You are about to alter the column `timestampMilli` on the `Log` table. The data in that column could be lost. The data in that column will be cast from `Int` to `BigInt`.
+
+*/
+-- RedefineTables
+PRAGMA defer_foreign_keys=ON;
+PRAGMA foreign_keys=OFF;
+CREATE TABLE "new_Log" (
+ "id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+ "projectId" INTEGER NOT NULL,
+ "timestampMilli" BIGINT NOT NULL,
+ "contents" TEXT NOT NULL,
+ "commit" TEXT,
+ "serviceName" TEXT NOT NULL,
+ "workerId" TEXT NOT NULL,
+ "runId" TEXT NOT NULL,
+ CONSTRAINT "Log_projectId_fkey" FOREIGN KEY ("projectId") REFERENCES "Project" ("id") ON DELETE RESTRICT ON UPDATE CASCADE
+);
+INSERT INTO "new_Log" ("commit", "contents", "id", "projectId", "runId", "serviceName", "timestampMilli", "workerId") SELECT "commit", "contents", "id", "projectId", "runId", "serviceName", "timestampMilli", "workerId" FROM "Log";
+DROP TABLE "Log";
+ALTER TABLE "new_Log" RENAME TO "Log";
+PRAGMA foreign_keys=ON;
+PRAGMA defer_foreign_keys=OFF;
diff --git a/apps/canvas/back/prisma/schema.prisma b/apps/canvas/back/prisma/schema.prisma
index 07482f3..adc03cf 100644
--- a/apps/canvas/back/prisma/schema.prisma
+++ b/apps/canvas/back/prisma/schema.prisma
@@ -25,4 +25,17 @@
githubToken String?
access String?
geminiApiKey String?
+ logs Log[]
+}
+
+model Log {
+ id Int @id @default(autoincrement())
+ projectId Int
+ project Project @relation(fields: [projectId], references: [id])
+ timestampMilli BigInt
+ contents String
+ commit String?
+ serviceName String
+ workerId String
+ runId String
}
\ No newline at end of file
diff --git a/apps/canvas/back/src/index.ts b/apps/canvas/back/src/index.ts
index fdba5a0..f14cfd1 100644
--- a/apps/canvas/back/src/index.ts
+++ b/apps/canvas/back/src/index.ts
@@ -23,6 +23,7 @@
GithubRepository,
} from "config";
import { Instant, DateTimeFormatter, ZoneId } from "@js-joda/core";
+import LogStore from "./log.js";
async function generateKey(root: string): Promise<[string, string]> {
const privKeyPath = path.join(root, "key");
@@ -36,6 +37,7 @@
}
const db = new PrismaClient();
+const logStore = new LogStore(db);
const appManager = new AppManager();
const projectMonitors = new Map<number, ProjectMonitor>();
@@ -763,18 +765,19 @@
return;
}
- let lastLogCount = 0;
- const initialLogs = monitor.getWorkerLog(service, workerId) || [];
+ let lastLogId: number | undefined = undefined;
+ const initialLogs = (await logStore.get(projectId, service, workerId)) || [];
sendLogs(initialLogs);
- lastLogCount = initialLogs.length;
+ if (initialLogs.length > 0) {
+ lastLogId = initialLogs[initialLogs.length - 1].id;
+ }
resp.flushHeaders();
- const intervalId = setInterval(() => {
- const currentLogs = monitor.getWorkerLog(service, workerId) || [];
- if (currentLogs.length > lastLogCount) {
- const newLogs = currentLogs.slice(lastLogCount);
- sendLogs(newLogs);
- lastLogCount = currentLogs.length;
+ const intervalId = setInterval(async () => {
+ const currentLogs = (await logStore.get(projectId, service, workerId, lastLogId)) || [];
+ if (currentLogs.length > 0) {
+ sendLogs(currentLogs);
+ lastLogId = currentLogs[currentLogs.length - 1].id;
}
}, 500);
@@ -808,6 +811,9 @@
projectMonitors.set(projectId, monitor);
}
monitor.registerWorker(result.data);
+ if (result.data.logs) {
+ await logStore.store(projectId, result.data.service, result.data.id, result.data.logs);
+ }
resp.status(200);
resp.write(
JSON.stringify({
diff --git a/apps/canvas/back/src/log.ts b/apps/canvas/back/src/log.ts
new file mode 100644
index 0000000..68dc005
--- /dev/null
+++ b/apps/canvas/back/src/log.ts
@@ -0,0 +1,51 @@
+import { Prisma, PrismaClient } from "@prisma/client";
+import { LogItem } from "./project_monitor";
+
+type LogRecord = LogItem & {
+ id: number;
+};
+
+class LogStore {
+ constructor(private prisma: PrismaClient) {}
+
+ async store(projectId: number, serviceName: string, workerId: string, logs: LogItem[]) {
+ await this.prisma.log.createMany({
+ data: logs.map((log) => ({
+ projectId,
+ serviceName,
+ workerId,
+ runId: log.runId,
+ commit: log.commit ?? undefined,
+ contents: log.contents,
+ timestampMilli: log.timestampMilli,
+ })),
+ });
+ }
+
+ async get(
+ projectId: number,
+ serviceName: string,
+ workerId: string,
+ afterId?: number,
+ numRecords?: number,
+ ): Promise<LogRecord[]> {
+ const where: Prisma.LogWhereInput = { projectId, serviceName, workerId };
+ if (afterId) {
+ where.id = { gt: afterId };
+ }
+ const logs = await this.prisma.log.findMany({
+ where,
+ orderBy: { timestampMilli: "asc" },
+ take: numRecords ?? 100,
+ });
+ return logs.map((log) => ({
+ id: log.id,
+ timestampMilli: Number(log.timestampMilli),
+ contents: log.contents,
+ runId: log.runId,
+ commit: log.commit ?? undefined,
+ }));
+ }
+}
+
+export default LogStore;
diff --git a/apps/canvas/back/src/project_monitor.ts b/apps/canvas/back/src/project_monitor.ts
index 0f4fffe..4b234a5 100644
--- a/apps/canvas/back/src/project_monitor.ts
+++ b/apps/canvas/back/src/project_monitor.ts
@@ -50,21 +50,8 @@
constructor(public readonly serviceName: string) {}
- registerWorker(
- workerId: string,
- workerAddress: string,
- workerLog?: LogItem[],
- workerStatus?: Worker["status"],
- ): void {
+ registerWorker(workerId: string, workerAddress: string, workerStatus?: Worker["status"]): void {
this.workers.set(workerId, workerAddress);
- if (workerLog) {
- const existingLogs = this.logs.get(workerId);
- if (existingLogs) {
- existingLogs.push(...workerLog);
- } else {
- this.logs.set(workerId, workerLog);
- }
- }
if (workerStatus) {
this.statuses.set(workerId, workerStatus);
}
@@ -133,7 +120,7 @@
serviceMonitor = new ServiceMonitor(workerData.service);
this.serviceMonitors.set(workerData.service, serviceMonitor);
}
- serviceMonitor.registerWorker(workerData.id, workerData.address, workerData.logs, workerData.status);
+ serviceMonitor.registerWorker(workerData.id, workerData.address, workerData.status);
}
getWorkerAddresses(): string[] {