e2e face recognition
diff --git a/controller/main.go b/controller/main.go
index 0b23654..3c2f238 100644
--- a/controller/main.go
+++ b/controller/main.go
@@ -1,13 +1,20 @@
 package main
 
 import (
-	"bytes"
+	"context"
 	"flag"
 	"fmt"
+	"io"
 	"io/ioutil"
 	"log"
 	"net/http"
-	"strings"
+
+	apiv1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+	corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/clientcmd"
 
 	"github.com/giolekva/pcloud/controller/schema"
 
@@ -15,58 +22,22 @@
 	"github.com/itaysk/regogo"
 )
 
+var kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file.")
+
 var port = flag.Int("port", 123, "Port to listen on.")
-var graphqlAddress = flag.String("graphql_address", "", "GraphQL server address.")
-var dgraphAdminAddress = flag.String("dgraph_admin_address", "", "Dgraph server admin address.")
+var dgraphGqlAddress = flag.String("graphql_address", "", "GraphQL server address.")
+var dgraphSchemaAddress = flag.String("dgraph_admin_address", "", "Dgraph server admin address.")
 
 const imgJson = `{ objectPath: \"%s\"}`
-const insertQuery = `{ "query": "mutation { add%s(input: [%s]) { %s { id } } }" }`
+const insertQuery = `mutation { add%s(input: [%s]) { %s { id } } }`
 const getQuery = `{ "query": "{ get%s(id: \"%s\") { id objectPath } } " }`
 
-type GraphQLClient struct {
-	serverAddress string
-}
-
-func (g *GraphQLClient) Insert(typ string, obj string) (string, error) {
-	req := []byte(fmt.Sprintf(insertQuery, typ, obj, strings.ToLower(typ)))
-	glog.Info("Insering new item, mutation query:")
-	glog.Info(string(req))
-	resp, err := http.Post(g.serverAddress, "application/json", bytes.NewReader(req))
-	glog.Infof("Response status: %d", resp.StatusCode)
-	if err != nil {
-		return "", err
-	}
-	respBody, err := ioutil.ReadAll(resp.Body)
-	if err != nil {
-		return "", err
-	}
-	glog.Infof("Response: %s", string(respBody))
-	return string(respBody), nil
-}
-
-func (g *GraphQLClient) Get(typ string, id string) (string, error) {
-	req := []byte(fmt.Sprintf(getQuery, typ, id))
-	glog.Info("Getting node with query:")
-	glog.Info(string(req))
-	resp, err := http.Post(g.serverAddress, "application/json", bytes.NewReader(req))
-	glog.Infof("Response status: %d", resp.StatusCode)
-	if err != nil {
-		return "", err
-	}
-	respBody, err := ioutil.ReadAll(resp.Body)
-	if err != nil {
-		return "", err
-	}
-	glog.Info(string(respBody))
-	return string(respBody), nil
-}
-
 type MinioWebhook struct {
-	gqlClient         *GraphQLClient
-	dgraphAdminClient schema.SchemaStore
+	gql  schema.GraphQLClient
+	pods corev1.PodInterface
 }
 
