blob: 3c2f238590bc6f81eb32009cc8d18c095c93ab7e [file] [log] [blame]
giolekva07f6be92020-04-16 21:09:30 +04001package main
2
import (
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"net/http"

	apiv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"

	"github.com/giolekva/pcloud/controller/schema"

	"github.com/golang/glog"
	"github.com/itaysk/regogo"
)
24
// Command-line flags. They are only meaningful after flag.Parse has run.
var (
	// kubeconfig is the path to a kubeconfig file. When it is left empty the
	// in-cluster configuration is used instead (see getKubeConfig).
	kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file.")

	// port is the TCP port the HTTP server listens on.
	port = flag.Int("port", 123, "Port to listen on.")

	// dgraphGqlAddress and dgraphSchemaAddress are handed to
	// schema.NewDgraphClient when the GraphQL client is created.
	dgraphGqlAddress    = flag.String("graphql_address", "", "GraphQL server address.")
	dgraphSchemaAddress = flag.String("dgraph_admin_address", "", "Dgraph server admin address.")
)
giolekva07f6be92020-04-16 21:09:30 +040030
// fmt-style templates for GraphQL payloads. They are raw string literals, so
// every backslash below is part of the resulting text.
//
// NOTE(review): none of these is referenced in the visible part of this file;
// confirm whether they are still needed before relying on them.
const (
	// imgJson is an input object carrying a single objectPath value.
	imgJson = `{ objectPath: \"%s\"}`

	// insertQuery is an add<Type> mutation taking, in order, the type name,
	// the input objects and the name of the result field.
	insertQuery = `mutation { add%s(input: [%s]) { %s { id } } }`

	// getQuery is a JSON request body wrapping a get<Type>-by-id query; it
	// takes the type name and the id.
	getQuery = `{ "query": "{ get%s(id: \"%s\") { id objectPath } } " }`
)
34
giolekvac76b21b2020-04-18 19:28:43 +040035type MinioWebhook struct {
giolekvafb52e0d2020-04-23 22:52:13 +040036 gql schema.GraphQLClient
37 pods corev1.PodInterface
giolekvac76b21b2020-04-18 19:28:43 +040038}
39
giolekvafb52e0d2020-04-23 22:52:13 +040040func (m *MinioWebhook) minioHandler(w http.ResponseWriter, r *http.Request) {
giolekva07f6be92020-04-16 21:09:30 +040041 body, err := ioutil.ReadAll(r.Body)
giolekvacba39b82020-04-18 20:56:05 +040042 if err != nil {
43 glog.Error(err)
44 http.Error(w, "Could not read HTTP request body", http.StatusInternalServerError)
45 return
46 }
giolekva07f6be92020-04-16 21:09:30 +040047 if len(body) == 0 {
48 return
49 }
giolekvac76b21b2020-04-18 19:28:43 +040050 glog.Infof("Received event from Minio: %s", string(body))
giolekvac76b21b2020-04-18 19:28:43 +040051 key, err := regogo.Get(string(body), "input.Key")
giolekva07f6be92020-04-16 21:09:30 +040052 if err != nil {
giolekvacba39b82020-04-18 20:56:05 +040053 glog.Error(err)
giolekvac76b21b2020-04-18 19:28:43 +040054 http.Error(w, "Could not find object key", http.StatusBadRequest)
giolekva07f6be92020-04-16 21:09:30 +040055 return
56 }
giolekvafb52e0d2020-04-23 22:52:13 +040057 resp, err := m.gql.RunQuery(fmt.Sprintf(
58 "mutation { addImage(input: [{objectPath: \"%s\"}]) { image { id }} }",
59 key.String()))
giolekva07f6be92020-04-16 21:09:30 +040060 if err != nil {
giolekvacba39b82020-04-18 20:56:05 +040061 glog.Error(err)
giolekvac76b21b2020-04-18 19:28:43 +040062 http.Error(w, "Can not add given objects", http.StatusInternalServerError)
giolekva07f6be92020-04-16 21:09:30 +040063 return
64 }
giolekvafb52e0d2020-04-23 22:52:13 +040065 id, err := regogo.Get(resp, "input.addImage.image[0].id")
giolekva07f6be92020-04-16 21:09:30 +040066 if err != nil {
giolekvacba39b82020-04-18 20:56:05 +040067 glog.Error(err)
giolekvac76b21b2020-04-18 19:28:43 +040068 http.Error(w, "Could not extract node id", http.StatusInternalServerError)
giolekva07f6be92020-04-16 21:09:30 +040069 return
70 }
giolekvafb52e0d2020-04-23 22:52:13 +040071 glog.Infof("New image id: %s", id.String())
72 pod := &apiv1.Pod{
73 ObjectMeta: metav1.ObjectMeta{
74 Name: fmt.Sprintf("detect-faces-%s", id.String())},
75 Spec: apiv1.PodSpec{
76 RestartPolicy: apiv1.RestartPolicyNever,
77 Containers: []apiv1.Container{{
78 Name: "detect-faces",
79 Image: "face-detector:latest",
80 ImagePullPolicy: apiv1.PullNever,
81 Command: []string{"python", "main.py"},
82 Args: []string{"http://pcloud-controller-service.pcloud.svc:1111/graphql", "http://minio-hl-svc.minio.svc:9000", id.String()}}}}}
83 glog.Info("Creating pod...")
84 result, err := m.pods.Create(context.TODO(), pod, metav1.CreateOptions{})
85 if err != nil {
86 glog.Error(err)
87 http.Error(w, "Could not start face detector", http.StatusInternalServerError)
88 return
89 }
90 glog.Infof("Created deployment %q.\n", result.GetObjectMeta().GetName())
91}
92
93func (m *MinioWebhook) graphqlHandler(w http.ResponseWriter, r *http.Request) {
94 glog.Infof("New GraphQL query received: %s", r.Method)
95 err := r.ParseForm()
96 if err != nil {
97 glog.Error(err)
98 http.Error(w, "Could not read query", http.StatusInternalServerError)
99 return
100 }
101 query, ok := r.Form["query"]
102 if !ok || len(query) != 1 {
103 http.Error(w, "Exactly ouery parameter must be provided", http.StatusBadRequest)
104 return
105 }
106 resp, err := m.gql.RunQuery(query[0])
107 if err != nil {
108 glog.Error(err)
109 http.Error(w, err.Error(), http.StatusInternalServerError)
110 return
111 }
112 io.WriteString(w, resp)
113 w.Header().Set("Content-Type", "application/json")
114}
115
116func getKubeConfig() (*rest.Config, error) {
117 if *kubeconfig != "" {
118 return clientcmd.BuildConfigFromFlags("", *kubeconfig)
119 } else {
120 return rest.InClusterConfig()
121 }
giolekva07f6be92020-04-16 21:09:30 +0400122}
123
124func main() {
125 flag.Parse()
giolekvac76b21b2020-04-18 19:28:43 +0400126
giolekvafb52e0d2020-04-23 22:52:13 +0400127 config, err := getKubeConfig()
128 if err != nil {
129 panic(err.Error())
130 }
131 clientset, err := kubernetes.NewForConfig(config)
giolekvac76b21b2020-04-18 19:28:43 +0400132 if err != nil {
133 panic(err)
134 }
giolekvafb52e0d2020-04-23 22:52:13 +0400135 pods := clientset.CoreV1().Pods("pcloud")
136
137 gqlClient, err := schema.NewDgraphClient(
138 *dgraphGqlAddress, *dgraphSchemaAddress)
139 if err != nil {
140 panic(err)
141 }
142 // err = gqlClient.SetSchema(`
143 // type Image {
144 // id: ID!
145 // objectPath: String! @search(by: [exact])
146 // }
147
148 // type ImageSegment {
149 // id: ID!
150 // upperLeftX: Int!
151 // upperLeftY: Int!
152 // lowerRightX: Int!
153 // lowerRightY: Int!
154 // sourceImage: Image!
155 // objectPath: String
156 // }
157
158 // extend type Image {
159 // segments: [ImageSegment] @hasInverse(field: sourceImage)
160 // }`)
161 // if err != nil {
162 // panic(err)
163 // }
164 mw := MinioWebhook{gqlClient, pods}
165 http.HandleFunc("/minio_webhook", mw.minioHandler)
166 http.HandleFunc("/graphql", mw.graphqlHandler)
giolekva07f6be92020-04-16 21:09:30 +0400167 log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", *port), nil))
168}