| Giorgi Lekveishvili | b8f089f | 2020-03-18 23:28:12 +0400 | [diff] [blame] | 1 | package chunk |
| 2 | |
| Giorgi Lekveishvili | 45b4d52 | 2020-03-19 21:11:18 +0400 | [diff] [blame] | 3 | import "bytes" |
| Giorgi Lekveishvili | b8f089f | 2020-03-18 23:28:12 +0400 | [diff] [blame] | 4 | import "context" |
| Giorgi Lekveishvili | 45b4d52 | 2020-03-19 21:11:18 +0400 | [diff] [blame] | 5 | import "io" |
| Giorgi Lekveishvili | b8f089f | 2020-03-18 23:28:12 +0400 | [diff] [blame] | 6 | import "sync" |
| 7 | |
| 8 | import "pcloud/api" |
| 9 | |
| 10 | type ChunkServer struct { |
| giolekva | 7be17df | 2020-03-21 13:57:02 +0400 | [diff] [blame^] | 11 | factory ChunkFactory |
| 12 | chunks sync.Map |
| 13 | replicatorCancel sync.Map |
| Giorgi Lekveishvili | b8f089f | 2020-03-18 23:28:12 +0400 | [diff] [blame] | 14 | } |
| 15 | |
| 16 | func NewChunkServer() *ChunkServer { |
| 17 | return &ChunkServer{} |
| 18 | } |
| 19 | |
| 20 | func (s *ChunkServer) ListChunks( |
| 21 | ctx context.Context, |
| giolekva | 7be17df | 2020-03-21 13:57:02 +0400 | [diff] [blame^] | 22 | req *api.ListChunksRequest) (resp *api.ListChunksResponse, err error) { |
| 23 | resp = &api.ListChunksResponse{} |
| Giorgi Lekveishvili | b8f089f | 2020-03-18 23:28:12 +0400 | [diff] [blame] | 24 | s.chunks.Range(func(k, v interface{}) bool { |
| 25 | resp.ChunkId = append(resp.ChunkId, k.(string)) |
| 26 | return true |
| 27 | }) |
| giolekva | 7be17df | 2020-03-21 13:57:02 +0400 | [diff] [blame^] | 28 | return |
| Giorgi Lekveishvili | b8f089f | 2020-03-18 23:28:12 +0400 | [diff] [blame] | 29 | } |
| 30 | |
| 31 | func (s *ChunkServer) ReadChunk( |
| 32 | ctx context.Context, |
| Giorgi Lekveishvili | 45b4d52 | 2020-03-19 21:11:18 +0400 | [diff] [blame] | 33 | req *api.ReadChunkRequest) (resp *api.ReadChunkResponse, err error) { |
| 34 | if value, ok := s.chunks.Load(req.ChunkId); ok { |
| 35 | chunk := value.(Chunk) |
| 36 | src := chunk.ReadSeeker() |
| 37 | if req.Offset != 0 { |
| 38 | _, err = src.Seek(int64(req.Offset), io.SeekStart) |
| 39 | if err != nil { |
| 40 | return |
| 41 | } |
| 42 | } |
| 43 | var dst bytes.Buffer |
| 44 | if req.NumBytes != 0 { |
| 45 | _, err = io.CopyN(&dst, src, int64(req.NumBytes)) |
| 46 | } else { |
| 47 | _, err = io.Copy(&dst, src) |
| 48 | } |
| 49 | if err == nil { |
| 50 | resp = &api.ReadChunkResponse{Data: dst.Bytes()} |
| 51 | } |
| Giorgi Lekveishvili | b8f089f | 2020-03-18 23:28:12 +0400 | [diff] [blame] | 52 | } |
| Giorgi Lekveishvili | 45b4d52 | 2020-03-19 21:11:18 +0400 | [diff] [blame] | 53 | return |
| Giorgi Lekveishvili | b8f089f | 2020-03-18 23:28:12 +0400 | [diff] [blame] | 54 | } |
| 55 | |
| 56 | func (s *ChunkServer) StoreChunk( |
| 57 | ctx context.Context, |
| giolekva | 7be17df | 2020-03-21 13:57:02 +0400 | [diff] [blame^] | 58 | req *api.StoreChunkRequest) (resp *api.StoreChunkResponse, err error) { |
| 59 | chunk := s.factory.New() |
| 60 | _, err = chunk.Writer().Write(req.Data) |
| Giorgi Lekveishvili | 45b4d52 | 2020-03-19 21:11:18 +0400 | [diff] [blame] | 61 | s.chunks.Store(req.ChunkId, chunk) |
| giolekva | 7be17df | 2020-03-21 13:57:02 +0400 | [diff] [blame^] | 62 | if err == nil { |
| 63 | resp = &api.StoreChunkResponse{} |
| 64 | } |
| 65 | return |
| 66 | } |
| 67 | |
| 68 | func (s *ChunkServer) ReplicateChunk( |
| 69 | ctx context.Context, |
| 70 | req *api.ReplicateChunkRequest) (resp *api.ReplicateChunkResponse, err error) { |
| 71 | chunk := s.factory.New() |
| 72 | s.chunks.Store(req.ChunkId, chunk) |
| 73 | ctx, cancel := context.WithCancel(context.Background()) |
| 74 | s.replicatorCancel.Store(req.ChunkId, cancel) |
| 75 | go replicateFromPrimary(ctx, chunk, NonChangingPrimaryReplicaChangeListener{req.ChunkId, req.PrimaryChunkServer}) |
| 76 | resp = &api.ReplicateChunkResponse{} |
| 77 | return |
| 78 | |
| 79 | } |
| 80 | |
| 81 | func (s *ChunkServer) RemoveChunk( |
| 82 | ctx context.Context, |
| 83 | req *api.RemoveChunkRequest) (resp *api.RemoveChunkResponse, err error) { |
| 84 | if cancel, ok := s.replicatorCancel.Load(req.ChunkId); ok { |
| 85 | cancel.(context.CancelFunc)() |
| 86 | s.replicatorCancel.Delete(req.ChunkId) |
| 87 | } |
| 88 | s.chunks.Delete(req.ChunkId) |
| 89 | return &api.RemoveChunkResponse{}, nil |
| Giorgi Lekveishvili | b8f089f | 2020-03-18 23:28:12 +0400 | [diff] [blame] | 90 | } |