Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Aplib decompression #64

Merged
merged 12 commits into from
Aug 10, 2021
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ build/
malduck.egg-info/
ven*/
.mypy_cache/
.DS_Store
39 changes: 28 additions & 11 deletions malduck/compression/aplib.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Optional

from .components.aplib import ap_depack
from .components.aplib import APLib

import logging
import struct
Expand Down Expand Up @@ -30,16 +30,33 @@ class aPLib:
:rtype: bytes
"""

def decompress(self, buf: bytes, headerless: bool = False) -> Optional[bytes]:
    """Decompress an aPLib stream, returning ``None`` on any failure.

    Unless *headerless* is set, a leading ``AP32`` header is skipped first:
    the little-endian 32-bit value at offset 4 is read as the header length
    and that many bytes are dropped from the front of *buf*.

    :param buf: Compressed data, optionally prefixed with an ``AP32`` header
    :param headerless: When True, never attempt to strip a header
    :return: Decompressed bytes, or ``None`` if anything went wrong
    """
    try:
        skip_header = (not headerless) and buf.startswith(b"AP32")
        if skip_header:
            # The header length is stored right after the 4-byte magic.
            (payload_offset,) = struct.unpack_from("<I", buf, 4)
            buf = buf[payload_offset:]
        unpacked = ap_depack(buf)
        return bytes(unpacked)
    except Exception:
        # Best-effort API: every error is reported to the caller as None.
        return None
def decompress(self, buf: bytes, headerless: bool = True) -> Optional[bytes]:
    """Decompress aPLib data, optionally validating the ``AP32`` header.

    If *buf* starts with ``AP32`` and is at least 24 bytes long, the five
    32-bit header fields (header size, packed size, packed CRC32, original
    size, original CRC32) are read and the payload is sliced out of *buf*.

    :param buf: Compressed data, with or without an ``AP32`` header
    :param headerless: When True (default) the header fields are not verified
        and the decoder is created with ``strict=False``. When False, sizes
        and checksums from the header are verified and the decoder is strict.
    :return: Decompressed data
    :raises RuntimeError: in strict mode, when a size or checksum taken from
        the header does not match the actual data
    """
    # Imported locally because crc32 is not among the imports visible for
    # this module; harmless if the module already imports it.
    from binascii import crc32

    strict = not headerless
    packed_size = packed_crc = orig_size = orig_crc = None

    if buf.startswith(b"AP32") and len(buf) >= 24:
        # '<' pins little-endian parsing; '=' would follow the host's native
        # byte order and misread the header on big-endian machines.
        header_size, packed_size, packed_crc, orig_size, orig_crc = struct.unpack_from(
            "<IIIII", buf, 4
        )
        buf = buf[header_size : header_size + packed_size]

    if strict:
        # The slice above is shorter than packed_size when input is truncated.
        if packed_size is not None and packed_size != len(buf):
            raise RuntimeError('Packed buf size is incorrect')
        if packed_crc is not None and packed_crc != crc32(buf):
            raise RuntimeError('Packed buf checksum is incorrect')

    result = APLib(buf, strict=strict).depack()

    if strict:
        if orig_size is not None and orig_size != len(result):
            raise RuntimeError('Unpacked buf size is incorrect')
        if orig_crc is not None and orig_crc != crc32(result):
            raise RuntimeError('Unpacked buf checksum is incorrect')

    return result

__call__ = decompress

Expand Down
236 changes: 138 additions & 98 deletions malduck/compression/components/aplib.py
Original file line number Diff line number Diff line change
@@ -1,132 +1,172 @@
# -*- coding: utf-8 -*-
"""
aplib.py by snemes from https://github.com/snemes/aplib
#!/usr/bin/env python3
"""A pure Python module for decompressing aPLib compressed data

A pure Python module for decompressing aPLib compressed data.
Adapted from the original C source code from http://ibsensoftware.com/files/aPLib-1.1.1.zip
Approximately ~20 times faster than the other Python implementations.
Approximately 20 times faster than other Python implementations.
Compatible with both Python 2 and 3.
"""
import struct
from binascii import crc32
from io import BytesIO

