blob: c26250938992b1de5763718c4d08b4aba9bb7c21 [file] [log] [blame]
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +04001package chunk
2
giolekvac5126d92020-03-21 16:39:56 +04003import (
giolekvac5126d92020-03-21 16:39:56 +04004 "context"
giolekva1f6577a2020-03-25 12:53:06 +04005 "fmt"
6 "log"
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +04007
giolekvac5126d92020-03-21 16:39:56 +04008 "sync"
9
giolekvad2a029d2020-03-25 23:06:08 +040010 "github.com/giolekva/pcloud/api"
giolekvac5126d92020-03-21 16:39:56 +040011)
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +040012
13type ChunkServer struct {
giolekva7be17df2020-03-21 13:57:02 +040014 factory ChunkFactory
15 chunks sync.Map
16 replicatorCancel sync.Map
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +040017}
18
giolekva1f6577a2020-03-25 12:53:06 +040019func NewChunkServer(factory ChunkFactory) *ChunkServer {
20 return &ChunkServer{factory: factory}
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +040021}
22
23func (s *ChunkServer) ListChunks(
24 ctx context.Context,
giolekva7be17df2020-03-21 13:57:02 +040025 req *api.ListChunksRequest) (resp *api.ListChunksResponse, err error) {
26 resp = &api.ListChunksResponse{}
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +040027 s.chunks.Range(func(k, v interface{}) bool {
28 resp.ChunkId = append(resp.ChunkId, k.(string))
29 return true
30 })
giolekva7be17df2020-03-21 13:57:02 +040031 return
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +040032}
33
giolekva1f6577a2020-03-25 12:53:06 +040034func (s *ChunkServer) CreateChunk(
35 ctx context.Context,
36 req *api.CreateChunkRequest) (resp *api.CreateChunkResponse, err error) {
37 chunk := s.factory.New(int(req.Size))
38 s.chunks.Store(req.ChunkId, chunk)
39 switch req.Role {
40 case api.ReplicaRole_SECONDARY:
41 ctx, cancel := context.WithCancel(context.Background())
42 s.replicatorCancel.Store(req.ChunkId, cancel)
43 primaryListener := NewNonChangingPrimaryReplicaChangeListener(
44 req.ChunkId,
45 req.PrimaryAddress)
46 go ReplicateFromPrimary(ctx, chunk, primaryListener)
47 case api.ReplicaRole_PRIMARY:
48 {
49 }
50 }
51 resp = &api.CreateChunkResponse{}
52 log.Printf("Created chunk: %s\n", req.ChunkId)
53 return
54
55}
56
57func (s *ChunkServer) GetChunkStatus(
58 ctx context.Context,
59 req *api.GetChunkStatusRequest) (resp *api.GetChunkStatusResponse, err error) {
60 if chunk, ok := s.chunks.Load(req.ChunkId); ok {
61 c := chunk.(Chunk)
62 var info ChunkInfo
63 info, err = c.Stats()
64 if err != nil {
65 return
66 }
67 resp = &api.GetChunkStatusResponse{
68 Status: info.Status,
69 TotalBytes: int32(info.Size),
70 CommittedBytes: int32(info.Committed)}
71 return
72 }
73 return nil, fmt.Errorf("Could not fund chunk: %s", req.ChunkId)
74}
75
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +040076func (s *ChunkServer) ReadChunk(
77 ctx context.Context,
Giorgi Lekveishvili45b4d522020-03-19 21:11:18 +040078 req *api.ReadChunkRequest) (resp *api.ReadChunkResponse, err error) {
79 if value, ok := s.chunks.Load(req.ChunkId); ok {
80 chunk := value.(Chunk)
giolekva1f6577a2020-03-25 12:53:06 +040081 b := make([]byte, req.NumBytes)
82 var n int
83 n, err = chunk.ReaderAt().ReadAt(b, int64(req.Offset))
84 if n == 0 {
85 return
Giorgi Lekveishvili45b4d522020-03-19 21:11:18 +040086 }
giolekva1f6577a2020-03-25 12:53:06 +040087 return &api.ReadChunkResponse{Data: b[:n]}, nil
88
89 } else {
90 return nil, fmt.Errorf("Chunk not found: %s", req.ChunkId)
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +040091 }
giolekva1f6577a2020-03-25 12:53:06 +040092}
93
94func (s *ChunkServer) WriteChunk(
95 ctx context.Context,
96 req *api.WriteChunkRequest) (resp *api.WriteChunkResponse, err error) {
97 if value, ok := s.chunks.Load(req.ChunkId); ok {
98 chunk := value.(Chunk)
99 var n int
100 n, err = chunk.WriterAt().WriteAt(req.Data, int64(req.Offset))
101 if n == 0 {
102 return
103 }
104 return &api.WriteChunkResponse{BytesWritten: int32(n)}, nil
105
106 } else {
107 return nil, fmt.Errorf("Chunk not found: %s", req.ChunkId)
108 }
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +0400109}
110
giolekva7be17df2020-03-21 13:57:02 +0400111func (s *ChunkServer) RemoveChunk(
112 ctx context.Context,
113 req *api.RemoveChunkRequest) (resp *api.RemoveChunkResponse, err error) {
114 if cancel, ok := s.replicatorCancel.Load(req.ChunkId); ok {
115 cancel.(context.CancelFunc)()
116 s.replicatorCancel.Delete(req.ChunkId)
117 }
118 s.chunks.Delete(req.ChunkId)
119 return &api.RemoveChunkResponse{}, nil
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +0400120}