ClusterManager: Implements support of remote clusters.
After this change users will be able to:
* Create cluster and add/remove servers to it
* Install apps on remote cluster
* Move already installed apps between clusters
* Apps running on server being removed will auto-migrate
to another server from that same cluster
This is achieved by:
* Installing and running minimal version of dodo on remote cluster
* Ingress-nginx is installed automatically on new clusters
* Next to nginx we run VPN client in the same pod, so that
default cluster can establish secure communication with it
* Multiple reverse proxies are configured to get to the
remote cluster service from ingress installed on default cluster.
Next steps:
* Support remote clusters in dodo apps (prototype ready)
* Clean up old cluster when moving app to the new one. Currently
old cluster keeps running app pods even though no ingress can
reach it anymore.
Change-Id: Iffc908c93416d4126a8e1c2832eae7b659cb8044
diff --git a/core/installer/welcome/appmanager.go b/core/installer/welcome/appmanager.go
index 7f168dc..3016190 100644
--- a/core/installer/welcome/appmanager.go
+++ b/core/installer/welcome/appmanager.go
@@ -4,37 +4,57 @@
"context"
"embed"
"encoding/json"
+ "errors"
"fmt"
"html/template"
"io/ioutil"
"log"
+ "net"
"net/http"
+ "strconv"
+ "strings"
+ "sync"
"time"
"github.com/Masterminds/sprig/v3"
"github.com/gorilla/mux"
"github.com/giolekva/pcloud/core/installer"
+ "github.com/giolekva/pcloud/core/installer/cluster"
+ "github.com/giolekva/pcloud/core/installer/soft"
"github.com/giolekva/pcloud/core/installer/tasks"
)
//go:embed appmanager-tmpl/*
var appTmpls embed.FS
+type taskForward struct {
+ task tasks.Task
+ redirectTo string
+}
+
type AppManagerServer struct {
- port int
- m *installer.AppManager
- r installer.AppRepository
- reconciler *tasks.FixedReconciler
- h installer.HelmReleaseMonitor
- tasks map[string]tasks.Task
- ta map[string]installer.EnvApp
- tmpl tmplts
+ l sync.Locker
+ port int
+ repo soft.RepoIO
+ m *installer.AppManager
+ r installer.AppRepository
+ fr installer.AppRepository
+ reconciler *tasks.FixedReconciler
+ h installer.HelmReleaseMonitor
+ cnc installer.ClusterNetworkConfigurator
+ vpnAPIClient installer.VPNAPIClient
+ tasks map[string]taskForward
+ ta map[string]installer.EnvApp
+ tmpl tmplts
}
type tmplts struct {
- index *template.Template
- app *template.Template
+ index *template.Template
+ app *template.Template
+ allClusters *template.Template
+ cluster *template.Template
+ task *template.Template
}
func parseTemplatesAppManager(fs embed.FS) (tmplts, error) {
@@ -57,29 +77,50 @@
if err != nil {
return tmplts{}, err
}
- return tmplts{index, app}, nil
+ allClusters, err := parse("appmanager-tmpl/all-clusters.html")
+ if err != nil {
+ return tmplts{}, err
+ }
+ cluster, err := parse("appmanager-tmpl/cluster.html")
+ if err != nil {
+ return tmplts{}, err
+ }
+ task, err := parse("appmanager-tmpl/task.html")
+ if err != nil {
+ return tmplts{}, err
+ }
+ return tmplts{index, app, allClusters, cluster, task}, nil
}
func NewAppManagerServer(
port int,
+ repo soft.RepoIO,
m *installer.AppManager,
r installer.AppRepository,
+ fr installer.AppRepository,
reconciler *tasks.FixedReconciler,
h installer.HelmReleaseMonitor,
+ cnc installer.ClusterNetworkConfigurator,
+ vpnAPIClient installer.VPNAPIClient,
) (*AppManagerServer, error) {
tmpl, err := parseTemplatesAppManager(appTmpls)
if err != nil {
return nil, err
}
return &AppManagerServer{
- port: port,
- m: m,
- r: r,
- reconciler: reconciler,
- h: h,
- tasks: make(map[string]tasks.Task),
- ta: make(map[string]installer.EnvApp),
- tmpl: tmpl,
+ l: &sync.Mutex{},
+ port: port,
+ repo: repo,
+ m: m,
+ r: r,
+ fr: fr,
+ reconciler: reconciler,
+ h: h,
+ cnc: cnc,
+ vpnAPIClient: vpnAPIClient,
+ tasks: make(map[string]taskForward),
+ ta: make(map[string]installer.EnvApp),
+ tmpl: tmpl,
}, nil
}
@@ -102,8 +143,15 @@
r.HandleFunc("/api/instance/{slug}", s.handleInstance).Methods(http.MethodGet)
r.HandleFunc("/api/instance/{slug}/update", s.handleAppUpdate).Methods(http.MethodPost)
r.HandleFunc("/api/instance/{slug}/remove", s.handleAppRemove).Methods(http.MethodPost)
+ r.HandleFunc("/clusters/{cluster}/servers/{server}/remove", s.handleClusterRemoveServer).Methods(http.MethodPost)
+ r.HandleFunc("/clusters/{cluster}/servers", s.handleClusterAddServer).Methods(http.MethodPost)
+ r.HandleFunc("/clusters/{name}", s.handleCluster).Methods(http.MethodGet)
+ r.HandleFunc("/clusters/{name}/remove", s.handleRemoveCluster).Methods(http.MethodPost)
+ r.HandleFunc("/clusters", s.handleAllClusters).Methods(http.MethodGet)
+ r.HandleFunc("/clusters", s.handleCreateCluster).Methods(http.MethodPost)
r.HandleFunc("/app/{slug}", s.handleAppUI).Methods(http.MethodGet)
r.HandleFunc("/instance/{slug}", s.handleInstanceUI).Methods(http.MethodGet)
+ r.HandleFunc("/tasks/{slug}", s.handleTaskStatus).Methods(http.MethodGet)
r.HandleFunc("/{pageType}", s.handleAppsList).Methods(http.MethodGet)
r.HandleFunc("/", s.handleAppsList).Methods(http.MethodGet)
fmt.Printf("Starting HTTP server on port: %d\n", s.port)
@@ -201,6 +249,8 @@
}
func (s *AppManagerServer) handleAppInstall(w http.ResponseWriter, r *http.Request) {
+ s.l.Lock()
+ defer s.l.Unlock()
slug, ok := mux.Vars(r)["slug"]
if !ok {
http.Error(w, "empty slug", http.StatusBadRequest)
@@ -249,20 +299,27 @@
if _, ok := s.tasks[instanceId]; ok {
panic("MUST NOT REACH!")
}
- s.tasks[instanceId] = t
+ s.tasks[instanceId] = taskForward{t, fmt.Sprintf("/instance/%s", instanceId)}
s.ta[instanceId] = a
t.OnDone(func(err error) {
- delete(s.tasks, instanceId)
- delete(s.ta, instanceId)
+ go func() {
+ time.Sleep(30 * time.Second)
+ s.l.Lock()
+ defer s.l.Unlock()
+ delete(s.tasks, instanceId)
+ delete(s.ta, instanceId)
+ }()
})
go t.Start()
- if _, err := fmt.Fprintf(w, "/instance/%s", instanceId); err != nil {
+ if _, err := fmt.Fprintf(w, "/tasks/%s", instanceId); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
func (s *AppManagerServer) handleAppUpdate(w http.ResponseWriter, r *http.Request) {
+ s.l.Lock()
+ defer s.l.Unlock()
slug, ok := mux.Vars(r)["slug"]
if !ok {
http.Error(w, "empty slug", http.StatusBadRequest)
@@ -291,11 +348,16 @@
go s.reconciler.Reconcile(ctx)
t := tasks.NewMonitorRelease(s.h, rr)
t.OnDone(func(err error) {
- delete(s.tasks, slug)
+ go func() {
+ time.Sleep(30 * time.Second)
+ s.l.Lock()
+ defer s.l.Unlock()
+ delete(s.tasks, slug)
+ }()
})
- s.tasks[slug] = t
+ s.tasks[slug] = taskForward{t, fmt.Sprintf("/instance/%s", slug)}
go t.Start()
- if _, err := fmt.Fprintf(w, "/instance/%s", slug); err != nil {
+ if _, err := fmt.Fprintf(w, "/tasks/%s", slug); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
@@ -373,6 +435,7 @@
Instance *installer.AppInstanceConfig
Instances []installer.AppInstanceConfig
AvailableNetworks []installer.Network
+ AvailableClusters []cluster.State
Task tasks.Task
CurrentPage string
}
@@ -403,10 +466,16 @@
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
+ clusters, err := s.m.GetClusters()
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
data := appPageData{
App: a,
Instances: instances,
AvailableNetworks: networks,
+ AvailableClusters: clusters,
CurrentPage: a.Name(),
}
if err := s.tmpl.app.Execute(w, data); err != nil {
@@ -416,6 +485,8 @@
}
func (s *AppManagerServer) handleInstanceUI(w http.ResponseWriter, r *http.Request) {
+ s.l.Lock()
+ defer s.l.Unlock()
global, err := s.m.Config()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
@@ -432,6 +503,10 @@
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
+ if ok && !(t.task.Status() == tasks.StatusDone || t.task.Status() == tasks.StatusFailed) {
+ http.Redirect(w, r, fmt.Sprintf("/tasks/%s", slug), http.StatusSeeOther)
+ return
+ }
var a installer.EnvApp
if instance != nil {
a, err = s.m.GetInstanceApp(instance.Id)
@@ -456,12 +531,18 @@
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
+ clusters, err := s.m.GetClusters()
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
data := appPageData{
App: a,
Instance: instance,
Instances: instances,
AvailableNetworks: networks,
- Task: t,
+ AvailableClusters: clusters,
+ Task: t.task,
CurrentPage: slug,
}
if err := s.tmpl.app.Execute(w, data); err != nil {
@@ -469,3 +550,319 @@
return
}
}
+
+type taskStatusData struct {
+ CurrentPage string
+ Task tasks.Task
+}
+
+func (s *AppManagerServer) handleTaskStatus(w http.ResponseWriter, r *http.Request) {
+ s.l.Lock()
+ defer s.l.Unlock()
+ slug, ok := mux.Vars(r)["slug"]
+ if !ok {
+ http.Error(w, "empty slug", http.StatusBadRequest)
+ return
+ }
+ t, ok := s.tasks[slug]
+ if !ok {
+ http.Error(w, "task not found", http.StatusInternalServerError)
+
+ return
+ }
+ if ok && (t.task.Status() == tasks.StatusDone || t.task.Status() == tasks.StatusFailed) {
+ http.Redirect(w, r, t.redirectTo, http.StatusSeeOther)
+ return
+ }
+ data := taskStatusData{
+ CurrentPage: "",
+ Task: t.task,
+ }
+ if err := s.tmpl.task.Execute(w, data); err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+}
+
+type clustersData struct {
+ CurrentPage string
+ Clusters []cluster.State
+}
+
+func (s *AppManagerServer) handleAllClusters(w http.ResponseWriter, r *http.Request) {
+ clusters, err := s.m.GetClusters()
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ data := clustersData{
+ "clusters",
+ clusters,
+ }
+ if err := s.tmpl.allClusters.Execute(w, data); err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+}
+
+type clusterData struct {
+ CurrentPage string
+ Cluster cluster.State
+}
+
+func (s *AppManagerServer) handleCluster(w http.ResponseWriter, r *http.Request) {
+ name, ok := mux.Vars(r)["name"]
+ if !ok {
+ http.Error(w, "empty name", http.StatusBadRequest)
+ return
+ }
+ m, err := s.getClusterManager(name)
+ if err != nil {
+ if errors.Is(err, installer.ErrorNotFound) {
+ http.Error(w, "not found", http.StatusNotFound)
+ } else {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ }
+ return
+ }
+ data := clusterData{
+ "clusters",
+ m.State(),
+ }
+ if err := s.tmpl.cluster.Execute(w, data); err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+}
+
+func (s *AppManagerServer) handleClusterRemoveServer(w http.ResponseWriter, r *http.Request) {
+ s.l.Lock()
+ defer s.l.Unlock()
+ cName, ok := mux.Vars(r)["cluster"]
+ if !ok {
+ http.Error(w, "empty name", http.StatusBadRequest)
+ return
+ }
+ if _, ok := s.tasks[cName]; ok {
+ http.Error(w, "cluster task in progress", http.StatusLocked)
+ return
+ }
+ sName, ok := mux.Vars(r)["server"]
+ if !ok {
+ http.Error(w, "empty name", http.StatusBadRequest)
+ return
+ }
+ m, err := s.getClusterManager(cName)
+ if err != nil {
+ if errors.Is(err, installer.ErrorNotFound) {
+ http.Error(w, "not found", http.StatusNotFound)
+ } else {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ }
+ return
+ }
+ task := tasks.NewClusterRemoveServerTask(m, sName, s.repo)
+ task.OnDone(func(err error) {
+ go func() {
+ time.Sleep(30 * time.Second)
+ s.l.Lock()
+ defer s.l.Unlock()
+ delete(s.tasks, cName)
+ }()
+ })
+ go task.Start()
+ s.tasks[cName] = taskForward{task, fmt.Sprintf("/clusters/%s", cName)}
+ http.Redirect(w, r, fmt.Sprintf("/tasks/%s", cName), http.StatusSeeOther)
+}
+
+func (s *AppManagerServer) getClusterManager(cName string) (cluster.Manager, error) {
+ clusters, err := s.m.GetClusters()
+ if err != nil {
+ return nil, err
+ }
+ var c *cluster.State
+ for _, i := range clusters {
+ if i.Name == cName {
+ c = &i
+ break
+ }
+ }
+ if c == nil {
+ return nil, installer.ErrorNotFound
+ }
+ return cluster.RestoreKubeManager(*c)
+}
+
+func (s *AppManagerServer) handleClusterAddServer(w http.ResponseWriter, r *http.Request) {
+ s.l.Lock()
+ defer s.l.Unlock()
+ cName, ok := mux.Vars(r)["cluster"]
+ if !ok {
+ http.Error(w, "empty name", http.StatusBadRequest)
+ return
+ }
+ if _, ok := s.tasks[cName]; ok {
+ http.Error(w, "cluster task in progress", http.StatusLocked)
+ return
+ }
+ m, err := s.getClusterManager(cName)
+ if err != nil {
+ if errors.Is(err, installer.ErrorNotFound) {
+ http.Error(w, "not found", http.StatusNotFound)
+ } else {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ }
+ return
+ }
+ t := r.PostFormValue("type")
+ ip := net.ParseIP(r.PostFormValue("ip"))
+ if ip == nil {
+ http.Error(w, "invalid ip", http.StatusBadRequest)
+ return
+ }
+ port := 22
+ if p := r.PostFormValue("port"); p != "" {
+ port, err = strconv.Atoi(p)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusBadRequest)
+ return
+ }
+ }
+ server := cluster.Server{
+ IP: ip,
+ Port: port,
+ User: r.PostFormValue("user"),
+ Password: r.PostFormValue("password"),
+ }
+ var task tasks.Task
+ switch strings.ToLower(t) {
+ case "controller":
+ if len(m.State().Controllers) == 0 {
+ task = tasks.NewClusterInitTask(m, server, s.cnc, s.repo, s.setupRemoteCluster())
+ } else {
+ task = tasks.NewClusterJoinControllerTask(m, server, s.repo)
+ }
+ case "worker":
+ task = tasks.NewClusterJoinWorkerTask(m, server, s.repo)
+ default:
+ http.Error(w, "invalid type", http.StatusBadRequest)
+ return
+ }
+ task.OnDone(func(err error) {
+ go func() {
+ time.Sleep(30 * time.Second)
+ s.l.Lock()
+ defer s.l.Unlock()
+ delete(s.tasks, cName)
+ }()
+ })
+ go task.Start()
+ s.tasks[cName] = taskForward{task, fmt.Sprintf("/clusters/%s", cName)}
+ http.Redirect(w, r, fmt.Sprintf("/tasks/%s", cName), http.StatusSeeOther)
+}
+
+func (s *AppManagerServer) handleCreateCluster(w http.ResponseWriter, r *http.Request) {
+ cName := r.PostFormValue("name")
+ if cName == "" {
+ http.Error(w, "no name", http.StatusBadRequest)
+ return
+ }
+ st := cluster.State{Name: cName}
+ if _, err := s.repo.Do(func(fs soft.RepoFS) (string, error) {
+ if err := soft.WriteJson(fs, fmt.Sprintf("/clusters/%s/config.json", cName), st); err != nil {
+ return "", err
+ }
+ return fmt.Sprintf("create cluster: %s", cName), nil
+ }); err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ http.Redirect(w, r, fmt.Sprintf("/clusters/%s", cName), http.StatusSeeOther)
+}
+
+func (s *AppManagerServer) handleRemoveCluster(w http.ResponseWriter, r *http.Request) {
+ cName, ok := mux.Vars(r)["name"]
+ if !ok {
+ http.Error(w, "empty name", http.StatusBadRequest)
+ return
+ }
+ if _, ok := s.tasks[cName]; ok {
+ http.Error(w, "cluster task in progress", http.StatusLocked)
+ return
+ }
+ m, err := s.getClusterManager(cName)
+ if err != nil {
+ if errors.Is(err, installer.ErrorNotFound) {
+ http.Error(w, "not found", http.StatusNotFound)
+ } else {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ }
+ return
+ }
+ task := tasks.NewRemoveClusterTask(m, s.cnc, s.repo)
+ task.OnDone(func(err error) {
+ go func() {
+ time.Sleep(30 * time.Second)
+ s.l.Lock()
+ defer s.l.Unlock()
+ delete(s.tasks, cName)
+ }()
+ })
+ go task.Start()
+ s.tasks[cName] = taskForward{task, fmt.Sprintf("/clusters/%s", cName)}
+ http.Redirect(w, r, fmt.Sprintf("/tasks/%s", cName), http.StatusSeeOther)
+}
+
+func (s *AppManagerServer) setupRemoteCluster() cluster.ClusterSetupFunc {
+ const vpnUser = "private-network-proxy"
+ return func(name, kubeconfig, ingressClassName string) (net.IP, error) {
+ hostname := fmt.Sprintf("cluster-%s", name)
+ t := tasks.NewInstallTask(s.h, func() (installer.ReleaseResources, error) {
+ app, err := installer.FindEnvApp(s.fr, "cluster-network")
+ if err != nil {
+ return installer.ReleaseResources{}, err
+ }
+ env, err := s.m.Config()
+ if err != nil {
+ return installer.ReleaseResources{}, err
+ }
+ instanceId := fmt.Sprintf("%s-%s", app.Slug(), name)
+ appDir := fmt.Sprintf("/clusters/%s/ingress", name)
+ namespace := fmt.Sprintf("%scluster-network-%s", env.NamespacePrefix, name)
+ rr, err := s.m.Install(app, instanceId, appDir, namespace, map[string]any{
+ "cluster": map[string]any{
+ "name": name,
+ "kubeconfig": kubeconfig,
+ "ingressClassName": ingressClassName,
+ },
+ // TODO(gio): remove hardcoded user
+ "vpnUser": vpnUser,
+ "vpnProxyHostname": hostname,
+ })
+ if err != nil {
+ return installer.ReleaseResources{}, err
+ }
+ ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
+ go s.reconciler.Reconcile(ctx)
+ return rr, err
+ })
+ ch := make(chan error)
+ t.OnDone(func(err error) {
+ ch <- err
+ })
+ go t.Start()
+ err := <-ch
+ if err != nil {
+ return nil, err
+ }
+ for {
+ ip, err := s.vpnAPIClient.GetNodeIP(vpnUser, hostname)
+ if err == nil {
+ return ip, nil
+ }
+ if errors.Is(err, installer.ErrorNotFound) {
+ time.Sleep(5 * time.Second)
+ }
+ }
+ }
+}