diff --git a/src/middlewared/middlewared/test/integration/utils/smb.py b/src/middlewared/middlewared/test/integration/utils/smb.py new file mode 100644 index 000000000000..a83ac2931329 --- /dev/null +++ b/src/middlewared/middlewared/test/integration/utils/smb.py @@ -0,0 +1,42 @@ +import contextlib +import os +import sys + +try: + apifolder = os.getcwd() + sys.path.append(apifolder) + from protocols.smb_proto import SMB, security +except ImportError: + SMB = None + security = None + +from .client import truenas_server + +__all__ = ["smb_connection"] + + +@contextlib.contextmanager +def smb_connection( + host=None, + share=None, + encryption='DEFAULT', + username=None, + domain=None, + password=None, + smb1=False +): + s = SMB() + s.connect( + host=host or truenas_server.ip, + share=share, + encryption=encryption, + username=username, + domain=domain, + password=password, + smb1=smb1 + ) + + try: + yield s + finally: + s.disconnect() diff --git a/tests/api2/test_smb_null_empty_dacl.py b/tests/api2/test_smb_null_empty_dacl.py new file mode 100644 index 000000000000..5ae6a886b13c --- /dev/null +++ b/tests/api2/test_smb_null_empty_dacl.py @@ -0,0 +1,135 @@ +import json +import os +import pytest + +from middlewared.test.integration.assets.smb import smb_share +from middlewared.test.integration.assets.pool import dataset +from middlewared.test.integration.utils import call, ssh +from middlewared.test.integration.utils.smb import security, smb_connection +from samba import ntstatus, NTSTATUSError + + +ADV_PERMS_FIELDS = [ + 'READ_DATA', 'WRITE_DATA', 'APPEND_DATA', + 'READ_NAMED_ATTRS', 'WRITE_NAMED_ATTRS', + 'EXECUTE', + 'DELETE_CHILD', 'DELETE', + 'READ_ATTRIBUTES', 'WRITE_ATTRIBUTES', + 'READ_ACL', 'WRITE_ACL', + 'WRITE_OWNER', + 'SYNCHRONIZE' +] + +NULL_DACL_PERMS = {'BASIC': 'FULL_CONTROL'} +EMPTY_DACL_PERMS = {perm: False for perm in ADV_PERMS_FIELDS} + + +@pytest.fixture(scope='function') +def share(): + with dataset('null_dacl_test', {'share_type': 'SMB'}) as ds: + with smb_share(f'/mnt/{ds}', 'DACL_TEST_SHARE') as s: + yield {'ds': ds, 'share': s} + + +def set_special_acl(path, special_acl_type): + match special_acl_type: + case 'NULL_DACL': + permset = NULL_DACL_PERMS + case 'EMPTY_DACL': + permset = EMPTY_DACL_PERMS + case _: + raise TypeError(f'[EDOOFUS]: {special_acl_type} unexpected special ACL type') + + payload = json.dumps({'acl': [{ + 'tag': 'everyone@', + 'id': -1, + 'type': 'ALLOW', + 'perms': permset, + 'flags': {'BASIC': 'NOINHERIT'}, + }]}) + ssh(f'touch {path}') + + # Use SSH to write to avoid middleware ACL normalization and validation + # that prevents writing these specific ACLs. + ssh(f"nfs4xdr_setfacl -j '{payload}' {path}") + + +def test_null_dacl_set(unprivileged_user_fixture, share): + """ verify that setting NULL DACL results in expected ZFS ACL """ + with smb_connection( + share=share['share']['name'], + username=unprivileged_user_fixture.username, + password=unprivileged_user_fixture.password, + ) as c: + fh = c.create_file('test_null_dacl', 'w') + current_sd = c.get_sd(fh, security.SECINFO_OWNER | security.SECINFO_GROUP) + current_sd.dacl = None + c.set_sd(fh, current_sd, security.SECINFO_OWNER | security.SECINFO_GROUP | security.SECINFO_DACL) + + new_sd = c.get_sd(fh, security.SECINFO_OWNER | security.SECINFO_GROUP) + assert new_sd.dacl is None + + theacl = call('filesystem.getacl', os.path.join(share['share']['path'], 'test_null_dacl')) + assert len(theacl['acl']) == 1 + + assert theacl['acl'][0]['perms'] == NULL_DACL_PERMS + assert theacl['acl'][0]['type'] == 'ALLOW' + assert theacl['acl'][0]['tag'] == 'everyone@' + + +def test_null_dacl_functional(unprivileged_user_fixture, share): + """ verify that NULL DACL grants write privileges """ + testfile = os.path.join(share['share']['path'], 'test_null_dacl_write') + set_special_acl(testfile, 'NULL_DACL') + data = b'canary' + + with smb_connection( + share=share['share']['name'], + username=unprivileged_user_fixture.username, + password=unprivileged_user_fixture.password, + ) as c: + fh = c.create_file('test_null_dacl_write', 'w') + current_sd = c.get_sd(fh, security.SECINFO_OWNER | security.SECINFO_GROUP) + assert current_sd.dacl is None + + c.write(fh, data) + assert c.read(fh, 0, cnt=len(data)) == data + + +def test_empty_dacl_set(unprivileged_user_fixture, share): + """ verify that setting empty DACL results in expected ZFS ACL """ + with smb_connection( + share=share['share']['name'], + username=unprivileged_user_fixture.username, + password=unprivileged_user_fixture.password, + ) as c: + fh = c.create_file('test_null_dacl', 'w') + current_sd = c.get_sd(fh, security.SECINFO_OWNER | security.SECINFO_GROUP) + current_sd.dacl = [] + c.set_sd(fh, current_sd, security.SECINFO_OWNER | security.SECINFO_GROUP | security.SECINFO_DACL) + + new_sd = c.get_sd(fh, security.SECINFO_OWNER | security.SECINFO_GROUP) + assert new_sd.dacl == [] + + theacl = call('filesystem.getacl', os.path.join(share['share']['path'], 'test_null_dacl')) + assert len(theacl['acl']) == 1 + + assert theacl['acl'][0]['perms'] == EMPTY_DACL_PERMS + assert theacl['acl'][0]['type'] == 'ALLOW' + assert theacl['acl'][0]['tag'] == 'everyone@' + + +def test_empty_dacl_functional(unprivileged_user_fixture, share): + testfile = os.path.join(share['share']['path'], 'test_empty_dacl_write') + set_special_acl(testfile, 'EMPTY_DACL') + + with smb_connection( + share=share['share']['name'], + username=unprivileged_user_fixture.username, + password=unprivileged_user_fixture.password, + ) as c: + # File has empty ACL and is not owned by this user + with pytest.raises(NTSTATUSError) as nt_err: + c.create_file('test_empty_dacl_write', 'w') + + assert nt_err.value.args[0] == ntstatus.NT_STATUS_ACCESS_DENIED diff --git a/tests/protocols/smb_proto.py b/tests/protocols/smb_proto.py index 0823486a9317..7fe6fdd1566c 100644 --- a/tests/protocols/smb_proto.py +++ b/tests/protocols/smb_proto.py @@ -324,53 +324,11 @@ def set_quota(self, **kwargs): quotaout = smbcquotas.stdout.decode().splitlines() return self._parse_quota(quotaout) - def get_sd(self, path): - def get_offset_by_key(data, key): - for idx, entry in enumerate(data): - if entry.startswith(key): - return data[idx:] + def set_sd(self, idx, secdesc, security_info): + self._connection.set_sd(self._open_files[idx]["fh"], secdesc, security_info) - raise ValueError(f'Failed to parse ACL: {data}') - - cmd = [ - "smbcacls", f"//{self._host}/{self._share}", - "-U", f"{self._username}%{self._password}", - "--numeric" - ] - - if self._smb1: - cmd.extend(["-m", "NT1"]) - - cmd.append(path) - - cl = subprocess.run(cmd, capture_output=True) - if cl.returncode != 0: - raise RuntimeError(cl.stdout.decode() or cl.stderr.decode()) - - output = get_offset_by_key(cl.stdout.decode().splitlines(), 'REVISION') - revision = int(output[0].split(':')[1]) - control = {"raw": output[1].split(':')[1]} - control['parsed'] = [x.name for x in ACLControl if int(control['raw'], 16) & x] - - sd = { - "revision": revision, - "control": control, - "owner": output[2].split(':')[1], - "group": output[3].split(':')[1], - "acl": [] - } - for l in get_offset_by_key(output, 'ACL'): - entry, flags, access_mask = l.split("/") - prefix, trustee, ace_type = entry.split(":") - - sd['acl'].append({ - "trustee": trustee, - "type": int(ace_type), - "access_mask": int(access_mask, 16), - "flags": int(flags, 16), - }) - - return sd + def get_sd(self, idx, security_info): + return self._connection.get_sd(self._open_files[idx]["fh"], security_info) def inherit_acl(self, path, action): cmd = [ @@ -395,4 +353,3 @@ def inherit_acl(self, path, action): cl = subprocess.run(cmd, capture_output=True) if cl.returncode != 0: raise RuntimeError(cl.stdout.decode() or cl.stderr.decode()) -