blob: 8c3d9bf95bd3bd8353e53db2ddcc160d6b5dcb7b [file] [log] [blame]
package main
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"os/exec"
"path/filepath"
"sync"
"syscall"
"time"
"golang.org/x/crypto/ssh"
)
type Server struct {
l sync.Locker
port int
appId string
ready bool
cmd *exec.Cmd
repoAddr string
branch string
rootDir string
signer ssh.Signer
appDir string
runCommands []Command
self string
managerAddr string
logs *Log
currDir string
}
func NewServer(port int, appId string, repoAddr, branch, rootDir string, signer ssh.Signer, appDir string, runCommands []Command, self string, manager string) *Server {
return &Server{
l: &sync.Mutex{},
port: port,
ready: false,
appId: appId,
repoAddr: repoAddr,
branch: branch,
rootDir: rootDir,
signer: signer,
appDir: appDir,
runCommands: runCommands,
self: self,
managerAddr: manager,
logs: &Log{},
currDir: "",
}
}
func (s *Server) Start() error {
http.HandleFunc("/update", s.handleUpdate)
http.HandleFunc("/ready", s.handleReady)
http.HandleFunc("/logs", s.handleLogs)
if err := s.run(); err != nil {
return err
}
go s.pingManager()
return http.ListenAndServe(fmt.Sprintf(":%d", s.port), nil)
}
func (s *Server) handleLogs(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, s.logs.Contents())
}
func (s *Server) handleReady(w http.ResponseWriter, r *http.Request) {
s.l.Lock()
defer s.l.Unlock()
if s.ready {
fmt.Fprintln(w, "ok")
} else {
http.Error(w, "not ready", http.StatusInternalServerError)
}
}
func (s *Server) handleUpdate(w http.ResponseWriter, r *http.Request) {
fmt.Println("update")
s.l.Lock()
s.ready = false
s.l.Unlock()
if err := s.run(); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
s.l.Lock()
s.ready = true
s.l.Unlock()
}
func (s *Server) run() error {
newDir, err := os.MkdirTemp(s.appDir, "code-*")
if err != nil {
return err
}
if err := CloneRepositoryBranch(s.repoAddr, s.branch, s.rootDir, s.signer, newDir); err != nil {
return err
}
logM := io.MultiWriter(os.Stdout, s.logs)
for i, c := range s.runCommands {
args := []string{c.Bin}
args = append(args, c.Args...)
cmd := &exec.Cmd{
Dir: filepath.Join(newDir, s.rootDir),
Path: c.Bin,
Args: args,
Env: append(os.Environ(), c.Env...),
Stdout: logM,
Stderr: logM,
}
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
fmt.Printf("Running: %s %s\n", c.Bin, c.Args)
if i < len(s.runCommands)-1 {
if err := cmd.Run(); err != nil {
return err
}
} else {
if s.cmd != nil {
// TODO(gio): make this point configurable
if err := s.kill(); err != nil {
return err
}
if err := os.RemoveAll(s.currDir); err != nil {
return err
}
}
if err := cmd.Start(); err != nil {
return err
}
s.cmd = cmd
}
}
s.currDir = newDir
return nil
}
type pingReq struct {
Address string `json:"address"`
Logs string `json:"logs"`
}
func (s *Server) pingManager() {
// TODO(gio): do we need runnert -> manager communication?
return
defer func() {
go func() {
time.Sleep(5 * time.Second)
s.pingManager()
}()
}()
buf, err := json.Marshal(pingReq{
Address: fmt.Sprintf("%s:%d", s.self, s.port),
Logs: s.logs.Contents(),
})
if err != nil {
return
}
registerWorkerAddr := fmt.Sprintf("%s/api/apps/%s/workers", s.managerAddr, s.appId)
http.Post(registerWorkerAddr, "application/json", bytes.NewReader(buf))
}
func (s *Server) kill() error {
if s.cmd == nil {
return nil
}
err := syscall.Kill(-s.cmd.Process.Pid, syscall.SIGKILL)
if err != nil {
return err
}
// NOTE(gio): No need to check err as we just killed the process
s.cmd.Wait()
s.cmd = nil
return nil
}