Qakbot v5 静态分析

Qakbot v5 静态分析

样本挺老了,完整功能分析网上有大量公开资料,具体功能不再赘述。

API解析

在现代恶意软件分析中,动态解析API是了解样本行为的关键手段之一。Qakbot采用了动态API解析技术,这种技术使得恶意代码的行为更加隐蔽,绕过了静态分析工具对已知API的检测

xx

xx

Qakbot采用了基于CRC32哈希校验的密钥派生机制,通过CRC32算法生成哈希值,并结合异或操作(XOR)来验证加密数据的有效性。具体过程如下:

样本首先解析PE文件导出表(Export Table)结构,对所有导出函数的名称进行CRC32哈希处理,然后与硬编码密钥0xA235CB91进行按位异或(XOR)操作。生成的哈希值随后与外部传入的API哈希值进行比对。如果这两个哈希值完全一致,则表明已经成功匹配到正确的API地址

xx

xx

了解算法和密钥后,可以解密API名称,并构建相应的结构体来存储每个DLL的API列表

xx

xx

xx

字符串解密

解密流程

Qakbot通过XOR密钥来解密字符串,但该XOR密钥本身是使用AES算法加密的。具体流程如下:

xx

首先调用CryptCreateHash API创建一个SHA-256哈希对象,并通过CryptHashData API对传入的aes_key进行哈希计算。随后,使用CryptDeriveKey API根据SHA-256哈希对象与AES-256算法派生出加密密钥。接着,通过CryptSetKeyParam API将AES配置为CBC模式,并将加密的XOR密钥前16字节作为 IV。完成密钥设置后,调用CryptDecrypt API对XOR密钥进行解密

xx

xx

得到解密后的XOR密钥后,使用该XOR密钥对字符串数组进行XOR解密,还原出明文字符串

xx

通过 CyberChef,可以直观地再现该解密流程:

xx

解密脚本

import hashlib
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad

import idaapi
import idautils
import idc

def get_data(address, size):
    data = idc.get_bytes(address, size)
    if data is None:
        raise ValueError(f"Failed to read data at address {hex(address)} with size {size}")
    return data

def hex_to_int(x):
    if type(x) == int:
        return x
    return int(x[:-1], 16)

def search_by_index(table , ind):
    return(table[ind:].split('\x00')[0])

def set_hexrays_comment(address, text):
    func_ea = idaapi.get_func(address).start_ea
    cfunc = idaapi.decompile(func_ea)

    if not cfunc:
        print(f"❌ Function {func_ea:x} Decompilation failed")
        return False

    citems = cfunc.eamap.get(address, [])

    if not citems:
        return False

    success = False
    for item in citems:
        tl = idaapi.treeloc_t()
        tl.ea = address
        tl.itp = idaapi.ITP_SEMI

        tl.loc = item.obj_id

        try:
            cfunc.set_user_cmt(tl, text)
            success = True
        except Exception as e:
            print(f"❌ Annotation failed {address:x} | type:{item.opname} | error:{str(e)}")

    if success:
        cfunc.save_user_cmts()
        if hasattr(idaapi, 'refresh_idaview_anyway'):
            idaapi.refresh_idaview_anyway()
        else:
            widget = idaapi.get_current_widget()
            idaapi.refresh_idaview(widget)
        return True
    return False


def set_comment(address, text):
    idc.set_cmt(address, text, 0)
    set_hexrays_comment(address, text)

def get_valid_code_refs(func_names):
    refs = []
    for name in func_names:
        target_ea = idc.get_name_ea_simple(name)
        if target_ea == idc.BADADDR:
            continue
        refs.extend(list(idautils.CodeRefsTo(target_ea, 0)))
    return list(set(refs))

def calc_sha256(data):
    sha256_hash = hashlib.sha256()
    sha256_hash.update(data)
    return sha256_hash.digest()

def decrypt_xor_key(ciphertext, key, iv):
    cipher = AES.new(key, AES.MODE_CBC, iv)
    decrypted_data = cipher.decrypt(ciphertext)
    plaintext = unpad(decrypted_data, AES.block_size)
    return plaintext

def xor_decrypt(data, key):
    decrypt_data = ''
    for i in range(len(data)):
        decrypt_data += chr(data[i] ^ key[i % len(key)])
    return decrypt_data

