blob: 2ec19b44a2889146326227366fecaa2a4afa6879 [file] [log] [blame]
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +04001package chunk
2
Giorgi Lekveishvili45b4d522020-03-19 21:11:18 +04003import "bytes"
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +04004import "context"
Giorgi Lekveishvili45b4d522020-03-19 21:11:18 +04005import "io"
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +04006import "sync"
7
8import "pcloud/api"
9
10type ChunkServer struct {
giolekva7be17df2020-03-21 13:57:02 +040011 factory ChunkFactory
12 chunks sync.Map
13 replicatorCancel sync.Map
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +040014}
15
16func NewChunkServer() *ChunkServer {
17 return &ChunkServer{}
18}
19
20func (s *ChunkServer) ListChunks(
21 ctx context.Context,
giolekva7be17df2020-03-21 13:57:02 +040022 req *api.ListChunksRequest) (resp *api.ListChunksResponse, err error) {
23 resp = &api.ListChunksResponse{}
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +040024 s.chunks.Range(func(k, v interface{}) bool {
25 resp.ChunkId = append(resp.ChunkId, k.(string))
26 return true
27 })
giolekva7be17df2020-03-21 13:57:02 +040028 return
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +040029}
30
31func (s *ChunkServer) ReadChunk(
32 ctx context.Context,
Giorgi Lekveishvili45b4d522020-03-19 21:11:18 +040033 req *api.ReadChunkRequest) (resp *api.ReadChunkResponse, err error) {
34 if value, ok := s.chunks.Load(req.ChunkId); ok {
35 chunk := value.(Chunk)
36 src := chunk.ReadSeeker()
37 if req.Offset != 0 {
38 _, err = src.Seek(int64(req.Offset), io.SeekStart)
39 if err != nil {
40 return
41 }
42 }
43 var dst bytes.Buffer
44 if req.NumBytes != 0 {
45 _, err = io.CopyN(&dst, src, int64(req.NumBytes))
46 } else {
47 _, err = io.Copy(&dst, src)
48 }
49 if err == nil {
50 resp = &api.ReadChunkResponse{Data: dst.Bytes()}
51 }
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +040052 }
Giorgi Lekveishvili45b4d522020-03-19 21:11:18 +040053 return
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +040054}
55
56func (s *ChunkServer) StoreChunk(
57 ctx context.Context,
giolekva7be17df2020-03-21 13:57:02 +040058 req *api.StoreChunkRequest) (resp *api.StoreChunkResponse, err error) {
59 chunk := s.factory.New()
60 _, err = chunk.Writer().Write(req.Data)
Giorgi Lekveishvili45b4d522020-03-19 21:11:18 +040061 s.chunks.Store(req.ChunkId, chunk)
giolekva7be17df2020-03-21 13:57:02 +040062 if err == nil {
63 resp = &api.StoreChunkResponse{}
64 }
65 return
66}
67
68func (s *ChunkServer) ReplicateChunk(
69 ctx context.Context,
70 req *api.ReplicateChunkRequest) (resp *api.ReplicateChunkResponse, err error) {
71 chunk := s.factory.New()
72 s.chunks.Store(req.ChunkId, chunk)
73 ctx, cancel := context.WithCancel(context.Background())
74 s.replicatorCancel.Store(req.ChunkId, cancel)
75 go replicateFromPrimary(ctx, chunk, NonChangingPrimaryReplicaChangeListener{req.ChunkId, req.PrimaryChunkServer})
76 resp = &api.ReplicateChunkResponse{}
77 return
78
79}
80
81func (s *ChunkServer) RemoveChunk(
82 ctx context.Context,
83 req *api.RemoveChunkRequest) (resp *api.RemoveChunkResponse, err error) {
84 if cancel, ok := s.replicatorCancel.Load(req.ChunkId); ok {
85 cancel.(context.CancelFunc)()
86 s.replicatorCancel.Delete(req.ChunkId)
87 }
88 s.chunks.Delete(req.ChunkId)
89 return &api.RemoveChunkResponse{}, nil
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +040090}