blob: ffa710c3ca6c1fea9e10dea01f5d4f70bda7468e [file] [log] [blame]
package appmanager
import (
"bytes"
"context"
"embed"
"encoding/json"
"errors"
"fmt"
"html/template"
"net"
"net/http"
"strconv"
"strings"
"sync"
"time"
"github.com/Masterminds/sprig/v3"
"github.com/gorilla/mux"
"github.com/giolekva/pcloud/core/installer"
"github.com/giolekva/pcloud/core/installer/cluster"
"github.com/giolekva/pcloud/core/installer/server"
"github.com/giolekva/pcloud/core/installer/soft"
"github.com/giolekva/pcloud/core/installer/status"
"github.com/giolekva/pcloud/core/installer/tasks"
)
//go:embed templates/*
var templates embed.FS
//go:embed static/*
var staticAssets embed.FS
type taskForward struct {
task tasks.Task
redirectTo string
id int
}
type Server struct {
l sync.Locker
port int
ssClient soft.Client
repo soft.RepoIO
m *installer.AppManager
r installer.AppRepository
fr installer.AppRepository
reconciler *tasks.FixedReconciler
h status.ResourceMonitor
im *status.InstanceMonitor
cnc installer.ClusterNetworkConfigurator
vpnAPIClient installer.VPNAPIClient
tasks map[string]*taskForward
tmpl tmplts
}
type tmplts struct {
index *template.Template
app *template.Template
allClusters *template.Template
cluster *template.Template
task *template.Template
}
func parseTemplates(fs embed.FS) (tmplts, error) {
base, err := template.New("base.html").Funcs(template.FuncMap(sprig.FuncMap())).ParseFS(fs, "templates/base.html")
if err != nil {
return tmplts{}, err
}
parse := func(path string) (*template.Template, error) {
if b, err := base.Clone(); err != nil {
return nil, err
} else {
return b.ParseFS(fs, path)
}
}
index, err := parse("templates/index.html")
if err != nil {
return tmplts{}, err
}
app, err := parse("templates/app.html")
if err != nil {
return tmplts{}, err
}
allClusters, err := parse("templates/all-clusters.html")
if err != nil {
return tmplts{}, err
}
cluster, err := parse("templates/cluster.html")
if err != nil {
return tmplts{}, err
}
task, err := parse("templates/task.html")
if err != nil {
return tmplts{}, err
}
return tmplts{index, app, allClusters, cluster, task}, nil
}
func NewServer(
port int,
ssClient soft.Client,
repo soft.RepoIO,
m *installer.AppManager,
r installer.AppRepository,
fr installer.AppRepository,
reconciler *tasks.FixedReconciler,
h status.ResourceMonitor,
im *status.InstanceMonitor,
cnc installer.ClusterNetworkConfigurator,
vpnAPIClient installer.VPNAPIClient,
) (*Server, error) {
tmpl, err := parseTemplates(templates)
if err != nil {
return nil, err
}
return &Server{
l: &sync.Mutex{},
port: port,
ssClient: ssClient,
repo: repo,
m: m,
r: r,
fr: fr,
reconciler: reconciler,
h: h,
im: im,
cnc: cnc,
vpnAPIClient: vpnAPIClient,
tasks: make(map[string]*taskForward),
tmpl: tmpl,
}, nil
}
func (s *Server) Start() error {
r := mux.NewRouter()
r.PathPrefix("/static/").Handler(server.NewCachingHandler(http.FileServer(http.FS(staticAssets))))
r.HandleFunc("/api/networks", s.handleNetworks).Methods(http.MethodGet)
r.HandleFunc("/api/clusters", s.handleClusters).Methods(http.MethodGet)
r.HandleFunc("/api/proxy/add", s.handleProxyAdd).Methods(http.MethodPost)
r.HandleFunc("/api/proxy/remove", s.handleProxyRemove).Methods(http.MethodPost)
r.HandleFunc("/api/app-repo", s.handleAppRepo)
r.HandleFunc("/api/app/{slug}/install", s.handleAppInstall).Methods(http.MethodPost)
r.HandleFunc("/api/app/{slug}", s.handleApp).Methods(http.MethodGet)
r.HandleFunc("/api/instance/{slug}", s.handleInstance).Methods(http.MethodGet)
r.HandleFunc("/api/instance/{slug}/update", s.handleAppUpdate).Methods(http.MethodPost)
r.HandleFunc("/api/instance/{slug}/remove", s.handleAppRemove).Methods(http.MethodPost)
r.HandleFunc("/api/instance/{instanceId}/status", s.handleInstanceStatusAPI).Methods(http.MethodGet)
r.HandleFunc("/api/dodo-app/{instanceId}", s.handleDodoAppUpdate).Methods(http.MethodPut)
r.HandleFunc("/api/dodo-app", s.handleDodoAppInstall).Methods(http.MethodPost)
r.HandleFunc("/clusters/{cluster}/servers/{server}/remove", s.handleClusterRemoveServer).Methods(http.MethodPost)
r.HandleFunc("/clusters/{cluster}/servers", s.handleClusterAddServer).Methods(http.MethodPost)
r.HandleFunc("/clusters/{name}", s.handleCluster).Methods(http.MethodGet)
r.HandleFunc("/clusters/{name}/setup-storage", s.handleClusterSetupStorage).Methods(http.MethodPost)
r.HandleFunc("/clusters/{name}/remove", s.handleRemoveCluster).Methods(http.MethodPost)
r.HandleFunc("/clusters", s.handleAllClusters).Methods(http.MethodGet)
r.HandleFunc("/clusters", s.handleCreateCluster).Methods(http.MethodPost)
r.HandleFunc("/app/{slug}", s.handleAppUI).Methods(http.MethodGet)
r.HandleFunc("/instance/{slug}", s.handleInstanceUI).Methods(http.MethodGet)
r.HandleFunc("/tasks/{slug}", s.handleTaskStatus).Methods(http.MethodGet)
r.HandleFunc("/{pageType}", s.handleAppsList).Methods(http.MethodGet)
r.HandleFunc("/", s.handleAppsList).Methods(http.MethodGet)
fmt.Printf("Starting HTTP server on port: %d\n", s.port)
return http.ListenAndServe(fmt.Sprintf(":%d", s.port), r)
}
type dodoAppInstallReq struct {
Config map[string]any `json:"config"`
}
type dodoAppInstallResp struct {
Id string `json:"id"`
DeployKey string `json:"deployKey"`
Access []installer.Access `json:"access"`
}
type dodoAppRendered struct {
Input struct {
Key struct {
Public string `json:"public"`
} `json:"key"`
} `json:"input"`
}
func (s *Server) handleDodoAppInstall(w http.ResponseWriter, r *http.Request) {
s.l.Lock()
defer s.l.Unlock()
var req dodoAppInstallReq
// TODO(gio): validate that no internal fields are overridden by request
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
clusters, err := s.m.GetClusters()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
req.Config["clusters"] = installer.ToAccessConfigs(clusters)
var cfg bytes.Buffer
if err := json.NewEncoder(&cfg).Encode(req.Config); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
app, err := installer.NewDodoApp(cfg.Bytes())
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if instanceId, rr, err := s.install(app, map[string]any{}); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
} else {
outs, err := status.DecodeResourceOuts(rr.RenderedRaw)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
s.im.Monitor(instanceId, outs)
var cfg dodoAppRendered
if err := json.NewDecoder(bytes.NewReader(rr.RenderedRaw)).Decode(&cfg); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
if err := json.NewEncoder(w).Encode(dodoAppInstallResp{
Id: instanceId,
DeployKey: cfg.Input.Key.Public,
Access: rr.Access,
}); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
}
func (s *Server) handleDodoAppUpdate(w http.ResponseWriter, r *http.Request) {
s.l.Lock()
defer s.l.Unlock()
instanceId, ok := mux.Vars(r)["instanceId"]
if !ok {
http.Error(w, "missing instance id", http.StatusBadRequest)
}
if _, ok := s.tasks[instanceId]; ok {
http.Error(w, "task in progress", http.StatusTooEarly)
return
}
var req dodoAppInstallReq
// TODO(gio): validate that no internal fields are overridden by request
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
clusters, err := s.m.GetClusters()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
req.Config["clusters"] = installer.ToAccessConfigs(clusters)
var cfg bytes.Buffer
if err := json.NewEncoder(&cfg).Encode(req.Config); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
overrides := installer.CueAppData{
"app.cue": cfg.Bytes(),
}
rr, err := s.m.Update(instanceId, nil, overrides)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
outs, err := status.DecodeResourceOuts(rr.RenderedRaw)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
s.im.Monitor(instanceId, outs)
t := tasks.NewInstallTask(s.h, func() (installer.ReleaseResources, error) {
if err == nil {
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
go s.reconciler.Reconcile(ctx)
}
return rr, err
})
if _, ok := s.tasks[instanceId]; ok {
panic("MUST NOT REACH!")
}
s.tasks[instanceId] = &taskForward{t, fmt.Sprintf("/instance/%s", instanceId), 0}
t.OnDone(s.cleanTask(instanceId, 0))
go t.Start()
var rend dodoAppRendered
if err := json.NewDecoder(bytes.NewReader(rr.RenderedRaw)).Decode(&rend); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
if err := json.NewEncoder(w).Encode(dodoAppInstallResp{
Id: instanceId,
DeployKey: rend.Input.Key.Public,
Access: rr.Access,
}); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
func (s *Server) handleNetworks(w http.ResponseWriter, r *http.Request) {
env, err := s.m.Config()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
networks, err := s.m.CreateNetworks(env)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if err := json.NewEncoder(w).Encode(networks); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
func (s *Server) handleClusters(w http.ResponseWriter, r *http.Request) {
clusters, err := s.m.GetClusters()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if err := json.NewEncoder(w).Encode(installer.ToAccessConfigs(clusters)); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
type proxyPair struct {
From string `json:"from"`
To string `json:"to"`
}
func (s *Server) handleProxyAdd(w http.ResponseWriter, r *http.Request) {
var req proxyPair
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if err := s.cnc.AddIngressProxy(req.From, req.To); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
func (s *Server) handleProxyRemove(w http.ResponseWriter, r *http.Request) {
var req proxyPair
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if err := s.cnc.RemoveIngressProxy(req.From, req.To); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
type app struct {
Name string `json:"name"`
Icon template.HTML `json:"icon"`
ShortDescription string `json:"shortDescription"`
Slug string `json:"slug"`
Instances []installer.AppInstanceConfig `json:"instances,omitempty"`
}
func (s *Server) handleAppRepo(w http.ResponseWriter, r *http.Request) {
all, err := s.r.GetAll()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
resp := make([]app, len(all))
for i, a := range all {
resp[i] = app{a.Name(), a.Icon(), a.Description(), a.Slug(), nil}
}
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(resp); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
func (s *Server) handleApp(w http.ResponseWriter, r *http.Request) {
slug, ok := mux.Vars(r)["slug"]
if !ok {
http.Error(w, "empty slug", http.StatusBadRequest)
return
}
a, err := s.r.Find(slug)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
instances, err := s.m.GetAllAppInstances(slug)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
resp := app{a.Name(), a.Icon(), a.Description(), a.Slug(), instances}
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(resp); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
func (s *Server) handleInstance(w http.ResponseWriter, r *http.Request) {
slug, ok := mux.Vars(r)["slug"]
if !ok {
http.Error(w, "empty slug", http.StatusBadRequest)
return
}
instance, err := s.m.GetInstance(slug)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
a, err := s.r.Find(instance.AppId)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
resp := app{a.Name(), a.Icon(), a.Description(), a.Slug(), []installer.AppInstanceConfig{*instance}}
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(resp); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
func (s *Server) install(app installer.EnvApp, values map[string]any) (string, installer.ReleaseResources, error) {
env, err := s.m.Config()
if err != nil {
return "", installer.ReleaseResources{}, err
}
suffixGen := installer.NewFixedLengthRandomSuffixGenerator(3)
suffix, err := suffixGen.Generate()
if err != nil {
return "", installer.ReleaseResources{}, err
}
instanceId := app.Slug() + suffix
appDir := fmt.Sprintf("/apps/%s", instanceId)
namespace := fmt.Sprintf("%s%s%s", env.NamespacePrefix, app.Namespace(), suffix)
rr, err := s.m.Install(app, instanceId, appDir, namespace, values)
t := tasks.NewInstallTask(s.h, func() (installer.ReleaseResources, error) {
if err == nil {
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
go s.reconciler.Reconcile(ctx)
}
return rr, err
})
if _, ok := s.tasks[instanceId]; ok {
panic("MUST NOT REACH!")
}
s.tasks[instanceId] = &taskForward{t, fmt.Sprintf("/instance/%s", instanceId), 0}
t.OnDone(s.cleanTask(instanceId, 0))
go t.Start()
return instanceId, rr, nil
}
func (s *Server) handleAppInstall(w http.ResponseWriter, r *http.Request) {
s.l.Lock()
defer s.l.Unlock()
slug, ok := mux.Vars(r)["slug"]
if !ok {
http.Error(w, "empty slug", http.StatusBadRequest)
return
}
var values map[string]any
if err := json.NewDecoder(r.Body).Decode(&values); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
app, err := installer.FindEnvApp(s.r, slug)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if instanceId, _, err := s.install(app, values); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
} else {
fmt.Fprintf(w, "/tasks/%s", instanceId)
}
}
func (s *Server) handleAppUpdate(w http.ResponseWriter, r *http.Request) {
s.l.Lock()
defer s.l.Unlock()
slug, ok := mux.Vars(r)["slug"]
if !ok {
http.Error(w, "empty slug", http.StatusBadRequest)
return
}
var values map[string]any
if err := json.NewDecoder(r.Body).Decode(&values); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
tid := 0
if t, ok := s.tasks[slug]; ok {
if t.task != nil {
http.Error(w, "Update already in progress", http.StatusBadRequest)
return
}
tid = t.id + 1
}
rr, err := s.m.Update(slug, values, nil)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
ctx, _ := context.WithTimeout(context.Background(), 2*time.Minute)
go s.reconciler.Reconcile(ctx)
t := tasks.NewMonitorRelease(s.h, rr)
t.OnDone(s.cleanTask(slug, tid))
s.tasks[slug] = &taskForward{t, fmt.Sprintf("/instance/%s", slug), tid}
go t.Start()
fmt.Printf("Created task for %s\n", slug)
if _, err := fmt.Fprintf(w, "/tasks/%s", slug); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
func (s *Server) handleAppRemove(w http.ResponseWriter, r *http.Request) {
slug, ok := mux.Vars(r)["slug"]
if !ok {
http.Error(w, "empty slug", http.StatusBadRequest)
return
}
if err := s.m.Remove(slug); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
ctx, _ := context.WithTimeout(context.Background(), 2*time.Minute)
go s.reconciler.Reconcile(ctx)
if _, err := fmt.Fprint(w, "/"); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
type PageData struct {
Apps []app
CurrentPage string
SearchTarget string
SearchValue string
}
func (s *Server) handleAppsList(w http.ResponseWriter, r *http.Request) {
pageType := mux.Vars(r)["pageType"]
if pageType == "" {
pageType = "all"
}
searchQuery := r.FormValue("query")
apps, err := s.r.Filter(searchQuery)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
resp := make([]app, 0)
for _, a := range apps {
instances, err := s.m.GetAllAppInstances(a.Slug())
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
switch pageType {
case "installed":
if len(instances) != 0 {
resp = append(resp, app{a.Name(), a.Icon(), a.Description(), a.Slug(), instances})
}
case "not-installed":
if len(instances) == 0 {
resp = append(resp, app{a.Name(), a.Icon(), a.Description(), a.Slug(), nil})
}
default:
resp = append(resp, app{a.Name(), a.Icon(), a.Description(), a.Slug(), instances})
}
}
data := PageData{
Apps: resp,
CurrentPage: pageType,
SearchTarget: pageType,
SearchValue: searchQuery,
}
if err := s.tmpl.index.Execute(w, data); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
type appPageData struct {
App installer.EnvApp
Instance *installer.AppInstanceConfig
Instances []installer.AppInstanceConfig
AvailableNetworks []installer.Network
AvailableClusters []cluster.State
CurrentPage string
}
func (s *Server) handleAppUI(w http.ResponseWriter, r *http.Request) {
global, err := s.m.Config()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
slug, ok := mux.Vars(r)["slug"]
if !ok {
http.Error(w, "empty slug", http.StatusBadRequest)
return
}
a, err := installer.FindEnvApp(s.r, slug)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
instances, err := s.m.GetAllAppInstances(slug)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
networks, err := s.m.CreateNetworks(global)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
clusters, err := s.m.GetClusters()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
data := appPageData{
App: a,
Instances: instances,
AvailableNetworks: networks,
AvailableClusters: clusters,
CurrentPage: a.Name(),
}
if err := s.tmpl.app.Execute(w, data); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
func (s *Server) handleInstanceUI(w http.ResponseWriter, r *http.Request) {
s.l.Lock()
defer s.l.Unlock()
global, err := s.m.Config()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
slug, ok := mux.Vars(r)["slug"]
if !ok {
http.Error(w, "empty slug", http.StatusBadRequest)
return
}
if t, ok := s.tasks[slug]; ok && t.task != nil {
http.Redirect(w, r, fmt.Sprintf("/tasks/%s", slug), http.StatusSeeOther)
return
}
instance, err := s.m.GetInstance(slug)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
a, err := s.m.GetInstanceApp(instance.Id, nil)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
instances, err := s.m.GetAllAppInstances(a.Slug())
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
networks, err := s.m.CreateNetworks(global)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
clusters, err := s.m.GetClusters()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
data := appPageData{
App: a,
Instance: instance,
Instances: instances,
AvailableNetworks: networks,
AvailableClusters: clusters,
CurrentPage: slug,
}
if err := s.tmpl.app.Execute(w, data); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
type taskStatusData struct {
CurrentPage string
Task tasks.Task
}
func (s *Server) handleTaskStatus(w http.ResponseWriter, r *http.Request) {
s.l.Lock()
defer s.l.Unlock()
slug, ok := mux.Vars(r)["slug"]
if !ok {
http.Error(w, "empty slug", http.StatusBadRequest)
return
}
t, ok := s.tasks[slug]
if !ok {
http.Error(w, "task not found", http.StatusInternalServerError)
return
}
if ok && t.task == nil {
http.Redirect(w, r, t.redirectTo, http.StatusSeeOther)
return
}
data := taskStatusData{
CurrentPage: "",
Task: t.task,
}
if err := s.tmpl.task.Execute(w, data); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
type resourceStatus struct {
Type string `json:"type"`
Name string `json:"name"`
Status string `json:"status"`
}
func extractResources(t tasks.Task) []resourceStatus {
var ret []resourceStatus
if t.Resource() != nil {
ret = append(ret, resourceStatus{
Type: t.Resource().Type,
Name: t.Resource().Name,
Status: tasks.StatusString(t.Status()),
})
}
for _, st := range t.Subtasks() {
ret = append(ret, extractResources(st)...)
}
return ret
}
func (s *Server) handleInstanceStatusAPI(w http.ResponseWriter, r *http.Request) {
s.l.Lock()
defer s.l.Unlock()
instanceId, ok := mux.Vars(r)["instanceId"]
if !ok {
http.Error(w, "empty slug", http.StatusBadRequest)
return
}
statuses, err := s.im.Get(instanceId)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
ret := []resourceStatus{}
for r, s := range statuses {
ret = append(ret, resourceStatus{
Type: r.Type,
Name: r.Name,
Status: status.StatusString(s),
})
}
json.NewEncoder(w).Encode(ret)
}
type clustersData struct {
CurrentPage string
Clusters []cluster.State
}
func (s *Server) handleAllClusters(w http.ResponseWriter, r *http.Request) {
clusters, err := s.m.GetClusters()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
data := clustersData{
"clusters",
clusters,
}
if err := s.tmpl.allClusters.Execute(w, data); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
type clusterData struct {
CurrentPage string
Cluster cluster.State
}
func (s *Server) handleCluster(w http.ResponseWriter, r *http.Request) {
name, ok := mux.Vars(r)["name"]
if !ok {
http.Error(w, "empty name", http.StatusBadRequest)
return
}
m, err := s.getClusterManager(name)
if err != nil {
if errors.Is(err, installer.ErrorNotFound) {
http.Error(w, "not found", http.StatusNotFound)
} else {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
return
}
data := clusterData{
"clusters",
m.State(),
}
if err := s.tmpl.cluster.Execute(w, data); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
func (s *Server) handleClusterSetupStorage(w http.ResponseWriter, r *http.Request) {
cName, ok := mux.Vars(r)["name"]
if !ok {
http.Error(w, "empty name", http.StatusBadRequest)
return
}
tid := 0
if t, ok := s.tasks[cName]; ok {
if t.task != nil {
http.Error(w, "cluster task in progress", http.StatusLocked)
return
}
tid = t.id + 1
}
m, err := s.getClusterManager(cName)
if err != nil {
if errors.Is(err, installer.ErrorNotFound) {
http.Error(w, "not found", http.StatusNotFound)
} else {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
return
}
task := tasks.NewClusterSetupTask(m, s.setupRemoteClusterStorage(), s.repo, fmt.Sprintf("cluster %s: setting up storage", m.State().Name))
task.OnDone(s.cleanTask(cName, tid))
go task.Start()
s.tasks[cName] = &taskForward{task, fmt.Sprintf("/clusters/%s", cName), tid}
http.Redirect(w, r, fmt.Sprintf("/tasks/%s", cName), http.StatusSeeOther)
}
func (s *Server) handleClusterRemoveServer(w http.ResponseWriter, r *http.Request) {
s.l.Lock()
defer s.l.Unlock()
cName, ok := mux.Vars(r)["cluster"]
if !ok {
http.Error(w, "empty name", http.StatusBadRequest)
return
}
tid := 0
if t, ok := s.tasks[cName]; ok {
if t.task != nil {
http.Error(w, "cluster task in progress", http.StatusLocked)
return
}
tid = t.id + 1
}
sName, ok := mux.Vars(r)["server"]
if !ok {
http.Error(w, "empty name", http.StatusBadRequest)
return
}
m, err := s.getClusterManager(cName)
if err != nil {
if errors.Is(err, installer.ErrorNotFound) {
http.Error(w, "not found", http.StatusNotFound)
} else {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
return
}
task := tasks.NewClusterRemoveServerTask(m, sName, s.repo)
task.OnDone(s.cleanTask(cName, tid))
go task.Start()
s.tasks[cName] = &taskForward{task, fmt.Sprintf("/clusters/%s", cName), tid}
http.Redirect(w, r, fmt.Sprintf("/tasks/%s", cName), http.StatusSeeOther)
}
func (s *Server) getClusterManager(cName string) (cluster.Manager, error) {
clusters, err := s.m.GetClusters()
if err != nil {
return nil, err
}
var c *cluster.State
for _, i := range clusters {
if i.Name == cName {
c = &i
break
}
}
if c == nil {
return nil, installer.ErrorNotFound
}
return cluster.RestoreKubeManager(*c)
}
func (s *Server) handleClusterAddServer(w http.ResponseWriter, r *http.Request) {
s.l.Lock()
defer s.l.Unlock()
cName, ok := mux.Vars(r)["cluster"]
if !ok {
http.Error(w, "empty name", http.StatusBadRequest)
return
}
tid := 0
if t, ok := s.tasks[cName]; ok {
if t.task != nil {
http.Error(w, "cluster task in progress", http.StatusLocked)
return
}
tid = t.id + 1
}
m, err := s.getClusterManager(cName)
if err != nil {
if errors.Is(err, installer.ErrorNotFound) {
http.Error(w, "not found", http.StatusNotFound)
} else {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
return
}
t := r.PostFormValue("type")
ip := net.ParseIP(strings.TrimSpace(r.PostFormValue("ip")))
if ip == nil {
http.Error(w, "invalid ip", http.StatusBadRequest)
return
}
port := 22
if p := r.PostFormValue("port"); p != "" {
port, err = strconv.Atoi(p)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
}
server := cluster.Server{
IP: ip,
Port: port,
User: r.PostFormValue("user"),
Password: r.PostFormValue("password"),
}
var task tasks.Task
switch strings.ToLower(t) {
case "controller":
if len(m.State().Controllers) == 0 {
task = tasks.NewClusterInitTask(m, server, s.cnc, s.repo, s.setupRemoteCluster())
} else {
task = tasks.NewClusterJoinControllerTask(m, server, s.repo)
}
case "worker":
task = tasks.NewClusterJoinWorkerTask(m, server, s.repo)
default:
http.Error(w, "invalid type", http.StatusBadRequest)
return
}
task.OnDone(s.cleanTask(cName, tid))
go task.Start()
s.tasks[cName] = &taskForward{task, fmt.Sprintf("/clusters/%s", cName), tid}
http.Redirect(w, r, fmt.Sprintf("/tasks/%s", cName), http.StatusSeeOther)
}
func (s *Server) handleCreateCluster(w http.ResponseWriter, r *http.Request) {
cName := r.PostFormValue("name")
if cName == "" {
http.Error(w, "no name", http.StatusBadRequest)
return
}
st := cluster.State{Name: cName}
if _, err := s.repo.Do(func(fs soft.RepoFS) (string, error) {
if err := soft.WriteJson(fs, fmt.Sprintf("/clusters/%s/config.json", cName), st); err != nil {
return "", err
}
return fmt.Sprintf("create cluster: %s", cName), nil
}); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
http.Redirect(w, r, fmt.Sprintf("/clusters/%s", cName), http.StatusSeeOther)
}
func (s *Server) handleRemoveCluster(w http.ResponseWriter, r *http.Request) {
cName, ok := mux.Vars(r)["name"]
if !ok {
http.Error(w, "empty name", http.StatusBadRequest)
return
}
tid := 0
if t, ok := s.tasks[cName]; ok {
if t.task != nil {
http.Error(w, "cluster task in progress", http.StatusLocked)
return
}
tid = t.id + 1
}
m, err := s.getClusterManager(cName)
if err != nil {
if errors.Is(err, installer.ErrorNotFound) {
http.Error(w, "not found", http.StatusNotFound)
} else {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
return
}
task := tasks.NewRemoveClusterTask(m, s.cnc, s.repo)
task.OnDone(s.cleanTask(cName, tid))
go task.Start()
s.tasks[cName] = &taskForward{task, fmt.Sprintf("/clusters/%s", cName), tid}
http.Redirect(w, r, fmt.Sprintf("/tasks/%s", cName), http.StatusSeeOther)
}
func (s *Server) setupRemoteCluster() cluster.ClusterIngressSetupFunc {
const vpnUser = "private-network-proxy"
return func(name, kubeconfig, ingressClassName string) (net.IP, error) {
hostname := fmt.Sprintf("cluster-%s", name)
t := tasks.NewInstallTask(s.h, func() (installer.ReleaseResources, error) {
app, err := installer.FindEnvApp(s.fr, "cluster-network")
if err != nil {
return installer.ReleaseResources{}, err
}
env, err := s.m.Config()
if err != nil {
return installer.ReleaseResources{}, err
}
keys, err := installer.NewSSHKeyPair("port-allocator")
if err != nil {
return installer.ReleaseResources{}, err
}
user := fmt.Sprintf("%s-cluster-%s-port-allocator", env.Id, name)
if err := s.ssClient.AddUser(user, keys.AuthorizedKey()); err != nil {
return installer.ReleaseResources{}, err
}
if err := s.ssClient.AddReadWriteCollaborator("config", user); err != nil {
return installer.ReleaseResources{}, err
}
instanceId := fmt.Sprintf("%s-%s", app.Slug(), name)
appDir := fmt.Sprintf("/clusters/%s/ingress", name)
namespace := fmt.Sprintf("%scluster-%s-network", env.NamespacePrefix, name)
rr, err := s.m.Install(app, instanceId, appDir, namespace, map[string]any{
"cluster": map[string]any{
"name": name,
"kubeconfig": kubeconfig,
"ingressClassName": ingressClassName,
},
// TODO(gio): remove hardcoded user
"vpnUser": vpnUser,
"vpnProxyHostname": hostname,
"sshPrivateKey": string(keys.RawPrivateKey()),
})
if err != nil {
return installer.ReleaseResources{}, err
}
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
go s.reconciler.Reconcile(ctx)
return rr, err
})
ch := make(chan error)
t.OnDone(func(err error) {
ch <- err
})
go t.Start()
err := <-ch
if err != nil {
return nil, err
}
for {
ip, err := s.vpnAPIClient.GetNodeIP(vpnUser, hostname)
if err == nil {
return ip, nil
}
if errors.Is(err, installer.ErrorNotFound) {
time.Sleep(5 * time.Second)
}
}
}
}
func (s *Server) setupRemoteClusterStorage() cluster.ClusterSetupFunc {
return func(cm cluster.Manager) error {
name := cm.State().Name
t := tasks.NewInstallTask(s.h, func() (installer.ReleaseResources, error) {
app, err := installer.FindEnvApp(s.fr, "longhorn")
if err != nil {
return installer.ReleaseResources{}, err
}
env, err := s.m.Config()
if err != nil {
return installer.ReleaseResources{}, err
}
instanceId := fmt.Sprintf("%s-%s", app.Slug(), name)
appDir := fmt.Sprintf("/clusters/%s/storage", name)
namespace := fmt.Sprintf("%scluster-%s-storage", env.NamespacePrefix, name)
rr, err := s.m.Install(app, instanceId, appDir, namespace, map[string]any{
"cluster": name,
})
if err != nil {
return installer.ReleaseResources{}, err
}
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
go s.reconciler.Reconcile(ctx)
return rr, err
})
ch := make(chan error)
t.OnDone(func(err error) {
ch <- err
})
go t.Start()
err := <-ch
if err != nil {
return err
}
cm.EnableStorage()
return nil
}
}
func (s *Server) cleanTask(name string, id int) func(error) {
return func(err error) {
if err != nil {
fmt.Printf("Task %s failed: %s", name, err.Error())
}
s.l.Lock()
defer s.l.Unlock()
s.tasks[name].task = nil
go func() {
time.Sleep(30 * time.Second)
s.l.Lock()
defer s.l.Unlock()
if t, ok := s.tasks[name]; ok && t.id == id {
delete(s.tasks, name)
}
}()
}
}