Canvas: Implement streaming state updates

Change-Id: I2bc5a51b5792839bde93f927f5ffea22b3250fe2
diff --git a/apps/canvas/back/src/index.ts b/apps/canvas/back/src/index.ts
index cc8d9ef..38122f8 100644
--- a/apps/canvas/back/src/index.ts
+++ b/apps/canvas/back/src/index.ts
@@ -16,11 +16,11 @@
 	Env,
 	generateDodoConfig,
 	ConfigSchema,
-	AppNode,
 	ConfigWithInput,
 	configToGraph,
 	Network,
 	GithubRepository,
+	Graph,
 } from "config";
 import { Instant, DateTimeFormatter, ZoneId } from "@js-joda/core";
 import LogStore from "./log.js";
@@ -114,63 +114,54 @@
 	}
 };
 
+/**
+ * Loads the stored canvas graph of project `projectId`, provided it belongs to `userId`.
+ *
+ * - "deploy": decodes the `state` column and yields null when that column is unset.
+ * - "draft": decodes `draft`, falls back to `state`, and finally to an empty graph with
+ *   viewport `{ x: 0, y: 0, zoom: 1 }` when both columns are unset.
+ *
+ * @returns the decoded graph; null when no matching project row exists or, for "deploy",
+ *   when nothing is stored in `state`.
+ */
+async function getState(projectId: number, userId: string, state: "deploy" | "draft"): Promise<Graph | null> {
+	const project = await db.project.findUnique({
+		where: { id: projectId, userId },
+		select: { state: true, draft: true },
+	});
+	if (project == null) {
+		return null;
+	}
+	const stored = state === "deploy" ? project.state : (project.draft ?? project.state);
+	if (stored != null) {
+		// The columns hold UTF-8 encoded JSON; the parsed value is not validated here.
+		return JSON.parse(Buffer.from(stored).toString("utf8"));
+	}
+	if (state === "deploy") {
+		return null;
+	}
+	return { nodes: [], edges: [], viewport: { x: 0, y: 0, zoom: 1 } };
+}
+
 function handleSavedGet(state: "deploy" | "draft"): express.Handler {
 	return async (req, resp) => {
 		try {
-			const r = await db.project.findUnique({
-				where: {
-					id: Number(req.params["projectId"]),
-					userId: resp.locals.userId,
-				},
-				select: {
-					state: true,
-					draft: true,
-				},
-			});
-			if (r == null) {
+			const projectId = Number(req.params["projectId"]);
+			const graph = await getState(projectId, resp.locals.userId, state);
+			if (graph == null) {
 				resp.status(404);
 				return;
 			}
+			const env = await getEnv(projectId, resp.locals.userId, resp.locals.username);
+			const config = generateDodoConfig(projectId.toString(), graph.nodes, env);
 			resp.status(200);
 			resp.header("content-type", "application/json");
-			let currentState: Record<string, unknown> | null = null;
-			if (state === "deploy") {
-				if (r.state == null) {
-					currentState = {
-						nodes: [],
-						edges: [],
-						viewport: { x: 0, y: 0, zoom: 1 },
-					};
-				} else {
-					currentState = JSON.parse(Buffer.from(r.state).toString("utf8"));
-				}
-			} else {
-				if (r.draft == null) {
-					if (r.state == null) {
-						currentState = {
-							nodes: [],
-							edges: [],
-							viewport: { x: 0, y: 0, zoom: 1 },
-						};
-					} else {
-						currentState = JSON.parse(Buffer.from(r.state).toString("utf8"));
-					}
-				} else {
-					currentState = JSON.parse(Buffer.from(r.draft).toString("utf8"));
-				}
-			}
-			const env = await getEnv(Number(req.params["projectId"]), resp.locals.userId, resp.locals.username);
-			if (currentState) {
-				const config = generateDodoConfig(
-					req.params["projectId"].toString(),
-					currentState.nodes as AppNode[],
-					env,
-				);
-				resp.send({
-					state: currentState,
+			resp.write(
+				JSON.stringify({
+					state: graph,
 					config,
-				});
-			}
+				}),
+			);
 		} catch (e) {
 			console.log(e);
 			resp.status(500);
@@ -432,6 +431,68 @@
 	}
 };
 
+/**
+ * POST handler that rebuilds a project's draft graph from a posted dodo configuration.
+ *
+ * `req.body.config` is checked against ConfigSchema, converted with configToGraph (given
+ * the caller's networks, the repositories visible to the project's GitHub token, and the
+ * stored `state` graph as the current graph), and written JSON-encoded to `draft`.
+ *
+ * Responds 200 on success, 400 with the schema issues for an invalid config, 404 when the
+ * caller has no project with this id, and 500 on any unexpected error.
+ */
+const handleSaveFromConfig: express.Handler = async (req, resp) => {
+	try {
+		const projectId = Number(req.params["projectId"]);
+		const p = await db.project.findUnique({
+			where: {
+				id: projectId,
+				// Scope the lookup to the caller so one user cannot overwrite another's draft.
+				userId: resp.locals.userId,
+			},
+			// Select only what is used below instead of loading unrelated keys and tokens.
+			select: {
+				githubToken: true,
+				state: true,
+			},
+		});
+		if (p === null) {
+			resp.status(404);
+			return;
+		}
+		// Optional chaining: a missing body is reported by the schema check instead of a TypeError.
+		const config = ConfigSchema.safeParse(req.body?.config);
+		if (!config.success) {
+			resp.status(400);
+			resp.header("content-type", "application/json");
+			resp.write(JSON.stringify({ error: "Invalid configuration", issues: config.error.format() }));
+			return;
+		}
+		let repos: GithubRepository[] = [];
+		if (p.githubToken) {
+			const github = new GithubClient(p.githubToken);
+			repos = await github.getRepositories();
+		}
+		// configToGraph declares `current?: Graph`, so "nothing stored" is undefined, not null.
+		const current: Graph | undefined = p.state
+			? JSON.parse(Buffer.from(p.state).toString("utf8"))
+			: undefined;
+		const state = JSON.stringify(
+			configToGraph(config.data, getNetworks(resp.locals.username), repos, current),
+		);
+		await db.project.update({
+			where: { id: projectId },
+			data: { draft: state },
+		});
+		resp.status(200);
+	} catch (e) {
+		console.log(e);
+		resp.status(500);
+	} finally {
+		resp.end();
+	}
+};
+
 const handleStatus: express.Handler = async (req, resp) => {
 	try {
 		const projectId = Number(req.params["projectId"]);
@@ -791,7 +844,7 @@
 
 		let lastLogId: number | undefined = undefined;
 		const initialLogs = (await logStore.get(projectId, service, workerId)) || [];
-		sendLogs(initialLogs);
+		await sendLogs(initialLogs);
 		if (initialLogs.length > 0) {
 			lastLogId = initialLogs[initialLogs.length - 1].id;
 		}
@@ -800,7 +853,7 @@
 		const intervalId = setInterval(async () => {
 			const currentLogs = (await logStore.get(projectId, service, workerId, lastLogId)) || [];
 			if (currentLogs.length > 0) {
-				sendLogs(currentLogs);
+				await sendLogs(currentLogs);
 				lastLogId = currentLogs[currentLogs.length - 1].id;
 			}
 		}, 500);
@@ -1099,6 +1152,76 @@
 	}
 };
 
+/**
+ * Creates a Server-Sent Events handler that streams the "deploy" or "draft" graph of a project.
+ *
+ * The graph is re-read with getState every 500ms and sent as a `message` event whenever its
+ * JSON serialization differs from the one sent last. The first lookup happens before the
+ * headers are sent, so a missing graph still yields a real 404; if the graph disappears
+ * later, or a poll throws, the stream is simply ended. Polling stops when the request closes.
+ */
+function handleStateGetStream(state: "deploy" | "draft"): express.Handler {
+	return async (req, resp) => {
+		const projectId = Number(req.params["projectId"]);
+		let timer: NodeJS.Timeout | null = null;
+		let stopped = false;
+		let lastSent: string | null = null;
+
+		const stop = () => {
+			stopped = true;
+			if (timer) {
+				clearTimeout(timer);
+				timer = null;
+			}
+			if (!resp.writableEnded) {
+				resp.end();
+			}
+		};
+		// Registered before the first await so a disconnect during a lookup is not missed.
+		req.on("close", stop);
+
+		const poll = async () => {
+			// Each poll guards itself: timer callbacks run outside the handler's call stack.
+			try {
+				const currentState = await getState(projectId, resp.locals.userId, state);
+				if (stopped) {
+					// The client went away during the lookup; do not write or reschedule.
+					return;
+				}
+				if (currentState == null) {
+					// The status can only be changed while the headers are still unsent.
+					if (!resp.headersSent) {
+						resp.status(404);
+					}
+					stop();
+					return;
+				}
+				if (!resp.headersSent) {
+					resp.setHeader("Content-Type", "text/event-stream");
+					resp.setHeader("Cache-Control", "no-cache");
+					resp.setHeader("Connection", "keep-alive");
+					resp.flushHeaders();
+				}
+				const serialized = JSON.stringify(currentState);
+				if (serialized !== lastSent) {
+					lastSent = serialized;
+					resp.write("event: message\n");
+					resp.write(`data: ${serialized}\n\n`);
+				}
+				timer = setTimeout(poll, 500);
+			} catch (e) {
+				console.log(e);
+				if (!resp.headersSent) {
+					resp.status(500);
+				}
+				stop();
+			}
+		};
+
+		await poll();
+	};
+}
+
 async function start() {
 	await db.$connect();
 	const app = express();
@@ -1112,7 +1204,10 @@
 	const projectRouter = express.Router();
 	projectRouter.use(auth);
 	projectRouter.post("/:projectId/analyze", handleAnalyzeRepo);
+	projectRouter.post("/:projectId/saved/config", handleSaveFromConfig);
 	projectRouter.post("/:projectId/saved", handleSave);
+	projectRouter.get("/:projectId/state/stream/deploy", handleStateGetStream("deploy"));
+	projectRouter.get("/:projectId/state/stream/draft", handleStateGetStream("draft"));
 	projectRouter.get("/:projectId/saved/deploy", handleSavedGet("deploy"));
 	projectRouter.get("/:projectId/saved/draft", handleSavedGet("draft"));
 	projectRouter.post("/:projectId/deploy", handleDeploy);
diff --git a/apps/canvas/config/src/config.ts b/apps/canvas/config/src/config.ts
index b0aa744..dcc4318 100644
--- a/apps/canvas/config/src/config.ts
+++ b/apps/canvas/config/src/config.ts
@@ -189,6 +189,7 @@
 export type Graph = {
 	nodes: AppNode[];
 	edges: Edge[];
+	viewport?: { x: number; y: number; zoom: number }; // Optional; presumably the canvas pan offset and zoom level (TODO confirm).
 };
 
 export function configToGraph(config: Config, networks: Network[], repos: GithubRepository[], current?: Graph): Graph {
diff --git a/apps/canvas/config/src/index.ts b/apps/canvas/config/src/index.ts
index f8db8fe..569d347 100644
--- a/apps/canvas/config/src/index.ts
+++ b/apps/canvas/config/src/index.ts
@@ -52,7 +52,7 @@
 	AgentAccess,
 } from "./graph.js";
 
-export { generateDodoConfig, configToGraph } from "./config.js";
+export { generateDodoConfig, configToGraph, Graph } from "./config.js";
 
 export {
 	GithubRepository,
diff --git a/apps/canvas/front/src/lib/state.ts b/apps/canvas/front/src/lib/state.ts
index cd5f29c..0132721 100644
--- a/apps/canvas/front/src/lib/state.ts
+++ b/apps/canvas/front/src/lib/state.ts
@@ -209,6 +209,7 @@
 	githubRepositories: GitHubRepository[];
 	githubRepositoriesLoading: boolean;
 	githubRepositoriesError: string | null;
+	stateEventSource: EventSource | null;
 	setHighlightCategory: (name: string, active: boolean) => void;
 	onNodesChange: OnNodesChange<AppNode>;
 	onEdgesChange: OnEdgesChange;
@@ -409,24 +410,6 @@
 		});
 	};
 
-	const restoreSaved = async () => {
-		const { projectId } = get();
-		const resp = await fetch(`/api/project/${projectId}/saved/${get().mode === "deploy" ? "deploy" : "draft"}`, {
-			method: "GET",
-		});
-		const inst = await resp.json();
-		setN(inst.state.nodes);
-		set({ edges: inst.state.edges });
-		injectNetworkNodes();
-		if (
-			get().zoom.x !== inst.state.viewport.x ||
-			get().zoom.y !== inst.state.viewport.y ||
-			get().zoom.zoom !== inst.state.viewport.zoom
-		) {
-			set({ zoom: inst.state.viewport });
-		}
-	};
-
 	function updateNodeData<T extends NodeType>(id: string, data: NodeDataUpdate<T>): void {
 		setN(
 			get().nodes.map((n) => {
@@ -629,6 +612,56 @@
 		}
 	};
 
+	/** Closes the currently open state stream, if any, and clears it from the store. */
+	const disconnectFromStateStream = () => {
+		const { stateEventSource } = get();
+		if (stateEventSource) {
+			stateEventSource.close();
+			set({ stateEventSource: null });
+		}
+	};
+
+	/**
+	 * Subscribes the store to the server's state stream of a project: the draft stream in
+	 * "edit" mode, the deploy stream otherwise. A previously open stream is closed first.
+	 * Every message replaces nodes and edges, calls injectNetworkNodes, and adopts the
+	 * streamed viewport when one is present and differs from the current zoom.
+	 */
+	const connectToStateStream = (projectId: string, mode: "deploy" | "edit") => {
+		disconnectFromStateStream();
+
+		const eventSource = new EventSource(
+			`/api/project/${projectId}/state/stream/${mode === "edit" ? "draft" : "deploy"}`,
+		);
+		set({ stateEventSource: eventSource });
+
+		eventSource.onmessage = (event) => {
+			const inst = JSON.parse(event.data);
+			setN(inst.nodes);
+			set({ edges: inst.edges });
+			injectNetworkNodes();
+			// The streamed graph may carry no viewport; keep the current zoom in that case.
+			const viewport = inst.viewport;
+			const zoom = get().zoom;
+			if (
+				viewport != null &&
+				(zoom.x !== viewport.x || zoom.y !== viewport.y || zoom.zoom !== viewport.zoom)
+			) {
+				set({ zoom: viewport });
+			}
+		};
+
+		eventSource.onerror = (err) => {
+			console.error("EventSource failed:", err);
+			// Closing also disables the browser's automatic reconnect for this stream.
+			eventSource.close();
+			// Clear the store entry only if it still refers to this stream.
+			if (get().stateEventSource === eventSource) {
+				set({ stateEventSource: null });
+			}
+		};
+	};
+
 	return {
 		projectId: undefined,
 		mode: "edit",
@@ -654,6 +674,7 @@
 		githubRepositories: [],
 		githubRepositoriesLoading: false,
 		githubRepositoriesError: null,
+		stateEventSource: null,
 		setViewport: (viewport) => {
 			const { viewport: vp } = get();
 			if (
@@ -780,7 +801,12 @@
 			}
 		},
 		setMode: (mode) => {
+			disconnectFromStateStream(); // Close the stream of the previous mode, even when no project is selected.
 			set({ mode });
+			const projectId = get().projectId;
+			if (projectId) {
+				connectToStateStream(projectId, mode); // Open the stream that matches the new mode.
+			}
 		},
 		setProject: async (projectId) => {
 			const currentProjectId = get().projectId;
@@ -788,6 +814,7 @@
 				return;
 			}
 			stopRefreshEnvInterval();
+			disconnectFromStateStream();
 			set({
 				projectId,
 				githubRepositories: [],
@@ -801,7 +828,8 @@
 				} else {
 					set({ mode: "edit" });
 				}
-				restoreSaved();
+				const mode = get().mode;
+				connectToStateStream(projectId, mode);
 				startRefreshEnvInterval();
 			} else {
 				set({