blob: d619f131d7cfaec0b5e81191fef4fa6e856826bf [file] [log] [blame]
giolekva892a4e22020-04-27 16:46:22 +04001package chunk
2
3import (
4 "context"
5 "fmt"
6 "log"
7
8 "sync"
9
10 "github.com/giolekva/pcloud/pfs/api"
11)
12
13type ChunkServer struct {
14 factory ChunkFactory
15 assignmentChangeLis ReplicaAssignmentChangeListener
16 chunks sync.Map
17 replicatorCancel sync.Map
18}
19
20func NewChunkServer(factory ChunkFactory,
21 assignmentChangeLis ReplicaAssignmentChangeListener) *ChunkServer {
22 return &ChunkServer{
23 factory: factory,
24 assignmentChangeLis: assignmentChangeLis}
25}
26
27func (s *ChunkServer) ListChunks(
28 ctx context.Context,
29 req *api.ListChunksRequest) (resp *api.ListChunksResponse, err error) {
30 resp = &api.ListChunksResponse{}
31 s.chunks.Range(func(k, v interface{}) bool {
32 resp.ChunkId = append(resp.ChunkId, k.(string))
33 return true
34 })
35 return
36}
37
38func (s *ChunkServer) CreateChunk(
39 ctx context.Context,
40 req *api.CreateChunkRequest) (resp *api.CreateChunkResponse, err error) {
41 chunk := s.factory.New(int(req.Size))
42 s.chunks.Store(req.ChunkId, chunk)
43 switch req.Role {
44 case api.ReplicaRole_SECONDARY:
45 ctx, cancel := context.WithCancel(context.Background())
46 s.replicatorCancel.Store(req.ChunkId, cancel)
47 primaryAddressCh := s.assignmentChangeLis.Primary(
48 req.ChunkId, req.PrimaryAddress)
49 go ReplicateFromPrimary(ctx, req.ChunkId, chunk, primaryAddressCh)
50 case api.ReplicaRole_PRIMARY:
51 {
52 }
53 }
54 resp = &api.CreateChunkResponse{}
55 log.Printf("Created chunk: %s\n", req.ChunkId)
56 return
57
58}
59
60func (s *ChunkServer) GetChunkStatus(
61 ctx context.Context,
62 req *api.GetChunkStatusRequest) (resp *api.GetChunkStatusResponse, err error) {
63 if chunk, ok := s.chunks.Load(req.ChunkId); ok {
64 c := chunk.(Chunk)
65 var info ChunkInfo
66 info, err = c.Stats()
67 if err != nil {
68 return
69 }
70 resp = &api.GetChunkStatusResponse{
71 Status: info.Status,
72 TotalBytes: int32(info.Size),
73 CommittedBytes: int32(info.Committed)}
74 return
75 }
76 return nil, fmt.Errorf("Could not fund chunk: %s", req.ChunkId)
77}
78
79func (s *ChunkServer) ReadChunk(
80 ctx context.Context,
81 req *api.ReadChunkRequest) (resp *api.ReadChunkResponse, err error) {
82 if value, ok := s.chunks.Load(req.ChunkId); ok {
83 chunk := value.(Chunk)
84 b := make([]byte, req.NumBytes)
85 var n int
86 n, err = chunk.ReaderAt().ReadAt(b, int64(req.Offset))
87 if n == 0 {
88 return
89 }
90 return &api.ReadChunkResponse{Data: b[:n]}, nil
91
92 } else {
93 return nil, fmt.Errorf("Chunk not found: %s", req.ChunkId)
94 }
95}
96
97func (s *ChunkServer) WriteChunk(
98 ctx context.Context,
99 req *api.WriteChunkRequest) (resp *api.WriteChunkResponse, err error) {
100 if value, ok := s.chunks.Load(req.ChunkId); ok {
101 chunk := value.(Chunk)
102 var n int
103 n, err = chunk.WriterAt().WriteAt(req.Data, int64(req.Offset))
104 if n == 0 {
105 return
106 }
107 return &api.WriteChunkResponse{BytesWritten: int32(n)}, nil
108
109 } else {
110 return nil, fmt.Errorf("Chunk not found: %s", req.ChunkId)
111 }
112}
113
114func (s *ChunkServer) RemoveChunk(
115 ctx context.Context,
116 req *api.RemoveChunkRequest) (resp *api.RemoveChunkResponse, err error) {
117 if cancel, ok := s.replicatorCancel.Load(req.ChunkId); ok {
118 cancel.(context.CancelFunc)()
119 s.replicatorCancel.Delete(req.ChunkId)
120 }
121 s.chunks.Delete(req.ChunkId)
122 return &api.RemoveChunkResponse{}, nil
123}