Skip to content

Commit

Permalink
Merge pull request #1458 from Bidaya0/feat/rotate-use-cache
Browse files Browse the repository at this point in the history
feat: rotate use cache.
  • Loading branch information
Bidaya0 authored May 25, 2023
2 parents 9ad328b + 505f59b commit b51f5d9
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 74 deletions.
2 changes: 1 addition & 1 deletion dongtai_common/models/heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class IastHeartbeat(models.Model):
memory = models.CharField(max_length=1000, blank=True, null=True)
cpu = models.CharField(max_length=1000, blank=True, null=True)
disk = models.CharField(max_length=1000, blank=True, null=True)
req_count = models.IntegerField(blank=True, null=True)
req_count = models.IntegerField(default=0, blank=True, null=True)
dt = models.IntegerField(blank=True, null=True)
report_queue = models.PositiveIntegerField(default=0,
null=False,
Expand Down
44 changes: 19 additions & 25 deletions dongtai_engine/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,10 +365,8 @@ def is_alive(agent_id: int, timestamp: int) -> bool:
"""
Whether the probe is alive or not, the judgment condition: there is a heartbeat log within 2 minutes
"""
return IastHeartbeat.objects.values('id').filter(agent__id=agent_id,
dt__gt=(timestamp -
60 * 2)).exists()

heartbeat_key = f"heartbeat-{agent_id}"
return True if cache.get(heartbeat_key) is not None else False

@shared_task(queue='dongtai-periodic-task')
def update_agent_status():
Expand All @@ -380,27 +378,23 @@ def update_agent_status():
before_agent_status_update()
logger.info(f'检测引擎状态更新开始')
timestamp = int(time.time())
try:
running_agents = IastAgent.objects.values("id").filter(online=1)
is_stopped_agents = list()
for agent in running_agents:
agent_id = agent['id']
if is_alive(agent_id=agent_id, timestamp=timestamp):
continue
else:
is_stopped_agents.append(agent_id)
if is_stopped_agents:
IastAgent.objects.filter(id__in=is_stopped_agents).update(is_running=0, is_core_running=0, online=0)

vul_id_qs = IastReplayQueue.objects.filter(
update_time__lte=timestamp - 60 * 5,
verify_time__isnull=True,
replay_type=1).values('relation_id').distinct()
IastVulnerabilityModel.objects.filter(pk__in=vul_id_qs).update(
status_id=7)
logger.info(f'检测引擎状态更新成功')
except Exception as e:
logger.error(f'检测引擎状态更新出错,错误详情:{e}', exc_info=e)
running_agents_ids = list(
IastAgent.objects.values("id").filter(online=1).values_list(
'pk', flat=True).all())
heartbeat_keys = set(map(lambda x: f"heartbeat-{x}", running_agents_ids))
exists_keys = set(cache.get_many(heartbeat_keys).keys())
keys_missing = heartbeat_keys - exists_keys
stop_agent_ids = list(
map(lambda x: int(x.replace("heartbeat-", "")), keys_missing))
IastAgent.objects.filter(id__in=stop_agent_ids).update(
is_running=0, is_core_running=0, online=0)
vul_id_qs = IastReplayQueue.objects.filter(
update_time__lte=timestamp - 60 * 5,
verify_time__isnull=True,
replay_type=1).values('relation_id').distinct()
IastVulnerabilityModel.objects.filter(pk__in=vul_id_qs).update(status_id=7)
logger.info("update offline agent: %s", stop_agent_ids)
logger.info(f'检测引擎状态更新成功')
after_agent_status_update()

@shared_task(queue='dongtai-periodic-task')
Expand Down
100 changes: 53 additions & 47 deletions dongtai_protocol/report/handler/heartbeat_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,22 @@
from dongtai_common.models.project import IastProject, VulValidation
from dongtai_common.utils.systemsettings import get_vul_validate
from dongtai_common.models.agent import IastAgent
from django.core.cache import cache

logger = logging.getLogger('dongtai.openapi')


def update_agent_cache(agent_id, data):
cache.set(f"heartbeat-{agent_id}", data, timeout=521)


def check_agent_incache(agent_id):
return True if cache.get(f"heartbeat-{agent_id}") else False


@ReportHandler.register(const.REPORT_HEART_BEAT)
class HeartBeatHandler(IReportHandler):

def __init__(self):
super().__init__()
self.req_count = None
Expand All @@ -41,59 +51,50 @@ def parse(self):
self.cpu = self.detail.get('cpu')
self.memory = self.detail.get('memory')
self.disk = self.detail.get('disk')
self.req_count = self.detail.get('reqCount')
self.req_count = self.detail.get('reqCount', None)
self.report_queue = self.detail.get('reportQueue', 0)
self.method_queue = self.detail.get('methodQueue', 0)
self.replay_queue = self.detail.get('replayQueue', 0)
self.return_queue = self.detail.get('returnQueue', None)

def has_permission(self):
self.agent = IastAgent.objects.filter(id=self.agent_id, user=self.user_id).first()
self.agent = IastAgent.objects.filter(id=self.agent_id,
user=self.user_id).first()
return self.agent

def save_heartbeat(self):
self.agent.is_running = 1
self.agent.online = 1
self.agent.save(update_fields=['is_running', 'online'])
queryset = IastHeartbeat.objects.filter(agent=self.agent)
heartbeat = queryset.order_by('-id').first()
if heartbeat:
queryset.exclude(pk=heartbeat.id).delete()
heartbeat.dt = int(time.time())
if self.return_queue == 1:
heartbeat.req_count = self.req_count
heartbeat.report_queue = self.report_queue
heartbeat.method_queue = self.method_queue
heartbeat.replay_queue = self.replay_queue
heartbeat.save(update_fields=[
'req_count', 'dt', 'report_queue', 'method_queue', 'replay_queue'
])
elif self.return_queue == 0:
heartbeat.memory = self.memory
heartbeat.cpu = self.cpu
heartbeat.disk = self.disk
heartbeat.save(update_fields=['disk', 'memory', 'cpu', 'dt'])
else:
heartbeat.memory = self.memory
heartbeat.cpu = self.cpu
heartbeat.req_count = self.req_count
heartbeat.report_queue = self.report_queue
heartbeat.method_queue = self.method_queue
heartbeat.replay_queue = self.replay_queue
heartbeat.disk = self.disk
heartbeat.save(update_fields=[
'disk', 'memory', 'cpu', 'req_count', 'dt', 'report_queue',
'method_queue', 'replay_queue'
])
default_dict = {"dt": int(time.time())}
if not check_agent_incache(self.agent_id):
self.agent.is_running = 1
self.agent.online = 1
IastHeartbeat.objects.update_or_create(agent_id=self.agent_id,
defaults={
"dt": int(time.time()),
})
if self.return_queue == 1:
default_dict['req_count'] = self.req_count
default_dict['report_queue'] = self.report_queue
default_dict['method_queue'] = self.method_queue
default_dict['replay_queue'] = self.replay_queue
IastHeartbeat.objects.update_or_create(agent_id=self.agent_id,
defaults=default_dict)
elif self.return_queue == 0:
if self.req_count is not None:
default_dict['req_count'] = self.req_count
default_dict['memory'] = self.memory
default_dict['cpu'] = self.cpu
default_dict['disk'] = self.disk
else:
IastHeartbeat.objects.create(memory=self.memory,
cpu=self.cpu,
req_count=self.req_count,
report_queue=self.replay_queue,
method_queue=self.method_queue,
replay_queue=self.replay_queue,
dt=int(time.time()),
agent=self.agent)
default_dict['memory'] = self.memory
default_dict['cpu'] = self.cpu
default_dict['req_count'] = self.req_count
default_dict['report_queue'] = self.report_queue
default_dict['method_queue'] = self.method_queue
default_dict['replay_queue'] = self.replay_queue
default_dict['disk'] = self.disk
IastHeartbeat.objects.update_or_create(agent_id=self.agent_id,
defaults=default_dict)
update_agent_cache(self.agent_id, default_dict)

def get_result(self, msg=None):
logger.info('return_queue: {}'.format(self.return_queue))
Expand Down Expand Up @@ -148,10 +149,15 @@ def get_result(self, msg=None):
state=const.WAITING).update(
update_time=timestamp,
state=const.SOLVING)
IastReplayQueue.objects.filter(id__in=failure_ids).update(update_time=timestamp, state=const.SOLVED)

