Basic file uploader implementation. Does not wait for replication to finish.
diff --git a/chunk/server.go b/chunk/server.go
index 2829471..0566c16 100644
--- a/chunk/server.go
+++ b/chunk/server.go
@@ -1,9 +1,9 @@
package chunk
import (
- "bytes"
"context"
- "io"
+ "fmt"
+ "log"
"sync"
@@ -16,8 +16,8 @@
replicatorCancel sync.Map
}
-func NewChunkServer() *ChunkServer {
- return &ChunkServer{}
+func NewChunkServer(factory ChunkFactory) *ChunkServer {
+ return &ChunkServer{factory: factory}
}
func (s *ChunkServer) ListChunks(
@@ -31,56 +31,95 @@
return
}
+func (s *ChunkServer) CreateChunk(
+ ctx context.Context,
+ req *api.CreateChunkRequest) (resp *api.CreateChunkResponse, err error) {
+ chunk := s.factory.New(int(req.Size))
+ s.chunks.Store(req.ChunkId, chunk)
+ switch req.Role {
+ case api.ReplicaRole_SECONDARY:
+ ctx, cancel := context.WithCancel(context.Background())
+ s.replicatorCancel.Store(req.ChunkId, cancel)
+ primaryListener := NewNonChangingPrimaryReplicaChangeListener(
+ req.ChunkId,
+ req.PrimaryAddress)
+ go ReplicateFromPrimary(ctx, chunk, primaryListener)
+ case api.ReplicaRole_PRIMARY:
+ {
+ }
+ }
+ resp = &api.CreateChunkResponse{}
+ log.Printf("Created chunk: %s\n", req.ChunkId)
+ return
+
+}
+
+func (s *ChunkServer) GetChunkStatus(
+ ctx context.Context,
+ req *api.GetChunkStatusRequest) (resp *api.GetChunkStatusResponse, err error) {
+ if chunk, ok := s.chunks.Load(req.ChunkId); ok {
+ c := chunk.(Chunk)
+ var info ChunkInfo
+ info, err = c.Stats()
+ if err != nil {
+ return
+ }
+ resp = &api.GetChunkStatusResponse{
+ Status: info.Status,
+ TotalBytes: int32(info.Size),
+ CommittedBytes: int32(info.Committed)}
+ return
+ }
+ return nil, fmt.Errorf("Could not fund chunk: %s", req.ChunkId)
+}
+
func (s *ChunkServer) ReadChunk(
ctx context.Context,
req *api.ReadChunkRequest) (resp *api.ReadChunkResponse, err error) {
if value, ok := s.chunks.Load(req.ChunkId); ok {
chunk := value.(Chunk)
- src := chunk.ReadSeeker()
- if req.Offset != 0 {
- _, err = src.Seek(int64(req.Offset), io.SeekStart)
- if err != nil {
- return
- }
+ b := make([]byte, req.NumBytes)
+ var n int
+ n, err = chunk.ReaderAt().ReadAt(b, int64(req.Offset))
+ if n == 0 {
+ return
}
- var dst bytes.Buffer
- if req.NumBytes != 0 {
- _, err = io.CopyN(&dst, src, int64(req.NumBytes))
- } else {
- _, err = io.Copy(&dst, src)
- }
- if err == nil {
- resp = &api.ReadChunkResponse{Data: dst.Bytes()}
- }
+ return &api.ReadChunkResponse{Data: b[:n]}, nil
+
+ } else {
+ return nil, fmt.Errorf("Chunk not found: %s", req.ChunkId)
}
- return
+}
+
+func (s *ChunkServer) WriteChunk(
+ ctx context.Context,
+ req *api.WriteChunkRequest) (resp *api.WriteChunkResponse, err error) {
+ if value, ok := s.chunks.Load(req.ChunkId); ok {
+ chunk := value.(Chunk)
+ var n int
+ n, err = chunk.WriterAt().WriteAt(req.Data, int64(req.Offset))
+ if n == 0 {
+ return
+ }
+ return &api.WriteChunkResponse{BytesWritten: int32(n)}, nil
+
+ } else {
+ return nil, fmt.Errorf("Chunk not found: %s", req.ChunkId)
+ }
}
func (s *ChunkServer) StoreChunk(
ctx context.Context,
req *api.StoreChunkRequest) (resp *api.StoreChunkResponse, err error) {
- chunk := s.factory.New()
- _, err = chunk.Writer().Write(req.Data)
+ chunk := s.factory.New(len(req.Data))
s.chunks.Store(req.ChunkId, chunk)
+ _, err = chunk.WriterAt().WriteAt(req.Data, 0)
if err == nil {
resp = &api.StoreChunkResponse{}
}
return
}
-func (s *ChunkServer) ReplicateChunk(
- ctx context.Context,
- req *api.ReplicateChunkRequest) (resp *api.ReplicateChunkResponse, err error) {
- chunk := s.factory.New()
- s.chunks.Store(req.ChunkId, chunk)
- ctx, cancel := context.WithCancel(context.Background())
- s.replicatorCancel.Store(req.ChunkId, cancel)
- go replicateFromPrimary(ctx, chunk, NonChangingPrimaryReplicaChangeListener{req.ChunkId, req.PrimaryChunkServer})
- resp = &api.ReplicateChunkResponse{}
- return
-
-}
-
func (s *ChunkServer) RemoveChunk(
ctx context.Context,
req *api.RemoveChunkRequest) (resp *api.RemoveChunkResponse, err error) {