event-processor: integrade with GQL api
diff --git a/controller/schema/dgraph_schema_store.go b/controller/schema/dgraph_schema_store.go
index 941a16b..304ee5d 100644
--- a/controller/schema/dgraph_schema_store.go
+++ b/controller/schema/dgraph_schema_store.go
@@ -27,7 +27,7 @@
const eventTmpl = `
type %sEvent {
id: ID!
- state: EventState!
+ state: EventState! @search(by: [exact])
node: %s! @hasInverse(field: events)
}
diff --git a/events/client.go b/events/client.go
index afc4d15..81dfdf1 100644
--- a/events/client.go
+++ b/events/client.go
@@ -1,11 +1,39 @@
package events
+import (
+ "bytes"
+ "encoding/json"
+ // "errors"
+ // "fmt"
+ "io/ioutil"
+ "net/http"
+
+ "github.com/golang/glog"
+)
+
+var jsonContentType = "application/json"
+
type query struct {
- query string
- operation string
- variables string
+ Query string `json:"query"`
}
+var getAllNewImageEventsTmpl = `{
+ queryImageEvent(filter: {
+ state: {
+ eq: NEW
+ le: NEW
+ lt: NEW
+ ge: NEW
+ gt: NEW
+ }
+ }) {
+ id
+ node {
+ id
+ }
+ }
+}`
+
// Implements EventStore
type GraphQLClient struct {
apiAddr string
@@ -15,6 +43,61 @@
return &GraphQLClient{apiAddr}
}
+type location struct {
+ Line int `json:"line"`
+ Column int `json:"column"`
+}
+
+type gqlError struct {
+ Message string `json:"message"`
+ Locations []location `json:"location"`
+}
+
+type gqlNode struct {
+ Id string `json:"id"`
+}
+
+type gqlEvent struct {
+ Id string `json:"id"`
+ State string `json:"state"`
+ Node gqlNode `json:"node"`
+}
+
+type gqlData struct {
+ Events []gqlEvent `json:"queryImageEvent"`
+}
+
+type queryResp struct {
+ Errors []gqlError `json:"errors"`
+ Data gqlData `json:"data"`
+}
+
func (c *GraphQLClient) GetEventsInState(state EventState) ([]Event, error) {
- return nil, nil
+ q := query{getAllNewImageEventsTmpl}
+ qJson, err := json.Marshal(q)
+ if err != nil {
+ return nil, err
+ }
+ resp, err := http.Post(c.apiAddr, jsonContentType, bytes.NewReader(qJson))
+ if err != nil {
+ return nil, err
+ }
+ respBody, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ return nil, err
+ }
+ glog.Info(string(respBody))
+ var gqlResp gqlData
+ err = json.Unmarshal(respBody, &gqlResp)
+ if err != nil {
+ return nil, err
+ }
+ // if len(gqlResp.Errors) != 0 {
+ // return nil, errors.New(fmt.Sprintf("%v", gqlResp.Errors))
+ // }
+ var events []Event
+ for _, e := range gqlResp.Events {
+ events = append(events, Event{e.Id, EventStateNew, e.Node.Id})
+ }
+ return events, nil
}
diff --git a/events/cmd/main.go b/events/cmd/main.go
index 09db5a9..5a20c26 100644
--- a/events/cmd/main.go
+++ b/events/cmd/main.go
@@ -36,6 +36,6 @@
}
pods := clientset.CoreV1().Pods("default")
eventStore := events.NewGraphQLClient(*apiAddr)
- go events.NewSingleEventAtATimeProcessor(
+ events.NewSingleEventAtATimeProcessor(
eventStore, pods, *apiAddr, *objectStoreAddr).Start()
}
diff --git a/events/processor.go b/events/processor.go
index 884d9e3..d4bbc6c 100644
--- a/events/processor.go
+++ b/events/processor.go
@@ -48,6 +48,7 @@
glog.Infof("Pod created: %s", resp)
// TODO(giolekva): do not ignore error
_ = monitorPod(resp, p.pods)
+ break
}
}
@@ -82,7 +83,7 @@
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("event-%s", id)},
Spec: apiv1.PodSpec{
- RestartPolicy: apiv1.RestartPolicyNever,
+ RestartPolicy: apiv1.RestartPolicyAlways,
Containers: []apiv1.Container{{
Name: "event",
Image: "giolekva/face-detector:latest",