ClusterManager: Implement support for remote clusters.
After this change, users will be able to:
* Create a cluster and add servers to or remove servers from it
* Install apps on a remote cluster
* Move already installed apps between clusters
* Remove a server and have the apps running on it auto-migrate
to another server in the same cluster
This is achieved by:
* Installing and running a minimal version of dodo on the remote cluster
* Installing ingress-nginx automatically on new clusters
* Running a VPN client next to nginx in the same pod, so that
the default cluster can establish secure communication with it
* Configuring multiple reverse proxies to reach the remote cluster
service from the ingress installed on the default cluster.
Next steps:
* Support remote clusters in dodo apps (prototype ready)
* Clean up the old cluster when moving an app to a new one. Currently
the old cluster keeps running the app's pods even though no ingress
can reach them anymore.
Change-Id: Iffc908c93416d4126a8e1c2832eae7b659cb8044
diff --git a/core/installer/app_manager.go b/core/installer/app_manager.go
index d31d9ff..0bddf29 100644
--- a/core/installer/app_manager.go
+++ b/core/installer/app_manager.go
@@ -13,7 +13,9 @@
"strings"
"sync"
+ "github.com/giolekva/pcloud/core/installer/cluster"
gio "github.com/giolekva/pcloud/core/installer/io"
+ "github.com/giolekva/pcloud/core/installer/kube"
"github.com/giolekva/pcloud/core/installer/soft"
helmv2 "github.com/fluxcd/helm-controller/api/v2"
@@ -31,36 +33,39 @@
type AppManager struct {
l sync.Locker
- repoIO soft.RepoIO
+ repo soft.RepoIO
nsc NamespaceCreator
jc JobCreator
hf HelmFetcher
vpnAPIClient VPNAPIClient
+ cnc ClusterNetworkConfigurator
appDirRoot string
}
func NewAppManager(
- repoIO soft.RepoIO,
+ repo soft.RepoIO,
nsc NamespaceCreator,
jc JobCreator,
hf HelmFetcher,
vpnKeyGen VPNAPIClient,
+ cnc ClusterNetworkConfigurator,
appDirRoot string,
) (*AppManager, error) {
return &AppManager{
&sync.Mutex{},
- repoIO,
+ repo,
nsc,
jc,
hf,
vpnKeyGen,
+ cnc,
appDirRoot,
}, nil
}
func (m *AppManager) Config() (EnvConfig, error) {
var cfg EnvConfig
- if err := soft.ReadYaml(m.repoIO, configFileName, &cfg); err != nil {
+ if err := soft.ReadYaml(m.repo, configFileName, &cfg); err != nil {
return EnvConfig{}, err
} else {
return cfg, nil
@@ -69,7 +74,7 @@
func (m *AppManager) appConfig(path string) (AppInstanceConfig, error) {
var cfg AppInstanceConfig
- if err := soft.ReadJson(m.repoIO, path, &cfg); err != nil {
+ if err := soft.ReadJson(m.repo, path, &cfg); err != nil {
return AppInstanceConfig{}, err
} else {
return cfg, nil
@@ -77,9 +82,12 @@
}
func (m *AppManager) GetAllInstances() ([]AppInstanceConfig, error) {
- m.repoIO.Pull()
- kust, err := soft.ReadKustomization(m.repoIO, filepath.Join(m.appDirRoot, "kustomization.yaml"))
+ m.repo.Pull()
+ kust, err := soft.ReadKustomization(m.repo, filepath.Join(m.appDirRoot, "kustomization.yaml"))
if err != nil {
+ if errors.Is(err, fs.ErrNotExist) {
+ return nil, nil
+ }
return nil, err
}
ret := make([]AppInstanceConfig, 0)
@@ -95,7 +103,7 @@
}
func (m *AppManager) GetAllAppInstances(name string) ([]AppInstanceConfig, error) {
- kust, err := soft.ReadKustomization(m.repoIO, filepath.Join(m.appDirRoot, "kustomization.yaml"))
+ kust, err := soft.ReadKustomization(m.repo, filepath.Join(m.appDirRoot, "kustomization.yaml"))
if err != nil {
if errors.Is(err, fs.ErrNotExist) {
return nil, nil
@@ -120,27 +128,12 @@
func (m *AppManager) GetInstance(id string) (*AppInstanceConfig, error) {
appDir := filepath.Clean(filepath.Join(m.appDirRoot, id))
cfgPath := filepath.Join(appDir, "config.json")
- // kust, err := soft.ReadKustomization(m.repoIO, filepath.Join(m.appDirRoot, "kustomization.yaml"))
- // if err != nil {
- // return nil, err
- // }
- // for _, app := range kust.Resources {
- // if app == id {
- // cfg, err := m.appConfig(filepath.Join(m.appDirRoot, app, "config.json"))
cfg, err := m.appConfig(cfgPath)
if err != nil {
return nil, err
}
cfg.Id = id
return &cfg, err
- // if err != nil {
- // return nil, err
- // }
- // cfg.Id = id
- // return &cfg, nil
- // }
- // }
- // return nil, ErrorNotFound
}
func GetCueAppData(fs soft.RepoFS, dir string) (CueAppData, error) {
@@ -162,7 +155,7 @@
}
func (m *AppManager) GetInstanceApp(id string) (EnvApp, error) {
- cfg, err := GetCueAppData(m.repoIO, filepath.Join(m.appDirRoot, id))
+ cfg, err := GetCueAppData(m.repo, filepath.Join(m.appDirRoot, id))
if err != nil {
return nil, err
}
@@ -339,7 +332,7 @@
dopts = append(dopts, soft.WithNoLock())
}
_, err := repo.Do(func(r soft.RepoFS) (string, error) {
- if err := r.RemoveDir(appDir); err != nil {
+ if err := r.RemoveAll(appDir); err != nil {
return "", err
}
resourcesDir := path.Join(appDir, "resources")
@@ -428,7 +421,7 @@
}
appDir = filepath.Clean(appDir)
if !o.NoPull {
- if err := m.repoIO.Pull(); err != nil {
+ if err := m.repo.Pull(); err != nil {
return ReleaseResources{}, err
}
}
@@ -456,6 +449,10 @@
return ReleaseResources{}, err
}
}
+ clusters, err := m.GetClusters()
+ if err != nil {
+ return ReleaseResources{}, err
+ }
var lg LocalChartGenerator
if o.LG != nil {
lg = o.LG
@@ -465,10 +462,10 @@
release := Release{
AppInstanceId: instanceId,
Namespace: namespace,
- RepoAddr: m.repoIO.FullAddress(),
+ RepoAddr: m.repo.FullAddress(),
AppDir: appDir,
}
- rendered, err := app.Render(release, env, networks, values, nil, m.vpnAPIClient)
+ rendered, err := app.Render(release, env, networks, ToAccessConfigs(clusters), values, nil, m.vpnAPIClient)
if err != nil {
return ReleaseResources{}, err
}
@@ -492,7 +489,7 @@
return ReleaseResources{}, err
}
}
- charts, err := pullHelmCharts(m.hf, rendered.HelmCharts, m.repoIO, "/helm-charts")
+ charts, err := pullHelmCharts(m.hf, rendered.HelmCharts, m.repo, "/helm-charts")
if err != nil {
return ReleaseResources{}, err
}
@@ -500,17 +497,37 @@
if o.FetchContainerImages {
release.ImageRegistry = imageRegistry
}
- rendered, err = app.Render(release, env, networks, values, localCharts, m.vpnAPIClient)
+ rendered, err = app.Render(release, env, networks, ToAccessConfigs(clusters), values, localCharts, m.vpnAPIClient)
if err != nil {
return ReleaseResources{}, err
}
- if err := installApp(m.repoIO, appDir, rendered.Name, rendered.Config, rendered.Resources, rendered.Data, opts...); err != nil {
+ for _, ns := range rendered.Namespaces {
+ if ns.Name == "" {
+ return ReleaseResources{}, fmt.Errorf("namespace name missing")
+ }
+ if ns.Kubeconfig == "" {
+ continue
+ }
+ nsc, err := NewNamespaceCreator(kube.KubeConfigOpts{KubeConfig: ns.Kubeconfig})
+ if err != nil {
+ return ReleaseResources{}, err
+ }
+ if err := nsc.Create(ns.Name); err != nil {
+ return ReleaseResources{}, err
+ }
+ }
+ if err := installApp(m.repo, appDir, rendered.Name, rendered.Config, rendered.Resources, rendered.Data, opts...); err != nil {
return ReleaseResources{}, err
}
// TODO(gio): add ingress-nginx to release resources
if err := openPorts(rendered.Ports, portReservations, allocators); err != nil {
return ReleaseResources{}, err
}
+ for _, p := range rendered.ClusterProxies {
+ if err := m.cnc.AddProxy(p.From, p.To); err != nil {
+ return ReleaseResources{}, err
+ }
+ }
return ReleaseResources{
Release: rendered.Config.Release,
RenderedRaw: rendered.Raw,
@@ -568,7 +585,7 @@
) (ReleaseResources, error) {
m.l.Lock()
defer m.l.Unlock()
- if err := m.repoIO.Pull(); err != nil {
+ if err := m.repo.Pull(); err != nil {
return ReleaseResources{}, err
}
env, err := m.Config()
@@ -585,7 +602,7 @@
if err != nil {
return ReleaseResources{}, err
}
- renderedCfg, err := readRendered(m.repoIO, filepath.Join(instanceDir, "rendered.json"))
+ renderedCfg, err := readRendered(m.repo, filepath.Join(instanceDir, "rendered.json"))
if err != nil {
return ReleaseResources{}, err
}
@@ -593,13 +610,60 @@
if err != nil {
return ReleaseResources{}, err
}
- rendered, err := app.Render(config.Release, env, networks, values, renderedCfg.LocalCharts, m.vpnAPIClient)
+ clusters, err := m.GetClusters()
if err != nil {
return ReleaseResources{}, err
}
- if err := installApp(m.repoIO, instanceDir, rendered.Name, rendered.Config, rendered.Resources, rendered.Data, opts...); err != nil {
+ rendered, err := app.Render(config.Release, env, networks, ToAccessConfigs(clusters), values, renderedCfg.LocalCharts, m.vpnAPIClient)
+ if err != nil {
return ReleaseResources{}, err
}
+ for _, ns := range rendered.Namespaces {
+ if ns.Name == "" {
+ return ReleaseResources{}, fmt.Errorf("namespace name missing")
+ }
+ if ns.Kubeconfig == "" {
+ continue
+ }
+ nsc, err := NewNamespaceCreator(kube.KubeConfigOpts{KubeConfig: ns.Kubeconfig})
+ if err != nil {
+ return ReleaseResources{}, err
+ }
+ if err := nsc.Create(ns.Name); err != nil {
+ return ReleaseResources{}, err
+ }
+ }
+ if err := installApp(m.repo, instanceDir, rendered.Name, rendered.Config, rendered.Resources, rendered.Data, opts...); err != nil {
+ return ReleaseResources{}, err
+ }
+ for _, ocp := range renderedCfg.Out.ClusterProxy {
+ found := false
+ for _, ncp := range rendered.ClusterProxies {
+ if ocp == ncp {
+ found = true
+ break
+ }
+ }
+ if !found {
+ if err := m.cnc.RemoveProxy(ocp.From, ocp.To); err != nil {
+ return ReleaseResources{}, err
+ }
+ }
+ }
+ for _, ncp := range rendered.ClusterProxies {
+ found := false
+ for _, ocp := range renderedCfg.Out.ClusterProxy {
+ if ocp == ncp {
+ found = true
+ break
+ }
+ }
+ if !found {
+ if err := m.cnc.AddProxy(ncp.From, ncp.To); err != nil {
+ return ReleaseResources{}, err
+ }
+ }
+ }
return ReleaseResources{
Release: rendered.Config.Release,
RenderedRaw: rendered.Raw,
@@ -610,18 +674,18 @@
func (m *AppManager) Remove(instanceId string) error {
m.l.Lock()
defer m.l.Unlock()
- if err := m.repoIO.Pull(); err != nil {
+ if err := m.repo.Pull(); err != nil {
return err
}
var cfg renderedInstance
- if _, err := m.repoIO.Do(func(r soft.RepoFS) (string, error) {
+ if _, err := m.repo.Do(func(r soft.RepoFS) (string, error) {
instanceDir := filepath.Join(m.appDirRoot, instanceId)
- renderedCfg, err := readRendered(m.repoIO, filepath.Join(instanceDir, "rendered.json"))
+ renderedCfg, err := readRendered(m.repo, filepath.Join(instanceDir, "rendered.json"))
if err != nil {
return "", err
}
cfg = renderedCfg
- r.RemoveDir(instanceDir)
+ r.RemoveAll(instanceDir)
kustPath := filepath.Join(m.appDirRoot, "kustomization.yaml")
kust, err := soft.ReadKustomization(r, kustPath)
if err != nil {
@@ -636,6 +700,11 @@
if err := closePorts(cfg.PortForward); err != nil {
return err
}
+ for _, cp := range cfg.Out.ClusterProxy {
+ if err := m.cnc.RemoveProxy(cp.From, cp.To); err != nil {
+ return err
+ }
+ }
for vmName, vmCfg := range cfg.Out.VM {
if vmCfg.VPN.Enabled {
if err := m.vpnAPIClient.ExpireNode(vmCfg.Username, vmName); err != nil {
@@ -692,6 +761,36 @@
return ret, nil
}
+func (m *AppManager) GetClusters() ([]cluster.State, error) {
+ ret := []cluster.State{
+ {
+ Name: "default",
+ },
+ }
+ files, err := m.repo.ListDir("/clusters")
+ if err != nil {
+ if errors.Is(err, fs.ErrNotExist) {
+ return ret, nil
+ }
+ return nil, err
+ }
+ for _, f := range files {
+ if !f.IsDir() {
+ continue
+ }
+ cfgPath := filepath.Clean(filepath.Join("/clusters", f.Name(), "config.json"))
+ var c cluster.State
+ if err := soft.ReadJson(m.repo, cfgPath, &c); err != nil {
+ if errors.Is(err, fs.ErrNotExist) {
+ continue
+ }
+ return nil, err
+ }
+ ret = append(ret, c)
+ }
+ return ret, nil
+}
+
type installOptions struct {
NoPull bool
NoPublish bool
@@ -968,7 +1067,8 @@
}
type outRendered struct {
- VM map[string]vmRendered `json:"vm"`
+ ClusterProxy map[string]ClusterProxy
+ VM map[string]vmRendered `json:"vm"`
}
type vmRendered struct {
@@ -1028,6 +1128,8 @@
return []string{""}
case KindVPNAuthKey:
return []string{}
+ case KindCluster:
+ return []string{}
default:
panic("MUST NOT REACH!")
}
@@ -1063,3 +1165,25 @@
}
return nil
}
+
+type Cluster struct {
+ Name string `json:"name"`
+ Kubeconfig string `json:"kubeconfig"`
+ IngressClassName string `json:"ingressClassName"`
+}
+
+func ClusterStateToAccessConfig(c cluster.State) Cluster {
+ return Cluster{
+ Name: c.Name,
+ Kubeconfig: c.Kubeconfig,
+ IngressClassName: c.IngressClassName,
+ }
+}
+
+func ToAccessConfigs(clusters []cluster.State) []Cluster {
+ ret := make([]Cluster, 0, len(clusters))
+ for _, c := range clusters {
+ ret = append(ret, ClusterStateToAccessConfig(c))
+ }
+ return ret
+}