AppRunner: Automatically annotate logs
Change-Id: I5e614fc1e841e183ac649758972428ae55162a67
diff --git a/apps/app-runner/Makefile b/apps/app-runner/Makefile
index ef1c483..07cc97f 100644
--- a/apps/app-runner/Makefile
+++ b/apps/app-runner/Makefile
@@ -22,6 +22,14 @@
build: clean
/usr/local/go/bin/go build -o app-runner *.go
+test: export CGO_ENABLED=0
+test:
+ /usr/local/go/bin/go test ./...
+
+test: export CGO_ENABLED=0
+testv:
+ /usr/local/go/bin/go test -v ./...
+
build_arm64: export CGO_ENABLED=0
build_arm64: export GO111MODULE=on
build_arm64: export GOOS=linux
diff --git a/apps/app-runner/log.go b/apps/app-runner/log.go
index 2e5f524..084e3d1 100644
--- a/apps/app-runner/log.go
+++ b/apps/app-runner/log.go
@@ -1,24 +1,97 @@
package main
import (
+ "bytes"
"strings"
"sync"
+ "time"
)
-type Log struct {
- l sync.Mutex
- d strings.Builder
+type LogItem struct {
+ RunId string `json:"runId"`
+ TimestampMilli int64 `json:"timestampMilli"`
+ Commit string `json:"commit,omitempty"`
+ Contents []byte `json:"contents"`
}
-func (l *Log) Write(p []byte) (n int, err error) {
+type Logger struct {
+ l sync.Mutex
+ runId string
+ commitHash string
+ items []LogItem
+ curr LogItem
+}
+
+func NewLogger(runId string) *Logger {
+ return &Logger{
+ l: sync.Mutex{},
+ runId: runId,
+ items: []LogItem{},
+ curr: LogItem{
+ RunId: runId,
+ Contents: []byte{},
+ },
+ }
+}
+
+func (l *Logger) Write(p []byte) (n int, err error) {
l.l.Lock()
defer l.l.Unlock()
+ cnt := 0
// TODO(gio): Reset s.logs periodically
- return l.d.Write(p)
+ for len(p) > 0 {
+ pos := bytes.Index(p, []byte("\n"))
+ if pos != -1 {
+ if l.curr.TimestampMilli == 0 {
+ l.curr.TimestampMilli = time.Now().UnixMilli()
+ }
+ l.curr.Contents = append(l.curr.Contents, p[:pos]...)
+ l.items = append(l.items, l.curr)
+ l.curr = LogItem{
+ RunId: l.runId,
+ Contents: []byte{},
+ }
+ p = p[pos+len([]byte("\n")):]
+ cnt += pos + len([]byte("\n"))
+ } else {
+ if l.curr.TimestampMilli == 0 {
+ l.curr.TimestampMilli = time.Now().UnixMilli()
+ }
+ l.curr.Contents = append(l.curr.Contents, p...)
+ cnt += len(p)
+ p = []byte{}
+ }
+ }
+ return cnt, nil
}
-func (l *Log) Contents() string {
+func (l *Logger) Items() []LogItem {
l.l.Lock()
defer l.l.Unlock()
- return l.d.String()
+ ret := []LogItem{}
+ for _, i := range l.items {
+ ret = append(ret, i)
+ }
+ return ret
+}
+
+func (l *Logger) Trim(n int) {
+ l.l.Lock()
+ defer l.l.Unlock()
+ l.items = l.items[n:]
+}
+
+func (l *Logger) Contents() (string, error) {
+ l.l.Lock()
+ defer l.l.Unlock()
+ var ret strings.Builder
+ for _, i := range l.items {
+ if _, err := ret.Write(i.Contents); err != nil {
+ return "", err
+ }
+ if _, err := ret.WriteString("\n"); err != nil {
+ return "", err
+ }
+ }
+ return ret.String(), nil
}
diff --git a/apps/app-runner/log_test.go b/apps/app-runner/log_test.go
new file mode 100644
index 0000000..777cf8f
--- /dev/null
+++ b/apps/app-runner/log_test.go
@@ -0,0 +1,38 @@
+package main
+
+import (
+ "fmt"
+ "testing"
+)
+
+func TestLoggerWriteFullLine(t *testing.T) {
+ l := NewLogger("foo")
+ fmt.Fprintf(l, "hi\n")
+ c, err := l.Contents()
+ if err != nil {
+ t.Fatal(err)
+ }
+ if c != "hi\n" {
+ t.Fatal(c)
+ }
+}
+
+func TestLoggerWritePartialLine(t *testing.T) {
+ l := NewLogger("foo")
+ fmt.Fprintf(l, "hi\nfoo")
+ c, err := l.Contents()
+ if err != nil {
+ t.Fatal(err)
+ }
+ if c != "hi\n" {
+ t.Fatal(c)
+ }
+ fmt.Fprintf(l, "bar\n")
+ c, err = l.Contents()
+ if err != nil {
+ t.Fatal(err)
+ }
+ if c != "hi\nfoobar\n" {
+ t.Fatal(c)
+ }
+}
diff --git a/apps/app-runner/server.go b/apps/app-runner/server.go
index 6efd222..7c74575 100644
--- a/apps/app-runner/server.go
+++ b/apps/app-runner/server.go
@@ -30,7 +30,9 @@
}
type Server struct {
- l sync.Locker
+ l sync.Locker
+ // TODO(gio): randomly generate string
+ runId int
agentMode bool
port int
appId string
@@ -46,14 +48,18 @@
runCommands []Command
self string
managerAddr string
- logs *Log
+ logs *Logger
+ logM io.Writer
currDir string
status *Status
}
func NewServer(agentMode bool, port int, appId, service, id, repoAddr, branch, rootDir string, signer ssh.Signer, appDir string, runCommands []Command, self string, manager string) *Server {
+ logger := NewLogger("0")
+ logM := io.MultiWriter(os.Stdout, logger)
return &Server{
l: &sync.Mutex{},
+ runId: 0,
agentMode: agentMode,
port: port,
ready: false,
@@ -68,7 +74,8 @@
runCommands: runCommands,
self: self,
managerAddr: manager,
- logs: &Log{},
+ logs: logger,
+ logM: logM,
currDir: "",
status: nil,
}
@@ -88,7 +95,11 @@
}
func (s *Server) handleLogs(w http.ResponseWriter, r *http.Request) {
- fmt.Fprint(w, s.logs.Contents())
+ if logs, err := s.logs.Contents(); err != nil {
+ http.Error(w, "not ready", http.StatusInternalServerError)
+ } else {
+ fmt.Fprint(w, logs)
+ }
}
func (s *Server) handleReady(w http.ResponseWriter, r *http.Request) {
@@ -101,12 +112,16 @@
}
}
+func (s *Server) log(tmpl string, args ...any) {
+ contents := fmt.Sprintf(tmpl, args...)
+ fmt.Fprintf(s.logs, "\033[38;5;212;136;141mdodo:\033[0;00m %s\n", contents)
+}
+
func (s *Server) handleUpdate(w http.ResponseWriter, r *http.Request) {
- fmt.Println("update")
s.l.Lock()
s.ready = false
s.l.Unlock()
- fmt.Fprintf(s.logs, "!!! dodo: Reloading service\n")
+ s.log("Reloading service")
if err := s.run(); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
@@ -122,7 +137,6 @@
}
func (s *Server) run() error {
- logM := io.MultiWriter(os.Stdout, s.logs)
newDir := s.appDir
commands := []command{}
if !s.agentMode {
@@ -136,13 +150,13 @@
if _, err := os.Stat(filepath.Join(newDir, ".git")); err != nil && os.IsNotExist(err) {
commit, err := CloneRepositoryBranch(s.repoAddr, s.branch, s.rootDir, s.signer, newDir)
if err != nil {
- fmt.Fprintf(logM, "!!! dodo: Failed to clone repository: %s\n", err)
+ s.log("Failed to clone repository: %s", err)
s.status = &Status{
Commit: nil,
}
return err
}
- fmt.Fprintf(logM, "!!! dodo: Successfully cloned repository %s\n", commit.Hash)
+ s.log("Successfully cloned repository %s", commit.Hash)
s.status = &Status{
Commit: commit,
Commands: []CommandStatus{},
@@ -211,11 +225,11 @@
Path: "/bin/sh",
Args: []string{"/bin/sh", "-c", c.cmd},
Env: append(os.Environ(), c.env...),
- Stdout: logM,
- Stderr: logM,
+ Stdout: s.logM,
+ Stderr: s.logM,
}
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
- fmt.Fprintf(logM, "!!! dodo: Running: %s\n", c)
+ s.log("Running: %s", c)
s.status.Commands[i].State = "running"
if i < len(commands)-1 {
if err := cmd.Run(); err != nil {
@@ -244,11 +258,24 @@
}
type pingReq struct {
- Id string `json:"id"`
- Service string `json:"service"`
- Address string `json:"address"`
- Status *Status `json:"status,omitempty"`
- Logs string `json:"logs"`
+ Id string `json:"id"`
+ Service string `json:"service"`
+ Address string `json:"address"`
+ Status *Status `json:"status,omitempty"`
+ Logs []LogItem `json:"logs"`
+}
+
+type pingResp struct {
+ Success bool `json:"success"`
+ LogItemsConsumed int `json:"logItemsConsumed"`
+}
+
+func min(a, b int) int {
+ if a < b {
+ return a
+ } else {
+ return b
+ }
}
func (s *Server) pingManager() {
@@ -258,12 +285,14 @@
s.pingManager()
}()
}()
+ logItems := s.logs.Items()
+ logItems = logItems[:min(100, len(logItems))]
buf, err := json.Marshal(pingReq{
Id: s.id,
Service: s.service,
Address: fmt.Sprintf("http://%s:%d", s.self, s.port),
Status: s.status,
- Logs: s.logs.Contents(),
+ Logs: logItems,
})
if err != nil {
return
@@ -273,8 +302,12 @@
if err != nil {
fmt.Println(err)
} else {
- // check resp code
- io.Copy(os.Stdout, resp.Body)
+ var r pingResp
+ if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
+ fmt.Printf("%s\n", err)
+ } else if r.Success {
+ s.logs.Trim(r.LogItemsConsumed)
+ }
}
}