blob: 8c141834246da461766240d564c4cefe73fec22d [file] [log] [blame]
Giorgi Lekveishvilib59b7c22024-04-03 22:17:50 +04001package main
2
3import (
gioefa0ed42024-06-13 12:31:43 +04004 "crypto/rand"
giob1c4e542024-07-15 12:10:52 +04005 "encoding/base64"
Giorgi Lekveishvilib59b7c22024-04-03 22:17:50 +04006 "encoding/json"
7 "flag"
8 "fmt"
9 "io"
10 "log"
gioefa0ed42024-06-13 12:31:43 +040011 "math/big"
Giorgi Lekveishvilib59b7c22024-04-03 22:17:50 +040012 "net/http"
13 "os"
14 "strconv"
15 "strings"
gioefa0ed42024-06-13 12:31:43 +040016 "sync"
17 "time"
Giorgi Lekveishvilib59b7c22024-04-03 22:17:50 +040018
Giorgi Lekveishvilib59b7c22024-04-03 22:17:50 +040019 "github.com/giolekva/pcloud/core/installer/soft"
20
21 "golang.org/x/crypto/ssh"
22)
23
Davit Tabidze6bf29832024-06-17 16:51:54 +040024const (
25 secretLength = 20
26)
27
Giorgi Lekveishvilib59b7c22024-04-03 22:17:50 +040028var port = flag.Int("port", 8080, "Port to listen on")
29var repoAddr = flag.String("repo-addr", "", "Git repository address where Helm releases are stored")
30var sshKey = flag.String("ssh-key", "", "Path to SHH key used to connect with Git repository")
31var ingressNginxPath = flag.String("ingress-nginx-path", "", "Path to the ingress-nginx Helm release")
giod28f83c2024-08-15 10:53:40 +040032var minPreOpenPorts = flag.Int("min-pre-open-ports", 5, "Minimum number of pre-open ports to keep in reserve")
33var preOpenPortsBatchSize = flag.Int("pre-open-ports-batch-size", 10, "Number of new ports to open at a time")
Giorgi Lekveishvilib59b7c22024-04-03 22:17:50 +040034
35type client interface {
giod28f83c2024-08-15 10:53:40 +040036 ReservePort() (int, string, error)
37 ReleaseReservedPort(port int)
38 AddPortForwarding(protocol string, port int, secret, dest string) error
39 RemovePortForwarding(protocol string, port int) error
Giorgi Lekveishvilib59b7c22024-04-03 22:17:50 +040040}
41
42type repoClient struct {
giod28f83c2024-08-15 10:53:40 +040043 l sync.Locker
44 repo soft.RepoIO
45 path string
46 minPreOpenPorts int
47 preOpenPortsBatchSize int
48 preOpenPorts []int
49 blocklist map[int]struct{}
50 reserve map[int]string
Giorgi Lekveishvilib59b7c22024-04-03 22:17:50 +040051}
52
giod28f83c2024-08-15 10:53:40 +040053func newRepoClient(
54 repo soft.RepoIO,
55 path string,
56 minPreOpenPorts int,
57 preOpenPortsBatchSize int,
58) (client, error) {
59 ret := &repoClient{
60 l: &sync.Mutex{},
61 repo: repo,
62 path: path,
63 minPreOpenPorts: minPreOpenPorts,
64 preOpenPortsBatchSize: preOpenPortsBatchSize,
65 }
66 r, err := repo.Reader(fmt.Sprintf("%s-state.json", path))
67 if err != nil {
68 // TODO(gio): create empty file on init
Giorgi Lekveishvilib59b7c22024-04-03 22:17:50 +040069 return nil, err
70 }
giod28f83c2024-08-15 10:53:40 +040071 defer r.Close()
72 var st state
73 if err := json.NewDecoder(r).Decode(&st); err != nil {
Giorgi Lekveishvilib59b7c22024-04-03 22:17:50 +040074 return nil, err
75 }
giod28f83c2024-08-15 10:53:40 +040076 ret.preOpenPorts = st.PreOpenPorts
77 ret.blocklist = st.Blocklist
78 ret.reserve = map[int]string{}
79 if len(ret.preOpenPorts) < minPreOpenPorts {
80 if err := ret.preOpenNewPorts(); err != nil {
81 return nil, err
82 }
83 }
84 return ret, nil
Giorgi Lekveishvilib59b7c22024-04-03 22:17:50 +040085}
86
giod28f83c2024-08-15 10:53:40 +040087func (c *repoClient) ReservePort() (int, string, error) {
88 c.l.Lock()
89 defer c.l.Unlock()
90 if len(c.preOpenPorts) == 0 {
91 return -1, "", fmt.Errorf("no pre-open ports are available")
92 }
93 port := c.preOpenPorts[0]
94 c.preOpenPorts = c.preOpenPorts[1:]
95 secret, err := generateSecret()
96 if err != nil {
97 return -1, "", err
98 }
99 c.reserve[port] = secret
100 return port, secret, nil
101}
102
103func (c *repoClient) ReleaseReservedPort(port int) {
104 c.l.Lock()
105 defer c.l.Unlock()
106 delete(c.reserve, port)
107 c.preOpenPorts = append(c.preOpenPorts, port)
108}
109
110type state struct {
111 PreOpenPorts []int `json:"preOpenPorts"`
112 Blocklist map[int]struct{} `json:"blocklist"`
113}
114
115func (c *repoClient) preOpenNewPorts() error {
116 c.l.Lock()
117 defer c.l.Unlock()
118 if len(c.preOpenPorts) >= c.minPreOpenPorts {
119 return nil
120 }
121 var ports []int
122 for count := c.preOpenPortsBatchSize; count > 0; count-- {
123 generated := false
124 for i := 0; i < 3; i++ {
125 r, err := rand.Int(rand.Reader, big.NewInt(end-start))
126 if err != nil {
127 return err
128 }
129 p := start + int(r.Int64())
130 if _, ok := c.blocklist[p]; !ok {
131 generated = true
132 ports = append(ports, p)
133 c.preOpenPorts = append(c.preOpenPorts, p)
134 c.blocklist[p] = struct{}{}
135 break
136 }
137 }
138 if !generated {
139 return fmt.Errorf("could not open new port")
140 }
141 }
142 return c.repo.Do(func(fs soft.RepoFS) (string, error) {
143 if err := c.writeState(fs); err != nil {
144 return "", err
145 }
146 rel, err := c.readRelease(fs)
147 if err != nil {
148 return "", err
149 }
150 tcp, err := extractPorts(rel, "spec.values.controller.service.nodePorts.tcp")
151 if err != nil {
152 return "", err
153 }
154 udp, err := extractPorts(rel, "spec.values.controller.service.nodePorts.udp")
155 if err != nil {
156 return "", err
157 }
158 for _, p := range ports {
159 ps := strconv.Itoa(p)
160 tcp[ps] = p
161 udp[ps] = p
162 }
163 if err := c.writeRelease(fs, rel); err != nil {
164 return "", err
165 }
166 fmt.Printf("Pre opened new ports: %s\n", ports)
167 return "preopen new ports", nil
168 })
169}
170
171func (c *repoClient) AddPortForwarding(protocol string, port int, secret, dest string) error {
172 defer func() {
173 go func() {
174 if err := c.preOpenNewPorts(); err != nil {
175 panic(err)
176 }
177 }()
178 }()
179 c.l.Lock()
180 defer c.l.Unlock()
181 if sec, ok := c.reserve[port]; !ok || sec != secret {
182 return fmt.Errorf("wrong secret")
183 }
184 delete(c.reserve, port)
185 return c.repo.Do(func(fs soft.RepoFS) (string, error) {
186 if err := c.writeState(fs); err != nil {
187 return "", err
188 }
189 rel, err := c.readRelease(fs)
190 if err != nil {
191 return "", err
192 }
193 portStr := strconv.Itoa(port)
194 switch protocol {
195 case "tcp":
196 tcp, err := extractPorts(rel, "spec.values.tcp")
197 if err != nil {
198 return "", err
199 }
200 tcp[portStr] = dest
201 case "udp":
202 udp, err := extractPorts(rel, "spec.values.udp")
203 if err != nil {
204 return "", err
205 }
206 udp[portStr] = dest
207 default:
208 panic("MUST NOT REACH")
209 }
210 if err := c.writeRelease(fs, rel); err != nil {
211 return "", err
212 }
213 return fmt.Sprintf("ingress: port %s map %d %s", protocol, port, dest), nil
214 })
215}
216
217func (c *repoClient) RemovePortForwarding(protocol string, port int) error {
218 c.l.Lock()
219 defer c.l.Unlock()
220 return c.repo.Do(func(fs soft.RepoFS) (string, error) {
221 rel, err := c.readRelease(fs)
222 if err != nil {
223 return "", err
224 }
225 switch protocol {
226 case "tcp":
227 tcp, err := extractPorts(rel, "spec.values.tcp")
228 if err != nil {
229 return "", err
230 }
231 if err := removePort(tcp, port); err != nil {
232 return "", err
233 }
234 case "udp":
235 udp, err := extractPorts(rel, "spec.values.udp")
236 if err != nil {
237 return "", err
238 }
239 if err := removePort(udp, port); err != nil {
240 return "", err
241 }
242 default:
243 panic("MUST NOT REACH")
244 }
245 svcTCP, err := extractPorts(rel, "spec.values.controller.service.nodePorts.tcp")
246 if err != nil {
247 return "", err
248 }
249 svcUDP, err := extractPorts(rel, "spec.values.controller.service.nodePorts.udp")
250 if err != nil {
251 return "", err
252 }
253 if err := removePort(svcTCP, port); err != nil {
254 return "", err
255 }
256 if err := removePort(svcUDP, port); err != nil {
257 return "", err
258 }
259 if err := c.writeRelease(fs, rel); err != nil {
260 return "", err
261 }
262 return fmt.Sprintf("ingress: remove %s port map %d", protocol, port), nil
263 })
264}
265
266func (c *repoClient) writeState(fs soft.RepoFS) error {
267 w, err := fs.Writer(fmt.Sprintf("%s-state.json", c.path))
268 if err != nil {
Giorgi Lekveishvilib59b7c22024-04-03 22:17:50 +0400269 return err
270 }
giod28f83c2024-08-15 10:53:40 +0400271 defer w.Close()
272 if err := json.NewEncoder(w).Encode(state{c.preOpenPorts, c.blocklist}); err != nil {
273 return err
274 }
275 return err
276}
277
278func (c *repoClient) readRelease(fs soft.RepoFS) (map[string]any, error) {
279 ret := map[string]any{}
280 if err := soft.ReadYaml(fs, c.path, &ret); err != nil {
281 return nil, err
282 }
283 return ret, nil
284}
285
286func (c *repoClient) writeRelease(fs soft.RepoFS, rel map[string]any) error {
287 return soft.WriteYaml(fs, c.path, rel)
Giorgi Lekveishvilib59b7c22024-04-03 22:17:50 +0400288}
289
290type server struct {
giod28f83c2024-08-15 10:53:40 +0400291 s *http.Server
292 r *http.ServeMux
293 client client
Giorgi Lekveishvilib59b7c22024-04-03 22:17:50 +0400294}
295
296func newServer(port int, client client) *server {
297 r := http.NewServeMux()
298 s := &http.Server{
299 Addr: fmt.Sprintf(":%d", port),
300 Handler: r,
301 }
giod28f83c2024-08-15 10:53:40 +0400302 return &server{s, r, client}
Giorgi Lekveishvilib59b7c22024-04-03 22:17:50 +0400303}
304
305func (s *server) Start() error {
gioefa0ed42024-06-13 12:31:43 +0400306 s.r.HandleFunc("/api/reserve", s.handleReserve)
giocdfa3722024-06-13 20:10:14 +0400307 s.r.HandleFunc("/api/allocate", s.handleAllocate)
308 s.r.HandleFunc("/api/remove", s.handleRemove)
Giorgi Lekveishvilib59b7c22024-04-03 22:17:50 +0400309 if err := s.s.ListenAndServe(); err != nil && err != http.ErrServerClosed {
310 return err
311 }
312 return nil
313}
314
315func (s *server) Close() error {
316 return s.s.Close()
317}
318
319type allocateReq struct {
320 Protocol string `json:"protocol"`
321 SourcePort int `json:"sourcePort"`
322 TargetService string `json:"targetService"`
323 TargetPort int `json:"targetPort"`
giobd7ab0b2024-06-17 12:55:17 +0400324 Secret string `json:"secret"`
Giorgi Lekveishvilib59b7c22024-04-03 22:17:50 +0400325}
326
giocdfa3722024-06-13 20:10:14 +0400327type removeReq struct {
328 Protocol string `json:"protocol"`
329 SourcePort int `json:"sourcePort"`
330 TargetService string `json:"targetService"`
331 TargetPort int `json:"targetPort"`
332}
333
Giorgi Lekveishvilib59b7c22024-04-03 22:17:50 +0400334func extractAllocateReq(r io.Reader) (allocateReq, error) {
335 var req allocateReq
336 if err := json.NewDecoder(r).Decode(&req); err != nil {
337 return allocateReq{}, err
338 }
339 req.Protocol = strings.ToLower(req.Protocol)
340 if req.Protocol != "tcp" && req.Protocol != "udp" {
341 return allocateReq{}, fmt.Errorf("Unexpected protocol %s", req.Protocol)
342 }
343 return req, nil
344}
345
giocdfa3722024-06-13 20:10:14 +0400346func extractRemoveReq(r io.Reader) (removeReq, error) {
347 var req removeReq
348 if err := json.NewDecoder(r).Decode(&req); err != nil {
349 return removeReq{}, err
350 }
351 req.Protocol = strings.ToLower(req.Protocol)
352 if req.Protocol != "tcp" && req.Protocol != "udp" {
353 return removeReq{}, fmt.Errorf("Unexpected protocol %s", req.Protocol)
354 }
355 return req, nil
356}
357
gioefa0ed42024-06-13 12:31:43 +0400358type reserveResp struct {
359 Port int `json:"port"`
360 Secret string `json:"secret"`
361}
362
giod28f83c2024-08-15 10:53:40 +0400363func extractPorts(data map[string]any, path string) (map[string]any, error) {
364 for _, i := range strings.Split(path, ".") {
365 val, ok := data[i]
366 if !ok {
367 return nil, fmt.Errorf("%s not found", i)
368 }
369 valM, ok := val.(map[string]any)
370 if !ok {
371 return nil, fmt.Errorf("%s is not a map", i)
372 }
373 data = valM
Giorgi Lekveishvilib59b7c22024-04-03 22:17:50 +0400374 }
giod28f83c2024-08-15 10:53:40 +0400375 return data, nil
Giorgi Lekveishvilib59b7c22024-04-03 22:17:50 +0400376}
377
giod28f83c2024-08-15 10:53:40 +0400378func addPort(pm map[string]any, sourcePort int, targetService string, targetPort int) error {
379 sourcePortStr := strconv.Itoa(sourcePort)
380 if _, ok := pm[sourcePortStr]; ok || sourcePort == 80 || sourcePort == 443 || sourcePort == 22 {
381 return fmt.Errorf("port %d is already taken", sourcePort)
Giorgi Lekveishvilib59b7c22024-04-03 22:17:50 +0400382 }
giod28f83c2024-08-15 10:53:40 +0400383 pm[sourcePortStr] = fmt.Sprintf("%s:%d", targetService, targetPort)
Giorgi Lekveishvilib59b7c22024-04-03 22:17:50 +0400384 return nil
385}
386
giod28f83c2024-08-15 10:53:40 +0400387func removePort(pm map[string]any, port int) error {
388 sourcePortStr := strconv.Itoa(port)
giocdfa3722024-06-13 20:10:14 +0400389 if _, ok := pm[sourcePortStr]; !ok {
giod28f83c2024-08-15 10:53:40 +0400390 return fmt.Errorf("port %d is not open to remove", port)
giocdfa3722024-06-13 20:10:14 +0400391 }
392 delete(pm, sourcePortStr)
393 return nil
394}
395
gioefa0ed42024-06-13 12:31:43 +0400396const start = 49152
397const end = 65535
398
giod28f83c2024-08-15 10:53:40 +0400399func updateNodePorts(rel map[string]any, protocol string, pm map[string]any) error {
400 spec, ok := rel["spec"]
401 if !ok {
402 return fmt.Errorf("spec not found")
403 }
404 specM, ok := spec.(map[string]any)
405 if !ok {
406 return fmt.Errorf("spec is not a map")
407 }
408 values, ok := specM["values"]
409 if !ok {
410 return fmt.Errorf("spec.values not found")
411 }
412 valuesM, ok := values.(map[string]any)
413 if !ok {
414 return fmt.Errorf("spec.values is not a map")
415 }
416 controller, ok := valuesM["controller"]
417 if !ok {
418 return fmt.Errorf("spec.values.controller not found")
419 }
420 controllerM, ok := controller.(map[string]any)
421 if !ok {
422 return fmt.Errorf("spec.values.controller is not a map")
423 }
424 service, ok := controllerM["service"]
425 if !ok {
426 return fmt.Errorf("spec.values.controller.service not found")
427 }
428 serviceM, ok := service.(map[string]any)
429 if !ok {
430 return fmt.Errorf("spec.values.controller.service is not a map")
431 }
432 nodePorts, ok := serviceM["nodePorts"]
433 if !ok {
434 return fmt.Errorf("spec.values.controller.service.nodePorts not found")
435 }
436 nodePortsM, ok := nodePorts.(map[string]any)
437 if !ok {
438 return fmt.Errorf("spec.values.controller.service.nodePorts is not a map")
439 }
440 npm := map[string]any{}
441 for p, _ := range pm {
442 if v, err := strconv.Atoi(p); err != nil {
443 return err
444 } else {
445 npm[p] = v
gioefa0ed42024-06-13 12:31:43 +0400446 }
447 }
giod28f83c2024-08-15 10:53:40 +0400448 nodePortsM[protocol] = npm
449 return nil
gioefa0ed42024-06-13 12:31:43 +0400450}
451
Giorgi Lekveishvilib59b7c22024-04-03 22:17:50 +0400452func (s *server) handleAllocate(w http.ResponseWriter, r *http.Request) {
453 if r.Method != http.MethodPost {
454 http.Error(w, "only post method is supported", http.StatusBadRequest)
455 return
456 }
457 req, err := extractAllocateReq(r.Body)
458 if err != nil {
giod28f83c2024-08-15 10:53:40 +0400459 fmt.Println(err.Error())
Giorgi Lekveishvilib59b7c22024-04-03 22:17:50 +0400460 http.Error(w, err.Error(), http.StatusBadRequest)
461 return
462 }
giod28f83c2024-08-15 10:53:40 +0400463 if err := s.client.AddPortForwarding(
464 req.Protocol,
465 req.SourcePort,
466 req.Secret,
467 fmt.Sprintf("%s:%d", req.TargetService, req.TargetPort),
468 ); err != nil {
469 fmt.Println(err.Error())
Giorgi Lekveishvilib59b7c22024-04-03 22:17:50 +0400470 http.Error(w, err.Error(), http.StatusInternalServerError)
Giorgi Lekveishvilib59b7c22024-04-03 22:17:50 +0400471 }
472}
473
gioefa0ed42024-06-13 12:31:43 +0400474func (s *server) handleReserve(w http.ResponseWriter, r *http.Request) {
475 if r.Method != http.MethodPost {
476 http.Error(w, "only post method is supported", http.StatusBadRequest)
477 return
478 }
gioefa0ed42024-06-13 12:31:43 +0400479 var port int
giod28f83c2024-08-15 10:53:40 +0400480 var secret string
481 var err error
482 if port, secret, err = s.client.ReservePort(); err != nil {
483 fmt.Println(err.Error())
gioefa0ed42024-06-13 12:31:43 +0400484 http.Error(w, err.Error(), http.StatusInternalServerError)
485 return
486 }
gioefa0ed42024-06-13 12:31:43 +0400487 go func() {
488 time.Sleep(30 * time.Minute)
giod28f83c2024-08-15 10:53:40 +0400489 s.client.ReleaseReservedPort(port)
gioefa0ed42024-06-13 12:31:43 +0400490 }()
giod28f83c2024-08-15 10:53:40 +0400491 if err := json.NewEncoder(w).Encode(reserveResp{port, secret}); err != nil {
492 fmt.Println(err.Error())
gioefa0ed42024-06-13 12:31:43 +0400493 http.Error(w, err.Error(), http.StatusInternalServerError)
494 return
495 }
496}
497
giocdfa3722024-06-13 20:10:14 +0400498func (s *server) handleRemove(w http.ResponseWriter, r *http.Request) {
499 if r.Method != http.MethodPost {
500 http.Error(w, "only post method is supported", http.StatusBadRequest)
501 return
502 }
503 req, err := extractRemoveReq(r.Body)
504 if err != nil {
giod28f83c2024-08-15 10:53:40 +0400505 fmt.Println(err.Error())
giocdfa3722024-06-13 20:10:14 +0400506 http.Error(w, err.Error(), http.StatusBadRequest)
507 return
508 }
giod28f83c2024-08-15 10:53:40 +0400509 if err := s.client.RemovePortForwarding(req.Protocol, req.SourcePort); err != nil {
510 fmt.Println(err.Error())
giocdfa3722024-06-13 20:10:14 +0400511 http.Error(w, err.Error(), http.StatusInternalServerError)
512 return
513 }
giocdfa3722024-06-13 20:10:14 +0400514}
515
Giorgi Lekveishvilib59b7c22024-04-03 22:17:50 +0400516// TODO(gio): deduplicate
gioe72b54f2024-04-22 10:44:41 +0400517func createRepoClient(addr string, keyPath string) (soft.RepoIO, error) {
Giorgi Lekveishvilib59b7c22024-04-03 22:17:50 +0400518 sshKey, err := os.ReadFile(keyPath)
519 if err != nil {
520 return nil, err
521 }
522 signer, err := ssh.ParsePrivateKey(sshKey)
523 if err != nil {
524 return nil, err
525 }
526 repoAddr, err := soft.ParseRepositoryAddress(addr)
527 if err != nil {
528 return nil, err
529 }
530 repo, err := soft.CloneRepository(repoAddr, signer)
531 if err != nil {
532 return nil, err
533 }
gioff2a29a2024-05-01 17:06:42 +0400534 return soft.NewRepoIO(repo, signer)
Giorgi Lekveishvilib59b7c22024-04-03 22:17:50 +0400535}
536
Davit Tabidze6bf29832024-06-17 16:51:54 +0400537func generateSecret() (string, error) {
538 b := make([]byte, secretLength)
539 _, err := rand.Read(b)
540 if err != nil {
541 return "", fmt.Errorf("error generating secret: %v", err)
542 }
giob1c4e542024-07-15 12:10:52 +0400543 return base64.StdEncoding.EncodeToString(b), nil
gioefa0ed42024-06-13 12:31:43 +0400544}
545
Giorgi Lekveishvilib59b7c22024-04-03 22:17:50 +0400546func main() {
547 flag.Parse()
548 repo, err := createRepoClient(*repoAddr, *sshKey)
549 if err != nil {
550 log.Fatal(err)
551 }
giod28f83c2024-08-15 10:53:40 +0400552 c, err := newRepoClient(
553 repo,
554 *ingressNginxPath,
555 *minPreOpenPorts,
556 *preOpenPortsBatchSize,
Giorgi Lekveishvilib59b7c22024-04-03 22:17:50 +0400557 )
giod28f83c2024-08-15 10:53:40 +0400558 if err != nil {
559 log.Fatal(err)
560 }
561 s := newServer(*port, c)
Giorgi Lekveishvilib59b7c22024-04-03 22:17:50 +0400562 log.Fatal(s.Start())
563}