| Giorgi Lekveishvili | b8f089f | 2020-03-18 23:28:12 +0400 | [diff] [blame] | 1 | package chunk |
| 2 | |
| giolekva | c5126d9 | 2020-03-21 16:39:56 +0400 | [diff] [blame] | 3 | import ( |
| giolekva | c5126d9 | 2020-03-21 16:39:56 +0400 | [diff] [blame] | 4 | "context" |
| giolekva | 1f6577a | 2020-03-25 12:53:06 +0400 | [diff] [blame] | 5 | "fmt" |
| 6 | "log" |
| Giorgi Lekveishvili | b8f089f | 2020-03-18 23:28:12 +0400 | [diff] [blame] | 7 | |
| giolekva | c5126d9 | 2020-03-21 16:39:56 +0400 | [diff] [blame] | 8 | "sync" |
| 9 | |
| 10 | "pcloud/api" |
| 11 | ) |
| Giorgi Lekveishvili | b8f089f | 2020-03-18 23:28:12 +0400 | [diff] [blame] | 12 | |
| 13 | type ChunkServer struct { |
| giolekva | 7be17df | 2020-03-21 13:57:02 +0400 | [diff] [blame] | 14 | factory ChunkFactory |
| 15 | chunks sync.Map |
| 16 | replicatorCancel sync.Map |
| Giorgi Lekveishvili | b8f089f | 2020-03-18 23:28:12 +0400 | [diff] [blame] | 17 | } |
| 18 | |
| giolekva | 1f6577a | 2020-03-25 12:53:06 +0400 | [diff] [blame] | 19 | func NewChunkServer(factory ChunkFactory) *ChunkServer { |
| 20 | return &ChunkServer{factory: factory} |
| Giorgi Lekveishvili | b8f089f | 2020-03-18 23:28:12 +0400 | [diff] [blame] | 21 | } |
| 22 | |
| 23 | func (s *ChunkServer) ListChunks( |
| 24 | ctx context.Context, |
| giolekva | 7be17df | 2020-03-21 13:57:02 +0400 | [diff] [blame] | 25 | req *api.ListChunksRequest) (resp *api.ListChunksResponse, err error) { |
| 26 | resp = &api.ListChunksResponse{} |
| Giorgi Lekveishvili | b8f089f | 2020-03-18 23:28:12 +0400 | [diff] [blame] | 27 | s.chunks.Range(func(k, v interface{}) bool { |
| 28 | resp.ChunkId = append(resp.ChunkId, k.(string)) |
| 29 | return true |
| 30 | }) |
| giolekva | 7be17df | 2020-03-21 13:57:02 +0400 | [diff] [blame] | 31 | return |
| Giorgi Lekveishvili | b8f089f | 2020-03-18 23:28:12 +0400 | [diff] [blame] | 32 | } |
| 33 | |
| giolekva | 1f6577a | 2020-03-25 12:53:06 +0400 | [diff] [blame] | 34 | func (s *ChunkServer) CreateChunk( |
| 35 | ctx context.Context, |
| 36 | req *api.CreateChunkRequest) (resp *api.CreateChunkResponse, err error) { |
| 37 | chunk := s.factory.New(int(req.Size)) |
| 38 | s.chunks.Store(req.ChunkId, chunk) |
| 39 | switch req.Role { |
| 40 | case api.ReplicaRole_SECONDARY: |
| 41 | ctx, cancel := context.WithCancel(context.Background()) |
| 42 | s.replicatorCancel.Store(req.ChunkId, cancel) |
| 43 | primaryListener := NewNonChangingPrimaryReplicaChangeListener( |
| 44 | req.ChunkId, |
| 45 | req.PrimaryAddress) |
| 46 | go ReplicateFromPrimary(ctx, chunk, primaryListener) |
| 47 | case api.ReplicaRole_PRIMARY: |
| 48 | { |
| 49 | } |
| 50 | } |
| 51 | resp = &api.CreateChunkResponse{} |
| 52 | log.Printf("Created chunk: %s\n", req.ChunkId) |
| 53 | return |
| 54 | |
| 55 | } |
| 56 | |
| 57 | func (s *ChunkServer) GetChunkStatus( |
| 58 | ctx context.Context, |
| 59 | req *api.GetChunkStatusRequest) (resp *api.GetChunkStatusResponse, err error) { |
| 60 | if chunk, ok := s.chunks.Load(req.ChunkId); ok { |
| 61 | c := chunk.(Chunk) |
| 62 | var info ChunkInfo |
| 63 | info, err = c.Stats() |
| 64 | if err != nil { |
| 65 | return |
| 66 | } |
| 67 | resp = &api.GetChunkStatusResponse{ |
| 68 | Status: info.Status, |
| 69 | TotalBytes: int32(info.Size), |
| 70 | CommittedBytes: int32(info.Committed)} |
| 71 | return |
| 72 | } |
| 73 | return nil, fmt.Errorf("Could not fund chunk: %s", req.ChunkId) |
| 74 | } |
| 75 | |
| Giorgi Lekveishvili | b8f089f | 2020-03-18 23:28:12 +0400 | [diff] [blame] | 76 | func (s *ChunkServer) ReadChunk( |
| 77 | ctx context.Context, |
| Giorgi Lekveishvili | 45b4d52 | 2020-03-19 21:11:18 +0400 | [diff] [blame] | 78 | req *api.ReadChunkRequest) (resp *api.ReadChunkResponse, err error) { |
| 79 | if value, ok := s.chunks.Load(req.ChunkId); ok { |
| 80 | chunk := value.(Chunk) |
| giolekva | 1f6577a | 2020-03-25 12:53:06 +0400 | [diff] [blame] | 81 | b := make([]byte, req.NumBytes) |
| 82 | var n int |
| 83 | n, err = chunk.ReaderAt().ReadAt(b, int64(req.Offset)) |
| 84 | if n == 0 { |
| 85 | return |
| Giorgi Lekveishvili | 45b4d52 | 2020-03-19 21:11:18 +0400 | [diff] [blame] | 86 | } |
| giolekva | 1f6577a | 2020-03-25 12:53:06 +0400 | [diff] [blame] | 87 | return &api.ReadChunkResponse{Data: b[:n]}, nil |
| 88 | |
| 89 | } else { |
| 90 | return nil, fmt.Errorf("Chunk not found: %s", req.ChunkId) |
| Giorgi Lekveishvili | b8f089f | 2020-03-18 23:28:12 +0400 | [diff] [blame] | 91 | } |
| giolekva | 1f6577a | 2020-03-25 12:53:06 +0400 | [diff] [blame] | 92 | } |
| 93 | |
| 94 | func (s *ChunkServer) WriteChunk( |
| 95 | ctx context.Context, |
| 96 | req *api.WriteChunkRequest) (resp *api.WriteChunkResponse, err error) { |
| 97 | if value, ok := s.chunks.Load(req.ChunkId); ok { |
| 98 | chunk := value.(Chunk) |
| 99 | var n int |
| 100 | n, err = chunk.WriterAt().WriteAt(req.Data, int64(req.Offset)) |
| 101 | if n == 0 { |
| 102 | return |
| 103 | } |
| 104 | return &api.WriteChunkResponse{BytesWritten: int32(n)}, nil |
| 105 | |
| 106 | } else { |
| 107 | return nil, fmt.Errorf("Chunk not found: %s", req.ChunkId) |
| 108 | } |
| Giorgi Lekveishvili | b8f089f | 2020-03-18 23:28:12 +0400 | [diff] [blame] | 109 | } |
| 110 | |
| 111 | func (s *ChunkServer) StoreChunk( |
| 112 | ctx context.Context, |
| giolekva | 7be17df | 2020-03-21 13:57:02 +0400 | [diff] [blame] | 113 | req *api.StoreChunkRequest) (resp *api.StoreChunkResponse, err error) { |
| giolekva | 1f6577a | 2020-03-25 12:53:06 +0400 | [diff] [blame] | 114 | chunk := s.factory.New(len(req.Data)) |
| Giorgi Lekveishvili | 45b4d52 | 2020-03-19 21:11:18 +0400 | [diff] [blame] | 115 | s.chunks.Store(req.ChunkId, chunk) |
| giolekva | 1f6577a | 2020-03-25 12:53:06 +0400 | [diff] [blame] | 116 | _, err = chunk.WriterAt().WriteAt(req.Data, 0) |
| giolekva | 7be17df | 2020-03-21 13:57:02 +0400 | [diff] [blame] | 117 | if err == nil { |
| 118 | resp = &api.StoreChunkResponse{} |
| 119 | } |
| 120 | return |
| 121 | } |
| 122 | |
| giolekva | 7be17df | 2020-03-21 13:57:02 +0400 | [diff] [blame] | 123 | func (s *ChunkServer) RemoveChunk( |
| 124 | ctx context.Context, |
| 125 | req *api.RemoveChunkRequest) (resp *api.RemoveChunkResponse, err error) { |
| 126 | if cancel, ok := s.replicatorCancel.Load(req.ChunkId); ok { |
| 127 | cancel.(context.CancelFunc)() |
| 128 | s.replicatorCancel.Delete(req.ChunkId) |
| 129 | } |
| 130 | s.chunks.Delete(req.ChunkId) |
| 131 | return &api.RemoveChunkResponse{}, nil |
| Giorgi Lekveishvili | b8f089f | 2020-03-18 23:28:12 +0400 | [diff] [blame] | 132 | } |