process dgraph schema
diff --git "a/controller/.\043Dockerfile" "b/controller/.\043Dockerfile"
deleted file mode 120000
index ab8b67a..0000000
--- "a/controller/.\043Dockerfile"
+++ /dev/null
@@ -1 +0,0 @@
-lekva@gl.local.21823
\ No newline at end of file
diff --git a/controller/Dockerfile b/controller/Dockerfile
index 125fb4a..4bba368 100644
--- a/controller/Dockerfile
+++ b/controller/Dockerfile
@@ -26,5 +26,8 @@
RUN go get -u github.com/golang/protobuf/protoc-gen-go
RUN go get -u google.golang.org/protobuf/encoding/prototext
RUN go get -u github.com/google/uuid
+RUN go get -u github.com/itaysk/regogo
+RUN go get -u github.com/vektah/gqlparser
+RUN go get -u github.com/golang/glog
WORKDIR /src/go/src/github.com/giolekva/pcloud/controller
diff --git a/controller/controller.yaml b/controller/controller.yaml
index c593a30..0055636 100644
--- a/controller/controller.yaml
+++ b/controller/controller.yaml
@@ -36,7 +36,7 @@
- name: code
mountPath: /src/go/src/github.com/giolekva/pcloud/controller
command: ["/bin/sh", "-c"]
- args: ["go run main.go --port=1234 --graphql_address=http://dgraph-public.default.svc:8080/graphql"]
+ args: ["go run main.go --port=1234 --graphql_address=http://dgraph-public.default.svc:8080/graphql --dgraph_admin_address=http://dgraph-public.default.svc:8080/admin --logtostderr"]
volumes:
- name: code
hostPath:
diff --git a/controller/main.go b/controller/main.go
index 49fda14..bd37cac 100644
--- a/controller/main.go
+++ b/controller/main.go
@@ -2,58 +2,130 @@
import (
"bytes"
- "encoding/json"
"flag"
"fmt"
"io/ioutil"
"log"
"net/http"
+ "strings"
+
+ "github.com/giolekva/pcloud/controller/schema"
+
+ "github.com/golang/glog"
+ "github.com/itaysk/regogo"
)
var port = flag.Int("port", 123, "Port to listen on.")
-var graphql_address = flag.String("graphql_address", "", "GraphQL server address.")
+var graphqlAddress = flag.String("graphql_address", "", "GraphQL server address.")
+var dgraphAdminAddress = flag.String("dgraph_admin_address", "", "Dgraph server admin address.")
-func minio_webhook_handler(w http.ResponseWriter, r *http.Request) {
+const imgJson = `{ objectPath: \"%s\"}`
+const insertQuery = `{ "query": "mutation { add%s(input: [%s]) { %s { id } } }" }`
+const getQuery = `{ "query": "{ get%s(id: \"%s\") { id objectPath } } " }`
+
+type GraphQLClient struct {
+ serverAddress string
+}
+
+func (g *GraphQLClient) Insert(typ string, obj string) (string, error) {
+ req := []byte(fmt.Sprintf(insertQuery, typ, obj, strings.ToLower(typ)))
+ glog.Info("Insering new item, mutation query:")
+ glog.Info(string(req))
+ resp, err := http.Post(g.serverAddress, "application/json", bytes.NewReader(req))
+ glog.Infof("Response status: %d", resp.StatusCode)
+ if err != nil {
+ return "", err
+ }
+ respBody, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ return "", err
+ }
+ glog.Infof("Response: %s", string(respBody))
+ return string(respBody), nil
+}
+
+func (g *GraphQLClient) Get(typ string, id string) (string, error) {
+ req := []byte(fmt.Sprintf(insertQuery, typ, id, strings.ToLower(typ)))
+ glog.Info("Getting node with query:")
+ glog.Info(string(req))
+ resp, err := http.Post(g.serverAddress, "application/json", bytes.NewReader(req))
+ glog.Infof("Response status: %s", resp.StatusCode)
+ if err != nil {
+ return "", err
+ }
+ respBody, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ return "", err
+ }
+ glog.Info(string(respBody))
+ return string(respBody), nil
+}
+
+type MinioWebhook struct {
+ gqlClient *GraphQLClient
+ dgraphAdminClient schema.SchemaStore
+}
+
+func (m *MinioWebhook) handler(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
if len(body) == 0 {
return
}
- log.Print(string(body))
+ glog.Infof("Received event from Minio: %s", string(body))
if err != nil {
- log.Print("-----")
- log.Print(err)
http.Error(w, "Could not read HTTP request body", http.StatusInternalServerError)
return
}
- event := make(map[string]interface{})
- err = json.Unmarshal(body, &event)
+ key, err := regogo.Get(string(body), "input.Key")
if err != nil {
- log.Print("++++++")
- log.Print(err)
- http.Error(w, "Could not parse Event JSON object", http.StatusBadRequest)
+ http.Error(w, "Could not find object key", http.StatusBadRequest)
return
}
- buf := []byte("{ \"query\": \"mutation { addImage(input: [{ objectPath: \\\"" + event["Key"].(string) + "\\\"}]) { image { id } }} \" }")
- log.Print(string(buf))
- resp, err := http.Post(*graphql_address, "application/json", bytes.NewReader(buf))
+ resp, err := m.gqlClient.Insert("Image", fmt.Sprintf(imgJson, key.String()))
if err != nil {
- log.Print("#######")
- log.Print(err)
- http.Error(w, "Could not post to GraphQL", http.StatusInternalServerError)
+ http.Error(w, "Can not add given objects", http.StatusInternalServerError)
return
}
- body, err = ioutil.ReadAll(resp.Body)
+ id, err := regogo.Get(resp, "input.data.addImage.image[0]id")
if err != nil {
- log.Print("@@@@@@")
- log.Print(err)
- http.Error(w, "Could not parse GraphQL response", http.StatusInternalServerError)
+ http.Error(w, "Could not extract node id", http.StatusInternalServerError)
return
}
- log.Print(string(body))
+ resp, err = m.gqlClient.Get("Image", id.String())
+ if err != nil {
+ http.Error(w, "Could not fetch node", http.StatusInternalServerError)
+ return
+ }
}
func main() {
flag.Parse()
- http.HandleFunc("/minio_webhook", minio_webhook_handler)
+ dgraphAdminClient, err := schema.NewDgraphSchemaStore(*dgraphAdminAddress)
+ if err != nil {
+ panic(err)
+ }
+ err = dgraphAdminClient.SetSchema(`
+ type Image {
+ id: ID!
+ objectPath: String! @search(by: [exact])
+ segments: [ImageSegment] @hasInverse(field: sourceImage)
+ }
+
+ type ImageSegment {
+ id: ID!
+ upperLeftX: Int!
+ upperLeftY: Int!
+ lowerRightX: Int!
+ lowerRightY: Int!
+ sourceImage: Image!
+ objectPath: String
+ }`)
+ if err != nil {
+ panic(err)
+ }
+ mw := MinioWebhook{
+ &GraphQLClient{*graphqlAddress},
+ nil}
+ http.HandleFunc("/minio_webhook", mw.handler)
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", *port), nil))
}
diff --git a/controller/samples.gql b/controller/samples.gql
new file mode 100644
index 0000000..3aa0b65
--- /dev/null
+++ b/controller/samples.gql
@@ -0,0 +1,113 @@
+{
+ __drop
+}
+
+# query {
+# queryImage() {
+# id
+# objectPath
+# }
+# }
+
+# query {
+# queryFoo(filter: {
+# name: {
+# anyofterms: "foo"
+# }
+# }) {
+# id
+# name
+# bar {
+# id
+# }
+# }
+# }
+
+
+# mutation {
+# addA(input: [{
+# name: "dev"
+# }]) {
+# numUids
+# a {
+# id
+# name
+# }
+# }
+# }
+
+# mutation {
+# updateB(input: {
+# filter: {
+# id: ["0x4"]
+# },
+# set: {
+# a: [{id: "0x5"}]
+# }
+# }) {
+# numUids
+# b {
+# id
+# name
+# a {
+# id
+# name
+# b {
+# id
+# name
+# }
+# }
+# }
+# }
+# }
+
+
+# mutation {
+# addB(input:[{
+# name: "bar"
+# a: [{
+# id: "0x3"
+# }]
+# }]) {
+# numUids
+# b {
+# id
+# name
+# a {
+# id
+# name
+# b {
+# id
+# name
+# }
+# }
+# }
+# }
+# }
+
+# mutation {
+# deleteImage(
+# filter: {
+# id: {
+# gt: 0
+# }
+# }
+# ) {
+# msg
+# numUids
+# }
+# }
+
+
+
+# mutation {
+# addImage(input: [{
+# id: 2,
+# objectPath: "foo/bar.jpg"
+# }, {
+# id:3
+# objectPath: "qwe/qwe"
+# }]) {
+# numUids
+# }
+# }
\ No newline at end of file
diff --git a/controller/schema/dgraph_schema_store.go b/controller/schema/dgraph_schema_store.go
new file mode 100644
index 0000000..87a4d95
--- /dev/null
+++ b/controller/schema/dgraph_schema_store.go
@@ -0,0 +1,85 @@
+package schema
+
+import (
+ "bytes"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "strings"
+
+ "github.com/golang/glog"
+ "github.com/itaysk/regogo"
+ "github.com/vektah/gqlparser/ast"
+ "github.com/vektah/gqlparser/parser"
+)
+
+const jsonContentType = "application/json"
+
+const getSchemaQuery = `{ "query": "{ getGQLSchema() { schema } }" }`
+
+const addSchemaQuery = `{
+ "query": "mutation { updateGQLSchema(input: {set: {schema: \"%s\"}}) { gqlSchema { id schema } } }" }`
+
+type DgraphSchemaStore struct {
+ dgraphAddress string
+ gqlSchema string
+ schema *ast.SchemaDocument
+}
+
+func NewDgraphSchemaStore(dgraphAddress string) (SchemaStore, error) {
+ ret := &DgraphSchemaStore{dgraphAddress: dgraphAddress, gqlSchema: ""}
+ if err := ret.fetchSchema(); err != nil {
+ return nil, err
+ }
+ return ret, nil
+}
+
+func (s *DgraphSchemaStore) Schema() *ast.SchemaDocument {
+ return s.schema
+}
+
+func (s *DgraphSchemaStore) AddSchema(gqlSchema string) error {
+ return s.SetSchema(s.gqlSchema + gqlSchema)
+}
+
+func (s *DgraphSchemaStore) SetSchema(gqlSchema string) error {
+ glog.Info("Setting GraphQL schema:")
+ glog.Info(gqlSchema)
+ req := fmt.Sprintf(addSchemaQuery, strings.ReplaceAll(strings.ReplaceAll(gqlSchema, "\n", " "), "\t", " "))
+ resp, err := http.Post(s.dgraphAddress, jsonContentType, bytes.NewReader([]byte(req)))
+ glog.Infof("Response status code: %d", resp.StatusCode)
+ if err != nil {
+ return err
+ }
+ respBody, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ return err
+ }
+ glog.Infof("Result: %s", string(respBody))
+ s.gqlSchema = gqlSchema
+ return s.fetchSchema()
+}
+
+func (s *DgraphSchemaStore) fetchSchema() error {
+ glog.Infof("Getting GraphQL schema with query: %s", getSchemaQuery)
+ resp, err := http.Post(s.dgraphAddress, jsonContentType, bytes.NewReader([]byte(getSchemaQuery)))
+ if err != nil {
+ return err
+ }
+ glog.Infof("Response status code: %d", resp.StatusCode)
+ respBody, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ return err
+ }
+ glog.Infof("Result: %s", string(respBody))
+ gqlSchema, err := regogo.Get(string(respBody), "input.data.getGQLSchema.schema")
+ if err != nil {
+ return err
+ }
+ schema, gqlErr := parser.ParseSchema(&ast.Source{Input: gqlSchema.String()})
+ if gqlErr != nil {
+ return gqlErr
+ }
+ s.schema = schema
+ return nil
+}
diff --git a/controller/schema/schema.go b/controller/schema/schema.go
new file mode 100644
index 0000000..a077ce3
--- /dev/null
+++ b/controller/schema/schema.go
@@ -0,0 +1,38 @@
+package schema
+
+import (
+ "github.com/vektah/gqlparser/ast"
+ "github.com/vektah/gqlparser/parser"
+)
+
+type SchemaStore interface {
+ Schema() *ast.SchemaDocument
+ SetSchema(gqlSchema string) error
+ AddSchema(gqlSchema string) error
+}
+
+type InMemorySchemaStore struct {
+ gqlSchema string
+ schema *ast.SchemaDocument
+}
+
+func NewInMemorySchemaStore() SchemaStore {
+ return &InMemorySchemaStore{gqlSchema: ""}
+}
+
+func (s *InMemorySchemaStore) Schema() *ast.SchemaDocument {
+ return s.schema
+}
+
+func (s *InMemorySchemaStore) AddSchema(gqlSchema string) error {
+ return s.SetSchema(s.gqlSchema + gqlSchema)
+}
+
+func (s *InMemorySchemaStore) SetSchema(gqlSchema string) error {
+ schema, err := parser.ParseSchema(&ast.Source{Input: gqlSchema})
+ if err != nil {
+ return err
+ }
+ s.schema = schema
+ return nil
+}
diff --git a/controller/schema/schema_test.go b/controller/schema/schema_test.go
new file mode 100644
index 0000000..0c73694
--- /dev/null
+++ b/controller/schema/schema_test.go
@@ -0,0 +1,81 @@
+package schema
+
+import (
+ "fmt"
+ "log"
+ "testing"
+)
+
+func TestInMemorySimple(t *testing.T) {
+ s := NewInMemorySchemaStore()
+ err := s.AddSchema(`
+type M {
+ X: Int
+}`)
+ if err != nil {
+ t.Fatal(err)
+ }
+ for _, def := range s.Schema().Definitions {
+ fmt.Printf("%s - %s\n", def.Name, def.Kind)
+ }
+}
+
+func TestInMemory(t *testing.T) {
+ s := NewInMemorySchemaStore()
+ err := s.AddSchema(`
+type Image {
+ id: ID!
+ objectPath: String!
+}
+
+type ImageSegment {
+ id: ID! @search
+ upperLeftX: Float!
+ upperLeftY: Float!
+ lowerRightX: Float!
+ lowerRightY: Float!
+ sourceImage: Image!
+}
+
+extend type Image {
+ segments: [ImageSegment]
+}
+`)
+ if err != nil {
+ t.Fatal(err)
+ }
+ for _, def := range s.Schema().Definitions {
+ fmt.Printf("%s - %s\n", def.Name, def.Kind)
+ }
+}
+
+func TestDgraph(t *testing.T) {
+ s, err := NewDgraphSchemaStore("http://localhost:8080/admin")
+ if err != nil {
+ t.Fatal(err)
+ }
+ if s.Schema() != nil {
+ for _, def := range s.Schema().Definitions {
+ fmt.Printf("%s - %s\n", def.Name, def.Kind)
+ }
+ }
+ err = s.AddSchema("type N { Y: ID! Z: Float }")
+ if err != nil {
+ t.Fatal(err)
+ }
+ if s.Schema() != nil {
+ for _, def := range s.Schema().Definitions {
+ fmt.Printf("%s - %s\n", def.Name, def.Kind)
+ }
+ }
+ log.Print("123123")
+ err = s.AddSchema("type M { X: Int }")
+ if err != nil {
+ t.Fatal(err)
+ }
+ if s.Schema() != nil {
+ for _, def := range s.Schema().Definitions {
+ fmt.Printf("%s - %s\n", def.Name, def.Kind)
+ }
+ }
+}