diff --git a/deploy/docker/entrypoint.sh b/deploy/docker/entrypoint.sh index 425682bde..b9835781d 100755 --- a/deploy/docker/entrypoint.sh +++ b/deploy/docker/entrypoint.sh @@ -15,7 +15,7 @@ elif [ "$1" = "worker-high-freq" ]; then elif [ "$1" = "worker-es" ]; then celery -A dongtai_conf worker -l info -Q dongtai-es-save-task $DONGTAI_CONCURRENCY -E --pidfile= elif [ "$1" = "worker-sca" ]; then - celery -A dongtai_conf worker -l info -Q dongtai-sca-task $DONGTAI_CONCURRENCY -E --pidfile= + celery -A dongtai_conf worker -l info -Q dongtai-sca-task,dongtai-api-route-handler $DONGTAI_CONCURRENCY -E --pidfile= elif [ "$1" = "worker-other" ]; then celery -A dongtai_conf worker -l info -X dongtai-periodic-task,dongtai-method-pool-scan,dongtai-replay-vul-scan,dongtai-sca-task $DONGTAI_CONCURRENCY -E --pidfile= elif [ "$1" = "beat" ]; then diff --git a/dongtai_conf/celery.py b/dongtai_conf/celery.py index 0e4e5063b..035ac035e 100644 --- a/dongtai_conf/celery.py +++ b/dongtai_conf/celery.py @@ -26,6 +26,9 @@ configs["task_queues"] = [ # normal + Queue("dongtai-api-route-handler", + Exchange("dongtai-api-route-handler"), + routing_key="dongtai-api-route-handler"), Queue("dongtai-method-pool-scan", Exchange("dongtai-method-pool-scan"), routing_key="dongtai-method-pool-scan"), @@ -63,6 +66,7 @@ # configs['worker_concurrency'] = 8 configs["task_routes"] = { # normal + "dongtai_protocol.report.handler.api_route_handler.api_route_gather": {'queue': 'dongtai-api-route-handler', 'routing_key': 'dongtai-api-route-handler'}, "dongtai_engine.tasks.search_vul_from_method_pool": {'queue': 'dongtai-method-pool-scan', 'routing_key': 'dongtai-method-pool-scan'}, "dongtai_engine.plugins.project_time_update.project_time_stamp_update": {'queue': 'dongtai-project-time-stamp-update', 'routing_key': 'dongtai-project-time-stamp-update'}, "dongtai_engine.tasks.search_vul_from_replay_method_pool": {'exchange': 'dongtai-replay-vul-scan', 'routing_key': 'dongtai-replay-vul-scan'}, diff --git a/dongtai_protocol/report/handler/api_route_handler.py b/dongtai_protocol/report/handler/api_route_handler.py index 44fe54247..36f1fb00a 100644 --- a/dongtai_protocol/report/handler/api_route_handler.py +++ b/dongtai_protocol/report/handler/api_route_handler.py @@ -18,63 +18,72 @@ from django.db import transaction from dongtai_common.models.project import IastProject from dongtai_engine.plugins.project_time_update import project_time_stamp_update +from celery import shared_task + logger = logging.getLogger('dongtai.openapi') @ReportHandler.register(const.REPORT_API_ROUTE) class ApiRouteHandler(IReportHandler): + def parse(self): self.api_data = self.detail.get('apiData') - self.api_routes = map(lambda x: _data_dump(x), self.api_data) + self.api_routes = list(map(lambda x: _data_dump(x), self.api_data)) def save(self): - try: - agent = IastAgent.objects.filter(pk=self.agent_id)[0:1] - if not agent: - raise ValueError(_("No such agent")) - agent = agent[0] - for api_route in self.api_routes: - logger.debug(f"recoding api_route: {api_route}") - http_methods = [] - with transaction.atomic(): - try: - for http_method_str in api_route['method']: - http_method, __ = HttpMethod.objects.get_or_create( - method=http_method_str.upper()) - http_methods.append(http_method) - api_method, is_create = IastApiMethod.objects.get_or_create( - method=http_method_str.upper()) - if is_create: - for http_method in http_methods: - IastApiMethodHttpMethodRelation.objects.create( - api_method_id=api_method.id, - http_method_id=http_method.id) - fields = [ - 'uri', 'code_class', 'description', - 'code_file', 'controller', 'agent' - ] - api_route_dict = _dictfilter(api_route, fields) - api_route_obj = _route_dump( - api_route_dict, api_method, agent) - api_route_model, is_create = IastApiRoute.objects.get_or_create( - **api_route_obj) - parameters = api_route['parameters'] - for parameter in parameters: - parameter_obj = _para_dump( - parameter, api_route_model) - IastApiParameter.objects.get_or_create( - **parameter_obj) - response_obj = _response_dump( - {'return_type': api_route['returnType']}, - api_route_model) - IastApiResponse.objects.get_or_create(**response_obj) - except Exception as e: - print(e) - logger.info(_('API navigation log record successfully')) - project_time_stamp_update.apply_async( - (self.agent.bind_project_id, ), countdown=5) - except Exception as e: - logger.info(_('API navigation log failed, why: {}').format(e)) + api_route_gather.delay(self.agent_id, self.api_routes) + + +@shared_task(queue='dongtai-api-route-handler') +def api_route_gather(agent_id, api_routes): + try: + agent = IastAgent.objects.filter(pk=agent_id)[0:1] + if not agent: + raise ValueError(_("No such agent")) + agent = agent[0] + for api_route in api_routes: + logger.debug(f"recoding api_route: {api_route}") + http_methods = [] + with transaction.atomic(): + try: + for http_method_str in api_route['method']: + http_method, __ = HttpMethod.objects.get_or_create( + method=http_method_str.upper()) + http_methods.append(http_method) + api_method, is_create = IastApiMethod.objects.get_or_create( + method=http_method_str.upper()) + if is_create: + for http_method in http_methods: + IastApiMethodHttpMethodRelation.objects.create( + api_method_id=api_method.id, + http_method_id=http_method.id) + fields = [ + 'uri', 'code_class', 'description', 'code_file', + 'controller', 'agent' + ] + api_route_dict = _dictfilter(api_route, fields) + api_route_obj = _route_dump(api_route_dict, api_method, + agent) + api_route_model, is_create = IastApiRoute.objects.get_or_create( + **api_route_obj) + parameters = api_route['parameters'] + for parameter in parameters: + parameter_obj = _para_dump(parameter, + api_route_model) + IastApiParameter.objects.get_or_create( + **parameter_obj) + response_obj = _response_dump( + {'return_type': api_route['returnType']}, + api_route_model) + IastApiResponse.objects.get_or_create(**response_obj) + except Exception as e: + print(e) + logger.info(_('API navigation log record successfully')) + project_time_stamp_update.apply_async((agent.bind_project_id, ), + countdown=5) + except Exception as e: + logger.info(_('API navigation log failed, why: {}').format(e), + exc_info=e) def _data_dump(item):