__all__ = ["ap_depack"]
__version__ = "0.3"
__author__ = "Sandor Nemes"
__all__ = ['APLib', 'decompress']
__version__ = '0.6'
__author__ = 'Sandor Nemes'


class APLib(object):

class APDSTATE:
"""internal data structure"""
__slots__ = 'source', 'destination', 'tag', 'bitcount', 'strict'

def __init__(self, source: bytes) -> None:
def __init__(self, source, strict=True):
sisoma2 marked this conversation as resolved.
Show resolved Hide resolved
self.source = BytesIO(source)
self.destination = bytearray()
self.tag = 0
self.bitcount = 0
self.strict = bool(strict)

def getbit(self):
    """Return the next bit of the tag stream, most significant bit first.

    ``self.bitcount`` tracks how many bits remain in ``self.tag``; when it
    runs out, one more byte is read from ``self.source`` as the new tag.
    """
    self.bitcount -= 1
    if self.bitcount < 0:
        # Tag exhausted: load the next tag byte. ord() raises TypeError when
        # read(1) returns an empty result at end of input.
        self.tag = ord(self.source.read(1))
        self.bitcount = 7

    # Take the top bit of the tag, then shift the tag for the next call.
    msb = (self.tag >> 7) & 1
    self.tag = self.tag << 1

    return msb

def getgamma(self):
    """Read one gamma2-encoded integer from the bit stream.

    Bits are consumed in pairs via ``self.getbit()``: the first bit of each
    pair is appended to the value, the second says whether another pair
    follows. The value starts at 1, so the result is always at least 2.
    """
    value = 1
    more = True

    while more:
        # data bit extends the value, continuation bit controls the loop
        value = value * 2 + self.getbit()
        more = self.getbit()

    return value

def depack(self):
sisoma2 marked this conversation as resolved.
Show resolved Hide resolved
r0 = -1
lwm = 0
done = False

try:

# first byte verbatim
self.destination += self.source.read(1)

# main decompression loop
while not done:
if self.getbit():
if self.getbit():
if self.getbit():
offs = 0
for _ in range(4):
offs = (offs << 1) + self.getbit()

if offs:
self.destination.append(self.destination[-offs])
else:
self.destination.append(0)

lwm = 0
else:
offs = ord(self.source.read(1))
length = 2 + (offs & 1)
offs >>= 1

if offs:
for _ in range(length):
self.destination.append(self.destination[-offs])
else:
done = True

r0 = offs
lwm = 1
else:
offs = self.getgamma()

if lwm == 0 and offs == 2:
offs = r0
length = self.getgamma()

def ap_getbit(ud: APDSTATE) -> int:
    """Pop the next bit (most significant first) from the decoder state *ud*.

    ``ud.bitcount`` tracks how many bits remain in ``ud.tag``; when it runs
    out, one more byte is read from ``ud.source`` as the new tag.
    """
    ud.bitcount -= 1
    if ud.bitcount < 0:
        # Tag exhausted: load the next tag byte from the input stream.
        ud.tag = ord(ud.source.read(1))
        ud.bitcount = 7

    # Take the top bit of the tag, then shift the tag for the next call.
    top_bit = (ud.tag >> 7) & 0x01
    ud.tag = ud.tag << 1

    return top_bit


def ap_getgamma(ud: APDSTATE) -> int:
    """Read one gamma2-encoded integer from the decoder state *ud*.

    Bits are consumed in pairs via ``ap_getbit``: the first bit of each pair
    is appended to the value, the second says whether another pair follows.
    The value starts at 1, so the result is always at least 2.
    """
    value = 1
    more = True

    while more:
        # data bit extends the value, continuation bit controls the loop
        value = value * 2 + ap_getbit(ud)
        more = ap_getbit(ud)

    return value


def ap_depack(source: bytes) -> bytearray:
ud = APDSTATE(source)
for _ in range(length):
self.destination.append(self.destination[-offs])
else:
if lwm == 0:
offs -= 3
else:
offs -= 2

