Skip to content

Commit

Permalink
Merge pull request #1457 from Bidaya0/feat/search_method_pool_cache
Browse files Browse the repository at this point in the history
feat: search method pool cache.
  • Loading branch information
Bidaya0 authored May 19, 2023
2 parents f830511 + c9ebf4f commit beafc47
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 113 deletions.
5 changes: 5 additions & 0 deletions dongtai_common/common/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ def get_cache_or_call(*args, **kwargs):
return get_cache_or_call


def disable_cache(function, args=(), kwargs={}):
cache_key = make_hash(
(function.__module__ + function.__name__, args, kwargs))
cache.delete(cache_key)

def cached_decorator(random_range, use_celery_update=False):

def _noname(function):
Expand Down
1 change: 0 additions & 1 deletion dongtai_common/models/vulnerablity.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from dongtai_common.models.project import IastProject
from dongtai_common.models.project_version import IastProjectVersion


class IastVulnerabilityStatus(models.Model):
name = models.CharField(max_length=100, blank=True, default='')

Expand Down
Empty file.
71 changes: 71 additions & 0 deletions dongtai_engine/common/queryset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from dongtai_common.common.utils import cached_decorator
from typing import List, Dict
from dongtai_common.models.strategy_user import IastStrategyUser
from django.db.models import Sum, Q
from celery.apps.worker import logger
from dongtai_common.models.agent import IastAgent
from dongtai_common.models.project import IastProject
from dongtai_common.models.strategy import IastStrategyModel
from dongtai_common.models.hook_strategy import HookStrategy

LANGUAGE_MAP = {"JAVA": 1, "PYTHON": 2, "PHP": 3, "GO": 4}

@cached_decorator(random_range=(60, 120), use_celery_update=False)
def get_scan_id(project_id) -> int:
res = IastProject.objects.filter(pk=project_id).values('scan_id').first()
return res['scan_id'] if res else 0


@cached_decorator(random_range=(60, 120), use_celery_update=False)
def load_sink_strategy(user=None, language=None, scan_id=0) -> List[Dict]:
"""
加载用户user有权限方法的策略
:param user: edit by song
:return:
"""
logger.info('start load sink_strategy')
strategies = list()
language_id = 0
if language and language in LANGUAGE_MAP:
language_id = LANGUAGE_MAP[language]
q = ~Q(state='delete')
scan_template = IastStrategyUser.objects.filter(pk=scan_id).first()
if scan_template:
strategy_id = [int(i) for i in scan_template.content.split(',')]
q = q & Q(pk__in=strategy_id)
type_query = IastStrategyModel.objects.filter(q)
strategy_models = HookStrategy.objects.filter(
strategy__in=type_query,
language_id__in=[language_id]
if language_id else LANGUAGE_MAP.values(),
).values('id', 'value', 'strategy__vul_type', 'strategy__level',
'strategy__vul_name', 'strategy_id')
sub_method_signatures = set()
for strategy in strategy_models:
# for strategy in sub_queryset:
strategy_value = strategy.get("value", "")
sub_method_signature = strategy_value[:strategy_value.rfind(
'(')] if strategy_value.rfind('(') > 0 else strategy_value
if sub_method_signature in sub_method_signatures:
continue
sub_method_signatures.add(sub_method_signature)
strategies.append({
'strategy':
strategy.get("id", ""),
'type':
strategy.get("strategy__vul_type", ""),
'value':
sub_method_signature,
'strategy_level':
strategy.get("strategy__level"),
'strategy_vul_name':
strategy.get("strategy__vul_name"),
'strategy_strategy_id':
strategy.get("strategy_id"),
})
return strategies


