Skip to content

Commit

Permalink
Revert "meta/redis: simplify transaction in Redis (#1749)"
Browse files Browse the repository at this point in the history
This reverts commit 2456d33.
  • Loading branch information
davies committed Apr 8, 2022
1 parent 2456d33 commit f342a98
Showing 1 changed file with 143 additions and 158 deletions.
301 changes: 143 additions & 158 deletions pkg/meta/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -1148,9 +1148,23 @@ func (r *redisMeta) doMknod(ctx Context, parent Ino, name string, _type uint8, m
}

func (r *redisMeta) doUnlink(ctx Context, parent Ino, name string) syscall.Errno {
var _type uint8
var trash, inode Ino
keys := []string{r.entryKey(parent), r.inodeKey(parent)}
buf, err := r.rdb.HGet(ctx, r.entryKey(parent), name).Bytes()
if err == redis.Nil && r.conf.CaseInsensi {
if e := r.resolveCase(ctx, parent, name); e != nil {
name = string(e.Name)
buf = r.packEntry(e.Attr.Typ, e.Inode)
err = nil
}
}
if err != nil {
return errno(err)
}
_type, inode := r.parseEntry(buf)
if _type == TypeDirectory {
return syscall.EPERM
}
keys := []string{r.entryKey(parent), r.inodeKey(parent), r.inodeKey(inode)}
var trash Ino
if st := r.checkTrash(parent, &trash); st != 0 {
return st
}
Expand All @@ -1162,25 +1176,7 @@ func (r *redisMeta) doUnlink(ctx Context, parent Ino, name string) syscall.Errno
var opened bool
var attr Attr
var newSpace, newInode int64
err := r.txn(ctx, func(tx *redis.Tx) error {
buf, err := r.rdb.HGet(ctx, r.entryKey(parent), name).Bytes()
if err == redis.Nil && r.conf.CaseInsensi {
if e := r.resolveCase(ctx, parent, name); e != nil {
name = string(e.Name)
buf = r.packEntry(e.Attr.Typ, e.Inode)
err = nil
}
}
if err != nil {
return err
}
_type, inode = r.parseEntry(buf)
if _type == TypeDirectory {
return syscall.EPERM
}
if err := tx.Watch(ctx, r.inodeKey(inode)).Err(); err != nil {
return err
}
err = r.txn(ctx, func(tx *redis.Tx) error {
rs, _ := tx.MGet(ctx, r.inodeKey(parent), r.inodeKey(inode)).Result()
if rs[0] == nil {
return redis.Nil
Expand Down Expand Up @@ -1216,6 +1212,16 @@ func (r *redisMeta) doUnlink(ctx Context, parent Ino, name string) syscall.Errno
logger.Warnf("no attribute for inode %d (%d, %s)", inode, parent, name)
trash = 0
}

buf, err := tx.HGet(ctx, r.entryKey(parent), name).Bytes()
if err != nil {
return err
}
_type2, inode2 := r.parseEntry(buf)
if _type2 != _type || inode2 != inode {
return syscall.EAGAIN
}

_, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.HDel(ctx, r.entryKey(parent), name)
if !isTrash(parent) {
Expand Down Expand Up @@ -1265,71 +1271,65 @@ func (r *redisMeta) doUnlink(ctx Context, parent Ino, name string) syscall.Errno
}

func (r *redisMeta) doRmdir(ctx Context, parent Ino, name string) syscall.Errno {
var typ uint8
var trash, inode Ino
keys := []string{r.inodeKey(parent), r.entryKey(parent)}
buf, err := r.rdb.HGet(ctx, r.entryKey(parent), name).Bytes()
if err == redis.Nil && r.conf.CaseInsensi {
if e := r.resolveCase(ctx, parent, name); e != nil {
name = string(e.Name)
buf = r.packEntry(e.Attr.Typ, e.Inode)
err = nil
}
}
if err != nil {
return errno(err)
}
typ, inode := r.parseEntry(buf)
if typ != TypeDirectory {
return syscall.ENOTDIR
}

keys := []string{r.inodeKey(parent), r.entryKey(parent), r.inodeKey(inode), r.entryKey(inode)}
var trash Ino
if st := r.checkTrash(parent, &trash); st != 0 {
return st
}
if trash > 0 {
keys = append(keys, r.entryKey(trash))
}
err := r.txn(ctx, func(tx *redis.Tx) error {
buf, err := r.rdb.HGet(ctx, r.entryKey(parent), name).Bytes()
if err == redis.Nil && r.conf.CaseInsensi {
if e := r.resolveCase(ctx, parent, name); e != nil {
name = string(e.Name)
buf = r.packEntry(e.Attr.Typ, e.Inode)
err = nil
}
}
if err != nil {
return errno(err)
}
typ, inode = r.parseEntry(buf)
if typ != TypeDirectory {
return syscall.ENOTDIR
}
if err = tx.Watch(ctx, r.inodeKey(inode), r.entryKey(inode)).Err(); err != nil {
return err
}
p := tx.Pipeline()
p.Get(ctx, r.inodeKey(parent))
p.Get(ctx, r.inodeKey(inode))
p.HLen(ctx, r.entryKey(inode))
rs, err := p.Exec(ctx)
if err != nil {
return err
err = r.txn(ctx, func(tx *redis.Tx) error {
rs, _ := tx.MGet(ctx, r.inodeKey(parent), r.inodeKey(inode)).Result()
if rs[0] == nil {
return redis.Nil
}
var pattr, attr Attr
if buf, err := rs[0].(*redis.StringCmd).Bytes(); err != nil {
return err
} else {
r.parseAttr([]byte(buf), &pattr)
if pattr.Typ != TypeDirectory {
return syscall.ENOTDIR
}
r.parseAttr([]byte(rs[0].(string)), &pattr)
if pattr.Typ != TypeDirectory {
return syscall.ENOTDIR
}

now := time.Now()
pattr.Nlink--
pattr.Mtime = now.Unix()
pattr.Mtimensec = uint32(now.Nanosecond())
pattr.Ctime = now.Unix()
pattr.Ctimensec = uint32(now.Nanosecond())

cnt, err := rs[2].(*redis.IntCmd).Result()
buf, err := tx.HGet(ctx, r.entryKey(parent), name).Bytes()
if err != nil {
return err
} else if cnt > 0 {
}
typ, inode = r.parseEntry(buf)
if typ != TypeDirectory {
return syscall.ENOTDIR
}

cnt, err := tx.HLen(ctx, r.entryKey(inode)).Result()
if err != nil {
return err
}
if cnt > 0 {
return syscall.ENOTEMPTY
}
if rs[1] != nil {
if buf, err := rs[1].(*redis.StringCmd).Result(); err != nil {
return err
} else {
r.parseAttr([]byte(buf), &attr)
}
r.parseAttr([]byte(rs[1].(string)), &attr)
if ctx.Uid() != 0 && pattr.Mode&01000 != 0 && ctx.Uid() != pattr.Uid && ctx.Uid() != attr.Uid {
return syscall.EACCES
}
Expand Down Expand Up @@ -1369,118 +1369,93 @@ func (r *redisMeta) doRmdir(ctx Context, parent Ino, name string) syscall.Errno

func (r *redisMeta) doRename(ctx Context, parentSrc Ino, nameSrc string, parentDst Ino, nameDst string, flags uint32, inode *Ino, attr *Attr) syscall.Errno {
exchange := flags == RenameExchange
keys := []string{r.entryKey(parentSrc), r.inodeKey(parentSrc), r.entryKey(parentDst), r.inodeKey(parentDst)}
buf, err := r.rdb.HGet(ctx, r.entryKey(parentSrc), nameSrc).Bytes()
if err == redis.Nil && r.conf.CaseInsensi {
if e := r.resolveCase(ctx, parentSrc, nameSrc); e != nil {
nameSrc = string(e.Name)
buf = r.packEntry(e.Attr.Typ, e.Inode)
err = nil
}
}
if err != nil {
return errno(err)
}
typ, ino := r.parseEntry(buf)
if parentSrc == parentDst && nameSrc == nameDst {
if inode != nil {
*inode = ino
}
return 0
}
buf, err = r.rdb.HGet(ctx, r.entryKey(parentDst), nameDst).Bytes()
if err == redis.Nil && r.conf.CaseInsensi {
if e := r.resolveCase(ctx, parentDst, nameDst); e != nil {
nameDst = string(e.Name)
buf = r.packEntry(e.Attr.Typ, e.Inode)
err = nil
}
}
if err != nil && err != redis.Nil {
return errno(err)
}
keys := []string{r.entryKey(parentSrc), r.inodeKey(parentSrc), r.inodeKey(ino), r.entryKey(parentDst), r.inodeKey(parentDst)}
var opened bool
var trash, dino Ino
var dtyp uint8
var tattr Attr
if st := r.checkTrash(parentDst, &trash); st != 0 {
return st
}
if trash > 0 {
keys = append(keys, r.entryKey(trash))
}
var newSpace, newInode int64
err := r.txn(ctx, func(tx *redis.Tx) error {
p := tx.Pipeline()
p.HGet(ctx, r.entryKey(parentSrc), nameSrc)
p.HGet(ctx, r.entryKey(parentDst), nameDst)
rs, err := p.Exec(ctx)
if err != nil && err != redis.Nil {
return err
if err == nil {
if st := r.checkTrash(parentDst, &trash); st != 0 {
return st
}
buf, err := rs[0].(*redis.StringCmd).Bytes()
if err == redis.Nil && r.conf.CaseInsensi {
if e := r.resolveCase(ctx, parentSrc, nameSrc); e != nil {
nameSrc = string(e.Name)
buf = r.packEntry(e.Attr.Typ, e.Inode)
err = nil
}
if trash > 0 {
keys = append(keys, r.entryKey(trash))
}
if err != nil {
return err
dtyp, dino = r.parseEntry(buf)
keys = append(keys, r.inodeKey(dino))
if dtyp == TypeDirectory {
keys = append(keys, r.entryKey(dino))
}
typ, ino := r.parseEntry(buf)
if parentSrc == parentDst && nameSrc == nameDst {
if inode != nil {
*inode = ino
}
return nil
}
var newSpace, newInode int64
err = r.txn(ctx, func(tx *redis.Tx) error {
rs, _ := tx.MGet(ctx, r.inodeKey(parentSrc), r.inodeKey(parentDst), r.inodeKey(ino)).Result()
if rs[0] == nil || rs[1] == nil || rs[2] == nil {
return redis.Nil
}
keys = []string{r.inodeKey(ino)}
dbuf, err := rs[1].(*redis.StringCmd).Bytes()
if err == redis.Nil && r.conf.CaseInsensi {
if e := r.resolveCase(ctx, parentDst, nameDst); e != nil {
nameDst = string(e.Name)
buf = r.packEntry(e.Attr.Typ, e.Inode)
err = nil
}
var sattr, dattr, iattr Attr
r.parseAttr([]byte(rs[0].(string)), &sattr)
if sattr.Typ != TypeDirectory {
return syscall.ENOTDIR
}
r.parseAttr([]byte(rs[1].(string)), &dattr)
if dattr.Typ != TypeDirectory {
return syscall.ENOTDIR
}
r.parseAttr([]byte(rs[2].(string)), &iattr)

dbuf, err := tx.HGet(ctx, r.entryKey(parentDst), nameDst).Bytes()
if err != nil && err != redis.Nil {
return err
}
now := time.Now()
tattr = Attr{}
opened = false
if err == nil {
if flags == RenameNoReplace {
return syscall.EEXIST
}
dtyp, dino = r.parseEntry(dbuf)
keys = append(keys, r.inodeKey(dino))
if dtyp == TypeDirectory {
keys = append(keys, r.entryKey(dino))
}
}
if err := tx.Watch(ctx, keys...).Err(); err != nil {
return err
}
p = tx.Pipeline()
p.Get(ctx, r.inodeKey(parentSrc))
p.Get(ctx, r.inodeKey(parentDst))
p.Get(ctx, r.inodeKey(ino))
if dino > 0 {
p.Get(ctx, r.inodeKey(dino))
if dtyp == TypeDirectory {
p.HLen(ctx, r.entryKey(dino))
}
}
rs, err = p.Exec(ctx)
if err != nil {
return err
}
var sattr, dattr, iattr Attr
if buf, err := rs[0].(*redis.StringCmd).Bytes(); err != nil {
return err
} else {
r.parseAttr(buf, &sattr)
if sattr.Typ != TypeDirectory {
return syscall.ENOTDIR
}
}
if buf, err := rs[1].(*redis.StringCmd).Bytes(); err != nil {
return err
} else {
r.parseAttr(buf, &dattr)
if dattr.Typ != TypeDirectory {
return syscall.ENOTDIR
dtyp1, dino1 := r.parseEntry(dbuf)
if dino1 != dino || dtyp1 != dtyp {
return syscall.EAGAIN
}
}
if buf, err := rs[2].(*redis.StringCmd).Bytes(); err != nil {
return err
} else {
r.parseAttr(buf, &iattr)
}

now := time.Now()
tattr = Attr{}
opened = false
if dino > 0 {
if a, err := rs[3].(*redis.StringCmd).Bytes(); err == redis.Nil {
a, err := tx.Get(ctx, r.inodeKey(dino)).Bytes()
if err == redis.Nil {
logger.Warnf("no attribute for inode %d (%d, %s)", dino, parentDst, nameDst)
trash = 0
} else if err != nil {
return err
} else {
r.parseAttr(a, &tattr)
}
r.parseAttr(a, &tattr)
tattr.Ctime = now.Unix()
tattr.Ctimensec = uint32(now.Nanosecond())
if exchange {
Expand All @@ -1491,9 +1466,11 @@ func (r *redisMeta) doRename(ctx Context, parentSrc Ino, nameSrc string, parentD
}
} else {
if dtyp == TypeDirectory {
if cnt, err := rs[4].(*redis.IntCmd).Result(); err != nil {
cnt, err := tx.HLen(ctx, r.entryKey(dino)).Result()
if err != nil {
return err
} else if cnt != 0 {
}
if cnt != 0 {
return syscall.ENOTEMPTY
}
dattr.Nlink--
Expand Down Expand Up @@ -1521,6 +1498,14 @@ func (r *redisMeta) doRename(ctx Context, parentSrc Ino, nameSrc string, parentD
}
dino, dtyp = 0, 0
}
buf, err := tx.HGet(ctx, r.entryKey(parentSrc), nameSrc).Bytes()
if err != nil {
return err
}
typ1, ino1 := r.parseEntry(buf)
if ino1 != ino || typ1 != typ {
return syscall.EAGAIN
}
if ctx.Uid() != 0 && sattr.Mode&01000 != 0 && ctx.Uid() != sattr.Uid && ctx.Uid() != iattr.Uid {
return syscall.EACCES
}
Expand Down

0 comments on commit f342a98

Please sign in to comment.