Angr: Study and Practice
Angr
example
little_engine | DONE
Script author: Michael Reeves (github: @mastermjr)
Script runtime: 3 min 26 seconds (206 seconds)
Concepts presented:
stdin constraining, concrete optimization with Unicorn
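At first glance this looks like a program compiled from C++. Perhaps because of optimizations, or the language itself, a simple control flow ends up looking extremely strange. Dynamic debugging shows that the program swallows the first input character (its ASCII value must be below 0x7E). The rest of the input is the flag, which is 75 characters long. Functions pass strings and other arguments to each other through RDI. During analysis you can see that the program dumps the input string into a data structure pointed to by RDI, and that it dumps it differently depending on the length (I bet the original programmer did not write this by hand).
Then comes a simple encryption step. I don't know which algorithm it is, so I treated this part as a black box and left it to angr.
Finally, searching the deadended paths for one whose output stream contains "Chugga" gives the solved flag.
Script imitated from: Michael Reeves (github: @mastermjr)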
import time
import claripy
import angr
import IPython
# import string

# I don't yet understand why the input stream seems to come in pairs.
# The original author says the array is four times as long, hence len = 1+75*4.
# I haven't observed that myself and am still puzzled; fewer symbols also work, e.g. 1+75*2.
def main():
    p = angr.Project("./engine")
    input_length = 1+4*75
    flag_chars = [claripy.BVS('flag_%d' % i, 8) for i in range(input_length)]
    flag = claripy.Concat(*flag_chars + [claripy.BVV(b'\n')])
    state = p.factory.full_init_state(
        # Without this option the runtime differs by about 50 seconds
        # add_options=angr.options.unicorn,
        stdin=flag
    )
    # Constrain every byte to printable, non-space ASCII (0x21..0x7e)
    for i in flag_chars:
        state.solver.add(i < 0x7f)
        # Exclude ' ' (0x20); otherwise execution gets stuck inside libc
        state.solver.add(i > 0x20)
    IPython.embed()
    # Measure the runtime
    start = time.time()
    sm = p.factory.simulation_manager(state)
    sm.run()
    end = time.time()
    print("Time: {}".format(end - start))
    # Among the finished (deadended) states, look for one that printed the success string
    target = []
    for i in sm.deadended:
        if b'Chugga' in i.posix.dumps(1):
            target.append(i)
    if target:
        print(target[0].posix.dumps(0))

if __name__ == "__main__":
    main()
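There is still a lot I haven't figured out. First, the input stream, the output stream, and the number of symbols. Second, why the engine is Unicorn (per the concepts listed above it is a concrete-execution optimization, not the solver). I need to read the source.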
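The character bounds used in the script can be sanity-checked in plain Python. The sketch below uses only the standard library and does not touch angr; it shows that `0x20 < c < 0x7f` is exactly the non-whitespace part of `string.printable`, so the space character is excluded:

```python
import string

# Byte values allowed by the two constraints: 0x20 < c < 0x7f
allowed = set(range(0x21, 0x7f))

# string.printable also contains whitespace (space, tab, newline, ...)
non_space_printable = {ord(c) for c in string.printable if not c.isspace()}

print(allowed == non_space_printable)
print(len(allowed), "allowed byte values per position")
```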
Whitehat CTF 2015 - Crypto 400 |DONE
Script author: Yan Shoshitaishvili (github: @Zardus)
Script runtime: 30 seconds
Concepts presented: statically linked binary (manually hooking with function summaries), commandline argument, partial solutions
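This example first finds the path that yields a partial solution, then uses that partial solution to brute-force the final flag. The flag is only 8 bytes, so the whole thing takes about 30 s. With even 32 bytes, brute force would probably stop being practical and constraint solving would be needed. Still, how the partial solution is obtained is worth studying.
After locating main, check1 admits multiple solutions, and check2 is then brute-forced. Another nice touch in this script is its use of progressbar, which makes the progress easy to follow.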
# Bookmarking a few idioms that I found interesting
# (excerpts: p is the angr.Project and s is a state from the original script)
# A simple way to hook a function or an arbitrary address
p.hook(0x4018B0, angr.SIM_PROCEDURES['glibc']['__libc_start_main']())
p.hook(0x422690, angr.SIM_PROCEDURES['libc']['memcpy']())
def hook_length(state):
    state.regs.rax = 8
p.hook(0x40168e, hook_length, length=5)
# Add constraints on memory
for i in range(8):
    b = s.memory.load(0x6C4B20 + i, 1)
    s.add_constraints(b >= 0x21, b <= 0x7e)
# Prepare the brute force with itertools
possible_values = [ s.solver.eval_upto(s.memory.load(0x6C4B20 + i, 2), 2**16, cast_to=bytes) for i in range(0, 8, 2) ]
possibilities = tuple(itertools.product(*possible_values))
# Progress bar while iterating over the candidates
def bruteforce_possibilities(possibilities):
    # let's try those values!
    print('[*] example guess: %r' % b''.join(possibilities[0]))
    print('[*] brute-forcing %d possibilities' % len(possibilities))
    for guess in progressbar.ProgressBar(widgets=[progressbar.Counter(), ' ', progressbar.Percentage(), ' ', progressbar.Bar(), ' ', progressbar.ETA()])(possibilities):
        guess_str = b''.join(guess)
        stdout,_ = subprocess.Popen(["./whitehat_crypto400", guess_str.decode("ascii")], stdout=subprocess.PIPE, stderr=subprocess.STDOUT).communicate()
        if b'FLAG IS' in stdout:
            return next(filter(lambda s: guess_str in s, stdout.split()))
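The "partial solution, then brute force" idea can be tried without angr or the challenge binary. The sketch below is a toy under stated assumptions: `check2` and `TARGET_DIGEST` are hypothetical stand-ins for the second check inside the binary, and the hand-written `possible_values` lists play the role of the per-chunk `eval_upto` results.

```python
import hashlib
import itertools

# Hypothetical stand-in for the second check inside the binary:
# it only tells us whether a complete 8-byte guess is right.
TARGET_DIGEST = hashlib.sha256(b"k3y_f14g").hexdigest()

def check2(guess: bytes) -> bool:
    return hashlib.sha256(guess).hexdigest() == TARGET_DIGEST

# Hypothetical stand-in for the eval_upto() results: the first check
# leaves only a few candidates for each 2-byte chunk.
possible_values = [
    [b"K3", b"k4", b"k3"],
    [b"Y_", b"y_"],
    [b"F1", b"f7", b"f1"],
    [b"4G", b"4g"],
]

def bruteforce(possible_values):
    """Try every combination of chunk candidates until check2 accepts one."""
    for guess in itertools.product(*possible_values):
        candidate = b"".join(guess)
        if check2(candidate):
            return candidate
    return None

# The search space is the product of the per-chunk candidate counts.
total = 1
for chunk in possible_values:
    total *= len(chunk)

print("combinations to try:", total)
print("recovered:", bruteforce(possible_values))
```

The search space is the product of the per-chunk candidate counts, which is why this stays cheap for 8 bytes and grows quickly for longer flags.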
CSAW CTF 2015 Quals - Reversing 500, “wyvern” |DONE
Script author: Audrey Dutcher (github: @rhelmot)
Script runtime: 15 mins
Concepts presented: stdin constraining, concrete optimization with Unicorn
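Worked out by debugging with gdb (I really know too little about programs compiled from C++).
The input length is a bold guess.
This challenge is very similar to the first one, so I plan to write two scripts based on what I have learned so far.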
sm.run()
# .virtualenvs/angr
import angr
import claripy

def main():
    p = angr.Project('./wyvern')
    # [INFO]0x4046FE v5 = std::string::length(v10) - 1LL != legend >> 2;
    length = 28
    # Create the symbolic input bytes
    flag_chars = [claripy.BVS("flag_%d" % i, 8) for i in range(length)]
    flag = claripy.Concat(*flag_chars + [claripy.BVV(b'\n')])
    st = p.factory.full_init_state(
        args=["./wyvern"],
        add_options=angr.options.unicorn,
        stdin=flag
    )
    for i in flag_chars:
        st.solver.add(i < 0x7f)
        st.solver.add(i > 0x20)
    sm = p.factory.simulation_manager(st)
    sm.run()
    for i in sm.deadended:
        if b'success' in i.posix.dumps(1):
            print("flag{", i.posix.dumps(0), "}")

if __name__ == "__main__":
    main()
sm.explore()
I expect the two approaches are roughly equivalent; the details still need more study.
# .virtualenvs/angr/bin/python
import time
import IPython
import angr
import claripy

def main():
    p = angr.Project("./wyvern")
    p.hook(0x400F10, angr.SIM_PROCEDURES['glibc']['__libc_start_main']())
    length = 28
    flag_chars = [claripy.BVS("flag_%d" % i, 8) for i in range(length)]
    # Terminate the input with a newline, as in the sm.run() version
    flag = claripy.Concat(*flag_chars + [claripy.BVV(b'\n')])
    st = p.factory.full_init_state(
        args=["./wyvern"],
        add_options=angr.options.unicorn,
        stdin=flag
    )
    for i in flag_chars:
        st.solver.add(i < 0x7f)
        st.solver.add(i > 0x20)
    sm = p.factory.simulation_manager(st)
    before = time.time()
    sm.explore(find=[0x40E2BD], avoid=[0x40E37A]).unstash(from_stash='found', to_stash='active')
    after = time.time()
    # IPython.embed()
    if sm.active:
        print(sm.active[0].posix.dumps(0))
    print("Time : {}".format(after - before))

if __name__ == "__main__":
    main()
TUMCTF 2016 - zwiebel |DONE
Script author: Fish
Script runtime: 2 hours 31 minutes with pypy and Unicorn - expect much longer with CPython only
Concepts presented: self-modifying code support, concrete optimization with Unicorn
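Interesting: self-modifying code is exactly the case I had been wondering about. Back then, not knowing the details of symbolic execution, I assumed I would have to patch the program by hand and only then run symbolic execution. It turns out no manual work is needed; there is a much more convenient way.
The script is fairly simple. The main point is that when the program decrypts itself (self-unpacking), Unicorn should be used. It seems to be related to QEMU, which is fairly low-level, so I am just noting it for later. In addition, because the program may wander off into unconstrained results, one function is hooked in advance.
p.hook_symbol('ptrace', angr.SIM_PROCEDURES['stubs']['ReturnUnconstrained'](return_value=0))
# Was this 20 derived from the number of deadended states? Not sure, but it is a neat coincidence
sm.run(n=20)
explore could also be used with normal termination as the find target, on the guess that every wrong flag leaves the program through a system call. In short, there are many ways to do it, and this example is great for testing. Since I did not know how many symbols were needed, I did not create a fixed number of symbols myself (just leave everything to angr) 🤣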
import IPython
import angr

def main():
    # load process
    # hook unconstrained function: this comes down to experience; otherwise tweak the function being executed
    # init SimState
    # symbol execute
    IPython.embed()
    # sm.stashes
    # TODO: extract the flag from the final state before returning it
    return flag

if __name__ == "__main__":
    print(main())
# Here is the output (after 2 hours and 31 minutes on my machine running Pypy):
#
# ipdb> print(sm)
# <PathGroup with 20 errored, 21 deadended>
# ipdb> print(sm.deadended[-1])
# <Path with 160170 runs (at 0x20001e0)>
# ipdb> print(sm.deadended[-1].state.posix.dumps(0))
# hxp{1_h0p3_y0u_d1dnt_p33l_th3_0ni0n_by_h4nd}
# :)
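For a real competition, optimization still matters :person_fencing:. It looks like the script author's machine was not great: without doing anything about the unconstrained functions, my run took about 25 minutes. I still think the challenge's encryption and decryption are fairly simple.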
FlareOn 2015 - Challenge 5 |DONE
Script author: Adrian Tang (github: @tangabc)
Script runtime: 2 mins 10 secs
Concepts presented: Windows support
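As for the challenge itself, I have no idea how the HASH value was obtained (I never debugged it dynamically), so I am not going to worry about the HASH for now. It was probably extracted from the capture that was provided.
What makes this script interesting is that it makes the stack and heap buffers symbolic, runs the program until the key function has finished, and only then adds constraints and solves. That is a neat way to handle a reversing problem where the result is known but the initial value is unknown (the solver takes care of it).
What follows is a pure copy on my part. Next time I will write it myself, or optimize it once I understand more.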
"""
Full writeup of the walkthrough:
http://0x0atang.github.io/reversing/2015/09/18/flareon5-concolic.html
"""
import IPython
import angr

# Globals
LEN_PW = 0x22
ADDR_PW_ORI = ADDR_PW_ENC = ADDR_HASH = 0
GOAL_HASH = 'UDYs1D7bNmdE1o3g5ms1V6RrYCVvODJF1DpxKTxAJ9xuZW=='

def hook_duplicate_pw_buf(state):
    for i in range(LEN_PW):
        char_ori = state.memory.load(ADDR_PW_ORI + i, 1)
        state.memory.store(ADDR_PW_ENC + i, char_ori)
    state.regs.ebx = ADDR_PW_ENC

def hook_use_dup_pw_buf(state):
    state.regs.ecx = ADDR_PW_ENC

def hook_heapalloc(state):
    state.regs.eax = ADDR_HASH

def main():
    global ADDR_PW_ORI, ADDR_PW_ENC, ADDR_HASH
    # Load binary
    p = angr.Project('sender')
    # Start with a blank state at the EIP after "key.txt" is read
    state = p.factory.blank_state(addr=0x401198)
    # Initialize global variables
    ADDR_PW_ORI = state.regs.ebp - 0x80004
    ADDR_PW_ENC = ADDR_PW_ORI + 0x10000  # looks like an arbitrary spot chosen to hold the copy
    ADDR_HASH = state.regs.ebp - 0x40000
    # Setup stack to simulate the state after which the "key.txt" is read
    state.regs.esi = LEN_PW
    for i in range(LEN_PW):
        state.mem[ADDR_PW_ORI+i:].byte = state.solver.BVS('pw', 8)
    # Hook instructions to use a separate buffer for the XOR-ing function
    p.hook(0x401259, hook_duplicate_pw_buf, length=0)
    p.hook(0x4011E7, hook_use_dup_pw_buf, length=0)
    # To avoid calling imports (HeapAlloc), retrofit part of the stack as
    # temporary buffer to hold symbolic copy of the password
    p.hook(0x4011D6, hook_heapalloc, length=5)
    # Explore the states until after the hash is computed
    sm = p.factory.simulation_manager(state)
    sm.explore(find=0x4011EC)
    # Add constraints to make final hash equal to the one we want
    # Also restrict the hash to only printable bytes
    found_s = sm.found[0]
    IPython.embed()
    for i in range(len(GOAL_HASH)):
        char = found_s.memory.load(ADDR_HASH + i, 1)
        found_s.add_constraints(char >= 0x21,
                                char <= 0x7e,
                                char == ord(GOAL_HASH[i]))
    # Solve for password that will result in the required hash
    return found_s.solver.eval(found_s.memory.load(ADDR_PW_ORI+0, 1), cast_to=bytes) + \
           found_s.solver.eval(found_s.memory.load(ADDR_PW_ORI+1, LEN_PW-1), cast_to=bytes)

def test():
    assert main() == b'Sp1cy_7_layer_OSI_dip@flare-on.com'

if __name__ == '__main__':
    print(main())
0ctf quals 2016 - trace | ——-
Script author: WGH (wgh@bushwhackers.ru)
Script runtime: 1 min 50 secs (CPython 2.7.10), 1 min 12 secs (PyPy 4.0.1)
Concepts presented: guided symbolic tracing
I am not used to MIPS assembly; next time for sure.
ASIS CTF Finals 2015 - license | ??????
Script author: Fish Wang (github: @ltfish)
Script runtime: 3.6 sec
Concepts presented: using the filesystem, manual symbolic summary execution
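The script I wrote cannot finish: it fills up all the memory, and I don't know why yet.
You really do have to use the symbolic file the official script provides; letting angr generate one automatically causes problems. I learned a lot from this challenge, but one question remains: why, without symbolic execution through the emulated filesystem, does the program fail to make progress (memory fills up) until it gets KILLed?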
import angr
import claripy

def main():
    p = angr.Project("license", load_options={'auto_load_libs': False})
    # Create a blank state
    state = p.factory.blank_state()
    # Build the file whose name is weird
    license_name = "_a\nb\tc_"
    # This is the license file
    # From analyzing the binary, we know that the license file should have five
    # lines in total, and each line has 6 characters. Not setting file content
    # may also work, but in that case, angr will produce many more paths, and we
    # will spend much more time in path trimming.
    bytestring = None
    for i in range(5):
        line = [ ]
        for j in range(6):
            line.append(state.solver.BVS('license_file_byte_%d_%d' % (i, j), 8))
            state.add_constraints(line[-1] != b'\n')
        if bytestring is None:
            bytestring = claripy.Concat(*line)
        else:
            bytestring = bytestring.concat(state.solver.BVV(b'\n'), *line)
    license_file = angr.storage.file.SimFile(license_name, bytestring)
    state.fs.insert(license_name, license_file)
    simgr = p.factory.simulation_manager(state)
    simgr.explore(
        find=(0x400e93, ),
        avoid=(0x400bb1, 0x400b8f, 0x400b6d, 0x400a85,
               0x400ebf, 0x400a59)
    )
    # One path will be found
    found = simgr.found[0]
    rsp = found.regs.rsp
    flag_addr = rsp + 0x278 - 0xd8  # Ripped from IDA
    # Perform an inline call to strlen() in order to determine the length of the
    # flag
    FAKE_ADDR = 0x100000
    strlen = lambda state, arguments: \
        angr.SIM_PROCEDURES['libc']['strlen'](p, FAKE_ADDR, p.arch).execute(
            state, arguments=arguments
        )
    flag_length = strlen(found, arguments=[flag_addr]).ret_expr
    # In case it's not null-terminated, we get the least number as the length
    flag_length_int = min(found.solver.eval_upto(flag_length, 3))
    # Read out the flag!
    flag_int = found.solver.eval(found.memory.load(flag_addr, flag_length_int))
    flag = bytes.fromhex(hex(flag_int)[2:])
    return flag

def test():
    assert main() == b'ASIS{8d2cc30143831881f94cb05dcf0b83e0}'

if __name__ == '__main__':
    print(main())
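One detail of the script above is worth a note: `bytes.fromhex(hex(flag_int)[2:])` only works when `hex()` happens to produce an even number of digits. The sketch below uses plain Python with made-up sample values (`b"ASIS{demo}"` and `b"\x0aAB"` are illustrative, not data from the challenge) to show the failure case and a length-aware alternative. Other scripts in these notes avoid the issue by passing `cast_to=bytes` to the solver.

```python
def int_to_bytes_hex(value: int) -> bytes:
    """The idiom used in the script: breaks on an odd number of hex digits."""
    return bytes.fromhex(hex(value)[2:])

def int_to_bytes(value: int, length: int) -> bytes:
    """Length-aware alternative: keeps leading zero bits and zero bytes."""
    return value.to_bytes(length, "big")

# Works: the first byte is 0x41, so hex() yields an even digit count.
flag_int = int.from_bytes(b"ASIS{demo}", "big")
print(int_to_bytes_hex(flag_int))
print(int_to_bytes(flag_int, 10))

# Fails: the first byte is 0x0a, so hex() drops the leading zero nibble.
odd = int.from_bytes(b"\x0aAB", "big")
try:
    int_to_bytes_hex(odd)
except ValueError as exc:
    print("hex round-trip failed:", exc)
print(int_to_bytes(odd, 3))
```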
import angr
import claripy
import IPython

LEN = 0
addr = 0x100000
license_name = "_a\nb\tc_"

def main():
    global LEN
    p = angr.Project("./license")
    length = claripy.BVS("length", 64)
    st = p.factory.full_init_state()
    st.solver.add(-45235 * length * length * length * length
                  - 1256 * length * length * length
                  + 14392 * length * length
                  - 59762 * length
                  - 0x1C5F164EF8C
                  + 44242 * length * length * length * length * length == 0)
    LEN = st.solver.eval(length)

    def hook_str_addr(state):
        state.regs.rax = addr
        print("ADDR = ", hex(addr))

    bytestring = None
    for i in range(5):
        line = [ ]
        for j in range(6):
            line.append(st.solver.BVS('license_file_byte_%d_%d' % (i, j), 8))
            st.add_constraints(line[-1] != b'\n')
        if bytestring is None:
            bytestring = claripy.Concat(*line)
        else:
            bytestring = bytestring.concat(st.solver.BVV(b'\n'), *line)
    # license_chars = [claripy.BVS("license_%d" % i,8) for i in range(LEN)]
    # license_stream = claripy.Concat(*license_chars)
    # license_file = angr.storage.file.SimFile(license_name, license_stream)
    # st.fs.insert(license_name, license_file)
    p.hook(0x400A30, hook_str_addr, length=5)
    sm = p.factory.simulation_manager(st)
    sm.explore(find=[0x400BD0], avoid=[0x400BB1, 0x400B9F, 0x400EBF, 0x400B77, 0x400A85])
    sm.explore(find=[0x400E9D], avoid=[0x400BB1, 0x400B9F, 0x400EBF, 0x400B77, 0x400A85])
    found = sm.found[0]
    flag_addr = found.regs.rsp + 0x1A0
    flag_int = found.solver.eval(found.memory.load(flag_addr, 38))
    flag = bytes.fromhex(hex(flag_int)[2:])
    license_int = found.solver.eval_upto(found.memory.load(addr, 34), 3)[0]
    license = bytes.fromhex(hex(license_int)[2:])
    IPython.embed()
    return {"flag": flag, "license": license}

def test():
    assert main() == {"flag": b"ASIS{8d2cc30143831881f94cb05dcf0b83e0}",
                      "license": b"tl9GUA\n\x1d'n(\x0f\r\n/\x15:\x15\x1d3\nhgyGxW\n49\x064(."}

if __name__ == "__main__":
    print(main())
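The length equation in the script above can be checked with ordinary Python integers. This is a sketch for cross-checking, not part of the original solution: it searches small positive lengths for a root of the same polynomial. Keep in mind that a 64-bit bitvector wraps modulo 2**64, so the value angr's solver returns for `length` may not be this small root.

```python
CONSTANT = 0x1C5F164EF8C

def length_poly(n: int) -> int:
    """Same polynomial as in the script, evaluated over plain integers."""
    return (44242 * n**5 - 45235 * n**4 - 1256 * n**3
            + 14392 * n**2 - 59762 * n - CONSTANT)

# Search small positive lengths for an exact root.
roots = [n for n in range(1, 1000) if length_poly(n) == 0]
print(roots)

# Five lines of six characters plus four separating newlines.
print(5 * 6 + 4)
```

The root agrees with the layout described in the official script (five lines of six characters, separated by newlines) and with the 34 bytes the script reads back for the license.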
ais3_crackme |DONE
The flag is passed on the command line; solve for it.
import angr
import claripy
import IPython

def main():
    p = angr.Project("./ais3_crackme")
    # p.hook(0x40060E,print("failed"),length=0)
    flag_chars = [claripy.BVS("flag_%d" % i, 8) for i in range(23)]
    flag = claripy.Concat(*flag_chars + [claripy.BVS("flag_tail", 8)])
    st = p.factory.full_init_state(
        args=["./ais3_crackme", flag]
    )
    for i in flag_chars:
        st.solver.add(i < 0x7f)
        st.solver.add(i > 0x20)
    sm = p.factory.simulation_manager(st)
    sm.explore(find=[0x400607])
    found = sm.found[0]
    # print(sm.stashes)
    # IPython.embed()
    return found.solver.eval(flag, cast_to=bytes)

if __name__ == "__main__":
    print(main())
RaRCTF2021 Jammy’s Old Infra |DONE
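I could not debug it. I am waiting for someone to teach me how to dynamically debug a .so. The challenge ships builds for every architecture, so why can't IDA find this .so? There is a Decrypt_Key function near the start; I tried executing it with angr and it went reasonably well. At first I kept getting bitten by endianness in angr: even though the binary is recognized as x86, memory loads still default to big-endian, as if everything in the program were treated as a string.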
# ~/.virtualenvs/angr/bin/python
import IPython
import angr

def main():
    base = 0x0
    p = angr.Project("./libnative-lib.so", load_options={
        'main_opts': {
            'base_addr': base
        },
    })
    aes_cbc_decrypt_off = 0x85D0
    aes_cbc_decrypt_end = 0x8C10
    decryptKey_off = 0x8500
    decryptKey_end = 0x85B4
    error_off = 0x9C99
    a1_pt = 0x3000000
    st = p.factory.blank_state(addr=decryptKey_off)
    st.regs.ebp = 0x8000000
    st.regs.esp = 0x9000000  # possibly avoids errors caused by misaligned memory
    st.regs.edi = 0x4000000
    st.regs.eax = 0x5000000
    st.regs.ecx = 0x0
    st.memory.store(0x9000004, 0x3, size=4)
    sm = p.factory.simulation_manager(st)
    sm.explore(find=[decryptKey_end])
    print(sm.found[0].memory.load(0x3000000, 0x20))
    IPython.embed()
    # BE
    # 0x800000c0c00000c0c00000c0

if __name__ == "__main__":
    main()
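The endianness trap mentioned above is easy to reproduce in plain Python. The sketch below does not use angr; the four sample bytes are made up for illustration. It shows how the same bytes read as two different integers depending on byte order, which is why the later scripts pass an explicit `endness` to `memory.load` when reading pointers and integers.

```python
# Four bytes as they would sit in memory, lowest address first.
raw = bytes([0xC0, 0x00, 0x00, 0x80])

# Big-endian: the byte at the lowest address is the most significant.
big = int.from_bytes(raw, "big")

# Little-endian (what x86 uses for integers): lowest address is least significant.
little = int.from_bytes(raw, "little")

print(hex(big))
print(hex(little))
```

Reading a buffer as a string matches the big-endian view, since the bytes appear in address order; reading a 32-bit x86 integer or pointer needs the little-endian view.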
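After that there was no stopping.
There are two check functions: one checks the name, the other checks the password.
I black-boxed both with angr by executing them directly. angr's call felt awkward to use, so I wrote my own template, to be improved next time.
1. Simple function hooks can be written as lambda expressions
2. The overall template still needs some parameter tuning
3. Efficiency and engine optimization
4. inline_call(): so far I have not met a function too large to handle that required calling another inline SimProcedure, because for now I simply patch up each call that causes trouble.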
solve_password.py
from angr import Project,SimProcedure,SIM_PROCEDURES
import archinfo
import monkeyhex
import claripy
import IPython
import logging

base = 0x0

def call(it, addr, function_name, *args, **kwargs):
    st = it.state
    # *args is always a tuple, so test for emptiness rather than None
    if not args:
        print("[FUN-INFO]EIP:", st.regs.eip, "CALLING (", function_name, ")")
    else:
        print("[FUN_INFO]EIP:", st.regs.eip, function_name, args)
    # Push the address of the instruction after the call, then jump to the target
    ins_len = st.block().capstone.insns[0].size
    st.stack_push(st.regs.eip + ins_len)
    it.jump(addr)
class hook_length(SimProcedure):
    def __init__(self, flag_size, return_addr, unhook_addr):
        SimProcedure.__init__(self)
        self.flag_size = flag_size
        self.return_addr = return_addr
        self.unhook_addr = unhook_addr

    def run(self):
        st = self.state
        st.regs.eax = self.flag_size
        self.project.unhook(self.unhook_addr)
        self.jump(self.return_addr)

# def hook_print_aes_cbc_d_buffer(st):
#     sp = st.regs.esp
#     arg0_str = st.memory.load(sp,23,endness=archinfo.Endness.BE)
#     byte_str = bytes.fromhex(hex(st.solver.eval(arg0_str))[2:])
# def hook_print_before_memcpy(st):
#     sp = st.regs.esp
#     arg0 = st.regs.ecx
#     arg1 = st.memory.load(sp+4,4,endness=archinfo.Endness.LE)
#     arg2 = st.memory.load(sp+8,4,endness=archinfo.Endness.LE)
#     print("s1 = ",st.memory.load(arg0,32,endness=archinfo.Endness.BE),"\ns2 = ",st.memory.load(arg1,32,endness=archinfo.Endness.BE),"\nn = ",st.memory.load(arg2,4,endness=archinfo.Endness.LE))
class MyGetStringUTFChars(SimProcedure):
    def __init__(self, size, return_addr):
        SimProcedure.__init__(self)
        self.size = size
        self.return_addr = return_addr

    def run(self):
        st = self.state
        sp = st.regs.esp
        arg0 = st.memory.load(sp, 4, endness=archinfo.Endness.LE)
        arg1 = st.memory.load(sp+4, 4, endness=archinfo.Endness.LE)
        arg2 = st.memory.load(sp+8, 4, endness=archinfo.Endness.LE)
        # print("[HOOKED-INFO]EIP:",st.regs.eip,"CALLING GetStringUTFChars(",arg0,arg1,arg2,")")
        flag_addr = 0x5000000
        flag_length = self.size
        st.regs.eax = flag_addr
        flag_char = [claripy.BVS("flag_%i" % i, 8) for i in range(flag_length)]  # assume len(flag) <= 64
        for i in flag_char:
            st.add_constraints(i > 0x20)
            st.add_constraints(i < 0x7f)
        flag = claripy.Concat(*flag_char + [claripy.BVV(b"\x00")])
        st.memory.store(flag_addr, flag, endness=archinfo.Endness.BE)
        self.jump(self.return_addr)
class Decrypt_Key(SimProcedure):
    def run(self):
        addr = 0x8500
        sp = self.state.regs.esp
        arg0 = self.state.memory.load(sp, 4, endness=archinfo.Endness.LE)
        call(self, addr, "Decrypt_Key", arg0)

class Basic_Stirng(SimProcedure):
    def run(self):
        addr = 0x83A0
        st = self.state
        sp = st.regs.esp
        arg0 = st.memory.load(sp, 4, endness=archinfo.Endness.LE)
        arg1 = st.memory.load(sp+4, 4, endness=archinfo.Endness.LE)
        call(self, addr, "Basic_String", arg0, arg1)

class Aes_cbc_decrypt(SimProcedure):
    def run(self):
        addr = 0x85D0
        st = self.state
        sp = st.regs.esp
        arg0 = st.memory.load(sp, 4, endness=archinfo.Endness.LE)
        call(self, addr, "aes_cbc_decrypt", arg0)

class AES_init_ctx_iv(SimProcedure):
    def run(self):
        addr = 0x9640
        st = self.state
        sp = st.regs.esp
        arg0 = st.memory.load(sp, 4, endness=archinfo.Endness.LE)
        arg1 = st.memory.load(sp+4, 4, endness=archinfo.Endness.LE)
        arg2 = st.memory.load(sp+8, 4, endness=archinfo.Endness.LE)
        call(self, addr, "AES_init_ctx_iv", arg0, arg1, arg2)

class AES_CBC_decrypt_buffer(SimProcedure):
    def run(self):
        addr = 0xA3D0
        st = self.state
        sp = st.regs.esp
        arg0 = st.memory.load(sp, 4, endness=archinfo.Endness.LE)
        arg1 = st.memory.load(sp+4, 4, endness=archinfo.Endness.LE)
        arg2 = st.memory.load(sp+8, 4, endness=archinfo.Endness.LE)
        call(self, addr, "AES_CBC_decrypt_buffer", arg0, arg1, arg2)

# def test_stack(st):
#     sp = st.regs.esp
#     print("TOP-STACK : ",sp)
#     print(st.memory.load(sp,4,endness=archinfo.Endness.LE))
#     print(st.memory.load(sp+4,4,endness=archinfo.Endness.LE))
def main(flag_size):
    logging.getLogger('angr.manager').setLevel(logging.DEBUG)
    p = Project('./libnative-lib.so', load_options={'main_opts': {'auto_load_libs': True, 'base_addr': base,},})
    p.hook(0x9037, MyGetStringUTFChars(flag_size, 0x903D), 6)
    p.hook(0x9048, Basic_Stirng(), 5)
    p.hook(0x9054, Aes_cbc_decrypt(), 5)
    p.hook(0x85FA, Decrypt_Key(), 5)
    p.hook(0x83D2, hook_length(flag_size, 0x83D7, 0x83D2), 5)
    p.hook(0x875D, Basic_Stirng(), 5)
    p.hook(0x878E, Basic_Stirng(), 5)
    p.hook(0x8AE9, Basic_Stirng(), 5)
    # OUTPUT SOMETHING
    # p.hook(0x8ADE,hook_print_aes_cbc_d_buffer,0)
    # p.hook(0x90BA,hook_print_before_memcpy,0)
    aes_cbc_decrypt_off = 0x85D0
    aes_cbc_decrypt_find = 0x8AE9
    checkPasswords_off = 0x9000
    checkPasswords_find = 0x90BF  # 0x90BA
    st = p.factory.blank_state(addr=base + checkPasswords_off)
    # Initialize the stack
    st.regs.esp = 0x8000000 + 0xC  # 0xC TO BE Aligned
    st.regs.ebp = 0x7000000
    st.regs.ebx = 0x0
    sm = p.factory.simulation_manager(st)
    sm.explore(find=[checkPasswords_find])
    print(sm.stashes)
    if sm.found:
        fd = sm.found[0]
        sp = fd.regs.esp
        arg0 = fd.memory.load(sp, 4, endness=archinfo.Endness.LE)
        arg1 = fd.memory.load(sp+4, 4, endness=archinfo.Endness.LE)
        arg2 = fd.memory.load(sp+8, 4, endness=archinfo.Endness.LE)
        # IPython.embed()
        print("s1 = ", fd.memory.load(arg0, flag_size+1, endness=archinfo.Endness.BE), "\ns2 = ", fd.memory.load(arg1, flag_size+1, endness=archinfo.Endness.BE), "\nn = ", fd.memory.load(arg2, 4, endness=archinfo.Endness.LE))
        flag_number = fd.solver.eval(fd.memory.load(arg1, flag_size+1, endness=archinfo.Endness.BE))
        flag = bytes.fromhex(hex(flag_number)[2:])
        print("FLAG : ", flag)
        IPython.embed()
        return flag

def test():
    assert main(23) == b'stove:spill2:drivable:1\x00'

if __name__ == "__main__":
    # main(23) # b'stove:spill2:drivable:1\x00'
    for i in range(0x32):
        main(flag_size=i)
solve_name.py
# ~/.virtualenvs/angr/bin/python
from angr import Project,SimProcedure,SIM_PROCEDURES
import archinfo
import monkeyhex
import claripy
import IPython
import logging

base = 0x0

def call(it, addr, function_name, *args, **kwargs):
    st = it.state
    # *args is always a tuple, so test for emptiness rather than None
    if not args:
        print("[FUN-INFO]EIP:", st.regs.eip, "CALLING (", function_name, ")")
    else:
        print("[FUN_INFO]EIP:", st.regs.eip, function_name, args)
    # Push the address of the instruction after the call, then jump to the target
    ins_len = st.block().capstone.insns[0].size
    st.stack_push(st.regs.eip + ins_len)
    it.jump(addr)

class EncryptDecryptUsername(SimProcedure):
    def __init__(self, call_addr):
        SimProcedure.__init__(self)
        self.call_addr = call_addr

    def run(self):
        addr = self.call_addr
        st = self.state
        sp = st.regs.esp
        arg0 = st.memory.load(sp, 4, endness=archinfo.Endness.LE)
        arg1 = st.memory.load(sp+4, 4, endness=archinfo.Endness.LE)
        call(self, addr, "", arg0, arg1)
        # self.inline_call()
class MyGetStringUTFChars(SimProcedure):
    def __init__(self, size, return_addr):
        SimProcedure.__init__(self)
        self.size = size
        self.return_addr = return_addr

    def run(self):
        st = self.state
        sp = st.regs.esp
        arg0 = st.memory.load(sp, 4, endness=archinfo.Endness.LE)
        arg1 = st.memory.load(sp+4, 4, endness=archinfo.Endness.LE)
        arg2 = st.memory.load(sp+8, 4, endness=archinfo.Endness.LE)
        # print("[HOOKED-INFO]EIP:",st.regs.eip,"CALLING GetStringUTFChars(",arg0,arg1,arg2,")")
        flag_addr = 0x5000000
        flag_length = self.size
        st.regs.eax = flag_addr
        flag_char = [claripy.BVS("flag_%i" % i, 8) for i in range(flag_length)]  # assume len(flag) <= 64
        for i in flag_char:
            st.add_constraints(i > 0x20)
            st.add_constraints(i < 0x7f)
        flag = claripy.Concat(*flag_char + [claripy.BVV(b"\x00")])
        st.memory.store(flag_addr, flag, endness=archinfo.Endness.BE)
        self.jump(self.return_addr)

class hook_length(SimProcedure):
    def __init__(self, flag_size, return_addr, unhook_addr):
        SimProcedure.__init__(self)
        self.flag_size = flag_size
        self.return_addr = return_addr
        self.unhook_addr = unhook_addr

    def run(self):
        st = self.state
        st.regs.eax = self.flag_size
        self.project.unhook(self.unhook_addr)
        self.jump(self.return_addr)

class Basic_Stirng(SimProcedure):
    def run(self):
        addr = 0x83A0
        st = self.state
        sp = st.regs.esp
        arg0 = st.memory.load(sp, 4, endness=archinfo.Endness.LE)
        arg1 = st.memory.load(sp+4, 4, endness=archinfo.Endness.LE)
        call(self, addr, "Basic_String", arg0, arg1)
def main(flag_size):
    logging.getLogger('angr.manager').setLevel(logging.DEBUG)
    p = Project('./libnative-lib.so', load_options={'main_opts': {'auto_load_libs': True, 'base_addr': base,},})
    p.hook(0x8DCE, Basic_Stirng(), 5)
    p.hook(0x82B6, Basic_Stirng(), 5)
    p.hook(0x82C2, Basic_Stirng(), 5)
    p.hook(0x8EB8, MyGetStringUTFChars(flag_size, return_addr=0x8EBA), 2)
    p.hook(0x8EBF, hook_length(flag_size, 0x8EC4, 0x8EBF), 5)
    checkName_off = 0x8DA0
    st = p.factory.blank_state(addr=base + checkName_off)
    # Initialize the stack
    st.regs.esp = 0x8000000  # 0xC TO BE Aligned
    st.regs.ebp = 0x7000000
    st.regs.ebx = 0x0
    sm = p.factory.simulation_manager(st)
    memcpy_off = 0x8F0F
    sm.explore(find=[memcpy_off])
    print(sm.stashes)
    if sm.found:
        fd = sm.found[0]
        sp = fd.regs.esp
        arg0 = fd.memory.load(sp, 4, endness=archinfo.Endness.LE)
        arg1 = fd.memory.load(sp+4, 4, endness=archinfo.Endness.LE)
        arg2 = fd.memory.load(sp+8, 4, endness=archinfo.Endness.LE)
        # IPython.embed()
        print("s1 = ", fd.memory.load(arg0, flag_size+1, endness=archinfo.Endness.BE), "\ns2 = ", fd.memory.load(arg1, flag_size+1, endness=archinfo.Endness.BE), "\nn = ", fd.memory.load(arg2, 4, endness=archinfo.Endness.LE))
        flag_number = fd.solver.eval(fd.memory.load(arg0, flag_size+1, endness=archinfo.Endness.BE))
        flag = bytes.fromhex(hex(flag_number)[2:])
        print("FLAG : ", flag)
        IPython.embed()
        return flag

def test():
    assert main(8) == b'12jammyT\x00'

if __name__ == "__main__":
    for i in range(32):
        main(flag_size=i)
Finally, entering this username and password into the app yields the flag:
12jammyT
stove:spill2:drivable:1
rarctf{pl34s3_d0nt_s0lv3_st4tic4lly_3829103890}
I still don't know how to debug it dynamically... what now?
inctf2021 chall |DONE
A straightforward run; nothing needs special attention.
import angr
import claripy
import IPython
import archinfo

base = 0x0

def hook_input(st):
    # Place a symbolic, printable, NUL-terminated flag where the program expects its input
    flag_addr = 0x202100+base
    flag_length = 30
    flag_char = [claripy.BVS("flag_%i" % i, 8) for i in range(flag_length)]
    for i in flag_char:
        st.add_constraints(i > 0x20)
        st.add_constraints(i < 0x7f)
    flag = claripy.Concat(*flag_char + [claripy.BVV(b"\x00")])
    st.memory.store(flag_addr, flag, endness=archinfo.Endness.BE)

def main():
    p = angr.Project("./chall", load_options={'main_opts': {'auto_load_libs': True, 'base_addr': base,},})
    p.hook(0xB84+base, hook_input, 0)
    st = p.factory.full_init_state()
    sm = p.factory.simulation_manager(st)
    find_answer = 0xADB+base
    find = [find_answer]
    avoid = [0xB9B+base]
    sm.explore(find=find, avoid=avoid)
    # Only touch sm.found after checking that a state was actually found
    if sm.found:
        fd = sm.found[0]
        flag_list = fd.solver.eval_upto(fd.memory.load(0x202100+base, 30, endness=archinfo.Endness.BE), n=2, cast_to=bytes)
        for i in flag_list:
            print(i.decode("utf-8"))
            print(i)
        print(len(flag_list))
    IPython.embed()

if __name__ == "__main__":
    main()
Advanced Topics
from source && angr-doc
emulated filesystem
Already used in the ASIS example above; it works fine.
The Whole Pipeline
I think this part is important: without understanding the source, angr's execution behaves like an unpredictable black box.
sm.explore()
A wrapper around sm.run()
def explore(self, stash='active', n=None, find=None, avoid=None, find_stash='found', avoid_stash='avoid', cfg=None,
            num_find=1, **kwargs):
    """
    Tick stash "stash" forward (up to "n" times or until "num_find" states are found), looking for condition "find",
    avoiding condition "avoid". Stores found states into "find_stash" and avoided states into "avoid_stash".
    The "find" and "avoid" parameters may be any of:
    - An address to find
    - A set or list of addresses to find
    - A function that takes a state and returns whether or not it matches.
    If an angr CFG is passed in as the "cfg" parameter and "find" is either a number or a list or a set, then
    any states which cannot possibly reach a success state without going through a failure state will be
    preemptively avoided.
    """
    num_find += len(self._stashes[find_stash]) if find_stash in self._stashes else 0
    tech = self.use_technique(Explorer(find, avoid, find_stash, avoid_stash, cfg, num_find))

    # Modify first Veritesting so that they can work together.
    deviation_filter_saved = None
    for t in self._techniques:
        if isinstance(t,Veritesting):
            deviation_filter_saved = t.options.get("deviation_filter",None)
            if deviation_filter_saved is not None:
                t.options["deviation_filter"] = lambda s: tech.find(s) or tech.avoid(s) or deviation_filter_saved(s)
            else:
                t.options["deviation_filter"] = lambda s: tech.find(s) or tech.avoid(s)
            break

    try:
        self.run(stash=stash, n=n, **kwargs)
    finally:
        self.remove_technique(tech)

    for t in self._techniques:
        if isinstance(t,Veritesting):
            if deviation_filter_saved is None:
                del t.options["deviation_filter"]
            else:
                t.options["deviation_filter"] = deviation_filter_saved
            break

    return self
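To make the relationship between run() and explore() concrete, here is a toy model in plain Python. It is a deliberately simplified sketch and not angr's implementation: `ToyManager`, the integer "states", and the `graph` of successors are all hypothetical. The point is only that explore() is run() plus a classifier that diverts matching states into the `found` and `avoid` stashes.

```python
from collections import defaultdict

class ToyManager:
    """Toy model of stashes; states are just integer addresses."""

    def __init__(self, start, successors):
        self.successors = successors  # address -> list of next addresses
        self.stashes = defaultdict(list)
        self.stashes["active"].append(start)

    def step(self, classify=None):
        """Advance every active state once and sort the successors into stashes."""
        new_active = []
        for state in self.stashes["active"]:
            nexts = self.successors.get(state, [])
            if not nexts:
                self.stashes["deadended"].append(state)
            for nxt in nexts:
                stash = classify(nxt) if classify else None
                if stash is None:
                    new_active.append(nxt)
                else:
                    self.stashes[stash].append(nxt)
        self.stashes["active"] = new_active

    def run(self, n=None, classify=None, until=None):
        """Step until nothing is active, n steps have passed, or until() is true."""
        steps = 0
        while self.stashes["active"] and (n is None or steps < n):
            if until is not None and until(self):
                break
            self.step(classify)
            steps += 1
        return self

    def explore(self, find, avoid=(), num_find=1):
        """run() with a find/avoid classifier and a stop condition."""
        def classify(state):
            if state in find:
                return "found"
            if state in avoid:
                return "avoid"
            return None
        return self.run(classify=classify,
                        until=lambda m: len(m.stashes["found"]) >= num_find)

# Hypothetical control flow: 0x1000 branches to a good and a bad path.
graph = {0x1000: [0x1010, 0x1020], 0x1010: [0x1030],
         0x1020: [0x1040], 0x1030: [], 0x1040: []}

explored = ToyManager(0x1000, graph).explore(find={0x1030}, avoid={0x1020})
print({name: [hex(s) for s in states] for name, states in explored.stashes.items()})

ran = ToyManager(0x1000, graph).run()
print([hex(s) for s in ran.stashes["deadended"]])
```

In this model a plain run() lets every path finish in `deadended`, which then has to be filtered afterwards (as the little_engine and wyvern sm.run() scripts do by searching the output), while explore() stops as soon as enough states reach `found` and never follows the avoided branch.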