| giolekva | 07f6be9 | 2020-04-16 21:09:30 +0400 | [diff] [blame] | 1 | package main |
| 2 | |
| 3 | import ( |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame] | 4 | "context" |
| giolekva | 88d6e35 | 2020-04-30 13:32:38 +0400 | [diff] [blame] | 5 | "errors" |
| giolekva | 07f6be9 | 2020-04-16 21:09:30 +0400 | [diff] [blame] | 6 | "flag" |
| 7 | "fmt" |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame] | 8 | "io" |
| giolekva | 07f6be9 | 2020-04-16 21:09:30 +0400 | [diff] [blame] | 9 | "io/ioutil" |
| 10 | "log" |
| 11 | "net/http" |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame] | 12 | |
| 13 | apiv1 "k8s.io/api/core/v1" |
| 14 | metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| 15 | "k8s.io/client-go/kubernetes" |
| 16 | corev1 "k8s.io/client-go/kubernetes/typed/core/v1" |
| 17 | "k8s.io/client-go/rest" |
| 18 | "k8s.io/client-go/tools/clientcmd" |
| giolekva | c76b21b | 2020-04-18 19:28:43 +0400 | [diff] [blame] | 19 | |
| 20 | "github.com/giolekva/pcloud/controller/schema" |
| 21 | |
| 22 | "github.com/golang/glog" |
| 23 | "github.com/itaysk/regogo" |
| giolekva | 07f6be9 | 2020-04-16 21:09:30 +0400 | [diff] [blame] | 24 | ) |
| 25 | |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame] | 26 | var kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file.") |
| 27 | |
| giolekva | 07f6be9 | 2020-04-16 21:09:30 +0400 | [diff] [blame] | 28 | var port = flag.Int("port", 123, "Port to listen on.") |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame] | 29 | var dgraphGqlAddress = flag.String("graphql_address", "", "GraphQL server address.") |
| 30 | var dgraphSchemaAddress = flag.String("dgraph_admin_address", "", "Dgraph server admin address.") |
| giolekva | 07f6be9 | 2020-04-16 21:09:30 +0400 | [diff] [blame] | 31 | |
| giolekva | c76b21b | 2020-04-18 19:28:43 +0400 | [diff] [blame] | 32 | const imgJson = `{ objectPath: \"%s\"}` |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame] | 33 | const insertQuery = `mutation { add%s(input: [%s]) { %s { id } } }` |
| giolekva | c76b21b | 2020-04-18 19:28:43 +0400 | [diff] [blame] | 34 | const getQuery = `{ "query": "{ get%s(id: \"%s\") { id objectPath } } " }` |
| 35 | |
| giolekva | c76b21b | 2020-04-18 19:28:43 +0400 | [diff] [blame] | 36 | type MinioWebhook struct { |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame] | 37 | gql schema.GraphQLClient |
| 38 | pods corev1.PodInterface |
| giolekva | c76b21b | 2020-04-18 19:28:43 +0400 | [diff] [blame] | 39 | } |
| 40 | |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame] | 41 | func (m *MinioWebhook) minioHandler(w http.ResponseWriter, r *http.Request) { |
| giolekva | 60e87d3 | 2020-05-01 23:07:42 +0400 | [diff] [blame] | 42 | // TODO(giolekva): move this to events processor |
| 43 | resp := "" |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame] | 44 | id, err := regogo.Get(resp, "input.addImage.image[0].id") |
| giolekva | 07f6be9 | 2020-04-16 21:09:30 +0400 | [diff] [blame] | 45 | if err != nil { |
| giolekva | cba39b8 | 2020-04-18 20:56:05 +0400 | [diff] [blame] | 46 | glog.Error(err) |
| giolekva | c76b21b | 2020-04-18 19:28:43 +0400 | [diff] [blame] | 47 | http.Error(w, "Could not extract node id", http.StatusInternalServerError) |
| giolekva | 07f6be9 | 2020-04-16 21:09:30 +0400 | [diff] [blame] | 48 | return |
| 49 | } |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame] | 50 | glog.Infof("New image id: %s", id.String()) |
| 51 | pod := &apiv1.Pod{ |
| 52 | ObjectMeta: metav1.ObjectMeta{ |
| 53 | Name: fmt.Sprintf("detect-faces-%s", id.String())}, |
| 54 | Spec: apiv1.PodSpec{ |
| 55 | RestartPolicy: apiv1.RestartPolicyNever, |
| 56 | Containers: []apiv1.Container{{ |
| 57 | Name: "detect-faces", |
| 58 | Image: "face-detector:latest", |
| 59 | ImagePullPolicy: apiv1.PullNever, |
| 60 | Command: []string{"python", "main.py"}, |
| 61 | Args: []string{"http://pcloud-controller-service.pcloud.svc:1111/graphql", "http://minio-hl-svc.minio.svc:9000", id.String()}}}}} |
| 62 | glog.Info("Creating pod...") |
| 63 | result, err := m.pods.Create(context.TODO(), pod, metav1.CreateOptions{}) |
| 64 | if err != nil { |
| 65 | glog.Error(err) |
| 66 | http.Error(w, "Could not start face detector", http.StatusInternalServerError) |
| 67 | return |
| 68 | } |
| 69 | glog.Infof("Created deployment %q.\n", result.GetObjectMeta().GetName()) |
| 70 | } |
| 71 | |
| giolekva | 88d6e35 | 2020-04-30 13:32:38 +0400 | [diff] [blame] | 72 | type query struct { |
| 73 | query string |
| 74 | operation string |
| 75 | variables string |
| 76 | } |
| 77 | |
| 78 | func extractQuery(r *http.Request) (*query, error) { |
| 79 | if r.Method == "GET" { |
| 80 | if err := r.ParseForm(); err != nil { |
| 81 | return nil, err |
| 82 | } |
| 83 | q, ok := r.Form["query"] |
| 84 | if !ok || len(q) != 1 { |
| 85 | return nil, errors.New("Exactly one query must be provided") |
| 86 | } |
| 87 | return &query{query: q[0]}, nil |
| 88 | } else { |
| 89 | body, err := ioutil.ReadAll(r.Body) |
| 90 | if err != nil { |
| 91 | return nil, err |
| 92 | } |
| 93 | q, err := regogo.Get(string(body), "input.query") |
| 94 | if err != nil { |
| 95 | return nil, err |
| 96 | } |
| 97 | return &query{query: q.String()}, nil |
| 98 | } |
| 99 | } |
| 100 | |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame] | 101 | func (m *MinioWebhook) graphqlHandler(w http.ResponseWriter, r *http.Request) { |
| 102 | glog.Infof("New GraphQL query received: %s", r.Method) |
| giolekva | 88d6e35 | 2020-04-30 13:32:38 +0400 | [diff] [blame] | 103 | q, err := extractQuery(r) |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame] | 104 | if err != nil { |
| giolekva | 88d6e35 | 2020-04-30 13:32:38 +0400 | [diff] [blame] | 105 | glog.Error(err.Error()) |
| 106 | http.Error(w, "Could not extract query", http.StatusBadRequest) |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame] | 107 | } |
| giolekva | 88d6e35 | 2020-04-30 13:32:38 +0400 | [diff] [blame] | 108 | resp, err := m.gql.RunQuery(q.query) |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame] | 109 | if err != nil { |
| 110 | glog.Error(err) |
| 111 | http.Error(w, err.Error(), http.StatusInternalServerError) |
| 112 | return |
| 113 | } |
| 114 | io.WriteString(w, resp) |
| 115 | w.Header().Set("Content-Type", "application/json") |
| 116 | } |
| 117 | |
| 118 | func getKubeConfig() (*rest.Config, error) { |
| 119 | if *kubeconfig != "" { |
| 120 | return clientcmd.BuildConfigFromFlags("", *kubeconfig) |
| 121 | } else { |
| 122 | return rest.InClusterConfig() |
| 123 | } |
| giolekva | 07f6be9 | 2020-04-16 21:09:30 +0400 | [diff] [blame] | 124 | } |
| 125 | |
| 126 | func main() { |
| 127 | flag.Parse() |
| giolekva | c76b21b | 2020-04-18 19:28:43 +0400 | [diff] [blame] | 128 | |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame] | 129 | config, err := getKubeConfig() |
| 130 | if err != nil { |
| 131 | panic(err.Error()) |
| 132 | } |
| 133 | clientset, err := kubernetes.NewForConfig(config) |
| giolekva | c76b21b | 2020-04-18 19:28:43 +0400 | [diff] [blame] | 134 | if err != nil { |
| 135 | panic(err) |
| 136 | } |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame] | 137 | pods := clientset.CoreV1().Pods("pcloud") |
| 138 | |
| 139 | gqlClient, err := schema.NewDgraphClient( |
| 140 | *dgraphGqlAddress, *dgraphSchemaAddress) |
| 141 | if err != nil { |
| 142 | panic(err) |
| 143 | } |
| giolekva | 26a8b5f | 2020-05-01 20:01:13 +0400 | [diff] [blame] | 144 | err = gqlClient.SetSchema(` |
| 145 | enum EventState { |
| 146 | NEW |
| 147 | PROCESSING |
| 148 | DONE |
| 149 | } |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame] | 150 | |
| giolekva | 26a8b5f | 2020-05-01 20:01:13 +0400 | [diff] [blame] | 151 | type Foo { bar: Int }`) |
| 152 | if err != nil { |
| 153 | panic(err) |
| 154 | } |
| 155 | err = gqlClient.AddSchema(` |
| 156 | type Image { |
| 157 | id: ID! |
| 158 | objectPath: String! @search(by: [exact]) |
| 159 | } |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame] | 160 | |
| giolekva | 26a8b5f | 2020-05-01 20:01:13 +0400 | [diff] [blame] | 161 | type ImageSegment { |
| 162 | id: ID! |
| 163 | upperLeftX: Float! |
| 164 | upperLeftY: Float! |
| 165 | lowerRightX: Float! |
| 166 | lowerRightY: Float! |
| 167 | sourceImage: Image! @hasInverse(field: segments) |
| 168 | } |
| 169 | |
| 170 | extend type Image { |
| 171 | segments: [ImageSegment] @hasInverse(field: sourceImage) |
| 172 | }`) |
| 173 | if err != nil { |
| 174 | panic(err) |
| 175 | } |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame] | 176 | mw := MinioWebhook{gqlClient, pods} |
| giolekva | fb52e0d | 2020-04-23 22:52:13 +0400 | [diff] [blame] | 177 | http.HandleFunc("/graphql", mw.graphqlHandler) |
| giolekva | 07f6be9 | 2020-04-16 21:09:30 +0400 | [diff] [blame] | 178 | log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", *port), nil)) |
| 179 | } |