Skip to content

Commit

Permalink
fix: explicit single dir vs replica tree
Browse files Browse the repository at this point in the history
  • Loading branch information
Robin Bryce committed Aug 29, 2024
1 parent 8470e13 commit 87a0601
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 9 deletions.
2 changes: 1 addition & 1 deletion massifs/localmassifreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func (r *LocalReader) GetSeal(
// short circuit direct match, regardless of mode, to support explicit paths
dirEntry, ok := r.cache.GetEntry(directory)
if ok {
return copyCachedSealOrErr(dirEntry.ReadSeal(r.cache, tenantIdentityOrLocalPath))
return copyCachedSealOrErr(dirEntry.GetSeal(r.cache, massifIndex))
}

return copyCachedSealOrErr(r.cache.ReadSeal(directory, massifIndex))
Expand Down
37 changes: 29 additions & 8 deletions massifs/logdircache.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var (
ErrAddSealExists = errors.New("attempt to add a sealed stated entry that already exists")
ErrLogMassifHeightNotProvided = errors.New("a consistent massif height must be specified for all files logically part of the same log")
ErrNeedTenantIdentityOrExistingFilepath = errors.New("a tenant identity or an existing file path must be provided")
ErrDirModeExclusiveWithReplicaMode = errors.New("operation on an arbitrary, single, directory is mutualy exclusive with replica mode")
)

type DirLister interface {
Expand Down Expand Up @@ -61,6 +62,12 @@ type DirCacheOptions struct {
// readers which operate on a local replica of multiple tenant logs specify the root of the replica using this option
// The paths under this location match the path schema used by datatrails for the cloud storage of tenant logs
replicaDir string

// If operating in directory mode, this is the identity of the tenant
// expected in the directory. The MassifContext instances are filled in with
// this value for this case, as the identity isn't otherwise known.
tenantIdentity string
tenantDirMode bool
}

// NewLogDirCacheOptions creates a new DirCacheOptions object with the provided options
Expand Down Expand Up @@ -95,12 +102,22 @@ func WithReaderOption(o ReaderOption) DirCacheOption {
// WithDirCacheReplicaDir specifies a directory under which a local filesystem
// replica of one or more tenant logs is maintained. The filesystem structure
// matches the remote log path structure
// NOTE: THis option is mutually exclusive with WithDirForTenant
func WithDirCacheReplicaDir(replicaDir string) DirCacheOption {
return func(o *DirCacheOptions) {
o.replicaDir = replicaDir
}
}

// WithDirCacheTenant sets the cache to operate in single directory mode.
func WithDirCacheTenant(tenantIdentity string) DirCacheOption {
return func(o *DirCacheOptions) {
o.tenantIdentity = tenantIdentity
// allow tenantDirMode in the case where the tenant identity is completely unknown
o.tenantDirMode = true
}
}

func WithDirCacheMassifLister(dirLister DirLister) DirCacheOption {
return func(o *DirCacheOptions) {
o.massifDirLister = dirLister
Expand Down Expand Up @@ -161,6 +178,9 @@ func NewLogDirCache(log logger.Logger, opener Opener, opts ...DirCacheOption) (*
for _, o := range opts {
o(&c.opts)
}
if c.opts.replicaDir != "" && c.opts.tenantDirMode {
return nil, ErrDirModeExclusiveWithReplicaMode
}
return c, nil
}

Expand Down Expand Up @@ -317,9 +337,9 @@ func isTenantIdLike(tenantIdentityOrLocalPath string) bool {
// Returns
// - a directory path
func (c *LogDirCache) ResolveMassifDir(tenantIdentityOrLocalPath string) (string, error) {
directory, err := dirFromFilepath(tenantIdentityOrLocalPath)
if err == nil {
return directory, nil

if c.opts.tenantDirMode {
return dirFromFilepath(tenantIdentityOrLocalPath)
}

// If the provided value is not at suitable "tenant identity" like force an error.
Expand All @@ -328,21 +348,22 @@ func (c *LogDirCache) ResolveMassifDir(tenantIdentityOrLocalPath string) (string
}

// it is not a file or directory, so it must be a tenant identity
directory = filepath.Dir(ReplicaRelativeMassifPath(tenantIdentityOrLocalPath, 0))
directory := filepath.Dir(ReplicaRelativeMassifPath(tenantIdentityOrLocalPath, 0))
return c.resolveReplicaRelativeDir(directory)
}

func (c *LogDirCache) ResolveSealDir(tenantIdentityOrLocalPath string) (string, error) {
directory, err := dirFromFilepath(tenantIdentityOrLocalPath)
if err == nil {
return directory, nil

if c.opts.tenantDirMode {
return dirFromFilepath(tenantIdentityOrLocalPath)
}

// If the provided value is not at suitable "tenant identity" like force an error.
if !isTenantIdLike(tenantIdentityOrLocalPath) {
return "", fmt.Errorf("%w: %s", ErrNeedTenantIdentityOrExistingFilepath, tenantIdentityOrLocalPath)
}

directory = filepath.Dir(ReplicaRelativeSealPath(tenantIdentityOrLocalPath, 0))
directory := filepath.Dir(ReplicaRelativeSealPath(tenantIdentityOrLocalPath, 0))
return c.resolveReplicaRelativeDir(directory)
}

Expand Down

0 comments on commit 87a0601

Please sign in to comment.