Canvas: Render logs using XTerm
Use Server-Sent Events (SSE) to stream logs to the client instead of returning them in a single JSON response.
Change-Id: I3790a22a39b71409636a81dbe2a2cc8bf4977cb4
diff --git a/apps/canvas/back/src/index.ts b/apps/canvas/back/src/index.ts
index 4190f70..fdba5a0 100644
--- a/apps/canvas/back/src/index.ts
+++ b/apps/canvas/back/src/index.ts
@@ -6,7 +6,7 @@
import { GithubClient } from "./github.js";
import { AppManager } from "./app_manager.js";
import { z } from "zod";
-import { ProjectMonitor, WorkerSchema } from "./project_monitor.js";
+import { ProjectMonitor, WorkerSchema, LogItem } from "./project_monitor.js";
import tmp from "tmp";
import { NodeJSAnalyzer } from "./lib/nodejs.js";
import shell from "shelljs";
@@ -22,6 +22,7 @@
Network,
GithubRepository,
} from "config";
+import { Instant, DateTimeFormatter, ZoneId } from "@js-joda/core";
async function generateKey(root: string): Promise<[string, string]> {
const privKeyPath = path.join(root, "key");
@@ -721,41 +722,69 @@
};
const handleServiceLogs: express.Handler = async (req, resp) => {
+ const projectId = Number(req.params["projectId"]);
+ const service = req.params["service"];
+ const workerId = req.params["workerId"];
+
+ resp.setHeader("Content-Type", "text/event-stream");
+ resp.setHeader("Cache-Control", "no-cache");
+ resp.setHeader("Connection", "keep-alive");
+ resp.flushHeaders();
+
+ const timestampFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+ const sendLogs = (logs: LogItem[]) => {
+ if (logs.length == 0) {
+ return;
+ }
+ const logString = logs
+ .map((l) => {
+ const t = Instant.ofEpochMilli(l.timestampMilli);
+ const formattedTimestamp = t.atZone(ZoneId.UTC).format(timestampFormat);
+ return `\x1b[38;5;240m${formattedTimestamp}\x1b[0m ${l.contents}`;
+ })
+ .join("\n");
+ resp.write("event: message\n");
+ resp.write(`data: ${JSON.stringify({ logs: logString })}\n\n`);
+ };
+
try {
- const projectId = Number(req.params["projectId"]);
- const service = req.params["service"];
- const workerId = req.params["workerId"];
const project = await db.project.findUnique({
- where: {
- id: projectId,
- userId: resp.locals.userId,
- },
+ where: { id: projectId, userId: resp.locals.userId },
});
- if (project == null) {
- resp.status(404);
- resp.write(JSON.stringify({ error: "Project not found" }));
+
+ if (!project) {
+ resp.status(404).end();
return;
}
+
const monitor = projectMonitors.get(projectId);
- if (!monitor || !monitor.hasLogs()) {
- resp.status(404);
- resp.write(JSON.stringify({ error: "No logs found for this project" }));
+ if (!monitor) {
+ resp.status(404).end();
return;
}
- const serviceLog = monitor.getWorkerLog(service, workerId);
- if (!serviceLog) {
- resp.status(404);
- resp.write(JSON.stringify({ error: "No logs found for this service/worker" }));
- return;
- }
- resp.status(200);
- resp.write(JSON.stringify({ logs: serviceLog }));
+
+ let lastLogCount = 0;
+ const initialLogs = monitor.getWorkerLog(service, workerId) || [];
+ sendLogs(initialLogs);
+ lastLogCount = initialLogs.length;
+ resp.flushHeaders();
+
+ const intervalId = setInterval(() => {
+ const currentLogs = monitor.getWorkerLog(service, workerId) || [];
+ if (currentLogs.length > lastLogCount) {
+ const newLogs = currentLogs.slice(lastLogCount);
+ sendLogs(newLogs);
+ lastLogCount = currentLogs.length;
+ }
+ }, 500);
+
+ req.on("close", () => {
+ clearInterval(intervalId);
+ resp.end();
+ });
} catch (e) {
console.log(e);
- resp.status(500);
- resp.write(JSON.stringify({ error: "Failed to get service logs" }));
- } finally {
- resp.end();
+ resp.status(500).end();
}
};
@@ -783,6 +812,7 @@
resp.write(
JSON.stringify({
success: true,
+ logItemsConsumed: result.data.logs?.length ?? 0,
}),
);
} catch (e) {
diff --git a/apps/canvas/back/src/project_monitor.ts b/apps/canvas/back/src/project_monitor.ts
index b494854..0f4fffe 100644
--- a/apps/canvas/back/src/project_monitor.ts
+++ b/apps/canvas/back/src/project_monitor.ts
@@ -1,5 +1,21 @@
import { z } from "zod";
+const LogItemSchema = z.object({
+ runId: z.string(),
+ timestampMilli: z.number(),
+ commit: z.string().optional(),
+ contents: z.preprocess((val) => {
+ if (typeof val === "string") {
+ return Buffer.from(val, "base64").toString("utf-8");
+ }
+ throw new Error("Log item contents is not a string");
+ }, z.string()),
+});
+
+export type LogItem = z.infer<typeof LogItemSchema>;
+
+const LogItemsSchema = z.array(LogItemSchema);
+
export const WorkerSchema = z.object({
id: z.string(),
service: z.string(),
@@ -22,22 +38,32 @@
),
}),
),
- logs: z.optional(z.string()),
+ logs: LogItemsSchema.optional(),
});
export type Worker = z.infer<typeof WorkerSchema>;
class ServiceMonitor {
private workers: Map<string, string> = new Map();
- private logs: Map<string, string> = new Map();
+ private logs: Map<string, LogItem[]> = new Map();
private statuses: Map<string, Worker["status"]> = new Map();
constructor(public readonly serviceName: string) {}
- registerWorker(workerId: string, workerAddress: string, workerLog?: string, workerStatus?: Worker["status"]): void {
+ registerWorker(
+ workerId: string,
+ workerAddress: string,
+ workerLog?: LogItem[],
+ workerStatus?: Worker["status"],
+ ): void {
this.workers.set(workerId, workerAddress);
if (workerLog) {
- this.logs.set(workerId, workerLog);
+ const existingLogs = this.logs.get(workerId);
+ if (existingLogs) {
+ existingLogs.push(...workerLog);
+ } else {
+ this.logs.set(workerId, workerLog);
+ }
}
if (workerStatus) {
this.statuses.set(workerId, workerStatus);
@@ -48,7 +74,7 @@
return this.workers.get(workerId);
}
- getWorkerLog(workerId: string): string | undefined {
+ getWorkerLog(workerId: string): LogItem[] | undefined {
return this.logs.get(workerId);
}
@@ -56,7 +82,7 @@
return this.statuses.get(workerId);
}
- getAllLogs(): Map<string, string> {
+ getAllLogs(): Map<string, LogItem[]> {
return new Map(this.logs);
}
@@ -118,7 +144,7 @@
return Array.from(new Set(allAddresses));
}
- getWorkerLog(serviceName: string, workerId: string): string | undefined {
+ getWorkerLog(serviceName: string, workerId: string): LogItem[] | undefined {
const serviceMonitor = this.serviceMonitors.get(serviceName);
if (serviceMonitor) {
return serviceMonitor.getWorkerLog(workerId);