@cached_decorator(random_range=(60, 120), use_celery_update=False)
def get_agent(agent_id):
return IastAgent.objects.filter(pk=agent_id).first()
177 changes: 67 additions & 110 deletions dongtai_engine/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,19 @@
from django.core.cache import cache
from dongtai_common.engine.vul_engine import VulEngine
from dongtai_common.models import User
from dongtai_common.models.agent import IastAgent
from dongtai_common.models.agent_method_pool import MethodPool
from dongtai_common.models.asset import Asset
from dongtai_common.models.errorlog import IastErrorlog
from dongtai_common.models.heartbeat import IastHeartbeat
from dongtai_common.models.hook_strategy import HookStrategy
from dongtai_common.models.hook_type import HookType
from dongtai_common.models.project import IastProject
from dongtai_common.models.replay_method_pool import IastAgentMethodPoolReplay
from dongtai_common.models.replay_queue import IastReplayQueue
from dongtai_common.models.sca_maven_db import ScaMavenDb
from dongtai_common.models.strategy import IastStrategyModel
from dongtai_common.models.vul_level import IastVulLevel
from dongtai_common.models.vulnerablity import IastVulnerabilityModel
from dongtai_common.utils import const
from dongtai_common.models.agent import IastAgent
from dongtai_common.models.project import IastProject

from dongtai_engine.plugins.strategy_headers import check_response_header
from dongtai_engine.plugins.strategy_sensitive import check_response_content
Expand All @@ -47,9 +45,7 @@
from hashlib import sha1
from dongtai_engine.task_base import replay_payload_data
from typing import List, Dict
from dongtai_common.models.strategy_user import IastStrategyUser

LANGUAGE_MAP = {"JAVA": 1, "PYTHON": 2, "PHP": 3, "GO": 4}
from dongtai_engine.common.queryset import get_scan_id, load_sink_strategy, get_agent

RETRY_INTERVALS = [10, 30, 90]

Expand All @@ -75,46 +71,6 @@ def queryset_to_iterator(queryset):
break


def load_sink_strategy(user=None, language=None, scan_id=0) -> List[Dict]:
"""
加载用户user有权限方法的策略
:param user: edit by song
:return:
"""
logger.info('start load sink_strategy')
strategies = list()
language_id = 0
if language and language in LANGUAGE_MAP:
language_id = LANGUAGE_MAP[language]
q = ~Q(state='delete')
scan_template = IastStrategyUser.objects.filter(pk=scan_id).first()
if scan_template:
strategy_id = [int(i) for i in scan_template.content.split(',')]
q = q & Q(pk__in=strategy_id)
type_query = IastStrategyModel.objects.filter(q)
strategy_models = HookStrategy.objects.filter(
strategy__in=type_query,
language_id__in=[language_id] if language_id else LANGUAGE_MAP.values(),
created_by__in=[user.id, 1] if user else [1]
).values('id', 'value', 'strategy__vul_type')
sub_method_signatures = set()
for strategy in strategy_models:
# for strategy in sub_queryset:
strategy_value = strategy.get("value", "")
sub_method_signature = strategy_value[:strategy_value.rfind('(')] if strategy_value.rfind(
'(') > 0 else strategy_value
if sub_method_signature in sub_method_signatures:
continue
sub_method_signatures.add(sub_method_signature)

strategies.append({
'strategy': strategy.get("id", ""),
'type': strategy.get("strategy__vul_type", ""),
'value': sub_method_signature
})
return strategies


