AppRunner: Automatically annotate logs

Change-Id: I5e614fc1e841e183ac649758972428ae55162a67
diff --git a/apps/app-runner/server.go b/apps/app-runner/server.go
index 6efd222..7c74575 100644
--- a/apps/app-runner/server.go
+++ b/apps/app-runner/server.go
@@ -30,7 +30,9 @@
 }
 
 type Server struct {
-	l           sync.Locker
+	l sync.Locker
+	// TODO(gio): randomly generate string
+	runId       int
 	agentMode   bool
 	port        int
 	appId       string
@@ -46,14 +48,18 @@
 	runCommands []Command
 	self        string
 	managerAddr string
-	logs        *Log
+	logs        *Logger
+	logM        io.Writer
 	currDir     string
 	status      *Status
 }
 
 func NewServer(agentMode bool, port int, appId, service, id, repoAddr, branch, rootDir string, signer ssh.Signer, appDir string, runCommands []Command, self string, manager string) *Server {
+	logger := NewLogger("0")
+	logM := io.MultiWriter(os.Stdout, logger)
 	return &Server{
 		l:           &sync.Mutex{},
+		runId:       0,
 		agentMode:   agentMode,
 		port:        port,
 		ready:       false,
@@ -68,7 +74,8 @@
 		runCommands: runCommands,
 		self:        self,
 		managerAddr: manager,
-		logs:        &Log{},
+		logs:        logger,
+		logM:        logM,
 		currDir:     "",
 		status:      nil,
 	}
@@ -88,7 +95,11 @@
 }
 
 func (s *Server) handleLogs(w http.ResponseWriter, r *http.Request) {
-	fmt.Fprint(w, s.logs.Contents())
+	if logs, err := s.logs.Contents(); err != nil {
+		http.Error(w, "not ready", http.StatusInternalServerError)
+	} else {
+		fmt.Fprint(w, logs)
+	}
 }
 
 func (s *Server) handleReady(w http.ResponseWriter, r *http.Request) {
@@ -101,12 +112,16 @@
 	}
 }
 
+func (s *Server) log(tmpl string, args ...any) {
+	contents := fmt.Sprintf(tmpl, args...)
+	fmt.Fprintf(s.logs, "\033[38;5;212;136;141mdodo:\033[0;00m %s\n", contents)
+}
+
 func (s *Server) handleUpdate(w http.ResponseWriter, r *http.Request) {
-	fmt.Println("update")
 	s.l.Lock()
 	s.ready = false
 	s.l.Unlock()
-	fmt.Fprintf(s.logs, "!!! dodo: Reloading service\n")
+	s.log("Reloading service")
 	if err := s.run(); err != nil {
 		http.Error(w, err.Error(), http.StatusInternalServerError)
 		return
@@ -122,7 +137,6 @@
 }
 
 func (s *Server) run() error {
-	logM := io.MultiWriter(os.Stdout, s.logs)
 	newDir := s.appDir
 	commands := []command{}
 	if !s.agentMode {
@@ -136,13 +150,13 @@
 		if _, err := os.Stat(filepath.Join(newDir, ".git")); err != nil && os.IsNotExist(err) {
 			commit, err := CloneRepositoryBranch(s.repoAddr, s.branch, s.rootDir, s.signer, newDir)
 			if err != nil {
-				fmt.Fprintf(logM, "!!! dodo: Failed to clone repository: %s\n", err)
+				s.log("Failed to clone repository: %s", err)
 				s.status = &Status{
 					Commit: nil,
 				}
 				return err
 			}
-			fmt.Fprintf(logM, "!!! dodo: Successfully cloned repository %s\n", commit.Hash)
+			s.log("Successfully cloned repository %s", commit.Hash)
 			s.status = &Status{
 				Commit:   commit,
 				Commands: []CommandStatus{},
@@ -211,11 +225,11 @@
 			Path:   "/bin/sh",
 			Args:   []string{"/bin/sh", "-c", c.cmd},
 			Env:    append(os.Environ(), c.env...),
-			Stdout: logM,
-			Stderr: logM,
+			Stdout: s.logM,
+			Stderr: s.logM,
 		}
 		cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
-		fmt.Fprintf(logM, "!!! dodo: Running: %s\n", c)
+		s.log("Running: %s", c)
 		s.status.Commands[i].State = "running"
 		if i < len(commands)-1 {
 			if err := cmd.Run(); err != nil {
@@ -244,11 +258,24 @@
 }
 
 type pingReq struct {
-	Id      string  `json:"id"`
-	Service string  `json:"service"`
-	Address string  `json:"address"`
-	Status  *Status `json:"status,omitempty"`
-	Logs    string  `json:"logs"`
+	Id      string    `json:"id"`
+	Service string    `json:"service"`
+	Address string    `json:"address"`
+	Status  *Status   `json:"status,omitempty"`
+	Logs    []LogItem `json:"logs"`
+}
+
+type pingResp struct {
+	Success          bool `json:"success"`
+	LogItemsConsumed int  `json:"logItemsConsumed"`
+}
+
+func min(a, b int) int {
+	if a < b {
+		return a
+	} else {
+		return b
+	}
 }
 
 func (s *Server) pingManager() {
@@ -258,12 +285,14 @@
 			s.pingManager()
 		}()
 	}()
+	logItems := s.logs.Items()
+	logItems = logItems[:min(100, len(logItems))]
 	buf, err := json.Marshal(pingReq{
 		Id:      s.id,
 		Service: s.service,
 		Address: fmt.Sprintf("http://%s:%d", s.self, s.port),
 		Status:  s.status,
-		Logs:    s.logs.Contents(),
+		Logs:    logItems,
 	})
 	if err != nil {
 		return
@@ -273,8 +302,12 @@
 	if err != nil {
 		fmt.Println(err)
 	} else {
-		// check resp code
-		io.Copy(os.Stdout, resp.Body)
+		var r pingResp
+		if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
+			fmt.Printf("%s\n", err)
+		} else if r.Success {
+			s.logs.Trim(r.LogItemsConsumed)
+		}
 	}
 }