Canvas: Implement streaming state updates

Change-Id: I2bc5a51b5792839bde93f927f5ffea22b3250fe2
diff --git a/apps/canvas/back/src/index.ts b/apps/canvas/back/src/index.ts
index cc8d9ef..38122f8 100644
--- a/apps/canvas/back/src/index.ts
+++ b/apps/canvas/back/src/index.ts
@@ -16,11 +16,11 @@
 	Env,
 	generateDodoConfig,
 	ConfigSchema,
-	AppNode,
 	ConfigWithInput,
 	configToGraph,
 	Network,
 	GithubRepository,
+	Graph,
 } from "config";
 import { Instant, DateTimeFormatter, ZoneId } from "@js-joda/core";
 import LogStore from "./log.js";
@@ -114,63 +114,62 @@
 	}
 };
 
+/**
+ * Loads the persisted graph for a project owned by `userId`.
+ *
+ * - "deploy": the stored `state`, or null when nothing is stored there.
+ * - "draft": the stored `draft`, falling back to `state`, falling back to an
+ *   empty graph with a default viewport.
+ *
+ * @param projectId - project id used in the lookup.
+ * @param userId - owner id; the lookup matches on both id and owner.
+ * @param state - which saved variant to read.
+ * @returns the parsed graph, or null when no matching project row exists (or, for
+ *   "deploy", when the project has no stored state). The stored JSON is parsed
+ *   but not validated against the Graph type.
+ */
+async function getState(projectId: number, userId: string, state: "deploy" | "draft"): Promise<Graph | null> {
+	const project = await db.project.findUnique({
+		where: { id: projectId, userId },
+		select: { state: true, draft: true },
+	});
+	if (project == null) {
+		return null;
+	}
+	// A stored draft takes precedence for draft reads.
+	if (state === "draft" && project.draft != null) {
+		return JSON.parse(Buffer.from(project.draft).toString("utf8"));
+	}
+	// Otherwise (deploy read, or draft read without a draft) use the deployed state.
+	if (project.state != null) {
+		return JSON.parse(Buffer.from(project.state).toString("utf8"));
+	}
+	if (state === "deploy") {
+		return null;
+	}
+	// Draft read with nothing stored yet: start from an empty canvas.
+	return { nodes: [], edges: [], viewport: { x: 0, y: 0, zoom: 1 } };
+}
+
 function handleSavedGet(state: "deploy" | "draft"): express.Handler {
 	return async (req, resp) => {
 		try {
-			const r = await db.project.findUnique({
-				where: {
-					id: Number(req.params["projectId"]),
-					userId: resp.locals.userId,
-				},
-				select: {
-					state: true,
-					draft: true,
-				},
-			});
-			if (r == null) {
+			const projectId = Number(req.params["projectId"]);
+			const graph = await getState(projectId, resp.locals.userId, state);
+			if (graph == null) {
 				resp.status(404);
 				return;
 			}
+			const env = await getEnv(projectId, resp.locals.userId, resp.locals.username);
+			const config = generateDodoConfig(projectId.toString(), graph.nodes, env);
 			resp.status(200);
 			resp.header("content-type", "application/json");
-			let currentState: Record<string, unknown> | null = null;
-			if (state === "deploy") {
-				if (r.state == null) {
-					currentState = {
-						nodes: [],
-						edges: [],
-						viewport: { x: 0, y: 0, zoom: 1 },
-					};
-				} else {
-					currentState = JSON.parse(Buffer.from(r.state).toString("utf8"));
-				}
-			} else {
-				if (r.draft == null) {
-					if (r.state == null) {
-						currentState = {
-							nodes: [],
-							edges: [],
-							viewport: { x: 0, y: 0, zoom: 1 },
-						};
-					} else {
-						currentState = JSON.parse(Buffer.from(r.state).toString("utf8"));
-					}
-				} else {
-					currentState = JSON.parse(Buffer.from(r.draft).toString("utf8"));
-				}
-			}
-			const env = await getEnv(Number(req.params["projectId"]), resp.locals.userId, resp.locals.username);
-			if (currentState) {
-				const config = generateDodoConfig(
-					req.params["projectId"].toString(),
-					currentState.nodes as AppNode[],
-					env,
-				);
-				resp.send({
-					state: currentState,
+			resp.write(
+				JSON.stringify({
+					state: graph,
 					config,
-				});
-			}
+				}),
+			);
 		} catch (e) {
 			console.log(e);
 			resp.status(500);
@@ -432,6 +431,60 @@
 	}
 };
 
+/**
+ * Replaces a project's draft with the graph generated from `req.body.config`.
+ *
+ * Replies 400 for a non-integer project id or a config that fails ConfigSchema
+ * (a missing body counts as invalid), 404 for an unknown project, 200 on success.
+ */
+const handleSaveFromConfig: express.Handler = async (req, resp) => {
+	try {
+		const projectId = Number(req.params["projectId"]);
+		if (!Number.isInteger(projectId)) {
+			resp.status(400);
+			return;
+		}
+		const p = await db.project.findUnique({
+			where: {
+				id: projectId,
+				// NOTE(review): without this filter neither the lookup nor the update below is scoped to the caller.
+				// userId: resp.locals.userId, TODO(gio): validate
+			},
+			// Only the columns read below.
+			select: { githubToken: true, state: true },
+		});
+		if (p === null) {
+			resp.status(404);
+			return;
+		}
+		const config = ConfigSchema.safeParse(req.body?.config);
+		if (!config.success) {
+			resp.status(400);
+			resp.write(JSON.stringify({ error: "Invalid configuration", issues: config.error.format() }));
+			return;
+		}
+		const repos: GithubRepository[] = p.githubToken ? await new GithubClient(p.githubToken).getRepositories() : [];
+		const state = JSON.stringify(
+			configToGraph(
+				config.data,
+				getNetworks(resp.locals.username),
+				repos,
+				p.state ? JSON.parse(Buffer.from(p.state).toString("utf8")) : null,
+			),
+		);
+		await db.project.update({
+			where: { id: projectId },
+			data: { draft: state },
+		});
+		resp.status(200);
+	} catch (e) {
+		console.log(e);
+		resp.status(500);
+	} finally {
+		resp.end();
+	}
+};
+
 const handleStatus: express.Handler = async (req, resp) => {
 	try {
 		const projectId = Number(req.params["projectId"]);
@@ -791,7 +844,7 @@
 
 		let lastLogId: number | undefined = undefined;
 		const initialLogs = (await logStore.get(projectId, service, workerId)) || [];
-		sendLogs(initialLogs);
+		await sendLogs(initialLogs);
 		if (initialLogs.length > 0) {
 			lastLogId = initialLogs[initialLogs.length - 1].id;
 		}
@@ -800,7 +853,7 @@
 		const intervalId = setInterval(async () => {
 			const currentLogs = (await logStore.get(projectId, service, workerId, lastLogId)) || [];
 			if (currentLogs.length > 0) {
-				sendLogs(currentLogs);
+				await sendLogs(currentLogs);
 				lastLogId = currentLogs[currentLogs.length - 1].id;
 			}
 		}, 500);
@@ -1099,6 +1152,61 @@
 	}
 };
 
+/**
+ * Streams a project's saved graph as server-sent events: polls getState every 500ms
+ * and emits a `message` event whenever the serialized graph differs from the last one sent.
+ */
+function handleStateGetStream(state: "deploy" | "draft"): express.Handler {
+	return async (req, resp) => {
+		const projectId = Number(req.params["projectId"]);
+		let timer: NodeJS.Timeout | null = null;
+		let closed = false;
+		let lastSent: string | null = null;
+		// Registered before any await so a disconnect during the first lookup is not missed.
+		resp.on("close", () => {
+			closed = true;
+			if (timer) {
+				clearTimeout(timer);
+			}
+		});
+		const sendState = async () => {
+			// Timer-driven runs have no caller to catch for them, so errors are handled here.
+			try {
+				const currentState = await getState(projectId, resp.locals.userId, state);
+				if (closed) {
+					// Client went away while the lookup was in flight: do not write or re-arm.
+					return;
+				}
+				if (currentState == null) {
+					// A status code can only be sent while the headers are still pending.
+					if (!resp.headersSent) {
+						resp.status(404);
+					}
+					resp.end();
+					return;
+				}
+				if (!resp.headersSent) {
+					resp.setHeader("Content-Type", "text/event-stream");
+					resp.setHeader("Cache-Control", "no-cache");
+					resp.setHeader("Connection", "keep-alive");
+					resp.flushHeaders();
+				}
+				const serialized = JSON.stringify(currentState);
+				if (serialized !== lastSent) {
+					lastSent = serialized;
+					resp.write(`event: message\ndata: ${serialized}\n\n`);
+				}
+				timer = setTimeout(sendState, 500);
+			} catch (e) {
+				console.log(e);
+				if (!resp.headersSent) {
+					resp.status(500);
+				}
+				resp.end();
+			}
+		};
+		await sendState();
+	};
+}
+
 async function start() {
 	await db.$connect();
 	const app = express();
@@ -1112,7 +1204,10 @@
 	const projectRouter = express.Router();
 	projectRouter.use(auth);
 	projectRouter.post("/:projectId/analyze", handleAnalyzeRepo);
+	projectRouter.post("/:projectId/saved/config", handleSaveFromConfig);
 	projectRouter.post("/:projectId/saved", handleSave);
+	projectRouter.get("/:projectId/state/stream/deploy", handleStateGetStream("deploy"));
+	projectRouter.get("/:projectId/state/stream/draft", handleStateGetStream("draft"));
 	projectRouter.get("/:projectId/saved/deploy", handleSavedGet("deploy"));
 	projectRouter.get("/:projectId/saved/draft", handleSavedGet("draft"));
 	projectRouter.post("/:projectId/deploy", handleDeploy);