IastVulnerabilityModel.objects.filter(id__in=success_vul_ids).update(latest_time=timestamp, status_id=2)
IastVulnerabilityModel.objects.filter(id__in=failure_vul_ids).update(latest_time=timestamp, status_id=1)
IastReplayQueue.objects.filter(id__in=failure_ids).update(
update_time=timestamp, state=const.SOLVED)

IastVulnerabilityModel.objects.filter(
id__in=success_vul_ids).update(latest_time=timestamp,
status_id=2)
IastVulnerabilityModel.objects.filter(
id__in=failure_vul_ids).update(latest_time=timestamp,
status_id=1)
logger.info(_('Reproduction request issued successfully'))
logger.debug([i['id'] for i in replay_requests])
return replay_requests
Expand Down
3 changes: 2 additions & 1 deletion test/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
# datetime: 2021/7/13 下午10:21
# project: dongtai-engine
from django.test.runner import DiscoverRunner
from django.core.cache import cache
import os
import unittest

import django


Expand All @@ -15,6 +15,7 @@ def __init__(self, methodName='runTest'):
super().__init__(methodName)
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "dongtai_conf.settings")
os.environ.setdefault("debug", "true")
cache.clear()
django.setup()


Expand Down

0 comments on commit b51f5d9

Please sign in to comment.