def decrypt(encrypted_str, encrypted_xor_key, arg_aes_key):
    aes_key = calc_sha256(arg_aes_key)
    decrypted_xor_key = decrypt_xor_key(encrypted_xor_key[16:], aes_key, encrypted_xor_key[:16])
    decrypted_str = xor_decrypt(encrypted_str, decrypted_xor_key)
    return decrypted_str

def decrypt_text(decrypt_str, references):
    for ref in references:

        func_start = idaapi.get_func(ref).start_ea
        cfunc = idaapi.decompile(func_start)

        if ref not in cfunc.eamap:
            continue

        prev_instruction_address = idc.prev_head(ref)

        while prev_instruction_address != idc.BADADDR:
            if (idc.print_insn_mnem(prev_instruction_address) == "mov"
                    and idc.print_operand(prev_instruction_address, 0) == "ecx"
                    and idc.get_operand_type(prev_instruction_address, 1) == 5):

                str_offset = idc.print_operand(prev_instruction_address, 1)

                if not str_offset:
                    print(f"Warning: Empty str_offset at 0x{prev_instruction_address:x}")
                    break

                try:
                    offset_int = hex_to_int(str_offset)
                except ValueError:
                    print(f"Error: Invalid offset value at 0x{prev_instruction_address:x} - {str_offset}")
                    break

                out_str = search_by_index(decrypt_str, offset_int)
                print(f"🔓 0x{ref:x} -> {out_str} | offset:0x{offset_int:x}")
                set_comment(ref, out_str)
                break
            else:
                prev_instruction_address = idc.prev_head(prev_instruction_address)

                if prev_instruction_address == idc.BADADDR:
                    print(f"Not found: {hex(ref)}")
                    break

if __name__ == "__main__":

    CONFIG = {
        "set1": {
            "encrypt_data_addr": 0x1800297A0,
            "encrypt_data_size": 0x1836,
            "encrypt_xor_key_addr": 0x18002AFE0,
            "encrypt_xor_key_size": 0xA0,
            "aes_key_addr": 0x180029700,
            "aes_key_size": 0x9F,
            "ref_funcs": ["aug_Decrypt2", "aug_Decrypt"]
        },
        "set2": {
            "encrypt_data_addr": 0x1800282A0,
            "encrypt_data_size": 0x5AD,
            "encrypt_xor_key_addr": 0x1800281C0,
            "encrypt_xor_key_size": 0xD0,
            "aes_key_addr": 0x180028150,
            "aes_key_size": 0x63,
            "ref_funcs": ["aug_DecryptStr2_", "aug_WrapDecryptStr"]
        }
    }

    for config_name, config in CONFIG.items():
        print(f"\n🔨 Processing configuration set [{config_name}]")

        try:
            encrypted_str = get_data(config["encrypt_data_addr"], config["encrypt_data_size"])
            encrypted_xor_key = get_data(config["encrypt_xor_key_addr"], config["encrypt_xor_key_size"])
            aes_key = get_data(config["aes_key_addr"], config["aes_key_size"])

            decrypted_str = decrypt(encrypted_str, encrypted_xor_key, aes_key)

            references = get_valid_code_refs(config["ref_funcs"])

            print(f"📌 Found {len(references)} unique references:")
            for i, ref in enumerate(references, 1):
                print(f"  [{i}] 0x{ref:x}")

            decrypt_text(decrypted_str, references)

        except Exception as e:
            print(f"🔥 Processing of configuration set {config_name} failed: {str(e)}")

    print("\n🎉 Processing completed!")

xx

Comm

解密流程

Qakbot在.data段中嵌入了AES加密配置,其中包括两个关键数据结构:campaign info 和 c2 list。每个数据结构的第一个字节表示type,后续则为encrypt data

xx

xx

xx

