Canvas: Implement streaming state updates
Change-Id: I2bc5a51b5792839bde93f927f5ffea22b3250fe2
diff --git a/apps/canvas/back/src/index.ts b/apps/canvas/back/src/index.ts
index cc8d9ef..38122f8 100644
--- a/apps/canvas/back/src/index.ts
+++ b/apps/canvas/back/src/index.ts
@@ -16,11 +16,11 @@
Env,
generateDodoConfig,
ConfigSchema,
- AppNode,
ConfigWithInput,
configToGraph,
Network,
GithubRepository,
+ Graph,
} from "config";
import { Instant, DateTimeFormatter, ZoneId } from "@js-joda/core";
import LogStore from "./log.js";
@@ -114,63 +114,62 @@
}
};
+async function getState(projectId: number, userId: string, state: "deploy" | "draft"): Promise<Graph | null> {
+ const r = await db.project.findUnique({
+ where: {
+ id: projectId,
+ userId: userId,
+ },
+ select: {
+ state: true,
+ draft: true,
+ },
+ });
+ if (r == null) {
+ return null;
+ }
+ let currentState: Graph | null = null;
+ if (state === "deploy") {
+ if (r.state != null) {
+ currentState = JSON.parse(Buffer.from(r.state).toString("utf8"));
+ }
+ } else {
+ if (r.draft == null) {
+ if (r.state == null) {
+ currentState = {
+ nodes: [],
+ edges: [],
+ viewport: { x: 0, y: 0, zoom: 1 },
+ };
+ } else {
+ currentState = JSON.parse(Buffer.from(r.state).toString("utf8"));
+ }
+ } else {
+ currentState = JSON.parse(Buffer.from(r.draft).toString("utf8"));
+ }
+ }
+ return currentState;
+}
+
function handleSavedGet(state: "deploy" | "draft"): express.Handler {
return async (req, resp) => {
try {
- const r = await db.project.findUnique({
- where: {
- id: Number(req.params["projectId"]),
- userId: resp.locals.userId,
- },
- select: {
- state: true,
- draft: true,
- },
- });
- if (r == null) {
+ const projectId = Number(req.params["projectId"]);
+ const graph = await getState(projectId, resp.locals.userId, state);
+ if (graph == null) {
resp.status(404);
return;
}
+ const env = await getEnv(projectId, resp.locals.userId, resp.locals.username);
+ const config = generateDodoConfig(projectId.toString(), graph.nodes, env);
resp.status(200);
resp.header("content-type", "application/json");
- let currentState: Record<string, unknown> | null = null;
- if (state === "deploy") {
- if (r.state == null) {
- currentState = {
- nodes: [],
- edges: [],
- viewport: { x: 0, y: 0, zoom: 1 },
- };
- } else {
- currentState = JSON.parse(Buffer.from(r.state).toString("utf8"));
- }
- } else {
- if (r.draft == null) {
- if (r.state == null) {
- currentState = {
- nodes: [],
- edges: [],
- viewport: { x: 0, y: 0, zoom: 1 },
- };
- } else {
- currentState = JSON.parse(Buffer.from(r.state).toString("utf8"));
- }
- } else {
- currentState = JSON.parse(Buffer.from(r.draft).toString("utf8"));
- }
- }
- const env = await getEnv(Number(req.params["projectId"]), resp.locals.userId, resp.locals.username);
- if (currentState) {
- const config = generateDodoConfig(
- req.params["projectId"].toString(),
- currentState.nodes as AppNode[],
- env,
- );
- resp.send({
- state: currentState,
+ resp.write(
+ JSON.stringify({
+ state: graph,
config,
- });
- }
+ }),
+ );
} catch (e) {
console.log(e);
resp.status(500);
@@ -432,6 +431,60 @@
}
};
+const handleSaveFromConfig: express.Handler = async (req, resp) => {
+ try {
+ const projectId = Number(req.params["projectId"]);
+ const p = await db.project.findUnique({
+ where: {
+ id: projectId,
+ // userId: resp.locals.userId, TODO(gio): validate
+ },
+ select: {
+ instanceId: true,
+ githubToken: true,
+ deployKey: true,
+ deployKeyPublic: true,
+ state: true,
+ geminiApiKey: true,
+ anthropicApiKey: true,
+ },
+ });
+ if (p === null) {
+ resp.status(404);
+ return;
+ }
+ const config = ConfigSchema.safeParse(req.body.config);
+ if (!config.success) {
+ resp.status(400);
+ resp.write(JSON.stringify({ error: "Invalid configuration", issues: config.error.format() }));
+ return;
+ }
+ let repos: GithubRepository[] = [];
+ if (p.githubToken) {
+ const github = new GithubClient(p.githubToken);
+ repos = await github.getRepositories();
+ }
+ const state = JSON.stringify(
+ configToGraph(
+ config.data,
+ getNetworks(resp.locals.username),
+ repos,
+ p.state ? JSON.parse(Buffer.from(p.state).toString("utf8")) : null,
+ ),
+ );
+ await db.project.update({
+ where: { id: projectId },
+ data: { draft: state },
+ });
+ resp.status(200);
+ } catch (e) {
+ console.log(e);
+ resp.status(500);
+ } finally {
+ resp.end();
+ }
+};
+
const handleStatus: express.Handler = async (req, resp) => {
try {
const projectId = Number(req.params["projectId"]);
@@ -791,7 +844,7 @@
let lastLogId: number | undefined = undefined;
const initialLogs = (await logStore.get(projectId, service, workerId)) || [];
- sendLogs(initialLogs);
+ await sendLogs(initialLogs);
if (initialLogs.length > 0) {
lastLogId = initialLogs[initialLogs.length - 1].id;
}
@@ -800,7 +853,7 @@
const intervalId = setInterval(async () => {
const currentLogs = (await logStore.get(projectId, service, workerId, lastLogId)) || [];
if (currentLogs.length > 0) {
- sendLogs(currentLogs);
+ await sendLogs(currentLogs);
lastLogId = currentLogs[currentLogs.length - 1].id;
}
}, 500);
@@ -1099,6 +1152,45 @@
}
};
+function handleStateGetStream(state: "deploy" | "draft"): express.Handler {
+ return async (req, resp) => {
+ resp.setHeader("Content-Type", "text/event-stream");
+ resp.setHeader("Cache-Control", "no-cache");
+ resp.setHeader("Connection", "keep-alive");
+ resp.flushHeaders();
+
+ try {
+ let intervalId: NodeJS.Timeout | null = null;
+ let lastState: Graph | null = null;
+ const sendState = async () => {
+ const currentState = await getState(Number(req.params["projectId"]), resp.locals.userId, state);
+ if (currentState == null) {
+ resp.status(404).end();
+ return;
+ }
+ if (JSON.stringify(currentState) !== JSON.stringify(lastState)) {
+ lastState = currentState;
+ resp.write("event: message\n");
+ resp.write(`data: ${JSON.stringify(currentState)}\n\n`);
+ }
+ intervalId = setTimeout(sendState, 500);
+ };
+
+ await sendState();
+
+ req.on("close", () => {
+ if (intervalId) {
+ clearTimeout(intervalId);
+ }
+ resp.end();
+ });
+ } catch (e) {
+ console.log(e);
+ resp.end();
+ }
+ };
+}
+
async function start() {
await db.$connect();
const app = express();
@@ -1112,7 +1204,10 @@
const projectRouter = express.Router();
projectRouter.use(auth);
projectRouter.post("/:projectId/analyze", handleAnalyzeRepo);
+ projectRouter.post("/:projectId/saved/config", handleSaveFromConfig);
projectRouter.post("/:projectId/saved", handleSave);
+ projectRouter.get("/:projectId/state/stream/deploy", handleStateGetStream("deploy"));
+ projectRouter.get("/:projectId/state/stream/draft", handleStateGetStream("draft"));
projectRouter.get("/:projectId/saved/deploy", handleSavedGet("deploy"));
projectRouter.get("/:projectId/saved/draft", handleSavedGet("draft"));
projectRouter.post("/:projectId/deploy", handleDeploy);