Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: mirgate api route gather to celery task. #1519

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deploy/docker/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ elif [ "$1" = "worker-high-freq" ]; then
elif [ "$1" = "worker-es" ]; then
celery -A dongtai_conf worker -l info -Q dongtai-es-save-task $DONGTAI_CONCURRENCY -E --pidfile=
elif [ "$1" = "worker-sca" ]; then
celery -A dongtai_conf worker -l info -Q dongtai-sca-task $DONGTAI_CONCURRENCY -E --pidfile=
celery -A dongtai_conf worker -l info -Q dongtai-sca-task,dongtai-api-route-handler $DONGTAI_CONCURRENCY -E --pidfile=
elif [ "$1" = "worker-other" ]; then
celery -A dongtai_conf worker -l info -X dongtai-periodic-task,dongtai-method-pool-scan,dongtai-replay-vul-scan,dongtai-sca-task $DONGTAI_CONCURRENCY -E --pidfile=
elif [ "$1" = "beat" ]; then
Expand Down
4 changes: 4 additions & 0 deletions dongtai_conf/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@

configs["task_queues"] = [
# normal
Queue("dongtai-api-route-handler",
Exchange("dongtai-api-route-handler"),
routing_key="dongtai-api-route-handler"),
Queue("dongtai-method-pool-scan",
Exchange("dongtai-method-pool-scan"),
routing_key="dongtai-method-pool-scan"),
Expand Down Expand Up @@ -63,6 +66,7 @@
# configs['worker_concurrency'] = 8
configs["task_routes"] = {
# normal
"dongtai_protocol.report.handler.api_route_handler.api_route_gather": {'queue': 'dongtai-api-route-handler', 'routing_key': 'dongtai-api-route-handler'},
"dongtai_engine.tasks.search_vul_from_method_pool": {'queue': 'dongtai-method-pool-scan', 'routing_key': 'dongtai-method-pool-scan'},
"dongtai_engine.plugins.project_time_update.project_time_stamp_update": {'queue': 'dongtai-project-time-stamp-update', 'routing_key': 'dongtai-project-time-stamp-update'},
"dongtai_engine.tasks.search_vul_from_replay_method_pool": {'exchange': 'dongtai-replay-vul-scan', 'routing_key': 'dongtai-replay-vul-scan'},
Expand Down
105 changes: 57 additions & 48 deletions dongtai_protocol/report/handler/api_route_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,63 +18,72 @@
from django.db import transaction
from dongtai_common.models.project import IastProject
from dongtai_engine.plugins.project_time_update import project_time_stamp_update
from celery import shared_task

logger = logging.getLogger('dongtai.openapi')


@ReportHandler.register(const.REPORT_API_ROUTE)
class ApiRouteHandler(IReportHandler):

def parse(self):
self.api_data = self.detail.get('apiData')
self.api_routes = map(lambda x: _data_dump(x), self.api_data)
self.api_routes = list(map(lambda x: _data_dump(x), self.api_data))

def save(self):
try:
agent = IastAgent.objects.filter(pk=self.agent_id)[0:1]
if not agent:
raise ValueError(_("No such agent"))
agent = agent[0]
for api_route in self.api_routes:
logger.debug(f"recoding api_route: {api_route}")
http_methods = []
with transaction.atomic():
try:
for http_method_str in api_route['method']:
http_method, __ = HttpMethod.objects.get_or_create(
method=http_method_str.upper())
http_methods.append(http_method)
api_method, is_create = IastApiMethod.objects.get_or_create(
method=http_method_str.upper())
if is_create:
for http_method in http_methods:
IastApiMethodHttpMethodRelation.objects.create(
api_method_id=api_method.id,
http_method_id=http_method.id)
fields = [
'uri', 'code_class', 'description',
'code_file', 'controller', 'agent'
]
api_route_dict = _dictfilter(api_route, fields)
api_route_obj = _route_dump(
api_route_dict, api_method, agent)
api_route_model, is_create = IastApiRoute.objects.get_or_create(
**api_route_obj)
parameters = api_route['parameters']
for parameter in parameters:
parameter_obj = _para_dump(
parameter, api_route_model)
IastApiParameter.objects.get_or_create(
**parameter_obj)
response_obj = _response_dump(
{'return_type': api_route['returnType']},
api_route_model)
IastApiResponse.objects.get_or_create(**response_obj)
except Exception as e:
print(e)
logger.info(_('API navigation log record successfully'))
project_time_stamp_update.apply_async(
(self.agent.bind_project_id, ), countdown=5)
except Exception as e:
logger.info(_('API navigation log failed, why: {}').format(e))
api_route_gather.delay(self.agent_id, self.api_routes)


@shared_task(queue='dongtai-api-route-handler')
def api_route_gather(agent_id, api_routes):
try:
agent = IastAgent.objects.filter(pk=agent_id)[0:1]
if not agent:
raise ValueError(_("No such agent"))
agent = agent[0]
for api_route in api_routes:
logger.debug(f"recoding api_route: {api_route}")
http_methods = []
with transaction.atomic():
try:
for http_method_str in api_route['method']:
http_method, __ = HttpMethod.objects.get_or_create(
method=http_method_str.upper())
http_methods.append(http_method)
api_method, is_create = IastApiMethod.objects.get_or_create(
method=http_method_str.upper())
if is_create:
for http_method in http_methods:
IastApiMethodHttpMethodRelation.objects.create(
api_method_id=api_method.id,
http_method_id=http_method.id)
fields = [
'uri', 'code_class', 'description', 'code_file',
'controller', 'agent'
]
api_route_dict = _dictfilter(api_route, fields)
api_route_obj = _route_dump(api_route_dict, api_method,
agent)
api_route_model, is_create = IastApiRoute.objects.get_or_create(
**api_route_obj)
parameters = api_route['parameters']
for parameter in parameters:
parameter_obj = _para_dump(parameter,
api_route_model)
IastApiParameter.objects.get_or_create(
**parameter_obj)
response_obj = _response_dump(
{'return_type': api_route['returnType']},
api_route_model)
IastApiResponse.objects.get_or_create(**response_obj)
except Exception as e:
print(e)
logger.info(_('API navigation log record successfully'))
project_time_stamp_update.apply_async((agent.bind_project_id, ),
countdown=5)
except Exception as e:
logger.info(_('API navigation log failed, why: {}').format(e),
exc_info=e)


def _data_dump(item):
Expand Down