Basic file uploader implemetation. Does not wait for replication to finish.
diff --git a/api/api.proto b/api/api.proto
index 6640afd..6372af3 100644
--- a/api/api.proto
+++ b/api/api.proto
@@ -4,10 +4,17 @@
option go_package = "api";
-message Chunk {
- string chunk_id = 1;
- int32 size_bytes = 2;
- bytes data = 3;
+enum ChunkStatus {
+ NEW = 0;
+ CREATED = 1;
+ WRITING = 2;
+ REPLICATING = 3;
+ READY = 4;
+}
+
+enum ReplicaRole {
+ SECONDARY = 0;
+ PRIMARY = 1;
}
// ChunkStorage
@@ -15,13 +22,17 @@
service ChunkStorage {
rpc ListChunks(ListChunksRequest) returns (ListChunksResponse) {}
+ rpc CreateChunk(CreateChunkRequest) returns (CreateChunkResponse) {}
+
+ rpc GetChunkStatus(GetChunkStatusRequest) returns (GetChunkStatusResponse) {}
+
rpc ReadChunk(ReadChunkRequest) returns (ReadChunkResponse) {}
+ rpc WriteChunk(WriteChunkRequest) returns (WriteChunkResponse) {}
+
rpc StoreChunk(StoreChunkRequest) returns (StoreChunkResponse) {}
rpc RemoveChunk(RemoveChunkRequest) returns (RemoveChunkResponse) {}
-
- rpc ReplicateChunk(ReplicateChunkRequest) returns (ReplicateChunkResponse) {}
}
message ListChunksRequest {
@@ -31,6 +42,26 @@
repeated string chunk_id = 1;
}
+message CreateChunkRequest {
+ string chunk_id = 1;
+ int32 size = 2;
+ ReplicaRole role = 3;
+ string primary_address = 4;
+}
+
+message CreateChunkResponse {
+}
+
+message GetChunkStatusRequest {
+ string chunk_id = 1;
+}
+
+message GetChunkStatusResponse {
+ ChunkStatus status = 1;
+ int32 total_bytes = 2;
+ int32 committed_bytes = 3;
+}
+
message ReadChunkRequest {
string chunk_id = 1;
int32 offset = 2;
@@ -41,6 +72,16 @@
bytes data = 1;
}
+message WriteChunkRequest {
+ string chunk_id = 1;
+ int32 offset = 2;
+ bytes data = 3;
+}
+
+message WriteChunkResponse {
+ int32 bytes_written = 1;
+}
+
message StoreChunkRequest {
string chunk_id = 1;
bytes data = 2;
@@ -56,14 +97,6 @@
message RemoveChunkResponse {
}
-message ReplicateChunkRequest {
- string chunk_id = 1;
- string primary_chunk_server = 2;
-}
-
-message ReplicateChunkResponse {
-}
-
// MetadataStorage
message ChunkStorageMetadata {
diff --git a/api/client.go b/api/client.go
new file mode 100644
index 0000000..3e05de6
--- /dev/null
+++ b/api/client.go
@@ -0,0 +1,12 @@
+package api
+
+import (
+ "google.golang.org/grpc"
+)
+
+func DialConn(address string) (*grpc.ClientConn, error) {
+ var opts []grpc.DialOption
+ opts = append(opts, grpc.WithInsecure())
+ opts = append(opts, grpc.WithBlock())
+ return grpc.Dial(address, opts...)
+}
diff --git a/chunk/chunk.go b/chunk/chunk.go
index 36efcce..438e355 100644
--- a/chunk/chunk.go
+++ b/chunk/chunk.go
@@ -1,13 +1,23 @@
package chunk
-import "io"
+import (
+ "io"
+
+ "pcloud/api"
+)
+
+type ChunkInfo struct {
+ Status api.ChunkStatus
+ Size int
+ Committed int
+}
type Chunk interface {
- SizeBytes() int
- ReadSeeker() io.ReadSeeker
- Writer() io.Writer
+ Stats() (ChunkInfo, error)
+ ReaderAt() io.ReaderAt
+ WriterAt() io.WriterAt
}
type ChunkFactory interface {
- New() Chunk
+ New(size int) Chunk
}
diff --git a/chunk/file.go b/chunk/file.go
new file mode 100644
index 0000000..ddb43f6
--- /dev/null
+++ b/chunk/file.go
@@ -0,0 +1,49 @@
+package chunk
+
+import (
+ "io"
+ "os"
+
+ "pcloud/api"
+)
+
+type ReadOnlyFileChunk struct {
+ f *os.File
+ offset int
+ size int
+}
+
+func NewReadOnlyFileChunk(f *os.File, offset, size int) Chunk {
+ return &ReadOnlyFileChunk{f, offset, size}
+}
+
+func (c *ReadOnlyFileChunk) Stats() (ChunkInfo, error) {
+ return ChunkInfo{
+ Status: api.ChunkStatus_READY,
+ Size: c.size,
+ Committed: c.size}, nil
+}
+
+func (c *ReadOnlyFileChunk) ReaderAt() io.ReaderAt {
+ return &fileReader{c.f}
+}
+
+func (c *ReadOnlyFileChunk) WriterAt() io.WriterAt {
+ return &fileWriter{c.f}
+}
+
+type fileReader struct {
+ f *os.File
+}
+
+func (f *fileReader) ReadAt(b []byte, offset int64) (int, error) {
+ return f.f.ReadAt(b, offset)
+}
+
+type fileWriter struct {
+ f *os.File
+}
+
+func (f *fileWriter) WriteAt(b []byte, offset int64) (int, error) {
+ return f.f.WriteAt(b, offset)
+}
diff --git a/chunk/in_memory.go b/chunk/in_memory.go
index cd95427..ca7259c 100644
--- a/chunk/in_memory.go
+++ b/chunk/in_memory.go
@@ -2,28 +2,67 @@
import (
"bytes"
+ "errors"
"io"
+
+ "pcloud/api"
)
type InMemoryChunk struct {
- payload bytes.Buffer
+ status api.ChunkStatus
+ payload []byte
+ committed int
}
-func (c *InMemoryChunk) SizeBytes() int {
- return len(c.payload.Bytes())
+func (c *InMemoryChunk) Stats() (ChunkInfo, error) {
+ return ChunkInfo{c.status, len(c.payload), c.committed}, nil
}
-func (c *InMemoryChunk) ReadSeeker() io.ReadSeeker {
- return bytes.NewReader(c.payload.Bytes())
+func (c *InMemoryChunk) ReaderAt() io.ReaderAt {
+ return bytes.NewReader(c.payload[:c.committed])
}
-func (c *InMemoryChunk) Writer() io.Writer {
- return &c.payload
+func (c *InMemoryChunk) WriterAt() io.WriterAt {
+ return &byteWriter{c}
+}
+
+type byteWriter struct {
+ c *InMemoryChunk
+}
+
+func (w *byteWriter) WriteAt(p []byte, offset int64) (n int, err error) {
+ if int(offset) > w.c.committed {
+ panic(1)
+ return 0, errors.New("Gaps are not allowed when writing in chunks")
+ }
+ if int(offset) < w.c.committed {
+ if int(offset)+len(p) <= w.c.committed {
+ if bytes.Compare(w.c.payload[int(offset):int(offset)+len(p)], p) != 0 {
+ panic(2)
+ return 0, errors.New("Can not change contents of allready committed chunk bytes")
+ }
+ panic(3)
+ return len(p), nil
+ }
+ n = w.c.committed - int(offset)
+ p = p[n:]
+ offset = int64(w.c.committed)
+ }
+ if w.c.committed+len(p) > len(w.c.payload) {
+ panic(4)
+ return 0, errors.New("In memory chunk does not have enough space available")
+ }
+ n += copy(w.c.payload[w.c.committed:], p)
+ w.c.committed += n
+ return
}
type InMemoryChunkFactory struct {
}
-func (f InMemoryChunkFactory) New() Chunk {
- return &InMemoryChunk{}
+func (f InMemoryChunkFactory) New(size int) Chunk {
+ return &InMemoryChunk{
+ status: api.ChunkStatus_CREATED,
+ payload: make([]byte, size),
+ committed: 0}
}
diff --git a/chunk/in_memory_test.go b/chunk/in_memory_test.go
index 7970d14..b9711ca 100644
--- a/chunk/in_memory_test.go
+++ b/chunk/in_memory_test.go
@@ -6,19 +6,19 @@
)
func TestConcurrentReads(t *testing.T) {
- c := InMemoryChunkFactory{}.New()
- if _, err := c.Writer().Write([]byte("abcd")); err != nil {
+ c := InMemoryChunkFactory{}.New(4)
+ if _, err := c.WriterAt().WriteAt([]byte("abcd"), 0); err != nil {
panic(err)
}
d1 := make([]byte, 2)
d2 := make([]byte, 3)
- if _, err := c.ReadSeeker().Read(d1); err != nil {
+ if _, err := c.ReaderAt().ReadAt(d1, 0); err != nil {
t.Error(err)
}
if bytes.Compare(d1, []byte("ab")) != 0 {
t.Errorf("Expected: %s\nActual: %s", "ab", d1)
}
- if _, err := c.ReadSeeker().Read(d2); err != nil {
+ if _, err := c.ReaderAt().ReadAt(d2, 0); err != nil {
t.Error(err)
}
if bytes.Compare(d2, []byte("abc")) != 0 {
diff --git a/chunk/remote.go b/chunk/remote.go
index 1cb57f1..0da0ad8 100644
--- a/chunk/remote.go
+++ b/chunk/remote.go
@@ -2,7 +2,6 @@
import (
"context"
- "errors"
"io"
"pcloud/api"
@@ -13,48 +12,63 @@
client api.ChunkStorageClient
}
-func (r *RemoteChunk) SizeBytes() int {
- return 0
+func (r *RemoteChunk) Stats() (info ChunkInfo, err error) {
+ resp, err := r.client.GetChunkStatus(
+ context.Background(),
+ &api.GetChunkStatusRequest{ChunkId: r.chunkId})
+ if err != nil {
+ return
+ }
+ info = ChunkInfo{
+ resp.Status,
+ int(resp.TotalBytes),
+ int(resp.CommittedBytes)}
+ return
}
-func (r *RemoteChunk) ReadSeeker() io.ReadSeeker {
- return &remoteChunkReadSeeker{
+func (r *RemoteChunk) ReaderAt() io.ReaderAt {
+ return &remoteChunkReaderAt{
chunkId: r.chunkId,
client: r.client}
}
-func (r *RemoteChunk) Writer() io.Writer {
- return nil
+func (r *RemoteChunk) WriterAt() io.WriterAt {
+ return &remoteChunkWriterAt{
+ chunkId: r.chunkId,
+ client: r.client}
}
-type remoteChunkReadSeeker struct {
+type remoteChunkReaderAt struct {
chunkId string
client api.ChunkStorageClient
- offset int64
}
-func (c *remoteChunkReadSeeker) Seek(offset int64, whence int) (int64, error) {
- if whence != io.SeekStart {
- return 0, errors.New("Seek: RemoteChunk only supports SeekStart whence")
- }
- c.offset = offset
- return offset, nil
-}
-
-func (c *remoteChunkReadSeeker) Read(p []byte) (n int, err error) {
+func (c *remoteChunkReaderAt) ReadAt(p []byte, offset int64) (n int, err error) {
req := api.ReadChunkRequest{
ChunkId: c.chunkId,
- Offset: int32(c.offset), // TODO(lekva): must be int64
+ Offset: int32(offset),
NumBytes: int32(len(p))}
resp, err := c.client.ReadChunk(context.Background(), &req)
if err != nil {
return
}
n = copy(p, resp.Data)
- c.offset += int64(n)
return
}
-type PrimaryReplicaChunk struct {
+type remoteChunkWriterAt struct {
chunkId string
+ client api.ChunkStorageClient
+}
+
+func (c *remoteChunkWriterAt) WriteAt(p []byte, offset int64) (n int, err error) {
+ req := api.WriteChunkRequest{
+ ChunkId: c.chunkId,
+ Offset: int32(offset),
+ Data: p}
+ resp, err := c.client.WriteChunk(context.Background(), &req)
+ if resp != nil {
+ n = int(resp.BytesWritten)
+ }
+ return
}
diff --git a/chunk/replicator.go b/chunk/replicator.go
index 1c77534..fe49862 100644
--- a/chunk/replicator.go
+++ b/chunk/replicator.go
@@ -16,7 +16,13 @@
type NonChangingPrimaryReplicaChangeListener struct {
chunkId string
- address string
+ ch chan string
+}
+
+func NewNonChangingPrimaryReplicaChangeListener(chunkId, address string) PrimaryReplicaChangeListener {
+ ch := make(chan string, 1)
+ ch <- address
+ return &NonChangingPrimaryReplicaChangeListener{chunkId, ch}
}
func (l NonChangingPrimaryReplicaChangeListener) ChunkId() string {
@@ -24,48 +30,54 @@
}
func (l NonChangingPrimaryReplicaChangeListener) Address() <-chan string {
- ch := make(chan string, 1)
- ch <- l.address
- return ch
+ return l.ch
}
-func replicate(ctx context.Context, dst, src Chunk) {
- inp := src.ReadSeeker()
- inp.Seek(int64(src.SizeBytes()), io.SeekStart)
- out := dst.Writer()
+func replicate(ctx context.Context, dst, src Chunk, done chan<- int) {
+ dstInfo, err := dst.Stats()
+ if err != nil {
+ panic(err)
+ }
+ inp := src.ReaderAt()
+ replicated := dstInfo.Committed
+ out := dst.WriterAt()
for {
select {
default:
p := make([]byte, 100)
- n, _ := inp.Read(p)
+ n, err := inp.ReadAt(p, int64(replicated))
if n > 0 {
- out.Write(p[:n])
+ m, _ := out.WriteAt(p[:n], int64(replicated))
+ replicated += m
}
+ if err == io.EOF {
+ done <- 1
+ return
+ }
+
case <-ctx.Done():
return
}
}
}
-func dialConn(address string) (*grpc.ClientConn, error) {
- var opts []grpc.DialOption
- opts = append(opts, grpc.WithInsecure())
- opts = append(opts, grpc.WithBlock())
- return grpc.Dial(address, opts...)
-
-}
-
-func replicateFromPrimary(ctx context.Context, dst Chunk, l PrimaryReplicaChangeListener) {
+func ReplicateFromPrimary(ctx context.Context, dst Chunk, l PrimaryReplicaChangeListener) {
+ var done chan int
var cancel context.CancelFunc = nil
for {
select {
+ case <-done:
+ return
case <-ctx.Done():
return
case address := <-l.Address():
if cancel != nil {
cancel()
}
- conn, err := dialConn(address)
+ var opts []grpc.DialOption
+ opts = append(opts, grpc.WithInsecure())
+ opts = append(opts, grpc.WithBlock())
+ conn, err := grpc.Dial(address, opts...)
if err == nil {
continue
}
@@ -73,7 +85,38 @@
src := RemoteChunk{l.ChunkId(), client}
replicatorCtx, cancelFn := context.WithCancel(context.Background())
cancel = cancelFn
- go replicate(replicatorCtx, dst, &src)
+ done = make(chan int, 1)
+ go replicate(replicatorCtx, dst, &src, done)
+ }
+ }
+}
+
+func WriteToPrimary(ctx context.Context, src Chunk, l PrimaryReplicaChangeListener) {
+ var done chan int
+ var cancel context.CancelFunc = nil
+ for {
+ select {
+ case <-done:
+ return
+ case <-ctx.Done():
+ return
+ case address := <-l.Address():
+ if cancel != nil {
+ cancel()
+ }
+ var opts []grpc.DialOption
+ opts = append(opts, grpc.WithInsecure())
+ opts = append(opts, grpc.WithBlock())
+ conn, err := grpc.Dial(address, opts...)
+ if err != nil {
+ continue
+ }
+ client := api.NewChunkStorageClient(conn)
+ dst := RemoteChunk{l.ChunkId(), client}
+ replicatorCtx, cancelFn := context.WithCancel(context.Background())
+ cancel = cancelFn
+ done = make(chan int, 1)
+ go replicate(replicatorCtx, &dst, src, done)
}
}
}
diff --git a/chunk/server.go b/chunk/server.go
index 2829471..0566c16 100644
--- a/chunk/server.go
+++ b/chunk/server.go
@@ -1,9 +1,9 @@
package chunk
import (
- "bytes"
"context"
- "io"
+ "fmt"
+ "log"
"sync"
@@ -16,8 +16,8 @@
replicatorCancel sync.Map
}
-func NewChunkServer() *ChunkServer {
- return &ChunkServer{}
+func NewChunkServer(factory ChunkFactory) *ChunkServer {
+ return &ChunkServer{factory: factory}
}
func (s *ChunkServer) ListChunks(
@@ -31,56 +31,95 @@
return
}
+func (s *ChunkServer) CreateChunk(
+ ctx context.Context,
+ req *api.CreateChunkRequest) (resp *api.CreateChunkResponse, err error) {
+ chunk := s.factory.New(int(req.Size))
+ s.chunks.Store(req.ChunkId, chunk)
+ switch req.Role {
+ case api.ReplicaRole_SECONDARY:
+ ctx, cancel := context.WithCancel(context.Background())
+ s.replicatorCancel.Store(req.ChunkId, cancel)
+ primaryListener := NewNonChangingPrimaryReplicaChangeListener(
+ req.ChunkId,
+ req.PrimaryAddress)
+ go ReplicateFromPrimary(ctx, chunk, primaryListener)
+ case api.ReplicaRole_PRIMARY:
+ {
+ }
+ }
+ resp = &api.CreateChunkResponse{}
+ log.Printf("Created chunk: %s\n", req.ChunkId)
+ return
+
+}
+
+func (s *ChunkServer) GetChunkStatus(
+ ctx context.Context,
+ req *api.GetChunkStatusRequest) (resp *api.GetChunkStatusResponse, err error) {
+ if chunk, ok := s.chunks.Load(req.ChunkId); ok {
+ c := chunk.(Chunk)
+ var info ChunkInfo
+ info, err = c.Stats()
+ if err != nil {
+ return
+ }
+ resp = &api.GetChunkStatusResponse{
+ Status: info.Status,
+ TotalBytes: int32(info.Size),
+ CommittedBytes: int32(info.Committed)}
+ return
+ }
+ return nil, fmt.Errorf("Could not fund chunk: %s", req.ChunkId)
+}
+
func (s *ChunkServer) ReadChunk(
ctx context.Context,
req *api.ReadChunkRequest) (resp *api.ReadChunkResponse, err error) {
if value, ok := s.chunks.Load(req.ChunkId); ok {
chunk := value.(Chunk)
- src := chunk.ReadSeeker()
- if req.Offset != 0 {
- _, err = src.Seek(int64(req.Offset), io.SeekStart)
- if err != nil {
- return
- }
+ b := make([]byte, req.NumBytes)
+ var n int
+ n, err = chunk.ReaderAt().ReadAt(b, int64(req.Offset))
+ if n == 0 {
+ return
}
- var dst bytes.Buffer
- if req.NumBytes != 0 {
- _, err = io.CopyN(&dst, src, int64(req.NumBytes))
- } else {
- _, err = io.Copy(&dst, src)
- }
- if err == nil {
- resp = &api.ReadChunkResponse{Data: dst.Bytes()}
- }
+ return &api.ReadChunkResponse{Data: b[:n]}, nil
+
+ } else {
+ return nil, fmt.Errorf("Chunk not found: %s", req.ChunkId)
}
- return
+}
+
+func (s *ChunkServer) WriteChunk(
+ ctx context.Context,
+ req *api.WriteChunkRequest) (resp *api.WriteChunkResponse, err error) {
+ if value, ok := s.chunks.Load(req.ChunkId); ok {
+ chunk := value.(Chunk)
+ var n int
+ n, err = chunk.WriterAt().WriteAt(req.Data, int64(req.Offset))
+ if n == 0 {
+ return
+ }
+ return &api.WriteChunkResponse{BytesWritten: int32(n)}, nil
+
+ } else {
+ return nil, fmt.Errorf("Chunk not found: %s", req.ChunkId)
+ }
}
func (s *ChunkServer) StoreChunk(
ctx context.Context,
req *api.StoreChunkRequest) (resp *api.StoreChunkResponse, err error) {
- chunk := s.factory.New()
- _, err = chunk.Writer().Write(req.Data)
+ chunk := s.factory.New(len(req.Data))
s.chunks.Store(req.ChunkId, chunk)
+ _, err = chunk.WriterAt().WriteAt(req.Data, 0)
if err == nil {
resp = &api.StoreChunkResponse{}
}
return
}
-func (s *ChunkServer) ReplicateChunk(
- ctx context.Context,
- req *api.ReplicateChunkRequest) (resp *api.ReplicateChunkResponse, err error) {
- chunk := s.factory.New()
- s.chunks.Store(req.ChunkId, chunk)
- ctx, cancel := context.WithCancel(context.Background())
- s.replicatorCancel.Store(req.ChunkId, cancel)
- go replicateFromPrimary(ctx, chunk, NonChangingPrimaryReplicaChangeListener{req.ChunkId, req.PrimaryChunkServer})
- resp = &api.ReplicateChunkResponse{}
- return
-
-}
-
func (s *ChunkServer) RemoveChunk(
ctx context.Context,
req *api.RemoveChunkRequest) (resp *api.RemoveChunkResponse, err error) {
diff --git a/chunk/server_test.go b/chunk/server_test.go
index f57a925..70f209f 100644
--- a/chunk/server_test.go
+++ b/chunk/server_test.go
@@ -27,7 +27,8 @@
t.Error(err)
}
resp, err := s.ReadChunk(context.Background(), &api.ReadChunkRequest{
- ChunkId: "foo"})
+ ChunkId: "foo",
+ NumBytes: 100})
if err != nil {
t.Error(err)
}
@@ -65,8 +66,9 @@
t.Errorf("Expected: %s\nGot: %s\n", "ll", resp.Data)
}
resp, err = s.ReadChunk(context.Background(), &api.ReadChunkRequest{
- ChunkId: "foo",
- Offset: 4})
+ ChunkId: "foo",
+ Offset: 4,
+ NumBytes: 100})
if err != nil {
t.Error(err)
}
diff --git a/client.go b/client.go
new file mode 100644
index 0000000..c8f35ad
--- /dev/null
+++ b/client.go
@@ -0,0 +1,36 @@
+package main
+
+import (
+ "flag"
+ "log"
+ "os"
+
+ "google.golang.org/grpc"
+
+ "pcloud/api"
+ "pcloud/client"
+)
+
+var masterAddress = flag.String("master", "localhost:123", "Metadata storage address.")
+var fileToUpload = flag.String("file", "", "File path to upload.")
+
+func main() {
+ flag.Parse()
+
+ var opts []grpc.DialOption
+ opts = append(opts, grpc.WithInsecure())
+ opts = append(opts, grpc.WithBlock())
+ conn, err := grpc.Dial(*masterAddress, opts...)
+ if err != nil {
+ log.Fatalf("Failed to dial %s: %v", *masterAddress, err)
+ }
+ defer conn.Close()
+ uploader := client.NewFileUploader(api.NewMetadataStorageClient(conn))
+
+ f, err := os.Open(*fileToUpload)
+ if err != nil {
+ panic(err)
+ }
+
+ uploader.Upload(f)
+}
diff --git a/client/client.go b/client/client.go
index 8c90a36..7a98d79 100644
--- a/client/client.go
+++ b/client/client.go
@@ -1,21 +1,41 @@
package client
-import "os"
+import (
+ "context"
+ "os"
-import "pcloud/api"
+ "pcloud/api"
+ "pcloud/chunk"
+)
type FileUploader struct {
- client api.MetadataStorageServerClient
+ client api.MetadataStorageClient
}
-func NewFileUploader(client api.MetadataStorageServerClient) *FileUploader {
- return FileUploader{client}
+func NewFileUploader(client api.MetadataStorageClient) *FileUploader {
+ return &FileUploader{client}
}
-func (fu *FileUploader) Upload(f *os.File) (n int64, err error) {
-
- buf := make([]byte, 1000)
- for {
- n, err := f.Read(buf)
+func (fu *FileUploader) Upload(f *os.File) {
+ info, err := f.Stat()
+ if err != nil {
+ return
}
+ resp, err := fu.client.CreateBlob(
+ context.Background(), &api.CreateBlobRequest{
+ SizeBytes: int32(info.Size()),
+ NumReplicas: 1})
+ if len(resp.Chunk) != 1 {
+ panic(resp)
+ }
+ if err != nil {
+ panic(err)
+ }
+ primaryListener := chunk.NewNonChangingPrimaryReplicaChangeListener(
+ resp.Chunk[0].ChunkId,
+ resp.Chunk[0].Server[0])
+ chunk.WriteToPrimary(
+ context.Background(),
+ chunk.NewReadOnlyFileChunk(f, 0, int(info.Size())),
+ primaryListener)
}
diff --git a/client/client_test.go b/client/client_test.go
new file mode 100644
index 0000000..21443d4
--- /dev/null
+++ b/client/client_test.go
@@ -0,0 +1,37 @@
+package client
+
+import (
+ "os"
+ "testing"
+
+ "google.golang.org/grpc"
+
+ "pcloud/api"
+ pt "pcloud/testing"
+)
+
+func TestUploadSmallFile(t *testing.T) {
+ env, err := pt.NewInMemoryEnv(1)
+ if err != nil {
+ t.Error(err)
+ }
+ defer env.Stop()
+
+ var opts []grpc.DialOption
+ opts = append(opts, grpc.WithInsecure())
+ opts = append(opts, grpc.WithBlock())
+ conn, err := grpc.Dial("unix:///tmp/pcloud/master", opts...)
+ if err != nil {
+ t.Error(err)
+ }
+ defer conn.Close()
+ client := api.NewMetadataStorageClient(conn)
+
+ uploader := NewFileUploader(client)
+ f, err := os.Open("testdata/foo")
+ if err != nil {
+ t.Error(err)
+ }
+ uploader.Upload(f)
+
+}
diff --git a/client/testdata/foo b/client/testdata/foo
new file mode 100644
index 0000000..257cc56
--- /dev/null
+++ b/client/testdata/foo
@@ -0,0 +1 @@
+foo
diff --git a/k8s/deployment.yaml b/k8s/deployment.yaml
index 608caad..ca8f173 100644
--- a/k8s/deployment.yaml
+++ b/k8s/deployment.yaml
@@ -76,3 +76,31 @@
- name: code
hostPath:
path: "/Users/lekva/dev/go/src/pcloud"
+---
+kind: Deployment
+apiVersion: apps/v1
+metadata:
+ name: pcloud-client
+spec:
+ selector:
+ matchLabels:
+ app: pcloud-client
+ replicas: 1
+ template:
+ metadata:
+ labels:
+ app: pcloud-client
+ spec:
+ containers:
+ - name: pcloud-client
+ image: pcloud:latest
+ imagePullPolicy: Never
+ volumeMounts:
+ - name: code
+ mountPath: /src/go/src/pcloud
+ command: ["/bin/sh"]
+ args: ["-c", "protoc api/api.proto --go_out=plugins=grpc:. && go run client.go --master=pcloud-master-service:111 --file=data/foo"]
+ volumes:
+ - name: code
+ hostPath:
+ path: "/Users/lekva/dev/go/src/pcloud"
diff --git a/master/chunk.go b/master/chunk.go
new file mode 100644
index 0000000..00798c9
--- /dev/null
+++ b/master/chunk.go
@@ -0,0 +1,18 @@
+package master
+
+import (
+ "pcloud/api"
+)
+
+type chunkServerStatus int
+
+const (
+ Healthy chunkServerStatus = iota
+ UNREACHABLE
+)
+
+type chunkServer struct {
+ address string
+ status chunkServerStatus
+ chunks map[string]api.ChunkStatus
+}
diff --git a/master/server.go b/master/server.go
index 70c6d29..b6e76fa 100644
--- a/master/server.go
+++ b/master/server.go
@@ -6,6 +6,7 @@
"math/rand"
"github.com/google/uuid"
+ "google.golang.org/grpc"
"pcloud/api"
)
@@ -44,7 +45,7 @@
}
type MasterServer struct {
- chunkServers []string
+ chunkServers []*chunkServer
blobs []*blob
}
@@ -54,16 +55,18 @@
func (s *MasterServer) AddChunkServer(
ctx context.Context,
- request *api.AddChunkServerRequest) (*api.AddChunkServerResponse, error) {
- s.chunkServers = append(s.chunkServers, request.Address)
- log.Printf("Registered Chunk server: %s", request.Address)
+ req *api.AddChunkServerRequest) (*api.AddChunkServerResponse, error) {
+ s.chunkServers = append(s.chunkServers, &chunkServer{
+ address: req.Address,
+ status: Healthy})
+ log.Printf("Registered Chunk server: %s", req.Address)
return &api.AddChunkServerResponse{}, nil
}
func (s *MasterServer) CreateBlob(
ctx context.Context,
- request *api.CreateBlobRequest) (*api.CreateBlobResponse, error) {
- if int(request.NumReplicas) > len(s.chunkServers) {
+ req *api.CreateBlobRequest) (*api.CreateBlobResponse, error) {
+ if int(req.NumReplicas) > len(s.chunkServers) {
return nil, nil
}
resp := api.CreateBlobResponse{
@@ -71,17 +74,43 @@
Chunk: []*api.ChunkStorageMetadata{
{ChunkId: uuid.New().String()},
}}
- ids := rand.Perm(len(s.chunkServers))
- for i := 0; i < int(request.NumReplicas); i++ {
- resp.Chunk[0].Server = append(
- resp.Chunk[0].Server,
- s.chunkServers[ids[i]])
+ assigned := 0
+ chunkId := resp.Chunk[0].ChunkId
+ for i := range rand.Perm(len(s.chunkServers)) {
+ if assigned == int(req.NumReplicas) {
+ break
+ }
+ address := s.chunkServers[i].address
+ log.Printf(address)
+ var opts []grpc.DialOption
+ opts = append(opts, grpc.WithInsecure())
+ opts = append(opts, grpc.WithBlock())
+ conn, err := grpc.Dial(address, opts...)
+ if err != nil {
+ continue
+ }
+ defer conn.Close()
+ client := api.NewChunkStorageClient(conn)
+ createChunkReq := api.CreateChunkRequest{
+ ChunkId: chunkId,
+ Size: req.SizeBytes}
+ if assigned == 0 {
+ createChunkReq.Role = api.ReplicaRole_PRIMARY
+ } else {
+ createChunkReq.Role = api.ReplicaRole_SECONDARY
+ createChunkReq.PrimaryAddress = resp.Chunk[0].Server[0]
+ }
+ _, err = client.CreateChunk(ctx, &createChunkReq)
+ if err == nil {
+ resp.Chunk[0].Server = append(resp.Chunk[0].Server, address)
+ assigned++
+ }
}
return &resp, nil
}
func (s *MasterServer) GetBlobMetadata(
ctx context.Context,
- request *api.GetBlobMetadataRequest) (*api.GetBlobMetadataResponse, error) {
+ req *api.GetBlobMetadataRequest) (*api.GetBlobMetadataResponse, error) {
return nil, nil
}
diff --git a/testing/in_memory_env.go b/testing/in_memory_env.go
new file mode 100644
index 0000000..a4cbcd4
--- /dev/null
+++ b/testing/in_memory_env.go
@@ -0,0 +1,82 @@
+package testing
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "syscall"
+ "time"
+
+ "google.golang.org/grpc"
+
+ "pcloud/api"
+ "pcloud/chunk"
+ "pcloud/master"
+)
+
+type InMemoryEnv struct {
+ m *grpc.Server
+ c []*grpc.Server
+ masterConn *grpc.ClientConn
+}
+
+func NewInMemoryEnv(numChunkServers int) (*InMemoryEnv, error) {
+ env := new(InMemoryEnv)
+ syscall.Unlink("/tmp/pcloud/master")
+ lis, err := net.Listen("unix", "/tmp/pcloud/master")
+ if err != nil {
+ return nil, err
+ }
+ server := grpc.NewServer()
+ api.RegisterMetadataStorageServer(server, master.NewMasterServer())
+ go server.Serve(lis)
+
+ var opts []grpc.DialOption
+ opts = append(opts, grpc.WithInsecure())
+ opts = append(opts, grpc.WithBlock())
+ conn, err := grpc.Dial("unix:/tmp/pcloud/master", opts...)
+ if err != nil {
+ return nil, err
+ }
+ env.masterConn = conn
+ client := api.NewMetadataStorageClient(conn)
+
+ env.c = make([]*grpc.Server, numChunkServers)
+ for i := 0; i < numChunkServers; i++ {
+ unixSocket := fmt.Sprintf("/tmp/pcloud/chunk-%d", i)
+ syscall.Unlink(unixSocket)
+ lis, err := net.Listen("unix", unixSocket)
+ if err != nil {
+ return nil, err
+ }
+ server := grpc.NewServer()
+ api.RegisterChunkStorageServer(server, chunk.NewChunkServer(&chunk.InMemoryChunkFactory{}))
+ go server.Serve(lis)
+ env.c[i] = server
+ }
+
+ for i := 0; i < numChunkServers; i++ {
+ ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
+ _, err = client.AddChunkServer(
+ ctx,
+ &api.AddChunkServerRequest{Address: fmt.Sprintf("unix:///tmp/pcloud/chunk-%d", i)})
+ if err != nil {
+ return nil, err
+ }
+ }
+ return env, nil
+}
+
+func (e *InMemoryEnv) Stop() {
+ if e.masterConn != nil {
+ e.masterConn.Close()
+ }
+ for _, s := range e.c {
+ if s != nil {
+ s.GracefulStop()
+ }
+ }
+ if e.m != nil {
+ e.m.GracefulStop()
+ }
+}
diff --git a/testing/simple_test.go b/testing/simple_test.go
new file mode 100644
index 0000000..d8e2510
--- /dev/null
+++ b/testing/simple_test.go
@@ -0,0 +1,13 @@
+package testing
+
+import (
+ "testing"
+)
+
+func TestSetup(t *testing.T) {
+ env, err := NewInMemoryEnv(3)
+ if err != nil {
+ t.Error(err)
+ }
+ defer env.Stop()
+}