用python实现简单的File Fuzzing

python + simple File Fuzzing

First

首先创建一个类框架,用于简单的文件选择。

#file_fuzzer.py
import getopt
import os
import random
import shutil
import smtplib
import struct
import sys
import threading
import time

from pydbg import *
from pydbg.defines import *
import utils
class file_fuzzer:
    """Skeleton of a simple file fuzzer: run state plus sample-file selection.

    The instance keeps the target description, the bookkeeping for the
    current fuzzing run and the optional e-mail settings.
    """

    def __init__(self, exe_path, ext, notify):
        """Store the target description and reset the run bookkeeping.

        exe_path -- path of the executable that will open the test files
        ext      -- extension used for the generated ``test.<ext>`` file
        notify   -- stored as ``notify_crash``
        """
        # Target description
        self.exe_path = exe_path
        self.ext = ext
        self.notify_crash = notify

        # Basic information about the files of the current run
        self.orig_file = None
        self.mutated_file = None
        self.iteration = 0

        # Crash and debugger bookkeeping
        self.crash = None
        self.send_notify = False
        self.pid = None
        self.in_accessv_handler = False
        self.dbg = None
        self.running = False
        self.ready = False

        # Optional
        self.smtpserver = 'mail.nostarch.com'
        self.recipients = ['jms@bughunter.ca',]
        self.sender = 'jms@bughunter.ca'

        # Payloads that get repeated and spliced into the sample file
        self.test_cases = ["%s%n%s%n%s%n", "\xff", "\x00", "A"]

    def file_picker(self):
        """Copy a randomly chosen sample to ``test.<ext>`` and return its name.

        The samples are read from the ``examples`` directory relative to the
        current working directory.

        Raises ValueError when the ``examples`` directory contains no entries.
        """
        file_list = os.listdir("examples")
        if not file_list:
            # Without this check the random pick fails with an unhelpful error.
            raise ValueError("no sample files found in the 'examples' directory")

        picked = random.choice(file_list)
        shutil.copy(os.path.join("examples", picked), "test.%s" % self.ext)
        return picked

Then

接下来我们要做一些线程方面的工作:加载目标程序,跟踪崩溃信息,在文档分析完成之后终止目标程序。

  • 第一步,将目标程序加载进一个调试线程,并且安装自定义的访问违例处理代码;
  • 第二步,创建第二个线程,用于监视调试线程,并且负责在一段时间之后杀死调试线程;
  • 最后还得附加一段 email 提醒的代码。

具体实现如下:

#file_fuzzer.py
...
def fuzz( self ):
    """Run the main fuzzing loop; this method never returns.

    Whenever no test case is in flight, a sample is picked and mutated, the
    target is started in a debugger thread, and a monitor thread is started
    once the target's pid is known. While a test case is in flight the loop
    sleeps and bumps ``self.iteration`` once per second.
    """
    while True:
        # Only start a new test case when no debugger thread is running and
        # the access violation handler is not collecting crash data.
        if not self.running:
            # We first snag a file for mutation
            self.test_file = self.file_picker()
            self.mutate_file()

            # Start up the debugger thread, which launches the target
            pydbg_thread = threading.Thread(target=self.start_debugger)
            pydbg_thread.setDaemon(False)
            pydbg_thread.start()

            # Wait until the target process exists and its pid is recorded.
            # NOTE(review): the pid appears to be cleared only on the monitor's
            # timeout path; after a crash it may still hold the previous value,
            # in which case this wait falls through immediately -- confirm.
            while self.pid is None:
                time.sleep(1)

            # Start up the monitoring thread, which stops the target after a
            # short while
            monitor_thread = threading.Thread(target=self.monitor_debugger)
            monitor_thread.setDaemon(False)
            monitor_thread.start()
        else:
            # A test case is still in flight: bump the counter and wait
            # before checking again.
            self.iteration += 1
            time.sleep(1)

#Our primary debugger thread that the application
#runs under
def start_debugger(self):
    """Load the target under a fresh pydbg instance and run it.

    Marks the fuzzer as running, registers ``check_accessv`` as the access
    violation callback, loads ``self.exe_path`` with ``test.<ext>`` as its
    argument, publishes the debuggee's pid in ``self.pid`` and then enters
    the debugger's run loop.
    """
    # Single-argument print(...) behaves the same on Python 2 and Python 3.
    print("[*] Starting debugger for iteration: %d" % self.iteration)
    self.running = True
    self.dbg = pydbg()
    self.dbg.set_callback(EXCEPTION_ACCESS_VIOLATION, self.check_accessv)
    self.dbg.load(self.exe_path, "test.%s" % self.ext)
    # The pid is read from the debugger object, which is why the value
    # returned by load() is not kept.
    self.pid = self.dbg.pid
    self.dbg.run()

