blob: c3a008684fa0cebe2a0a1034769856451c90fa55 [file] [log] [blame]
gio1cd65152024-08-16 08:18:49 +04001package tasks
2
3import (
giof6ad2982024-08-23 17:42:49 +04004 "fmt"
5 "path/filepath"
6
gio1cd65152024-08-16 08:18:49 +04007 "github.com/giolekva/pcloud/core/installer"
giof6ad2982024-08-23 17:42:49 +04008 "github.com/giolekva/pcloud/core/installer/cluster"
9 "github.com/giolekva/pcloud/core/installer/soft"
giof8acc612025-04-26 08:20:55 +040010 "github.com/giolekva/pcloud/core/installer/status"
gio1cd65152024-08-16 08:18:49 +040011)
12
13type InstallFunc func() (installer.ReleaseResources, error)
14
15type dynamicTaskSlice struct {
16 t []Task
17}
18
19func (d *dynamicTaskSlice) Tasks() []Task {
20 return d.t
21}
22
23func (d *dynamicTaskSlice) Append(t Task) {
24 d.t = append(d.t, t)
25}
26
giof8acc612025-04-26 08:20:55 +040027func NewInstallTask(mon status.ResourceMonitor, fn InstallFunc) Task {
gio1cd65152024-08-16 08:18:49 +040028 d := &dynamicTaskSlice{t: []Task{}}
29 var rr installer.ReleaseResources
30 done := make(chan error)
31 installTask := newLeafTask("Downloading configuration files", func() error {
32 var err error
33 rr, err = fn()
34 return err
35 })
36 d.Append(&installTask)
37 installTask.OnDone(func(err error) {
38 if err != nil {
39 done <- err
40 return
41 }
42 monTasks := NewMonitorReleaseTasks(mon, rr)
43 for _, mt := range monTasks {
44 d.Append(mt)
45 }
46 monitor := newConcurrentParentTask("Monitor", true, monTasks...)
47 monitor.OnDone(func(err error) {
48 done <- err
49 })
50 monitor.Start()
51 })
52 start := func() error {
53 installTask.Start()
54 return <-done
55 }
56 t := newParentTask("Installing application", true, start, d)
57 return &t
58}
giof6ad2982024-08-23 17:42:49 +040059
gio8f290322024-09-21 15:37:45 +040060func NewClusterInitTask(m cluster.Manager, server cluster.Server, cnc installer.ClusterNetworkConfigurator, repo soft.RepoIO, setupFn cluster.ClusterIngressSetupFunc) Task {
giof6ad2982024-08-23 17:42:49 +040061 d := &dynamicTaskSlice{t: []Task{}}
62 done := make(chan error)
63 setupTask := newLeafTask(fmt.Sprintf("Installing dodo on %s", server.IP.String()), func() error {
64 _, err := m.Init(server, setupFn)
65 return err
66 })
67 d.Append(&setupTask)
68 setupTask.OnDone(func(err error) {
69 if err != nil {
70 done <- err
71 return
72 }
73 if err := cnc.AddCluster(m.State().Name, m.State().IngressIP); err != nil {
74 done <- err
75 return
76 }
77 _, err = repo.Do(func(fs soft.RepoFS) (string, error) {
78 if err := soft.WriteJson(fs, fmt.Sprintf("/clusters/%s/config.json", m.State().Name), m.State()); err != nil {
79 return "", err
80 }
81 return fmt.Sprintf("add server to cluster: %s", m.State().Name), nil
82 })
83 done <- err
84 })
85 start := func() error {
86 setupTask.Start()
87 return <-done
88 }
89 t := newParentTask("Installing application", true, start, d)
90 return &t
91}
92
gio8f290322024-09-21 15:37:45 +040093func NewRemoveClusterTask(m cluster.Manager, cnc installer.ClusterNetworkConfigurator, repo soft.RepoIO) Task {
giof6ad2982024-08-23 17:42:49 +040094 t := newLeafTask(fmt.Sprintf("Removing %s cluster", m.State().Name), func() error {
95 if err := cnc.RemoveCluster(m.State().Name, m.State().IngressIP); err != nil {
96 return err
97 }
98 _, err := repo.Do(func(fs soft.RepoFS) (string, error) {
99 if err := fs.RemoveAll(fmt.Sprintf("/clusters/%s", m.State().Name)); err != nil {
100 return "", err
101 }
102 kustPath := filepath.Join("/clusters", "kustomization.yaml")
103 kust, err := soft.ReadKustomization(fs, kustPath)
104 if err != nil {
105 return "", err
106 }
107 kust.RemoveResources(m.State().Name)
108 soft.WriteYaml(fs, kustPath, kust)
109 return fmt.Sprintf("remove cluster: %s", m.State().Name), nil
110 })
111 return err
112 })
113 return &t
114}
115
116func NewClusterJoinControllerTask(m cluster.Manager, server cluster.Server, repo soft.RepoIO) Task {
117 d := &dynamicTaskSlice{t: []Task{}}
118 done := make(chan error)
119 setupTask := newLeafTask(fmt.Sprintf("Joining %s to %s cluster", server.IP.String(), m.State().Name), func() error {
120 return m.JoinController(server)
121 })
122 d.Append(&setupTask)
123 setupTask.OnDone(func(err error) {
124 if err != nil {
125 done <- err
126 return
127 }
128 _, err = repo.Do(func(fs soft.RepoFS) (string, error) {
129 if err := soft.WriteJson(fs, fmt.Sprintf("/clusters/%s/config.json", m.State().Name), m.State()); err != nil {
130 return "", err
131 }
132 return fmt.Sprintf("add controller server to cluster: %s", m.State().Name), nil
133 })
134 done <- err
135 })
136 start := func() error {
137 setupTask.Start()
138 return <-done
139 }
140 t := newParentTask("Installing application", true, start, d)
141 return &t
142}
143
144func NewClusterJoinWorkerTask(m cluster.Manager, server cluster.Server, repo soft.RepoIO) Task {
145 d := &dynamicTaskSlice{t: []Task{}}
146 done := make(chan error)
147 setupTask := newLeafTask(fmt.Sprintf("Joining %s to %s cluster", server.IP.String(), m.State().Name), func() error {
148 return m.JoinWorker(server)
149 })
150 d.Append(&setupTask)
151 setupTask.OnDone(func(err error) {
152 if err != nil {
153 done <- err
154 return
155 }
156 _, err = repo.Do(func(fs soft.RepoFS) (string, error) {
157 if err := soft.WriteJson(fs, fmt.Sprintf("/clusters/%s/config.json", m.State().Name), m.State()); err != nil {
158 return "", err
159 }
160 return fmt.Sprintf("add worker server to cluster: %s", m.State().Name), nil
161 })
162 done <- err
163 })
164 start := func() error {
165 setupTask.Start()
166 return <-done
167 }
168 t := newParentTask("Installing application", true, start, d)
169 return &t
170}
171
172func NewClusterRemoveServerTask(m cluster.Manager, server string, repo soft.RepoIO) Task {
173 d := &dynamicTaskSlice{t: []Task{}}
174 done := make(chan error)
175 setupTask := newLeafTask(fmt.Sprintf("Removing %s from %s cluster", server, m.State().Name), func() error {
176 return m.RemoveServer(server)
177 })
178 d.Append(&setupTask)
179 setupTask.OnDone(func(err error) {
180 if err != nil {
181 done <- err
182 return
183 }
184 _, err = repo.Do(func(fs soft.RepoFS) (string, error) {
185 if err := soft.WriteJson(fs, fmt.Sprintf("/clusters/%s/config.json", m.State().Name), m.State()); err != nil {
186 return "", err
187 }
188 return fmt.Sprintf("remove %s from cluster: %s", server, m.State().Name), nil
189 })
190 done <- err
191 })
192 start := func() error {
193 setupTask.Start()
194 return <-done
195 }
196 t := newParentTask("Installing application", true, start, d)
197 return &t
198}
gio8f290322024-09-21 15:37:45 +0400199
200func NewClusterSetupTask(m cluster.Manager, setupFn cluster.ClusterSetupFunc, repo soft.RepoIO, msg string) Task {
201 d := &dynamicTaskSlice{t: []Task{}}
202 done := make(chan error)
203 setupTask := newLeafTask(msg, func() error {
204 return setupFn(m)
205 })
206 d.Append(&setupTask)
207 setupTask.OnDone(func(err error) {
208 if err != nil {
209 done <- err
210 return
211 }
212 _, err = repo.Do(func(fs soft.RepoFS) (string, error) {
213 if err := soft.WriteJson(fs, fmt.Sprintf("/clusters/%s/config.json", m.State().Name), m.State()); err != nil {
214 return "", err
215 }
216 return msg, nil
217 })
218 done <- err
219 })
220 start := func() error {
221 setupTask.Start()
222 return <-done
223 }
224 t := newParentTask("Installing application", true, start, d)
225 return &t
226}