e2e face recognition
diff --git a/controller/main.go b/controller/main.go
index 0b23654..3c2f238 100644
--- a/controller/main.go
+++ b/controller/main.go
@@ -1,13 +1,20 @@
package main
import (
- "bytes"
+ "context"
"flag"
"fmt"
+ "io"
"io/ioutil"
"log"
"net/http"
- "strings"
+
+ apiv1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/client-go/kubernetes"
+ corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
+ "k8s.io/client-go/rest"
+ "k8s.io/client-go/tools/clientcmd"
"github.com/giolekva/pcloud/controller/schema"
@@ -15,58 +22,22 @@
"github.com/itaysk/regogo"
)
+var kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file.") // left empty, getKubeConfig falls back to rest.InClusterConfig
+
var port = flag.Int("port", 123, "Port to listen on.")
-var graphqlAddress = flag.String("graphql_address", "", "GraphQL server address.")
-var dgraphAdminAddress = flag.String("dgraph_admin_address", "", "Dgraph server admin address.")
+var dgraphGqlAddress = flag.String("graphql_address", "", "GraphQL server address.") // only the Go variable is renamed, the flag name is unchanged; main passes it as the first argument of schema.NewDgraphClient
+var dgraphSchemaAddress = flag.String("dgraph_admin_address", "", "Dgraph server admin address.") // flag name unchanged; main passes it as the second argument of schema.NewDgraphClient
const imgJson = `{ objectPath: \"%s\"}`
-const insertQuery = `{ "query": "mutation { add%s(input: [%s]) { %s { id } } }" }`
+const insertQuery = `mutation { add%s(input: [%s]) { %s { id } } }` // now the bare mutation text, no longer wrapped in a JSON query envelope; the removed Insert method filled the verbs with the type name, the input object and the lower-cased type name
const getQuery = `{ "query": "{ get%s(id: \"%s\") { id objectPath } } " }`
-type GraphQLClient struct {
- serverAddress string
-}
-
-func (g *GraphQLClient) Insert(typ string, obj string) (string, error) {
- req := []byte(fmt.Sprintf(insertQuery, typ, obj, strings.ToLower(typ)))
- glog.Info("Insering new item, mutation query:")
- glog.Info(string(req))
- resp, err := http.Post(g.serverAddress, "application/json", bytes.NewReader(req))
- glog.Infof("Response status: %d", resp.StatusCode)
- if err != nil {
- return "", err
- }
- respBody, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- return "", err
- }
- glog.Infof("Response: %s", string(respBody))
- return string(respBody), nil
-}
-
-func (g *GraphQLClient) Get(typ string, id string) (string, error) {
- req := []byte(fmt.Sprintf(getQuery, typ, id))
- glog.Info("Getting node with query:")
- glog.Info(string(req))
- resp, err := http.Post(g.serverAddress, "application/json", bytes.NewReader(req))
- glog.Infof("Response status: %d", resp.StatusCode)
- if err != nil {
- return "", err
- }
- respBody, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- return "", err
- }
- glog.Info(string(respBody))
- return string(respBody), nil
-}
-
type MinioWebhook struct {
- gqlClient *GraphQLClient
- dgraphAdminClient schema.SchemaStore
+ gql schema.GraphQLClient // GraphQL backend; minioHandler and graphqlHandler both go through gql.RunQuery
+ pods corev1.PodInterface // pod API that minioHandler uses to create one detect-faces pod per new image
}
-func (m *MinioWebhook) handler(w http.ResponseWriter, r *http.Request) {
+func (m *MinioWebhook) minioHandler(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
glog.Error(err)
@@ -83,48 +54,115 @@
http.Error(w, "Could not find object key", http.StatusBadRequest)
return
}
- resp, err := m.gqlClient.Insert("Image", fmt.Sprintf(imgJson, key.String()))
+ resp, err := m.gql.RunQuery(fmt.Sprintf(
+ "mutation { addImage(input: [{objectPath: \"%s\"}]) { image { id }} }",
+ key.String()))
if err != nil {
glog.Error(err)
http.Error(w, "Can not add given objects", http.StatusInternalServerError)
return
}
- id, err := regogo.Get(resp, "input.data.addImage.image[0].id")
+ id, err := regogo.Get(resp, "input.addImage.image[0].id")
if err != nil {
glog.Error(err)
http.Error(w, "Could not extract node id", http.StatusInternalServerError)
return
}
+ glog.Infof("New image id: %s", id.String())
+ pod := &apiv1.Pod{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: fmt.Sprintf("detect-faces-%s", id.String())},
+ Spec: apiv1.PodSpec{
+ RestartPolicy: apiv1.RestartPolicyNever,
+ Containers: []apiv1.Container{{
+ Name: "detect-faces",
+ Image: "face-detector:latest",
+ ImagePullPolicy: apiv1.PullNever,
+ Command: []string{"python", "main.py"},
+ Args: []string{"http://pcloud-controller-service.pcloud.svc:1111/graphql", "http://minio-hl-svc.minio.svc:9000", id.String()}}}}}
+ glog.Info("Creating pod...")
+ result, err := m.pods.Create(context.TODO(), pod, metav1.CreateOptions{})
+ if err != nil {
+ glog.Error(err)
+ http.Error(w, "Could not start face detector", http.StatusInternalServerError)
+ return
+ }
+ glog.Infof("Created deployment %q.\n", result.GetObjectMeta().GetName())
+}
+
+// graphqlHandler runs the single "query" form value through m.gql.RunQuery and writes the reply, labelled as JSON.
+func (m *MinioWebhook) graphqlHandler(w http.ResponseWriter, r *http.Request) {
+	glog.Infof("New GraphQL query received: %s", r.Method)
+	if err := r.ParseForm(); err != nil { // fills r.Form from the URL query string and, for form posts, the body
+		glog.Error(err)
+		http.Error(w, "Could not read query", http.StatusInternalServerError)
+		return
+	}
+	query, ok := r.Form["query"]
+	if !ok || len(query) != 1 { // a missing or repeated parameter is rejected rather than guessed at
+		http.Error(w, "Exactly one query parameter must be provided", http.StatusBadRequest)
+		return
+	}
+	resp, err := m.gql.RunQuery(query[0])
+	if err != nil {
+		glog.Error(err)
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+	w.Header().Set("Content-Type", "application/json") // must come before the first body write; headers changed afterwards are not sent
+	io.WriteString(w, resp)
+}
+
+// getKubeConfig builds the client config from the -kubeconfig file when one is given, else from the in-cluster environment.
+func getKubeConfig() (*rest.Config, error) {
+	if path := *kubeconfig; path != "" {
+		return clientcmd.BuildConfigFromFlags("", path)
+	}
+	return rest.InClusterConfig()
}
func main() {
flag.Parse()
- dgraphAdminClient, err := schema.NewDgraphSchemaStore(*dgraphAdminAddress)
- if err != nil {
- panic(err)
- }
- err = dgraphAdminClient.SetSchema(`
- type Image {
- id: ID!
- objectPath: String! @search(by: [exact])
- segments: [ImageSegment] @hasInverse(field: sourceImage)
- }
- type ImageSegment {
- id: ID!
- upperLeftX: Int!
- upperLeftY: Int!
- lowerRightX: Int!
- lowerRightY: Int!
- sourceImage: Image!
- objectPath: String
- }`)
+ config, err := getKubeConfig() // kubeconfig file when -kubeconfig is set, in-cluster config otherwise
+ if err != nil {
+ panic(err.Error()) // panics with the message string; the other checks in main panic with the error value itself
+ }
+ clientset, err := kubernetes.NewForConfig(config) // Kubernetes API client built from that config
if err != nil {
panic(err)
}
- mw := MinioWebhook{
- &GraphQLClient{*graphqlAddress},
- nil}
- http.HandleFunc("/minio_webhook", mw.handler)
+ pods := clientset.CoreV1().Pods("pcloud") // pod client bound to the "pcloud" namespace
+
+ gqlClient, err := schema.NewDgraphClient(
+ *dgraphGqlAddress, *dgraphSchemaAddress) // values of -graphql_address and -dgraph_admin_address, in that order
+ if err != nil {
+ panic(err)
+ } // NOTE(review): the SetSchema call below is commented out, so main no longer uploads a schema at startup; confirm this is intentional
+ // err = gqlClient.SetSchema(`
+ // type Image {
+ // id: ID!
+ // objectPath: String! @search(by: [exact])
+ // }

+ // type ImageSegment {
+ // id: ID!
+ // upperLeftX: Int!
+ // upperLeftY: Int!
+ // lowerRightX: Int!
+ // lowerRightY: Int!
+ // sourceImage: Image!
+ // objectPath: String
+ // }

+ // extend type Image {
+ // segments: [ImageSegment] @hasInverse(field: sourceImage)
+ // }`)
+ // if err != nil {
+ // panic(err)
+ // }
+ mw := MinioWebhook{gqlClient, pods} // positional literal: gql first, pods second
+ http.HandleFunc("/minio_webhook", mw.minioHandler) // MinIO event notifications
+ http.HandleFunc("/graphql", mw.graphqlHandler) // GraphQL pass-through served on the same port
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", *port), nil))
}