blob: 0566c16aaed9638ab7ea1431c23de976c8bc13f3 [file] [log] [blame]
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +04001package chunk
2
giolekvac5126d92020-03-21 16:39:56 +04003import (
giolekvac5126d92020-03-21 16:39:56 +04004 "context"
giolekva1f6577a2020-03-25 12:53:06 +04005 "fmt"
6 "log"
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +04007
giolekvac5126d92020-03-21 16:39:56 +04008 "sync"
9
10 "pcloud/api"
11)
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +040012
13type ChunkServer struct {
giolekva7be17df2020-03-21 13:57:02 +040014 factory ChunkFactory
15 chunks sync.Map
16 replicatorCancel sync.Map
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +040017}
18
giolekva1f6577a2020-03-25 12:53:06 +040019func NewChunkServer(factory ChunkFactory) *ChunkServer {
20 return &ChunkServer{factory: factory}
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +040021}
22
23func (s *ChunkServer) ListChunks(
24 ctx context.Context,
giolekva7be17df2020-03-21 13:57:02 +040025 req *api.ListChunksRequest) (resp *api.ListChunksResponse, err error) {
26 resp = &api.ListChunksResponse{}
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +040027 s.chunks.Range(func(k, v interface{}) bool {
28 resp.ChunkId = append(resp.ChunkId, k.(string))
29 return true
30 })
giolekva7be17df2020-03-21 13:57:02 +040031 return
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +040032}
33
giolekva1f6577a2020-03-25 12:53:06 +040034func (s *ChunkServer) CreateChunk(
35 ctx context.Context,
36 req *api.CreateChunkRequest) (resp *api.CreateChunkResponse, err error) {
37 chunk := s.factory.New(int(req.Size))
38 s.chunks.Store(req.ChunkId, chunk)
39 switch req.Role {
40 case api.ReplicaRole_SECONDARY:
41 ctx, cancel := context.WithCancel(context.Background())
42 s.replicatorCancel.Store(req.ChunkId, cancel)
43 primaryListener := NewNonChangingPrimaryReplicaChangeListener(
44 req.ChunkId,
45 req.PrimaryAddress)
46 go ReplicateFromPrimary(ctx, chunk, primaryListener)
47 case api.ReplicaRole_PRIMARY:
48 {
49 }
50 }
51 resp = &api.CreateChunkResponse{}
52 log.Printf("Created chunk: %s\n", req.ChunkId)
53 return
54
55}
56
57func (s *ChunkServer) GetChunkStatus(
58 ctx context.Context,
59 req *api.GetChunkStatusRequest) (resp *api.GetChunkStatusResponse, err error) {
60 if chunk, ok := s.chunks.Load(req.ChunkId); ok {
61 c := chunk.(Chunk)
62 var info ChunkInfo
63 info, err = c.Stats()
64 if err != nil {
65 return
66 }
67 resp = &api.GetChunkStatusResponse{
68 Status: info.Status,
69 TotalBytes: int32(info.Size),
70 CommittedBytes: int32(info.Committed)}
71 return
72 }
73 return nil, fmt.Errorf("Could not fund chunk: %s", req.ChunkId)
74}
75
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +040076func (s *ChunkServer) ReadChunk(
77 ctx context.Context,
Giorgi Lekveishvili45b4d522020-03-19 21:11:18 +040078 req *api.ReadChunkRequest) (resp *api.ReadChunkResponse, err error) {
79 if value, ok := s.chunks.Load(req.ChunkId); ok {
80 chunk := value.(Chunk)
giolekva1f6577a2020-03-25 12:53:06 +040081 b := make([]byte, req.NumBytes)
82 var n int
83 n, err = chunk.ReaderAt().ReadAt(b, int64(req.Offset))
84 if n == 0 {
85 return
Giorgi Lekveishvili45b4d522020-03-19 21:11:18 +040086 }
giolekva1f6577a2020-03-25 12:53:06 +040087 return &api.ReadChunkResponse{Data: b[:n]}, nil
88
89 } else {
90 return nil, fmt.Errorf("Chunk not found: %s", req.ChunkId)
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +040091 }
giolekva1f6577a2020-03-25 12:53:06 +040092}
93
94func (s *ChunkServer) WriteChunk(
95 ctx context.Context,
96 req *api.WriteChunkRequest) (resp *api.WriteChunkResponse, err error) {
97 if value, ok := s.chunks.Load(req.ChunkId); ok {
98 chunk := value.(Chunk)
99 var n int
100 n, err = chunk.WriterAt().WriteAt(req.Data, int64(req.Offset))
101 if n == 0 {
102 return
103 }
104 return &api.WriteChunkResponse{BytesWritten: int32(n)}, nil
105
106 } else {
107 return nil, fmt.Errorf("Chunk not found: %s", req.ChunkId)
108 }
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +0400109}
110
111func (s *ChunkServer) StoreChunk(
112 ctx context.Context,
giolekva7be17df2020-03-21 13:57:02 +0400113 req *api.StoreChunkRequest) (resp *api.StoreChunkResponse, err error) {
giolekva1f6577a2020-03-25 12:53:06 +0400114 chunk := s.factory.New(len(req.Data))
Giorgi Lekveishvili45b4d522020-03-19 21:11:18 +0400115 s.chunks.Store(req.ChunkId, chunk)
giolekva1f6577a2020-03-25 12:53:06 +0400116 _, err = chunk.WriterAt().WriteAt(req.Data, 0)
giolekva7be17df2020-03-21 13:57:02 +0400117 if err == nil {
118 resp = &api.StoreChunkResponse{}
119 }
120 return
121}
122
giolekva7be17df2020-03-21 13:57:02 +0400123func (s *ChunkServer) RemoveChunk(
124 ctx context.Context,
125 req *api.RemoveChunkRequest) (resp *api.RemoveChunkResponse, err error) {
126 if cancel, ok := s.replicatorCancel.Load(req.ChunkId); ok {
127 cancel.(context.CancelFunc)()
128 s.replicatorCancel.Delete(req.ChunkId)
129 }
130 s.chunks.Delete(req.ChunkId)
131 return &api.RemoveChunkResponse{}, nil
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +0400132}