diff --git a/dongtai_common/common/utils/__init__.py b/dongtai_common/common/utils/__init__.py index 2e3a83f56..6509a4611 100644 --- a/dongtai_common/common/utils/__init__.py +++ b/dongtai_common/common/utils/__init__.py @@ -89,6 +89,11 @@ def get_cache_or_call(*args, **kwargs): return get_cache_or_call +def disable_cache(function, args=(), kwargs={}): + cache_key = make_hash( + (function.__module__ + function.__name__, args, kwargs)) + cache.delete(cache_key) + def cached_decorator(random_range, use_celery_update=False): def _noname(function): diff --git a/dongtai_common/models/vulnerablity.py b/dongtai_common/models/vulnerablity.py index fc2efaf03..a4c972b20 100644 --- a/dongtai_common/models/vulnerablity.py +++ b/dongtai_common/models/vulnerablity.py @@ -15,7 +15,6 @@ from dongtai_common.models.project import IastProject from dongtai_common.models.project_version import IastProjectVersion - class IastVulnerabilityStatus(models.Model): name = models.CharField(max_length=100, blank=True, default='') diff --git a/dongtai_engine/common/__init__.py b/dongtai_engine/common/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/dongtai_engine/common/queryset.py b/dongtai_engine/common/queryset.py new file mode 100644 index 000000000..c61ae6c1e --- /dev/null +++ b/dongtai_engine/common/queryset.py @@ -0,0 +1,71 @@ +from dongtai_common.common.utils import cached_decorator +from typing import List, Dict +from dongtai_common.models.strategy_user import IastStrategyUser +from django.db.models import Sum, Q +from celery.apps.worker import logger +from dongtai_common.models.agent import IastAgent +from dongtai_common.models.project import IastProject +from dongtai_common.models.strategy import IastStrategyModel +from dongtai_common.models.hook_strategy import HookStrategy + +LANGUAGE_MAP = {"JAVA": 1, "PYTHON": 2, "PHP": 3, "GO": 4} + +@cached_decorator(random_range=(60, 120), use_celery_update=False) +def get_scan_id(project_id) -> int: + res = IastProject.objects.filter(pk=project_id).values('scan_id').first() + return res['scan_id'] if res else 0 + + +@cached_decorator(random_range=(60, 120), use_celery_update=False) +def load_sink_strategy(user=None, language=None, scan_id=0) -> List[Dict]: + """ + 加载用户user有权限方法的策略 + :param user: edit by song + :return: + """ + logger.info('start load sink_strategy') + strategies = list() + language_id = 0 + if language and language in LANGUAGE_MAP: + language_id = LANGUAGE_MAP[language] + q = ~Q(state='delete') + scan_template = IastStrategyUser.objects.filter(pk=scan_id).first() + if scan_template: + strategy_id = [int(i) for i in scan_template.content.split(',')] + q = q & Q(pk__in=strategy_id) + type_query = IastStrategyModel.objects.filter(q) + strategy_models = HookStrategy.objects.filter( + strategy__in=type_query, + language_id__in=[language_id] + if language_id else LANGUAGE_MAP.values(), + ).values('id', 'value', 'strategy__vul_type', 'strategy__level', + 'strategy__vul_name', 'strategy_id') + sub_method_signatures = set() + for strategy in strategy_models: + # for strategy in sub_queryset: + strategy_value = strategy.get("value", "") + sub_method_signature = strategy_value[:strategy_value.rfind( + '(')] if strategy_value.rfind('(') > 0 else strategy_value + if sub_method_signature in sub_method_signatures: + continue + sub_method_signatures.add(sub_method_signature) + strategies.append({ + 'strategy': + strategy.get("id", ""), + 'type': + strategy.get("strategy__vul_type", ""), + 'value': + sub_method_signature, + 'strategy_level': + strategy.get("strategy__level"), + 'strategy_vul_name': + strategy.get("strategy__vul_name"), + 'strategy_strategy_id': + strategy.get("strategy_id"), + }) + return strategies + + +@cached_decorator(random_range=(60, 120), use_celery_update=False) +def get_agent(agent_id): + return IastAgent.objects.filter(pk=agent_id).first() diff --git a/dongtai_engine/tasks.py b/dongtai_engine/tasks.py index fe6d93351..d0faf7675 100644 --- a/dongtai_engine/tasks.py +++ b/dongtai_engine/tasks.py @@ -20,21 +20,19 @@ from django.core.cache import cache from dongtai_common.engine.vul_engine import VulEngine from dongtai_common.models import User -from dongtai_common.models.agent import IastAgent from dongtai_common.models.agent_method_pool import MethodPool from dongtai_common.models.asset import Asset from dongtai_common.models.errorlog import IastErrorlog from dongtai_common.models.heartbeat import IastHeartbeat -from dongtai_common.models.hook_strategy import HookStrategy from dongtai_common.models.hook_type import HookType -from dongtai_common.models.project import IastProject from dongtai_common.models.replay_method_pool import IastAgentMethodPoolReplay from dongtai_common.models.replay_queue import IastReplayQueue from dongtai_common.models.sca_maven_db import ScaMavenDb -from dongtai_common.models.strategy import IastStrategyModel from dongtai_common.models.vul_level import IastVulLevel from dongtai_common.models.vulnerablity import IastVulnerabilityModel from dongtai_common.utils import const +from dongtai_common.models.agent import IastAgent +from dongtai_common.models.project import IastProject from dongtai_engine.plugins.strategy_headers import check_response_header from dongtai_engine.plugins.strategy_sensitive import check_response_content @@ -47,9 +45,7 @@ from hashlib import sha1 from dongtai_engine.task_base import replay_payload_data from typing import List, Dict -from dongtai_common.models.strategy_user import IastStrategyUser - -LANGUAGE_MAP = {"JAVA": 1, "PYTHON": 2, "PHP": 3, "GO": 4} +from dongtai_engine.common.queryset import get_scan_id, load_sink_strategy, get_agent RETRY_INTERVALS = [10, 30, 90] @@ -75,46 +71,6 @@ def queryset_to_iterator(queryset): break -def load_sink_strategy(user=None, language=None, scan_id=0) -> List[Dict]: - """ - 加载用户user有权限方法的策略 - :param user: edit by song - :return: - """ - logger.info('start load sink_strategy') - strategies = list() - language_id = 0 - if language and language in LANGUAGE_MAP: - language_id = LANGUAGE_MAP[language] - q = ~Q(state='delete') - scan_template = IastStrategyUser.objects.filter(pk=scan_id).first() - if scan_template: - strategy_id = [int(i) for i in scan_template.content.split(',')] - q = q & Q(pk__in=strategy_id) - type_query = IastStrategyModel.objects.filter(q) - strategy_models = HookStrategy.objects.filter( - strategy__in=type_query, - language_id__in=[language_id] if language_id else LANGUAGE_MAP.values(), - created_by__in=[user.id, 1] if user else [1] - ).values('id', 'value', 'strategy__vul_type') - sub_method_signatures = set() - for strategy in strategy_models: - # for strategy in sub_queryset: - strategy_value = strategy.get("value", "") - sub_method_signature = strategy_value[:strategy_value.rfind('(')] if strategy_value.rfind( - '(') > 0 else strategy_value - if sub_method_signature in sub_method_signatures: - continue - sub_method_signatures.add(sub_method_signature) - - strategies.append({ - 'strategy': strategy.get("id", ""), - 'type': strategy.get("strategy__vul_type", ""), - 'value': sub_method_signature - }) - return strategies - - def search_and_save_vul(engine: Optional[VulEngine], method_pool_model: Union[IastAgentMethodPoolReplay, MethodPool], @@ -127,18 +83,18 @@ def search_and_save_vul(engine: Optional[VulEngine], :return: None """ logger.info(f'current sink rule is {strategy.get("type")}') - queryset = IastStrategyModel.objects.filter(vul_type=strategy['type'], - state=const.STRATEGY_ENABLE) + #queryset = IastStrategyModel.objects.filter(vul_type=strategy['type'], + # state=const.STRATEGY_ENABLE) if not method_pool_model: logger.info( 'method_pool_model missing skip' ) return - if not queryset.values('id').exists(): - logger.warning( - f'current method pool hit rule {strategy.get("type")}, but no vul strategy.' - ) - return + #if not queryset.values('id').exists(): + # logger.warning( + # f'current method pool hit rule {strategy.get("type")}, but no vul strategy.' + # ) + # return if method_pool is None: method_pool = json.loads(method_pool_model.method_pool ) if method_pool_model.method_pool else [] @@ -151,20 +107,20 @@ def search_and_save_vul(engine: Optional[VulEngine], engine.search(method_pool=method_pool, vul_method_signature=strategy.get('value')) status, stack, source_sign, sink_sign, taint_value = engine.result() - vul_strategy = queryset.values("level", "vul_name", "id").first() - vul_type = queryset.values('vul_type').first() - if not vul_strategy or not vul_type: - logger.info( - f'vul data corruption , stop scan in method_pool {method_pool_model.id}' - ) - return + #vul_strategy = queryset.values("level", "vul_name", "id").first() + #vul_type = queryset.values('vul_type').first() + #if not vul_strategy or not vul_type: + # logger.info( + # f'vul data corruption , stop scan in method_pool {method_pool_model.id}' + # ) + # return if status: filterres = vul_filter( stack, source_sign, sink_sign, taint_value, - vul_type['vul_type'], + strategy['type'], ) logger.info(f'vul filter_status : {filterres}') if status and filterres: @@ -172,20 +128,20 @@ def search_and_save_vul(engine: Optional[VulEngine], logger.info(f'vul_found {method_pool_model.agent_id} {method_pool_model.url} {sink_sign}') else: logger.info(f'vul_found {method_pool_model.id} {method_pool_model.url} {sink_sign}') - vul_strategy = queryset.values("level", "vul_name", "id").first() - if not vul_strategy: - pass - else: - handler_vul( - sender="tasks.search_and_save_vul", - vul_meta=method_pool_model, - vul_level=vul_strategy['level'], - strategy_id=vul_strategy['id'], - vul_stack=stack, - top_stack=source_sign, - bottom_stack=sink_sign, - taint_value=taint_value - ) + #vul_strategy = queryset.values("level", "vul_name", "id").first() + #if not vul_strategy: + # pass + #else: + handler_vul( + sender="tasks.search_and_save_vul", + vul_meta=method_pool_model, + vul_level=strategy['strategy_level'], + strategy_id=strategy['strategy_strategy_id'], + vul_stack=stack, + top_stack=source_sign, + bottom_stack=sink_sign, + taint_value=taint_value + ) else: try: if isinstance(method_pool_model, MethodPool): @@ -238,7 +194,9 @@ def search_and_save_sink(engine, method_pool_model, strategy): def search_vul_from_method_pool(self, method_pool_sign, agent_id, retryable=False): logger.info(f'漏洞检测开始,方法池 {method_pool_sign}') try: - method_pool_model = MethodPool.objects.filter(pool_sign=method_pool_sign, agent_id=agent_id).first() + method_pool_model = MethodPool.objects.filter( + pool_sign=method_pool_sign, agent_id=agent_id).first() + method_pool_model.agent = get_agent(method_pool_model.agent_id) if method_pool_model is None: if retryable: if self.request.retries < self.max_retries: @@ -254,10 +212,9 @@ def search_vul_from_method_pool(self, method_pool_sign, agent_id, retryable=Fals ) check_response_header(method_pool_model) check_response_content(method_pool_model) - + scan_id = get_scan_id(method_pool_model.agent.bind_project_id) strategies = load_sink_strategy( - method_pool_model.agent.user, method_pool_model.agent.language, - method_pool_model.agent.bind_project.scan_id) + scan_id=scan_id) engine = VulEngine() method_pool = json.loads(method_pool_model.method_pool) if method_pool_model else [] engine.method_pool = method_pool @@ -302,36 +259,36 @@ def search_vul_from_replay_method_pool(method_pool_id): logger.error(f'重放数据漏洞检测出错,方法池 {method_pool_id}. 错误原因:{e}') -def load_methods_from_strategy(strategy_id): - """ - 根据策略ID加载策略详情、策略对应的方法池数据 - :param strategy_id: 策略ID - :return: - """ - strategy = HookStrategy.objects.filter(type__in=HookType.objects.filter(type=4), id=strategy_id).first() - if strategy is None: - logger.info(f'策略[{strategy_id}]不存在') - return None, None - strategy_value = { - 'strategy': strategy, - 'type': strategy.type.first().value, - 'value': strategy.value.split('(')[0] - } - # fixme 后续根据具体需要,获取用户对应的数据 - if strategy is None: - return strategy_value, None - - user = User.objects.filter(id=strategy.created_by).first() - if user is None: - return strategy_value, None - - agents = IastAgent.objects.filter(user=user) - if agents.values('id').exists() is False: - return strategy_value, None - - method_pool_queryset = MethodPool.objects.filter(agent__in=agents) - return strategy_value, method_pool_queryset - +#def load_methods_from_strategy(strategy_id): +# """ +# 根据策略ID加载策略详情、策略对应的方法池数据 +# :param strategy_id: 策略ID +# :return: +# """ +# strategy = HookStrategy.objects.filter(type__in=HookType.objects.filter(type=4), id=strategy_id).first() +# if strategy is None: +# logger.info(f'策略[{strategy_id}]不存在') +# return None, None +# strategy_value = { +# 'strategy': strategy, +# 'type': strategy.type.first().value, +# 'value': strategy.value.split('(')[0] +# } +# # fixme 后续根据具体需要,获取用户对应的数据 +# if strategy is None: +# return strategy_value, None +# +# user = User.objects.filter(id=strategy.created_by).first() +# if user is None: +# return strategy_value, None +# +# agents = IastAgent.objects.filter(user=user) +# if agents.values('id').exists() is False: +# return strategy_value, None +# +# method_pool_queryset = MethodPool.objects.filter(agent__in=agents) +# return strategy_value, method_pool_queryset +# def get_project_agents(agent): agents = IastAgent.objects.filter( diff --git a/dongtai_web/views/project_add.py b/dongtai_web/views/project_add.py index e3720f829..2c809f911 100644 --- a/dongtai_web/views/project_add.py +++ b/dongtai_web/views/project_add.py @@ -23,6 +23,8 @@ import ipaddress import requests from dongtai_common.models.server import IastServer +from dongtai_common.common.utils import disable_cache +from dongtai_engine.common.queryset import get_scan_id logger = logging.getLogger("django") @@ -184,6 +186,7 @@ def post(self, request): 'base_url', 'test_req_header_key', 'test_req_header_value', 'template_id', 'department_id', 'enable_log', 'log_level' ]) + disable_cache(get_scan_id, (project.id)) return R.success(msg='操作成功') except Exception as e: logger.error(e, exc_info=e) diff --git a/dongtai_web/views/scan_strategys.py b/dongtai_web/views/scan_strategys.py index 9894849c8..18d0899e7 100644 --- a/dongtai_web/views/scan_strategys.py +++ b/dongtai_web/views/scan_strategys.py @@ -22,6 +22,9 @@ BatchStatusUpdateSerializerView, AllStatusUpdateSerializerView, ) +from dongtai_common.common.utils import disable_cache +from dongtai_engine.common.queryset import load_sink_strategy + logger = logging.getLogger('dongtai-webapi') @@ -176,8 +179,9 @@ def update(self, request, pk): if ser.validated_data.get('content', None): ser.validated_data['content'] = ','.join( [str(i) for i in ser.validated_data['content']]) - obj = IastStrategyUser.objects.filter( - pk=pk).update(**ser.validated_data) + obj = IastStrategyUser.objects.filter(pk=pk).update( + **ser.validated_data) + disable_cache(load_sink_strategy, (), kwargs={"scan_id": pk}) return R.success(msg=_('update success')) @extend_schema_with_envcheck(