#Our access violation handler that traps the crash
#information and stores it 
def check_accessv(self,dbg):
    """Access violation callback: record the crash and stop the target.

    dbg -- the debugger object handed to the callback

    First-chance notifications return DBG_CONTINUE without recording
    anything. Otherwise the crash synopsis is written to
    ``crashes/crash-<iteration>``, the mutated file and the original sample
    are copied next to it, the target is terminated and
    DBG_EXCEPTION_NOT_HANDLED is returned.
    """
    if dbg.dbg.u.Exception.dwFirstChance:
        return DBG_CONTINUE

    print("[*] Woot! Handling an access violation!")
    # Tells the monitor thread not to kill the target while we collect data.
    self.in_accessv_handler = True
    try:
        crash_bin = utils.crash_binning.crash_binning()
        crash_bin.record_crash(dbg)
        self.crash = crash_bin.crash_synopsis()

        # A missing output directory must not cost us the crash.
        if not os.path.isdir("crashes"):
            os.makedirs("crashes")

        # Write out the crash information
        crash_path = os.path.join("crashes", "crash-%d" % self.iteration)
        with open(crash_path, "w") as crash_fd:
            crash_fd.write(self.crash)

        # Now back up the files
        shutil.copy(
            "test.%s" % self.ext,
            os.path.join("crashes", "%d.%s" % (self.iteration, self.ext)),
        )
        shutil.copy(
            os.path.join("examples", self.test_file),
            os.path.join("crashes", "%d_orig.%s" % (self.iteration, self.ext)),
        )
    finally:
        # Always stop the target and release the flags, even if recording
        # failed; otherwise the main loop would wait on `running` forever.
        try:
            self.dbg.terminate_process()
        finally:
            self.in_accessv_handler = False
            self.running = False
    return DBG_EXCEPTION_NOT_HANDLED

#This is our monitoring function that allows the application
#to run for a few seconds and then it terminates it 
def monitor_debugger(self):
    """Let the target run for a few seconds, then stop it unless a crash is being handled.

    Counts three one-second ticks. If the access violation handler is not
    active afterwards, the target is terminated and ``pid`` / ``running`` are
    cleared; otherwise this thread only waits until ``running`` goes False.
    """
    counter = 0
    # The trailing commas keep the message and the countdown on one output
    # line (Python 2 print statement).
    print "[*] Monitor thread for pid: %d waiting." % self.pid, 
    while counter < 3:
        time.sleep(1) 
        print counter, 
        counter += 1
    if self.in_accessv_handler != True: 
        # No crash is being recorded: wait one more second, stop the target
        # and clear the state so the main loop can start the next test case.
        time.sleep(1) 
        self.dbg.terminate_process() 
        self.pid = None
        self.running = False
    else:
        # The crash handler owns the shutdown; just wait for it to finish.
        print "[*] The access violation handler is doing its business. Waiting."
        while self.running: 
            time.sleep(1)

#Our emailing routine to ship out crash information 
def notify(self):
    """E-mail the stored crash synopsis to the configured recipients.

    Reads ``self.smtpserver``, ``self.sender``, ``self.recipients``,
    ``self.iteration`` and ``self.crash``. Errors raised by smtplib are not
    caught here; the SMTP session is closed in either case.
    """
    # Headers first, then a single blank line, then the body.
    crash_message = "From: %s\r\nTo: %s\r\n\r\nIteration: %d\n\nOutput:\n\n %s" % (
        self.sender,
        ", ".join(self.recipients),
        self.iteration,
        self.crash,
    )
    session = smtplib.SMTP(self.smtpserver)
    try:
        session.sendmail(self.sender, self.recipients, crash_message)
    finally:
        session.quit()
    return

Next

实现对文件的变异函数。

#file_fuzzer.py
...
def mutate_file( self ):
    """Splice a repeated random test case into ``test.<ext>`` in place.

    One entry of ``self.test_cases`` is chosen at random, repeated between
    1 and 1000 times and inserted at a random offset of the file. An empty
    file is handled by inserting at offset 0.
    """
    path = "test.%s" % self.ext

    # Pull the contents of the file into a buffer
    with open(path, "rb") as fd:
        stream = fd.read()

    # The fuzzing meat and potatoes, really simple:
    # take a random test case and apply it to a random position in the file
    test_case = random.choice(self.test_cases)
    if not isinstance(test_case, bytes):
        # The file is handled as bytes; latin-1 maps "\xff" to the single
        # byte 0xff. On Python 2 the test cases already are byte strings.
        test_case = test_case.encode("latin-1")

    # randint(0, -1) would raise on an empty file, so pin the offset to 0.
    rand_offset = random.randint(0, len(stream) - 1) if stream else 0
    rand_len = random.randint(1, 1000)

    # Repeat the test case and splice it into the buffer
    fuzz_file = stream[:rand_offset] + test_case * rand_len + stream[rand_offset:]

    # Write out the file
    with open(path, "wb") as fd:
        fd.write(fuzz_file)
    return

Finally

实现命令行处理,方便交互。

#file_fuzzer.py
...
def print_usage():
    """Print the command line usage and exit the process with status 0."""
    # Single-argument print(...) produces the same output on Python 2 and 3.
    print("[*]")
    print("[*] file_fuzzer.py -e <Executable Path> -x <File Extension>")
    print("[*]")
    sys.exit(0)
if __name__ == "__main__":
    print("[*] Generic File Fuzzer.")

    # Command line options:
    #   -e <path>  path to the document parser (the target executable)
    #   -x <ext>   filename extension to use for the test file
    #   -n         sets the notify flag passed to file_fuzzer
    try:
        opts, args = getopt.getopt(sys.argv[1:], "e:x:n")
    except getopt.GetoptError:
        # print_usage() exits, so `opts` is always bound below.
        print_usage()

    exe_path = None
    ext = None
    notify = False

    for o, a in opts:
        if o == "-e":
            exe_path = a
        elif o == "-x":
            ext = a
        elif o == "-n":
            notify = True

    # Both the executable and the extension are required to start fuzzing.
    if exe_path is not None and ext is not None:
        fuzzer = file_fuzzer(exe_path, ext, notify)
        fuzzer.fuzz()
    else:
        print_usage()

注——

  1. 以上内容均摘自《灰帽python之旅》;
  2. 对于此简单Fuzzer的改进:Code Coverage、Automated Static Analysis等。