fluxcd-reconciler, env creator calling it every 30 sec to sync the cluster
diff --git a/core/fluxcd-reconciler/main.go b/core/fluxcd-reconciler/main.go
new file mode 100644
index 0000000..9220fec
--- /dev/null
+++ b/core/fluxcd-reconciler/main.go
@@ -0,0 +1,170 @@
+package main
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "log"
+ "net/http"
+ "time"
+
+ "github.com/gorilla/mux"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+ "k8s.io/apimachinery/pkg/runtime/schema"
+ "k8s.io/client-go/dynamic"
+ "k8s.io/client-go/rest"
+ "k8s.io/client-go/tools/clientcmd"
+)
+
+var port = flag.Int("port", 8080, "Port to listen on")
+var kubeconfig = flag.String("kubeconfig", "", "Path to kubeconfig file")
+
+const reconcileAnnotation = "reconcile.fluxcd.io/requestedAt"
+const reconcileAtLayout = time.RFC3339Nano
+
+type Server struct {
+ port int
+ client dynamic.Interface
+}
+
+func NewServer(port int, client dynamic.Interface) *Server {
+ return &Server{port, client}
+}
+
+func (s *Server) Start() {
+ r := mux.NewRouter()
+ r.Path("/source/git/{namespace}/{name}/reconcile").Methods("GET").HandlerFunc(s.sourceGitReconcile)
+ r.Path("/kustomization/{namespace}/{name}/reconcile").Methods("GET").HandlerFunc(s.kustomizationReconcile)
+ http.Handle("/", r)
+ log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", s.port), nil))
+}
+
+func getReconciledAt(obj *unstructured.Unstructured) (string, error) {
+ status, ok := obj.Object["status"]
+ if !ok {
+ return "", fmt.Errorf("status not found")
+ }
+ statusMap, ok := status.(map[string]interface{})
+ if !ok {
+ return "", fmt.Errorf("status not map")
+ }
+ val, ok := statusMap["lastHandledReconcileAt"]
+ if !ok {
+ return "", fmt.Errorf("lastHandledReconcileAt not found in status")
+ }
+ valStr, ok := val.(string)
+ if !ok {
+ return "", fmt.Errorf("lastHandledReconcileAt not string")
+ }
+ return valStr, nil
+}
+
+func reconcile(
+ client dynamic.Interface,
+ res schema.GroupVersionResource,
+ namespace string,
+ name string,
+) error {
+ unstr, err := client.Resource(res).Namespace(namespace).Get(context.TODO(), name, metav1.GetOptions{})
+ if err != nil {
+ return err
+ }
+ timeNowTime := time.Now()
+ annotations := unstr.GetAnnotations()
+ annotations[reconcileAnnotation] = timeNowTime.Format(reconcileAtLayout)
+ unstr.SetAnnotations(annotations)
+ unstr, err = client.Resource(res).Namespace(namespace).Update(context.TODO(), unstr, metav1.UpdateOptions{})
+ if err != nil {
+ return err
+ }
+ for {
+ unstr, err := client.Resource(res).Namespace(namespace).Get(context.TODO(), name, metav1.GetOptions{})
+ if err != nil {
+ return err
+ }
+ reconciledAt, err := getReconciledAt(unstr)
+ if err != nil {
+ return err
+ }
+ reconciledAtTime, err := time.Parse(reconcileAtLayout, reconciledAt)
+ if err != nil {
+ return err
+ }
+ reconciledAtTime = reconciledAtTime.Add(3 * time.Hour)
+ if reconciledAtTime.After(timeNowTime) {
+ return nil
+ }
+ }
+}
+
+func (s *Server) sourceGitReconcile(w http.ResponseWriter, r *http.Request) {
+ vars := mux.Vars(r)
+ namespace, ok := vars["namespace"]
+ if !ok {
+ http.Error(w, "namespace missing", http.StatusBadRequest)
+ return
+ }
+ name, ok := vars["name"]
+ if !ok {
+ http.Error(w, "name missing", http.StatusBadRequest)
+ return
+ }
+ res := schema.GroupVersionResource{
+ Group: "source.toolkit.fluxcd.io",
+ Version: "v1",
+ Resource: "gitrepositories",
+ }
+ if err := reconcile(s.client, res, namespace, name); err != nil {
+ http.Error(w, "error", http.StatusInternalServerError)
+ return
+ }
+}
+
+func (s *Server) kustomizationReconcile(w http.ResponseWriter, r *http.Request) {
+ vars := mux.Vars(r)
+ namespace, ok := vars["namespace"]
+ if !ok {
+ http.Error(w, "namespace missing", http.StatusBadRequest)
+ return
+ }
+ name, ok := vars["name"]
+ if !ok {
+ http.Error(w, "name missing", http.StatusBadRequest)
+ return
+ }
+ res := schema.GroupVersionResource{
+ Group: "kustomize.toolkit.fluxcd.io",
+ Version: "v1",
+ Resource: "kustomizations",
+ }
+ if err := reconcile(s.client, res, namespace, name); err != nil {
+ http.Error(w, "error", http.StatusInternalServerError)
+ return
+ }
+}
+
+func NewKubeClient(kubeconfig string) (dynamic.Interface, error) {
+ if kubeconfig == "" {
+ config, err := rest.InClusterConfig()
+ if err != nil {
+ return nil, err
+ }
+ return dynamic.NewForConfig(config)
+ } else {
+ config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
+ if err != nil {
+ return nil, err
+ }
+ return dynamic.NewForConfig(config)
+ }
+}
+
+func main() {
+ flag.Parse()
+ client, err := NewKubeClient(*kubeconfig)
+ if err != nil {
+ log.Fatal(err)
+ }
+ NewServer(*port, client).Start()
+}