AppRunner: Automatically annotate logs

Change-Id: I5e614fc1e841e183ac649758972428ae55162a67
diff --git a/Jenkinsfile b/Jenkinsfile
index 8f651cf..3ad77fd 100644
--- a/Jenkinsfile
+++ b/Jenkinsfile
@@ -46,6 +46,11 @@
 						sh 'go run cuelang.org/go/cmd/cue fmt app_configs/*.cue --check'
                 		sh 'go test ./...'
 					}
+                    dir('apps/app-runner') {
+                        sh 'go mod tidy'
+                		sh 'go build *.go'
+                		sh 'go test ./...'
+					}
                     dir('core/auth/memberships') {
                         sh 'go mod tidy'
                 		sh 'go build *.go'
diff --git a/apps/app-runner/Makefile b/apps/app-runner/Makefile
index ef1c483..07cc97f 100644
--- a/apps/app-runner/Makefile
+++ b/apps/app-runner/Makefile
@@ -22,6 +22,14 @@
 build: clean
 	/usr/local/go/bin/go build -o app-runner *.go
 
+test: export CGO_ENABLED=0
+test:
+	/usr/local/go/bin/go test ./...
+
+test: export CGO_ENABLED=0
+testv:
+	/usr/local/go/bin/go test -v ./...
+
 build_arm64: export CGO_ENABLED=0
 build_arm64: export GO111MODULE=on
 build_arm64: export GOOS=linux
diff --git a/apps/app-runner/log.go b/apps/app-runner/log.go
index 2e5f524..084e3d1 100644
--- a/apps/app-runner/log.go
+++ b/apps/app-runner/log.go
@@ -1,24 +1,97 @@
 package main
 
 import (
+	"bytes"
 	"strings"
 	"sync"
+	"time"
 )
 
-type Log struct {
-	l sync.Mutex
-	d strings.Builder
+type LogItem struct {
+	RunId          string `json:"runId"`
+	TimestampMilli int64  `json:"timestampMilli"`
+	Commit         string `json:"commit,omitempty"`
+	Contents       []byte `json:"contents"`
 }
 
-func (l *Log) Write(p []byte) (n int, err error) {
+type Logger struct {
+	l          sync.Mutex
+	runId      string
+	commitHash string
+	items      []LogItem
+	curr       LogItem
+}
+
+func NewLogger(runId string) *Logger {
+	return &Logger{
+		l:     sync.Mutex{},
+		runId: runId,
+		items: []LogItem{},
+		curr: LogItem{
+			RunId:    runId,
+			Contents: []byte{},
+		},
+	}
+}
+
+func (l *Logger) Write(p []byte) (n int, err error) {
 	l.l.Lock()
 	defer l.l.Unlock()
+	cnt := 0
 	// TODO(gio): Reset s.logs periodically
-	return l.d.Write(p)
+	for len(p) > 0 {
+		pos := bytes.Index(p, []byte("\n"))
+		if pos != -1 {
+			if l.curr.TimestampMilli == 0 {
+				l.curr.TimestampMilli = time.Now().UnixMilli()
+			}
+			l.curr.Contents = append(l.curr.Contents, p[:pos]...)
+			l.items = append(l.items, l.curr)
+			l.curr = LogItem{
+				RunId:    l.runId,
+				Contents: []byte{},
+			}
+			p = p[pos+len([]byte("\n")):]
+			cnt += pos + len([]byte("\n"))
+		} else {
+			if l.curr.TimestampMilli == 0 {
+				l.curr.TimestampMilli = time.Now().UnixMilli()
+			}
+			l.curr.Contents = append(l.curr.Contents, p...)
+			cnt += len(p)
+			p = []byte{}
+		}
+	}
+	return cnt, nil
 }
 
-func (l *Log) Contents() string {
+func (l *Logger) Items() []LogItem {
 	l.l.Lock()
 	defer l.l.Unlock()
-	return l.d.String()
+	ret := []LogItem{}
+	for _, i := range l.items {
+		ret = append(ret, i)
+	}
+	return ret
+}
+
+func (l *Logger) Trim(n int) {
+	l.l.Lock()
+	defer l.l.Unlock()
+	l.items = l.items[n:]
+}
+
+func (l *Logger) Contents() (string, error) {
+	l.l.Lock()
+	defer l.l.Unlock()
+	var ret strings.Builder
+	for _, i := range l.items {
+		if _, err := ret.Write(i.Contents); err != nil {
+			return "", err
+		}
+		if _, err := ret.WriteString("\n"); err != nil {
+			return "", err
+		}
+	}
+	return ret.String(), nil
 }
diff --git a/apps/app-runner/log_test.go b/apps/app-runner/log_test.go
new file mode 100644
index 0000000..777cf8f
--- /dev/null
+++ b/apps/app-runner/log_test.go
@@ -0,0 +1,38 @@
+package main
+
+import (
+	"fmt"
+	"testing"
+)
+
+func TestLoggerWriteFullLine(t *testing.T) {
+	l := NewLogger("foo")
+	fmt.Fprintf(l, "hi\n")
+	c, err := l.Contents()
+	if err != nil {
+		t.Fatal(err)
+	}
+	if c != "hi\n" {
+		t.Fatal(c)
+	}
+}
+
+func TestLoggerWritePartialLine(t *testing.T) {
+	l := NewLogger("foo")
+	fmt.Fprintf(l, "hi\nfoo")
+	c, err := l.Contents()
+	if err != nil {
+		t.Fatal(err)
+	}
+	if c != "hi\n" {
+		t.Fatal(c)
+	}
+	fmt.Fprintf(l, "bar\n")
+	c, err = l.Contents()
+	if err != nil {
+		t.Fatal(err)
+	}
+	if c != "hi\nfoobar\n" {
+		t.Fatal(c)
+	}
+}
diff --git a/apps/app-runner/server.go b/apps/app-runner/server.go
index 6efd222..7c74575 100644
--- a/apps/app-runner/server.go
+++ b/apps/app-runner/server.go
@@ -30,7 +30,9 @@
 }
 
 type Server struct {
-	l           sync.Locker
+	l sync.Locker
+	// TODO(gio): randomly generate string
+	runId       int
 	agentMode   bool
 	port        int
 	appId       string
@@ -46,14 +48,18 @@
 	runCommands []Command
 	self        string
 	managerAddr string
-	logs        *Log
+	logs        *Logger
+	logM        io.Writer
 	currDir     string
 	status      *Status
 }
 
 func NewServer(agentMode bool, port int, appId, service, id, repoAddr, branch, rootDir string, signer ssh.Signer, appDir string, runCommands []Command, self string, manager string) *Server {
+	logger := NewLogger("0")
+	logM := io.MultiWriter(os.Stdout, logger)
 	return &Server{
 		l:           &sync.Mutex{},
+		runId:       0,
 		agentMode:   agentMode,
 		port:        port,
 		ready:       false,
@@ -68,7 +74,8 @@
 		runCommands: runCommands,
 		self:        self,
 		managerAddr: manager,
-		logs:        &Log{},
+		logs:        logger,
+		logM:        logM,
 		currDir:     "",
 		status:      nil,
 	}
@@ -88,7 +95,11 @@
 }
 
 func (s *Server) handleLogs(w http.ResponseWriter, r *http.Request) {
-	fmt.Fprint(w, s.logs.Contents())
+	if logs, err := s.logs.Contents(); err != nil {
+		http.Error(w, "not ready", http.StatusInternalServerError)
+	} else {
+		fmt.Fprint(w, logs)
+	}
 }
 
 func (s *Server) handleReady(w http.ResponseWriter, r *http.Request) {
@@ -101,12 +112,16 @@
 	}
 }
 
+func (s *Server) log(tmpl string, args ...any) {
+	contents := fmt.Sprintf(tmpl, args...)
+	fmt.Fprintf(s.logs, "\033[38;5;212;136;141mdodo:\033[0;00m %s\n", contents)
+}
+
 func (s *Server) handleUpdate(w http.ResponseWriter, r *http.Request) {
-	fmt.Println("update")
 	s.l.Lock()
 	s.ready = false
 	s.l.Unlock()
-	fmt.Fprintf(s.logs, "!!! dodo: Reloading service\n")
+	s.log("Reloading service")
 	if err := s.run(); err != nil {
 		http.Error(w, err.Error(), http.StatusInternalServerError)
 		return
@@ -122,7 +137,6 @@
 }
 
 func (s *Server) run() error {
-	logM := io.MultiWriter(os.Stdout, s.logs)
 	newDir := s.appDir
 	commands := []command{}
 	if !s.agentMode {
@@ -136,13 +150,13 @@
 		if _, err := os.Stat(filepath.Join(newDir, ".git")); err != nil && os.IsNotExist(err) {
 			commit, err := CloneRepositoryBranch(s.repoAddr, s.branch, s.rootDir, s.signer, newDir)
 			if err != nil {
-				fmt.Fprintf(logM, "!!! dodo: Failed to clone repository: %s\n", err)
+				s.log("Failed to clone repository: %s", err)
 				s.status = &Status{
 					Commit: nil,
 				}
 				return err
 			}
-			fmt.Fprintf(logM, "!!! dodo: Successfully cloned repository %s\n", commit.Hash)
+			s.log("Successfully cloned repository %s", commit.Hash)
 			s.status = &Status{
 				Commit:   commit,
 				Commands: []CommandStatus{},
@@ -211,11 +225,11 @@
 			Path:   "/bin/sh",
 			Args:   []string{"/bin/sh", "-c", c.cmd},
 			Env:    append(os.Environ(), c.env...),
-			Stdout: logM,
-			Stderr: logM,
+			Stdout: s.logM,
+			Stderr: s.logM,
 		}
 		cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
-		fmt.Fprintf(logM, "!!! dodo: Running: %s\n", c)
+		s.log("Running: %s", c)
 		s.status.Commands[i].State = "running"
 		if i < len(commands)-1 {
 			if err := cmd.Run(); err != nil {
@@ -244,11 +258,24 @@
 }
 
 type pingReq struct {
-	Id      string  `json:"id"`
-	Service string  `json:"service"`
-	Address string  `json:"address"`
-	Status  *Status `json:"status,omitempty"`
-	Logs    string  `json:"logs"`
+	Id      string    `json:"id"`
+	Service string    `json:"service"`
+	Address string    `json:"address"`
+	Status  *Status   `json:"status,omitempty"`
+	Logs    []LogItem `json:"logs"`
+}
+
+type pingResp struct {
+	Success          bool `json:"success"`
+	LogItemsConsumed int  `json:"logItemsConsumed"`
+}
+
+func min(a, b int) int {
+	if a < b {
+		return a
+	} else {
+		return b
+	}
 }
 
 func (s *Server) pingManager() {
@@ -258,12 +285,14 @@
 			s.pingManager()
 		}()
 	}()
+	logItems := s.logs.Items()
+	logItems = logItems[:min(100, len(logItems))]
 	buf, err := json.Marshal(pingReq{
 		Id:      s.id,
 		Service: s.service,
 		Address: fmt.Sprintf("http://%s:%d", s.self, s.port),
 		Status:  s.status,
-		Logs:    s.logs.Contents(),
+		Logs:    logItems,
 	})
 	if err != nil {
 		return
@@ -273,8 +302,12 @@
 	if err != nil {
 		fmt.Println(err)
 	} else {
-		// check resp code
-		io.Copy(os.Stdout, resp.Body)
+		var r pingResp
+		if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
+			fmt.Printf("%s\n", err)
+		} else if r.Success {
+			s.logs.Trim(r.LogItemsConsumed)
+		}
 	}
 }