| giolekva | 07f6be9 | 2020-04-16 21:09:30 +0400 | [diff] [blame] | 1 | package main |
| 2 | |
import (
	"context"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"net/http"

	apiv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"

	"github.com/giolekva/pcloud/controller/schema"

	"github.com/golang/glog"
	"github.com/itaysk/regogo"
)
| 25 | |
// Command-line flags; values are only meaningful after flag.Parse has run.
var (
	// kubeconfig is the path to a kubeconfig file. When it is empty,
	// getKubeConfig falls back to the in-cluster configuration.
	kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file.")

	// port is the TCP port the HTTP server listens on.
	port = flag.Int("port", 123, "Port to listen on.")

	// dgraphGqlAddress and dgraphSchemaAddress are handed to
	// schema.NewDgraphClient when the GraphQL client is built.
	dgraphGqlAddress    = flag.String("graphql_address", "", "GraphQL server address.")
	dgraphSchemaAddress = flag.String("dgraph_admin_address", "", "Dgraph server admin address.")
)
| giolekva | 07f6be9 | 2020-04-16 21:09:30 +0400 | [diff] [blame] | 31 | |
// Format templates for GraphQL payloads. Each %s is a placeholder meant to be
// filled in with fmt.Sprintf.
const (
	// imgJson is an input object carrying an objectPath. The backslashes are
	// literal because this is a raw string.
	imgJson = `{ objectPath: \"%s\"}`

	// insertQuery is an add mutation; placeholders are, in order, the type
	// name suffix of the add operation, the input objects, and the name of
	// the result field whose id is selected.
	insertQuery = `mutation { add%s(input: [%s]) { %s { id } } }`

	// getQuery is a JSON request body wrapping a get-by-id query; placeholders
	// are the type name suffix of the get operation and the id.
	getQuery = `{ "query": "{ get%s(id: \"%s\") { id objectPath } } " }`
)
| 35 | |
// MinioWebhook bundles the dependencies shared by the HTTP handlers in this
// file. main constructs it with a positional literal, so the field order
// matters.
type MinioWebhook struct {
	// gql executes GraphQL queries and mutations via RunQuery.
	gql schema.GraphQLClient
	// pods is the Kubernetes pod client used to create face detector pods.
	pods corev1.PodInterface
}
| 40 | |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame] | 41 | func (m *MinioWebhook) minioHandler(w http.ResponseWriter, r *http.Request) { |
| giolekva | 07f6be9 | 2020-04-16 21:09:30 +0400 | [diff] [blame] | 42 | body, err := ioutil.ReadAll(r.Body) |
| giolekva | cba39b8 | 2020-04-18 20:56:05 +0400 | [diff] [blame] | 43 | if err != nil { |
| 44 | glog.Error(err) |
| 45 | http.Error(w, "Could not read HTTP request body", http.StatusInternalServerError) |
| 46 | return |
| 47 | } |
| giolekva | 07f6be9 | 2020-04-16 21:09:30 +0400 | [diff] [blame] | 48 | if len(body) == 0 { |
| 49 | return |
| 50 | } |
| giolekva | c76b21b | 2020-04-18 19:28:43 +0400 | [diff] [blame] | 51 | glog.Infof("Received event from Minio: %s", string(body)) |
| giolekva | c76b21b | 2020-04-18 19:28:43 +0400 | [diff] [blame] | 52 | key, err := regogo.Get(string(body), "input.Key") |
| giolekva | 07f6be9 | 2020-04-16 21:09:30 +0400 | [diff] [blame] | 53 | if err != nil { |
| giolekva | cba39b8 | 2020-04-18 20:56:05 +0400 | [diff] [blame] | 54 | glog.Error(err) |
| giolekva | c76b21b | 2020-04-18 19:28:43 +0400 | [diff] [blame] | 55 | http.Error(w, "Could not find object key", http.StatusBadRequest) |
| giolekva | 07f6be9 | 2020-04-16 21:09:30 +0400 | [diff] [blame] | 56 | return |
| 57 | } |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame] | 58 | resp, err := m.gql.RunQuery(fmt.Sprintf( |
| 59 | "mutation { addImage(input: [{objectPath: \"%s\"}]) { image { id }} }", |
| 60 | key.String())) |
| giolekva | 07f6be9 | 2020-04-16 21:09:30 +0400 | [diff] [blame] | 61 | if err != nil { |
| giolekva | cba39b8 | 2020-04-18 20:56:05 +0400 | [diff] [blame] | 62 | glog.Error(err) |
| giolekva | c76b21b | 2020-04-18 19:28:43 +0400 | [diff] [blame] | 63 | http.Error(w, "Can not add given objects", http.StatusInternalServerError) |
| giolekva | 07f6be9 | 2020-04-16 21:09:30 +0400 | [diff] [blame] | 64 | return |
| 65 | } |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame] | 66 | id, err := regogo.Get(resp, "input.addImage.image[0].id") |
| giolekva | 07f6be9 | 2020-04-16 21:09:30 +0400 | [diff] [blame] | 67 | if err != nil { |
| giolekva | cba39b8 | 2020-04-18 20:56:05 +0400 | [diff] [blame] | 68 | glog.Error(err) |
| giolekva | c76b21b | 2020-04-18 19:28:43 +0400 | [diff] [blame] | 69 | http.Error(w, "Could not extract node id", http.StatusInternalServerError) |
| giolekva | 07f6be9 | 2020-04-16 21:09:30 +0400 | [diff] [blame] | 70 | return |
| 71 | } |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame] | 72 | glog.Infof("New image id: %s", id.String()) |
| 73 | pod := &apiv1.Pod{ |
| 74 | ObjectMeta: metav1.ObjectMeta{ |
| 75 | Name: fmt.Sprintf("detect-faces-%s", id.String())}, |
| 76 | Spec: apiv1.PodSpec{ |
| 77 | RestartPolicy: apiv1.RestartPolicyNever, |
| 78 | Containers: []apiv1.Container{{ |
| 79 | Name: "detect-faces", |
| 80 | Image: "face-detector:latest", |
| 81 | ImagePullPolicy: apiv1.PullNever, |
| 82 | Command: []string{"python", "main.py"}, |
| 83 | Args: []string{"http://pcloud-controller-service.pcloud.svc:1111/graphql", "http://minio-hl-svc.minio.svc:9000", id.String()}}}}} |
| 84 | glog.Info("Creating pod...") |
| 85 | result, err := m.pods.Create(context.TODO(), pod, metav1.CreateOptions{}) |
| 86 | if err != nil { |
| 87 | glog.Error(err) |
| 88 | http.Error(w, "Could not start face detector", http.StatusInternalServerError) |
| 89 | return |
| 90 | } |
| 91 | glog.Infof("Created deployment %q.\n", result.GetObjectMeta().GetName()) |
| 92 | } |
| 93 | |
// query holds the GraphQL request data extracted from an HTTP request.
type query struct {
	// query is the GraphQL document text passed on to the GraphQL client.
	query string
	// operation and variables are not populated anywhere in this file.
	// NOTE(review): presumably reserved for the operation name and variables
	// of a GraphQL request — confirm before relying on them.
	operation string
	variables string
}
| 99 | |
| 100 | func extractQuery(r *http.Request) (*query, error) { |
| 101 | if r.Method == "GET" { |
| 102 | if err := r.ParseForm(); err != nil { |
| 103 | return nil, err |
| 104 | } |
| 105 | q, ok := r.Form["query"] |
| 106 | if !ok || len(q) != 1 { |
| 107 | return nil, errors.New("Exactly one query must be provided") |
| 108 | } |
| 109 | return &query{query: q[0]}, nil |
| 110 | } else { |
| 111 | body, err := ioutil.ReadAll(r.Body) |
| 112 | if err != nil { |
| 113 | return nil, err |
| 114 | } |
| 115 | q, err := regogo.Get(string(body), "input.query") |
| 116 | if err != nil { |
| 117 | return nil, err |
| 118 | } |
| 119 | return &query{query: q.String()}, nil |
| 120 | } |
| 121 | } |
| 122 | |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame] | 123 | func (m *MinioWebhook) graphqlHandler(w http.ResponseWriter, r *http.Request) { |
| 124 | glog.Infof("New GraphQL query received: %s", r.Method) |
| giolekva | 88d6e35 | 2020-04-30 13:32:38 +0400 | [diff] [blame^] | 125 | q, err := extractQuery(r) |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame] | 126 | if err != nil { |
| giolekva | 88d6e35 | 2020-04-30 13:32:38 +0400 | [diff] [blame^] | 127 | glog.Error(err.Error()) |
| 128 | http.Error(w, "Could not extract query", http.StatusBadRequest) |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame] | 129 | } |
| giolekva | 88d6e35 | 2020-04-30 13:32:38 +0400 | [diff] [blame^] | 130 | resp, err := m.gql.RunQuery(q.query) |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame] | 131 | if err != nil { |
| 132 | glog.Error(err) |
| 133 | http.Error(w, err.Error(), http.StatusInternalServerError) |
| 134 | return |
| 135 | } |
| 136 | io.WriteString(w, resp) |
| 137 | w.Header().Set("Content-Type", "application/json") |
| 138 | } |
| 139 | |
| 140 | func getKubeConfig() (*rest.Config, error) { |
| 141 | if *kubeconfig != "" { |
| 142 | return clientcmd.BuildConfigFromFlags("", *kubeconfig) |
| 143 | } else { |
| 144 | return rest.InClusterConfig() |
| 145 | } |
| giolekva | 07f6be9 | 2020-04-16 21:09:30 +0400 | [diff] [blame] | 146 | } |
| 147 | |
| 148 | func main() { |
| 149 | flag.Parse() |
| giolekva | c76b21b | 2020-04-18 19:28:43 +0400 | [diff] [blame] | 150 | |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame] | 151 | config, err := getKubeConfig() |
| 152 | if err != nil { |
| 153 | panic(err.Error()) |
| 154 | } |
| 155 | clientset, err := kubernetes.NewForConfig(config) |
| giolekva | c76b21b | 2020-04-18 19:28:43 +0400 | [diff] [blame] | 156 | if err != nil { |
| 157 | panic(err) |
| 158 | } |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame] | 159 | pods := clientset.CoreV1().Pods("pcloud") |
| 160 | |
| 161 | gqlClient, err := schema.NewDgraphClient( |
| 162 | *dgraphGqlAddress, *dgraphSchemaAddress) |
| 163 | if err != nil { |
| 164 | panic(err) |
| 165 | } |
| 166 | // err = gqlClient.SetSchema(` |
| 167 | // type Image { |
| 168 | // id: ID! |
| 169 | // objectPath: String! @search(by: [exact]) |
| 170 | // } |
| 171 | |
| 172 | // type ImageSegment { |
| 173 | // id: ID! |
| 174 | // upperLeftX: Int! |
| 175 | // upperLeftY: Int! |
| 176 | // lowerRightX: Int! |
| 177 | // lowerRightY: Int! |
| 178 | // sourceImage: Image! |
| 179 | // objectPath: String |
| 180 | // } |
| 181 | |
| 182 | // extend type Image { |
| 183 | // segments: [ImageSegment] @hasInverse(field: sourceImage) |
| 184 | // }`) |
| 185 | // if err != nil { |
| 186 | // panic(err) |
| 187 | // } |
| 188 | mw := MinioWebhook{gqlClient, pods} |
| 189 | http.HandleFunc("/minio_webhook", mw.minioHandler) |
| 190 | http.HandleFunc("/graphql", mw.graphqlHandler) |
| giolekva | 07f6be9 | 2020-04-16 21:09:30 +0400 | [diff] [blame] | 191 | log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", *port), nil)) |
| 192 | } |