Canvas: Implement streaming state updates
Change-Id: I2bc5a51b5792839bde93f927f5ffea22b3250fe2
diff --git a/apps/canvas/back/src/index.ts b/apps/canvas/back/src/index.ts
index cc8d9ef..38122f8 100644
--- a/apps/canvas/back/src/index.ts
+++ b/apps/canvas/back/src/index.ts
@@ -16,11 +16,11 @@
Env,
generateDodoConfig,
ConfigSchema,
- AppNode,
ConfigWithInput,
configToGraph,
Network,
GithubRepository,
+ Graph,
} from "config";
import { Instant, DateTimeFormatter, ZoneId } from "@js-joda/core";
import LogStore from "./log.js";
@@ -114,63 +114,62 @@
}
};
+async function getState(projectId: number, userId: string, state: "deploy" | "draft"): Promise<Graph | null> {
+ const r = await db.project.findUnique({
+ where: {
+ id: projectId,
+ userId: userId,
+ },
+ select: {
+ state: true,
+ draft: true,
+ },
+ });
+ if (r == null) {
+ return null;
+ }
+ let currentState: Graph | null = null;
+ if (state === "deploy") {
+ if (r.state != null) {
+ currentState = JSON.parse(Buffer.from(r.state).toString("utf8"));
+ }
+ } else {
+ if (r.draft == null) {
+ if (r.state == null) {
+ currentState = {
+ nodes: [],
+ edges: [],
+ viewport: { x: 0, y: 0, zoom: 1 },
+ };
+ } else {
+ currentState = JSON.parse(Buffer.from(r.state).toString("utf8"));
+ }
+ } else {
+ currentState = JSON.parse(Buffer.from(r.draft).toString("utf8"));
+ }
+ }
+ return currentState;
+}
+
function handleSavedGet(state: "deploy" | "draft"): express.Handler {
return async (req, resp) => {
try {
- const r = await db.project.findUnique({
- where: {
- id: Number(req.params["projectId"]),
- userId: resp.locals.userId,
- },
- select: {
- state: true,
- draft: true,
- },
- });
- if (r == null) {
+ const projectId = Number(req.params["projectId"]);
+ const graph = await getState(projectId, resp.locals.userId, state);
+ if (graph == null) {
resp.status(404);
return;
}
+ const env = await getEnv(projectId, resp.locals.userId, resp.locals.username);
+ const config = generateDodoConfig(projectId.toString(), graph.nodes, env);
resp.status(200);
resp.header("content-type", "application/json");
- let currentState: Record<string, unknown> | null = null;
- if (state === "deploy") {
- if (r.state == null) {
- currentState = {
- nodes: [],
- edges: [],
- viewport: { x: 0, y: 0, zoom: 1 },
- };
- } else {
- currentState = JSON.parse(Buffer.from(r.state).toString("utf8"));
- }
- } else {
- if (r.draft == null) {
- if (r.state == null) {
- currentState = {
- nodes: [],
- edges: [],
- viewport: { x: 0, y: 0, zoom: 1 },
- };
- } else {
- currentState = JSON.parse(Buffer.from(r.state).toString("utf8"));
- }
- } else {
- currentState = JSON.parse(Buffer.from(r.draft).toString("utf8"));
- }
- }
- const env = await getEnv(Number(req.params["projectId"]), resp.locals.userId, resp.locals.username);
- if (currentState) {
- const config = generateDodoConfig(
- req.params["projectId"].toString(),
- currentState.nodes as AppNode[],
- env,
- );
- resp.send({
- state: currentState,
+ resp.write(
+ JSON.stringify({
+ state: graph,
config,
- });
- }
+ }),
+ );
} catch (e) {
console.log(e);
resp.status(500);
@@ -432,6 +431,60 @@
}
};
+const handleSaveFromConfig: express.Handler = async (req, resp) => {
+ try {
+ const projectId = Number(req.params["projectId"]);
+ const p = await db.project.findUnique({
+ where: {
+ id: projectId,
+ // userId: resp.locals.userId, TODO(gio): validate
+ },
+ select: {
+ instanceId: true,
+ githubToken: true,
+ deployKey: true,
+ deployKeyPublic: true,
+ state: true,
+ geminiApiKey: true,
+ anthropicApiKey: true,
+ },
+ });
+ if (p === null) {
+ resp.status(404);
+ return;
+ }
+ const config = ConfigSchema.safeParse(req.body.config);
+ if (!config.success) {
+ resp.status(400);
+ resp.write(JSON.stringify({ error: "Invalid configuration", issues: config.error.format() }));
+ return;
+ }
+ let repos: GithubRepository[] = [];
+ if (p.githubToken) {
+ const github = new GithubClient(p.githubToken);
+ repos = await github.getRepositories();
+ }
+ const state = JSON.stringify(
+ configToGraph(
+ config.data,
+ getNetworks(resp.locals.username),
+ repos,
+ p.state ? JSON.parse(Buffer.from(p.state).toString("utf8")) : null,
+ ),
+ );
+ await db.project.update({
+ where: { id: projectId },
+ data: { draft: state },
+ });
+ resp.status(200);
+ } catch (e) {
+ console.log(e);
+ resp.status(500);
+ } finally {
+ resp.end();
+ }
+};
+
const handleStatus: express.Handler = async (req, resp) => {
try {
const projectId = Number(req.params["projectId"]);
@@ -791,7 +844,7 @@
let lastLogId: number | undefined = undefined;
const initialLogs = (await logStore.get(projectId, service, workerId)) || [];
- sendLogs(initialLogs);
+ await sendLogs(initialLogs);
if (initialLogs.length > 0) {
lastLogId = initialLogs[initialLogs.length - 1].id;
}
@@ -800,7 +853,7 @@
const intervalId = setInterval(async () => {
const currentLogs = (await logStore.get(projectId, service, workerId, lastLogId)) || [];
if (currentLogs.length > 0) {
- sendLogs(currentLogs);
+ await sendLogs(currentLogs);
lastLogId = currentLogs[currentLogs.length - 1].id;
}
}, 500);
@@ -1099,6 +1152,45 @@
}
};
+function handleStateGetStream(state: "deploy" | "draft"): express.Handler {
+ return async (req, resp) => {
+ resp.setHeader("Content-Type", "text/event-stream");
+ resp.setHeader("Cache-Control", "no-cache");
+ resp.setHeader("Connection", "keep-alive");
+ resp.flushHeaders();
+
+ try {
+ let intervalId: NodeJS.Timeout | null = null;
+ let lastState: Graph | null = null;
+ const sendState = async () => {
+ const currentState = await getState(Number(req.params["projectId"]), resp.locals.userId, state);
+ if (currentState == null) {
+ resp.status(404).end();
+ return;
+ }
+ if (JSON.stringify(currentState) !== JSON.stringify(lastState)) {
+ lastState = currentState;
+ resp.write("event: message\n");
+ resp.write(`data: ${JSON.stringify(currentState)}\n\n`);
+ }
+ intervalId = setTimeout(sendState, 500);
+ };
+
+ await sendState();
+
+ req.on("close", () => {
+ if (intervalId) {
+ clearTimeout(intervalId);
+ }
+ resp.end();
+ });
+ } catch (e) {
+ console.log(e);
+ resp.end();
+ }
+ };
+}
+
async function start() {
await db.$connect();
const app = express();
@@ -1112,7 +1204,10 @@
const projectRouter = express.Router();
projectRouter.use(auth);
projectRouter.post("/:projectId/analyze", handleAnalyzeRepo);
+ projectRouter.post("/:projectId/saved/config", handleSaveFromConfig);
projectRouter.post("/:projectId/saved", handleSave);
+ projectRouter.get("/:projectId/state/stream/deploy", handleStateGetStream("deploy"));
+ projectRouter.get("/:projectId/state/stream/draft", handleStateGetStream("draft"));
projectRouter.get("/:projectId/saved/deploy", handleSavedGet("deploy"));
projectRouter.get("/:projectId/saved/draft", handleSavedGet("draft"));
projectRouter.post("/:projectId/deploy", handleDeploy);
diff --git a/apps/canvas/config/src/config.ts b/apps/canvas/config/src/config.ts
index b0aa744..dcc4318 100644
--- a/apps/canvas/config/src/config.ts
+++ b/apps/canvas/config/src/config.ts
@@ -189,6 +189,7 @@
export type Graph = {
nodes: AppNode[];
edges: Edge[];
+ viewport?: { x: number; y: number; zoom: number };
};
export function configToGraph(config: Config, networks: Network[], repos: GithubRepository[], current?: Graph): Graph {
diff --git a/apps/canvas/config/src/index.ts b/apps/canvas/config/src/index.ts
index f8db8fe..569d347 100644
--- a/apps/canvas/config/src/index.ts
+++ b/apps/canvas/config/src/index.ts
@@ -52,7 +52,7 @@
AgentAccess,
} from "./graph.js";
-export { generateDodoConfig, configToGraph } from "./config.js";
+export { generateDodoConfig, configToGraph, Graph } from "./config.js";
export {
GithubRepository,
diff --git a/apps/canvas/front/src/lib/state.ts b/apps/canvas/front/src/lib/state.ts
index cd5f29c..0132721 100644
--- a/apps/canvas/front/src/lib/state.ts
+++ b/apps/canvas/front/src/lib/state.ts
@@ -209,6 +209,7 @@
githubRepositories: GitHubRepository[];
githubRepositoriesLoading: boolean;
githubRepositoriesError: string | null;
+ stateEventSource: EventSource | null;
setHighlightCategory: (name: string, active: boolean) => void;
onNodesChange: OnNodesChange<AppNode>;
onEdgesChange: OnEdgesChange;
@@ -409,24 +410,6 @@
});
};
- const restoreSaved = async () => {
- const { projectId } = get();
- const resp = await fetch(`/api/project/${projectId}/saved/${get().mode === "deploy" ? "deploy" : "draft"}`, {
- method: "GET",
- });
- const inst = await resp.json();
- setN(inst.state.nodes);
- set({ edges: inst.state.edges });
- injectNetworkNodes();
- if (
- get().zoom.x !== inst.state.viewport.x ||
- get().zoom.y !== inst.state.viewport.y ||
- get().zoom.zoom !== inst.state.viewport.zoom
- ) {
- set({ zoom: inst.state.viewport });
- }
- };
-
function updateNodeData<T extends NodeType>(id: string, data: NodeDataUpdate<T>): void {
setN(
get().nodes.map((n) => {
@@ -629,6 +612,43 @@
}
};
+ const disconnectFromStateStream = () => {
+ const { stateEventSource } = get();
+ if (stateEventSource) {
+ stateEventSource.close();
+ set({ stateEventSource: null });
+ }
+ };
+
+ const connectToStateStream = (projectId: string, mode: "deploy" | "edit") => {
+ disconnectFromStateStream();
+
+ const eventSource = new EventSource(
+ `/api/project/${projectId}/state/stream/${mode === "edit" ? "draft" : "deploy"}`,
+ );
+ set({ stateEventSource: eventSource });
+
+ eventSource.onmessage = (event) => {
+ const inst = JSON.parse(event.data);
+ setN(inst.nodes);
+ set({ edges: inst.edges });
+ injectNetworkNodes();
+ if (
+ get().zoom.x !== inst.viewport.x ||
+ get().zoom.y !== inst.viewport.y ||
+ get().zoom.zoom !== inst.viewport.zoom
+ ) {
+ set({ zoom: inst.viewport });
+ }
+ };
+
+ eventSource.onerror = (err) => {
+ console.error("EventSource failed:", err);
+ eventSource.close();
+ set({ stateEventSource: null });
+ };
+ };
+
return {
projectId: undefined,
mode: "edit",
@@ -654,6 +674,7 @@
githubRepositories: [],
githubRepositoriesLoading: false,
githubRepositoriesError: null,
+ stateEventSource: null,
setViewport: (viewport) => {
const { viewport: vp } = get();
if (
@@ -780,7 +801,12 @@
}
},
setMode: (mode) => {
+ disconnectFromStateStream();
set({ mode });
+ const projectId = get().projectId;
+ if (projectId) {
+ connectToStateStream(projectId, mode);
+ }
},
setProject: async (projectId) => {
const currentProjectId = get().projectId;
@@ -788,6 +814,7 @@
return;
}
stopRefreshEnvInterval();
+ disconnectFromStateStream();
set({
projectId,
githubRepositories: [],
@@ -801,7 +828,8 @@
} else {
set({ mode: "edit" });
}
- restoreSaved();
+ const mode = get().mode;
+ connectToStateStream(projectId, mode);
startRefreshEnvInterval();
} else {
set({