Skip to content

Commit

Permalink
Add support for yara classification rules (#68)
Browse files Browse the repository at this point in the history
  • Loading branch information
nazywam authored Sep 7, 2023
1 parent 638adbe commit af06cd6
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 48 deletions.
5 changes: 1 addition & 4 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
# We're using alpine because libmagic bundled in Debian is quite old (5.35)
FROM python:3.9-alpine

RUN apk add file
FROM python:3.10

WORKDIR /app/service
COPY ./requirements.txt ./requirements.txt
Expand Down
29 changes: 29 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,33 @@ $ pip install karton-classifier
$ karton-classifier
```


## YARA rule classifiers

Since karton-classifier v2.1.0 it's possible to extend the classifier logic using YARA rules.

You can enable it by passing `--yara-rules` with the path to the directory containing the rules. Each rule **has to** specify the resulting `kind` using the meta section. Other meta-attributes (`platform` and `extension`) are supported but optional. A working rule looks like this:

```yar
rule pe_file
{
meta:
description = "classifies incoming windows executables"
kind = "runnable"
platform = "win32"
extension = "exe"
strings:
$mz = "MZ"
condition:
$mz at 0 and uint32(uint32(0x3C)) == 0x4550
}
```

Some caveats to consider:
* Classifier will still process files normally, so in some cases it may report the same file twice.
* Classifier will report all matching Yara rules (so N matches on a single file will create N tasks)
* The outgoing task carries the name of the matched rule in the `rule-name` task header.
* All Yara rules must have a `.yar` extension. All other files in the specified directory are ignored. In particular, `.yara` extension is not supported.
* Subdirectories are not supported either - all Yara rules must reside directly in the specified directory.

![Co-financed by the Connecting Europe Facility of the European Union](https://www.cert.pl/uploads/2019/02/en_horizontal_cef_logo-e1550495232540.png)
170 changes: 126 additions & 44 deletions karton/classifier/classifier.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import argparse
import re
import struct
from hashlib import sha256
from io import BytesIO
from typing import Callable, Dict, Optional, cast
from pathlib import Path
from typing import Callable, Dict, List, Optional, cast
from zipfile import ZipFile

import chardet # type: ignore
import magic as pymagic # type: ignore
import yara # type: ignore
from karton.core import Config, Karton, Task
from karton.core.backend import KartonBackend

Expand All @@ -24,6 +27,21 @@ def classify_openxml(content: bytes) -> Optional[str]:
return None


def load_yara_rules(path: Path) -> yara.Rules:
    """Compile every ``*.yar`` rule file located directly inside ``path``.

    Each file is compiled under a namespace equal to its file name. Once
    compiled, every rule is checked for a non-empty ``kind`` meta attribute.

    Args:
        path: Directory holding the classifier rules. Only regular files
            matching ``*.yar`` directly in this directory are used; nested
            directories are not searched.

    Returns:
        The compiled rule set.

    Raises:
        RuntimeError: If a compiled rule has no (or an empty) ``kind`` meta
            attribute.
    """
    # glob() also yields directories whose name ends in ".yar"; a directory
    # is not a rule file and must not be handed to yara.compile(). Sorting
    # keeps the compilation order independent of the directory listing order.
    rule_files = {
        f.name: f.as_posix() for f in sorted(path.glob("*.yar")) if f.is_file()
    }

    rules = yara.compile(filepaths=rule_files)
    for r in rules:
        if not r.meta.get("kind"):
            raise RuntimeError(
                f"Rule {r.identifier} does not have a `kind` meta attribute"
            )

    return rules


def get_tag(classification: Dict) -> str:
sample_type = classification["kind"]

Expand Down Expand Up @@ -70,6 +88,36 @@ def __init__(
super().__init__(config=config, identity=identity, backend=backend)
self._magic = magic or self._magic_from_content()

yara_directory = self.config.get("classifier", "yara_rules", fallback=None)
if yara_directory:
yara_p = Path(yara_directory)
if not yara_p.is_dir():
raise NotADirectoryError(yara_p)

self.yara_rules = load_yara_rules(yara_p)
self.log.info("Loaded %d yara classifier rules", len(list(self.yara_rules)))
else:
self.yara_rules = None

@classmethod
def args_parser(cls) -> argparse.ArgumentParser:
    """Return the parent's argument parser extended with ``--yara-rules``."""
    arg_parser = super().args_parser()
    yara_option = {
        "default": None,
        "help": "Directory containing classifier YARA rules",
    }
    arg_parser.add_argument("--yara-rules", **yara_option)
    return arg_parser

@classmethod
def config_from_args(cls, config: Config, args: argparse.Namespace) -> None:
    """Copy parsed command-line options into ``config``.

    The parent implementation runs first; afterwards the value of
    ``--yara-rules`` is stored as ``yara_rules`` in the ``classifier``
    section.
    """
    super().config_from_args(config, args)
    classifier_section = {"yara_rules": args.yara_rules}
    config.load_from_dict({"classifier": classifier_section})

def _magic_from_content(self) -> Callable:
get_magic = pymagic.Magic(mime=False)
get_mime = pymagic.Magic(mime=True)
Expand All @@ -84,11 +132,19 @@ def wrapper(content, mime):

def process(self, task: Task) -> None:
sample = task.get_resource("sample")
sample_class = self._classify(task)

sample_classes = []

if self.yara_rules:
sample_classes += self._classify_yara(task)

filemagic_classification = self._classify_filemagic(task)
if filemagic_classification:
sample_classes.append(filemagic_classification)

file_name = sample.name or "sample"

if sample_class is None:
if not sample_classes:
self.log.info(
"Sample {!r} not recognized (unsupported type)".format(
file_name.encode("utf8")
Expand All @@ -105,54 +161,59 @@ def process(self, task: Task) -> None:
self.send_task(res)
return

classification_tag = get_tag(sample_class)
self.log.info(
"Classified {!r} as {} and tag {}".format(
file_name.encode("utf8"), repr(sample_class), classification_tag
)
)

derived_headers = {
"type": "sample",
"stage": "recognized",
"quality": task.headers.get("quality", "high"),
"mime": sample_class["mime"],
}
if sample_class.get("kind") is not None:
derived_headers["kind"] = sample_class["kind"]
if sample_class.get("platform") is not None:
derived_headers["platform"] = sample_class["platform"]
if sample_class.get("extension") is not None:
derived_headers["extension"] = sample_class["extension"]

derived_task = task.derive_task(derived_headers)

# pass the original tags to the next task
tags = [classification_tag]
if derived_task.has_payload("tags"):
tags += derived_task.get_payload("tags")
derived_task.remove_payload("tags")

derived_task.add_payload("tags", tags)
for sample_class in sample_classes:

# if present the magic description is added as a playload
if "magic" in sample_class:
derived_task.add_payload("magic", sample_class["magic"])

# add a sha256 digest in the outgoing task if there
# isn't one in the incoming task
if "sha256" not in derived_task.payload["sample"].metadata:
derived_task.payload["sample"].metadata["sha256"] = sha256(
cast(bytes, sample.content)
).hexdigest()
classification_tag = get_tag(sample_class)
self.log.info(
"Classified %r as %r and tag %s",
file_name.encode("utf8"),
sample_class,
classification_tag,
)

self.send_task(derived_task)
derived_headers = {
"type": "sample",
"stage": "recognized",
"kind": sample_class["kind"],
"quality": task.headers.get("quality", "high"),
}
if sample_class.get("platform") is not None:
derived_headers["platform"] = sample_class["platform"]
if sample_class.get("extension") is not None:
derived_headers["extension"] = sample_class["extension"]
if sample_class.get("mime") is not None:
derived_headers["mime"] = sample_class["mime"]
if sample_class.get("rule-name") is not None:
derived_headers["rule-name"] = sample_class["rule-name"]

derived_task = task.derive_task(derived_headers)

# pass the original tags to the next task
tags = [classification_tag]
if derived_task.has_payload("tags"):
tags += derived_task.get_payload("tags")
derived_task.remove_payload("tags")

derived_task.add_payload("tags", tags)

# if present the magic description is added as a playload
if "magic" in sample_class:
derived_task.add_payload("magic", sample_class["magic"])

# add a sha256 digest in the outgoing task if there
# isn't one in the incoming task
if "sha256" not in derived_task.payload["sample"].metadata:
derived_task.payload["sample"].metadata["sha256"] = sha256(
cast(bytes, sample.content)
).hexdigest()

self.send_task(derived_task)

def _get_extension(self, name: str) -> str:
splitted = name.rsplit(".", 1)
return splitted[-1].lower() if len(splitted) > 1 else ""

def _classify(self, task: Task) -> Optional[Dict[str, Optional[str]]]:
def _classify_filemagic(self, task: Task) -> Optional[Dict[str, Optional[str]]]:
sample = task.get_resource("sample")
content = cast(bytes, sample.content)

Expand Down Expand Up @@ -694,3 +755,24 @@ def apply_archive_headers(extension):

# If not recognized then unsupported
return None

def _classify_yara(self, task: Task) -> List[Dict[str, Optional[str]]]:
    """Classify the task's ``sample`` resource with the loaded YARA rules.

    Every match yields one classification dict holding the matched rule's
    name (``rule-name``), its ``kind`` meta value and, when the rule sets
    them to a non-empty value, ``platform`` and ``extension``.

    Returns:
        One dict per YARA match, in the order the matches were reported;
        an empty list when nothing matched.
    """
    resource = task.get_resource("sample")
    data = cast(bytes, resource.content)

    results = []
    for hit in self.yara_rules.match(data=data):
        meta = hit.meta
        entry = {"rule-name": hit.rule, "kind": meta["kind"]}
        # platform/extension are optional meta attributes: copy only when set
        for optional_key in ("platform", "extension"):
            if meta.get(optional_key):
                entry[optional_key] = meta[optional_key]

        self.log.info("Matched the sample using Yara rule %s", hit.rule)
        results.append(entry)

    return results
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
chardet==3.0.4
karton-core>=5.0.0,<6.0.0
python-magic==0.4.18
yara-python>=4.0.0,<5.0.0

0 comments on commit af06cd6

Please sign in to comment.