Skip to content

Commit

Permalink
Add tests for SMB protocol NULL and empty DACL behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
anodos325 committed Sep 30, 2024
1 parent e941271 commit 4b59a6d
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 47 deletions.
42 changes: 42 additions & 0 deletions src/middlewared/middlewared/test/integration/utils/smb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import contextlib
import os
import sys

try:
apifolder = os.getcwd()
sys.path.append(apifolder)
from protocols.smb_proto import SMB, security
except ImportError:
SMB = None
security = None

from .client import truenas_server

__all__ = ["smb_connection"]


@contextlib.contextmanager
def smb_connection(
host=None,
share=None,
encryption='DEFAULT',
username=None,
domain=None,
password=None,
smb1=False
):
s = SMB()
s.connect(
host=host or truenas_server.ip,
share=share,
encryption=encryption,
username=username,
domain=domain,
password=password,
smb1=smb1
)

try:
yield s
finally:
s.disconnect()
135 changes: 135 additions & 0 deletions tests/api2/test_smb_null_empty_dacl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import json
import os
import pytest

from middlewared.test.integration.assets.smb import smb_share
from middlewared.test.integration.assets.pool import dataset
from middlewared.test.integration.utils import call, ssh
from middlewared.test.integration.utils.smb import security, smb_connection
from samba import ntstatus, NTSTATUSError


ADV_PERMS_FIELDS = [
'READ_DATA', 'WRITE_DATA', 'APPEND_DATA',
'READ_NAMED_ATTRS', 'WRITE_NAMED_ATTRS',
'EXECUTE',
'DELETE_CHILD', 'DELETE',
'READ_ATTRIBUTES', 'WRITE_ATTRIBUTES',
'READ_ACL', 'WRITE_ACL',
'WRITE_OWNER',
'SYNCHRONIZE'
]

NULL_DACL_PERMS = {'BASIC': 'FULL_CONTROL'}
EMPTY_DACL_PERMS = {perm: False for perm in ADV_PERMS_FIELDS}


@pytest.fixture(scope='function')
def share():
with dataset('null_dacl_test', {'share_type': 'SMB'}) as ds:
with smb_share(f'/mnt/{ds}', 'DACL_TEST_SHARE') as s:
yield {'ds': ds, 'share': s}


def set_special_acl(path, special_acl_type):
match special_acl_type:
case 'NULL_DACL':
permset = NULL_DACL_PERMS
case 'EMPTY_DACL':
permset = EMPTY_DACL_PERMS
case _:
raise TypeError(f'[EDOOFUS]: {special_acl_type} unexpected special ACL type')

payload = json.dumps({'acl': [{
'tag': 'everyone@',
'id': -1,
'type': 'ALLOW',
'perms': permset,
'flags': {'BASIC': 'NOINHERIT'},
}]})
ssh(f'touch {path}')

# Use SSH to write to avoid middleware ACL normalization and validation
# that prevents writing these specific ACLs.
ssh(f"nfs4xdr_setfacl -j '{payload}' {path}")


def test_null_dacl_set(unprivileged_user_fixture, share):
""" verify that setting NULL DACL results in expected ZFS ACL """
with smb_connection(
share=share['share']['name'],
username=unprivileged_user_fixture.username,
password=unprivileged_user_fixture.password,
) as c:
fh = c.create_file('test_null_dacl', 'w')
current_sd = c.get_sd(fh, security.SECINFO_OWNER | security.SECINFO_GROUP)
current_sd.dacl = None
c.set_sd(fh, current_sd, security.SECINFO_OWNER | security.SECINFO_GROUP | security.SECINFO_DACL)

new_sd = c.get_sd(fh, security.SECINFO_OWNER | security.SECINFO_GROUP)
assert new_sd.dacl is None

theacl = call('filesystem.getacl', os.path.join(share['share']['path'], 'test_null_dacl'))
assert len(theacl['acl']) == 1

assert theacl['acl'][0]['perms'] == NULL_DACL_PERMS
assert theacl['acl'][0]['type'] == 'ALLOW'
assert theacl['acl'][0]['tag'] == 'everyone@'


def test_null_dacl_functional(unprivileged_user_fixture, share):
""" verify that NULL DACL grants write privileges """
testfile = os.path.join(share['share']['path'], 'test_null_dacl_write')
set_special_acl(testfile, 'NULL_DACL')
data = b'canary'

