Skip to content

Commit

Permalink
Merge pull request #1519 from Bidaya0/feat/api_route_gather_mirgate_t…
Browse files Browse the repository at this point in the history
…o_celery_task

feat: mirgate api route gather to celery task.
  • Loading branch information
Bidaya0 authored Jun 13, 2023
2 parents 6e0f5fd + ef1b9c7 commit bde4473
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 49 deletions.
2 changes: 1 addition & 1 deletion deploy/docker/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ elif [ "$1" = "worker-high-freq" ]; then
elif [ "$1" = "worker-es" ]; then
celery -A dongtai_conf worker -l info -Q dongtai-es-save-task $DONGTAI_CONCURRENCY -E --pidfile=
elif [ "$1" = "worker-sca" ]; then
celery -A dongtai_conf worker -l info -Q dongtai-sca-task $DONGTAI_CONCURRENCY -E --pidfile=
celery -A dongtai_conf worker -l info -Q dongtai-sca-task,dongtai-api-route-handler $DONGTAI_CONCURRENCY -E --pidfile=
elif [ "$1" = "worker-other" ]; then
celery -A dongtai_conf worker -l info -X dongtai-periodic-task,dongtai-method-pool-scan,dongtai-replay-vul-scan,dongtai-sca-task $DONGTAI_CONCURRENCY -E --pidfile=
elif [ "$1" = "beat" ]; then
Expand Down
4 changes: 4 additions & 0 deletions dongtai_conf/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@

configs["task_queues"] = [
# normal
Queue("dongtai-api-route-handler",
Exchange("dongtai-api-route-handler"),
routing_key="dongtai-api-route-handler"),
Queue("dongtai-method-pool-scan",
Exchange("dongtai-method-pool-scan"),
routing_key="dongtai-method-pool-scan"),
Expand Down Expand Up @@ -63,6 +66,7 @@
# configs['worker_concurrency'] = 8
configs["task_routes"] = {
# normal
"dongtai_protocol.report.handler.api_route_handler.api_route_gather": {'queue': 'dongtai-api-route-handler', 'routing_key': 'dongtai-api-route-handler'},
"dongtai_engine.tasks.search_vul_from_method_pool": {'queue': 'dongtai-method-pool-scan', 'routing_key': 'dongtai-method-pool-scan'},
"dongtai_engine.plugins.project_time_update.project_time_stamp_update": {'queue': 'dongtai-project-time-stamp-update', 'routing_key': 'dongtai-project-time-stamp-update'},
"dongtai_engine.tasks.search_vul_from_replay_method_pool": {'exchange': 'dongtai-replay-vul-scan', 'routing_key': 'dongtai-replay-vul-scan'},
Expand Down
105 changes: 57 additions & 48 deletions dongtai_protocol/report/handler/api_route_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,63 +18,72 @@
from django.db import transaction
from dongtai_common.models.project import IastProject
from dongtai_engine.plugins.project_time_update import project_time_stamp_update
from celery import shared_task

logger = logging.getLogger('dongtai.openapi')


@ReportHandler.register(const.REPORT_API_ROUTE)
class ApiRouteHandler(IReportHandler):

def parse(self):
self.api_data = self.detail.get('apiData')
self.api_routes = map(lambda x: _data_dump(x), self.api_data)
self.api_routes = list(map(lambda x: _data_dump(x), self.api_data))

def save(self):
try:
agent = IastAgent.objects.filter(pk=self.agent_id)[0:1]
if not agent:
raise ValueError(_("No such agent"))
agent = agent[0]
for api_route in self.api_routes:
logger.debug(f"recoding api_route: {api_route}")
http_methods = []
with transaction.atomic():
try:
for http_method_str in api_route['method']:
http_method, __ = HttpMethod.objects.get_or_create(
method=http_method_str.upper())
http_methods.append(http_method)
api_method, is_create = IastApiMethod.objects.get_or_create(
method=http_method_str.upper())
if is_create:
for http_method in http_methods:
IastApiMethodHttpMethodRelation.objects.create(
api_method_id=api_method.id,
http_method_id=http_method.id)
fields = [
'uri', 'code_class', 'description',
'code_file', 'controller', 'agent'
]
api_route_dict = _dictfilter(api_route, fields)
api_route_obj = _route_dump(
api_route_dict, api_method, agent)
api_route_model, is_create = IastApiRoute.objects.get_or_create(
**api_route_obj)
parameters = api_route['parameters']
for parameter in parameters:
parameter_obj = _para_dump(
parameter, api_route_model)
IastApiParameter.objects.get_or_create(
**parameter_obj)
response_obj = _response_dump(
{'return_type': api_route['returnType']},
api_route_model)
IastApiResponse.objects.get_or_create(**response_obj)
except Exception as e:
print(e)
logger.info(_('API navigation log record successfully'))
project_time_stamp_update.apply_async(
(self.agent.bind_project_id, ), countdown=5)
except Exception as e:
logger.info(_('API navigation log failed, why: {}').format(e))
api_route_gather.delay(self.agent_id, self.api_routes)


@shared_task(queue='dongtai-api-route-handler')
def api_route_gather(agent_id, api_routes):
try:
agent = IastAgent.objects.filter(pk=agent_id)[0:1]
if not agent:
raise ValueError(_("No such agent"))
agent = agent[0]
for api_route in api_routes:
logger.debug(f"recoding api_route: {api_route}")
http_methods = []
with transaction.atomic():
try:
for http_method_str in api_route['method']:
http_method, __ = HttpMethod.objects.get_or_create(
method=http_method_str.upper())
http_methods.append(http_method)
api_method, is_create = IastApiMethod.objects.get_or_create(
method=http_method_str.upper())
if is_create:
for http_method in http_methods:
IastApiMethodHttpMethodRelation.objects.create(
api_method_id=api_method.id,
http_method_id=http_method.id)
fields = [
'uri', 'code_class', 'description', 'code_file',
'controller', 'agent'
]
api_route_dict = _dictfilter(api_route, fields)
api_route_obj = _route_dump(api_route_dict, api_method,
agent)
api_route_model, is_create = IastApiRoute.objects.get_or_create(
**api_route_obj)
parameters = api_route['parameters']
for parameter in parameters:
parameter_obj = _para_dump(parameter,
api_route_model)
IastApiParameter.objects.get_or_create(
**parameter_obj)
response_obj = _response_dump(
{'return_type': api_route['returnType']},
api_route_model)
IastApiResponse.objects.get_or_create(**response_obj)
except Exception as e:
print(e)
logger.info(_('API navigation log record successfully'))
project_time_stamp_update.apply_async((agent.bind_project_id, ),
countdown=5)
except Exception as e:
logger.info(_('API navigation log failed, why: {}').format(e),
exc_info=e)


def _data_dump(item):
Expand Down

0 comments on commit bde4473

Please sign in to comment.