evet-processor: mark event as done once pod finishes
diff --git a/apps/photos-ui/static/photos.js b/apps/photos-ui/static/photos.js
index 6f46a19..dac17f9 100644
--- a/apps/photos-ui/static/photos.js
+++ b/apps/photos-ui/static/photos.js
@@ -35,7 +35,7 @@
imgs = await fetchAllPhotos();
img_list = "<ul>";
for (img of imgs) {
- img_list += "<li><a href='/photo?id=" + img.id + "'><img style='max-width: 300px' src='http://localhost:9000/" + img.objectPath + "' /></a></li>";
+ img_list += "<li><a href='/photo?id=" + img.id + "'><img style='max-width: 300px' src='http://minio/" + img.objectPath + "' /></a></li>";
}
img_list += "</ul>";
document.getElementById(gallery_elem_id).innerHTML = img_list;
@@ -43,7 +43,7 @@
async function initImg(img_elem_id, id) {
img = await fetchImage(id);
- document.getElementById(img_elem_id).setAttribute("src", "http://localhost:9000/" + img.objectPath);
+ document.getElementById(img_elem_id).setAttribute("src", "http://minio/" + img.objectPath);
}
async function drawFaces(photo_elem_id, faces_canvas_elem_id, id){
diff --git a/controller/schema/dgraph_schema_store.go b/controller/schema/dgraph_schema_store.go
index a9add2e..1680af0 100644
--- a/controller/schema/dgraph_schema_store.go
+++ b/controller/schema/dgraph_schema_store.go
@@ -269,9 +269,13 @@
for _, c := range v.Children {
rewriteValue(c.Value, s)
}
+ // TODO(giolekva): explicitly get input argument and rewrite only that part.
if v.Definition.Kind == ast.InputObject &&
!strings.HasSuffix(v.Definition.Name, "Event") &&
- !strings.HasSuffix(v.Definition.Name, "Ref") {
+ !strings.HasSuffix(v.Definition.Name, "EventInput") &&
+ !strings.HasSuffix(v.Definition.Name, "Ref") &&
+ !strings.HasSuffix(v.Definition.Name, "Filter") &&
+ !strings.HasSuffix(v.Definition.Name, "Patch") {
v.Children = append(v.Children, newEventListValue(v.Definition, s))
}
}
diff --git a/events/client.go b/events/client.go
index 81dfdf1..d5852e7 100644
--- a/events/client.go
+++ b/events/client.go
@@ -4,7 +4,7 @@
"bytes"
"encoding/json"
// "errors"
- // "fmt"
+ "fmt"
"io/ioutil"
"net/http"
@@ -34,6 +34,19 @@
}
}`
+var markEventDoneTmpl = `mutation {
+ updateImageEvent(input: {
+ filter: {
+ id: ["%s"]
+ },
+ set: {
+ state: DONE
+ }
+ }) {
+ numUids
+ }
+}`
+
// Implements EventStore
type GraphQLClient struct {
apiAddr string
@@ -101,3 +114,17 @@
}
return events, nil
}
+
+func (c *GraphQLClient) MarkEventDone(event Event) error {
+ q := query{fmt.Sprintf(markEventDoneTmpl, event.Id)}
+ qJson, err := json.Marshal(q)
+ if err != nil {
+ return err
+ }
+ _, err = http.Post(c.apiAddr, jsonContentType, bytes.NewReader(qJson))
+ if err != nil {
+ return err
+ }
+ // TODO(giolekva): check errors field
+ return nil
+}
diff --git a/events/event.go b/events/event.go
index 427427b..b17fab0 100644
--- a/events/event.go
+++ b/events/event.go
@@ -2,6 +2,7 @@
type EventState string
+// TODO(giolekva): add FAILED
const (
EventStateNew EventState = "NEW"
EventStateProcessing EventState = "PROCESSING"
@@ -16,4 +17,5 @@
type EventStore interface {
GetEventsInState(state EventState) ([]Event, error)
+ MarkEventDone(event Event) error
}
diff --git a/events/processor.go b/events/processor.go
index 3d4079c..89db4d5 100644
--- a/events/processor.go
+++ b/events/processor.go
@@ -3,6 +3,7 @@
import (
"context"
"fmt"
+ "time"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -18,9 +19,10 @@
// Implements processor
type singleEventAtATimeProcessor struct {
- store EventStore
- pods corev1.PodInterface
- pcloudApi string
+ store EventStore
+ pods corev1.PodInterface
+ pcloudApi string
+ // TODO(giolekva): Nodes themselves should be associated with object store
objectStoreApi string
}
@@ -33,22 +35,29 @@
func (p *singleEventAtATimeProcessor) Start() {
for {
- events, err := p.store.GetEventsInState(EventStateNew)
- if err != nil {
- glog.Error(err)
- continue
+ select {
+ case <-time.After(30 * time.Second):
+ events, err := p.store.GetEventsInState(EventStateNew)
+ if err != nil {
+ glog.Error(err)
+ continue
+ }
+ if len(events) == 0 {
+ continue
+ }
+ event := events[0]
+ pod := createPod(event.NodeId, p.pcloudApi, p.objectStoreApi)
+ glog.Info("Creating pod...")
+ resp, err := p.pods.Create(context.TODO(), pod, metav1.CreateOptions{})
+ if err != nil {
+ glog.Error(resp)
+ continue
+ }
+ glog.Infof("Pod created: %s", resp)
+ // TODO(giolekva): do not ignore error
+ _ = monitorPod(resp, p.pods)
+ p.store.MarkEventDone(event)
}
- pod := createPod(events[0].NodeId, p.pcloudApi, p.objectStoreApi)
- glog.Info("Creating pod...")
- resp, err := p.pods.Create(context.TODO(), pod, metav1.CreateOptions{})
- if err != nil {
- glog.Error(resp)
- continue
- }
- glog.Infof("Pod created: %s", resp)
- // TODO(giolekva): do not ignore error
- _ = monitorPod(resp, p.pods)
- break
}
}
@@ -69,8 +78,9 @@
return nil
}
p := events.Object.(*apiv1.Pod)
- fmt.Println("Pod status:", pod.Status.Phase)
+ glog.Infof("Pod status: %s", pod.Status.Phase)
if isInTerminalState(p) {
+ glog.Info("Pod is DONE")
w.Stop()
}
}
@@ -88,6 +98,6 @@
Name: "event",
Image: "giolekva/face-detector:latest",
ImagePullPolicy: apiv1.PullAlways,
- Command: []string{"python", "main.py"},
+ Command: []string{"python3", "main.py"},
Args: []string{pcloudApi, objectStoreApi, id}}}}}
}