blob: 11498ad409fcec5c1762623f1e5a472eecad7fd5 [file] [log] [blame]
giolekva07f6be92020-04-16 21:09:30 +04001package main
2
3import (
giolekvafb52e0d2020-04-23 22:52:13 +04004 "context"
giolekva88d6e352020-04-30 13:32:38 +04005 "errors"
giolekva07f6be92020-04-16 21:09:30 +04006 "flag"
7 "fmt"
giolekvafb52e0d2020-04-23 22:52:13 +04008 "io"
giolekva07f6be92020-04-16 21:09:30 +04009 "io/ioutil"
10 "log"
11 "net/http"
giolekvafb52e0d2020-04-23 22:52:13 +040012
13 apiv1 "k8s.io/api/core/v1"
14 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15 "k8s.io/client-go/kubernetes"
16 corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
17 "k8s.io/client-go/rest"
18 "k8s.io/client-go/tools/clientcmd"
giolekvac76b21b2020-04-18 19:28:43 +040019
20 "github.com/giolekva/pcloud/controller/schema"
21
22 "github.com/golang/glog"
23 "github.com/itaysk/regogo"
giolekva07f6be92020-04-16 21:09:30 +040024)
25
// Command-line flags. They are registered at package initialization and read
// after flag.Parse is called in main.
var (
	// kubeconfig is the path to a kubeconfig file. When it is empty,
	// getKubeConfig falls back to the in-cluster configuration.
	kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file.")

	// port is the TCP port the HTTP server listens on.
	port = flag.Int("port", 123, "Port to listen on.")

	// dgraphGqlAddress and dgraphSchemaAddress are handed to
	// schema.NewDgraphClient in main.
	dgraphGqlAddress    = flag.String("graphql_address", "", "GraphQL server address.")
	dgraphSchemaAddress = flag.String("dgraph_admin_address", "", "Dgraph server admin address.")
)
giolekva07f6be92020-04-16 21:09:30 +040031
// Format strings for GraphQL payloads. All three are raw string literals, so
// the backslashes they contain are literal characters, not Go escapes.
// NOTE(review): none of these is referenced by the code visible in this file —
// confirm they are still needed.
const (
	// imgJson has a single %s verb for the object path.
	imgJson = `{ objectPath: \"%s\"}`

	// insertQuery has three %s verbs: the suffix of the add mutation, the
	// input list, and the name of the field whose id is selected.
	insertQuery = `mutation { add%s(input: [%s]) { %s { id } } }`

	// getQuery has two %s verbs: the suffix of the get query and the id to
	// look up. It is already wrapped in a JSON object with a "query" key.
	getQuery = `{ "query": "{ get%s(id: \"%s\") { id objectPath } } " }`
)
35
giolekvac76b21b2020-04-18 19:28:43 +040036type MinioWebhook struct {
giolekvafb52e0d2020-04-23 22:52:13 +040037 gql schema.GraphQLClient
38 pods corev1.PodInterface
giolekvac76b21b2020-04-18 19:28:43 +040039}
40
giolekvafb52e0d2020-04-23 22:52:13 +040041func (m *MinioWebhook) minioHandler(w http.ResponseWriter, r *http.Request) {
giolekva60e87d32020-05-01 23:07:42 +040042 // TODO(giolekva): move this to events processor
43 resp := ""
giolekvafb52e0d2020-04-23 22:52:13 +040044 id, err := regogo.Get(resp, "input.addImage.image[0].id")
giolekva07f6be92020-04-16 21:09:30 +040045 if err != nil {
giolekvacba39b82020-04-18 20:56:05 +040046 glog.Error(err)
giolekvac76b21b2020-04-18 19:28:43 +040047 http.Error(w, "Could not extract node id", http.StatusInternalServerError)
giolekva07f6be92020-04-16 21:09:30 +040048 return
49 }
giolekvafb52e0d2020-04-23 22:52:13 +040050 glog.Infof("New image id: %s", id.String())
51 pod := &apiv1.Pod{
52 ObjectMeta: metav1.ObjectMeta{
53 Name: fmt.Sprintf("detect-faces-%s", id.String())},
54 Spec: apiv1.PodSpec{
55 RestartPolicy: apiv1.RestartPolicyNever,
56 Containers: []apiv1.Container{{
57 Name: "detect-faces",
58 Image: "face-detector:latest",
59 ImagePullPolicy: apiv1.PullNever,
60 Command: []string{"python", "main.py"},
61 Args: []string{"http://pcloud-controller-service.pcloud.svc:1111/graphql", "http://minio-hl-svc.minio.svc:9000", id.String()}}}}}
62 glog.Info("Creating pod...")
63 result, err := m.pods.Create(context.TODO(), pod, metav1.CreateOptions{})
64 if err != nil {
65 glog.Error(err)
66 http.Error(w, "Could not start face detector", http.StatusInternalServerError)
67 return
68 }
69 glog.Infof("Created deployment %q.\n", result.GetObjectMeta().GetName())
70}
71
// query is the parsed form of an incoming GraphQL request.
type query struct {
	// query is the GraphQL document text; it is the only field that
	// extractQuery fills in.
	query string
	// operation and variables are not set by any code visible in this file.
	operation string
	variables string
}
77
78func extractQuery(r *http.Request) (*query, error) {
79 if r.Method == "GET" {
80 if err := r.ParseForm(); err != nil {
81 return nil, err
82 }
83 q, ok := r.Form["query"]
84 if !ok || len(q) != 1 {
85 return nil, errors.New("Exactly one query must be provided")
86 }
87 return &query{query: q[0]}, nil
88 } else {
89 body, err := ioutil.ReadAll(r.Body)
90 if err != nil {
91 return nil, err
92 }
93 q, err := regogo.Get(string(body), "input.query")
94 if err != nil {
95 return nil, err
96 }
97 return &query{query: q.String()}, nil
98 }
99}
100
giolekvafb52e0d2020-04-23 22:52:13 +0400101func (m *MinioWebhook) graphqlHandler(w http.ResponseWriter, r *http.Request) {
102 glog.Infof("New GraphQL query received: %s", r.Method)
giolekva88d6e352020-04-30 13:32:38 +0400103 q, err := extractQuery(r)
giolekvafb52e0d2020-04-23 22:52:13 +0400104 if err != nil {
giolekva88d6e352020-04-30 13:32:38 +0400105 glog.Error(err.Error())
106 http.Error(w, "Could not extract query", http.StatusBadRequest)
giolekvafb52e0d2020-04-23 22:52:13 +0400107 }
giolekva88d6e352020-04-30 13:32:38 +0400108 resp, err := m.gql.RunQuery(q.query)
giolekvafb52e0d2020-04-23 22:52:13 +0400109 if err != nil {
110 glog.Error(err)
111 http.Error(w, err.Error(), http.StatusInternalServerError)
112 return
113 }
114 io.WriteString(w, resp)
115 w.Header().Set("Content-Type", "application/json")
116}
117
118func getKubeConfig() (*rest.Config, error) {
119 if *kubeconfig != "" {
120 return clientcmd.BuildConfigFromFlags("", *kubeconfig)
121 } else {
122 return rest.InClusterConfig()
123 }
giolekva07f6be92020-04-16 21:09:30 +0400124}
125
126func main() {
127 flag.Parse()
giolekvac76b21b2020-04-18 19:28:43 +0400128
giolekvafb52e0d2020-04-23 22:52:13 +0400129 config, err := getKubeConfig()
130 if err != nil {
131 panic(err.Error())
132 }
133 clientset, err := kubernetes.NewForConfig(config)
giolekvac76b21b2020-04-18 19:28:43 +0400134 if err != nil {
135 panic(err)
136 }
giolekvafb52e0d2020-04-23 22:52:13 +0400137 pods := clientset.CoreV1().Pods("pcloud")
138
139 gqlClient, err := schema.NewDgraphClient(
140 *dgraphGqlAddress, *dgraphSchemaAddress)
141 if err != nil {
142 panic(err)
143 }
giolekva26a8b5f2020-05-01 20:01:13 +0400144 err = gqlClient.SetSchema(`
145enum EventState {
146 NEW
147 PROCESSING
148 DONE
149}
giolekvafb52e0d2020-04-23 22:52:13 +0400150
giolekva26a8b5f2020-05-01 20:01:13 +0400151type Foo { bar: Int }`)
152 if err != nil {
153 panic(err)
154 }
155 err = gqlClient.AddSchema(`
156 type Image {
157 id: ID!
158 objectPath: String! @search(by: [exact])
159 }
giolekvafb52e0d2020-04-23 22:52:13 +0400160
giolekva26a8b5f2020-05-01 20:01:13 +0400161 type ImageSegment {
162 id: ID!
163 upperLeftX: Float!
164 upperLeftY: Float!
165 lowerRightX: Float!
166 lowerRightY: Float!
167 sourceImage: Image! @hasInverse(field: segments)
168 }
169
170 extend type Image {
171 segments: [ImageSegment] @hasInverse(field: sourceImage)
172 }`)
173 if err != nil {
174 panic(err)
175 }
giolekvafb52e0d2020-04-23 22:52:13 +0400176 mw := MinioWebhook{gqlClient, pods}
giolekvafb52e0d2020-04-23 22:52:13 +0400177 http.HandleFunc("/graphql", mw.graphqlHandler)
giolekva07f6be92020-04-16 21:09:30 +0400178 log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", *port), nil))
179}