event-processor: integrade with GQL api
diff --git a/controller/schema/dgraph_schema_store.go b/controller/schema/dgraph_schema_store.go
index 941a16b..304ee5d 100644
--- a/controller/schema/dgraph_schema_store.go
+++ b/controller/schema/dgraph_schema_store.go
@@ -27,7 +27,7 @@
 const eventTmpl = `
 type %sEvent {
   id: ID!
-  state: EventState!
+  state: EventState! @search(by: [exact])
   node: %s! @hasInverse(field: events)
 }
 
diff --git a/events/client.go b/events/client.go
index afc4d15..81dfdf1 100644
--- a/events/client.go
+++ b/events/client.go
@@ -1,11 +1,39 @@
 package events
 
+import (
+	"bytes"
+	"encoding/json"
+	// "errors"
+	// "fmt"
+	"io/ioutil"
+	"net/http"
+
+	"github.com/golang/glog"
+)
+
+var jsonContentType = "application/json"
+
 type query struct {
-	query     string
-	operation string
-	variables string
+	Query string `json:"query"`
 }
 
+var getAllNewImageEventsTmpl = `{
+  queryImageEvent(filter: {
+    state: {
+      eq: NEW
+      le: NEW
+      lt: NEW
+      ge: NEW
+      gt: NEW
+    }
+  }) {
+    id
+    node {
+      id
+    }
+  }
+}`
+
 // Implements EventStore
 type GraphQLClient struct {
 	apiAddr string
@@ -15,6 +43,61 @@
 	return &GraphQLClient{apiAddr}
 }
 
+type location struct {
+	Line   int `json:"line"`
+	Column int `json:"column"`
+}
+
+type gqlError struct {
+	Message   string     `json:"message"`
+	Locations []location `json:"location"`
+}
+
+type gqlNode struct {
+	Id string `json:"id"`
+}
+
+type gqlEvent struct {
+	Id    string  `json:"id"`
+	State string  `json:"state"`
+	Node  gqlNode `json:"node"`
+}
+
+type gqlData struct {
+	Events []gqlEvent `json:"queryImageEvent"`
+}
+
+type queryResp struct {
+	Errors []gqlError `json:"errors"`
+	Data   gqlData    `json:"data"`
+}
+
 func (c *GraphQLClient) GetEventsInState(state EventState) ([]Event, error) {
-	return nil, nil
+	q := query{getAllNewImageEventsTmpl}
+	qJson, err := json.Marshal(q)
+	if err != nil {
+		return nil, err
+	}
+	resp, err := http.Post(c.apiAddr, jsonContentType, bytes.NewReader(qJson))
+	if err != nil {
+		return nil, err
+	}
+	respBody, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return nil, err
+	}
+	glog.Info(string(respBody))
+	var gqlResp gqlData
+	err = json.Unmarshal(respBody, &gqlResp)
+	if err != nil {
+		return nil, err
+	}
+	// if len(gqlResp.Errors) != 0 {
+	// 	return nil, errors.New(fmt.Sprintf("%v", gqlResp.Errors))
+	// }
+	var events []Event
+	for _, e := range gqlResp.Events {
+		events = append(events, Event{e.Id, EventStateNew, e.Node.Id})
+	}
+	return events, nil
 }
diff --git a/events/cmd/main.go b/events/cmd/main.go
index 09db5a9..5a20c26 100644
--- a/events/cmd/main.go
+++ b/events/cmd/main.go
@@ -36,6 +36,6 @@
 	}
 	pods := clientset.CoreV1().Pods("default")
 	eventStore := events.NewGraphQLClient(*apiAddr)
-	go events.NewSingleEventAtATimeProcessor(
+	events.NewSingleEventAtATimeProcessor(
 		eventStore, pods, *apiAddr, *objectStoreAddr).Start()
 }
diff --git a/events/processor.go b/events/processor.go
index 884d9e3..d4bbc6c 100644
--- a/events/processor.go
+++ b/events/processor.go
@@ -48,6 +48,7 @@
 		glog.Infof("Pod created: %s", resp)
 		// TODO(giolekva): do not ignore error
 		_ = monitorPod(resp, p.pods)
+		break
 	}
 }
 
@@ -82,7 +83,7 @@
 		ObjectMeta: metav1.ObjectMeta{
 			Name: fmt.Sprintf("event-%s", id)},
 		Spec: apiv1.PodSpec{
-			RestartPolicy: apiv1.RestartPolicyNever,
+			RestartPolicy: apiv1.RestartPolicyAlways,
 			Containers: []apiv1.Container{{
 				Name:            "event",
 				Image:           "giolekva/face-detector:latest",