def search_and_save_vul(engine: Optional[VulEngine],
method_pool_model: Union[IastAgentMethodPoolReplay,
MethodPool],
Expand All @@ -127,18 +83,18 @@ def search_and_save_vul(engine: Optional[VulEngine],
:return: None
"""
logger.info(f'current sink rule is {strategy.get("type")}')
queryset = IastStrategyModel.objects.filter(vul_type=strategy['type'],
state=const.STRATEGY_ENABLE)
#queryset = IastStrategyModel.objects.filter(vul_type=strategy['type'],
# state=const.STRATEGY_ENABLE)
if not method_pool_model:
logger.info(
'method_pool_model missing skip'
)
return
if not queryset.values('id').exists():
logger.warning(
f'current method pool hit rule {strategy.get("type")}, but no vul strategy.'
)
return
#if not queryset.values('id').exists():
# logger.warning(
# f'current method pool hit rule {strategy.get("type")}, but no vul strategy.'
# )
# return
if method_pool is None:
method_pool = json.loads(method_pool_model.method_pool
) if method_pool_model.method_pool else []
Expand All @@ -151,41 +107,41 @@ def search_and_save_vul(engine: Optional[VulEngine],
engine.search(method_pool=method_pool,
vul_method_signature=strategy.get('value'))
status, stack, source_sign, sink_sign, taint_value = engine.result()
vul_strategy = queryset.values("level", "vul_name", "id").first()
vul_type = queryset.values('vul_type').first()
if not vul_strategy or not vul_type:
logger.info(
f'vul data corruption , stop scan in method_pool {method_pool_model.id}'
)
return
#vul_strategy = queryset.values("level", "vul_name", "id").first()
#vul_type = queryset.values('vul_type').first()
#if not vul_strategy or not vul_type:
# logger.info(
# f'vul data corruption , stop scan in method_pool {method_pool_model.id}'
# )
# return
if status:
filterres = vul_filter(
stack,
source_sign,
sink_sign,
taint_value,
vul_type['vul_type'],
strategy['type'],
)
logger.info(f'vul filter_status : {filterres}')
if status and filterres:
if isinstance(method_pool_model, MethodPool):
logger.info(f'vul_found {method_pool_model.agent_id} {method_pool_model.url} {sink_sign}')
else:
logger.info(f'vul_found {method_pool_model.id} {method_pool_model.url} {sink_sign}')
vul_strategy = queryset.values("level", "vul_name", "id").first()
if not vul_strategy:
pass
else:
handler_vul(
sender="tasks.search_and_save_vul",
vul_meta=method_pool_model,
vul_level=vul_strategy['level'],
strategy_id=vul_strategy['id'],
vul_stack=stack,
top_stack=source_sign,
bottom_stack=sink_sign,
taint_value=taint_value
)
#vul_strategy = queryset.values("level", "vul_name", "id").first()
#if not vul_strategy:
# pass
#else:
handler_vul(
sender="tasks.search_and_save_vul",
vul_meta=method_pool_model,
vul_level=strategy['strategy_level'],
strategy_id=strategy['strategy_strategy_id'],
vul_stack=stack,
top_stack=source_sign,
bottom_stack=sink_sign,
taint_value=taint_value
)
else:
try:
if isinstance(method_pool_model, MethodPool):
Expand Down Expand Up @@ -238,7 +194,9 @@ def search_and_save_sink(engine, method_pool_model, strategy):
def search_vul_from_method_pool(self, method_pool_sign, agent_id, retryable=False):
logger.info(f'漏洞检测开始,方法池 {method_pool_sign}')
try:
method_pool_model = MethodPool.objects.filter(pool_sign=method_pool_sign, agent_id=agent_id).first()
method_pool_model = MethodPool.objects.filter(
pool_sign=method_pool_sign, agent_id=agent_id).first()
method_pool_model.agent = get_agent(method_pool_model.agent_id)
if method_pool_model is None:
if retryable:
if self.request.retries < self.max_retries:
Expand All @@ -254,10 +212,9 @@ def search_vul_from_method_pool(self, method_pool_sign, agent_id, retryable=Fals
)
check_response_header(method_pool_model)
check_response_content(method_pool_model)

scan_id = get_scan_id(method_pool_model.agent.bind_project_id)
strategies = load_sink_strategy(
method_pool_model.agent.user, method_pool_model.agent.language,
method_pool_model.agent.bind_project.scan_id)
scan_id=scan_id)
engine = VulEngine()
method_pool = json.loads(method_pool_model.method_pool) if method_pool_model else []
engine.method_pool = method_pool
Expand Down Expand Up @@ -302,36 +259,36 @@ def search_vul_from_replay_method_pool(method_pool_id):
logger.error(f'重放数据漏洞检测出错,方法池 {method_pool_id}. 错误原因:{e}')


def load_methods_from_strategy(strategy_id):
"""
根据策略ID加载策略详情、策略对应的方法池数据
:param strategy_id: 策略ID
:return:
"""
strategy = HookStrategy.objects.filter(type__in=HookType.objects.filter(type=4), id=strategy_id).first()
if strategy is None:
logger.info(f'策略[{strategy_id}]不存在')
return None, None
strategy_value = {
'strategy': strategy,
'type': strategy.type.first().value,
'value': strategy.value.split('(')[0]
}
# fixme 后续根据具体需要,获取用户对应的数据
if strategy is None:
return strategy_value, None

user = User.objects.filter(id=strategy.created_by).first()
if user is None:
return strategy_value, None

agents = IastAgent.objects.filter(user=user)
if agents.values('id').exists() is False:
return strategy_value, None

method_pool_queryset = MethodPool.objects.filter(agent__in=agents)
return strategy_value, method_pool_queryset

#def load_methods_from_strategy(strategy_id):
# """
# 根据策略ID加载策略详情、策略对应的方法池数据
# :param strategy_id: 策略ID
# :return:
# """
# strategy = HookStrategy.objects.filter(type__in=HookType.objects.filter(type=4), id=strategy_id).first()
# if strategy is None:
# logger.info(f'策略[{strategy_id}]不存在')
# return None, None
# strategy_value = {
# 'strategy': strategy,
# 'type': strategy.type.first().value,
# 'value': strategy.value.split('(')[0]
# }
# # fixme 后续根据具体需要,获取用户对应的数据
# if strategy is None:
# return strategy_value, None
#
# user = User.objects.filter(id=strategy.created_by).first()
# if user is None:
# return strategy_value, None
#
# agents = IastAgent.objects.filter(user=user)
# if agents.values('id').exists() is False:
# return strategy_value, None
#
# method_pool_queryset = MethodPool.objects.filter(agent__in=agents)
# return strategy_value, method_pool_queryset
#

def get_project_agents(agent):
agents = IastAgent.objects.filter(
Expand Down
3 changes: 3 additions & 0 deletions dongtai_web/views/project_add.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import ipaddress
import requests
from dongtai_common.models.server import IastServer
from dongtai_common.common.utils import disable_cache
from dongtai_engine.common.queryset import get_scan_id

logger = logging.getLogger("django")

Expand Down Expand Up @@ -184,6 +186,7 @@ def post(self, request):
'base_url', 'test_req_header_key', 'test_req_header_value',
'template_id', 'department_id', 'enable_log', 'log_level'
])
disable_cache(get_scan_id, (project.id))
return R.success(msg='操作成功')
except Exception as e:
logger.error(e, exc_info=e)
Expand Down
8 changes: 6 additions & 2 deletions dongtai_web/views/scan_strategys.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
BatchStatusUpdateSerializerView,
AllStatusUpdateSerializerView,
)
from dongtai_common.common.utils import disable_cache
from dongtai_engine.common.queryset import load_sink_strategy


logger = logging.getLogger('dongtai-webapi')

Expand Down Expand Up @@ -176,8 +179,9 @@ def update(self, request, pk):
if ser.validated_data.get('content', None):
ser.validated_data['content'] = ','.join(
[str(i) for i in ser.validated_data['content']])
obj = IastStrategyUser.objects.filter(
pk=pk).update(**ser.validated_data)
obj = IastStrategyUser.objects.filter(pk=pk).update(
**ser.validated_data)
disable_cache(load_sink_strategy, (), kwargs={"scan_id": pk})
return R.success(msg=_('update success'))

@extend_schema_with_envcheck(
Expand Down

0 comments on commit beafc47

Please sign in to comment.