blob: 4802b3edcc266badc1b5066d9afcb9a205cf0643 [file] [log] [blame]
giolekva07f6be92020-04-16 21:09:30 +04001package main
2
3import (
giolekvafb52e0d2020-04-23 22:52:13 +04004 "context"
giolekva88d6e352020-04-30 13:32:38 +04005 "errors"
giolekva07f6be92020-04-16 21:09:30 +04006 "flag"
7 "fmt"
giolekvafb52e0d2020-04-23 22:52:13 +04008 "io"
giolekva07f6be92020-04-16 21:09:30 +04009 "io/ioutil"
10 "log"
11 "net/http"
giolekvafb52e0d2020-04-23 22:52:13 +040012
13 apiv1 "k8s.io/api/core/v1"
14 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15 "k8s.io/client-go/kubernetes"
16 corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
17 "k8s.io/client-go/rest"
18 "k8s.io/client-go/tools/clientcmd"
giolekvac76b21b2020-04-18 19:28:43 +040019
20 "github.com/giolekva/pcloud/controller/schema"
21
22 "github.com/golang/glog"
23 "github.com/itaysk/regogo"
giolekva07f6be92020-04-16 21:09:30 +040024)
25
giolekvafb52e0d2020-04-23 22:52:13 +040026var kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file.")
27
giolekva07f6be92020-04-16 21:09:30 +040028var port = flag.Int("port", 123, "Port to listen on.")
giolekvafb52e0d2020-04-23 22:52:13 +040029var dgraphGqlAddress = flag.String("graphql_address", "", "GraphQL server address.")
30var dgraphSchemaAddress = flag.String("dgraph_admin_address", "", "Dgraph server admin address.")
giolekva07f6be92020-04-16 21:09:30 +040031
giolekvac76b21b2020-04-18 19:28:43 +040032const imgJson = `{ objectPath: \"%s\"}`
giolekvafb52e0d2020-04-23 22:52:13 +040033const insertQuery = `mutation { add%s(input: [%s]) { %s { id } } }`
giolekvac76b21b2020-04-18 19:28:43 +040034const getQuery = `{ "query": "{ get%s(id: \"%s\") { id objectPath } } " }`
35
giolekvac76b21b2020-04-18 19:28:43 +040036type MinioWebhook struct {
giolekvafb52e0d2020-04-23 22:52:13 +040037 gql schema.GraphQLClient
38 pods corev1.PodInterface
giolekvac76b21b2020-04-18 19:28:43 +040039}
40
giolekvafb52e0d2020-04-23 22:52:13 +040041func (m *MinioWebhook) minioHandler(w http.ResponseWriter, r *http.Request) {
giolekva07f6be92020-04-16 21:09:30 +040042 body, err := ioutil.ReadAll(r.Body)
giolekvacba39b82020-04-18 20:56:05 +040043 if err != nil {
44 glog.Error(err)
45 http.Error(w, "Could not read HTTP request body", http.StatusInternalServerError)
46 return
47 }
giolekva07f6be92020-04-16 21:09:30 +040048 if len(body) == 0 {
49 return
50 }
giolekvac76b21b2020-04-18 19:28:43 +040051 glog.Infof("Received event from Minio: %s", string(body))
giolekvac76b21b2020-04-18 19:28:43 +040052 key, err := regogo.Get(string(body), "input.Key")
giolekva07f6be92020-04-16 21:09:30 +040053 if err != nil {
giolekvacba39b82020-04-18 20:56:05 +040054 glog.Error(err)
giolekvac76b21b2020-04-18 19:28:43 +040055 http.Error(w, "Could not find object key", http.StatusBadRequest)
giolekva07f6be92020-04-16 21:09:30 +040056 return
57 }
giolekvafb52e0d2020-04-23 22:52:13 +040058 resp, err := m.gql.RunQuery(fmt.Sprintf(
59 "mutation { addImage(input: [{objectPath: \"%s\"}]) { image { id }} }",
60 key.String()))
giolekva07f6be92020-04-16 21:09:30 +040061 if err != nil {
giolekvacba39b82020-04-18 20:56:05 +040062 glog.Error(err)
giolekvac76b21b2020-04-18 19:28:43 +040063 http.Error(w, "Can not add given objects", http.StatusInternalServerError)
giolekva07f6be92020-04-16 21:09:30 +040064 return
65 }
giolekvafb52e0d2020-04-23 22:52:13 +040066 id, err := regogo.Get(resp, "input.addImage.image[0].id")
giolekva07f6be92020-04-16 21:09:30 +040067 if err != nil {
giolekvacba39b82020-04-18 20:56:05 +040068 glog.Error(err)
giolekvac76b21b2020-04-18 19:28:43 +040069 http.Error(w, "Could not extract node id", http.StatusInternalServerError)
giolekva07f6be92020-04-16 21:09:30 +040070 return
71 }
giolekvafb52e0d2020-04-23 22:52:13 +040072 glog.Infof("New image id: %s", id.String())
73 pod := &apiv1.Pod{
74 ObjectMeta: metav1.ObjectMeta{
75 Name: fmt.Sprintf("detect-faces-%s", id.String())},
76 Spec: apiv1.PodSpec{
77 RestartPolicy: apiv1.RestartPolicyNever,
78 Containers: []apiv1.Container{{
79 Name: "detect-faces",
80 Image: "face-detector:latest",
81 ImagePullPolicy: apiv1.PullNever,
82 Command: []string{"python", "main.py"},
83 Args: []string{"http://pcloud-controller-service.pcloud.svc:1111/graphql", "http://minio-hl-svc.minio.svc:9000", id.String()}}}}}
84 glog.Info("Creating pod...")
85 result, err := m.pods.Create(context.TODO(), pod, metav1.CreateOptions{})
86 if err != nil {
87 glog.Error(err)
88 http.Error(w, "Could not start face detector", http.StatusInternalServerError)
89 return
90 }
91 glog.Infof("Created deployment %q.\n", result.GetObjectMeta().GetName())
92}
93
giolekva88d6e352020-04-30 13:32:38 +040094type query struct {
95 query string
96 operation string
97 variables string
98}
99
100func extractQuery(r *http.Request) (*query, error) {
101 if r.Method == "GET" {
102 if err := r.ParseForm(); err != nil {
103 return nil, err
104 }
105 q, ok := r.Form["query"]
106 if !ok || len(q) != 1 {
107 return nil, errors.New("Exactly one query must be provided")
108 }
109 return &query{query: q[0]}, nil
110 } else {
111 body, err := ioutil.ReadAll(r.Body)
112 if err != nil {
113 return nil, err
114 }
115 q, err := regogo.Get(string(body), "input.query")
116 if err != nil {
117 return nil, err
118 }
119 return &query{query: q.String()}, nil
120 }
121}
122
giolekvafb52e0d2020-04-23 22:52:13 +0400123func (m *MinioWebhook) graphqlHandler(w http.ResponseWriter, r *http.Request) {
124 glog.Infof("New GraphQL query received: %s", r.Method)
giolekva88d6e352020-04-30 13:32:38 +0400125 q, err := extractQuery(r)
giolekvafb52e0d2020-04-23 22:52:13 +0400126 if err != nil {
giolekva88d6e352020-04-30 13:32:38 +0400127 glog.Error(err.Error())
128 http.Error(w, "Could not extract query", http.StatusBadRequest)
giolekvafb52e0d2020-04-23 22:52:13 +0400129 }
giolekva88d6e352020-04-30 13:32:38 +0400130 resp, err := m.gql.RunQuery(q.query)
giolekvafb52e0d2020-04-23 22:52:13 +0400131 if err != nil {
132 glog.Error(err)
133 http.Error(w, err.Error(), http.StatusInternalServerError)
134 return
135 }
136 io.WriteString(w, resp)
137 w.Header().Set("Content-Type", "application/json")
138}
139
140func getKubeConfig() (*rest.Config, error) {
141 if *kubeconfig != "" {
142 return clientcmd.BuildConfigFromFlags("", *kubeconfig)
143 } else {
144 return rest.InClusterConfig()
145 }
giolekva07f6be92020-04-16 21:09:30 +0400146}
147
148func main() {
149 flag.Parse()
giolekvac76b21b2020-04-18 19:28:43 +0400150
giolekvafb52e0d2020-04-23 22:52:13 +0400151 config, err := getKubeConfig()
152 if err != nil {
153 panic(err.Error())
154 }
155 clientset, err := kubernetes.NewForConfig(config)
giolekvac76b21b2020-04-18 19:28:43 +0400156 if err != nil {
157 panic(err)
158 }
giolekvafb52e0d2020-04-23 22:52:13 +0400159 pods := clientset.CoreV1().Pods("pcloud")
160
161 gqlClient, err := schema.NewDgraphClient(
162 *dgraphGqlAddress, *dgraphSchemaAddress)
163 if err != nil {
164 panic(err)
165 }
166 // err = gqlClient.SetSchema(`
167 // type Image {
168 // id: ID!
169 // objectPath: String! @search(by: [exact])
170 // }
171
172 // type ImageSegment {
173 // id: ID!
174 // upperLeftX: Int!
175 // upperLeftY: Int!
176 // lowerRightX: Int!
177 // lowerRightY: Int!
178 // sourceImage: Image!
179 // objectPath: String
180 // }
181
182 // extend type Image {
183 // segments: [ImageSegment] @hasInverse(field: sourceImage)
184 // }`)
185 // if err != nil {
186 // panic(err)
187 // }
188 mw := MinioWebhook{gqlClient, pods}
189 http.HandleFunc("/minio_webhook", mw.minioHandler)
190 http.HandleFunc("/graphql", mw.graphqlHandler)
giolekva07f6be92020-04-16 21:09:30 +0400191 log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", *port), nil))
192}