blob: 44e86fca82cefceace2906bf9810555e2f56bc1c [file] [log] [blame]
package main
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"syscall"
"time"
"golang.org/x/crypto/ssh"
)
type CommandState string
type CommandStatus struct {
Command string `json:"command"`
State CommandState `json:"state"`
}
type Status struct {
Commit *Commit `json:"commit"`
Commands []CommandStatus `json:"commands"`
}
type Server struct {
l sync.Locker
hs *http.Server
// TODO(gio): randomly generate string
runId int
agentMode bool
port int
appId string
service string
id string
ready bool
cmd *exec.Cmd
repoAddr string
branch string
rootDir string
signer ssh.Signer
appDir string
runCommands []Command
self string
managerAddr string
logs *Logger
logM io.Writer
currDir string
status *Status
stop bool
}
func NewServer(agentMode bool, port int, appId, service, id, repoAddr, branch, rootDir string, signer ssh.Signer, appDir string, runCommands []Command, self string, manager string) *Server {
logger := NewLogger("0")
logM := io.MultiWriter(os.Stdout, logger)
return &Server{
l: &sync.Mutex{},
hs: nil,
runId: 0,
agentMode: agentMode,
port: port,
ready: false,
appId: appId,
service: service,
id: id,
repoAddr: repoAddr,
branch: branch,
rootDir: rootDir,
signer: signer,
appDir: appDir,
runCommands: runCommands,
self: self,
managerAddr: manager,
logs: logger,
logM: logM,
currDir: "",
status: nil,
stop: false,
}
}
func (s *Server) Start() error {
http.HandleFunc("/update", s.handleUpdate)
http.HandleFunc("/ready", s.handleReady)
http.HandleFunc("/logs", s.handleLogs)
http.HandleFunc("/quitquitquit", s.handleQuit)
if s.managerAddr != "" && s.appId != "" {
go s.pingManager()
}
if err := s.run(); err != nil {
return err
}
hs := &http.Server{
Addr: fmt.Sprintf(":%d", s.port),
}
s.hs = hs
if err := s.hs.ListenAndServe(); err == nil || errors.Is(err, http.ErrServerClosed) {
return nil
} else {
return err
}
}
func (s *Server) Stop() {
fmt.Println("Stopping")
s.l.Lock()
defer s.l.Unlock()
s.stop = true
if err := s.kill(); err != nil {
fmt.Printf("Failed to stop last command: %s\n", err)
} else {
fmt.Println("Stopped last command")
}
if s.hs != nil {
if err := s.hs.Shutdown(context.Background()); err != nil {
fmt.Printf("Failed to stop web server: %s\n", err)
} else {
fmt.Println("Stopped web server")
}
}
}
func (s *Server) handleQuit(w http.ResponseWriter, r *http.Request) {
go s.Stop()
}
func (s *Server) handleLogs(w http.ResponseWriter, r *http.Request) {
if logs, err := s.logs.Contents(); err != nil {
http.Error(w, "not ready", http.StatusInternalServerError)
} else {
fmt.Fprint(w, logs)
}
}
func (s *Server) handleReady(w http.ResponseWriter, r *http.Request) {
s.l.Lock()
defer s.l.Unlock()
if s.ready {
fmt.Fprintln(w, "ok")
} else {
http.Error(w, "not ready", http.StatusInternalServerError)
}
}
func (s *Server) log(tmpl string, args ...any) {
contents := fmt.Sprintf(tmpl, args...)
fmt.Fprintf(s.logs, "\033[38;5;212;136;141mdodo:\033[0;00m %s\n", contents)
}
func (s *Server) handleUpdate(w http.ResponseWriter, r *http.Request) {
s.l.Lock()
s.ready = false
s.l.Unlock()
s.log("Reloading service")
if err := s.run(); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
s.l.Lock()
s.ready = true
s.l.Unlock()
}
type command struct {
cmd string
env []string
}
func (s *Server) run() error {
newDir := s.appDir
commands := []command{}
if !s.agentMode {
var err error
newDir, err = os.MkdirTemp(s.appDir, "code-*")
if err != nil {
return err
}
}
if s.repoAddr != "" {
if _, err := os.Stat(filepath.Join(newDir, ".git")); err != nil && os.IsNotExist(err) {
commit, err := CloneRepositoryBranch(s.repoAddr, s.branch, s.rootDir, s.signer, newDir)
if err != nil {
s.log("Failed to clone repository: %s", err)
s.status = &Status{
Commit: nil,
}
return err
}
s.log("Successfully cloned repository %s", commit.Hash)
s.status = &Status{
Commit: commit,
Commands: []CommandStatus{},
}
} else {
s.status = &Status{
Commands: []CommandStatus{},
}
}
} else {
s.status = &Status{
Commit: nil,
Commands: []CommandStatus{},
}
}
if s.status.Commit != nil {
s.logs.commitHash = s.status.Commit.Hash
} else {
s.logs.commitHash = ""
}
if s.agentMode && s.repoAddr == "" {
if _, err := os.Stat(filepath.Join(newDir, ".git")); err != nil && os.IsNotExist(err) {
commands = append(commands, command{cmd: "git config --global user.name dodo"})
s.status.Commands = append(s.status.Commands, CommandStatus{
Command: commands[len(commands)-1].cmd,
State: "waiting",
})
commands = append(commands, command{cmd: "git config --global user.email dodo@dodo.cloud"})
s.status.Commands = append(s.status.Commands, CommandStatus{
Command: commands[len(commands)-1].cmd,
State: "waiting",
})
commands = append(commands, command{cmd: "git init ."})
s.status.Commands = append(s.status.Commands, CommandStatus{
Command: commands[len(commands)-1].cmd,
State: "waiting",
})
commands = append(commands, command{cmd: "echo \"TODO: Describe project\" > README.md"})
s.status.Commands = append(s.status.Commands, CommandStatus{
Command: commands[len(commands)-1].cmd,
State: "waiting",
})
commands = append(commands, command{cmd: "git add README.md"})
s.status.Commands = append(s.status.Commands, CommandStatus{
Command: commands[len(commands)-1].cmd,
State: "waiting",
})
commands = append(commands, command{cmd: "git commit -m \"init\""})
s.status.Commands = append(s.status.Commands, CommandStatus{
Command: commands[len(commands)-1].cmd,
State: "waiting",
})
}
}
for _, c := range s.runCommands {
args := []string{c.Bin}
args = append(args, c.Args...)
cmd := strings.Join(args, " ")
commands = append(commands, command{cmd, c.Env})
s.status.Commands = append(s.status.Commands, CommandStatus{
Command: cmd,
State: "waiting",
})
}
for i, c := range commands {
if i > 0 {
s.status.Commands[i-1].State = "success"
}
cmd := &exec.Cmd{
Dir: filepath.Join(newDir, s.rootDir),
Path: "/bin/sh",
Args: []string{"/bin/sh", "-c", c.cmd},
Env: append(os.Environ(), c.env...),
Stdout: s.logM,
Stderr: s.logM,
}
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
s.log("Running: %s", c)
s.status.Commands[i].State = "running"
if i < len(commands)-1 {
if err := cmd.Run(); err != nil {
return err
}
} else {
if s.cmd != nil {
// TODO(gio): make this point configurable
if err := s.kill(); err != nil {
return err
}
if s.currDir != "" && !s.agentMode {
if err := os.RemoveAll(s.currDir); err != nil {
return err
}
}
}
if err := cmd.Start(); err != nil {
return err
}
s.cmd = cmd
}
}
s.currDir = newDir
return nil
}
type pingReq struct {
Id string `json:"id"`
Service string `json:"service"`
Address string `json:"address"`
Status *Status `json:"status,omitempty"`
Logs []LogItem `json:"logs"`
}
type pingResp struct {
Success bool `json:"success"`
LogItemsConsumed int `json:"logItemsConsumed"`
}
func min(a, b int) int {
if a < b {
return a
} else {
return b
}
}
func (s *Server) pingManager() {
defer func() {
go func() {
s.l.Lock()
defer s.l.Unlock()
// TODO(gio): Wait until all logs are sent over to the manager.
if !s.stop {
time.Sleep(1 * time.Second)
s.pingManager()
}
}()
}()
logItems := s.logs.Items()
logItems = logItems[:min(100, len(logItems))]
buf, err := json.Marshal(pingReq{
Id: s.id,
Service: s.service,
Address: fmt.Sprintf("http://%s:%d", s.self, s.port),
Status: s.status,
Logs: logItems,
})
if err != nil {
return
}
registerWorkerAddr := fmt.Sprintf("%s/api/project/%s/workers", s.managerAddr, s.appId)
resp, err := http.Post(registerWorkerAddr, "application/json", bytes.NewReader(buf))
if err != nil {
fmt.Println(err)
} else {
var r pingResp
if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
fmt.Printf("%s\n", err)
} else if r.Success {
s.logs.Trim(r.LogItemsConsumed)
}
}
}
func (s *Server) kill() error {
if s.cmd == nil {
return nil
}
err := syscall.Kill(-s.cmd.Process.Pid, syscall.SIGKILL)
if err != nil {
return err
}
// NOTE(gio): No need to check err as we just killed the process
s.cmd.Wait()
s.cmd = nil
return nil
}