blob: 9220fecdf909dc5e583b76218e89e58a3142debb [file] [log] [blame]
Giorgi Lekveishvili2df23db2023-12-14 07:55:22 +04001package main
2
3import (
4 "context"
5 "flag"
6 "fmt"
7 "log"
8 "net/http"
9 "time"
10
11 "github.com/gorilla/mux"
12 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
13 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
14 "k8s.io/apimachinery/pkg/runtime/schema"
15 "k8s.io/client-go/dynamic"
16 "k8s.io/client-go/rest"
17 "k8s.io/client-go/tools/clientcmd"
18)
19
// Command-line flags.
var (
	// port is the TCP port the HTTP server listens on.
	port = flag.Int("port", 8080, "Port to listen on")
	// kubeconfig is the path to a kubeconfig file; when empty the in-cluster
	// configuration is used instead (see NewKubeClient).
	kubeconfig = flag.String("kubeconfig", "", "Path to kubeconfig file")
)

const (
	// reconcileAnnotation is the annotation key written onto an object to
	// request a reconciliation.
	reconcileAnnotation = "reconcile.fluxcd.io/requestedAt"
	// reconcileAtLayout is the time layout used to format the annotation value
	// and to parse the timestamp read back from the object's status.
	reconcileAtLayout = time.RFC3339Nano
)
25
26type Server struct {
27 port int
28 client dynamic.Interface
29}
30
31func NewServer(port int, client dynamic.Interface) *Server {
32 return &Server{port, client}
33}
34
35func (s *Server) Start() {
36 r := mux.NewRouter()
37 r.Path("/source/git/{namespace}/{name}/reconcile").Methods("GET").HandlerFunc(s.sourceGitReconcile)
38 r.Path("/kustomization/{namespace}/{name}/reconcile").Methods("GET").HandlerFunc(s.kustomizationReconcile)
39 http.Handle("/", r)
40 log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", s.port), nil))
41}
42
43func getReconciledAt(obj *unstructured.Unstructured) (string, error) {
44 status, ok := obj.Object["status"]
45 if !ok {
46 return "", fmt.Errorf("status not found")
47 }
48 statusMap, ok := status.(map[string]interface{})
49 if !ok {
50 return "", fmt.Errorf("status not map")
51 }
52 val, ok := statusMap["lastHandledReconcileAt"]
53 if !ok {
54 return "", fmt.Errorf("lastHandledReconcileAt not found in status")
55 }
56 valStr, ok := val.(string)
57 if !ok {
58 return "", fmt.Errorf("lastHandledReconcileAt not string")
59 }
60 return valStr, nil
61}
62
63func reconcile(
64 client dynamic.Interface,
65 res schema.GroupVersionResource,
66 namespace string,
67 name string,
68) error {
69 unstr, err := client.Resource(res).Namespace(namespace).Get(context.TODO(), name, metav1.GetOptions{})
70 if err != nil {
71 return err
72 }
73 timeNowTime := time.Now()
74 annotations := unstr.GetAnnotations()
75 annotations[reconcileAnnotation] = timeNowTime.Format(reconcileAtLayout)
76 unstr.SetAnnotations(annotations)
77 unstr, err = client.Resource(res).Namespace(namespace).Update(context.TODO(), unstr, metav1.UpdateOptions{})
78 if err != nil {
79 return err
80 }
81 for {
82 unstr, err := client.Resource(res).Namespace(namespace).Get(context.TODO(), name, metav1.GetOptions{})
83 if err != nil {
84 return err
85 }
86 reconciledAt, err := getReconciledAt(unstr)
87 if err != nil {
88 return err
89 }
90 reconciledAtTime, err := time.Parse(reconcileAtLayout, reconciledAt)
91 if err != nil {
92 return err
93 }
94 reconciledAtTime = reconciledAtTime.Add(3 * time.Hour)
95 if reconciledAtTime.After(timeNowTime) {
96 return nil
97 }
98 }
99}
100
101func (s *Server) sourceGitReconcile(w http.ResponseWriter, r *http.Request) {
102 vars := mux.Vars(r)
103 namespace, ok := vars["namespace"]
104 if !ok {
105 http.Error(w, "namespace missing", http.StatusBadRequest)
106 return
107 }
108 name, ok := vars["name"]
109 if !ok {
110 http.Error(w, "name missing", http.StatusBadRequest)
111 return
112 }
113 res := schema.GroupVersionResource{
114 Group: "source.toolkit.fluxcd.io",
115 Version: "v1",
116 Resource: "gitrepositories",
117 }
118 if err := reconcile(s.client, res, namespace, name); err != nil {
119 http.Error(w, "error", http.StatusInternalServerError)
120 return
121 }
122}
123
124func (s *Server) kustomizationReconcile(w http.ResponseWriter, r *http.Request) {
125 vars := mux.Vars(r)
126 namespace, ok := vars["namespace"]
127 if !ok {
128 http.Error(w, "namespace missing", http.StatusBadRequest)
129 return
130 }
131 name, ok := vars["name"]
132 if !ok {
133 http.Error(w, "name missing", http.StatusBadRequest)
134 return
135 }
136 res := schema.GroupVersionResource{
137 Group: "kustomize.toolkit.fluxcd.io",
138 Version: "v1",
139 Resource: "kustomizations",
140 }
141 if err := reconcile(s.client, res, namespace, name); err != nil {
142 http.Error(w, "error", http.StatusInternalServerError)
143 return
144 }
145}
146
147func NewKubeClient(kubeconfig string) (dynamic.Interface, error) {
148 if kubeconfig == "" {
149 config, err := rest.InClusterConfig()
150 if err != nil {
151 return nil, err
152 }
153 return dynamic.NewForConfig(config)
154 } else {
155 config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
156 if err != nil {
157 return nil, err
158 }
159 return dynamic.NewForConfig(config)
160 }
161}
162
163func main() {
164 flag.Parse()
165 client, err := NewKubeClient(*kubeconfig)
166 if err != nil {
167 log.Fatal(err)
168 }
169 NewServer(*port, client).Start()
170}