| giolekva | 07f6be9 | 2020-04-16 21:09:30 +0400 | [diff] [blame] | 1 | package main |
| 2 | |
| 3 | import ( |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame^] | 4 | "context" |
| giolekva | 07f6be9 | 2020-04-16 21:09:30 +0400 | [diff] [blame] | 5 | "flag" |
| 6 | "fmt" |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame^] | 7 | "io" |
| giolekva | 07f6be9 | 2020-04-16 21:09:30 +0400 | [diff] [blame] | 8 | "io/ioutil" |
| 9 | "log" |
| 10 | "net/http" |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame^] | 11 | |
| 12 | apiv1 "k8s.io/api/core/v1" |
| 13 | metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| 14 | "k8s.io/client-go/kubernetes" |
| 15 | corev1 "k8s.io/client-go/kubernetes/typed/core/v1" |
| 16 | "k8s.io/client-go/rest" |
| 17 | "k8s.io/client-go/tools/clientcmd" |
| giolekva | c76b21b | 2020-04-18 19:28:43 +0400 | [diff] [blame] | 18 | |
| 19 | "github.com/giolekva/pcloud/controller/schema" |
| 20 | |
| 21 | "github.com/golang/glog" |
| 22 | "github.com/itaysk/regogo" |
| giolekva | 07f6be9 | 2020-04-16 21:09:30 +0400 | [diff] [blame] | 23 | ) |
| 24 | |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame^] | 25 | var kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file.") |
| 26 | |
| giolekva | 07f6be9 | 2020-04-16 21:09:30 +0400 | [diff] [blame] | 27 | var port = flag.Int("port", 123, "Port to listen on.") |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame^] | 28 | var dgraphGqlAddress = flag.String("graphql_address", "", "GraphQL server address.") |
| 29 | var dgraphSchemaAddress = flag.String("dgraph_admin_address", "", "Dgraph server admin address.") |
| giolekva | 07f6be9 | 2020-04-16 21:09:30 +0400 | [diff] [blame] | 30 | |
| giolekva | c76b21b | 2020-04-18 19:28:43 +0400 | [diff] [blame] | 31 | const imgJson = `{ objectPath: \"%s\"}` |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame^] | 32 | const insertQuery = `mutation { add%s(input: [%s]) { %s { id } } }` |
| giolekva | c76b21b | 2020-04-18 19:28:43 +0400 | [diff] [blame] | 33 | const getQuery = `{ "query": "{ get%s(id: \"%s\") { id objectPath } } " }` |
| 34 | |
| giolekva | c76b21b | 2020-04-18 19:28:43 +0400 | [diff] [blame] | 35 | type MinioWebhook struct { |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame^] | 36 | gql schema.GraphQLClient |
| 37 | pods corev1.PodInterface |
| giolekva | c76b21b | 2020-04-18 19:28:43 +0400 | [diff] [blame] | 38 | } |
| 39 | |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame^] | 40 | func (m *MinioWebhook) minioHandler(w http.ResponseWriter, r *http.Request) { |
| giolekva | 07f6be9 | 2020-04-16 21:09:30 +0400 | [diff] [blame] | 41 | body, err := ioutil.ReadAll(r.Body) |
| giolekva | cba39b8 | 2020-04-18 20:56:05 +0400 | [diff] [blame] | 42 | if err != nil { |
| 43 | glog.Error(err) |
| 44 | http.Error(w, "Could not read HTTP request body", http.StatusInternalServerError) |
| 45 | return |
| 46 | } |
| giolekva | 07f6be9 | 2020-04-16 21:09:30 +0400 | [diff] [blame] | 47 | if len(body) == 0 { |
| 48 | return |
| 49 | } |
| giolekva | c76b21b | 2020-04-18 19:28:43 +0400 | [diff] [blame] | 50 | glog.Infof("Received event from Minio: %s", string(body)) |
| giolekva | c76b21b | 2020-04-18 19:28:43 +0400 | [diff] [blame] | 51 | key, err := regogo.Get(string(body), "input.Key") |
| giolekva | 07f6be9 | 2020-04-16 21:09:30 +0400 | [diff] [blame] | 52 | if err != nil { |
| giolekva | cba39b8 | 2020-04-18 20:56:05 +0400 | [diff] [blame] | 53 | glog.Error(err) |
| giolekva | c76b21b | 2020-04-18 19:28:43 +0400 | [diff] [blame] | 54 | http.Error(w, "Could not find object key", http.StatusBadRequest) |
| giolekva | 07f6be9 | 2020-04-16 21:09:30 +0400 | [diff] [blame] | 55 | return |
| 56 | } |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame^] | 57 | resp, err := m.gql.RunQuery(fmt.Sprintf( |
| 58 | "mutation { addImage(input: [{objectPath: \"%s\"}]) { image { id }} }", |
| 59 | key.String())) |
| giolekva | 07f6be9 | 2020-04-16 21:09:30 +0400 | [diff] [blame] | 60 | if err != nil { |
| giolekva | cba39b8 | 2020-04-18 20:56:05 +0400 | [diff] [blame] | 61 | glog.Error(err) |
| giolekva | c76b21b | 2020-04-18 19:28:43 +0400 | [diff] [blame] | 62 | http.Error(w, "Can not add given objects", http.StatusInternalServerError) |
| giolekva | 07f6be9 | 2020-04-16 21:09:30 +0400 | [diff] [blame] | 63 | return |
| 64 | } |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame^] | 65 | id, err := regogo.Get(resp, "input.addImage.image[0].id") |
| giolekva | 07f6be9 | 2020-04-16 21:09:30 +0400 | [diff] [blame] | 66 | if err != nil { |
| giolekva | cba39b8 | 2020-04-18 20:56:05 +0400 | [diff] [blame] | 67 | glog.Error(err) |
| giolekva | c76b21b | 2020-04-18 19:28:43 +0400 | [diff] [blame] | 68 | http.Error(w, "Could not extract node id", http.StatusInternalServerError) |
| giolekva | 07f6be9 | 2020-04-16 21:09:30 +0400 | [diff] [blame] | 69 | return |
| 70 | } |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame^] | 71 | glog.Infof("New image id: %s", id.String()) |
| 72 | pod := &apiv1.Pod{ |
| 73 | ObjectMeta: metav1.ObjectMeta{ |
| 74 | Name: fmt.Sprintf("detect-faces-%s", id.String())}, |
| 75 | Spec: apiv1.PodSpec{ |
| 76 | RestartPolicy: apiv1.RestartPolicyNever, |
| 77 | Containers: []apiv1.Container{{ |
| 78 | Name: "detect-faces", |
| 79 | Image: "face-detector:latest", |
| 80 | ImagePullPolicy: apiv1.PullNever, |
| 81 | Command: []string{"python", "main.py"}, |
| 82 | Args: []string{"http://pcloud-controller-service.pcloud.svc:1111/graphql", "http://minio-hl-svc.minio.svc:9000", id.String()}}}}} |
| 83 | glog.Info("Creating pod...") |
| 84 | result, err := m.pods.Create(context.TODO(), pod, metav1.CreateOptions{}) |
| 85 | if err != nil { |
| 86 | glog.Error(err) |
| 87 | http.Error(w, "Could not start face detector", http.StatusInternalServerError) |
| 88 | return |
| 89 | } |
| 90 | glog.Infof("Created deployment %q.\n", result.GetObjectMeta().GetName()) |
| 91 | } |
| 92 | |
| 93 | func (m *MinioWebhook) graphqlHandler(w http.ResponseWriter, r *http.Request) { |
| 94 | glog.Infof("New GraphQL query received: %s", r.Method) |
| 95 | err := r.ParseForm() |
| 96 | if err != nil { |
| 97 | glog.Error(err) |
| 98 | http.Error(w, "Could not read query", http.StatusInternalServerError) |
| 99 | return |
| 100 | } |
| 101 | query, ok := r.Form["query"] |
| 102 | if !ok || len(query) != 1 { |
| 103 | http.Error(w, "Exactly ouery parameter must be provided", http.StatusBadRequest) |
| 104 | return |
| 105 | } |
| 106 | resp, err := m.gql.RunQuery(query[0]) |
| 107 | if err != nil { |
| 108 | glog.Error(err) |
| 109 | http.Error(w, err.Error(), http.StatusInternalServerError) |
| 110 | return |
| 111 | } |
| 112 | io.WriteString(w, resp) |
| 113 | w.Header().Set("Content-Type", "application/json") |
| 114 | } |
| 115 | |
| 116 | func getKubeConfig() (*rest.Config, error) { |
| 117 | if *kubeconfig != "" { |
| 118 | return clientcmd.BuildConfigFromFlags("", *kubeconfig) |
| 119 | } else { |
| 120 | return rest.InClusterConfig() |
| 121 | } |
| giolekva | 07f6be9 | 2020-04-16 21:09:30 +0400 | [diff] [blame] | 122 | } |
| 123 | |
| 124 | func main() { |
| 125 | flag.Parse() |
| giolekva | c76b21b | 2020-04-18 19:28:43 +0400 | [diff] [blame] | 126 | |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame^] | 127 | config, err := getKubeConfig() |
| 128 | if err != nil { |
| 129 | panic(err.Error()) |
| 130 | } |
| 131 | clientset, err := kubernetes.NewForConfig(config) |
| giolekva | c76b21b | 2020-04-18 19:28:43 +0400 | [diff] [blame] | 132 | if err != nil { |
| 133 | panic(err) |
| 134 | } |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame^] | 135 | pods := clientset.CoreV1().Pods("pcloud") |
| 136 | |
| 137 | gqlClient, err := schema.NewDgraphClient( |
| 138 | *dgraphGqlAddress, *dgraphSchemaAddress) |
| 139 | if err != nil { |
| 140 | panic(err) |
| 141 | } |
| 142 | // err = gqlClient.SetSchema(` |
| 143 | // type Image { |
| 144 | // id: ID! |
| 145 | // objectPath: String! @search(by: [exact]) |
| 146 | // } |
| 147 | |
| 148 | // type ImageSegment { |
| 149 | // id: ID! |
| 150 | // upperLeftX: Int! |
| 151 | // upperLeftY: Int! |
| 152 | // lowerRightX: Int! |
| 153 | // lowerRightY: Int! |
| 154 | // sourceImage: Image! |
| 155 | // objectPath: String |
| 156 | // } |
| 157 | |
| 158 | // extend type Image { |
| 159 | // segments: [ImageSegment] @hasInverse(field: sourceImage) |
| 160 | // }`) |
| 161 | // if err != nil { |
| 162 | // panic(err) |
| 163 | // } |
| 164 | mw := MinioWebhook{gqlClient, pods} |
| 165 | http.HandleFunc("/minio_webhook", mw.minioHandler) |
| 166 | http.HandleFunc("/graphql", mw.graphqlHandler) |
| giolekva | 07f6be9 | 2020-04-16 21:09:30 +0400 | [diff] [blame] | 167 | log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", *port), nil)) |
| 168 | } |