-func (m *MinioWebhook) handler(w http.ResponseWriter, r *http.Request) {
+func (m *MinioWebhook) minioHandler(w http.ResponseWriter, r *http.Request) {
 	body, err := ioutil.ReadAll(r.Body)
 	if err != nil {
 		glog.Error(err)
@@ -83,48 +54,115 @@
 		http.Error(w, "Could not find object key", http.StatusBadRequest)
 		return
 	}
-	resp, err := m.gqlClient.Insert("Image", fmt.Sprintf(imgJson, key.String()))
+	resp, err := m.gql.RunQuery(fmt.Sprintf(
+		"mutation { addImage(input: [{objectPath: \"%s\"}]) { image { id }} }",
+		key.String()))
 	if err != nil {
 		glog.Error(err)
 		http.Error(w, "Can not add given objects", http.StatusInternalServerError)
 		return
 	}
-	id, err := regogo.Get(resp, "input.data.addImage.image[0].id")
+	id, err := regogo.Get(resp, "input.addImage.image[0].id")
 	if err != nil {
 		glog.Error(err)
 		http.Error(w, "Could not extract node id", http.StatusInternalServerError)
 		return
 	}
+	glog.Infof("New image id: %s", id.String())
+	pod := &apiv1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: fmt.Sprintf("detect-faces-%s", id.String())},
+		Spec: apiv1.PodSpec{
+			RestartPolicy: apiv1.RestartPolicyNever,
+			Containers: []apiv1.Container{{
+				Name:            "detect-faces",
+				Image:           "face-detector:latest",
+				ImagePullPolicy: apiv1.PullNever,
+				Command:         []string{"python", "main.py"},
+				Args:            []string{"http://pcloud-controller-service.pcloud.svc:1111/graphql", "http://minio-hl-svc.minio.svc:9000", id.String()}}}}}
+	glog.Info("Creating pod...")
+	result, err := m.pods.Create(context.TODO(), pod, metav1.CreateOptions{})
+	if err != nil {
+		glog.Error(err)
+		http.Error(w, "Could not start face detector", http.StatusInternalServerError)
+		return
+	}
+	glog.Infof("Created deployment %q.\n", result.GetObjectMeta().GetName())
+}
+
+func (m *MinioWebhook) graphqlHandler(w http.ResponseWriter, r *http.Request) {
+	glog.Infof("New GraphQL query received: %s", r.Method)
+	err := r.ParseForm()
+	if err != nil {
+		glog.Error(err)
+		http.Error(w, "Could not read query", http.StatusInternalServerError)
+		return
+	}
+	query, ok := r.Form["query"]
+	if !ok || len(query) != 1 {
+		http.Error(w, "Exactly ouery parameter must be provided", http.StatusBadRequest)
+		return
+	}
+	resp, err := m.gql.RunQuery(query[0])
+	if err != nil {
+		glog.Error(err)
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+	io.WriteString(w, resp)
+	w.Header().Set("Content-Type", "application/json")
+}
+
+func getKubeConfig() (*rest.Config, error) {
+	if *kubeconfig != "" {
+		return clientcmd.BuildConfigFromFlags("", *kubeconfig)
+	} else {
+		return rest.InClusterConfig()
+	}
 }
 
 func main() {
 	flag.Parse()
-	dgraphAdminClient, err := schema.NewDgraphSchemaStore(*dgraphAdminAddress)
-	if err != nil {
-		panic(err)
-	}
-	err = dgraphAdminClient.SetSchema(`
-	type Image {
-	     id: ID!
-	     objectPath: String! @search(by: [exact])
-	     segments: [ImageSegment] @hasInverse(field: sourceImage)
-	}
 
-	type ImageSegment {
-	     id: ID!
-	     upperLeftX: Int!
-	     upperLeftY: Int!
-	     lowerRightX: Int!
-	     lowerRightY: Int!
-	     sourceImage: Image!
-	     objectPath: String
-	}`)
+	config, err := getKubeConfig()
+	if err != nil {
+		panic(err.Error())
+	}
+	clientset, err := kubernetes.NewForConfig(config)
 	if err != nil {
 		panic(err)
 	}
-	mw := MinioWebhook{
-		&GraphQLClient{*graphqlAddress},
-		nil}
-	http.HandleFunc("/minio_webhook", mw.handler)
+	pods := clientset.CoreV1().Pods("pcloud")
+
+	gqlClient, err := schema.NewDgraphClient(
+		*dgraphGqlAddress, *dgraphSchemaAddress)
+	if err != nil {
+		panic(err)
+	}
+	// err = gqlClient.SetSchema(`
+	// type Image {
+	//      id: ID!
+	//      objectPath: String! @search(by: [exact])
+	// }
+
+	// type ImageSegment {
+	//      id: ID!
+	//      upperLeftX: Int!
+	//      upperLeftY: Int!
+	//      lowerRightX: Int!
+	//      lowerRightY: Int!
+	//      sourceImage: Image!
+	//      objectPath: String
+	// }
+
+	// extend type Image {
+	//      segments: [ImageSegment] @hasInverse(field: sourceImage)
+	// }`)
+	// if err != nil {
+	// 	panic(err)
+	// }
+	mw := MinioWebhook{gqlClient, pods}
+	http.HandleFunc("/minio_webhook", mw.minioHandler)
+	http.HandleFunc("/graphql", mw.graphqlHandler)
 	log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", *port), nil))
 }