r0 = -1
lwm = 0
done = False
offs <<= 8
offs += ord(self.source.read(1))
length = self.getgamma()

# first byte verbatim
ud.destination += ud.source.read(1)
if offs >= 32000:
length += 1
if offs >= 1280:
length += 1
if offs < 128:
length += 2

# main decompression loop
while not done:
if ap_getbit(ud):
if ap_getbit(ud):
if ap_getbit(ud):
offs = 0
for _ in range(length):
self.destination.append(self.destination[-offs])

for _ in range(4):
offs = (offs << 1) + ap_getbit(ud)
r0 = offs

if offs:
ud.destination.append(ud.destination[-offs])
else:
ud.destination.append(0x00)

lwm = 0
lwm = 1
else:
offs = ord(ud.source.read(1))
self.destination += self.source.read(1)
lwm = 0

length = 2 + (offs & 0x0001)
except (TypeError, IndexError):
if self.strict:
raise RuntimeError('aPLib decompression error')

offs >>= 1
return bytes(self.destination)

if offs:
for _ in range(length):
ud.destination.append(ud.destination[-offs])
else:
done = True
def pack(self):
raise NotImplementedError

r0 = offs
lwm = 1
else:
offs = ap_getgamma(ud)

if lwm == 0 and offs == 2:
offs = r0
def decompress(data, strict=False):
sisoma2 marked this conversation as resolved.
Show resolved Hide resolved
packed_size = None
packed_crc = None
orig_size = None
orig_crc = None

length = ap_getgamma(ud)
if data.startswith(b'AP32') and len(data) >= 24:
# data has an aPLib header
header_size, packed_size, packed_crc, orig_size, orig_crc = struct.unpack_from('=IIIII', data, 4)
data = data[header_size : header_size + packed_size]

for _ in range(length):
ud.destination.append(ud.destination[-offs])
else:
if lwm == 0:
offs -= 3
else:
offs -= 2
if strict:
if packed_size is not None and packed_size != len(data):
raise RuntimeError('Packed data size is incorrect')
if packed_crc is not None and packed_crc != crc32(data):
raise RuntimeError('Packed data checksum is incorrect')

offs <<= 8
offs += ord(ud.source.read(1))
result = APLib(data, strict=strict).depack()

length = ap_getgamma(ud)
if strict:
if orig_size is not None and orig_size != len(result):
raise RuntimeError('Unpacked data size is incorrect')
if orig_crc is not None and orig_crc != crc32(result):
raise RuntimeError('Unpacked data checksum is incorrect')

if offs >= 32000:
length += 1
if offs >= 1280:
length += 1
if offs < 128:
length += 2
return result

for _ in range(length):
ud.destination.append(ud.destination[-offs])

r0 = offs
def main():
    """Self-test: decompress a known headerless sample and check the output."""
    packed = b'T\x00he quick\xecb\x0erown\xcef\xaex\x80jumps\xed\xe4veur`t?lazy\xead\xfeg\xc0\x00'
    expected = b'The quick brown fox jumps over the lazy dog'
    assert decompress(packed) == expected

lwm = 1
else:
ud.destination += ud.source.read(1)
lwm = 0

return ud.destination
if __name__ == '__main__':
main()
7 changes: 0 additions & 7 deletions tests/test_compression.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from malduck import aplib, gzip, base64, lznt1


@pytest.mark.skipif("sys.platform == 'darwin'")
def test_aplib():
assert aplib(
base64("QVAzMhgAAAANAAAAvJpimwsAAACFEUoNaDhlbI5vIHducuxkAA==")
Expand All @@ -30,12 +29,6 @@ def test_aplib():
b'The quick brown fox jumps over the lazy dog')


@pytest.mark.skipif("sys.platform != 'darwin'")
def test_aplib_macos():
with pytest.raises(RuntimeError):
assert aplib(b"hello world")


def test_gzip():
assert gzip(base64("eJzLSM3JyVcozy/KSQEAGgsEXQ==")) == b"hello world"
assert gzip(
Expand Down