ClusterManager: Implements support for remote clusters.
After this change users will be able to:
* Create a cluster and add/remove servers to it
* Install apps on a remote cluster
* Move already installed apps between clusters
* Apps running on a server being removed will auto-migrate
to another server in the same cluster
This is achieved by:
* Installing and running a minimal version of dodo on the remote cluster
* Ingress-nginx is installed automatically on new clusters
* Next to nginx we run a VPN client in the same pod, so that
the default cluster can establish secure communication with it
* Multiple reverse proxies are configured to reach the
remote cluster's services from the ingress installed on the default cluster.
Next steps:
* Support remote clusters in dodo apps (prototype ready)
* Clean up the old cluster when moving an app to a new one. Currently
the old cluster keeps running app pods even though no ingress can
reach them anymore.
Change-Id: Iffc908c93416d4126a8e1c2832eae7b659cb8044
diff --git a/core/dns-api/Makefile b/core/dns-api/Makefile
index 1f127cb..bd78618 100644
--- a/core/dns-api/Makefile
+++ b/core/dns-api/Makefile
@@ -1,7 +1,7 @@
repo_name ?= dtabidze
podman ?= docker
ifeq ($(podman), podman)
-manifest_dest=docker://docker.io/$(repo_name)/pcloud-installer:latest
+manifest_dest=docker://docker.io/$(repo_name)/dns-api:latest
endif
clean:
diff --git a/core/dns-api/main.go b/core/dns-api/main.go
index 7ab1bb2..c3d429d 100644
--- a/core/dns-api/main.go
+++ b/core/dns-api/main.go
@@ -23,6 +23,9 @@
if err != nil {
panic(err)
}
+ if err := store.Log(); err != nil {
+ panic(err)
+ }
server := NewServer(*port, *zone, ds, store, nameserverIP)
server.Start()
}
diff --git a/core/dns-api/records_file.go b/core/dns-api/records_file.go
index 6916a5e..a25c342 100644
--- a/core/dns-api/records_file.go
+++ b/core/dns-api/records_file.go
@@ -35,11 +35,8 @@
func (z *RecordsFile) DeleteTxtRecord(name, value string) {
z.lock.Lock()
defer z.lock.Unlock()
- fmt.Printf("%s %s\n", name, value)
for i, rr := range z.rrs {
- fmt.Printf("%+v\n", rr)
if txt, ok := rr.(*dns.TXT); ok {
- fmt.Printf("%+v\n", txt)
if txt.Hdr.Name == name && strings.Join(txt.Txt, "") == value {
z.rrs = append(z.rrs[:i], z.rrs[i+1:]...)
}
@@ -47,6 +44,20 @@
}
}
+func (z *RecordsFile) DeleteARecord(name, value string) error {
+ z.lock.Lock()
+ defer z.lock.Unlock()
+ for i, rr := range z.rrs {
+ if a, ok := rr.(*dns.A); ok {
+ if a.Hdr.Name == name && a.A.String() == value {
+ z.rrs = append(z.rrs[:i], z.rrs[i+1:]...)
+ return nil
+ }
+ }
+ }
+ return fmt.Errorf("not found")
+}
+
// func (z *RecordsFile) DeleteRecordsFor(name string) {
// z.lock.Lock()
// defer z.lock.Unlock()
diff --git a/core/dns-api/server.go b/core/dns-api/server.go
index 37db8ff..d67ccf6 100644
--- a/core/dns-api/server.go
+++ b/core/dns-api/server.go
@@ -31,6 +31,8 @@
}
m.HandleFunc("/records-to-publish", s.recordsToPublish)
m.HandleFunc("/create-txt-record", s.createTxtRecord)
+ m.HandleFunc("/create-a-record", s.createARecord)
+ m.HandleFunc("/delete-a-record", s.deleteARecord)
m.HandleFunc("/delete-txt-record", s.deleteTxtRecord)
return s
}
@@ -69,22 +71,44 @@
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
- fmt.Printf("CREATE: %+v\n", req)
if err := s.store.Add(req.Entry, req.Text); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
+func (s *Server) createARecord(w http.ResponseWriter, r *http.Request) {
+ var req record
+ if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
+ http.Error(w, err.Error(), http.StatusBadRequest)
+ return
+ }
+ if err := s.store.AddARecord(req.Entry, req.Text); err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+}
+
func (s *Server) deleteTxtRecord(w http.ResponseWriter, r *http.Request) {
var req record
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
- fmt.Printf("DELETE: %+v\n", req)
if err := s.store.Delete(req.Entry, req.Text); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
+
+func (s *Server) deleteARecord(w http.ResponseWriter, r *http.Request) {
+ var req record
+ if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
+ http.Error(w, err.Error(), http.StatusBadRequest)
+ return
+ }
+ if err := s.store.DeleteARecord(req.Entry, req.Text); err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+}
diff --git a/core/dns-api/store.go b/core/dns-api/store.go
index 6d0a7c1..8dfd914 100644
--- a/core/dns-api/store.go
+++ b/core/dns-api/store.go
@@ -2,11 +2,16 @@
import (
"fmt"
+ "io"
+ "os"
)
type RecordStore interface {
+ Log() error
Add(entry, txt string) error
+ AddARecord(entry, ip string) error
Delete(entry, txt string) error
+ DeleteARecord(entry, ip string) error
}
type fsRecordStore struct {
@@ -16,21 +21,32 @@
db string
}
+func (s *fsRecordStore) Log() error {
+ r, err := s.fs.Reader(s.db)
+ if err != nil {
+ return err
+ }
+ defer r.Close()
+ _, err = io.Copy(os.Stdout, r)
+ return err
+}
+
func (s *fsRecordStore) read() (*RecordsFile, error) {
r, err := s.fs.Reader(s.db)
if err != nil {
return nil, err
}
defer r.Close()
- return NewRecordsFile(r)
+ return NewRecordsFile(io.TeeReader(r, os.Stdout))
}
+
func (s *fsRecordStore) write(z *RecordsFile) error {
w, err := s.fs.Writer(s.db)
if err != nil {
return err
}
defer w.Close()
- return z.Write(w)
+ return z.Write(io.MultiWriter(w, os.Stdout))
}
func (s *fsRecordStore) Add(entry, txt string) error {
@@ -46,6 +62,19 @@
return s.write(z)
}
+func (s *fsRecordStore) AddARecord(entry, ip string) error {
+ z, err := s.read()
+ if err != nil {
+ return err
+ }
+ fqdn := fmt.Sprintf("%s.%s.", entry, s.zone)
+ z.CreateARecord(fqdn, ip)
+ // for _, ip := range s.publicIP {
+ // z.CreateARecord(fqdn, ip)
+ // }
+ return s.write(z)
+}
+
func (s *fsRecordStore) Delete(entry, txt string) error {
z, err := s.read()
if err != nil {
@@ -56,3 +85,16 @@
// z.DeleteRecordsFor(fqdn)
return s.write(z)
}
+
+func (s *fsRecordStore) DeleteARecord(entry, ip string) error {
+ z, err := s.read()
+ if err != nil {
+ return err
+ }
+ fqdn := fmt.Sprintf("%s.%s.", entry, s.zone)
+ if err := z.DeleteARecord(fqdn, ip); err != nil {
+ return err
+ }
+ // z.DeleteRecordsFor(fqdn)
+ return s.write(z)
+}
diff --git a/core/headscale/client.go b/core/headscale/client.go
index 18c37df..159fffb 100644
--- a/core/headscale/client.go
+++ b/core/headscale/client.go
@@ -5,12 +5,14 @@
"encoding/json"
"errors"
"fmt"
+ "net"
"os/exec"
"strconv"
"strings"
)
var ErrorAlreadyExists = errors.New("already exists")
+var ErrorNotFound = errors.New("not found")
type client struct {
config string
@@ -89,8 +91,9 @@
}
type nodeInfo struct {
- Id int `json:"id"`
- Name string `json:"name"`
+ Id int `json:"id"`
+ Name string `json:"name"`
+ IPAddresses []net.IP `json:"ip_addresses"`
}
func (c *client) getNodeId(user, node string) (string, error) {
@@ -111,6 +114,24 @@
return "", fmt.Errorf("not found")
}
+func (c *client) getNodeAddresses(user, node string) ([]net.IP, error) {
+ cmd := exec.Command("headscale", c.config, "--user", user, "node", "list", "-o", "json")
+ out, err := cmd.Output()
+ if err != nil {
+ return nil, err
+ }
+ var nodes []nodeInfo
+ if err := json.NewDecoder(bytes.NewReader(out)).Decode(&nodes); err != nil {
+ return nil, err
+ }
+ for _, n := range nodes {
+ if n.Name == node {
+ return n.IPAddresses, nil
+ }
+ }
+ return nil, ErrorNotFound
+}
+
func extractLastLine(s string) (string, error) {
items := strings.Split(s, "\n")
for i := len(items) - 1; i >= 0; i-- {
diff --git a/core/headscale/main.go b/core/headscale/main.go
index 698d9d2..194da06 100644
--- a/core/headscale/main.go
+++ b/core/headscale/main.go
@@ -39,6 +39,11 @@
},
},
"acls": [
+ // {
+ // "action": "accept",
+ // "src": ["10.42.0.0/16", "10.43.0.0/16", "135.181.48.180/32", "65.108.39.172/32"],
+ // "dst": ["10.42.0.0/16:*", "10.43.0.0/16:*", "135.181.48.180/32:*", "65.108.39.172/32:*"],
+ // },
{{- range .cidrs }}
{ // Everyone has passthough access to private-network-proxy node
"action": "accept",
@@ -46,8 +51,18 @@
"dst": ["{{ . }}:*", "private-network-proxy:0"],
},
{{- end }}
+ { // Everyone has access to every port of nodes owned by private-network-proxy
+ "action": "accept",
+ "src": ["*"],
+ "dst": ["private-network-proxy:*"],
+ },
+ {
+ "action": "accept",
+ "src": ["private-network-proxy"],
+ "dst": ["private-network-proxy:*"],
+ },
{{- range .users }}
- { // Everyone has passthough access to private-network-proxy node
+ {
"action": "accept",
"src": ["{{ . }}"],
"dst": ["{{ . }}:*"],
@@ -90,6 +105,7 @@
r.HandleFunc("/user/{user}/preauthkey", s.createReusablePreAuthKey).Methods(http.MethodPost)
r.HandleFunc("/user/{user}/preauthkey", s.expireReusablePreAuthKey).Methods(http.MethodDelete)
r.HandleFunc("/user/{user}/node/{node}/expire", s.expireUserNode).Methods(http.MethodPost)
+ r.HandleFunc("/user/{user}/node/{node}/ip", s.getNodeIP).Methods(http.MethodGet)
r.HandleFunc("/user/{user}/node/{node}", s.removeUserNode).Methods(http.MethodDelete)
r.HandleFunc("/user", s.createUser).Methods(http.MethodPost)
r.HandleFunc("/routes/{id}/enable", s.enableRoute).Methods(http.MethodPost)
@@ -247,6 +263,33 @@
}
}
+func (s *server) getNodeIP(w http.ResponseWriter, r *http.Request) {
+ user, ok := mux.Vars(r)["user"]
+ if !ok || user == "" {
+ http.Error(w, "no user", http.StatusBadRequest)
+ return
+ }
+ node, ok := mux.Vars(r)["node"]
+ if !ok || node == "" {
+ http.Error(w, "no name", http.StatusBadRequest)
+ return
+ }
+ addr, err := s.client.getNodeAddresses(user, node)
+ if err != nil {
+ if errors.Is(err, ErrorNotFound) {
+ http.Error(w, err.Error(), http.StatusNotFound)
+ } else {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ }
+ return
+ }
+ if len(addr) == 0 || addr[0] == nil {
+ http.Error(w, "no address", http.StatusPreconditionFailed)
+ return
+ }
+ fmt.Fprintf(w, "%s", addr[0].String())
+}
+
func updateACLs(aclsPath string, cidrs []string, users []string) ([]byte, error) {
tmpl, err := template.New("acls").Parse(defaultACLs)
if err != nil {
diff --git a/core/installer/app.go b/core/installer/app.go
index 91f290f..f023b18 100644
--- a/core/installer/app.go
+++ b/core/installer/app.go
@@ -33,10 +33,13 @@
type rendered struct {
Name string
Readme string
+ Cluster string
+ Namespaces []Namespace
Resources CueAppData
HelmCharts HelmCharts
ContainerImages map[string]ContainerImage
Ports []PortForward
+ ClusterProxies map[string]ClusterProxy
Data CueAppData
URL string
Help []HelpDocument
@@ -44,6 +47,11 @@
Raw []byte
}
+type Namespace struct {
+ Name string `json:"name"`
+ Kubeconfig string `json:"kubeconfig,omitempty"`
+}
+
type HelpDocument struct {
Title string
Contents string
@@ -81,6 +89,11 @@
Config InfraAppInstanceConfig
}
+type ClusterProxy struct {
+ From string `json:"from"`
+ To string `json:"to"`
+}
+
type PortForward struct {
Allocator string `json:"allocator"`
ReserveAddr string `json:"reservator"`
@@ -200,6 +213,7 @@
release Release,
env EnvConfig,
networks []Network,
+ clusters []Cluster,
values map[string]any,
charts map[string]helmv2.HelmChartTemplateSpec,
vpnKeyGen VPNAPIClient,
@@ -342,6 +356,13 @@
return rendered{}, err
}
ret.Readme = readme
+ res.LookupPath(cue.ParsePath("input.cluster.name")).Decode(&ret.Cluster)
+ if err := res.LookupPath(cue.ParsePath("output.clusterProxy")).Decode(&ret.ClusterProxies); err != nil {
+ return rendered{}, err
+ }
+ if err := res.LookupPath(cue.ParsePath("namespaces")).Decode(&ret.Namespaces); err != nil {
+ return rendered{}, err
+ }
if err := res.LookupPath(cue.ParsePath("portForward")).Decode(&ret.Ports); err != nil {
return rendered{}, err
}
@@ -457,14 +478,16 @@
release Release,
env EnvConfig,
networks []Network,
+ clusters []Cluster,
values map[string]any,
charts map[string]helmv2.HelmChartTemplateSpec,
vpnKeyGen VPNAPIClient,
) (EnvAppRendered, error) {
- derived, err := deriveValues(values, values, a.Schema(), networks, vpnKeyGen)
+ derived, err := deriveValues(values, values, a.Schema(), networks, clusters, vpnKeyGen)
if err != nil {
return EnvAppRendered{}, err
}
+ // return EnvAppRendered{}, fmt.Errorf("asdasd")
if charts == nil {
charts = make(map[string]helmv2.HelmChartTemplateSpec)
}
diff --git a/core/installer/app_configs/app_base.cue b/core/installer/app_configs/app_base.cue
index ac2d159..192716f 100644
--- a/core/installer/app_configs/app_base.cue
+++ b/core/installer/app_configs/app_base.cue
@@ -5,6 +5,11 @@
"net"
)
+input: {
+ cluster?: #Cluster @name(Cluster)
+ ...
+}
+
name: string | *""
description: string | *""
readme: string | *""
@@ -21,6 +26,13 @@
url: string | *""
+#Namespace: {
+ name: string
+ kubeconfig?: string
+}
+
+namespaces: [...#Namespace] | *[]
+
#AppType: "infra" | "env"
appType: #AppType | *"env"
@@ -98,6 +110,12 @@
portForward: [...#PortForward] | *[]
+#Cluster: {
+ name: string
+ kubeconfig: string
+ ingressClassName: string
+}
+
global: #Global
release: #Release
@@ -131,6 +149,7 @@
username: string
domain: string
vpn: #VPN | *{ enabled: false }
+ codeServerEnabled: bool | *false
cpuCores: int
memory: string
sshKnownHosts: [...string] | *[]
@@ -142,6 +161,27 @@
_memory: memory
_codeServerPort: 9090
+ _codeServerCmd: [...[...string]] | *[]
+ if codeServerEnabled {
+ _codeServerCmd: [
+ ["sh", "-c", "curl -fsSL https://code-server.dev/install.sh | HOME=/home/\(username) sh"],
+ ["sh", "-c", "systemctl enable --now code-server@\(username)"],
+ ["sh", "-c", "sleep 10"],
+ // TODO(gio): (maybe) listen only on tailscale interface
+ ["sh", "-c", "sed -i -e 's/127.0.0.1:8080/0.0.0.0:\(_codeServerPort)/g' /home/\(username)/.config/code-server/config.yaml"],
+ ["sh", "-c", "sed -i -e 's/auth: password/auth: none/g' /home/\(username)/.config/code-server/config.yaml"],
+ ["sh", "-c", "systemctl restart --now code-server@\(username)"],
+ ]
+ }
+
+ _vpnCmd: [...[...string]] | *[]
+ if vpn.enabled {
+ _vpnCmd: [
+ ["sh", "-c", "curl -fsSL https://tailscale.com/install.sh | sh"],
+ // TODO(gio): (maybe) enable tailscale ssh
+ ["sh", "-c", "tailscale up --login-server=\(vpn.loginServer) --auth-key=\(vpn.authKey)"],
+ ]
+ }
images: {}
charts: {
@@ -241,32 +281,24 @@
owner: "\(username):\(username)"
permissions: "0644"
}], cloudInit.writeFiles])
- runcmd: list.Concat([[
- ["sh", "-c", "chown -R \(username):\(username) /home/\(username)"],
- ["sh", "-c", "ssh-keygen -t ed25519 -f /home/\(username)/.ssh/id_ed25519 -q -N ''"],
- ["sh", "-c", "chown \(username):\(username) /home/\(username)/.ssh/id_ed25519*"],
- ["sh", "-c", "chmod 0600 /home/\(username)/.ssh/id_ed25519*"],
- // TODO(gio): implement post app delete webhook to remove ssh key from memberships
- // TODO(gio): make memberships-api addr configurable
- ["sh", "-c", "PUBKEY=$(cat /home/\(username)/.ssh/id_ed25519.pub) && curl --request POST --data \"{\\\"user\\\":\\\"\(username)\\\",\\\"publicKey\\\":\\\"${PUBKEY}\\\"}\" http://memberships-api.\(global.namespacePrefix)core-auth-memberships.svc.cluster.local/api/users/\(username)/keys"],
- // TODO(gio): this waits for user keys are synced from memberships service back to the dodo-app.
- // We should inject this key into the dodo-app directly as well.
- ["sh", "-c", "sleep 20"],
- if vpn.enabled {
- ["sh", "-c", "curl -fsSL https://tailscale.com/install.sh | sh"],
- }
- if vpn.enabled {
- // TODO(gio): (maybe) enable tailscale ssh
- ["sh", "-c", "tailscale up --login-server=\(vpn.loginServer) --auth-key=\(vpn.authKey)"],
- }
- ["sh", "-c", "curl -fsSL https://code-server.dev/install.sh | HOME=/home/\(username) sh"],
- ["sh", "-c", "systemctl enable --now code-server@\(username)"],
- ["sh", "-c", "sleep 10"],
- // TODO(gio): (maybe) listen only on tailscale interface
- ["sh", "-c", "sed -i -e 's/127.0.0.1:8080/0.0.0.0:\(_codeServerPort)/g' /home/\(username)/.config/code-server/config.yaml"],
- ["sh", "-c", "sed -i -e 's/auth: password/auth: none/g' /home/\(username)/.config/code-server/config.yaml"],
- ["sh", "-c", "systemctl restart --now code-server@\(username)"],
- ], cloudInit.runCmd])
+ runcmd: list.Concat([
+ [
+ ["sh", "-c", "chown -R \(username):\(username) /home/\(username)"],
+ ["sh", "-c", "ssh-keygen -t ed25519 -f /home/\(username)/.ssh/id_ed25519 -q -N ''"],
+ ["sh", "-c", "chown \(username):\(username) /home/\(username)/.ssh/id_ed25519*"],
+ ["sh", "-c", "chmod 0600 /home/\(username)/.ssh/id_ed25519*"],
+ // TODO(gio): implement post app delete webhook to remove ssh key from memberships
+ // TODO(gio): make memberships-api addr configurable
+ ["sh", "-c", "PUBKEY=$(cat /home/\(username)/.ssh/id_ed25519.pub) && curl --request POST --data \"{\\\"user\\\":\\\"\(username)\\\",\\\"publicKey\\\":\\\"${PUBKEY}\\\"}\" http://memberships-api.\(global.namespacePrefix)core-auth-memberships.svc.cluster.local/api/users/\(username)/keys"],
+ ],
+ _vpnCmd,
+ _codeServerCmd,
+ [
+ // TODO(gio): this waits for user keys are synced from memberships service back to the dodo-app.
+ // We should inject this key into the dodo-app directly as well.
+ // ["sh", "-c", "sleep 20"],
+ ],
+ cloudInit.runCmd])
}
}
}
@@ -370,6 +402,8 @@
dependsOn: [...#ResourceReference] | *[]
info: string | *""
annotations: {...} | *{}
+ cluster?: #Cluster | null
+ targetNamespace?: string
...
}
@@ -380,12 +414,15 @@
_dependencies: [...#ResourceReference] | *[]
_info: string | *""
_annotations: {...} | *{}
+ _cluster?: #Cluster | null
+ _namespace: string
+ _targetNamespace?: string
apiVersion: "helm.toolkit.fluxcd.io/v2beta1"
kind: "HelmRelease"
metadata: {
name: _name
- namespace: release.namespace
+ namespace: _namespace
annotations: _annotations & {
"dodo.cloud/installer-info": _info
}
@@ -394,6 +431,19 @@
interval: "1m0s"
dependsOn: _dependencies
chart: spec: _chart
+ if _targetNamespace != _|_ {
+ targetNamespace: _targetNamespace
+ }
+ if _cluster != _|_ {
+ if _cluster != null {
+ kubeConfig: secretRef: {
+ name: "cluster-kubeconfig"
+ key: "kubeconfig"
+ }
+ }
+ }
+ install: remediation: retries: -1
+ upgrade: remediation: retries: -1
values: _values
}
}
@@ -401,6 +451,7 @@
output: {
images: out.images
charts: out.charts
+ clusterProxy: out.clusterProxy
_lc: _localCharts & {
for k, v in out.charts {
"\(k)": {
@@ -417,6 +468,13 @@
_dependencies: r.dependsOn
_info: r.info
_annotations: r.annotations
+ _namespace: release.namespace
+ if r.cluster != _|_ {
+ _cluster: r.cluster
+ }
+ if r.targetNamespace != _|_ {
+ _targetNamespace: r.targetNamespace
+ }
}
}
}
@@ -441,9 +499,10 @@
images: {...}
charts: {...}
helm: {...}
+ clusterProxy: {...}
images: {
- for key, value in images {
- "\(key)": #Image & value
+ for k, v in images {
+ "\(k)": #Image & v
}
}
charts: {
@@ -453,10 +512,24 @@
}
}
}
+ helm: {
+ for k, v in helm {
+ "\(k)": #Helm & v & {
+ name: k
+ }
+ }
+ }
+ clusterProxy: {
+ for k, v in clusterProxy {
+ "\(k)": #ClusterProxy & v
+ }
+ }
helmR: {
- for key, value in helm {
- "\(key)": #Helm & value & {
- name: key
+ for k, v in helm {
+ "\(k)": v & {
+ if v.cluster == _|_ && input.cluster != _|_ {
+ cluster: input.cluster
+ }
}
}
...
@@ -472,6 +545,12 @@
branch: "main"
path: "charts/volumes"
}
+ secret: {
+ kind: "GitRepository"
+ address: "https://code.v1.dodo.cloud/helm-charts"
+ branch: "main"
+ path: "charts/secret"
+ }
...
}
volumes: {...}
@@ -571,3 +650,43 @@
out: {}
_codeServerPortName: "code-server"
+
+resources: { ... }
+
+#ClusterProxy: {
+ from: string
+ to: string
+}
+
+if input.cluster != _|_ {
+ {
+ out: {
+ charts: {
+ secret: {
+ kind: "GitRepository"
+ address: "https://code.v1.dodo.cloud/helm-charts"
+ branch: "main"
+ path: "charts/secret"
+ }
+ }
+ helm: {
+ "cluster-kubeconfig": {
+ chart: charts.secret
+ cluster: null
+ info: "Connecting to \(input.cluster.name) cluster"
+ values: {
+ name: "cluster-kubeconfig"
+ key: "kubeconfig"
+ value: base64.Encode(null, input.cluster.kubeconfig)
+ keep: true
+ }
+ }
+ }
+ }
+ }
+
+ namespaces: [{
+ name: release.namespace
+ kubeconfig: input.cluster.kubeconfig
+ }]
+}
diff --git a/core/installer/app_configs/app_global_env.cue b/core/installer/app_configs/app_global_env.cue
index e8c8f57..8d4a7db 100644
--- a/core/installer/app_configs/app_global_env.cue
+++ b/core/installer/app_configs/app_global_env.cue
@@ -1,3 +1,7 @@
+import (
+ "strings"
+)
+
#Global: {
id: string | *""
pcloudEnvName: string | *""
@@ -44,6 +48,15 @@
_authProxyName: "\(name)-auth-proxy"
_authProxyHTTPPortName: "http"
+ if input.cluster != _|_ {
+ clusterProxy: {
+ "\(name)": {
+ from: _domain
+ _sanitizedDomain: strings.Replace(_domain, ".", "-", -1)
+ to: "\(_sanitizedDomain).\(input.cluster.name).cluster.\(global.privateDomain)"
+ }
+ }
+ }
images: {
authProxy: {
repository: "giolekva"
@@ -71,6 +84,8 @@
"\(name)-auth-proxy": {
chart: charts.authProxy
info: "Installing authentication proxy"
+ // NOTE(gio): Force to install in default cluster.
+ cluster: null
_name: name
values: {
name: _authProxyName
@@ -94,25 +109,28 @@
}
}
}
- "\(name)-ingress": {
- chart: charts.ingress
- _service: service
- info: "Generating TLS certificate for https://\(_domain)"
- annotations: {
- "dodo.cloud/resource-type": "ingress"
- "dodo.cloud/resource.ingress.host": "https://\(_domain)"
- }
- values: {
- domain: _domain
- appRoot: _appRoot
- ingressClassName: network.ingressClass
- certificateIssuer: network.certificateIssuer
- service: {
- if auth.enabled {
- name: _authProxyName
- port: name: _authProxyHTTPPortName
+ if input.cluster != _|_ {
+ "\(name)-ingress-\(input.cluster.name)": {
+ chart: charts.ingress
+ cluster: input.cluster
+ _service: service
+ _sanitizedDomain: strings.Replace(_domain, ".", "-", -1)
+ _clusterDomain: "\(_sanitizedDomain).\(input.cluster.name).cluster.\(global.privateDomain)"
+ info: "Configuring secure route to \(input.cluster.name) cluster"
+ annotations: {
+ // TODO(gio): Change type to cluster-gateway or sth similar.
+ "dodo.cloud/resource-type": "ingress"
+ "dodo.cloud/resource.ingress.host": "https://\(_clusterDomain)"
+ }
+ values: {
+ domain: _clusterDomain
+ ingressClassName: input.cluster.ingressClassName
+ certificateIssuer: ""
+ annotations: {
+ "nginx.ingress.kubernetes.io/force-ssl-redirect": "false"
+ "nginx.ingress.kubernetes.io/ssl-redirect": "false"
}
- if !auth.enabled {
+ service: {
name: _service.name
if _service.port.name != _|_ {
port: name: _service.port.name
@@ -123,6 +141,70 @@
}
}
}
+ "\(release.appInstanceId)-\(name)-ingress": {
+ chart: charts.ingress
+ // NOTE(gio): Force to install in default cluster.
+ cluster: null
+ // TODO(gio): take it from input.network.namespace
+ targetNamespace: "\(global.namespacePrefix)ingress-private"
+ _service: service
+ info: "Generating TLS certificate for https://\(_domain)"
+ annotations: {
+ "dodo.cloud/resource-type": "ingress"
+ "dodo.cloud/resource.ingress.host": "https://\(_domain)"
+ }
+ values: {
+ domain: _domain
+ appRoot: _appRoot
+ ingressClassName: network.ingressClass
+ certificateIssuer: network.certificateIssuer
+ service: {
+ if auth.enabled {
+ name: _authProxyName
+ port: name: _authProxyHTTPPortName
+ }
+ if !auth.enabled {
+ // TODO(gio): make this variables part of the env configuration
+ name: "proxy-backend-service"
+ port: name: "http"
+ }
+ }
+ }
+ }
+ }
+ if input.cluster == _|_ {
+ "\(name)-ingress": {
+ chart: charts.ingress
+ // NOTE(gio): Force to install in default cluster.
+ cluster: null
+ _service: service
+ info: "Generating TLS certificate for https://\(_domain)"
+ annotations: {
+ "dodo.cloud/resource-type": "ingress"
+ "dodo.cloud/resource.ingress.host": "https://\(_domain)"
+ }
+ values: {
+ domain: _domain
+ appRoot: _appRoot
+ ingressClassName: network.ingressClass
+ certificateIssuer: network.certificateIssuer
+ service: {
+ if auth.enabled {
+ name: _authProxyName
+ port: name: _authProxyHTTPPortName
+ }
+ if !auth.enabled {
+ name: _service.name
+ if _service.port.name != _|_ {
+ port: name: _service.port.name
+ }
+ if _service.port.number != _|_ {
+ port: number: _service.port.number
+ }
+ }
+ }
+ }
+ }
}
}
}
@@ -138,6 +220,14 @@
}
...
}
+ clusterProxy: {
+ for k, v in ingress {
+ for i, j in v.clusterProxy {
+ "\(k)-\(i)": j
+ }
+ }
+ ...
+ }
images: {
for k, v in ingress {
for x, y in v.images {
diff --git a/core/installer/app_configs/dodo_app.cue b/core/installer/app_configs/dodo_app.cue
index 2456838..b9357ec 100644
--- a/core/installer/app_configs/dodo_app.cue
+++ b/core/installer/app_configs/dodo_app.cue
@@ -32,6 +32,7 @@
loginServer: "https://headscale.\(global.domain)"
authKey: input.vpnAuthKey
}
+ codeServerEnabled: true
cpuCores: 1
memory: "1Gi"
cloudInit: {
diff --git a/core/installer/app_manager.go b/core/installer/app_manager.go
index d31d9ff..0bddf29 100644
--- a/core/installer/app_manager.go
+++ b/core/installer/app_manager.go
@@ -13,7 +13,9 @@
"strings"
"sync"
+ "github.com/giolekva/pcloud/core/installer/cluster"
gio "github.com/giolekva/pcloud/core/installer/io"
+ "github.com/giolekva/pcloud/core/installer/kube"
"github.com/giolekva/pcloud/core/installer/soft"
helmv2 "github.com/fluxcd/helm-controller/api/v2"
@@ -31,36 +33,39 @@
type AppManager struct {
l sync.Locker
- repoIO soft.RepoIO
+ repo soft.RepoIO
nsc NamespaceCreator
jc JobCreator
hf HelmFetcher
vpnAPIClient VPNAPIClient
+ cnc ClusterNetworkConfigurator
appDirRoot string
}
func NewAppManager(
- repoIO soft.RepoIO,
+ repo soft.RepoIO,
nsc NamespaceCreator,
jc JobCreator,
hf HelmFetcher,
vpnKeyGen VPNAPIClient,
+ cnc ClusterNetworkConfigurator,
appDirRoot string,
) (*AppManager, error) {
return &AppManager{
&sync.Mutex{},
- repoIO,
+ repo,
nsc,
jc,
hf,
vpnKeyGen,
+ cnc,
appDirRoot,
}, nil
}
func (m *AppManager) Config() (EnvConfig, error) {
var cfg EnvConfig
- if err := soft.ReadYaml(m.repoIO, configFileName, &cfg); err != nil {
+ if err := soft.ReadYaml(m.repo, configFileName, &cfg); err != nil {
return EnvConfig{}, err
} else {
return cfg, nil
@@ -69,7 +74,7 @@
func (m *AppManager) appConfig(path string) (AppInstanceConfig, error) {
var cfg AppInstanceConfig
- if err := soft.ReadJson(m.repoIO, path, &cfg); err != nil {
+ if err := soft.ReadJson(m.repo, path, &cfg); err != nil {
return AppInstanceConfig{}, err
} else {
return cfg, nil
@@ -77,9 +82,12 @@
}
func (m *AppManager) GetAllInstances() ([]AppInstanceConfig, error) {
- m.repoIO.Pull()
- kust, err := soft.ReadKustomization(m.repoIO, filepath.Join(m.appDirRoot, "kustomization.yaml"))
+ m.repo.Pull()
+ kust, err := soft.ReadKustomization(m.repo, filepath.Join(m.appDirRoot, "kustomization.yaml"))
if err != nil {
+ if errors.Is(err, fs.ErrNotExist) {
+ return nil, nil
+ }
return nil, err
}
ret := make([]AppInstanceConfig, 0)
@@ -95,7 +103,7 @@
}
func (m *AppManager) GetAllAppInstances(name string) ([]AppInstanceConfig, error) {
- kust, err := soft.ReadKustomization(m.repoIO, filepath.Join(m.appDirRoot, "kustomization.yaml"))
+ kust, err := soft.ReadKustomization(m.repo, filepath.Join(m.appDirRoot, "kustomization.yaml"))
if err != nil {
if errors.Is(err, fs.ErrNotExist) {
return nil, nil
@@ -120,27 +128,12 @@
func (m *AppManager) GetInstance(id string) (*AppInstanceConfig, error) {
appDir := filepath.Clean(filepath.Join(m.appDirRoot, id))
cfgPath := filepath.Join(appDir, "config.json")
- // kust, err := soft.ReadKustomization(m.repoIO, filepath.Join(m.appDirRoot, "kustomization.yaml"))
- // if err != nil {
- // return nil, err
- // }
- // for _, app := range kust.Resources {
- // if app == id {
- // cfg, err := m.appConfig(filepath.Join(m.appDirRoot, app, "config.json"))
cfg, err := m.appConfig(cfgPath)
if err != nil {
return nil, err
}
cfg.Id = id
return &cfg, err
- // if err != nil {
- // return nil, err
- // }
- // cfg.Id = id
- // return &cfg, nil
- // }
- // }
- // return nil, ErrorNotFound
}
func GetCueAppData(fs soft.RepoFS, dir string) (CueAppData, error) {
@@ -162,7 +155,7 @@
}
func (m *AppManager) GetInstanceApp(id string) (EnvApp, error) {
- cfg, err := GetCueAppData(m.repoIO, filepath.Join(m.appDirRoot, id))
+ cfg, err := GetCueAppData(m.repo, filepath.Join(m.appDirRoot, id))
if err != nil {
return nil, err
}
@@ -339,7 +332,7 @@
dopts = append(dopts, soft.WithNoLock())
}
_, err := repo.Do(func(r soft.RepoFS) (string, error) {
- if err := r.RemoveDir(appDir); err != nil {
+ if err := r.RemoveAll(appDir); err != nil {
return "", err
}
resourcesDir := path.Join(appDir, "resources")
@@ -428,7 +421,7 @@
}
appDir = filepath.Clean(appDir)
if !o.NoPull {
- if err := m.repoIO.Pull(); err != nil {
+ if err := m.repo.Pull(); err != nil {
return ReleaseResources{}, err
}
}
@@ -456,6 +449,10 @@
return ReleaseResources{}, err
}
}
+ clusters, err := m.GetClusters()
+ if err != nil {
+ return ReleaseResources{}, err
+ }
var lg LocalChartGenerator
if o.LG != nil {
lg = o.LG
@@ -465,10 +462,10 @@
release := Release{
AppInstanceId: instanceId,
Namespace: namespace,
- RepoAddr: m.repoIO.FullAddress(),
+ RepoAddr: m.repo.FullAddress(),
AppDir: appDir,
}
- rendered, err := app.Render(release, env, networks, values, nil, m.vpnAPIClient)
+ rendered, err := app.Render(release, env, networks, ToAccessConfigs(clusters), values, nil, m.vpnAPIClient)
if err != nil {
return ReleaseResources{}, err
}
@@ -492,7 +489,7 @@
return ReleaseResources{}, err
}
}
- charts, err := pullHelmCharts(m.hf, rendered.HelmCharts, m.repoIO, "/helm-charts")
+ charts, err := pullHelmCharts(m.hf, rendered.HelmCharts, m.repo, "/helm-charts")
if err != nil {
return ReleaseResources{}, err
}
@@ -500,17 +497,37 @@
if o.FetchContainerImages {
release.ImageRegistry = imageRegistry
}
- rendered, err = app.Render(release, env, networks, values, localCharts, m.vpnAPIClient)
+ rendered, err = app.Render(release, env, networks, ToAccessConfigs(clusters), values, localCharts, m.vpnAPIClient)
if err != nil {
return ReleaseResources{}, err
}
- if err := installApp(m.repoIO, appDir, rendered.Name, rendered.Config, rendered.Resources, rendered.Data, opts...); err != nil {
+ for _, ns := range rendered.Namespaces {
+ if ns.Name == "" {
+ return ReleaseResources{}, fmt.Errorf("namespace name missing")
+ }
+ if ns.Kubeconfig == "" {
+ continue
+ }
+ nsc, err := NewNamespaceCreator(kube.KubeConfigOpts{KubeConfig: ns.Kubeconfig})
+ if err != nil {
+ return ReleaseResources{}, err
+ }
+ if err := nsc.Create(ns.Name); err != nil {
+ return ReleaseResources{}, err
+ }
+ }
+ if err := installApp(m.repo, appDir, rendered.Name, rendered.Config, rendered.Resources, rendered.Data, opts...); err != nil {
return ReleaseResources{}, err
}
// TODO(gio): add ingress-nginx to release resources
if err := openPorts(rendered.Ports, portReservations, allocators); err != nil {
return ReleaseResources{}, err
}
+ for _, p := range rendered.ClusterProxies {
+ if err := m.cnc.AddProxy(p.From, p.To); err != nil {
+ return ReleaseResources{}, err
+ }
+ }
return ReleaseResources{
Release: rendered.Config.Release,
RenderedRaw: rendered.Raw,
@@ -568,7 +585,7 @@
) (ReleaseResources, error) {
m.l.Lock()
defer m.l.Unlock()
- if err := m.repoIO.Pull(); err != nil {
+ if err := m.repo.Pull(); err != nil {
return ReleaseResources{}, err
}
env, err := m.Config()
@@ -585,7 +602,7 @@
if err != nil {
return ReleaseResources{}, err
}
- renderedCfg, err := readRendered(m.repoIO, filepath.Join(instanceDir, "rendered.json"))
+ renderedCfg, err := readRendered(m.repo, filepath.Join(instanceDir, "rendered.json"))
if err != nil {
return ReleaseResources{}, err
}
@@ -593,13 +610,60 @@
if err != nil {
return ReleaseResources{}, err
}
- rendered, err := app.Render(config.Release, env, networks, values, renderedCfg.LocalCharts, m.vpnAPIClient)
+ clusters, err := m.GetClusters()
if err != nil {
return ReleaseResources{}, err
}
- if err := installApp(m.repoIO, instanceDir, rendered.Name, rendered.Config, rendered.Resources, rendered.Data, opts...); err != nil {
+ rendered, err := app.Render(config.Release, env, networks, ToAccessConfigs(clusters), values, renderedCfg.LocalCharts, m.vpnAPIClient)
+ if err != nil {
return ReleaseResources{}, err
}
+ for _, ns := range rendered.Namespaces {
+ if ns.Name == "" {
+ return ReleaseResources{}, fmt.Errorf("namespace name missing")
+ }
+ if ns.Kubeconfig == "" {
+ continue
+ }
+ nsc, err := NewNamespaceCreator(kube.KubeConfigOpts{KubeConfig: ns.Kubeconfig})
+ if err != nil {
+ return ReleaseResources{}, err
+ }
+ if err := nsc.Create(ns.Name); err != nil {
+ return ReleaseResources{}, err
+ }
+ }
+ if err := installApp(m.repo, instanceDir, rendered.Name, rendered.Config, rendered.Resources, rendered.Data, opts...); err != nil {
+ return ReleaseResources{}, err
+ }
+ for _, ocp := range renderedCfg.Out.ClusterProxy {
+ found := false
+ for _, ncp := range rendered.ClusterProxies {
+ if ocp == ncp {
+ found = true
+ break
+ }
+ }
+ if !found {
+ if err := m.cnc.RemoveProxy(ocp.From, ocp.To); err != nil {
+ return ReleaseResources{}, err
+ }
+ }
+ }
+ for _, ncp := range rendered.ClusterProxies {
+ found := false
+ for _, ocp := range renderedCfg.Out.ClusterProxy {
+ if ocp == ncp {
+ found = true
+ break
+ }
+ }
+ if !found {
+ if err := m.cnc.AddProxy(ncp.From, ncp.To); err != nil {
+ return ReleaseResources{}, err
+ }
+ }
+ }
return ReleaseResources{
Release: rendered.Config.Release,
RenderedRaw: rendered.Raw,
@@ -610,18 +674,18 @@
func (m *AppManager) Remove(instanceId string) error {
m.l.Lock()
defer m.l.Unlock()
- if err := m.repoIO.Pull(); err != nil {
+ if err := m.repo.Pull(); err != nil {
return err
}
var cfg renderedInstance
- if _, err := m.repoIO.Do(func(r soft.RepoFS) (string, error) {
+ if _, err := m.repo.Do(func(r soft.RepoFS) (string, error) {
instanceDir := filepath.Join(m.appDirRoot, instanceId)
- renderedCfg, err := readRendered(m.repoIO, filepath.Join(instanceDir, "rendered.json"))
+ renderedCfg, err := readRendered(m.repo, filepath.Join(instanceDir, "rendered.json"))
if err != nil {
return "", err
}
cfg = renderedCfg
- r.RemoveDir(instanceDir)
+ r.RemoveAll(instanceDir)
kustPath := filepath.Join(m.appDirRoot, "kustomization.yaml")
kust, err := soft.ReadKustomization(r, kustPath)
if err != nil {
@@ -636,6 +700,11 @@
if err := closePorts(cfg.PortForward); err != nil {
return err
}
+ for _, cp := range cfg.Out.ClusterProxy {
+ if err := m.cnc.RemoveProxy(cp.From, cp.To); err != nil {
+ return err
+ }
+ }
for vmName, vmCfg := range cfg.Out.VM {
if vmCfg.VPN.Enabled {
if err := m.vpnAPIClient.ExpireNode(vmCfg.Username, vmName); err != nil {
@@ -692,6 +761,36 @@
return ret, nil
}
+// GetClusters returns the states of all known clusters. The implicit
+// "default" cluster is always first; remote clusters are loaded from
+// /clusters/<name>/config.json in the repo. A missing /clusters
+// directory, or a cluster directory without a config, is not an error.
+func (m *AppManager) GetClusters() ([]cluster.State, error) {
+	ret := []cluster.State{
+		{
+			Name: "default",
+		},
+	}
+	files, err := m.repo.ListDir("/clusters")
+	if err != nil {
+		if errors.Is(err, fs.ErrNotExist) {
+			return ret, nil
+		}
+		return nil, err
+	}
+	for _, f := range files {
+		if !f.IsDir() {
+			continue
+		}
+		// filepath.Join already returns a Clean-ed path, so no extra
+		// filepath.Clean is needed.
+		cfgPath := filepath.Join("/clusters", f.Name(), "config.json")
+		var c cluster.State
+		if err := soft.ReadJson(m.repo, cfgPath, &c); err != nil {
+			if errors.Is(err, fs.ErrNotExist) {
+				// Directory without config.json: skip, not fatal.
+				continue
+			}
+			return nil, err
+		}
+		ret = append(ret, c)
+	}
+	return ret, nil
+}
+
type installOptions struct {
NoPull bool
NoPublish bool
@@ -968,7 +1067,8 @@
}
type outRendered struct {
- VM map[string]vmRendered `json:"vm"`
+ ClusterProxy map[string]ClusterProxy
+ VM map[string]vmRendered `json:"vm"`
}
type vmRendered struct {
@@ -1028,6 +1128,8 @@
return []string{""}
case KindVPNAuthKey:
return []string{}
+ case KindCluster:
+ return []string{}
default:
panic("MUST NOT REACH!")
}
@@ -1063,3 +1165,25 @@
}
return nil
}
+
+// Cluster is the app-facing access configuration of a cluster: just
+// enough (kubeconfig + ingress class) for app rendering to target it.
+type Cluster struct {
+	Name             string `json:"name"`
+	Kubeconfig       string `json:"kubeconfig"`
+	IngressClassName string `json:"ingressClassName"`
+}
+
+// ClusterStateToAccessConfig projects a full cluster.State down to the
+// fields apps need to access the cluster.
+func ClusterStateToAccessConfig(c cluster.State) Cluster {
+	return Cluster{
+		Name:             c.Name,
+		Kubeconfig:       c.Kubeconfig,
+		IngressClassName: c.IngressClassName,
+	}
+}
+
+// ToAccessConfigs converts every cluster state into its app-facing
+// access configuration, preserving order.
+func ToAccessConfigs(clusters []cluster.State) []Cluster {
+	out := make([]Cluster, len(clusters))
+	for i, state := range clusters {
+		out[i] = ClusterStateToAccessConfig(state)
+	}
+	return out
+}
diff --git a/core/installer/app_repository.go b/core/installer/app_repository.go
index 0de0c5a..e4bfa4d 100644
--- a/core/installer/app_repository.go
+++ b/core/installer/app_repository.go
@@ -54,6 +54,7 @@
"values-tmpl/launcher.cue",
"values-tmpl/env-dns.cue",
"values-tmpl/launcher.cue",
+ "values-tmpl/cluster-network.cue",
}
var infraAppConfigs = []string{
@@ -99,10 +100,7 @@
func CreateAllApps() []App {
return append(
createInfraApps(),
- append(
- CreateEnvApps(storeEnvAppConfigs),
- CreateEnvApps(envAppConfigs)...,
- )...,
+ CreateAllEnvApps()...,
)
}
@@ -123,6 +121,13 @@
return CreateEnvApps(storeEnvAppConfigs)
}
+// CreateAllEnvApps returns the store apps followed by the remaining
+// environment apps.
+func CreateAllEnvApps() []App {
+	apps := CreateStoreApps()
+	apps = append(apps, CreateEnvApps(envAppConfigs)...)
+	return apps
+}
+
func CreateEnvApps(configs []string) []App {
ret := make([]App, 0)
for _, cfgFile := range configs {
diff --git a/core/installer/app_test.go b/core/installer/app_test.go
index 18cf85c..4f3f7dd 100644
--- a/core/installer/app_test.go
+++ b/core/installer/app_test.go
@@ -82,7 +82,7 @@
"groups": "a,b",
},
}
- rendered, err := a.Render(release, env, networks, values, nil, nil)
+ rendered, err := a.Render(release, env, networks, nil, values, nil, nil)
if err != nil {
t.Fatal(err)
}
@@ -112,7 +112,7 @@
"enabled": false,
},
}
- rendered, err := a.Render(release, env, networks, values, nil, nil)
+ rendered, err := a.Render(release, env, networks, nil, values, nil, nil)
if err != nil {
t.Fatal(err)
}
@@ -138,7 +138,7 @@
"network": "Public",
"authGroups": "foo,bar",
}
- rendered, err := a.Render(release, env, networks, values, nil, nil)
+ rendered, err := a.Render(release, env, networks, nil, values, nil, nil)
if err != nil {
t.Fatal(err)
}
@@ -168,7 +168,7 @@
},
"sshPort": 22,
}
- rendered, err := a.Render(release, env, networks, values, nil, nil)
+ rendered, err := a.Render(release, env, networks, nil, values, nil, nil)
if err != nil {
t.Fatal(err)
}
@@ -193,7 +193,7 @@
"subdomain": "jenkins",
"network": "Private",
}
- rendered, err := a.Render(release, env, networks, values, nil, nil)
+ rendered, err := a.Render(release, env, networks, nil, values, nil, nil)
if err != nil {
t.Fatal(err)
}
@@ -252,7 +252,7 @@
},
"sshPrivateKey": "private",
}
- rendered, err := a.Render(release, env, networks, values, nil, nil)
+ rendered, err := a.Render(release, env, networks, nil, values, nil, nil)
if err != nil {
t.Fatal(err)
}
@@ -285,7 +285,7 @@
"groups": "a,b",
},
}
- rendered, err := app.Render(release, env, networks, values, nil, nil)
+ rendered, err := app.Render(release, env, networks, nil, values, nil, nil)
if err != nil {
t.Fatal(err)
}
@@ -388,7 +388,7 @@
AppDir: "/foo/bar",
}
keyGen := testKeyGen{}
- r, err := app.Render(release, env, networks, map[string]any{
+ r, err := app.Render(release, env, networks, nil, map[string]any{
"repoAddr": "",
"repoPublicAddr": "",
"managerAddr": "",
@@ -425,7 +425,7 @@
AppDir: "/foo/bar",
}
keyGen := testKeyGen{}
- r, err := app.Render(release, env, networks, map[string]any{
+ r, err := app.Render(release, env, networks, nil, map[string]any{
"repoAddr": "",
"repoPublicAddr": "",
"managerAddr": "",
@@ -463,7 +463,7 @@
"gitRepoPublicKey": "",
"username": "",
}
- rendered, err := a.Render(release, env, networks, values, nil, nil)
+ rendered, err := a.Render(release, env, networks, nil, values, nil, nil)
if err != nil {
t.Fatal(err)
}
@@ -487,3 +487,37 @@
}
t.Log(app.Schema())
}
+
+// TestAppVirtualMachine renders the virtual-machine app template with a
+// minimal set of values and logs the produced resources. It currently
+// only checks that rendering succeeds; it asserts nothing about the
+// output content.
+func TestAppVirtualMachine(t *testing.T) {
+	contents, err := valuesTmpls.ReadFile("values-tmpl/virtual-machine.cue")
+	if err != nil {
+		t.Fatal(err)
+	}
+	app, err := NewCueEnvApp(CueAppData{
+		"base.cue":   []byte(cueBaseConfig),
+		"app.cue":    []byte(contents),
+		"global.cue": []byte(cueEnvAppGlobal),
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	release := Release{
+		Namespace: "foo",
+	}
+	values := map[string]any{
+		"name":     "foo",
+		"username": "bar",
+		"cpuCores": 1,
+		"memory":   "1Gi",
+	}
+	// nil clusters: render against the default cluster only.
+	rendered, err := app.Render(release, env, networks, nil, values, nil, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	for _, r := range rendered.Resources {
+		t.Log(string(r))
+	}
+	for _, r := range rendered.Data {
+		t.Log(string(r))
+	}
+}
diff --git a/core/installer/cluster.go b/core/installer/cluster.go
new file mode 100644
index 0000000..9ebffcf
--- /dev/null
+++ b/core/installer/cluster.go
@@ -0,0 +1,326 @@
+package installer
+
+import (
+ "bytes"
+ "crypto/sha256"
+ "encoding/base64"
+ "encoding/json"
+ "fmt"
+ "io"
+ "net"
+ "net/http"
+ "path/filepath"
+ "strconv"
+ "strings"
+ "text/template"
+
+ "github.com/giolekva/pcloud/core/installer/soft"
+
+ "sigs.k8s.io/yaml"
+)
+
+// ClusterNetworkConfigurator manages how the default cluster reaches
+// remote clusters: DNS records pointing at a cluster's ingress IP, and
+// reverse-proxy mappings from a source host to a destination backend.
+type ClusterNetworkConfigurator interface {
+	AddCluster(name string, ingressIP net.IP) error
+	RemoveCluster(name string, ingressIP net.IP) error
+	AddProxy(src, dst string) error
+	RemoveProxy(src, dst string) error
+}
+
+// NginxProxyConfigurator implements ClusterNetworkConfigurator by
+// editing an nginx proxy config stored in a soft-serve repo and by
+// creating DNS records through the dns-api service.
+type NginxProxyConfigurator struct {
+	// PrivateSubdomain is the private DNS zone suffix (e.g. "p").
+	PrivateSubdomain string
+	// DNSAPIAddr is the base URL of the dns-api service.
+	DNSAPIAddr string
+	// Repo holds the GitOps repo containing the nginx config.
+	Repo soft.RepoIO
+	// NginxConfigPath is the path of the proxy config inside Repo.
+	NginxConfigPath string
+}
+
+// createARecordReq is the request body for the dns-api
+// create-a-record/delete-a-record endpoints.
+type createARecordReq struct {
+	Entry string `json:"entry"`
+	// NOTE(review): the IP is serialized under the key "text", which
+	// looks copy-pasted from the TXT record request — confirm against
+	// the dns-api handler before changing.
+	IP net.IP `json:"text"`
+}
+
+// AddCluster publishes a wildcard A record
+// (*.<name>.cluster.<PrivateSubdomain>) pointing at the remote
+// cluster's ingress IP via the dns-api service.
+func (c *NginxProxyConfigurator) AddCluster(name string, ingressIP net.IP) error {
+	req := createARecordReq{
+		Entry: fmt.Sprintf("*.%s.cluster.%s", name, c.PrivateSubdomain),
+		IP:    ingressIP,
+	}
+	var buf bytes.Buffer
+	if err := json.NewEncoder(&buf).Encode(req); err != nil {
+		return err
+	}
+	resp, err := http.Post(fmt.Sprintf("%s/create-a-record", c.DNSAPIAddr), "application/json", &buf)
+	if err != nil {
+		return err
+	}
+	// Close the body so the underlying connection can be reused; it was
+	// previously leaked.
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusOK {
+		var msg bytes.Buffer
+		io.Copy(&msg, resp.Body)
+		// Constant format string: the response body may contain '%'.
+		return fmt.Errorf("%s", msg.String())
+	}
+	return nil
+}
+
+// RemoveCluster deletes the wildcard A record previously created by
+// AddCluster for the given cluster name and ingress IP.
+func (c *NginxProxyConfigurator) RemoveCluster(name string, ingressIP net.IP) error {
+	req := createARecordReq{
+		Entry: fmt.Sprintf("*.%s.cluster.%s", name, c.PrivateSubdomain),
+		IP:    ingressIP,
+	}
+	var buf bytes.Buffer
+	if err := json.NewEncoder(&buf).Encode(req); err != nil {
+		return err
+	}
+	resp, err := http.Post(fmt.Sprintf("%s/delete-a-record", c.DNSAPIAddr), "application/json", &buf)
+	if err != nil {
+		return err
+	}
+	// Close the body so the underlying connection can be reused; it was
+	// previously leaked.
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusOK {
+		var msg bytes.Buffer
+		io.Copy(&msg, resp.Body)
+		// Constant format string: the response body may contain '%'.
+		return fmt.Errorf("%s", msg.String())
+	}
+	return nil
+}
+
+// AddProxy registers a new host mapping (src -> dst) in the nginx proxy
+// config stored in the repo. It re-renders the config, hashes it, and
+// stamps the hash into the ingress-nginx HelmRelease pod annotations so
+// the proxy pods restart and pick up the change. Fails if a mapping for
+// src already exists.
+func (c *NginxProxyConfigurator) AddProxy(src, dst string) error {
+	_, err := c.Repo.Do(func(fs soft.RepoFS) (string, error) {
+		// Read and parse the current proxy config.
+		cfg, err := func() (NginxProxyConfig, error) {
+			r, err := fs.Reader(c.NginxConfigPath)
+			if err != nil {
+				return NginxProxyConfig{}, err
+			}
+			defer r.Close()
+			return ParseNginxProxyConfig(r)
+		}()
+		if err != nil {
+			return "", err
+		}
+		if v, ok := cfg.Proxies[src]; ok {
+			return "", fmt.Errorf("mapping from %s already exists (%s)", src, v)
+		}
+		cfg.Proxies[src] = dst
+		w, err := fs.Writer(c.NginxConfigPath)
+		if err != nil {
+			return "", err
+		}
+		defer w.Close()
+		// Write the rendered config while hashing it in a single pass.
+		h := sha256.New()
+		o := io.MultiWriter(w, h)
+		if err := cfg.Render(o); err != nil {
+			return "", err
+		}
+		hash := base64.StdEncoding.EncodeToString(h.Sum(nil))
+		// The ingress-nginx HelmRelease lives next to the proxy config.
+		nginxPath := filepath.Join(filepath.Dir(c.NginxConfigPath), "ingress-nginx.yaml")
+		nginx, err := func() (map[string]any, error) {
+			r, err := fs.Reader(nginxPath)
+			if err != nil {
+				return nil, err
+			}
+			defer r.Close()
+			var buf bytes.Buffer
+			if _, err := io.Copy(&buf, r); err != nil {
+				return nil, err
+			}
+			ret := map[string]any{}
+			if err := yaml.Unmarshal(buf.Bytes(), &ret); err != nil {
+				return nil, err
+			}
+			return ret, nil
+		}()
+		if err != nil {
+			return "", err
+		}
+		// NOTE(review): these type assertions panic if the HelmRelease
+		// does not have the expected spec.values.controller shape.
+		cv := nginx["spec"].(map[string]any)["values"].(map[string]any)["controller"].(map[string]any)
+		var annotations map[string]any
+		if a, ok := cv["podAnnotations"]; ok {
+			annotations = a.(map[string]any)
+		} else {
+			annotations = map[string]any{}
+			cv["podAnnotations"] = annotations
+		}
+		// Changing the annotation forces a rolling restart of the pods.
+		annotations["dodo.cloud/hash"] = string(hash)
+		buf, err := yaml.Marshal(nginx)
+		if err != nil {
+			return "", err
+		}
+		w, err = fs.Writer(nginxPath)
+		if err != nil {
+			return "", err
+		}
+		defer w.Close()
+		if _, err := io.Copy(w, bytes.NewReader(buf)); err != nil {
+			return "", err
+		}
+		// Returned string becomes the commit message.
+		return fmt.Sprintf("add proxy mapping: %s %s", src, dst), nil
+	})
+	return err
+}
+
+// RemoveProxy deletes an existing host mapping (src -> dst) from the
+// nginx proxy config in the repo, then re-renders, re-hashes and stamps
+// the ingress-nginx pod annotations exactly as AddProxy does. Fails if
+// the mapping does not exist or points at a different destination.
+func (c *NginxProxyConfigurator) RemoveProxy(src, dst string) error {
+	_, err := c.Repo.Do(func(fs soft.RepoFS) (string, error) {
+		// Read and parse the current proxy config.
+		cfg, err := func() (NginxProxyConfig, error) {
+			r, err := fs.Reader(c.NginxConfigPath)
+			if err != nil {
+				return NginxProxyConfig{}, err
+			}
+			defer r.Close()
+			return ParseNginxProxyConfig(r)
+		}()
+		if err != nil {
+			return "", err
+		}
+		// Require the stored destination to match, so a stale remove
+		// cannot drop a newer mapping for the same source.
+		if v, ok := cfg.Proxies[src]; !ok || v != dst {
+			return "", fmt.Errorf("mapping does not exist: %s %s", src, dst)
+		}
+		delete(cfg.Proxies, src)
+		w, err := fs.Writer(c.NginxConfigPath)
+		if err != nil {
+			return "", err
+		}
+		defer w.Close()
+		// Write the rendered config while hashing it in a single pass.
+		h := sha256.New()
+		o := io.MultiWriter(w, h)
+		if err := cfg.Render(o); err != nil {
+			return "", err
+		}
+		hash := base64.StdEncoding.EncodeToString(h.Sum(nil))
+		// The ingress-nginx HelmRelease lives next to the proxy config.
+		nginxPath := filepath.Join(filepath.Dir(c.NginxConfigPath), "ingress-nginx.yaml")
+		nginx, err := func() (map[string]any, error) {
+			r, err := fs.Reader(nginxPath)
+			if err != nil {
+				return nil, err
+			}
+			defer r.Close()
+			var buf bytes.Buffer
+			if _, err := io.Copy(&buf, r); err != nil {
+				return nil, err
+			}
+			ret := map[string]any{}
+			if err := yaml.Unmarshal(buf.Bytes(), &ret); err != nil {
+				return nil, err
+			}
+			return ret, nil
+		}()
+		if err != nil {
+			return "", err
+		}
+		// NOTE(review): these type assertions panic if the HelmRelease
+		// does not have the expected spec.values.controller shape.
+		cv := nginx["spec"].(map[string]any)["values"].(map[string]any)["controller"].(map[string]any)
+		var annotations map[string]any
+		if a, ok := cv["podAnnotations"]; ok {
+			annotations = a.(map[string]any)
+		} else {
+			annotations = map[string]any{}
+			cv["podAnnotations"] = annotations
+		}
+		// Changing the annotation forces a rolling restart of the pods.
+		annotations["dodo.cloud/hash"] = string(hash)
+		buf, err := yaml.Marshal(nginx)
+		if err != nil {
+			return "", err
+		}
+		w, err = fs.Writer(nginxPath)
+		if err != nil {
+			return "", err
+		}
+		defer w.Close()
+		if _, err := io.Copy(w, bytes.NewReader(buf)); err != nil {
+			return "", err
+		}
+		// Returned string becomes the commit message.
+		return fmt.Sprintf("remove proxy mapping: %s %s", src, dst), nil
+	})
+	return err
+}
+
+// NginxProxyConfig is the structured form of the generated nginx proxy
+// configuration; it round-trips through ParseNginxProxyConfig/Render.
+type NginxProxyConfig struct {
+	// Port the proxy server listens on; -1 when not yet parsed.
+	Port int
+	// Resolvers used by nginx to resolve backend hostnames.
+	Resolvers []net.IP
+	// Proxies maps $http_host values to backend addresses.
+	Proxies map[string]string
+	// PreConf holds the verbatim lines preceding the generated config.
+	PreConf []string
+}
+
+// ParseNginxProxyConfig parses a previously rendered nginx proxy config
+// (see nginxConfigTmpl) back into its structured form. Everything up to
+// and including the line containing "nginx.conf" is preserved verbatim
+// in PreConf; listen port, resolver IPs and map entries are recovered
+// by keyword matching on the remaining lines.
+func ParseNginxProxyConfig(r io.Reader) (NginxProxyConfig, error) {
+	var buf strings.Builder
+	if _, err := io.Copy(&buf, r); err != nil {
+		return NginxProxyConfig{}, err
+	}
+	ret := NginxProxyConfig{
+		Port:      -1,
+		Resolvers: nil,
+		Proxies:   make(map[string]string),
+	}
+	lines := strings.Split(buf.String(), "\n")
+	insideConf := true
+	insideMap := false
+	for _, l := range lines {
+		items := strings.Fields(strings.TrimSuffix(l, ";"))
+		if len(items) == 0 {
+			continue
+		}
+		if strings.Contains(l, "nginx.conf") {
+			ret.PreConf = append(ret.PreConf, l)
+			insideConf = false
+		} else if insideConf {
+			ret.PreConf = append(ret.PreConf, l)
+		} else if strings.Contains(l, "listen") {
+			if len(items) < 2 {
+				// Error strings must not end in a newline (was "%s\n").
+				return NginxProxyConfig{}, fmt.Errorf("invalid listen: %s", l)
+			}
+			port, err := strconv.Atoi(items[1])
+			if err != nil {
+				return NginxProxyConfig{}, err
+			}
+			ret.Port = port
+		} else if strings.Contains(l, "resolver") {
+			if len(items) < 2 {
+				return NginxProxyConfig{}, fmt.Errorf("invalid resolver: %s", l)
+			}
+			ip := net.ParseIP(items[1])
+			if ip == nil {
+				return NginxProxyConfig{}, fmt.Errorf("invalid resolver ip: %s", l)
+			}
+			ret.Resolvers = append(ret.Resolvers, ip)
+		} else if insideMap {
+			if items[0] == "}" {
+				insideMap = false
+				continue
+			}
+			if len(items) < 2 {
+				return NginxProxyConfig{}, fmt.Errorf("invalid map: %s", l)
+			}
+			ret.Proxies[items[0]] = items[1]
+		} else if items[0] == "map" {
+			insideMap = true
+		}
+	}
+	return ret, nil
+}
+
+// Render writes the preserved preamble (PreConf) lines followed by the
+// generated nginx map/server sections to w.
+func (c NginxProxyConfig) Render(w io.Writer) error {
+	for _, line := range c.PreConf {
+		fmt.Fprintln(w, line)
+	}
+	t, err := template.New("nginx.conf").Parse(nginxConfigTmpl)
+	if err != nil {
+		return err
+	}
+	return t.Execute(w, c)
+}
+
+// nginxConfigTmpl renders the generated part of the proxy config: a map
+// from $http_host to backend plus a catch-all proxy server. Keep it in
+// sync with ParseNginxProxyConfig, which parses exactly this layout.
+const nginxConfigTmpl = ` worker_processes 1;
+  worker_rlimit_nofile 8192;
+  events {
+    worker_connections 1024;
+  }
+  http {
+    map $http_host $backend {
+      {{- range $from, $to := .Proxies }}
+      {{ $from }} {{ $to }};
+      {{- end }}
+    }
+    server {
+      listen {{ .Port }};
+      location / {
+        {{- range .Resolvers }}
+        resolver {{ . }};
+        {{- end }}
+        proxy_pass http://$backend;
+      }
+    }
+  }`
new file mode 100644
index 0000000..ee123fa
--- /dev/null
+++ b/core/installer/cluster/kube.go
@@ -0,0 +1,280 @@
+package cluster
+
+import (
+ "context"
+ "fmt"
+ "golang.org/x/crypto/ssh"
+ "net"
+ "os"
+ "sync"
+ "time"
+
+ "github.com/giolekva/pcloud/core/installer/kube"
+
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/kubectl/pkg/drain"
+)
+
+// KubeManager implements Manager for k3s-based clusters whose servers
+// are provisioned over SSH and joined to the VPN. All fields are
+// guarded by l.
+type KubeManager struct {
+	l                sync.Locker
+	name             string
+	ingressClassName string
+	// ingressIP is the address of the cluster's ingress controller.
+	ingressIP net.IP
+	// kubeCfg is the admin kubeconfig; empty means not initialized.
+	kubeCfg string
+	// serverAddr is the k3s API server host:port new nodes join through.
+	serverAddr string
+	// serverToken is the k3s node join token.
+	serverToken string
+	controllers []Server
+	workers     []Server
+}
+
+func NewKubeManager() *KubeManager {
+ return &KubeManager{l: &sync.Mutex{}}
+}
+
+// RestoreKubeManager rebuilds a manager from a previously persisted
+// State snapshot (the inverse of State()).
+func RestoreKubeManager(st State) (*KubeManager, error) {
+	return &KubeManager{
+		l:                &sync.Mutex{},
+		name:             st.Name,
+		ingressClassName: st.IngressClassName,
+		ingressIP:        st.IngressIP,
+		kubeCfg:          st.Kubeconfig,
+		serverAddr:       st.ServerAddr,
+		serverToken:      st.ServerToken,
+		controllers:      st.Controllers,
+		workers:          st.Workers,
+	}, nil
+}
+
+// State returns a snapshot of the manager's current configuration,
+// suitable for persisting and later feeding to RestoreKubeManager.
+func (m *KubeManager) State() State {
+	m.l.Lock()
+	defer m.l.Unlock()
+	// Named fields so the literal survives State field reordering
+	// (the positional form silently miswires on any struct change).
+	return State{
+		Name:             m.name,
+		IngressClassName: m.ingressClassName,
+		IngressIP:        m.ingressIP,
+		ServerAddr:       m.serverAddr,
+		ServerToken:      m.serverToken,
+		Kubeconfig:       m.kubeCfg,
+		Controllers:      m.controllers,
+		Workers:          m.workers,
+	}
+}
+
+// Init bootstraps a brand new cluster on server s: joins it to the VPN,
+// installs k3s as the first control plane node, records the kubeconfig,
+// API server address and join token, then runs setupFn (e.g. installing
+// ingress) which returns the cluster's ingress IP.
+func (m *KubeManager) Init(s Server, setupFn ClusterSetupFunc) (net.IP, error) {
+	m.l.Lock()
+	defer m.l.Unlock()
+	// A non-empty kubeconfig marks an already initialized cluster.
+	if m.kubeCfg != "" {
+		return nil, fmt.Errorf("already initialized")
+	}
+	c, err := m.connect(&s)
+	if err != nil {
+		return nil, err
+	}
+	defer c.Close()
+	if err := InstallTailscale(c); err != nil {
+		return nil, err
+	}
+	const loginServer = "https://headscale.v1.dodo.cloud"
+	if err := TailscaleUp(c, loginServer, s.Name, s.AuthKey); err != nil {
+		return nil, err
+	}
+	if err := InstallK3s(c); err != nil {
+		return nil, err
+	}
+	kubeCfg, err := GetKubeconfig(c)
+	if err != nil {
+		return nil, err
+	}
+	m.kubeCfg = kubeCfg
+	serverIP, err := GetTailscaleIP(c)
+	if err != nil {
+		return nil, err
+	}
+	// 6443 is the k3s API server port new nodes join through.
+	m.serverAddr = fmt.Sprintf("%s:6443", serverIP)
+	serverToken, err := GetServerToken(c)
+	if err != nil {
+		return nil, err
+	}
+	m.serverToken = serverToken
+	m.controllers = []Server{s}
+	m.ingressClassName = "default"
+	ingressIP, err := setupFn(m.name, m.kubeCfg, m.ingressClassName)
+	if err != nil {
+		return nil, err
+	}
+	m.ingressIP = ingressIP
+	return ingressIP, nil
+}
+
+// JoinController adds server s to the cluster as an additional control
+// plane node. The cluster must already be initialized and s must not be
+// a known member.
+func (m *KubeManager) JoinController(s Server) error {
+	m.l.Lock()
+	defer m.l.Unlock()
+	if m.kubeCfg == "" {
+		return fmt.Errorf("not initialized")
+	}
+	if i := m.findServerByIP(s.IP); i != nil {
+		return fmt.Errorf("already exists")
+	}
+	c, err := m.connect(&s)
+	if err != nil {
+		return err
+	}
+	defer c.Close()
+	if err := InstallTailscale(c); err != nil {
+		return err
+	}
+	const loginServer = "https://headscale.v1.dodo.cloud"
+	if err := TailscaleUp(c, loginServer, s.Name, s.AuthKey); err != nil {
+		return err
+	}
+	if err := InstallK3sJoinServer(c, m.serverAddr, m.serverToken); err != nil {
+		return err
+	}
+	m.controllers = append(m.controllers, s)
+	return nil
+}
+
+// JoinWorker adds server s to the cluster as a worker (agent) node. The
+// cluster must already be initialized and s must not be a known member.
+func (m *KubeManager) JoinWorker(s Server) error {
+	m.l.Lock()
+	defer m.l.Unlock()
+	if m.kubeCfg == "" {
+		return fmt.Errorf("not initialized")
+	}
+	if i := m.findServerByIP(s.IP); i != nil {
+		return fmt.Errorf("already exists")
+	}
+	c, err := m.connect(&s)
+	if err != nil {
+		return err
+	}
+	defer c.Close()
+	if err := InstallTailscale(c); err != nil {
+		return err
+	}
+	const loginServer = "https://headscale.v1.dodo.cloud"
+	if err := TailscaleUp(c, loginServer, s.Name, s.AuthKey); err != nil {
+		return err
+	}
+	if err := InstallK3sJoinAgent(c, m.serverAddr, m.serverToken); err != nil {
+		return err
+	}
+	m.workers = append(m.workers, s)
+	return nil
+}
+
+// RemoveServer cordons, drains and deletes node name from the cluster
+// (auto-migrating its pods to other nodes), then uninstalls k3s from
+// the underlying server over SSH and forgets it from the
+// controller/worker lists.
+func (m *KubeManager) RemoveServer(name string) error {
+	m.l.Lock()
+	defer m.l.Unlock()
+	client, err := kube.NewKubeClient(kube.KubeConfigOpts{
+		KubeConfig: m.kubeCfg,
+	})
+	if err != nil {
+		return err
+	}
+	helper := &drain.Helper{
+		Ctx:    context.Background(),
+		Client: client,
+		// Force evicts pods not managed by a controller.
+		Force:               true,
+		GracePeriodSeconds:  -1,
+		IgnoreAllDaemonSets: true,
+		Out:                 os.Stdout,
+		ErrOut:              os.Stdout,
+		// We want to proceed even when pods are using emptyDir volumes
+		DeleteEmptyDirData: true,
+		Timeout:            10 * time.Minute,
+	}
+	node, err := client.CoreV1().Nodes().Get(context.Background(), name, metav1.GetOptions{})
+	if err != nil {
+		return err
+	}
+	// Cordon first so no new pods land on the node while draining.
+	if err := drain.RunCordonOrUncordon(helper, node, true); err != nil {
+		return err
+	}
+	if err := drain.RunNodeDrain(helper, name); err != nil {
+		return err
+	}
+	if err := client.CoreV1().Nodes().Delete(context.Background(), name, metav1.DeleteOptions{}); err != nil {
+		return err
+	}
+	for i, s := range m.controllers {
+		if s.Name == name {
+			c, err := m.connect(&s)
+			if err != nil {
+				return err
+			}
+			defer c.Close()
+			if err := UninstallK3sServer(c); err != nil {
+				return err
+			}
+			m.controllers = append(m.controllers[:i], m.controllers[i+1:]...)
+			return nil
+		}
+	}
+	for i, s := range m.workers {
+		if s.Name == name {
+			c, err := m.connect(&s)
+			if err != nil {
+				return err
+			}
+			defer c.Close()
+			if err := UninstallK3sAgent(c); err != nil {
+				return err
+			}
+			m.workers = append(m.workers[:i], m.workers[i+1:]...)
+			return nil
+		}
+	}
+	return fmt.Errorf("not found")
+}
+
+// findServerByIP returns the known server (controller or worker) with
+// the given IP, or nil if none matches.
+// Expects manager state to be locked by caller.
+func (m *KubeManager) findServerByIP(ip net.IP) *Server {
+	// Index into the slices so the returned pointer aliases the actual
+	// element, not a pointer to a loop-variable copy (through which any
+	// mutation would be silently lost).
+	for i := range m.controllers {
+		if m.controllers[i].IP.Equal(ip) {
+			return &m.controllers[i]
+		}
+	}
+	for i := range m.workers {
+		if m.workers[i].IP.Equal(ip) {
+			return &m.workers[i]
+		}
+	}
+	return nil
+}
+
+// connect opens an SSH connection to s using key and/or password auth.
+// When s.HostKey is empty the remote host key is NOT verified at all —
+// NOTE(review): consider requiring HostKey, since credentials are sent
+// over this channel. On success, s.Name is overwritten with the remote
+// hostname as a side effect.
+func (m *KubeManager) connect(s *Server) (*SSHClient, error) {
+	cfg := &ssh.ClientConfig{
+		User:    s.User,
+		Auth:    []ssh.AuthMethod{},
+		Timeout: 10 * time.Second,
+	}
+	if s.ClientKey != "" {
+		clientKey, err := ssh.ParsePrivateKey([]byte(s.ClientKey))
+		if err != nil {
+			return nil, err
+		}
+		cfg.Auth = append(cfg.Auth, ssh.PublicKeys(clientKey))
+	}
+	if s.Password != "" {
+		cfg.Auth = append(cfg.Auth, ssh.Password(s.Password))
+	}
+	if s.HostKey != "" {
+		hostKey, err := ssh.ParsePublicKey([]byte(s.HostKey))
+		if err != nil {
+			return nil, err
+		}
+		cfg.HostKeyCallback = ssh.FixedHostKey(hostKey)
+	} else {
+		cfg.HostKeyCallback = ssh.InsecureIgnoreHostKey()
+	}
+	client, err := ssh.Dial("tcp", fmt.Sprintf("%s:%d", s.IP.String(), s.Port), cfg)
+	if err != nil {
+		return nil, err
+	}
+	ret := &SSHClient{client}
+	s.Name, err = GetHostname(ret)
+	if err != nil {
+		return nil, err
+	}
+	return ret, nil
+}
diff --git a/core/installer/cluster/manager.go b/core/installer/cluster/manager.go
new file mode 100644
index 0000000..c8e62d1
--- /dev/null
+++ b/core/installer/cluster/manager.go
@@ -0,0 +1,43 @@
+package cluster
+
+import (
+ "net"
+)
+
+// Shell commands used while bootstrapping servers over SSH.
+// NOTE(review): ssh.go currently inlines "which tailscale" instead of
+// using whichTailscale — keep the two in sync.
+const (
+	whichTailscale      = "which tailscale"
+	tailscaleInstallCmd = "curl -fsSL https://tailscale.com/install.sh | sh"
+	tailscaleUpCmd      = "sudo tailscale up --login-server=%s --auth-key=%s --hostname=%s --reset"
+)
+
+// Server describes a machine to be added to a cluster, including the
+// SSH credentials used to provision it and the VPN auth key used to
+// join it to the network.
+type Server struct {
+	Name string `json:"name"`
+	IP   net.IP `json:"ip"`
+	// Port is the SSH port.
+	Port int `json:"port"`
+	// HostKey, when set, pins the server's SSH host key.
+	HostKey string `json:"hostKey"`
+	User    string `json:"user"`
+	// Password and/or ClientKey (SSH private key) authenticate User.
+	Password  string `json:"password"`
+	ClientKey string `json:"clientKey"`
+	// AuthKey is the VPN (tailscale/headscale) pre-auth key.
+	AuthKey string `json:"authKey"`
+}
+
+// State is the persistable snapshot of a cluster: identity, ingress
+// endpoint, k3s join parameters, admin kubeconfig and member servers.
+type State struct {
+	Name             string `json:"name"`
+	IngressClassName string `json:"ingressClassName"`
+	IngressIP        net.IP `json:"ingressIP"`
+	// ServerAddr is the k3s API server host:port used for joins.
+	ServerAddr string `json:"serverAddr"`
+	// ServerToken is the k3s node join token.
+	ServerToken string   `json:"serverToken"`
+	Kubeconfig  string   `json:"kubeconfig"`
+	Controllers []Server `json:"controllers"`
+	Workers     []Server `json:"workers"`
+}
+
+// ClusterSetupFunc performs post-bootstrap setup (e.g. installing
+// ingress-nginx) on a freshly initialized cluster and returns the
+// resulting ingress IP.
+type ClusterSetupFunc func(name, kubeconfig, ingressClassName string) (net.IP, error)
+
+// Manager provisions and maintains a cluster's membership: one-time
+// initialization, adding control plane/worker nodes, removing nodes,
+// and exposing a persistable State snapshot.
+type Manager interface {
+	Init(s Server, setupFn ClusterSetupFunc) (net.IP, error)
+	JoinController(s Server) error
+	JoinWorker(s Server) error
+	RemoveServer(name string) error
+	State() State
+}
diff --git a/core/installer/cluster/ssh.go b/core/installer/cluster/ssh.go
new file mode 100644
index 0000000..751a797
--- /dev/null
+++ b/core/installer/cluster/ssh.go
@@ -0,0 +1,145 @@
+package cluster
+
+import (
+ "bytes"
+ "fmt"
+ "golang.org/x/crypto/ssh"
+ "os"
+ "strings"
+)
+
+// SSHClient is a thin wrapper over ssh.Client for running one-shot
+// commands on a remote server.
+type SSHClient struct {
+	client *ssh.Client
+}
+
+// Close terminates the underlying SSH connection.
+func (c *SSHClient) Close() error {
+	return c.client.Close()
+}
+
+// Exec runs cmd in a fresh SSH session and returns its stdout. Remote
+// stderr is streamed to the local process stdout (for operator
+// visibility); on failure any partial stdout is discarded.
+func (c *SSHClient) Exec(cmd string) (string, error) {
+	ses, err := c.client.NewSession()
+	if err != nil {
+		return "", err
+	}
+	defer ses.Close()
+	var out bytes.Buffer
+	ses.Stdout = &out
+	ses.Stderr = os.Stdout
+	err = ses.Run(cmd)
+	if err != nil {
+		return "", err
+	}
+	return out.String(), nil
+}
+
+// GetHostname returns the remote server's hostname, trimmed of
+// surrounding whitespace.
+func GetHostname(c *SSHClient) (string, error) {
+	name, err := c.Exec("hostname")
+	if err != nil {
+		// Was `return "", nil`, which silently swallowed the failure
+		// and handed callers an empty hostname.
+		return "", err
+	}
+	return strings.TrimSpace(name), nil
+}
+
+// InstallTailscale installs tailscale on the remote server if it is not
+// already present.
+func InstallTailscale(c *SSHClient) error {
+	// A leftover debug `return nil` previously made this a no-op and
+	// left the rest of the body unreachable (flagged by go vet).
+	fmt.Println("Installing Tailscale")
+	// A successful `which` means tailscale is already installed.
+	if _, err := c.Exec(whichTailscale); err == nil {
+		return nil
+	}
+	_, err := c.Exec(tailscaleInstallCmd)
+	return err
+}
+
+// TailscaleUp (re)connects the remote server to the VPN: brings any
+// existing session down, then runs `tailscale up` against loginServer
+// with the given hostname and pre-auth key.
+func TailscaleUp(c *SSHClient, loginServer, hostname, authKey string) error {
+	// A leftover debug `return nil` previously made this a no-op and
+	// left the rest of the body unreachable (flagged by go vet).
+	fmt.Println("Starting up Tailscale")
+	if _, err := c.Exec("sudo tailscale down"); err != nil {
+		return err
+	}
+	cmd := fmt.Sprintf(tailscaleUpCmd, loginServer, authKey, hostname)
+	fmt.Println(cmd)
+	_, err := c.Exec(cmd)
+	return err
+}
+
+// InstallK3s installs k3s on the remote server as the first (cluster
+// init) control plane node. It is a no-op when k3s is already present.
+// Traefik, local-storage and servicelb are disabled because dodo
+// provides its own equivalents.
+func InstallK3s(c *SSHClient) error {
+	fmt.Println("Starting k3s")
+	if _, err := c.Exec("which k3s"); err == nil {
+		return nil
+	}
+	_, err := c.Exec("curl -sfL https://get.k3s.io | sh -s - --cluster-init --disable traefik --disable local-storage --disable servicelb --kube-proxy-arg proxy-mode=ipvs --kube-proxy-arg ipvs-strict-arp --flannel-backend vxlan --cluster-cidr=10.45.0.0/16 --service-cidr=10.46.0.0/16 # --flannel-iface=tailscale0")
+	return err
+}
+
+// InstallK3sJoinServer installs k3s on the remote server and joins it
+// to an existing cluster as an additional control plane node, using the
+// given API server address and join token. No-op when k3s is present.
+func InstallK3sJoinServer(c *SSHClient, serverAddr, token string) error {
+	fmt.Println("Starting k3s")
+	if _, err := c.Exec("which k3s"); err == nil {
+		return nil
+	}
+	_, err := c.Exec(fmt.Sprintf("curl -sfL https://get.k3s.io | sh -s - server --server=https://%s --token=%s --disable traefik --disable local-storage --disable servicelb --kube-proxy-arg proxy-mode=ipvs --kube-proxy-arg ipvs-strict-arp --flannel-backend vxlan --cluster-cidr=10.45.0.0/16 --service-cidr=10.46.0.0/16 # --flannel-iface=tailscale0", serverAddr, token))
+	return err
+}
+
+// InstallK3sJoinAgent installs k3s on the remote server and joins it to
+// an existing cluster as a worker (agent) node, using the given API
+// server address and join token. No-op when k3s is already present.
+func InstallK3sJoinAgent(c *SSHClient, serverAddr, token string) error {
+	fmt.Println("Starting k3s")
+	if _, err := c.Exec("which k3s"); err == nil {
+		return nil
+	}
+	_, err := c.Exec(fmt.Sprintf("curl -sfL https://get.k3s.io | sh -s - agent --server=https://%s --token=%s", serverAddr, token))
+	return err
+}
+
+// UninstallK3sServer removes a k3s control plane installation from the
+// remote server. Nothing happens when the uninstall script is absent.
+func UninstallK3sServer(c *SSHClient) error {
+	fmt.Println("Uninstalling k3s")
+	if _, lookupErr := c.Exec("which k3s-uninstall.sh"); lookupErr != nil {
+		// No uninstall script means k3s server was never installed.
+		return nil
+	}
+	_, runErr := c.Exec("k3s-uninstall.sh")
+	return runErr
+}
+
+// UninstallK3sAgent removes a k3s worker (agent) installation from the
+// remote server. Nothing happens when the uninstall script is absent.
+func UninstallK3sAgent(c *SSHClient) error {
+	fmt.Println("Uninstalling k3s")
+	if _, lookupErr := c.Exec("which k3s-agent-uninstall.sh"); lookupErr != nil {
+		// No uninstall script means k3s agent was never installed.
+		return nil
+	}
+	_, runErr := c.Exec("k3s-agent-uninstall.sh")
+	return runErr
+}
+
+// GetTailscaleIP returns the server's address on the 10.42.* interface.
+// NOTE(review): this greps ifconfig for a hard-coded 10.42 prefix
+// instead of asking tailscale directly (see the commented-out variant
+// below) — confirm the prefix matches the VPN address range in use.
+func GetTailscaleIP(c *SSHClient) (string, error) {
+	fmt.Println("Getting Tailscale IP")
+	if _, err := c.Exec("sudo apt-get install net-tools -y"); err != nil {
+		return "", err
+	}
+	ip, err := c.Exec("sudo ifconfig | grep 10.42")
+	if err != nil {
+		return "", err
+	}
+	fields := strings.Fields(ip)
+	if len(fields) < 2 {
+		// Was an unguarded fields[1], which panics on unexpected output.
+		return "", fmt.Errorf("could not parse ifconfig output: %q", ip)
+	}
+	return fields[1], nil
+	// ip, err := c.Exec("sudo tailscale ip")
+	// return strings.TrimSpace(ip), err
+}
+
+// GetKubeconfig reads the k3s admin kubeconfig from the server and
+// rewrites its API server address from the loopback default to the
+// server's VPN IP, so the config is usable from other machines.
+func GetKubeconfig(c *SSHClient) (string, error) {
+	fmt.Println("Getting Kubeconfig")
+	out, err := c.Exec("sudo cat /etc/rancher/k3s/k3s.yaml")
+	if err != nil {
+		return "", err
+	}
+	ip, err := GetTailscaleIP(c)
+	if err != nil {
+		return "", err
+	}
+	return strings.Replace(out, "server: https://127.0.0.1:6443", fmt.Sprintf("server: https://%s:6443", ip), 1), nil
+}
+
+// GetServerToken reads the k3s node join token from the server.
+func GetServerToken(c *SSHClient) (string, error) {
+	fmt.Println("Getting server token")
+	out, err := c.Exec("sudo cat /var/lib/rancher/k3s/server/node-token")
+	if err != nil {
+		return "", err
+	}
+	// err is provably nil here; return the literal to avoid returning a
+	// stale error variable.
+	return strings.TrimSpace(out), nil
+}
diff --git a/core/installer/cluster_test.go b/core/installer/cluster_test.go
new file mode 100644
index 0000000..e315202
--- /dev/null
+++ b/core/installer/cluster_test.go
@@ -0,0 +1,70 @@
+package installer
+
+import (
+ "net"
+ "strings"
+ "testing"
+)
+
+// TestParseNginxProxyConfig feeds a representative rendered config to
+// the parser and checks the extracted port, resolvers and proxy map.
+func TestParseNginxProxyConfig(t *testing.T) {
+	cfg, err := ParseNginxProxyConfig(strings.NewReader(`nginx.conf: |
+# user www www;
+worker_processes 1;
+error_log /dev/null crit;
+# pid logs/nginx.pid;
+worker_rlimit_nofile 8192;
+events {
+ worker_connections 1024;
+}
+http {
+ error_log /var/log/nginx/error.log debug;
+ log_format dodo '$http_host $proxy_host $status';
+ access_log /var/log/nginx/access.log dodo;
+ map $http_host $backend {
+ a A;
+ b B;
+ }
+ server {
+ listen 9090;
+ location / {
+ resolver 1.1.1.1;
+ resolver 2.2.2.2;
+ proxy_pass http://$backend;
+ }
+ }
+}
+`))
+	if err != nil {
+		t.Fatal(err)
+	}
+	if cfg.Port != 9090 {
+		t.Errorf("invalid port: expected 9090, got %d", cfg.Port)
+	}
+	if len(cfg.Resolvers) != 2 ||
+		!cfg.Resolvers[0].Equal(net.ParseIP("1.1.1.1")) ||
+		!cfg.Resolvers[1].Equal(net.ParseIP("2.2.2.2")) {
+		t.Errorf("invalid resolvers: expected [1.1.1.1 2.2.2.2], got %s", cfg.Resolvers)
+	}
+	if len(cfg.Proxies) != 2 ||
+		cfg.Proxies["a"] != "A" ||
+		cfg.Proxies["b"] != "B" {
+		t.Errorf("invalid proxies: expected map[a:A, b:B], got %s", cfg.Proxies)
+	}
+}
+
+// TestRenderNginxProxyConfig renders a config and asserts the preamble,
+// listen port, resolvers and proxy mappings all appear in the output.
+func TestRenderNginxProxyConfig(t *testing.T) {
+	cfg := NginxProxyConfig{
+		Port:      8080,
+		Resolvers: []net.IP{net.ParseIP("1.1.1.1"), net.ParseIP("2.2.2.2")},
+		Proxies: map[string]string{
+			"a": "A",
+			"b": "B",
+		},
+		PreConf: []string{"line1", "line2"},
+	}
+	var buf strings.Builder
+	if err := cfg.Render(&buf); err != nil {
+		t.Fatal(err)
+	}
+	out := buf.String()
+	// The test previously only logged the output without asserting
+	// anything about it.
+	for _, want := range []string{
+		"line1",
+		"line2",
+		"listen 8080;",
+		"a A;",
+		"b B;",
+		"resolver 1.1.1.1;",
+		"resolver 2.2.2.2;",
+	} {
+		if !strings.Contains(out, want) {
+			t.Errorf("rendered config missing %q:\n%s", want, out)
+		}
+	}
+	t.Log(out)
+}
diff --git a/core/installer/cmd/app_manager.go b/core/installer/cmd/app_manager.go
index a6224a0..16ac83a 100644
--- a/core/installer/cmd/app_manager.go
+++ b/core/installer/cmd/app_manager.go
@@ -16,11 +16,13 @@
)
var appManagerFlags struct {
- sshKey string
- repoAddr string
- port int
- appRepoAddr string
- headscaleAPIAddr string
+ sshKey string
+ repoAddr string
+ port int
+ appRepoAddr string
+ headscaleAPIAddr string
+ dnsAPIAddr string
+ clusterProxyConfigPath string
}
func appManagerCmd() *cobra.Command {
@@ -58,6 +60,18 @@
"",
"",
)
+ cmd.Flags().StringVar(
+ &appManagerFlags.dnsAPIAddr,
+ "dns-api-addr",
+ "",
+ "",
+ )
+ cmd.Flags().StringVar(
+ &appManagerFlags.clusterProxyConfigPath,
+ "cluster-proxy-config-path",
+ "",
+ "",
+ )
return cmd
}
@@ -92,8 +106,15 @@
return err
}
hf := installer.NewGitHelmFetcher()
- vpnKeyGen := installer.NewHeadscaleAPIClient(appManagerFlags.headscaleAPIAddr)
- m, err := installer.NewAppManager(repoIO, nsc, jc, hf, vpnKeyGen, "/apps")
+ vpnAPIClient := installer.NewHeadscaleAPIClient(appManagerFlags.headscaleAPIAddr)
+ cnc := &installer.NginxProxyConfigurator{
+ // TODO(gio): read from env config
+ PrivateSubdomain: "p",
+ DNSAPIAddr: appManagerFlags.dnsAPIAddr,
+ Repo: repoIO,
+ NginxConfigPath: appManagerFlags.clusterProxyConfigPath,
+ }
+ m, err := installer.NewAppManager(repoIO, nsc, jc, hf, vpnAPIClient, cnc, "/apps")
if err != nil {
return err
}
@@ -117,16 +138,21 @@
} else {
r = installer.NewInMemoryAppRepository(installer.CreateStoreApps())
}
+ fr := installer.NewInMemoryAppRepository(installer.CreateAllEnvApps())
helmMon, err := newHelmReleaseMonitor()
if err != nil {
return err
}
s, err := welcome.NewAppManagerServer(
appManagerFlags.port,
+ repoIO,
m,
r,
+ fr,
tasks.NewFixedReconciler(env.Id, env.Id),
helmMon,
+ cnc,
+ vpnAPIClient,
)
if err != nil {
return err
diff --git a/core/installer/cmd/dodo_app.go b/core/installer/cmd/dodo_app.go
index 2fe1697..8512691 100644
--- a/core/installer/cmd/dodo_app.go
+++ b/core/installer/cmd/dodo_app.go
@@ -201,6 +201,8 @@
},
}
vpnKeyGen := installer.NewHeadscaleAPIClient(dodoAppFlags.headscaleAPIAddr)
+ // TODO(gio): implement
+ var cnc installer.ClusterNetworkConfigurator
s, err := welcome.NewDodoAppServer(
st,
nf,
@@ -217,6 +219,7 @@
nsc,
jc,
vpnKeyGen,
+ cnc,
env,
dodoAppFlags.external,
dodoAppFlags.fetchUsersAddr,
diff --git a/core/installer/cmd/kube.go b/core/installer/cmd/kube.go
index f31ad8f..1a74731 100644
--- a/core/installer/cmd/kube.go
+++ b/core/installer/cmd/kube.go
@@ -2,10 +2,13 @@
import (
"github.com/giolekva/pcloud/core/installer"
+ "github.com/giolekva/pcloud/core/installer/kube"
)
func newNSCreator() (installer.NamespaceCreator, error) {
- return installer.NewNamespaceCreator(rootFlags.kubeConfig)
+ return installer.NewNamespaceCreator(kube.KubeConfigOpts{
+ KubeConfigPath: rootFlags.kubeConfig,
+ })
}
func newZoneFetcher() (installer.ZoneStatusFetcher, error) {
@@ -17,7 +20,9 @@
}
func newJobCreator() (installer.JobCreator, error) {
- clientset, err := installer.NewKubeConfig(rootFlags.kubeConfig)
+ clientset, err := kube.NewKubeClient(kube.KubeConfigOpts{
+ KubeConfigPath: rootFlags.kubeConfig,
+ })
if err != nil {
return nil, err
}
diff --git a/core/installer/cmd/launcher.go b/core/installer/cmd/launcher.go
index e671fe6..85e811f 100644
--- a/core/installer/cmd/launcher.go
+++ b/core/installer/cmd/launcher.go
@@ -74,7 +74,7 @@
if err != nil {
return err
}
- appManager, err := installer.NewAppManager(repoIO, nil, nil, nil, nil, "/apps")
+ appManager, err := installer.NewAppManager(repoIO, nil, nil, nil, nil, nil, "/apps")
if err != nil {
return err
}
diff --git a/core/installer/cmd/rewrite.go b/core/installer/cmd/rewrite.go
index 3e2961b..0f018e4 100644
--- a/core/installer/cmd/rewrite.go
+++ b/core/installer/cmd/rewrite.go
@@ -1,7 +1,6 @@
package main
import (
- "fmt"
"log"
"os"
@@ -62,16 +61,7 @@
log.Println("Creating repository")
r := installer.NewInMemoryAppRepository(installer.CreateAllApps())
hf := installer.NewGitHelmFetcher()
- mgr, err := installer.NewAppManager(repoIO, nil, nil, hf, nil, "/apps")
- if err != nil {
- return err
- }
- env, err := mgr.Config()
- if err != nil {
- return err
- }
- fmt.Printf("%+v\n", env)
- log.Println("Read config")
+ mgr, err := installer.NewAppManager(repoIO, nil, nil, hf, nil, nil, "/apps")
if err != nil {
return err
}
diff --git a/core/installer/derived.go b/core/installer/derived.go
index 0351b21..6c1305c 100644
--- a/core/installer/derived.go
+++ b/core/installer/derived.go
@@ -6,6 +6,8 @@
"strings"
)
+const defaultClusterName = "default"
+
type Release struct {
AppInstanceId string `json:"appInstanceId"`
Namespace string `json:"namespace"`
@@ -69,6 +71,7 @@
values any,
schema Schema,
networks []Network,
+ clusters []Cluster,
vpnKeyGen VPNAPIClient,
) (map[string]any, error) {
ret := make(map[string]any)
@@ -95,7 +98,9 @@
// TODO(gio): Improve getField
enabled, ok = getField(root, v).(bool)
if !ok {
- return nil, fmt.Errorf("could not resolve enabled: %+v %s %+v", def.Meta(), v, root)
+ enabled = false
+ // TODO(gio): validate that enabled field exists in the schema
+ // return nil, fmt.Errorf("could not resolve enabled: %+v %s %+v", def.Meta(), v, root)
}
}
if !enabled {
@@ -164,20 +169,36 @@
picked = append(picked, n)
}
ret[k] = picked
+ case KindCluster:
+ name, ok := v.(string)
+ if !ok {
+ // TODO(gio): validate that value has cluster schema
+ ret[k] = v
+ } else {
+ c, err := findCluster(clusters, name)
+ if err != nil {
+ return nil, err
+ }
+ if c == nil {
+ delete(ret, k)
+ } else {
+ ret[k] = c
+ }
+ }
case KindAuth:
- r, err := deriveValues(root, v, AuthSchema, networks, vpnKeyGen)
+ r, err := deriveValues(root, v, AuthSchema, networks, clusters, vpnKeyGen)
if err != nil {
return nil, err
}
ret[k] = r
case KindSSHKey:
- r, err := deriveValues(root, v, SSHKeySchema, networks, vpnKeyGen)
+ r, err := deriveValues(root, v, SSHKeySchema, networks, clusters, vpnKeyGen)
if err != nil {
return nil, err
}
ret[k] = r
case KindStruct:
- r, err := deriveValues(root, v, def, networks, vpnKeyGen)
+ r, err := deriveValues(root, v, def, networks, clusters, vpnKeyGen)
if err != nil {
return nil, err
}
@@ -274,6 +295,16 @@
return nil, err
}
ret[k] = r
+ case KindCluster:
+ vm, ok := v.(map[string]any)
+ if !ok {
+ return nil, fmt.Errorf("expected map")
+ }
+ name, ok := vm["name"]
+ if !ok {
+ return nil, fmt.Errorf("expected cluster name")
+ }
+ ret[k] = name
default:
return nil, fmt.Errorf("Should not reach!")
}
@@ -289,3 +320,15 @@
}
return Network{}, fmt.Errorf("Network not found: %s", name)
}
+
+func findCluster(clusters []Cluster, name string) (*Cluster, error) {
+ if name == defaultClusterName {
+ return nil, nil
+ }
+ for _, c := range clusters {
+ if c.Name == name {
+ return &c, nil
+ }
+ }
+ return nil, fmt.Errorf("Cluster not found: %s", name)
+}
diff --git a/core/installer/derived_test.go b/core/installer/derived_test.go
index 83e07ec..d9a873b 100644
--- a/core/installer/derived_test.go
+++ b/core/installer/derived_test.go
@@ -1,6 +1,7 @@
package installer
import (
+ "net"
"testing"
)
@@ -22,6 +23,10 @@
return nil
}
+func (g testKeyGen) GetNodeIP(username, node string) (net.IP, error) {
+ return nil, nil
+}
+
func TestDeriveVPNAuthKey(t *testing.T) {
schema := structSchema{
"input",
@@ -36,7 +41,7 @@
input := map[string]any{
"username": "foo",
}
- v, err := deriveValues(input, input, schema, nil, testKeyGen{})
+ v, err := deriveValues(input, input, schema, nil, nil, testKeyGen{})
if err != nil {
t.Fatal(err)
}
@@ -62,7 +67,7 @@
"username": "foo",
"enabled": false,
}
- v, err := deriveValues(input, input, schema, nil, testKeyGen{})
+ v, err := deriveValues(input, input, schema, nil, nil, testKeyGen{})
if err != nil {
t.Fatal(err)
}
@@ -88,7 +93,7 @@
"username": "foo",
"enabled": true,
}
- v, err := deriveValues(input, input, schema, nil, testKeyGen{})
+ v, err := deriveValues(input, input, schema, nil, nil, testKeyGen{})
if err != nil {
t.Fatal(err)
}
diff --git a/core/installer/helm.go b/core/installer/helm.go
index 7d3c334..dc742d5 100644
--- a/core/installer/helm.go
+++ b/core/installer/helm.go
@@ -100,7 +100,7 @@
if err != nil {
return err
}
- if err := rfs.RemoveDir(root); err != nil {
+ if err := rfs.RemoveAll(root); err != nil {
return err
}
return util.Walk(wtFS, "/", func(path string, info fs.FileInfo, err error) error {
diff --git a/core/installer/kube.go b/core/installer/kube.go
index c8251ff..7f861af 100644
--- a/core/installer/kube.go
+++ b/core/installer/kube.go
@@ -8,14 +8,14 @@
"io"
"net/http"
+ "github.com/giolekva/pcloud/core/installer/kube"
+
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
- "k8s.io/client-go/rest"
- "k8s.io/client-go/tools/clientcmd"
)
type NamespaceCreator interface {
@@ -77,8 +77,8 @@
return buf.String(), nil
}
-func NewNamespaceCreator(kubeconfig string) (NamespaceCreator, error) {
- clientset, err := NewKubeConfig(kubeconfig)
+func NewNamespaceCreator(opts kube.KubeConfigOpts) (NamespaceCreator, error) {
+ clientset, err := kube.NewKubeClient(opts)
if err != nil {
return nil, err
}
@@ -120,27 +120,10 @@
}
func NewHelmReleaseMonitor(kubeconfig string) (HelmReleaseMonitor, error) {
- c, err := NewKubeConfig(kubeconfig)
+ c, err := kube.NewKubeClient(kube.KubeConfigOpts{KubeConfigPath: kubeconfig})
if err != nil {
return nil, err
}
d := dynamic.New(c.RESTClient())
return &realHelmReleaseMonitor{d}, nil
}
-
-func NewKubeConfig(kubeconfig string) (*kubernetes.Clientset, error) {
- if kubeconfig == "" {
- config, err := rest.InClusterConfig()
- if err != nil {
- return nil, err
- }
- return kubernetes.NewForConfig(config)
-
- } else {
- config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
- if err != nil {
- return nil, err
- }
- return kubernetes.NewForConfig(config)
- }
-}
diff --git a/core/installer/kube/client.go b/core/installer/kube/client.go
new file mode 100644
index 0000000..fc19eab
--- /dev/null
+++ b/core/installer/kube/client.go
@@ -0,0 +1,51 @@
+package kube
+
+import (
+ "fmt"
+
+ "k8s.io/apimachinery/pkg/runtime/schema"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/rest"
+ "k8s.io/client-go/tools/clientcmd"
+ clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
+ clientcmdlatest "k8s.io/client-go/tools/clientcmd/api/latest"
+)
+
+type KubeConfigOpts struct {
+ KubeConfig string
+ KubeConfigPath string
+}
+
+func NewKubeClient(opts KubeConfigOpts) (*kubernetes.Clientset, error) {
+ if opts.KubeConfig != "" && opts.KubeConfigPath != "" {
+ return nil, fmt.Errorf("both path and config cannot be defined")
+ }
+ if opts.KubeConfig != "" {
+ var cfg clientcmdapi.Config
+ decoded, _, err := clientcmdlatest.Codec.Decode([]byte(opts.KubeConfig), &schema.GroupVersionKind{Version: clientcmdlatest.Version, Kind: "Config"}, &cfg)
+ if err != nil {
+ return nil, err
+ }
+ getter := func() (*clientcmdapi.Config, error) {
+ return decoded.(*clientcmdapi.Config), nil
+ }
+ config, err := clientcmd.BuildConfigFromKubeconfigGetter("", getter)
+ if err != nil {
+ return nil, err
+ }
+ return kubernetes.NewForConfig(config)
+ }
+ if opts.KubeConfigPath == "" {
+ config, err := rest.InClusterConfig()
+ if err != nil {
+ return nil, err
+ }
+ return kubernetes.NewForConfig(config)
+ } else {
+ config, err := clientcmd.BuildConfigFromFlags("", opts.KubeConfigPath)
+ if err != nil {
+ return nil, err
+ }
+ return kubernetes.NewForConfig(config)
+ }
+}
diff --git a/core/installer/schema.go b/core/installer/schema.go
index fcdebd4..5a70519 100644
--- a/core/installer/schema.go
+++ b/core/installer/schema.go
@@ -23,6 +23,7 @@
KindArrayString = 8
KindPort = 9
KindVPNAuthKey = 11
+ KindCluster = 12
)
type Field struct {
@@ -56,6 +57,34 @@
advanced: true,
}
+const clusterSchema = `
+#Cluster: {
+ name: string
+ kubeconfig: string
+ ingressClassName: string
+}
+
+value: { %s }
+`
+
+func isCluster(v cue.Value) bool {
+ if v.Value().Kind() != cue.StructKind {
+ return false
+ }
+ s := fmt.Sprintf(clusterSchema, fmt.Sprintf("%#v", v))
+ c := cuecontext.New()
+ u := c.CompileString(s)
+ if err := u.Validate(); err != nil {
+ return false
+ }
+ cluster := u.LookupPath(cue.ParsePath("#Cluster"))
+ vv := u.LookupPath(cue.ParsePath("value"))
+ if err := cluster.Subsume(vv); err == nil {
+ return true
+ }
+ return false
+}
+
const networkSchema = `
#Network: {
name: string
@@ -233,18 +262,18 @@
meta := map[string]string{}
usernameFieldAttr := v.Attribute("usernameField")
if usernameFieldAttr.Err() == nil {
- meta["usernameField"] = strings.ToLower(usernameFieldAttr.Contents())
+ meta["usernameField"] = usernameFieldAttr.Contents()
}
usernameAttr := v.Attribute("username")
if usernameAttr.Err() == nil {
- meta["username"] = strings.ToLower(usernameAttr.Contents())
+ meta["username"] = usernameAttr.Contents()
}
if len(meta) != 1 {
return nil, fmt.Errorf("invalid vpn auth key field meta: %+v", meta)
}
enabledFieldAttr := v.Attribute("enabledField")
if enabledFieldAttr.Err() == nil {
- meta["enabledField"] = strings.ToLower(enabledFieldAttr.Contents())
+ meta["enabledField"] = enabledFieldAttr.Contents()
}
return basicSchema{name, KindVPNAuthKey, true, meta}, nil
} else {
@@ -272,9 +301,11 @@
return basicSchema{name, KindAuth, false, nil}, nil
} else if isSSHKey(v) {
return basicSchema{name, KindSSHKey, true, nil}, nil
+ } else if isCluster(v) {
+ return basicSchema{name, KindCluster, false, nil}, nil
}
s := structSchema{name, make([]Field, 0), false}
- f, err := v.Fields(cue.Schema())
+ f, err := v.Fields(cue.All())
if err != nil {
return nil, err
}
@@ -283,10 +314,14 @@
if err != nil {
return nil, err
}
- s.fields = append(s.fields, Field{f.Selector().String(), scm})
+ s.fields = append(s.fields, Field{cleanFieldName(f.Selector().String()), scm})
}
return s, nil
default:
return nil, fmt.Errorf("SHOULD NOT REACH!")
}
}
+
+func cleanFieldName(name string) string {
+ return strings.ReplaceAll(strings.ReplaceAll(name, "?", ""), "!", "")
+}
diff --git a/core/installer/soft/repoio.go b/core/installer/soft/repoio.go
index 458f688..de2f75a 100644
--- a/core/installer/soft/repoio.go
+++ b/core/installer/soft/repoio.go
@@ -29,7 +29,7 @@
Reader(path string) (io.ReadCloser, error)
Writer(path string) (io.WriteCloser, error)
CreateDir(path string) error
- RemoveDir(path string) error
+ RemoveAll(path string) error
ListDir(path string) ([]os.FileInfo, error)
}
@@ -125,7 +125,7 @@
return r.fs.MkdirAll(path, fs.ModePerm)
}
-func (r *repoFS) RemoveDir(path string) error {
+func (r *repoFS) RemoveAll(path string) error {
if err := util.RemoveAll(r.fs, path); err != nil {
if errors.Is(err, fs.ErrNotExist) {
return nil
diff --git a/core/installer/tasks/infra.go b/core/installer/tasks/infra.go
index 3682533..3d39eea 100644
--- a/core/installer/tasks/infra.go
+++ b/core/installer/tasks/infra.go
@@ -18,7 +18,7 @@
if err != nil {
return err
}
- appManager, err := installer.NewAppManager(r, st.nsCreator, st.jc, st.hf, nil, "/apps")
+ appManager, err := installer.NewAppManager(r, st.nsCreator, st.jc, st.hf, nil, nil, "/apps")
if err != nil {
return err
}
diff --git a/core/installer/tasks/install.go b/core/installer/tasks/install.go
index 8b5bef7..6ff939b 100644
--- a/core/installer/tasks/install.go
+++ b/core/installer/tasks/install.go
@@ -1,7 +1,12 @@
package tasks
import (
+ "fmt"
+ "path/filepath"
+
"github.com/giolekva/pcloud/core/installer"
+ "github.com/giolekva/pcloud/core/installer/cluster"
+ "github.com/giolekva/pcloud/core/installer/soft"
)
type InstallFunc func() (installer.ReleaseResources, error)
@@ -50,3 +55,144 @@
t := newParentTask("Installing application", true, start, d)
return &t
}
+
+func NewClusterInitTask(m cluster.Manager, server cluster.Server, cnc installer.ClusterNetworkConfigurator, repo soft.RepoIO, setupFn cluster.ClusterSetupFunc) Task {
+ d := &dynamicTaskSlice{t: []Task{}}
+ done := make(chan error)
+ setupTask := newLeafTask(fmt.Sprintf("Installing dodo on %s", server.IP.String()), func() error {
+ _, err := m.Init(server, setupFn)
+ return err
+ })
+ d.Append(&setupTask)
+ setupTask.OnDone(func(err error) {
+ if err != nil {
+ done <- err
+ return
+ }
+ if err := cnc.AddCluster(m.State().Name, m.State().IngressIP); err != nil {
+ done <- err
+ return
+ }
+ _, err = repo.Do(func(fs soft.RepoFS) (string, error) {
+ if err := soft.WriteJson(fs, fmt.Sprintf("/clusters/%s/config.json", m.State().Name), m.State()); err != nil {
+ return "", err
+ }
+ return fmt.Sprintf("add server to cluster: %s", m.State().Name), nil
+ })
+ done <- err
+ })
+ start := func() error {
+ setupTask.Start()
+ return <-done
+ }
+ t := newParentTask("Initializing cluster", true, start, d)
+ return &t
+}
+
+func NewRemoveClusterTask(m cluster.Manager, cnc installer.ClusterNetworkConfigurator,
+ repo soft.RepoIO) Task {
+ t := newLeafTask(fmt.Sprintf("Removing %s cluster", m.State().Name), func() error {
+ if err := cnc.RemoveCluster(m.State().Name, m.State().IngressIP); err != nil {
+ return err
+ }
+ _, err := repo.Do(func(fs soft.RepoFS) (string, error) {
+ if err := fs.RemoveAll(fmt.Sprintf("/clusters/%s", m.State().Name)); err != nil {
+ return "", err
+ }
+ kustPath := filepath.Join("/clusters", "kustomization.yaml")
+ kust, err := soft.ReadKustomization(fs, kustPath)
+ if err != nil {
+ return "", err
+ }
+ kust.RemoveResources(m.State().Name)
+ soft.WriteYaml(fs, kustPath, kust)
+ return fmt.Sprintf("remove cluster: %s", m.State().Name), nil
+ })
+ return err
+ })
+ return &t
+}
+
+func NewClusterJoinControllerTask(m cluster.Manager, server cluster.Server, repo soft.RepoIO) Task {
+ d := &dynamicTaskSlice{t: []Task{}}
+ done := make(chan error)
+ setupTask := newLeafTask(fmt.Sprintf("Joining %s to %s cluster", server.IP.String(), m.State().Name), func() error {
+ return m.JoinController(server)
+ })
+ d.Append(&setupTask)
+ setupTask.OnDone(func(err error) {
+ if err != nil {
+ done <- err
+ return
+ }
+ _, err = repo.Do(func(fs soft.RepoFS) (string, error) {
+ if err := soft.WriteJson(fs, fmt.Sprintf("/clusters/%s/config.json", m.State().Name), m.State()); err != nil {
+ return "", err
+ }
+ return fmt.Sprintf("add controller server to cluster: %s", m.State().Name), nil
+ })
+ done <- err
+ })
+ start := func() error {
+ setupTask.Start()
+ return <-done
+ }
+ t := newParentTask("Joining controller server to cluster", true, start, d)
+ return &t
+}
+
+func NewClusterJoinWorkerTask(m cluster.Manager, server cluster.Server, repo soft.RepoIO) Task {
+ d := &dynamicTaskSlice{t: []Task{}}
+ done := make(chan error)
+ setupTask := newLeafTask(fmt.Sprintf("Joining %s to %s cluster", server.IP.String(), m.State().Name), func() error {
+ return m.JoinWorker(server)
+ })
+ d.Append(&setupTask)
+ setupTask.OnDone(func(err error) {
+ if err != nil {
+ done <- err
+ return
+ }
+ _, err = repo.Do(func(fs soft.RepoFS) (string, error) {
+ if err := soft.WriteJson(fs, fmt.Sprintf("/clusters/%s/config.json", m.State().Name), m.State()); err != nil {
+ return "", err
+ }
+ return fmt.Sprintf("add worker server to cluster: %s", m.State().Name), nil
+ })
+ done <- err
+ })
+ start := func() error {
+ setupTask.Start()
+ return <-done
+ }
+ t := newParentTask("Joining worker server to cluster", true, start, d)
+ return &t
+}
+
+func NewClusterRemoveServerTask(m cluster.Manager, server string, repo soft.RepoIO) Task {
+ d := &dynamicTaskSlice{t: []Task{}}
+ done := make(chan error)
+ setupTask := newLeafTask(fmt.Sprintf("Removing %s from %s cluster", server, m.State().Name), func() error {
+ return m.RemoveServer(server)
+ })
+ d.Append(&setupTask)
+ setupTask.OnDone(func(err error) {
+ if err != nil {
+ done <- err
+ return
+ }
+ _, err = repo.Do(func(fs soft.RepoFS) (string, error) {
+ if err := soft.WriteJson(fs, fmt.Sprintf("/clusters/%s/config.json", m.State().Name), m.State()); err != nil {
+ return "", err
+ }
+ return fmt.Sprintf("remove %s from cluster: %s", server, m.State().Name), nil
+ })
+ done <- err
+ })
+ start := func() error {
+ setupTask.Start()
+ return <-done
+ }
+ t := newParentTask("Removing server from cluster", true, start, d)
+ return &t
+}
diff --git a/core/installer/values-tmpl/appmanager.cue b/core/installer/values-tmpl/appmanager.cue
index 24d0d8d..1cd7dbf 100644
--- a/core/installer/values-tmpl/appmanager.cue
+++ b/core/installer/values-tmpl/appmanager.cue
@@ -69,7 +69,10 @@
values: {
repoAddr: input.repoAddr
sshPrivateKey: base64.Encode(null, input.sshPrivateKey)
+ // TODO(gio): de-hardcode these variables
headscaleAPIAddr: "http://headscale-api.\(global.namespacePrefix)app-headscale.svc.cluster.local"
+ dnsAPIAddr: "http://dns-api.\(global.namespacePrefix)dns.svc.cluster.local"
+ clusterProxyConfigPath: "/apps/private-network/resources/proxy-backend-config.yaml"
ingress: {
className: input.network.ingressClass
domain: _domain
diff --git a/core/installer/values-tmpl/certificate-issuer-custom.cue b/core/installer/values-tmpl/certificate-issuer-custom.cue
index 8721e7f..469382d 100644
--- a/core/installer/values-tmpl/certificate-issuer-custom.cue
+++ b/core/installer/values-tmpl/certificate-issuer-custom.cue
@@ -47,6 +47,7 @@
helm: {
"certificate-issuer": {
chart: charts["certificate-issuer"]
+ Info: "Configuring SSL certificate issuer for \(input.domain)"
dependsOn: [{
name: "ingress-nginx"
namespace: "\(global.namespacePrefix)ingress-private"
diff --git a/core/installer/values-tmpl/cluster-network.cue b/core/installer/values-tmpl/cluster-network.cue
new file mode 100644
index 0000000..d470ff1
--- /dev/null
+++ b/core/installer/values-tmpl/cluster-network.cue
@@ -0,0 +1,138 @@
+import (
+ // "encoding/base64"
+)
+
+input: {
+ cluster: #Cluster
+ vpnUser: string
+ vpnProxyHostname: string
+ vpnAuthKey: string @role(VPNAuthKey) @usernameField(vpnUser)
+ // TODO(gio): support port allocator
+}
+
+name: "Cluster Network"
+namespace: "cluster-network"
+
+out: {
+ images: {
+ "ingress-nginx": {
+ registry: "registry.k8s.io"
+ repository: "ingress-nginx"
+ name: "controller"
+ tag: "v1.8.0"
+ pullPolicy: "IfNotPresent"
+ }
+ "tailscale-proxy": {
+ repository: "tailscale"
+ name: "tailscale"
+ tag: "v1.42.0"
+ pullPolicy: "IfNotPresent"
+ }
+ // portAllocator: {
+ // repository: "giolekva"
+ // name: "port-allocator"
+ // tag: "latest"
+ // pullPolicy: "Always"
+ // }
+ }
+
+ charts: {
+ "access-secrets": {
+ kind: "GitRepository"
+ address: "https://code.v1.dodo.cloud/helm-charts"
+ branch: "main"
+ path: "charts/access-secrets"
+ }
+ "ingress-nginx": {
+ kind: "GitRepository"
+ address: "https://code.v1.dodo.cloud/helm-charts"
+ branch: "main"
+ path: "charts/ingress-nginx"
+ }
+ "tailscale-proxy": {
+ kind: "GitRepository"
+ address: "https://code.v1.dodo.cloud/helm-charts"
+ branch: "main"
+ path: "charts/tailscale-proxy"
+ }
+ // portAllocator: {
+ // kind: "GitRepository"
+ // address: "https://code.v1.dodo.cloud/helm-charts"
+ // branch: "main"
+ // path: "charts/port-allocator"
+ // }
+ }
+
+ helm: {
+ _fullnameOverride: "\(global.id)-nginx-cluster-\(input.cluster.name)"
+ "access-secrets": {
+ chart: charts["access-secrets"]
+ values: {
+ serviceAccountName: _fullnameOverride
+ }
+ }
+ "ingress-nginx": {
+ chart: charts["ingress-nginx"]
+ dependsOn: [{
+ name: "access-secrets"
+ namespace: release.namespace
+ }]
+ values: {
+ fullnameOverride: _fullnameOverride
+ controller: {
+ service: enabled: false
+ ingressClassByName: true
+ ingressClassResource: {
+ name: input.cluster.ingressClassName
+ enabled: true
+ default: false
+ controllerValue: "k8s.io/\(input.cluster.name)"
+ }
+ config: {
+ "proxy-body-size": "200M" // TODO(giolekva): configurable
+ "force-ssl-redirect": "true"
+ "server-snippet": """
+ more_clear_headers "X-Frame-Options";
+ """
+ }
+ admissionWebhooks: {
+ enabled: false
+ }
+ image: {
+ registry: images["ingress-nginx"].registry
+ image: images["ingress-nginx"].imageName
+ tag: images["ingress-nginx"].tag
+ pullPolicy: images["ingress-nginx"].pullPolicy
+ }
+ extraContainers: [{
+ name: "proxy"
+ image: images["tailscale-proxy"].fullNameWithTag
+ env: [{
+ name: "TS_AUTHKEY"
+ value: input.vpnAuthKey
+ }, {
+ name: "TS_HOSTNAME"
+ value: input.vpnProxyHostname
+ }, {
+ name: "TS_EXTRA_ARGS"
+ value: "--login-server=https://headscale.\(global.domain)"
+ }]
+ }]
+ }
+ }
+ }
+ // "port-allocator": {
+ // chart: charts.portAllocator
+ // values: {
+ // repoAddr: release.repoAddr
+ // sshPrivateKey: base64.Encode(null, input.sshPrivateKey)
+ // ingressNginxPath: "\(release.appDir)/resources/ingress-nginx.yaml"
+ // image: {
+ // repository: images.portAllocator.fullName
+ // tag: images.portAllocator.tag
+ // pullPolicy: images.portAllocator.pullPolicy
+ // }
+ // }
+ // }
+ }
+}
diff --git a/core/installer/values-tmpl/virtual-machine.cue b/core/installer/values-tmpl/virtual-machine.cue
index 6f311d9..841cb8b 100644
--- a/core/installer/values-tmpl/virtual-machine.cue
+++ b/core/installer/values-tmpl/virtual-machine.cue
@@ -4,7 +4,8 @@
authKey?: string @name(Auth Key) @role(VPNAuthKey) @usernameField(username) @enabledField(vpnEnabled)
cpuCores: int | *1 @name(CPU Cores)
memory: string | *"2Gi" @name(Memory)
- vpnEnabled: bool @name(Enable VPN)
+ vpnEnabled?: bool @name(Enable VPN)
+ codeServerEnabled?: bool @name(Install VSCode Server)
}
name: "Virutal Machine"
@@ -21,15 +22,20 @@
domain: global.domain
cpuCores: input.cpuCores
memory: input.memory
- if !input.vpnEnabled {
- vpn: enabled: false
- }
- if input.vpnEnabled {
- vpn: {
- enabled: true
- loginServer: "https://headscale.\(global.domain)"
- authKey: input.authKey
+ if input.vpnEnabled != _|_ {
+ if !input.vpnEnabled {
+ vpn: enabled: false
}
+ if input.vpnEnabled {
+ vpn: {
+ enabled: true
+ loginServer: "https://headscale.\(global.domain)"
+ authKey: input.authKey
+ }
+ }
+ }
+ if input.codeServerEnabled != _|_ {
+ codeServerEnabled: input.codeServerEnabled
}
}
}
diff --git a/core/installer/vpn.go b/core/installer/vpn.go
index 01161df..beeb8e4 100644
--- a/core/installer/vpn.go
+++ b/core/installer/vpn.go
@@ -6,6 +6,7 @@
"errors"
"fmt"
"io"
+ "net"
"net/http"
"net/url"
)
@@ -15,6 +16,7 @@
ExpireKey(username, key string) error
ExpireNode(username, node string) error
RemoveNode(username, node string) error
+ GetNodeIP(username, node string) (net.IP, error)
}
type headscaleAPIClient struct {
@@ -108,3 +110,34 @@
}
return nil
}
+
+func (g *headscaleAPIClient) GetNodeIP(username, node string) (net.IP, error) {
+ addr, err := url.Parse(fmt.Sprintf("%s/user/%s/node/%s/ip", g.apiAddr, username, node))
+ if err != nil {
+ return nil, err
+ }
+ resp, err := g.c.Do(&http.Request{
+ URL: addr,
+ Method: http.MethodGet,
+ Body: nil,
+ })
+ if err != nil {
+ return nil, err
+ }
+ var buf bytes.Buffer
+ if _, err := io.Copy(&buf, resp.Body); err != nil {
+ return nil, err
+ }
+ bufS := buf.String()
+ if resp.StatusCode == http.StatusNotFound {
+ return nil, ErrorNotFound
+ }
+ if resp.StatusCode != http.StatusOK {
+ return nil, errors.New(bufS)
+ }
+ ip := net.ParseIP(bufS)
+ if ip == nil {
+ return nil, fmt.Errorf("invalid ip")
+ }
+ return ip, nil
+}
diff --git a/core/installer/welcome/appmanager-tmpl/all-clusters.html b/core/installer/welcome/appmanager-tmpl/all-clusters.html
new file mode 100644
index 0000000..843381f
--- /dev/null
+++ b/core/installer/welcome/appmanager-tmpl/all-clusters.html
@@ -0,0 +1,21 @@
+{{ define "header" }}
+<h1>Clusters</h1>
+{{ end }}
+
+{{ define "content"}}
+<form action="/clusters" method="POST">
+ <fieldset class="grid">
+ <input type="text" name="name" placeholder="name" />
+ <button type="submit" name="create-cluster">create cluster</button>
+ </fieldset>
+</form>
+<aside>
+ <nav>
+ <ul>
+ {{ range .Clusters }}
+ <li><a href="/clusters/{{ .Name }}">{{ .Name }}</a></li>
+ {{ end }}
+ </ul>
+ </nav>
+</aside>
+{{ end }}
diff --git a/core/installer/welcome/appmanager-tmpl/app.html b/core/installer/welcome/appmanager-tmpl/app.html
index c6874cb..c1387ad 100644
--- a/core/installer/welcome/appmanager-tmpl/app.html
+++ b/core/installer/welcome/appmanager-tmpl/app.html
@@ -1,19 +1,7 @@
-{{ define "task" }}
-{{ range . }}
-<li aria-busy="{{ eq .Status 1 }}">
- {{ if eq .Status 3 }}<svg xmlns="http://www.w3.org/2000/svg" width="24" height="24" viewBox="0 0 24 24"><path fill="black" d="M21 7L9 19l-5.5-5.5l1.41-1.41L9 16.17L19.59 5.59z"/></svg>{{ end }}{{ .Title }}{{ if .Err }} - {{ .Err.Error }} {{ end }}
- {{ if .Subtasks }}
- <ul>
- {{ template "task" .Subtasks }}
- </ul>
- {{ end }}
-</li>
-{{ end }}
-{{ end }}
-
{{ define "schema-form" }}
{{ $readonly := .ReadOnly }}
{{ $networks := .AvailableNetworks }}
+ {{ $clusters := .AvailableClusters }}
{{ $data := .Data }}
{{ range $f := .Schema.Fields }}
{{ $name := $f.Name }}
@@ -38,6 +26,25 @@
{{ $schema.Name }}
<input type="text" name="{{ $name }}" oninput="valueChanged({{ $name }}, this.value)" {{ if $readonly }}disabled{{ end }} value="{{ index $data $name }}" />
</label>
+ {{ else if eq $schema.Kind 12 }}
+ <label {{ if $schema.Advanced }}hidden{{ end }}>
+ {{ $schema.Name }}
+ <details class="dropdown">
+ {{ $selectedCluster := index $data $name }}
+ <summary id="{{ $name }}">{{ $selectedCluster }}</summary>
+ <ul>
+ {{ range $clusters }}
+ {{ $selected := eq $selectedCluster .Name }}
+ <li>
+ <label>
+ <input type="radio" name="{{ $name }}" oninput="clusterSelected('{{ $name }}', '{{ .Name }}', this.checked)" {{ if $selected }}checked{{ end }} />
+ {{ .Name }}
+ </label>
+ </li>
+ {{ end }}
+ </ul>
+ </details>
+ </label>
{{ else if eq $schema.Kind 3 }}
<label {{ if $schema.Advanced }}hidden{{ end }}>
{{ $schema.Name }}
@@ -49,8 +56,8 @@
{{ $selected := eq $selectedNetwork .Name }}
<li>
<label>
- <input type="radio" name="{{ $name }}" oninput="networkSelected('{{ $name }}', '{{ .Name }}', this.checked)" {{ if $selected }}checked{{ end }} />
- {{ .Name }}
+ <input type="radio" name="{{ $name }}" oninput="networkSelected('{{ $name }}', '{{ .Name }}', '{{ .Name }} - {{ .Domain }}', this.checked)" {{ if $selected }}checked{{ end }} />
+ {{ .Name }} - {{ .Domain }}
</label>
</li>
{{ end }}
@@ -75,7 +82,7 @@
<li>
<label>
<input type="checkbox" name="{{ $networkName }}" oninput="multiNetworkSelected('{{ $name }}', '{{ $networkName }}', this.checked)" {{ if $selected }}checked{{ end }} />
- {{ .Name }}
+ {{ .Name }} - {{ .Domain }}
</label>
</li>
{{ end }}
@@ -132,25 +139,14 @@
{{ define "content"}}
{{ $schema := .App.Schema }}
{{ $networks := .AvailableNetworks }}
+ {{ $clusters := .AvailableClusters }}
{{ $instance := .Instance }}
- {{ $renderForm := true }}
- {{ if .Task }}
- {{if or (eq .Task.Status 0) (eq .Task.Status 1) }}
- {{ $renderForm = false }}
- Installation in progress (feel free to navigate away from this page):
- <ul class="progress">
- {{ template "task" .Task.Subtasks }}
- </ul>
- {{ end }}
- {{ end }}
-
- {{ if $renderForm }}
<form id="config-form">
{{ if $instance }}
- {{ template "schema-form" (dict "Schema" $schema "AvailableNetworks" $networks "ReadOnly" false "Data" ($instance.InputToValues $schema)) }}
+ {{ template "schema-form" (dict "Schema" $schema "AvailableNetworks" $networks "AvailableClusters" $clusters "ReadOnly" false "Data" ($instance.InputToValues $schema)) }}
{{ else }}
- {{ template "schema-form" (dict "Schema" $schema "AvailableNetworks" $networks "ReadOnly" false "Data" (dict)) }}
+ {{ template "schema-form" (dict "Schema" $schema "AvailableNetworks" $networks "AvailableClusters" $clusters "ReadOnly" false "Data" (dict)) }}
{{ end }}
{{ if $instance }}
<div class="grid">
@@ -161,7 +157,6 @@
<button type="submit" id="submit">{{ if $instance }}Update{{ else }}Install{{ end }}</button>
{{ end }}
</form>
- {{ end }}
<div id="toast-failure" class="toast hidden">
<svg xmlns="http://www.w3.org/2000/svg" width="36" height="36" viewBox="0 0 24 24"><path fill="none" stroke="currentColor" stroke-linecap="round" stroke-linejoin="round" stroke-width="1.5" d="M12 22c5.523 0 10-4.477 10-10S17.523 2 12 2S2 6.477 2 12s4.477 10 10 10Zm3-6L9 8m0 8l6-8"/></svg> {{ if $instance }}Update failed{{ else}}Install failed{{ end }}
@@ -200,11 +195,19 @@
setValue(name, value, config);
}
- function networkSelected(name, network, selected) {
+ function clusterSelected(name, cluster, selected) {
+ console.log(selected);
+ setValue(name, cluster, config);
+ let summary = document.getElementById(name);
+ summary.innerHTML = cluster;
+ summary.parentNode.removeAttribute("open");
+ }
+
+ function networkSelected(name, network, label, selected) {
console.log(selected);
setValue(name, network, config);
let summary = document.getElementById(name);
- summary.innerHTML = network;
+ summary.innerHTML = label;
summary.parentNode.removeAttribute("open");
}
@@ -310,29 +313,6 @@
}
});
}
-
- {{ if .Task }}
- async function refresh() {
- try {
- const resp = await fetch(window.location.href);
- if (resp.ok) {
- var tmp = document.createElement("html");
- tmp.innerHTML = await resp.text();
- const progress = tmp.getElementsByClassName("progress")[0];
- if (progress) {
- document.getElementsByClassName("progress")[0].innerHTML = progress.innerHTML;
- } else {
- location.reload();
- }
- }
- } catch (error) {
- console.log(error);
- } finally {
- setTimeout(refresh, 3000);
- }
- }
- setTimeout(refresh, 3000);
- {{ end }}
</script>
{{end}}
diff --git a/core/installer/welcome/appmanager-tmpl/base.html b/core/installer/welcome/appmanager-tmpl/base.html
index 4458485..cb69970 100644
--- a/core/installer/welcome/appmanager-tmpl/base.html
+++ b/core/installer/welcome/appmanager-tmpl/base.html
@@ -18,6 +18,8 @@
<li><a href="/installed" class="{{ if (eq .CurrentPage "installed") }}primary{{ end }}">Installed</a></li>
<li><a href="/not-installed" class="{{ if (eq .CurrentPage "not-installed") }}primary{{ end }}">Not Installed</a></li>
<hr>
+ <li><a href="/clusters" class="{{ if (eq .CurrentPage "clusters") }}primary{{ end }}">Clusters</a></li>
+ <hr>
{{ block "extra_menu" . }}{{ end }}
</ul>
</nav>
@@ -29,3 +31,16 @@
<script src="/stat/app-manager.js?v=0.0.11"></script>
</body>
</html>
+
+{{ define "task" }}
+{{/* Recursively renders a task tree as nested list items.
+     Status 1 appears to mean "in progress" (aria-busy spinner) and
+     status 3 "done" (checkmark) -- confirm against tasks.Status constants. */}}
+{{ range . }}
+<li aria-busy="{{ eq .Status 1 }}">
+  {{ if eq .Status 3 }}<svg xmlns="http://www.w3.org/2000/svg" width="24" height="24" viewBox="0 0 24 24"><path fill="black" d="M21 7L9 19l-5.5-5.5l1.41-1.41L9 16.17L19.59 5.59z"/></svg>{{ end }}{{ .Title }}{{ if .Err }} - {{ .Err.Error }} {{ end }}
+  {{ if .Subtasks }}
+  <ul>
+    {{ template "task" .Subtasks }}
+  </ul>
+  {{ end }}
+</li>
+{{ end }}
+{{ end }}
diff --git a/core/installer/welcome/appmanager-tmpl/cluster.html b/core/installer/welcome/appmanager-tmpl/cluster.html
new file mode 100644
index 0000000..2ef2fbe
--- /dev/null
+++ b/core/installer/welcome/appmanager-tmpl/cluster.html
@@ -0,0 +1,70 @@
+{{ define "header" }}
+<h1>Cluster - {{ .Cluster.Name }}</h1>
+{{ end }}
+
+{{ define "content" }}
+{{/* Cluster detail page: a remove-cluster action, an add-server form
+     (type/ip/port/user/password), and a table listing every controller
+     and worker with a per-server remove action. */}}
+<form action="/clusters/{{ .Cluster.Name }}/remove" method="POST">
+  <button type="submit" name="remove-cluster">remove cluster</button>
+</form>
+<form action="/clusters/{{ .Cluster.Name }}/servers" method="POST" autocomplete="off">
+  {{/* Server type selector: defaults to "worker". */}}
+  <details class="dropdown">
+    <summary id="type">worker</summary>
+    <ul>
+      <li>
+        <label>
+          <input type="radio" name="type" value="worker" checked />
+          worker
+        </label>
+      </li>
+      <li>
+        <label>
+          <input type="radio" name="type" value="controller" />
+          controller
+        </label>
+      </li>
+    </ul>
+  </details>
+  <input type="text" name="ip" placeholder="ip" />
+  <input type="text" name="port" placeholder="22 (optional)" />
+  <input type="text" name="user" placeholder="user" />
+  <input type="password" name="password" placeholder="password" />
+  <button type="submit" name="add-server">add server</button>
+</form>
+{{ $c := .Cluster }}
+<table class="striped">
+  <thead>
+    <tr>
+      <th scope="col">type</th>
+      <th scope="col">hostname</th>
+      <th scope="col">ip</th>
+      <th scope="col">remove</th>
+    </tr>
+  </thead>
+  <tbody>
+    {{ range $s := .Cluster.Controllers }}
+    <tr>
+      <th>controller</th>
+      <th scope="row">{{ $s.Name }}</th>
+      <td>{{ $s.IP }} </td>
+      <td>
+        <form action="/clusters/{{ $c.Name }}/servers/{{ $s.Name }}/remove" method="POST">
+          <button type="submit">remove</button>
+        </form>
+      </td>
+    </tr>
+    {{ end }}
+    {{ range $s := .Cluster.Workers }}
+    <tr>
+      <th>worker</th>
+      <th scope="row">{{ $s.Name }}</th>
+      <td>{{ $s.IP }} </td>
+      <td>
+        <form action="/clusters/{{ $c.Name }}/servers/{{ $s.Name }}/remove" method="POST">
+          <button type="submit">remove</button>
+        </form>
+      </td>
+    </tr>
+    {{ end }}
+  </tbody>
+</table>
+{{ end }}
diff --git a/core/installer/welcome/appmanager-tmpl/task.html b/core/installer/welcome/appmanager-tmpl/task.html
new file mode 100644
index 0000000..699a660
--- /dev/null
+++ b/core/installer/welcome/appmanager-tmpl/task.html
@@ -0,0 +1,30 @@
+{{ define "content"}}
+Installation in progress (feel free to navigate away from this page):
+<ul class="progress">
+  {{ template "task" .Task.Subtasks }}
+</ul>
+
+<script>
+  // Poll the current URL every three seconds. If the server answered with a
+  // redirect (task finished), follow it; otherwise splice the refreshed
+  // progress tree into the page in place.
+  async function refresh() {
+    try {
+      const resp = await fetch(window.location.href);
+      console.log(window.location.href, resp);
+      if (resp.ok) {
+        if (window.location.href != resp.url) {
+          location.assign(resp.url);
+        } else {
+          var tmp = document.createElement("html");
+          tmp.innerHTML = await resp.text();
+          const progress = tmp.getElementsByClassName("progress")[0];
+          // Guard against responses without a progress element; dereferencing
+          // null here would throw a TypeError on every poll cycle.
+          if (progress) {
+            document.getElementsByClassName("progress")[0].innerHTML = progress.innerHTML;
+          } else {
+            location.reload();
+          }
+        }
+      }
+    } catch (error) {
+      console.log(error);
+    } finally {
+      setTimeout(refresh, 3000);
+    }
+  }
+  setTimeout(refresh, 3000);
+</script>
+{{ end }}
diff --git a/core/installer/welcome/appmanager.go b/core/installer/welcome/appmanager.go
index 7f168dc..3016190 100644
--- a/core/installer/welcome/appmanager.go
+++ b/core/installer/welcome/appmanager.go
@@ -4,37 +4,57 @@
"context"
"embed"
"encoding/json"
+ "errors"
"fmt"
"html/template"
"io/ioutil"
"log"
+ "net"
"net/http"
+ "strconv"
+ "strings"
+ "sync"
"time"
"github.com/Masterminds/sprig/v3"
"github.com/gorilla/mux"
"github.com/giolekva/pcloud/core/installer"
+ "github.com/giolekva/pcloud/core/installer/cluster"
+ "github.com/giolekva/pcloud/core/installer/soft"
"github.com/giolekva/pcloud/core/installer/tasks"
)
//go:embed appmanager-tmpl/*
var appTmpls embed.FS
+type taskForward struct {
+ task tasks.Task
+ redirectTo string
+}
+
type AppManagerServer struct {
- port int
- m *installer.AppManager
- r installer.AppRepository
- reconciler *tasks.FixedReconciler
- h installer.HelmReleaseMonitor
- tasks map[string]tasks.Task
- ta map[string]installer.EnvApp
- tmpl tmplts
+ l sync.Locker
+ port int
+ repo soft.RepoIO
+ m *installer.AppManager
+ r installer.AppRepository
+ fr installer.AppRepository
+ reconciler *tasks.FixedReconciler
+ h installer.HelmReleaseMonitor
+ cnc installer.ClusterNetworkConfigurator
+ vpnAPIClient installer.VPNAPIClient
+ tasks map[string]taskForward
+ ta map[string]installer.EnvApp
+ tmpl tmplts
}
type tmplts struct {
- index *template.Template
- app *template.Template
+ index *template.Template
+ app *template.Template
+ allClusters *template.Template
+ cluster *template.Template
+ task *template.Template
}
func parseTemplatesAppManager(fs embed.FS) (tmplts, error) {
@@ -57,29 +77,50 @@
if err != nil {
return tmplts{}, err
}
- return tmplts{index, app}, nil
+ allClusters, err := parse("appmanager-tmpl/all-clusters.html")
+ if err != nil {
+ return tmplts{}, err
+ }
+ cluster, err := parse("appmanager-tmpl/cluster.html")
+ if err != nil {
+ return tmplts{}, err
+ }
+ task, err := parse("appmanager-tmpl/task.html")
+ if err != nil {
+ return tmplts{}, err
+ }
+ return tmplts{index, app, allClusters, cluster, task}, nil
}
func NewAppManagerServer(
port int,
+ repo soft.RepoIO,
m *installer.AppManager,
r installer.AppRepository,
+ fr installer.AppRepository,
reconciler *tasks.FixedReconciler,
h installer.HelmReleaseMonitor,
+ cnc installer.ClusterNetworkConfigurator,
+ vpnAPIClient installer.VPNAPIClient,
) (*AppManagerServer, error) {
tmpl, err := parseTemplatesAppManager(appTmpls)
if err != nil {
return nil, err
}
return &AppManagerServer{
- port: port,
- m: m,
- r: r,
- reconciler: reconciler,
- h: h,
- tasks: make(map[string]tasks.Task),
- ta: make(map[string]installer.EnvApp),
- tmpl: tmpl,
+ l: &sync.Mutex{},
+ port: port,
+ repo: repo,
+ m: m,
+ r: r,
+ fr: fr,
+ reconciler: reconciler,
+ h: h,
+ cnc: cnc,
+ vpnAPIClient: vpnAPIClient,
+ tasks: make(map[string]taskForward),
+ ta: make(map[string]installer.EnvApp),
+ tmpl: tmpl,
}, nil
}
@@ -102,8 +143,15 @@
r.HandleFunc("/api/instance/{slug}", s.handleInstance).Methods(http.MethodGet)
r.HandleFunc("/api/instance/{slug}/update", s.handleAppUpdate).Methods(http.MethodPost)
r.HandleFunc("/api/instance/{slug}/remove", s.handleAppRemove).Methods(http.MethodPost)
+ r.HandleFunc("/clusters/{cluster}/servers/{server}/remove", s.handleClusterRemoveServer).Methods(http.MethodPost)
+ r.HandleFunc("/clusters/{cluster}/servers", s.handleClusterAddServer).Methods(http.MethodPost)
+ r.HandleFunc("/clusters/{name}", s.handleCluster).Methods(http.MethodGet)
+ r.HandleFunc("/clusters/{name}/remove", s.handleRemoveCluster).Methods(http.MethodPost)
+ r.HandleFunc("/clusters", s.handleAllClusters).Methods(http.MethodGet)
+ r.HandleFunc("/clusters", s.handleCreateCluster).Methods(http.MethodPost)
r.HandleFunc("/app/{slug}", s.handleAppUI).Methods(http.MethodGet)
r.HandleFunc("/instance/{slug}", s.handleInstanceUI).Methods(http.MethodGet)
+ r.HandleFunc("/tasks/{slug}", s.handleTaskStatus).Methods(http.MethodGet)
r.HandleFunc("/{pageType}", s.handleAppsList).Methods(http.MethodGet)
r.HandleFunc("/", s.handleAppsList).Methods(http.MethodGet)
fmt.Printf("Starting HTTP server on port: %d\n", s.port)
@@ -201,6 +249,8 @@
}
func (s *AppManagerServer) handleAppInstall(w http.ResponseWriter, r *http.Request) {
+ s.l.Lock()
+ defer s.l.Unlock()
slug, ok := mux.Vars(r)["slug"]
if !ok {
http.Error(w, "empty slug", http.StatusBadRequest)
@@ -249,20 +299,27 @@
if _, ok := s.tasks[instanceId]; ok {
panic("MUST NOT REACH!")
}
- s.tasks[instanceId] = t
+ s.tasks[instanceId] = taskForward{t, fmt.Sprintf("/instance/%s", instanceId)}
s.ta[instanceId] = a
t.OnDone(func(err error) {
- delete(s.tasks, instanceId)
- delete(s.ta, instanceId)
+ go func() {
+ time.Sleep(30 * time.Second)
+ s.l.Lock()
+ defer s.l.Unlock()
+ delete(s.tasks, instanceId)
+ delete(s.ta, instanceId)
+ }()
})
go t.Start()
- if _, err := fmt.Fprintf(w, "/instance/%s", instanceId); err != nil {
+ if _, err := fmt.Fprintf(w, "/tasks/%s", instanceId); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
func (s *AppManagerServer) handleAppUpdate(w http.ResponseWriter, r *http.Request) {
+ s.l.Lock()
+ defer s.l.Unlock()
slug, ok := mux.Vars(r)["slug"]
if !ok {
http.Error(w, "empty slug", http.StatusBadRequest)
@@ -291,11 +348,16 @@
go s.reconciler.Reconcile(ctx)
t := tasks.NewMonitorRelease(s.h, rr)
t.OnDone(func(err error) {
- delete(s.tasks, slug)
+ go func() {
+ time.Sleep(30 * time.Second)
+ s.l.Lock()
+ defer s.l.Unlock()
+ delete(s.tasks, slug)
+ }()
})
- s.tasks[slug] = t
+ s.tasks[slug] = taskForward{t, fmt.Sprintf("/instance/%s", slug)}
go t.Start()
- if _, err := fmt.Fprintf(w, "/instance/%s", slug); err != nil {
+ if _, err := fmt.Fprintf(w, "/tasks/%s", slug); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
@@ -373,6 +435,7 @@
Instance *installer.AppInstanceConfig
Instances []installer.AppInstanceConfig
AvailableNetworks []installer.Network
+ AvailableClusters []cluster.State
Task tasks.Task
CurrentPage string
}
@@ -403,10 +466,16 @@
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
+ clusters, err := s.m.GetClusters()
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
data := appPageData{
App: a,
Instances: instances,
AvailableNetworks: networks,
+ AvailableClusters: clusters,
CurrentPage: a.Name(),
}
if err := s.tmpl.app.Execute(w, data); err != nil {
@@ -416,6 +485,8 @@
}
func (s *AppManagerServer) handleInstanceUI(w http.ResponseWriter, r *http.Request) {
+ s.l.Lock()
+ defer s.l.Unlock()
global, err := s.m.Config()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
@@ -432,6 +503,10 @@
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
+ if ok && !(t.task.Status() == tasks.StatusDone || t.task.Status() == tasks.StatusFailed) {
+ http.Redirect(w, r, fmt.Sprintf("/tasks/%s", slug), http.StatusSeeOther)
+ return
+ }
var a installer.EnvApp
if instance != nil {
a, err = s.m.GetInstanceApp(instance.Id)
@@ -456,12 +531,18 @@
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
+ clusters, err := s.m.GetClusters()
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
data := appPageData{
App: a,
Instance: instance,
Instances: instances,
AvailableNetworks: networks,
- Task: t,
+ AvailableClusters: clusters,
+ Task: t.task,
CurrentPage: slug,
}
if err := s.tmpl.app.Execute(w, data); err != nil {
@@ -469,3 +550,319 @@
return
}
}
+
+// taskStatusData is the template payload for the task progress page.
+type taskStatusData struct {
+	CurrentPage string
+	Task        tasks.Task
+}
+
+// handleTaskStatus renders the live progress page for an in-flight task.
+// Once the task reaches a terminal state the client is redirected to the
+// page the task was started from.
+func (s *AppManagerServer) handleTaskStatus(w http.ResponseWriter, r *http.Request) {
+	s.l.Lock()
+	defer s.l.Unlock()
+	slug, ok := mux.Vars(r)["slug"]
+	if !ok {
+		http.Error(w, "empty slug", http.StatusBadRequest)
+		return
+	}
+	t, ok := s.tasks[slug]
+	if !ok {
+		// An unknown task slug is a client-side problem, not a server
+		// failure, so answer 404 rather than 500.
+		http.Error(w, "task not found", http.StatusNotFound)
+		return
+	}
+	// ok is necessarily true past the early return above.
+	if t.task.Status() == tasks.StatusDone || t.task.Status() == tasks.StatusFailed {
+		http.Redirect(w, r, t.redirectTo, http.StatusSeeOther)
+		return
+	}
+	data := taskStatusData{
+		CurrentPage: "",
+		Task:        t.task,
+	}
+	if err := s.tmpl.task.Execute(w, data); err != nil {
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+}
+
+// clustersData is the template payload for the clusters overview page.
+type clustersData struct {
+	CurrentPage string
+	Clusters    []cluster.State
+}
+
+// handleAllClusters renders the list of every configured cluster.
+func (s *AppManagerServer) handleAllClusters(w http.ResponseWriter, r *http.Request) {
+	clusters, err := s.m.GetClusters()
+	if err != nil {
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+	page := clustersData{
+		CurrentPage: "clusters",
+		Clusters:    clusters,
+	}
+	if err := s.tmpl.allClusters.Execute(w, page); err != nil {
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+}
+
+// clusterData is the template payload for the single-cluster detail page.
+type clusterData struct {
+	CurrentPage string
+	Cluster     cluster.State
+}
+
+// handleCluster renders the detail page for one named cluster.
+func (s *AppManagerServer) handleCluster(w http.ResponseWriter, r *http.Request) {
+	name, ok := mux.Vars(r)["name"]
+	if !ok {
+		http.Error(w, "empty name", http.StatusBadRequest)
+		return
+	}
+	mgr, err := s.getClusterManager(name)
+	switch {
+	case errors.Is(err, installer.ErrorNotFound):
+		http.Error(w, "not found", http.StatusNotFound)
+		return
+	case err != nil:
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+	page := clusterData{
+		CurrentPage: "clusters",
+		Cluster:     mgr.State(),
+	}
+	if err := s.tmpl.cluster.Execute(w, page); err != nil {
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+}
+
+// handleClusterRemoveServer starts an asynchronous task which removes the
+// given server from the cluster and redirects to the task progress page.
+// At most one task per cluster may be in flight at a time.
+func (s *AppManagerServer) handleClusterRemoveServer(w http.ResponseWriter, r *http.Request) {
+	s.l.Lock()
+	defer s.l.Unlock()
+	vars := mux.Vars(r)
+	cName, ok := vars["cluster"]
+	if !ok {
+		http.Error(w, "empty name", http.StatusBadRequest)
+		return
+	}
+	if _, busy := s.tasks[cName]; busy {
+		http.Error(w, "cluster task in progress", http.StatusLocked)
+		return
+	}
+	sName, ok := vars["server"]
+	if !ok {
+		http.Error(w, "empty name", http.StatusBadRequest)
+		return
+	}
+	mgr, err := s.getClusterManager(cName)
+	if err != nil {
+		if errors.Is(err, installer.ErrorNotFound) {
+			http.Error(w, "not found", http.StatusNotFound)
+		} else {
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+		}
+		return
+	}
+	task := tasks.NewClusterRemoveServerTask(mgr, sName, s.repo)
+	task.OnDone(func(err error) {
+		go func() {
+			// Keep the finished task around briefly so the progress page
+			// can observe the terminal state before the entry disappears.
+			time.Sleep(30 * time.Second)
+			s.l.Lock()
+			defer s.l.Unlock()
+			delete(s.tasks, cName)
+		}()
+	})
+	go task.Start()
+	s.tasks[cName] = taskForward{task, fmt.Sprintf("/clusters/%s", cName)}
+	http.Redirect(w, r, fmt.Sprintf("/tasks/%s", cName), http.StatusSeeOther)
+}
+
+// getClusterManager looks up the named cluster in the environment config
+// and restores a kube-based cluster.Manager for it. It returns
+// installer.ErrorNotFound when no cluster with that name exists.
+func (s *AppManagerServer) getClusterManager(cName string) (cluster.Manager, error) {
+	clusters, err := s.m.GetClusters()
+	if err != nil {
+		return nil, err
+	}
+	// Return directly from the loop instead of keeping a pointer to the
+	// range variable: the original `c = &i` aliased the loop variable,
+	// which is a footgun in Go versions before 1.22 and only accidentally
+	// safe here because of the immediate break.
+	for _, c := range clusters {
+		if c.Name == cName {
+			return cluster.RestoreKubeManager(c)
+		}
+	}
+	return nil, installer.ErrorNotFound
+}
+
+// handleClusterAddServer registers a new controller or worker node with the
+// cluster, starting the appropriate init/join task asynchronously, and
+// redirects to the task progress page.
+func (s *AppManagerServer) handleClusterAddServer(w http.ResponseWriter, r *http.Request) {
+	s.l.Lock()
+	defer s.l.Unlock()
+	cName, ok := mux.Vars(r)["cluster"]
+	if !ok {
+		http.Error(w, "empty name", http.StatusBadRequest)
+		return
+	}
+	if _, busy := s.tasks[cName]; busy {
+		http.Error(w, "cluster task in progress", http.StatusLocked)
+		return
+	}
+	mgr, err := s.getClusterManager(cName)
+	if err != nil {
+		if errors.Is(err, installer.ErrorNotFound) {
+			http.Error(w, "not found", http.StatusNotFound)
+		} else {
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+		}
+		return
+	}
+	serverType := r.PostFormValue("type")
+	addr := net.ParseIP(r.PostFormValue("ip"))
+	if addr == nil {
+		http.Error(w, "invalid ip", http.StatusBadRequest)
+		return
+	}
+	// SSH port defaults to 22 unless the form provided one.
+	port := 22
+	if p := r.PostFormValue("port"); p != "" {
+		port, err = strconv.Atoi(p)
+		if err != nil {
+			http.Error(w, err.Error(), http.StatusBadRequest)
+			return
+		}
+	}
+	server := cluster.Server{
+		IP:       addr,
+		Port:     port,
+		User:     r.PostFormValue("user"),
+		Password: r.PostFormValue("password"),
+	}
+	var task tasks.Task
+	switch strings.ToLower(serverType) {
+	case "controller":
+		// The very first controller bootstraps the cluster itself.
+		if len(mgr.State().Controllers) == 0 {
+			task = tasks.NewClusterInitTask(mgr, server, s.cnc, s.repo, s.setupRemoteCluster())
+		} else {
+			task = tasks.NewClusterJoinControllerTask(mgr, server, s.repo)
+		}
+	case "worker":
+		task = tasks.NewClusterJoinWorkerTask(mgr, server, s.repo)
+	default:
+		http.Error(w, "invalid type", http.StatusBadRequest)
+		return
+	}
+	task.OnDone(func(err error) {
+		go func() {
+			// Keep the finished task around briefly so the progress page
+			// can observe the terminal state before the entry disappears.
+			time.Sleep(30 * time.Second)
+			s.l.Lock()
+			defer s.l.Unlock()
+			delete(s.tasks, cName)
+		}()
+	})
+	go task.Start()
+	s.tasks[cName] = taskForward{task, fmt.Sprintf("/clusters/%s", cName)}
+	http.Redirect(w, r, fmt.Sprintf("/tasks/%s", cName), http.StatusSeeOther)
+}
+
+// handleCreateCluster registers a new (empty) cluster by committing its
+// initial config to the environment repository, then redirects to the
+// cluster's detail page.
+func (s *AppManagerServer) handleCreateCluster(w http.ResponseWriter, r *http.Request) {
+	cName := r.PostFormValue("name")
+	if cName == "" {
+		http.Error(w, "no name", http.StatusBadRequest)
+		return
+	}
+	// Refuse to silently overwrite an existing cluster's config.json.
+	clusters, err := s.m.GetClusters()
+	if err != nil {
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+	for _, c := range clusters {
+		if c.Name == cName {
+			http.Error(w, "cluster already exists", http.StatusConflict)
+			return
+		}
+	}
+	st := cluster.State{Name: cName}
+	if _, err := s.repo.Do(func(fs soft.RepoFS) (string, error) {
+		if err := soft.WriteJson(fs, fmt.Sprintf("/clusters/%s/config.json", cName), st); err != nil {
+			return "", err
+		}
+		return fmt.Sprintf("create cluster: %s", cName), nil
+	}); err != nil {
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+	http.Redirect(w, r, fmt.Sprintf("/clusters/%s", cName), http.StatusSeeOther)
+}
+
+// handleRemoveCluster starts an asynchronous task which tears down the named
+// cluster and redirects the client to the task progress page.
+func (s *AppManagerServer) handleRemoveCluster(w http.ResponseWriter, r *http.Request) {
+	// All sibling cluster handlers guard s.tasks with this lock; reading and
+	// writing the map below without it is a data race.
+	s.l.Lock()
+	defer s.l.Unlock()
+	cName, ok := mux.Vars(r)["name"]
+	if !ok {
+		http.Error(w, "empty name", http.StatusBadRequest)
+		return
+	}
+	if _, ok := s.tasks[cName]; ok {
+		http.Error(w, "cluster task in progress", http.StatusLocked)
+		return
+	}
+	m, err := s.getClusterManager(cName)
+	if err != nil {
+		if errors.Is(err, installer.ErrorNotFound) {
+			http.Error(w, "not found", http.StatusNotFound)
+		} else {
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+		}
+		return
+	}
+	task := tasks.NewRemoveClusterTask(m, s.cnc, s.repo)
+	task.OnDone(func(err error) {
+		go func() {
+			// Keep the finished task around briefly so the progress page
+			// can observe the terminal state before the entry disappears.
+			time.Sleep(30 * time.Second)
+			s.l.Lock()
+			defer s.l.Unlock()
+			delete(s.tasks, cName)
+		}()
+	})
+	go task.Start()
+	s.tasks[cName] = taskForward{task, fmt.Sprintf("/clusters/%s", cName)}
+	http.Redirect(w, r, fmt.Sprintf("/tasks/%s", cName), http.StatusSeeOther)
+}
+
+// setupRemoteCluster returns the callback used by cluster initialization: it
+// installs the cluster-network app (ingress plus VPN client) for the new
+// cluster, then waits for the VPN node to register and returns its IP.
+func (s *AppManagerServer) setupRemoteCluster() cluster.ClusterSetupFunc {
+	const vpnUser = "private-network-proxy"
+	return func(name, kubeconfig, ingressClassName string) (net.IP, error) {
+		hostname := fmt.Sprintf("cluster-%s", name)
+		t := tasks.NewInstallTask(s.h, func() (installer.ReleaseResources, error) {
+			app, err := installer.FindEnvApp(s.fr, "cluster-network")
+			if err != nil {
+				return installer.ReleaseResources{}, err
+			}
+			env, err := s.m.Config()
+			if err != nil {
+				return installer.ReleaseResources{}, err
+			}
+			instanceId := fmt.Sprintf("%s-%s", app.Slug(), name)
+			appDir := fmt.Sprintf("/clusters/%s/ingress", name)
+			namespace := fmt.Sprintf("%scluster-network-%s", env.NamespacePrefix, name)
+			rr, err := s.m.Install(app, instanceId, appDir, namespace, map[string]any{
+				"cluster": map[string]any{
+					"name":             name,
+					"kubeconfig":       kubeconfig,
+					"ingressClassName": ingressClassName,
+				},
+				// TODO(gio): remove hardcoded user
+				"vpnUser":          vpnUser,
+				"vpnProxyHostname": hostname,
+			})
+			if err != nil {
+				return installer.ReleaseResources{}, err
+			}
+			// Release the timeout's resources when the reconcile finishes;
+			// `ctx, _ := context.WithTimeout(...)` leaks the cancel func
+			// (go vet lostcancel).
+			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+			go func() {
+				defer cancel()
+				s.reconciler.Reconcile(ctx)
+			}()
+			return rr, nil
+		})
+		ch := make(chan error)
+		t.OnDone(func(err error) {
+			ch <- err
+		})
+		go t.Start()
+		if err := <-ch; err != nil {
+			return nil, err
+		}
+		// Poll until the VPN client registers itself. Any error other than
+		// "not yet registered" is fatal: the original loop neither slept nor
+		// returned on unexpected errors, busy-spinning forever.
+		for {
+			ip, err := s.vpnAPIClient.GetNodeIP(vpnUser, hostname)
+			if err == nil {
+				return ip, nil
+			}
+			if !errors.Is(err, installer.ErrorNotFound) {
+				return nil, err
+			}
+			time.Sleep(5 * time.Second)
+		}
+	}
+}
diff --git a/core/installer/welcome/dodo_app.go b/core/installer/welcome/dodo_app.go
index 7f3d383..9a93afd 100644
--- a/core/installer/welcome/dodo_app.go
+++ b/core/installer/welcome/dodo_app.go
@@ -104,6 +104,7 @@
nsc installer.NamespaceCreator
jc installer.JobCreator
vpnKeyGen installer.VPNAPIClient
+ cnc installer.ClusterNetworkConfigurator
workers map[string]map[string]struct{}
appConfigs map[string]appConfig
tmplts dodoAppTmplts
@@ -136,6 +137,7 @@
nsc installer.NamespaceCreator,
jc installer.JobCreator,
vpnKeyGen installer.VPNAPIClient,
+ cnc installer.ClusterNetworkConfigurator,
env installer.EnvConfig,
external bool,
fetchUsersAddr string,
@@ -171,6 +173,7 @@
nsc,
jc,
vpnKeyGen,
+ cnc,
map[string]map[string]struct{}{},
map[string]appConfig{},
tmplts,
@@ -1073,7 +1076,7 @@
return err
}
hf := installer.NewGitHelmFetcher()
- m, err := installer.NewAppManager(configRepo, s.nsc, s.jc, hf, s.vpnKeyGen, "/")
+ m, err := installer.NewAppManager(configRepo, s.nsc, s.jc, hf, s.vpnKeyGen, s.cnc, "/")
if err != nil {
return err
}
@@ -1220,7 +1223,7 @@
return installer.ReleaseResources{}, err
}
hf := installer.NewGitHelmFetcher()
- m, err := installer.NewAppManager(repo, s.nsc, s.jc, hf, s.vpnKeyGen, "/.dodo")
+ m, err := installer.NewAppManager(repo, s.nsc, s.jc, hf, s.vpnKeyGen, s.cnc, "/.dodo")
if err != nil {
return installer.ReleaseResources{}, err
}
diff --git a/core/installer/welcome/env_test.go b/core/installer/welcome/env_test.go
index ee80e4e..439fdf2 100644
--- a/core/installer/welcome/env_test.go
+++ b/core/installer/welcome/env_test.go
@@ -3,6 +3,7 @@
import (
"bytes"
"encoding/json"
+ "fmt"
"io"
"io/fs"
"log"
@@ -256,7 +257,13 @@
if err := m.m.Add(name, task); err != nil {
return err
} else {
- task.OnDone(m.onDone)
+ task.OnDone(func(err error) {
+ if err == nil {
+ m.onDone(nil)
+ } else {
+ m.onDone(fmt.Errorf("%s: %s", name, err))
+ }
+ })
return nil
}
}