with smb_connection(
share=share['share']['name'],
username=unprivileged_user_fixture.username,
password=unprivileged_user_fixture.password,
) as c:
fh = c.create_file('test_null_dacl_write', 'w')
current_sd = c.get_sd(fh, security.SECINFO_OWNER | security.SECINFO_GROUP)
assert current_sd.dacl is None

c.write(fh, data)
assert c.read(fh, 0, cnt=len(data)) == data


def test_empty_dacl_set(unprivileged_user_fixture, share):
""" verify that setting empty DACL results in expected ZFS ACL """
with smb_connection(
share=share['share']['name'],
username=unprivileged_user_fixture.username,
password=unprivileged_user_fixture.password,
) as c:
fh = c.create_file('test_null_dacl', 'w')
current_sd = c.get_sd(fh, security.SECINFO_OWNER | security.SECINFO_GROUP)
current_sd.dacl = []
c.set_sd(fh, current_sd, security.SECINFO_OWNER | security.SECINFO_GROUP | security.SECINFO_DACL)

new_sd = c.get_sd(fh, security.SECINFO_OWNER | security.SECINFO_GROUP)
assert new_sd.dacl == []

theacl = call('filesystem.getacl', os.path.join(share['share']['path'], 'test_null_dacl'))
assert len(theacl['acl']) == 1

assert theacl['acl'][0]['perms'] == EMPTY_DACL_PERMS
assert theacl['acl'][0]['type'] == 'ALLOW'
assert theacl['acl'][0]['tag'] == 'everyone@'


def test_empty_dacl_functional(unprivileged_user_fixture, share):
testfile = os.path.join(share['share']['path'], 'test_empty_dacl_write')
set_special_acl(testfile, 'EMPTY_DACL')

with smb_connection(
share=share['share']['name'],
username=unprivileged_user_fixture.username,
password=unprivileged_user_fixture.password,
) as c:
# File has empty ACL and is not owned by this user
with pytest.raises(NTSTATUSError) as nt_err:
c.create_file('test_empty_dacl_write', 'w')

assert nt_err.value.args[0] == ntstatus.NT_STATUS_ACCESS_DENIED
51 changes: 4 additions & 47 deletions tests/protocols/smb_proto.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,53 +324,11 @@ def set_quota(self, **kwargs):
quotaout = smbcquotas.stdout.decode().splitlines()
return self._parse_quota(quotaout)

def get_sd(self, path):
def get_offset_by_key(data, key):
for idx, entry in enumerate(data):
if entry.startswith(key):
return data[idx:]
def set_sd(self, idx, secdesc, security_info):
self._connection.set_sd(self._open_files[idx]["fh"], secdesc, security_info)

raise ValueError(f'Failed to parse ACL: {data}')

cmd = [
"smbcacls", f"//{self._host}/{self._share}",
"-U", f"{self._username}%{self._password}",
"--numeric"
]

if self._smb1:
cmd.extend(["-m", "NT1"])

cmd.append(path)

cl = subprocess.run(cmd, capture_output=True)
if cl.returncode != 0:
raise RuntimeError(cl.stdout.decode() or cl.stderr.decode())

output = get_offset_by_key(cl.stdout.decode().splitlines(), 'REVISION')
revision = int(output[0].split(':')[1])
control = {"raw": output[1].split(':')[1]}
control['parsed'] = [x.name for x in ACLControl if int(control['raw'], 16) & x]

sd = {
"revision": revision,
"control": control,
"owner": output[2].split(':')[1],
"group": output[3].split(':')[1],
"acl": []
}
for l in get_offset_by_key(output, 'ACL'):
entry, flags, access_mask = l.split("/")
prefix, trustee, ace_type = entry.split(":")

sd['acl'].append({
"trustee": trustee,
"type": int(ace_type),
"access_mask": int(access_mask, 16),
"flags": int(flags, 16),
})

return sd
def get_sd(self, idx, security_info):
return self._connection.get_sd(self._open_files[idx]["fh"], security_info)

def inherit_acl(self, path, action):
cmd = [
Expand All @@ -395,4 +353,3 @@ def inherit_acl(self, path, action):
cl = subprocess.run(cmd, capture_output=True)
if cl.returncode != 0:
raise RuntimeError(cl.stdout.decode() or cl.stderr.decode())

0 comments on commit 4b59a6d

Please sign in to comment.