Skip to content

Commit 094765e

Browse files
committed
feat(chunker): add storage driver
1 parent 6bde813 commit 094765e

7 files changed

Lines changed: 1187 additions & 3 deletions

File tree

drivers/all.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
_ "github.com/alist-org/alist/v3/drivers/baidu_share"
2424
_ "github.com/alist-org/alist/v3/drivers/bitqiu"
2525
_ "github.com/alist-org/alist/v3/drivers/chaoxing"
26+
_ "github.com/alist-org/alist/v3/drivers/chunker"
2627
_ "github.com/alist-org/alist/v3/drivers/cloudreve"
2728
_ "github.com/alist-org/alist/v3/drivers/cloudreve_v4"
2829
_ "github.com/alist-org/alist/v3/drivers/crypt"

drivers/chunker/driver.go

Lines changed: 381 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,381 @@
1+
package chunker
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"crypto/md5"
7+
"crypto/sha1"
8+
"encoding/hex"
9+
"fmt"
10+
"hash"
11+
"io"
12+
"path"
13+
"strconv"
14+
"time"
15+
16+
"github.com/alist-org/alist/v3/internal/driver"
17+
"github.com/alist-org/alist/v3/internal/errs"
18+
"github.com/alist-org/alist/v3/internal/fs"
19+
"github.com/alist-org/alist/v3/internal/model"
20+
"github.com/alist-org/alist/v3/internal/op"
21+
"github.com/alist-org/alist/v3/internal/stream"
22+
"github.com/alist-org/alist/v3/pkg/http_range"
23+
"github.com/alist-org/alist/v3/pkg/utils"
24+
)
25+
26+
func (d *Chunker) Config() driver.Config {
27+
return config
28+
}
29+
30+
func (d *Chunker) GetAddition() driver.Additional {
31+
return &d.Addition
32+
}
33+
34+
func (d *Chunker) Init(ctx context.Context) error {
35+
if d.ChunkSize == 0 {
36+
d.ChunkSize = defaultChunkSize
37+
}
38+
if d.StartFrom == 0 {
39+
d.StartFrom = defaultStartFrom
40+
}
41+
d.NameFormat = utils.GetNoneEmpty(d.NameFormat, defaultChunkNameFmt)
42+
d.MetaFormat = utils.GetNoneEmpty(d.MetaFormat, defaultMetaFormat)
43+
d.HashType = utils.GetNoneEmpty(d.HashType, defaultHashType)
44+
45+
if err := d.setChunkNameFormat(d.NameFormat); err != nil {
46+
return fmt.Errorf("invalid name_format: %w", err)
47+
}
48+
if err := d.validateOptions(); err != nil {
49+
return err
50+
}
51+
52+
storage, err := fs.GetStorage(d.RemotePath, &fs.GetStoragesArgs{})
53+
if err != nil {
54+
return fmt.Errorf("can't find remote storage: %w", err)
55+
}
56+
d.remoteStorage = storage
57+
return nil
58+
}
59+
60+
func (d *Chunker) Drop(ctx context.Context) error {
61+
d.remoteStorage = nil
62+
return nil
63+
}
64+
65+
func (d *Chunker) List(ctx context.Context, dir model.Obj, args model.ListArgs) ([]model.Obj, error) {
66+
return d.listDirObjects(ctx, dir.GetPath(), args.Refresh)
67+
}
68+
69+
func (d *Chunker) Get(ctx context.Context, pathStr string) (model.Obj, error) {
70+
if utils.PathEqual(pathStr, "/") {
71+
return &model.Object{
72+
Name: "Root",
73+
Path: "/",
74+
IsFolder: true,
75+
}, nil
76+
}
77+
parent, name := path.Split(utils.FixAndCleanPath(pathStr))
78+
if parent == "" {
79+
parent = "/"
80+
}
81+
objs, err := d.listDirObjects(ctx, parent, false)
82+
if err != nil {
83+
return nil, err
84+
}
85+
for _, obj := range objs {
86+
if obj.GetName() == name {
87+
return obj, nil
88+
}
89+
}
90+
return nil, errs.ObjectNotFound
91+
}
92+
93+
func (d *Chunker) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (*model.Link, error) {
94+
obj := d.linkedObject(file)
95+
if obj == nil || !obj.Chunked {
96+
actualPath, err := d.getActualPathForRemote(file.GetPath())
97+
if err != nil {
98+
return nil, fmt.Errorf("failed to convert path to remote path: %w", err)
99+
}
100+
link, _, err := op.Link(ctx, d.remoteStorage, actualPath, args)
101+
return link, err
102+
}
103+
104+
linkedParts := make([]linkedPart, 0, len(obj.Parts))
105+
baseClosers := utils.EmptyClosers()
106+
for _, part := range obj.Parts {
107+
actualPath, err := d.getActualChunkPath(obj.GetPath(), part.No, part.XactID)
108+
if err != nil {
109+
return nil, fmt.Errorf("failed to convert chunk path: %w", err)
110+
}
111+
link, _, err := op.Link(ctx, d.remoteStorage, actualPath, args)
112+
if err != nil {
113+
return nil, err
114+
}
115+
if link.MFile != nil {
116+
baseClosers.Add(link.MFile)
117+
}
118+
if link.RangeReadCloser != nil {
119+
baseClosers.Add(link.RangeReadCloser)
120+
}
121+
linkedParts = append(linkedParts, linkedPart{
122+
part: part,
123+
link: link,
124+
})
125+
}
126+
127+
return &model.Link{
128+
RangeReadCloser: &model.RangeReadCloser{
129+
RangeReader: func(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) {
130+
return d.openChunkReader(ctx, linkedParts, obj.GetSize(), httpRange)
131+
},
132+
Closers: baseClosers,
133+
},
134+
}, nil
135+
}
136+
137+
func (d *Chunker) MakeDir(ctx context.Context, parentDir model.Obj, dirName string) error {
138+
dstDirActualPath, err := d.getActualPathForRemote(parentDir.GetPath())
139+
if err != nil {
140+
return fmt.Errorf("failed to convert path to remote path: %w", err)
141+
}
142+
return op.MakeDir(ctx, d.remoteStorage, path.Join(dstDirActualPath, dirName))
143+
}
144+
145+
func (d *Chunker) Move(ctx context.Context, srcObj, dstDir model.Obj) error {
146+
obj := d.linkedObject(srcObj)
147+
if srcObj.IsDir() || obj == nil || !obj.Chunked {
148+
srcRemoteActualPath, err := d.getActualPathForRemote(srcObj.GetPath())
149+
if err != nil {
150+
return fmt.Errorf("failed to convert path to remote path: %w", err)
151+
}
152+
dstRemoteActualPath, err := d.getActualPathForRemote(dstDir.GetPath())
153+
if err != nil {
154+
return fmt.Errorf("failed to convert path to remote path: %w", err)
155+
}
156+
return op.Move(ctx, d.remoteStorage, srcRemoteActualPath, dstRemoteActualPath)
157+
}
158+
159+
dstRemoteActualPath, err := d.getActualPathForRemote(dstDir.GetPath())
160+
if err != nil {
161+
return fmt.Errorf("failed to convert path to remote path: %w", err)
162+
}
163+
for _, logicalPath := range d.chunkPathsForObject(obj) {
164+
actualPath, err := d.getActualPathForRemote(logicalPath)
165+
if err != nil {
166+
return err
167+
}
168+
if err := op.Move(ctx, d.remoteStorage, actualPath, dstRemoteActualPath); err != nil {
169+
return err
170+
}
171+
}
172+
return nil
173+
}
174+
175+
func (d *Chunker) Rename(ctx context.Context, srcObj model.Obj, newName string) error {
176+
obj := d.linkedObject(srcObj)
177+
if srcObj.IsDir() || obj == nil || !obj.Chunked {
178+
remoteActualPath, err := d.getActualPathForRemote(srcObj.GetPath())
179+
if err != nil {
180+
return fmt.Errorf("failed to convert path to remote path: %w", err)
181+
}
182+
return op.Rename(ctx, d.remoteStorage, remoteActualPath, newName)
183+
}
184+
185+
for _, part := range obj.Parts {
186+
actualPath, err := d.getActualChunkPath(obj.GetPath(), part.No, part.XactID)
187+
if err != nil {
188+
return err
189+
}
190+
newChunkName := d.chunkPartBaseName(path.Join(path.Dir(obj.GetPath()), newName), part.No, part.XactID)
191+
if err := op.Rename(ctx, d.remoteStorage, actualPath, newChunkName); err != nil {
192+
return err
193+
}
194+
}
195+
if obj.UsesMeta {
196+
actualPath, err := d.getActualPathForRemote(obj.GetPath())
197+
if err != nil {
198+
return err
199+
}
200+
if err := op.Rename(ctx, d.remoteStorage, actualPath, newName); err != nil {
201+
return err
202+
}
203+
}
204+
return nil
205+
}
206+
207+
func (d *Chunker) Copy(ctx context.Context, srcObj, dstDir model.Obj) error {
208+
obj := d.linkedObject(srcObj)
209+
if srcObj.IsDir() || obj == nil || !obj.Chunked {
210+
srcRemoteActualPath, err := d.getActualPathForRemote(srcObj.GetPath())
211+
if err != nil {
212+
return fmt.Errorf("failed to convert path to remote path: %w", err)
213+
}
214+
dstRemoteActualPath, err := d.getActualPathForRemote(dstDir.GetPath())
215+
if err != nil {
216+
return fmt.Errorf("failed to convert path to remote path: %w", err)
217+
}
218+
return op.Copy(ctx, d.remoteStorage, srcRemoteActualPath, dstRemoteActualPath)
219+
}
220+
221+
dstRemoteActualPath, err := d.getActualPathForRemote(dstDir.GetPath())
222+
if err != nil {
223+
return fmt.Errorf("failed to convert path to remote path: %w", err)
224+
}
225+
for _, logicalPath := range d.chunkPathsForObject(obj) {
226+
actualPath, err := d.getActualPathForRemote(logicalPath)
227+
if err != nil {
228+
return err
229+
}
230+
if err := op.Copy(ctx, d.remoteStorage, actualPath, dstRemoteActualPath); err != nil {
231+
return err
232+
}
233+
}
234+
return nil
235+
}
236+
237+
func (d *Chunker) Remove(ctx context.Context, obj model.Obj) error {
238+
chunkedObj := d.linkedObject(obj)
239+
if obj.IsDir() || chunkedObj == nil || !chunkedObj.Chunked {
240+
remoteActualPath, err := d.getActualPathForRemote(obj.GetPath())
241+
if err != nil {
242+
return fmt.Errorf("failed to convert path to remote path: %w", err)
243+
}
244+
return op.Remove(ctx, d.remoteStorage, remoteActualPath)
245+
}
246+
247+
for _, logicalPath := range d.chunkPathsForObject(chunkedObj) {
248+
actualPath, err := d.getActualPathForRemote(logicalPath)
249+
if err != nil {
250+
return err
251+
}
252+
if err := op.Remove(ctx, d.remoteStorage, actualPath); err != nil {
253+
return err
254+
}
255+
}
256+
return nil
257+
}
258+
259+
func (d *Chunker) Put(ctx context.Context, dstDir model.Obj, streamer model.FileStreamer, up driver.UpdateProgress) error {
260+
dstDirActualPath, err := d.getActualPathForRemote(dstDir.GetPath())
261+
if err != nil {
262+
return fmt.Errorf("failed to convert path to remote path: %w", err)
263+
}
264+
265+
existing := d.linkedObject(streamer.GetExist())
266+
logicalPath := path.Join(dstDir.GetPath(), streamer.GetName())
267+
if streamer.GetSize() <= d.ChunkSize {
268+
if err := op.Put(ctx, d.remoteStorage, dstDirActualPath, streamer, up, false); err != nil {
269+
return err
270+
}
271+
return d.cleanupReplacedObject(ctx, existing, d.buildKeepSet(logicalPath))
272+
}
273+
274+
if up == nil {
275+
up = func(float64) {}
276+
}
277+
278+
var (
279+
md5Hasher hash.Hash
280+
sha1Hasher hash.Hash
281+
writers []io.Writer
282+
)
283+
switch d.HashType {
284+
case "md5":
285+
md5Hasher = md5.New()
286+
writers = append(writers, md5Hasher)
287+
case "sha1":
288+
sha1Hasher = sha1.New()
289+
writers = append(writers, sha1Hasher)
290+
}
291+
writers = append(writers, driver.NewProgress(streamer.GetSize(), up))
292+
293+
baseReader := io.TeeReader(streamer, io.MultiWriter(writers...))
294+
xactID := strconv.FormatInt(time.Now().UnixNano(), 36)
295+
if len(xactID) > 9 {
296+
xactID = xactID[len(xactID)-9:]
297+
}
298+
if len(xactID) < 4 {
299+
xactID = fmt.Sprintf("%04s", xactID)
300+
}
301+
302+
chunkCount := 0
303+
remaining := streamer.GetSize()
304+
keepPaths := []string{logicalPath}
305+
for remaining > 0 {
306+
chunkLen := utils.Min(remaining, d.ChunkSize)
307+
chunkName := d.chunkPartBaseName(logicalPath, chunkCount, xactIDIfNeeded(d.MetaFormat, xactID))
308+
chunkPath := d.makeChunkName(logicalPath, chunkCount, xactIDIfNeeded(d.MetaFormat, xactID))
309+
partReader := driver.NewLimitedUploadStream(ctx, &driver.ReaderWithCtx{
310+
Reader: io.LimitReader(baseReader, chunkLen),
311+
Ctx: ctx,
312+
})
313+
partStream := &stream.FileStream{
314+
Obj: &model.Object{
315+
Name: chunkName,
316+
Size: chunkLen,
317+
Modified: streamer.ModTime(),
318+
Ctime: streamer.CreateTime(),
319+
IsFolder: false,
320+
},
321+
Reader: partReader,
322+
Mimetype: "application/octet-stream",
323+
WebPutAsTask: streamer.NeedStore(),
324+
ForceStreamUpload: true,
325+
}
326+
if err := op.Put(ctx, d.remoteStorage, dstDirActualPath, partStream, nil, false); err != nil {
327+
return err
328+
}
329+
keepPaths = append(keepPaths, chunkPath)
330+
remaining -= chunkLen
331+
chunkCount++
332+
}
333+
334+
if d.MetaFormat == "simplejson" {
335+
md5Value := ""
336+
if md5Hasher != nil {
337+
md5Value = hex.EncodeToString(md5Hasher.Sum(nil))
338+
}
339+
sha1Value := ""
340+
if sha1Hasher != nil {
341+
sha1Value = hex.EncodeToString(sha1Hasher.Sum(nil))
342+
}
343+
txn := xactID
344+
metaData, err := marshalMetadata(streamer.GetSize(), chunkCount, md5Value, sha1Value, txn)
345+
if err != nil {
346+
return err
347+
}
348+
metaStream := &stream.FileStream{
349+
Obj: &model.Object{
350+
Name: streamer.GetName(),
351+
Size: int64(len(metaData)),
352+
Modified: streamer.ModTime(),
353+
Ctime: streamer.CreateTime(),
354+
IsFolder: false,
355+
},
356+
Reader: bytes.NewReader(metaData),
357+
Mimetype: "application/json",
358+
WebPutAsTask: false,
359+
ForceStreamUpload: true,
360+
}
361+
if err := op.Put(ctx, d.remoteStorage, dstDirActualPath, metaStream, nil, false); err != nil {
362+
return err
363+
}
364+
} else {
365+
actualPath, err := d.getActualPathForRemote(logicalPath)
366+
if err == nil {
367+
_ = op.Remove(ctx, d.remoteStorage, actualPath)
368+
}
369+
}
370+
371+
return d.cleanupReplacedObject(ctx, existing, d.buildKeepSet(keepPaths...))
372+
}
373+
374+
func xactIDIfNeeded(metaFormat, xactID string) string {
375+
if metaFormat == "simplejson" {
376+
return xactID
377+
}
378+
return ""
379+
}
380+
381+
var _ driver.Driver = (*Chunker)(nil)

0 commit comments

Comments
 (0)