解密过程与字符串解密相同,但在解密之前,密钥会先通过SHA-256哈希算法进行处理。加密数据中的前16字节被用作 IV。随后,使用密钥 T2X!wWMVH1UkMHD7SBdbgfgXrNBd(5dmRNbBI9对加密数据进行解密。解密完成后,样本会使用前32字节的数据来验证解密结果的完整性,确保数据未被篡改

xx

在 CyberChef 中的解密如下:

xx

xx

解密脚本

import hashlib
from datetime import datetime

from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
import pefile
import binascii
import socket

def tohex(data):
    if type(data) == str:
        return binascii.hexlify(data.encode('utf-8'))
    else:
        return binascii.hexlify(data)

def get_ip(ip):
    return socket.inet_ntoa(ip)

def get_section(filename, section_name=".data"):
    try:
        pe = pefile.PE(filename, fast_load=True)
        for section in pe.sections:
            raw_name = section.Name.decode('utf-8', errors='ignore')
            clean_name = raw_name.rstrip('\x00').split('\x00')[0]

            if clean_name == section_name:
                return (section.get_data(section.VirtualAddress, section.SizeOfRawData))
        return None
    except (pefile.PEFormatError, FileNotFoundError) as e:
        return None
    finally:
        if 'pe' in locals():
            pe.close()

def calc_sha256(data):
    sha256_hash = hashlib.sha256()
    sha256_hash.update(data)
    return sha256_hash.digest()

def decrypt_xor_key(ciphertext, key, iv):
    cipher = AES.new(key, AES.MODE_CBC, iv)
    decrypted_data = cipher.decrypt(ciphertext)
    plaintext = unpad(decrypted_data, AES.block_size)
    return plaintext

def decrypt(encrypted_str: bytes, arg_aes_key: bytes) -> bytes:
    aes_key = calc_sha256(arg_aes_key)
    iv = encrypted_str[1:17]
    cipher = encrypted_str[17:]
    decrypted_str = decrypt_xor_key(cipher, aes_key, iv)
    return decrypted_str

def parse_config(input_str: bytes) -> None:
    lines = input_str.strip().split(b'\r\n')
    parsed_data = {}
    for line in lines:
        key, value = line.split(b'=')
        parsed_data[key] = value

    times = int(parsed_data[b'3'])
    timestamp =  datetime.fromtimestamp(times)
    print(f"Botnet ID: {parsed_data[b'10']}")
    print(f"40: {parsed_data[b'40']}")
    print(f"Times: {timestamp}")

def parse_c2(ips):
    i = 0
    split_data = [ips[i:i+7] for i in range(1, len(ips), 8)]
    for data in split_data:
        ip = get_ip(data[:4])
        port = int(tohex(data[4:6]), 16)
        print('IP[{0}] = {1}:{2}'.format(i, ip, port))
        i+=1

if __name__ == "__main__":

    aes_key = b'T2X!wWMVH1UkMHD7SBdbgfgXrNBd(5dmRNbBI9'
    filename = "your path"
    size_rva = 0x1020
    encrypt_config_rva = 0x1022
    encrypt_c2_rva = 0x852
    data_content = get_section(filename)
    size = ord(data_content[size_rva : size_rva + 1])
    encrypt_config_info = data_content[encrypt_config_rva : encrypt_config_rva + size]
    encrypt_c2_info = data_content[encrypt_c2_rva : encrypt_c2_rva + size]
    aes_key_hash = calc_sha256(aes_key)
    config_info = decrypt(encrypt_config_info, aes_key)
    c2_info = decrypt(encrypt_c2_info, aes_key)
    print("------------------config info------------------")
    print('sha256: ', tohex(config_info))
    parse_config(config_info[32:])
    print("------------------c2 info------------------")
    print('sha256: ', tohex(c2_info))
    parse_c2(c2_info[32:])
------------------config info------------------
sha256:  b'560b887ca054e53b2dbf3601b1e518c9b40e96802c2e76b6e54f7879aad9bbfd31303d7463686b30380d0a34303d310d0a333d313730363731303935340d0a'
Botnet ID: b'tchk08'
40: b'1'
Times: 2024-01-31 22:22:34
------------------c2 info------------------
sha256:  b'd640008cc859069dddc5b2869488ba83675e5f66a6ca19730b625ce900a830f0011fd2ad0a01bb0001b99cac3e01bb0001b971087b01bb00'
IP[0] = 31.210.173.10:443
IP[1] = 185.156.172.62:443
IP[2] = 185.113.8.123:443

通信

Qakbot的通信功能运行在独立线程中,主要通过HTTP协议与C2服务器进行通信。通信过程中,C2通信的密钥(4Lm7DW&yMFELN4D8oNp0CtKUfC2LAstORIBV)经过AES加密处理后,被编码为Base64格式。样本使用InternetConnectA API与远程服务器建立连接,并通过HttpOpenRequestA和HttpSendRequestA API发送加密后的数据请求。最后,通过InternetReadFile API读取来自C2服务器的响应数据

xx

xx

建立通信后,C2服务器发送的命令通过整数值表示索引。这些命令以对应的索引形式传输

xx

IOC

hash

sha256: af6a9b7e7aefeb903c76417ed2b8399b73657440ad5f8b48a25cfe5e97ff868f

c2

31.210.173.10:443
185.156.172.62:443
185.113.8.123:443
Comment