Skip to content

Commit

Permalink
concurency errors
Browse files Browse the repository at this point in the history
  • Loading branch information
siewer committed Sep 24, 2024
1 parent a89de6f commit e2ede86
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public final class Component {
@JsonIgnore
private List<Vulnerability> vulnerabilities;

@ManyToMany(mappedBy = "components", fetch = FetchType.EAGER)
@ManyToMany(mappedBy = "components", fetch = FetchType.LAZY)
@JsonIgnore
private List<CodeRepo> codeRepos = new ArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public final class Vulnerability {
@Column
private final Finding.Severity severity;

@ManyToMany(fetch = FetchType.EAGER)
@ManyToMany(fetch = FetchType.LAZY)
@JoinTable(
name = "vulnerability_component",
joinColumns = @JoinColumn(name = "vulnerability_id"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void updateCodeRepoStatus(CodeRepo codeRepo, CodeRepoBranch codeRepoBranc
int scaCritical = 0;

// Update status for SCA if the scan was performed
if (scaScanPerformed) {
if (!codeRepo.getComponents().isEmpty()) {
scaHigh = updateStatusForSource(Finding.Source.SCA, codeRepo, codeRepoBranch, true);
scaCritical = countCriticalFindings(Finding.Source.SCA, codeRepo, codeRepoBranch);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void generateBom(String repoDir, CodeRepo codeRepo, CodeRepoBranch codeRe
}

// Validate the bom.json file
File bomFile = new File(repoDir, "bom.json");
File bomFile = new File(repoDir, "sbom.json");
if (bomFile.exists()) {
if (bomFile.length() > 0) {
log.info("[CdxGen] SBOM generated successfully: {}", bomFile.getAbsolutePath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,7 @@ public void runEveryDayAt3AM() {
List<Future<?>> futures = new ArrayList<>();
codeRepos.forEach(repo -> {
Future<?> future = executorService.submit(() -> {
try {
scanManagerService.scanRepository(repo, repo.getDefaultBranch(), null, null);
} catch (IOException | InterruptedException e) {
log.error("Error scanning repository: {}", repo.getName(), e);
} catch (ScanException e) {
throw new RuntimeException(e);
}
scanManagerService.scanRepository(repo, repo.getDefaultBranch(), null, null);
});
futures.add(future);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
Expand All @@ -44,12 +46,11 @@ public class ScanManagerService {
private final ConcurrentHashMap<Long, Lock> repoLocks = new ConcurrentHashMap<>(); // Ensure no parallel scans for the same repo


private final ExecutorService executorService = new ThreadPoolExecutor(
5, 5, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(100),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy()
);

private final ExecutorService executorService = Executors.newFixedThreadPool(5);
private final ExecutorService scanExecutorService = Executors.newFixedThreadPool(10);
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);


/**
* Asynchronously initiates a scan for a given repository and branch.
Expand All @@ -62,8 +63,7 @@ public class ScanManagerService {
* @throws ScanException If a scanning error occurs.
*/
@Async
public void scanRepository(CodeRepo codeRepo, CodeRepoBranch codeRepoBranch, String commitId, Long iid)
throws IOException, InterruptedException, ScanException {
public void scanRepository(CodeRepo codeRepo, CodeRepoBranch codeRepoBranch, String commitId, Long iid) {

updateCodeRepoService.setScanRunning(codeRepo);
// Acquire a lock specific to the codeRepo
Expand All @@ -73,6 +73,7 @@ public void scanRepository(CodeRepo codeRepo, CodeRepoBranch codeRepoBranch, Str
String repoDir = "/tmp/" + codeRepo.getName();
String commit = "";
AtomicBoolean scaScanPerformed = new AtomicBoolean(false);
Future<?> timeoutFuture = null;

try {
semaphore.acquire(); // Acquire a permit to limit concurrency
Expand All @@ -89,42 +90,62 @@ public void scanRepository(CodeRepo codeRepo, CodeRepoBranch codeRepoBranch, Str
commit = fetchRepository(commitId, repoUrl, accessToken, codeRepoBranch, repoDir);

// Run scans in parallel
CompletableFuture<Void> secretScan = runSecretScan(repoDir, codeRepo, codeRepoBranch);
CompletableFuture<Void> scaScan = runSCAScan(repoDir, codeRepo, codeRepoBranch, scaScanPerformed);
CompletableFuture<Void> sastScan = runSASTScan(repoDir, codeRepo, codeRepoBranch);
CompletableFuture<Void> iacScan = runIACScan(repoDir, codeRepo, codeRepoBranch);
Future<Void> secretScanFuture = runSecretScan(repoDir, codeRepo, codeRepoBranch);
Future<Void> scaScanFuture = runSCAScan(repoDir, codeRepo, codeRepoBranch, scaScanPerformed);
Future<Void> sastScanFuture = runSASTScan(repoDir, codeRepo, codeRepoBranch);
Future<Void> iacScanFuture = runIACScan(repoDir, codeRepo, codeRepoBranch);

// Wait for all scans to complete
CompletableFuture<Void> allScans = CompletableFuture.allOf(secretScan, scaScan, sastScan, iacScan);
List<Future<Void>> scanFutures = Arrays.asList(secretScanFuture, scaScanFuture, sastScanFuture, iacScanFuture);

// Add the completion stage to ensure cleanup and status update
String finalCommit = commit;
allScans.whenComplete((result, ex) -> {
if (ex != null) {
log.error("[ScanManagerService] Problem with scan: {}", ex.getMessage(), ex);
// Schedule a timeout task to cancel scans
timeoutFuture = scheduler.schedule(() -> {
// Cancel the scans
for (Future<Void> future : scanFutures) {
future.cancel(true);
}
log.error("[ScanManagerService] Scan timed out for repo {}.", codeRepo.getName());
}, 2, TimeUnit.HOURS);

// Wait for all scans to complete
for (Future<Void> future : scanFutures) {
try {
updateCodeRepoService.updateCodeRepoStatus(codeRepo, codeRepoBranch, scaScanPerformed.get(), finalCommit);
} catch (Exception updateEx) {
log.error("[ScanManagerService] Failed to update CodeRepo status for {}: {}", codeRepo.getName(), updateEx.getMessage(), updateEx);
future.get(); // Wait for scan to complete
} catch (CancellationException ce) {
log.warn("[ScanManagerService] Scan task was cancelled.");
} catch (InterruptedException ie) {
log.warn("[ScanManagerService] Scan task was interrupted.");
Thread.currentThread().interrupt();
} catch (ExecutionException ee) {
log.error("[ScanManagerService] Exception during scan task: {}", ee.getMessage(), ee);
}
}

try {
cleanUp(repoDir);
} catch (IOException cleanupEx) {
log.error("[ScanManagerService] Failed to clean up repository directory {}: {}", repoDir, cleanupEx.getMessage(), cleanupEx);
}
}).join(); // Ensure this step waits for completion before continuing
// If scans completed before timeout, cancel the timeout task
if (timeoutFuture != null && !timeoutFuture.isDone()) {
timeoutFuture.cancel(false);
}

} catch (InterruptedException | IOException e) {
log.error("[ScanManagerService] Scan for repo {} was interrupted: {}", codeRepo.getName(), e.getMessage());
Thread.currentThread().interrupt();
} catch (Exception e) {
log.error("[ScanManagerService] Exception during scan: {}", e.getMessage(), e);
} finally {
// Update status
try {
updateCodeRepoService.updateCodeRepoStatus(codeRepo, codeRepoBranch, scaScanPerformed.get(), commit);
} catch (Exception updateEx) {
log.error("[ScanManagerService] Failed to update CodeRepo status for {}: {}", codeRepo.getName(), updateEx.getMessage(), updateEx);
}

// Clean up
try {
cleanUp(repoDir);
} catch (IOException cleanupEx) {
log.error("[ScanManagerService] Failed to clean up repository directory {}: {}", repoDir, cleanupEx.getMessage(), cleanupEx);
}

lock.unlock(); // Ensure the lock is released
semaphore.release(); // Release the semaphore permit
repoLocks.remove(codeRepo.getId()); // Clean up the lock from the map if no longer needed
if (iid != null && iid > 0){
if (iid != null && iid > 0) {
try {
gitCommentService.processMergeComment(codeRepo, codeRepoBranch, iid);
} catch (MalformedURLException e) {
Expand All @@ -135,6 +156,79 @@ public void scanRepository(CodeRepo codeRepo, CodeRepoBranch codeRepoBranch, Str
});
}

private Future<Void> runSecretScan(String repoDir, CodeRepo codeRepo, CodeRepoBranch codeRepoBranch) {
Callable<Void> task = () -> {
log.info("[ScanManagerService] Starting Secret scan... [for: {}]", repoDir);
try {
secretsService.runGitleaks(repoDir, codeRepo, codeRepoBranch);
} catch (IOException | InterruptedException e) {
if (Thread.currentThread().isInterrupted()) {
log.warn("[ScanManagerService] Secret scan interrupted for {}.", codeRepo.getRepourl());
Thread.currentThread().interrupt();
} else {
log.error("[ScanManagerService] An error occurred during Secret scan for {}.", codeRepo.getRepourl());
}
}
return null;
};
return scanExecutorService.submit(task);
}

private Future<Void> runSCAScan(String repoDir, CodeRepo codeRepo, CodeRepoBranch codeRepoBranch, AtomicBoolean scaScanPerformed) {
Callable<Void> task = () -> {
log.info("[ScanManagerService] Starting SCA scan... [for: {}]", repoDir);
try {
scaScanPerformed.set(scaService.runScan(repoDir, codeRepo, codeRepoBranch));
} catch (IOException | InterruptedException e) {
if (Thread.currentThread().isInterrupted()) {
log.warn("[ScanManagerService] SCA scan interrupted for {}.", codeRepo.getRepourl());
Thread.currentThread().interrupt();
} else {
log.error("[ScanManagerService] An error occurred during SCA scan for {}.", codeRepo.getRepourl());
}
}
return null;
};
return scanExecutorService.submit(task);
}

private Future<Void> runSASTScan(String repoDir, CodeRepo codeRepo, CodeRepoBranch codeRepoBranch) {
Callable<Void> task = () -> {
log.info("[ScanManagerService] Starting SAST scan... [for: {}]", repoDir);
try {
sastService.runBearerScan(repoDir, codeRepo, codeRepoBranch);
} catch (IOException | InterruptedException | ScanException e) {
if (Thread.currentThread().isInterrupted()) {
log.warn("[ScanManagerService] SAST scan interrupted for {}.", codeRepo.getRepourl());
Thread.currentThread().interrupt();
} else {
log.error("[ScanManagerService] An error occurred during SAST scan for {}.", codeRepo.getRepourl());
}
}
return null;
};
return scanExecutorService.submit(task);
}

private Future<Void> runIACScan(String repoDir, CodeRepo codeRepo, CodeRepoBranch codeRepoBranch) {
Callable<Void> task = () -> {
log.info("[ScanManagerService] Starting IAC scan... [for: {}]", repoDir);
try {
iaCService.runKics(repoDir, codeRepo, codeRepoBranch);
} catch (IOException | InterruptedException | ScanException e) {
if (Thread.currentThread().isInterrupted()) {
log.warn("[ScanManagerService] IAC scan interrupted for {}.", codeRepo.getRepourl());
Thread.currentThread().interrupt();
} else {
log.error("[ScanManagerService] An error occurred during IAC scan for {}.", codeRepo.getRepourl());
}
}
return null;
};
return scanExecutorService.submit(task);
}



/**
* Validates the provided inputs for the scan.
Expand Down Expand Up @@ -179,16 +273,16 @@ private String fetchRepository(String commitId, String repoUrl, String accessTok
* @param codeRepoBranch The branch of the code repository.
* @return A CompletableFuture representing the asynchronous operation.
*/
private CompletableFuture<Void> runSecretScan(String repoDir, CodeRepo codeRepo, CodeRepoBranch codeRepoBranch) {
return CompletableFuture.runAsync(() -> {
log.info("[ScanManagerService] Starting Secret scan... [for: {}]", repoDir);
try {
secretsService.runGitleaks(repoDir, codeRepo, codeRepoBranch);
} catch (IOException | InterruptedException e) {
log.error("[ScanManagerService] An error occurred during Secret scan for {}.", codeRepo.getRepourl());
}
});
}
// private CompletableFuture<Void> runSecretScan(String repoDir, CodeRepo codeRepo, CodeRepoBranch codeRepoBranch) {
// return CompletableFuture.runAsync(() -> {
// log.info("[ScanManagerService] Starting Secret scan... [for: {}]", repoDir);
// try {
// secretsService.runGitleaks(repoDir, codeRepo, codeRepoBranch);
// } catch (IOException | InterruptedException e) {
// log.error("[ScanManagerService] An error occurred during Secret scan for {}.", codeRepo.getRepourl());
// }
// });
// }

/**
* Runs the SCA scan asynchronously.
Expand All @@ -199,17 +293,17 @@ private CompletableFuture<Void> runSecretScan(String repoDir, CodeRepo codeRepo,
* @param scaScanPerformed The AtomicBoolean to track if the SCA scan was performed.
* @return A CompletableFuture representing the asynchronous operation.
*/
private CompletableFuture<Void> runSCAScan(String repoDir, CodeRepo codeRepo,
CodeRepoBranch codeRepoBranch, AtomicBoolean scaScanPerformed) {
return CompletableFuture.runAsync(() -> {
log.info("[ScanManagerService] Starting SCA scan... [for: {}]", repoDir);
try {
scaScanPerformed.set(scaService.runScan(repoDir, codeRepo, codeRepoBranch));
} catch (IOException | InterruptedException e) {
log.error("[ScanManagerService] An error occurred during SCA scan for {}.", codeRepo.getRepourl());
}
});
}
// private CompletableFuture<Void> runSCAScan(String repoDir, CodeRepo codeRepo,
// CodeRepoBranch codeRepoBranch, AtomicBoolean scaScanPerformed) {
// return CompletableFuture.runAsync(() -> {
// log.info("[ScanManagerService] Starting SCA scan... [for: {}]", repoDir);
// try {
// scaScanPerformed.set(scaService.runScan(repoDir, codeRepo, codeRepoBranch));
// } catch (IOException | InterruptedException e) {
// log.error("[ScanManagerService] An error occurred during SCA scan for {}.", codeRepo.getRepourl());
// }
// });
// }

/**
* Runs the SAST scan asynchronously.
Expand All @@ -219,17 +313,17 @@ private CompletableFuture<Void> runSCAScan(String repoDir, CodeRepo codeRepo,
* @param codeRepoBranch The branch of the code repository.
* @return A CompletableFuture representing the asynchronous operation.
*/
private CompletableFuture<Void> runSASTScan(String repoDir, CodeRepo codeRepo, CodeRepoBranch codeRepoBranch) {
return CompletableFuture.runAsync(() -> {
log.info("[ScanManagerService] Starting SAST scan... [for: {}]", repoDir);
try {
sastService.runBearerScan(repoDir, codeRepo, codeRepoBranch);
} catch (IOException | InterruptedException | ScanException e) {
e.printStackTrace();
log.error("[ScanManagerService] An error occurred during SAST scan for {}.", codeRepo.getRepourl());
}
});
}
// private CompletableFuture<Void> runSASTScan(String repoDir, CodeRepo codeRepo, CodeRepoBranch codeRepoBranch) {
// return CompletableFuture.runAsync(() -> {
// log.info("[ScanManagerService] Starting SAST scan... [for: {}]", repoDir);
// try {
// sastService.runBearerScan(repoDir, codeRepo, codeRepoBranch);
// } catch (IOException | InterruptedException | ScanException e) {
// e.printStackTrace();
// log.error("[ScanManagerService] An error occurred during SAST scan for {}.", codeRepo.getRepourl());
// }
// });
// }

/**
* Runs the IAC scan asynchronously.
Expand All @@ -239,16 +333,16 @@ private CompletableFuture<Void> runSASTScan(String repoDir, CodeRepo codeRepo, C
* @param codeRepoBranch The branch of the code repository.
* @return A CompletableFuture representing the asynchronous operation.
*/
private CompletableFuture<Void> runIACScan(String repoDir, CodeRepo codeRepo, CodeRepoBranch codeRepoBranch) {
return CompletableFuture.runAsync(() -> {
log.info("[ScanManagerService] Starting IAC scan... [for: {}]", repoDir);
try {
iaCService.runKics(repoDir, codeRepo, codeRepoBranch);
} catch (IOException | InterruptedException | ScanException e) {
log.error("[ScanManagerService] An error occurred during IAC scan for {}.", codeRepo.getRepourl());
}
});
}
// private CompletableFuture<Void> runIACScan(String repoDir, CodeRepo codeRepo, CodeRepoBranch codeRepoBranch) {
// return CompletableFuture.runAsync(() -> {
// log.info("[ScanManagerService] Starting IAC scan... [for: {}]", repoDir);
// try {
// iaCService.runKics(repoDir, codeRepo, codeRepoBranch);
// } catch (IOException | InterruptedException | ScanException e) {
// log.error("[ScanManagerService] An error occurred during IAC scan for {}.", codeRepo.getRepourl());
// }
// });
// }

/**
* Waits for all scan tasks to complete.
Expand Down

0 comments on commit e2ede86

Please sign in to comment.